diff options
Diffstat (limited to 'internal/federation')
-rw-r--r-- | internal/federation/dereferencing/account.go | 8 | ||||
-rw-r--r-- | internal/federation/dereferencing/status.go | 2 | ||||
-rw-r--r-- | internal/federation/dereferencing/thread.go | 4 | ||||
-rw-r--r-- | internal/federation/federatingdb/accept.go | 26 | ||||
-rw-r--r-- | internal/federation/federatingdb/announce.go | 12 | ||||
-rw-r--r-- | internal/federation/federatingdb/announce_test.go | 8 | ||||
-rw-r--r-- | internal/federation/federatingdb/create.go | 92 | ||||
-rw-r--r-- | internal/federation/federatingdb/create_test.go | 13 | ||||
-rw-r--r-- | internal/federation/federatingdb/delete.go | 148 | ||||
-rw-r--r-- | internal/federation/federatingdb/federatingdb_test.go | 31 | ||||
-rw-r--r-- | internal/federation/federatingdb/move.go | 12 | ||||
-rw-r--r-- | internal/federation/federatingdb/move_test.go | 23 | ||||
-rw-r--r-- | internal/federation/federatingdb/reject_test.go | 3 | ||||
-rw-r--r-- | internal/federation/federatingdb/update.go | 28 |
14 files changed, 239 insertions, 171 deletions
diff --git a/internal/federation/dereferencing/account.go b/internal/federation/dereferencing/account.go index e8d32f58a..94df9538a 100644 --- a/internal/federation/dereferencing/account.go +++ b/internal/federation/dereferencing/account.go @@ -104,7 +104,7 @@ func (d *Dereferencer) GetAccountByURI(ctx context.Context, requestUser string, if accountable != nil { // This account was updated, enqueue re-dereference featured posts. - d.state.Workers.Federator.MustEnqueueCtx(ctx, func(ctx context.Context) { + d.state.Workers.Dereference.Queue.Push(func(ctx context.Context) { if err := d.dereferenceAccountFeatured(ctx, requestUser, account); err != nil { log.Errorf(ctx, "error fetching account featured collection: %v", err) } @@ -201,7 +201,7 @@ func (d *Dereferencer) GetAccountByUsernameDomain(ctx context.Context, requestUs if accountable != nil { // This account was updated, enqueue re-dereference featured posts. - d.state.Workers.Federator.MustEnqueueCtx(ctx, func(ctx context.Context) { + d.state.Workers.Dereference.Queue.Push(func(ctx context.Context) { if err := d.dereferenceAccountFeatured(ctx, requestUser, account); err != nil { log.Errorf(ctx, "error fetching account featured collection: %v", err) } @@ -322,7 +322,7 @@ func (d *Dereferencer) RefreshAccount( if accountable != nil { // This account was updated, enqueue re-dereference featured posts. - d.state.Workers.Federator.MustEnqueueCtx(ctx, func(ctx context.Context) { + d.state.Workers.Dereference.Queue.Push(func(ctx context.Context) { if err := d.dereferenceAccountFeatured(ctx, requestUser, latest); err != nil { log.Errorf(ctx, "error fetching account featured collection: %v", err) } @@ -362,7 +362,7 @@ func (d *Dereferencer) RefreshAccountAsync( } // Enqueue a worker function to enrich this account async. - d.state.Workers.Federator.MustEnqueueCtx(ctx, func(ctx context.Context) { + d.state.Workers.Dereference.Queue.Push(func(ctx context.Context) { latest, accountable, err := d.enrichAccountSafely(ctx, requestUser, uri, account, accountable) if err != nil { log.Errorf(ctx, "error enriching remote account: %v", err) diff --git a/internal/federation/dereferencing/status.go b/internal/federation/dereferencing/status.go index 769539a2f..bd50a08fd 100644 --- a/internal/federation/dereferencing/status.go +++ b/internal/federation/dereferencing/status.go @@ -255,7 +255,7 @@ func (d *Dereferencer) RefreshStatusAsync( } // Enqueue a worker function to re-fetch this status entirely async. - d.state.Workers.Federator.MustEnqueueCtx(ctx, func(ctx context.Context) { + d.state.Workers.Dereference.Queue.Push(func(ctx context.Context) { latest, statusable, _, err := d.enrichStatusSafely(ctx, requestUser, uri, diff --git a/internal/federation/dereferencing/thread.go b/internal/federation/dereferencing/thread.go index e528581c9..ed2c1a43f 100644 --- a/internal/federation/dereferencing/thread.go +++ b/internal/federation/dereferencing/thread.go @@ -56,14 +56,14 @@ func (d *Dereferencer) dereferenceThread( } // Enqueue dereferencing remaining status thread, (children), asychronously . - d.state.Workers.Federator.MustEnqueueCtx(ctx, func(ctx context.Context) { + d.state.Workers.Dereference.Queue.Push(func(ctx context.Context) { if err := d.DereferenceStatusDescendants(ctx, requestUser, uri, statusable); err != nil { log.Error(ctx, err) } }) } else { // This is an existing status, dereference the WHOLE thread asynchronously. - d.state.Workers.Federator.MustEnqueueCtx(ctx, func(ctx context.Context) { + d.state.Workers.Dereference.Queue.Push(func(ctx context.Context) { if err := d.DereferenceStatusAncestors(ctx, requestUser, status); err != nil { log.Error(ctx, err) } diff --git a/internal/federation/federatingdb/accept.go b/internal/federation/federatingdb/accept.go index 50a7c2db1..e26e5955b 100644 --- a/internal/federation/federatingdb/accept.go +++ b/internal/federation/federatingdb/accept.go @@ -89,13 +89,12 @@ func (f *federatingDB) Accept(ctx context.Context, accept vocab.ActivityStreamsA return err } - // Process side effects asynchronously. - f.state.Workers.EnqueueFediAPI(ctx, messages.FromFediAPI{ - APObjectType: ap.ActivityFollow, - APActivityType: ap.ActivityAccept, - GTSModel: follow, - ReceivingAccount: receivingAcct, - RequestingAccount: requestingAcct, + f.state.Workers.Federator.Queue.Push(&messages.FromFediAPI{ + APObjectType: ap.ActivityFollow, + APActivityType: ap.ActivityAccept, + GTSModel: follow, + Receiving: receivingAcct, + Requesting: requestingAcct, }) } @@ -138,13 +137,12 @@ func (f *federatingDB) Accept(ctx context.Context, accept vocab.ActivityStreamsA return err } - // Process side effects asynchronously. - f.state.Workers.EnqueueFediAPI(ctx, messages.FromFediAPI{ - APObjectType: ap.ActivityFollow, - APActivityType: ap.ActivityAccept, - GTSModel: follow, - ReceivingAccount: receivingAcct, - RequestingAccount: requestingAcct, + f.state.Workers.Federator.Queue.Push(&messages.FromFediAPI{ + APObjectType: ap.ActivityFollow, + APActivityType: ap.ActivityAccept, + GTSModel: follow, + Receiving: receivingAcct, + Requesting: requestingAcct, }) continue diff --git a/internal/federation/federatingdb/announce.go b/internal/federation/federatingdb/announce.go index 2f5950a30..3a3b91236 100644 --- a/internal/federation/federatingdb/announce.go +++ b/internal/federation/federatingdb/announce.go @@ -81,12 +81,12 @@ func (f *federatingDB) Announce(ctx context.Context, announce vocab.ActivityStre } // This is a new boost. Process side effects asynchronously. - f.state.Workers.EnqueueFediAPI(ctx, messages.FromFediAPI{ - APObjectType: ap.ActivityAnnounce, - APActivityType: ap.ActivityCreate, - GTSModel: boost, - ReceivingAccount: receivingAcct, - RequestingAccount: requestingAcct, + f.state.Workers.Federator.Queue.Push(&messages.FromFediAPI{ + APObjectType: ap.ActivityAnnounce, + APActivityType: ap.ActivityCreate, + GTSModel: boost, + Receiving: receivingAcct, + Requesting: requestingAcct, }) return nil diff --git a/internal/federation/federatingdb/announce_test.go b/internal/federation/federatingdb/announce_test.go index 8dd5ce9da..2833c04c4 100644 --- a/internal/federation/federatingdb/announce_test.go +++ b/internal/federation/federatingdb/announce_test.go @@ -19,6 +19,7 @@ package federatingdb_test import ( "testing" + "time" "github.com/stretchr/testify/suite" "github.com/superseriousbusiness/activity/streams/vocab" @@ -42,7 +43,7 @@ func (suite *AnnounceTestSuite) TestNewAnnounce() { suite.NoError(err) // should be a message heading to the processor now, which we can intercept here - msg := <-suite.fromFederator + msg, _ := suite.getFederatorMsg(5 * time.Second) suite.Equal(ap.ActivityAnnounce, msg.APObjectType) suite.Equal(ap.ActivityCreate, msg.APActivityType) @@ -69,7 +70,7 @@ func (suite *AnnounceTestSuite) TestAnnounceTwice() { suite.NoError(err) // should be a message heading to the processor now, which we can intercept here - msg := <-suite.fromFederator + msg, _ := suite.getFederatorMsg(5 * time.Second) suite.Equal(ap.ActivityAnnounce, msg.APObjectType) suite.Equal(ap.ActivityCreate, msg.APActivityType) boost, ok := msg.GTSModel.(*gtsmodel.Status) @@ -94,7 +95,8 @@ func (suite *AnnounceTestSuite) TestAnnounceTwice() { // since this is a repeat announce with the same URI, just delivered to a different inbox, // we should have nothing in the messages channel... - suite.Empty(suite.fromFederator) + _, ok = suite.getFederatorMsg(time.Second) + suite.False(ok) } func TestAnnounceTestSuite(t *testing.T) { diff --git a/internal/federation/federatingdb/create.go b/internal/federation/federatingdb/create.go index 94261526e..44f3cd98c 100644 --- a/internal/federation/federatingdb/create.go +++ b/internal/federation/federatingdb/create.go @@ -99,7 +99,7 @@ func (f *federatingDB) Create(ctx context.Context, asType vocab.Type) error { BLOCK HANDLERS */ -func (f *federatingDB) activityBlock(ctx context.Context, asType vocab.Type, receiving *gtsmodel.Account, requestingAccount *gtsmodel.Account) error { +func (f *federatingDB) activityBlock(ctx context.Context, asType vocab.Type, receiving *gtsmodel.Account, requesting *gtsmodel.Account) error { blockable, ok := asType.(vocab.ActivityStreamsBlock) if !ok { return errors.New("activityBlock: could not convert type to block") @@ -110,10 +110,10 @@ func (f *federatingDB) activityBlock(ctx context.Context, asType vocab.Type, rec return fmt.Errorf("activityBlock: could not convert Block to gts model block") } - if block.AccountID != requestingAccount.ID { + if block.AccountID != requesting.ID { return fmt.Errorf( "activityBlock: requestingAccount %s is not Block actor account %s", - requestingAccount.URI, block.Account.URI, + requesting.URI, block.Account.URI, ) } @@ -130,12 +130,12 @@ func (f *federatingDB) activityBlock(ctx context.Context, asType vocab.Type, rec return fmt.Errorf("activityBlock: database error inserting block: %s", err) } - f.state.Workers.EnqueueFediAPI(ctx, messages.FromFediAPI{ - APObjectType: ap.ActivityBlock, - APActivityType: ap.ActivityCreate, - GTSModel: block, - ReceivingAccount: receiving, - RequestingAccount: requestingAccount, + f.state.Workers.Federator.Queue.Push(&messages.FromFediAPI{ + APObjectType: ap.ActivityBlock, + APActivityType: ap.ActivityCreate, + GTSModel: block, + Receiving: receiving, + Requesting: requesting, }) return nil @@ -297,7 +297,7 @@ func (f *federatingDB) createPollOptionables( } // Enqueue message to the fedi API worker with poll vote(s). - f.state.Workers.EnqueueFediAPI(ctx, messages.FromFediAPI{ + f.state.Workers.Federator.Queue.Push(&messages.FromFediAPI{ APActivityType: ap.ActivityCreate, APObjectType: ap.ActivityQuestion, GTSModel: >smodel.PollVote{ @@ -308,8 +308,8 @@ func (f *federatingDB) createPollOptionables( PollID: inReplyTo.PollID, Poll: inReplyTo.Poll, }, - ReceivingAccount: receiver, - RequestingAccount: requester, + Receiving: receiver, + Requesting: requester, }) return nil @@ -377,28 +377,28 @@ func (f *federatingDB) createStatusable( // Pass the statusable URI (APIri) into the processor // worker and do the rest of the processing asynchronously. - f.state.Workers.EnqueueFediAPI(ctx, messages.FromFediAPI{ - APObjectType: ap.ObjectNote, - APActivityType: ap.ActivityCreate, - APIri: ap.GetJSONLDId(statusable), - APObjectModel: nil, - GTSModel: nil, - ReceivingAccount: receiver, - RequestingAccount: requester, + f.state.Workers.Federator.Queue.Push(&messages.FromFediAPI{ + APObjectType: ap.ObjectNote, + APActivityType: ap.ActivityCreate, + APIRI: ap.GetJSONLDId(statusable), + APObject: nil, + GTSModel: nil, + Receiving: receiver, + Requesting: requester, }) return nil } // Do the rest of the processing asynchronously. The processor // will handle inserting/updating + further dereferencing the status. - f.state.Workers.EnqueueFediAPI(ctx, messages.FromFediAPI{ - APObjectType: ap.ObjectNote, - APActivityType: ap.ActivityCreate, - APIri: nil, - GTSModel: nil, - APObjectModel: statusable, - ReceivingAccount: receiver, - RequestingAccount: requester, + f.state.Workers.Federator.Queue.Push(&messages.FromFediAPI{ + APObjectType: ap.ObjectNote, + APActivityType: ap.ActivityCreate, + APIRI: nil, + GTSModel: nil, + APObject: statusable, + Receiving: receiver, + Requesting: requester, }) return nil @@ -439,12 +439,12 @@ func (f *federatingDB) activityFollow(ctx context.Context, asType vocab.Type, re return fmt.Errorf("activityFollow: database error inserting follow request: %s", err) } - f.state.Workers.EnqueueFediAPI(ctx, messages.FromFediAPI{ - APObjectType: ap.ActivityFollow, - APActivityType: ap.ActivityCreate, - GTSModel: followRequest, - ReceivingAccount: receivingAccount, - RequestingAccount: requestingAccount, + f.state.Workers.Federator.Queue.Push(&messages.FromFediAPI{ + APObjectType: ap.ActivityFollow, + APActivityType: ap.ActivityCreate, + GTSModel: followRequest, + Receiving: receivingAccount, + Requesting: requestingAccount, }) return nil @@ -484,12 +484,12 @@ func (f *federatingDB) activityLike(ctx context.Context, asType vocab.Type, rece return fmt.Errorf("activityLike: database error inserting fave: %w", err) } - f.state.Workers.EnqueueFediAPI(ctx, messages.FromFediAPI{ - APObjectType: ap.ActivityLike, - APActivityType: ap.ActivityCreate, - GTSModel: fave, - ReceivingAccount: receivingAccount, - RequestingAccount: requestingAccount, + f.state.Workers.Federator.Queue.Push(&messages.FromFediAPI{ + APObjectType: ap.ActivityLike, + APActivityType: ap.ActivityCreate, + GTSModel: fave, + Receiving: receivingAccount, + Requesting: requestingAccount, }) return nil @@ -536,12 +536,12 @@ func (f *federatingDB) activityFlag(ctx context.Context, asType vocab.Type, rece return fmt.Errorf("activityFlag: database error inserting report: %w", err) } - f.state.Workers.EnqueueFediAPI(ctx, messages.FromFediAPI{ - APObjectType: ap.ActivityFlag, - APActivityType: ap.ActivityCreate, - GTSModel: report, - ReceivingAccount: receivingAccount, - RequestingAccount: requestingAccount, + f.state.Workers.Federator.Queue.Push(&messages.FromFediAPI{ + APObjectType: ap.ActivityFlag, + APActivityType: ap.ActivityCreate, + GTSModel: report, + Receiving: receivingAccount, + Requesting: requestingAccount, }) return nil diff --git a/internal/federation/federatingdb/create_test.go b/internal/federation/federatingdb/create_test.go index 5f80812bf..fffee1432 100644 --- a/internal/federation/federatingdb/create_test.go +++ b/internal/federation/federatingdb/create_test.go @@ -21,6 +21,7 @@ import ( "context" "encoding/json" "testing" + "time" "github.com/stretchr/testify/suite" "github.com/superseriousbusiness/activity/streams" @@ -48,10 +49,10 @@ func (suite *CreateTestSuite) TestCreateNote() { suite.NoError(err) // should be a message heading to the processor now, which we can intercept here - msg := <-suite.fromFederator + msg, _ := suite.getFederatorMsg(5 * time.Second) suite.Equal(ap.ObjectNote, msg.APObjectType) suite.Equal(ap.ActivityCreate, msg.APActivityType) - suite.Equal(note, msg.APObjectModel) + suite.Equal(note, msg.APObject) } func (suite *CreateTestSuite) TestCreateNoteForward() { @@ -79,15 +80,15 @@ func (suite *CreateTestSuite) TestCreateNoteForward() { suite.NoError(err) // should be a message heading to the processor now, which we can intercept here - msg := <-suite.fromFederator + msg, _ := suite.getFederatorMsg(5 * time.Second) suite.Equal(ap.ObjectNote, msg.APObjectType) suite.Equal(ap.ActivityCreate, msg.APActivityType) // nothing should be set as the model since this is a forward - suite.Nil(msg.APObjectModel) + suite.Nil(msg.APObject) // but we should have a uri set - suite.Equal("http://example.org/users/Some_User/statuses/afaba698-5740-4e32-a702-af61aa543bc1", msg.APIri.String()) + suite.Equal("http://example.org/users/Some_User/statuses/afaba698-5740-4e32-a702-af61aa543bc1", msg.APIRI.String()) } func (suite *CreateTestSuite) TestCreateFlag1() { @@ -120,7 +121,7 @@ func (suite *CreateTestSuite) TestCreateFlag1() { } // should be a message heading to the processor now, which we can intercept here - msg := <-suite.fromFederator + msg, _ := suite.getFederatorMsg(5 * time.Second) suite.Equal(ap.ActivityFlag, msg.APObjectType) suite.Equal(ap.ActivityCreate, msg.APActivityType) diff --git a/internal/federation/federatingdb/delete.go b/internal/federation/federatingdb/delete.go index 622ef6d3d..7e9b66c5a 100644 --- a/internal/federation/federatingdb/delete.go +++ b/internal/federation/federatingdb/delete.go @@ -19,10 +19,13 @@ package federatingdb import ( "context" + "errors" "net/url" - "codeberg.org/gruf/go-kv" "github.com/superseriousbusiness/gotosocial/internal/ap" + "github.com/superseriousbusiness/gotosocial/internal/db" + "github.com/superseriousbusiness/gotosocial/internal/gtserror" + "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" "github.com/superseriousbusiness/gotosocial/internal/log" "github.com/superseriousbusiness/gotosocial/internal/messages" ) @@ -34,43 +37,130 @@ import ( // // The library makes this call only after acquiring a lock first. func (f *federatingDB) Delete(ctx context.Context, id *url.URL) error { - l := log.WithContext(ctx). - WithFields(kv.Fields{ - {"id", id}, - }...) - l.Debug("entering Delete") - activityContext := getActivityContext(ctx) if activityContext.internal { return nil // Already processed. } - requestingAcct := activityContext.requestingAcct - receivingAcct := activityContext.receivingAcct - - // in a delete we only get the URI, we can't know if we have a status or a profile or something else, - // so we have to try a few different things... - if s, err := f.state.DB.GetStatusByURI(ctx, id.String()); err == nil && requestingAcct.ID == s.AccountID { - l.Debugf("deleting status: %s", s.ID) - f.state.Workers.EnqueueFediAPI(ctx, messages.FromFediAPI{ - APObjectType: ap.ObjectNote, - APActivityType: ap.ActivityDelete, - GTSModel: s, - ReceivingAccount: receivingAcct, - RequestingAccount: requestingAcct, + // Extract receiving / requesting accounts. + requesting := activityContext.requestingAcct + receiving := activityContext.receivingAcct + + // Serialize deleted ID URI. + // (may be status OR account) + uriStr := id.String() + + var ( + ok bool + err error + ) + + // Try delete as an account URI. + ok, err = f.deleteAccount(ctx, + requesting, + receiving, + uriStr, + ) + if err != nil { + return err + } else if ok { + // success! + return nil + } + + // Try delete as a status URI. + ok, err = f.deleteStatus(ctx, + requesting, + receiving, + uriStr, + ) + if err != nil { + return err + } else if ok { + // success! + return nil + } + + // Log at debug level, as lots of these could indicate federation + // issues between remote and this instance, or help with debugging. + log.Debugf(ctx, "received delete for unknown target: %s", uriStr) + return nil +} + +func (f *federatingDB) deleteAccount( + ctx context.Context, + requesting *gtsmodel.Account, + receiving *gtsmodel.Account, + uri string, // target account +) ( + bool, // success? + error, // any error +) { + account, err := f.state.DB.GetAccountByURI(ctx, uri) + if err != nil && !errors.Is(err, db.ErrNoEntries) { + return false, gtserror.Newf("error getting account: %w", err) + } + + if account != nil { + // Ensure requesting account is + // only trying to delete itself. + if account.ID != requesting.ID { + + // TODO: handled forwarded deletes, + // for now we silently drop this. + return true, nil + } + + log.Debugf(ctx, "deleting account: %s", account.URI) + f.state.Workers.Federator.Queue.Push(&messages.FromFediAPI{ + APObjectType: ap.ObjectProfile, + APActivityType: ap.ActivityDelete, + GTSModel: account, + Receiving: receiving, + Requesting: requesting, }) + + return true, nil + } + + return false, nil +} + +func (f *federatingDB) deleteStatus( + ctx context.Context, + requesting *gtsmodel.Account, + receiving *gtsmodel.Account, + uri string, // target status +) ( + bool, // success? + error, // any error +) { + status, err := f.state.DB.GetStatusByURI(ctx, uri) + if err != nil && !errors.Is(err, db.ErrNoEntries) { + return false, gtserror.Newf("error getting status: %w", err) } - if a, err := f.state.DB.GetAccountByURI(ctx, id.String()); err == nil && requestingAcct.ID == a.ID { - l.Debugf("deleting account: %s", a.ID) - f.state.Workers.EnqueueFediAPI(ctx, messages.FromFediAPI{ - APObjectType: ap.ObjectProfile, - APActivityType: ap.ActivityDelete, - GTSModel: a, - ReceivingAccount: receivingAcct, - RequestingAccount: requestingAcct, + if status != nil { + // Ensure requesting account is only + // trying to delete its own statuses. + if status.AccountID != requesting.ID { + + // TODO: handled forwarded deletes, + // for now we silently drop this. + return true, nil + } + + log.Debugf(ctx, "deleting status: %s", status.URI) + f.state.Workers.Federator.Queue.Push(&messages.FromFediAPI{ + APObjectType: ap.ObjectNote, + APActivityType: ap.ActivityDelete, + GTSModel: status, + Receiving: receiving, + Requesting: requesting, }) + + return true, nil } - return nil + return false, nil } diff --git a/internal/federation/federatingdb/federatingdb_test.go b/internal/federation/federatingdb/federatingdb_test.go index 0f227164d..360094887 100644 --- a/internal/federation/federatingdb/federatingdb_test.go +++ b/internal/federation/federatingdb/federatingdb_test.go @@ -19,6 +19,7 @@ package federatingdb_test import ( "context" + "time" "github.com/stretchr/testify/suite" "github.com/superseriousbusiness/gotosocial/internal/db" @@ -34,11 +35,10 @@ import ( type FederatingDBTestSuite struct { suite.Suite - db db.DB - tc *typeutils.Converter - fromFederator chan messages.FromFediAPI - federatingDB federatingdb.DB - state state.State + db db.DB + tc *typeutils.Converter + federatingDB federatingdb.DB + state state.State testTokens map[string]*gtsmodel.Token testClients map[string]*gtsmodel.Client @@ -51,6 +51,13 @@ type FederatingDBTestSuite struct { testActivities map[string]testrig.ActivityWithSignature } +func (suite *FederatingDBTestSuite) getFederatorMsg(timeout time.Duration) (*messages.FromFediAPI, bool) { + ctx := context.Background() + ctx, cncl := context.WithTimeout(ctx, timeout) + defer cncl() + return suite.state.Workers.Federator.Queue.PopCtx(ctx) +} + func (suite *FederatingDBTestSuite) SetupSuite() { suite.testTokens = testrig.NewTestTokens() suite.testClients = testrig.NewTestClients() @@ -69,13 +76,6 @@ func (suite *FederatingDBTestSuite) SetupTest() { suite.state.Caches.Init() testrig.StartNoopWorkers(&suite.state) - suite.fromFederator = make(chan messages.FromFediAPI, 10) - suite.state.Workers.EnqueueFediAPI = func(ctx context.Context, msgs ...messages.FromFediAPI) { - for _, msg := range msgs { - suite.fromFederator <- msg - } - } - suite.db = testrig.NewTestDB(&suite.state) suite.testActivities = testrig.NewTestActivities(suite.testAccounts) @@ -96,13 +96,6 @@ func (suite *FederatingDBTestSuite) SetupTest() { func (suite *FederatingDBTestSuite) TearDownTest() { testrig.StandardDBTeardown(suite.db) testrig.StopWorkers(&suite.state) - for suite.fromFederator != nil { - select { - case <-suite.fromFederator: - default: - return - } - } } func createTestContext(receivingAccount *gtsmodel.Account, requestingAccount *gtsmodel.Account) context.Context { diff --git a/internal/federation/federatingdb/move.go b/internal/federation/federatingdb/move.go index 2e8049e08..59dc2529c 100644 --- a/internal/federation/federatingdb/move.go +++ b/internal/federation/federatingdb/move.go @@ -170,12 +170,12 @@ func (f *federatingDB) Move(ctx context.Context, move vocab.ActivityStreamsMove) // We had a Move already or stored a new Move. // Pass back to a worker for async processing. - f.state.Workers.EnqueueFediAPI(ctx, messages.FromFediAPI{ - APObjectType: ap.ObjectProfile, - APActivityType: ap.ActivityMove, - GTSModel: stubMove, - RequestingAccount: requestingAcct, - ReceivingAccount: receivingAcct, + f.state.Workers.Federator.Queue.Push(&messages.FromFediAPI{ + APObjectType: ap.ObjectProfile, + APActivityType: ap.ActivityMove, + GTSModel: stubMove, + Requesting: requestingAcct, + Receiving: receivingAcct, }) return nil diff --git a/internal/federation/federatingdb/move_test.go b/internal/federation/federatingdb/move_test.go index 006dcf0dc..3e35dc97a 100644 --- a/internal/federation/federatingdb/move_test.go +++ b/internal/federation/federatingdb/move_test.go @@ -27,7 +27,6 @@ import ( "github.com/superseriousbusiness/activity/streams/vocab" "github.com/superseriousbusiness/gotosocial/internal/ap" "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" - "github.com/superseriousbusiness/gotosocial/internal/messages" ) type MoveTestSuite struct { @@ -78,13 +77,7 @@ func (suite *MoveTestSuite) TestMove() { suite.move(receivingAcct, requestingAcct, moveStr1) // Should be a message heading to the processor. - var msg messages.FromFediAPI - select { - case msg = <-suite.fromFederator: - // Fine. - case <-time.After(5 * time.Second): - suite.FailNow("", "timeout waiting for suite.fromFederator") - } + msg, _ := suite.getFederatorMsg(5 * time.Second) suite.Equal(ap.ObjectProfile, msg.APObjectType) suite.Equal(ap.ActivityMove, msg.APActivityType) @@ -101,12 +94,7 @@ func (suite *MoveTestSuite) TestMove() { // Should be a message heading to the processor // since this is just a straight up retry. - select { - case msg = <-suite.fromFederator: - // Fine. - case <-time.After(5 * time.Second): - suite.FailNow("", "timeout waiting for suite.fromFederator") - } + msg, _ = suite.getFederatorMsg(5 * time.Second) suite.Equal(ap.ObjectProfile, msg.APObjectType) suite.Equal(ap.ActivityMove, msg.APActivityType) @@ -126,12 +114,7 @@ func (suite *MoveTestSuite) TestMove() { // Should be a message heading to the processor // since this is just a retry with a different ID. - select { - case msg = <-suite.fromFederator: - // Fine. - case <-time.After(5 * time.Second): - suite.FailNow("", "timeout waiting for suite.fromFederator") - } + msg, _ = suite.getFederatorMsg(5 * time.Second) suite.Equal(ap.ObjectProfile, msg.APObjectType) suite.Equal(ap.ActivityMove, msg.APActivityType) } diff --git a/internal/federation/federatingdb/reject_test.go b/internal/federation/federatingdb/reject_test.go index d4c537a92..f51ffaf56 100644 --- a/internal/federation/federatingdb/reject_test.go +++ b/internal/federation/federatingdb/reject_test.go @@ -81,7 +81,8 @@ func (suite *RejectTestSuite) TestRejectFollowRequest() { suite.NoError(err) // there should be nothing in the federator channel since nothing needs to be passed - suite.Empty(suite.fromFederator) + _, ok := suite.getFederatorMsg(time.Second) + suite.False(ok) // the follow request should not be in the database anymore -- it's been rejected err = suite.db.GetByID(ctx, fr.ID, >smodel.FollowRequest{}) diff --git a/internal/federation/federatingdb/update.go b/internal/federation/federatingdb/update.go index 733abba0d..2f00e0867 100644 --- a/internal/federation/federatingdb/update.go +++ b/internal/federation/federatingdb/update.go @@ -98,13 +98,13 @@ func (f *federatingDB) updateAccountable(ctx context.Context, receivingAcct *gts // was delivered along with the Update, for further asynchronous // updating of eg., avatar/header, emojis, etc. The actual db // inserts/updates will take place there. - f.state.Workers.EnqueueFediAPI(ctx, messages.FromFediAPI{ - APObjectType: ap.ObjectProfile, - APActivityType: ap.ActivityUpdate, - GTSModel: requestingAcct, - APObjectModel: accountable, - ReceivingAccount: receivingAcct, - RequestingAccount: requestingAcct, + f.state.Workers.Federator.Queue.Push(&messages.FromFediAPI{ + APObjectType: ap.ObjectProfile, + APActivityType: ap.ActivityUpdate, + GTSModel: requestingAcct, + APObject: accountable, + Receiving: receivingAcct, + Requesting: requestingAcct, }) return nil @@ -155,13 +155,13 @@ func (f *federatingDB) updateStatusable(ctx context.Context, receivingAcct *gtsm // Queue an UPDATE NOTE activity to our fedi API worker, // this will handle necessary database insertions, etc. - f.state.Workers.EnqueueFediAPI(ctx, messages.FromFediAPI{ - APObjectType: ap.ObjectNote, - APActivityType: ap.ActivityUpdate, - GTSModel: status, // original status - APObjectModel: (ap.Statusable)(statusable), - ReceivingAccount: receivingAcct, - RequestingAccount: requestingAcct, + f.state.Workers.Federator.Queue.Push(&messages.FromFediAPI{ + APObjectType: ap.ObjectNote, + APActivityType: ap.ActivityUpdate, + GTSModel: status, // original status + APObject: (ap.Statusable)(statusable), + Receiving: receivingAcct, + Requesting: requestingAcct, }) return nil |