diff options
author | 2022-04-28 13:23:11 +0100 | |
---|---|---|
committer | 2022-04-28 13:23:11 +0100 | |
commit | 420e2fb22bc7aa4967ddadb11e444079efdf5117 (patch) | |
tree | 413842c5df646c30a8079671ade5e677e3825fb8 /internal/federation | |
parent | [bugfix] Fix possible race condition in federatingdb (#490) (diff) | |
download | gotosocial-420e2fb22bc7aa4967ddadb11e444079efdf5117.tar.xz |
replace async client API / federator msg processing with worker pools (#497)
* replace async client API / federator msg processing with worker pools
* appease our lord-and-saviour, the linter
Diffstat (limited to 'internal/federation')
-rw-r--r-- | internal/federation/dereferencing/dereferencer_test.go | 5 | ||||
-rw-r--r-- | internal/federation/federatingdb/accept.go | 14 | ||||
-rw-r--r-- | internal/federation/federatingdb/announce.go | 10 | ||||
-rw-r--r-- | internal/federation/federatingdb/create.go | 46 | ||||
-rw-r--r-- | internal/federation/federatingdb/create_test.go | 11 | ||||
-rw-r--r-- | internal/federation/federatingdb/db.go | 6 | ||||
-rw-r--r-- | internal/federation/federatingdb/delete.go | 14 | ||||
-rw-r--r-- | internal/federation/federatingdb/federatingdb_test.go | 21 | ||||
-rw-r--r-- | internal/federation/federatingdb/reject.go | 4 | ||||
-rw-r--r-- | internal/federation/federatingdb/reject_test.go | 6 | ||||
-rw-r--r-- | internal/federation/federatingdb/undo.go | 6 | ||||
-rw-r--r-- | internal/federation/federatingdb/update.go | 10 | ||||
-rw-r--r-- | internal/federation/federatingdb/util.go | 12 | ||||
-rw-r--r-- | internal/federation/federator_test.go | 14 |
14 files changed, 93 insertions, 86 deletions
diff --git a/internal/federation/dereferencing/dereferencer_test.go b/internal/federation/dereferencing/dereferencer_test.go index cabb3d6a8..441019866 100644 --- a/internal/federation/dereferencing/dereferencer_test.go +++ b/internal/federation/dereferencing/dereferencer_test.go @@ -32,7 +32,9 @@ import ( "github.com/superseriousbusiness/gotosocial/internal/db" "github.com/superseriousbusiness/gotosocial/internal/federation/dereferencing" "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" + "github.com/superseriousbusiness/gotosocial/internal/messages" "github.com/superseriousbusiness/gotosocial/internal/transport" + "github.com/superseriousbusiness/gotosocial/internal/worker" "github.com/superseriousbusiness/gotosocial/testrig" ) @@ -148,6 +150,7 @@ func (suite *DereferencerStandardTestSuite) mockTransportController() transport. return response, nil } + fedWorker := worker.New[messages.FromFederator](-1, -1) mockClient := testrig.NewMockHTTPClient(do) - return testrig.NewTestTransportController(mockClient, suite.db) + return testrig.NewTestTransportController(mockClient, suite.db, fedWorker) } diff --git a/internal/federation/federatingdb/accept.go b/internal/federation/federatingdb/accept.go index 25dd2bce0..f22db38a5 100644 --- a/internal/federation/federatingdb/accept.go +++ b/internal/federation/federatingdb/accept.go @@ -48,9 +48,9 @@ func (f *federatingDB) Accept(ctx context.Context, accept vocab.ActivityStreamsA l.Debug("entering Accept") } - receivingAccount, _, fromFederatorChan := extractFromCtx(ctx) - if receivingAccount == nil || fromFederatorChan == nil { - // If the receiving account or federator channel wasn't set on the context, that means this request didn't pass + receivingAccount, _ := extractFromCtx(ctx) + if receivingAccount == nil { + // If the receiving account wasn't set on the context, that means this request didn't pass // through the API, but came from inside GtS as the result of another activity on this instance. That being so, // we can safely just ignore this activity, since we know we've already processed it elsewhere. return nil @@ -82,12 +82,12 @@ func (f *federatingDB) Accept(ctx context.Context, accept vocab.ActivityStreamsA return err } - fromFederatorChan <- messages.FromFederator{ + f.fedWorker.Queue(messages.FromFederator{ APObjectType: ap.ActivityFollow, APActivityType: ap.ActivityAccept, GTSModel: follow, ReceivingAccount: receivingAccount, - } + }) return nil } @@ -117,12 +117,12 @@ func (f *federatingDB) Accept(ctx context.Context, accept vocab.ActivityStreamsA return err } - fromFederatorChan <- messages.FromFederator{ + f.fedWorker.Queue(messages.FromFederator{ APObjectType: ap.ActivityFollow, APActivityType: ap.ActivityAccept, GTSModel: follow, ReceivingAccount: receivingAccount, - } + }) return nil } diff --git a/internal/federation/federatingdb/announce.go b/internal/federation/federatingdb/announce.go index 61a85bbc6..b70fa1913 100644 --- a/internal/federation/federatingdb/announce.go +++ b/internal/federation/federatingdb/announce.go @@ -44,9 +44,9 @@ func (f *federatingDB) Announce(ctx context.Context, announce vocab.ActivityStre l.Debug("entering Announce") } - receivingAccount, _, fromFederatorChan := extractFromCtx(ctx) - if receivingAccount == nil || fromFederatorChan == nil { - // If the receiving account or federator channel wasn't set on the context, that means this request didn't pass + receivingAccount, _ := extractFromCtx(ctx) + if receivingAccount == nil { + // If the receiving account wasn't set on the context, that means this request didn't pass // through the API, but came from inside GtS as the result of another activity on this instance. That being so, // we can safely just ignore this activity, since we know we've already processed it elsewhere. return nil @@ -63,12 +63,12 @@ func (f *federatingDB) Announce(ctx context.Context, announce vocab.ActivityStre } // it's a new announce so pass it back to the processor async for dereferencing etc - fromFederatorChan <- messages.FromFederator{ + f.fedWorker.Queue(messages.FromFederator{ APObjectType: ap.ActivityAnnounce, APActivityType: ap.ActivityCreate, GTSModel: boost, ReceivingAccount: receivingAccount, - } + }) return nil } diff --git a/internal/federation/federatingdb/create.go b/internal/federation/federatingdb/create.go index 6c86151f3..625d75603 100644 --- a/internal/federation/federatingdb/create.go +++ b/internal/federation/federatingdb/create.go @@ -61,9 +61,9 @@ func (f *federatingDB) Create(ctx context.Context, asType vocab.Type) error { l.Debug("entering Create") } - receivingAccount, requestingAccount, fromFederatorChan := extractFromCtx(ctx) - if receivingAccount == nil || fromFederatorChan == nil { - // If the receiving account or federator channel wasn't set on the context, that means this request didn't pass + receivingAccount, requestingAccount := extractFromCtx(ctx) + if receivingAccount == nil { + // If the receiving account wasn't set on the context, that means this request didn't pass // through the API, but came from inside GtS as the result of another activity on this instance. That being so, // we can safely just ignore this activity, since we know we've already processed it elsewhere. return nil @@ -72,16 +72,16 @@ func (f *federatingDB) Create(ctx context.Context, asType vocab.Type) error { switch asType.GetTypeName() { case ap.ActivityBlock: // BLOCK SOMETHING - return f.activityBlock(ctx, asType, receivingAccount, requestingAccount, fromFederatorChan) + return f.activityBlock(ctx, asType, receivingAccount, requestingAccount) case ap.ActivityCreate: // CREATE SOMETHING - return f.activityCreate(ctx, asType, receivingAccount, requestingAccount, fromFederatorChan) + return f.activityCreate(ctx, asType, receivingAccount, requestingAccount) case ap.ActivityFollow: // FOLLOW SOMETHING - return f.activityFollow(ctx, asType, receivingAccount, requestingAccount, fromFederatorChan) + return f.activityFollow(ctx, asType, receivingAccount, requestingAccount) case ap.ActivityLike: // LIKE SOMETHING - return f.activityLike(ctx, asType, receivingAccount, requestingAccount, fromFederatorChan) + return f.activityLike(ctx, asType, receivingAccount, requestingAccount) } return nil } @@ -90,7 +90,7 @@ func (f *federatingDB) Create(ctx context.Context, asType vocab.Type) error { BLOCK HANDLERS */ -func (f *federatingDB) activityBlock(ctx context.Context, asType vocab.Type, receiving *gtsmodel.Account, requestingAccount *gtsmodel.Account, fromFederatorChan chan messages.FromFederator) error { +func (f *federatingDB) activityBlock(ctx context.Context, asType vocab.Type, receiving *gtsmodel.Account, requestingAccount *gtsmodel.Account) error { blockable, ok := asType.(vocab.ActivityStreamsBlock) if !ok { return errors.New("activityBlock: could not convert type to block") @@ -111,12 +111,12 @@ func (f *federatingDB) activityBlock(ctx context.Context, asType vocab.Type, rec return fmt.Errorf("activityBlock: database error inserting block: %s", err) } - fromFederatorChan <- messages.FromFederator{ + f.fedWorker.Queue(messages.FromFederator{ APObjectType: ap.ActivityBlock, APActivityType: ap.ActivityCreate, GTSModel: block, ReceivingAccount: receiving, - } + }) return nil } @@ -124,7 +124,7 @@ func (f *federatingDB) activityBlock(ctx context.Context, asType vocab.Type, rec CREATE HANDLERS */ -func (f *federatingDB) activityCreate(ctx context.Context, asType vocab.Type, receivingAccount *gtsmodel.Account, requestingAccount *gtsmodel.Account, fromFederatorChan chan messages.FromFederator) error { +func (f *federatingDB) activityCreate(ctx context.Context, asType vocab.Type, receivingAccount *gtsmodel.Account, requestingAccount *gtsmodel.Account) error { create, ok := asType.(vocab.ActivityStreamsCreate) if !ok { return errors.New("activityCreate: could not convert type to create") @@ -152,7 +152,7 @@ func (f *federatingDB) activityCreate(ctx context.Context, asType vocab.Type, re switch asObjectTypeName { case ap.ObjectNote: // CREATE A NOTE - if err := f.createNote(ctx, objectIter.GetActivityStreamsNote(), receivingAccount, requestingAccount, fromFederatorChan); err != nil { + if err := f.createNote(ctx, objectIter.GetActivityStreamsNote(), receivingAccount, requestingAccount); err != nil { errs = append(errs, err.Error()) } default: @@ -168,7 +168,7 @@ func (f *federatingDB) activityCreate(ctx context.Context, asType vocab.Type, re } // createNote handles a Create activity with a Note type. -func (f *federatingDB) createNote(ctx context.Context, note vocab.ActivityStreamsNote, receivingAccount *gtsmodel.Account, requestingAccount *gtsmodel.Account, fromFederatorChan chan messages.FromFederator) error { +func (f *federatingDB) createNote(ctx context.Context, note vocab.ActivityStreamsNote, receivingAccount *gtsmodel.Account, requestingAccount *gtsmodel.Account) error { l := logrus.WithFields(logrus.Fields{ "func": "createNote", "receivingAccount": receivingAccount.URI, @@ -206,13 +206,13 @@ func (f *federatingDB) createNote(ctx context.Context, note vocab.ActivityStream return nil } // pass the note iri into the processor and have it do the dereferencing instead of doing it here - fromFederatorChan <- messages.FromFederator{ + f.fedWorker.Queue(messages.FromFederator{ APObjectType: ap.ObjectNote, APActivityType: ap.ActivityCreate, APIri: id.GetIRI(), GTSModel: nil, ReceivingAccount: receivingAccount, - } + }) return nil } @@ -241,12 +241,12 @@ func (f *federatingDB) createNote(ctx context.Context, note vocab.ActivityStream return fmt.Errorf("createNote: database error inserting status: %s", err) } - fromFederatorChan <- messages.FromFederator{ + f.fedWorker.Queue(messages.FromFederator{ APObjectType: ap.ObjectNote, APActivityType: ap.ActivityCreate, GTSModel: status, ReceivingAccount: receivingAccount, - } + }) return nil } @@ -255,7 +255,7 @@ func (f *federatingDB) createNote(ctx context.Context, note vocab.ActivityStream FOLLOW HANDLERS */ -func (f *federatingDB) activityFollow(ctx context.Context, asType vocab.Type, receivingAccount *gtsmodel.Account, requestingAccount *gtsmodel.Account, fromFederatorChan chan messages.FromFederator) error { +func (f *federatingDB) activityFollow(ctx context.Context, asType vocab.Type, receivingAccount *gtsmodel.Account, requestingAccount *gtsmodel.Account) error { follow, ok := asType.(vocab.ActivityStreamsFollow) if !ok { return errors.New("activityFollow: could not convert type to follow") @@ -276,12 +276,12 @@ func (f *federatingDB) activityFollow(ctx context.Context, asType vocab.Type, re return fmt.Errorf("activityFollow: database error inserting follow request: %s", err) } - fromFederatorChan <- messages.FromFederator{ + f.fedWorker.Queue(messages.FromFederator{ APObjectType: ap.ActivityFollow, APActivityType: ap.ActivityCreate, GTSModel: followRequest, ReceivingAccount: receivingAccount, - } + }) return nil } @@ -290,7 +290,7 @@ func (f *federatingDB) activityFollow(ctx context.Context, asType vocab.Type, re LIKE HANDLERS */ -func (f *federatingDB) activityLike(ctx context.Context, asType vocab.Type, receivingAccount *gtsmodel.Account, requestingAccount *gtsmodel.Account, fromFederatorChan chan messages.FromFederator) error { +func (f *federatingDB) activityLike(ctx context.Context, asType vocab.Type, receivingAccount *gtsmodel.Account, requestingAccount *gtsmodel.Account) error { like, ok := asType.(vocab.ActivityStreamsLike) if !ok { return errors.New("activityLike: could not convert type to like") @@ -311,12 +311,12 @@ func (f *federatingDB) activityLike(ctx context.Context, asType vocab.Type, rece return fmt.Errorf("activityLike: database error inserting fave: %s", err) } - fromFederatorChan <- messages.FromFederator{ + f.fedWorker.Queue(messages.FromFederator{ APObjectType: ap.ActivityLike, APActivityType: ap.ActivityCreate, GTSModel: fave, ReceivingAccount: receivingAccount, - } + }) return nil } diff --git a/internal/federation/federatingdb/create_test.go b/internal/federation/federatingdb/create_test.go index 3be19c45f..d4f277c37 100644 --- a/internal/federation/federatingdb/create_test.go +++ b/internal/federation/federatingdb/create_test.go @@ -25,7 +25,6 @@ import ( "github.com/stretchr/testify/suite" "github.com/superseriousbusiness/gotosocial/internal/ap" "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" - "github.com/superseriousbusiness/gotosocial/internal/messages" ) type CreateTestSuite struct { @@ -35,9 +34,8 @@ type CreateTestSuite struct { func (suite *CreateTestSuite) TestCreateNote() { receivingAccount := suite.testAccounts["local_account_1"] requestingAccount := suite.testAccounts["remote_account_1"] - fromFederatorChan := make(chan messages.FromFederator, 10) - ctx := createTestContext(receivingAccount, requestingAccount, fromFederatorChan) + ctx := createTestContext(receivingAccount, requestingAccount) create := suite.testActivities["dm_for_zork"].Activity @@ -45,7 +43,7 @@ func (suite *CreateTestSuite) TestCreateNote() { suite.NoError(err) // should be a message heading to the processor now, which we can intercept here - msg := <-fromFederatorChan + msg := <-suite.fromFederator suite.Equal(ap.ObjectNote, msg.APObjectType) suite.Equal(ap.ActivityCreate, msg.APActivityType) @@ -65,9 +63,8 @@ func (suite *CreateTestSuite) TestCreateNote() { func (suite *CreateTestSuite) TestCreateNoteForward() { receivingAccount := suite.testAccounts["local_account_1"] requestingAccount := suite.testAccounts["remote_account_1"] - fromFederatorChan := make(chan messages.FromFederator, 10) - ctx := createTestContext(receivingAccount, requestingAccount, fromFederatorChan) + ctx := createTestContext(receivingAccount, requestingAccount) create := suite.testActivities["forwarded_message"].Activity @@ -75,7 +72,7 @@ func (suite *CreateTestSuite) TestCreateNoteForward() { suite.NoError(err) // should be a message heading to the processor now, which we can intercept here - msg := <-fromFederatorChan + msg := <-suite.fromFederator suite.Equal(ap.ObjectNote, msg.APObjectType) suite.Equal(ap.ActivityCreate, msg.APActivityType) diff --git a/internal/federation/federatingdb/db.go b/internal/federation/federatingdb/db.go index 36df2593f..60f09b909 100644 --- a/internal/federation/federatingdb/db.go +++ b/internal/federation/federatingdb/db.go @@ -25,7 +25,9 @@ import ( "github.com/superseriousbusiness/activity/pub" "github.com/superseriousbusiness/activity/streams/vocab" "github.com/superseriousbusiness/gotosocial/internal/db" + "github.com/superseriousbusiness/gotosocial/internal/messages" "github.com/superseriousbusiness/gotosocial/internal/typeutils" + "github.com/superseriousbusiness/gotosocial/internal/worker" ) // DB wraps the pub.Database interface with a couple of custom functions for GoToSocial. @@ -42,14 +44,16 @@ type DB interface { type federatingDB struct { locks mutexes.MutexMap db db.DB + fedWorker *worker.Worker[messages.FromFederator] typeConverter typeutils.TypeConverter } // New returns a DB interface using the given database and config -func New(db db.DB) DB { +func New(db db.DB, fedWorker *worker.Worker[messages.FromFederator]) DB { fdb := federatingDB{ locks: mutexes.NewMap(-1, -1), // use defaults db: db, + fedWorker: fedWorker, typeConverter: typeutils.NewConverter(db), } return &fdb diff --git a/internal/federation/federatingdb/delete.go b/internal/federation/federatingdb/delete.go index 7293701aa..bd0184f76 100644 --- a/internal/federation/federatingdb/delete.go +++ b/internal/federation/federatingdb/delete.go @@ -44,9 +44,9 @@ func (f *federatingDB) Delete(ctx context.Context, id *url.URL) error { ) l.Debug("entering Delete") - receivingAccount, _, fromFederatorChan := extractFromCtx(ctx) - if receivingAccount == nil || fromFederatorChan == nil { - // If the receiving account or federator channel wasn't set on the context, that means this request didn't pass + receivingAccount, _ := extractFromCtx(ctx) + if receivingAccount == nil { + // If the receiving account wasn't set on the context, that means this request didn't pass // through the API, but came from inside GtS as the result of another activity on this instance. That being so, // we can safely just ignore this activity, since we know we've already processed it elsewhere. return nil @@ -61,24 +61,24 @@ func (f *federatingDB) Delete(ctx context.Context, id *url.URL) error { if err := f.db.DeleteByID(ctx, s.ID, >smodel.Status{}); err != nil { return fmt.Errorf("DELETE: err deleting status: %s", err) } - fromFederatorChan <- messages.FromFederator{ + f.fedWorker.Queue(messages.FromFederator{ APObjectType: ap.ObjectNote, APActivityType: ap.ActivityDelete, GTSModel: s, ReceivingAccount: receivingAccount, - } + }) } a, err := f.db.GetAccountByURI(ctx, id.String()) if err == nil { // it's an account l.Debugf("uri is for an account with id %s, passing delete message to the processor", a.ID) - fromFederatorChan <- messages.FromFederator{ + f.fedWorker.Queue(messages.FromFederator{ APObjectType: ap.ObjectProfile, APActivityType: ap.ActivityDelete, GTSModel: a, ReceivingAccount: receivingAccount, - } + }) } return nil diff --git a/internal/federation/federatingdb/federatingdb_test.go b/internal/federation/federatingdb/federatingdb_test.go index 3f1af7d78..d53294c1c 100644 --- a/internal/federation/federatingdb/federatingdb_test.go +++ b/internal/federation/federatingdb/federatingdb_test.go @@ -28,14 +28,17 @@ import ( "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" "github.com/superseriousbusiness/gotosocial/internal/messages" "github.com/superseriousbusiness/gotosocial/internal/typeutils" + "github.com/superseriousbusiness/gotosocial/internal/worker" "github.com/superseriousbusiness/gotosocial/testrig" ) type FederatingDBTestSuite struct { suite.Suite - db db.DB - tc typeutils.TypeConverter - federatingDB federatingdb.DB + db db.DB + tc typeutils.TypeConverter + fedWorker *worker.Worker[messages.FromFederator] + fromFederator chan messages.FromFederator + federatingDB federatingdb.DB testTokens map[string]*gtsmodel.Token testClients map[string]*gtsmodel.Client @@ -62,10 +65,17 @@ func (suite *FederatingDBTestSuite) SetupSuite() { func (suite *FederatingDBTestSuite) SetupTest() { testrig.InitTestLog() testrig.InitTestConfig() + suite.fedWorker = worker.New[messages.FromFederator](-1, -1) + suite.fromFederator = make(chan messages.FromFederator, 10) + suite.fedWorker.SetProcessor(func(ctx context.Context, msg messages.FromFederator) error { + suite.fromFederator <- msg + return nil + }) + _ = suite.fedWorker.Start() suite.db = testrig.NewTestDB() suite.testActivities = testrig.NewTestActivities(suite.testAccounts) suite.tc = testrig.NewTestTypeConverter(suite.db) - suite.federatingDB = testrig.NewTestFederatingDB(suite.db) + suite.federatingDB = testrig.NewTestFederatingDB(suite.db, suite.fedWorker) testrig.StandardDBSetup(suite.db, suite.testAccounts) } @@ -73,10 +83,9 @@ func (suite *FederatingDBTestSuite) TearDownTest() { testrig.StandardDBTeardown(suite.db) } -func createTestContext(receivingAccount *gtsmodel.Account, requestingAccount *gtsmodel.Account, fromFederatorChan chan messages.FromFederator) context.Context { +func createTestContext(receivingAccount *gtsmodel.Account, requestingAccount *gtsmodel.Account) context.Context { ctx := context.Background() ctx = context.WithValue(ctx, ap.ContextReceivingAccount, receivingAccount) ctx = context.WithValue(ctx, ap.ContextRequestingAccount, requestingAccount) - ctx = context.WithValue(ctx, ap.ContextFromFederatorChan, fromFederatorChan) return ctx } diff --git a/internal/federation/federatingdb/reject.go b/internal/federation/federatingdb/reject.go index 8df1b853a..9cb81c267 100644 --- a/internal/federation/federatingdb/reject.go +++ b/internal/federation/federatingdb/reject.go @@ -47,8 +47,8 @@ func (f *federatingDB) Reject(ctx context.Context, reject vocab.ActivityStreamsR l.Debug("entering Reject") } - receivingAccount, _, fromFederatorChan := extractFromCtx(ctx) - if receivingAccount == nil || fromFederatorChan == nil { + receivingAccount, _ := extractFromCtx(ctx) + if receivingAccount == nil { // If the receiving account or federator channel wasn't set on the context, that means this request didn't pass // through the API, but came from inside GtS as the result of another activity on this instance. That being so, // we can safely just ignore this activity, since we know we've already processed it elsewhere. diff --git a/internal/federation/federatingdb/reject_test.go b/internal/federation/federatingdb/reject_test.go index 825ff92b3..52730925d 100644 --- a/internal/federation/federatingdb/reject_test.go +++ b/internal/federation/federatingdb/reject_test.go @@ -26,7 +26,6 @@ import ( "github.com/superseriousbusiness/activity/streams" "github.com/superseriousbusiness/gotosocial/internal/db" "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" - "github.com/superseriousbusiness/gotosocial/internal/messages" "github.com/superseriousbusiness/gotosocial/internal/uris" "github.com/superseriousbusiness/gotosocial/testrig" ) @@ -40,8 +39,7 @@ func (suite *RejectTestSuite) TestRejectFollowRequest() { // remote_account_2 rejects the follow request followingAccount := suite.testAccounts["local_account_1"] followedAccount := suite.testAccounts["remote_account_2"] - fromFederatorChan := make(chan messages.FromFederator, 10) - ctx := createTestContext(followingAccount, followedAccount, fromFederatorChan) + ctx := createTestContext(followingAccount, followedAccount) // put the follow request in the database fr := >smodel.FollowRequest{ @@ -84,7 +82,7 @@ func (suite *RejectTestSuite) TestRejectFollowRequest() { suite.NoError(err) // there should be nothing in the federator channel since nothing needs to be passed - suite.Empty(fromFederatorChan) + suite.Empty(suite.fromFederator) // the follow request should not be in the database anymore -- it's been rejected err = suite.db.GetByID(ctx, fr.ID, >smodel.FollowRequest{}) diff --git a/internal/federation/federatingdb/undo.go b/internal/federation/federatingdb/undo.go index 94a0d74ce..92f24f315 100644 --- a/internal/federation/federatingdb/undo.go +++ b/internal/federation/federatingdb/undo.go @@ -46,9 +46,9 @@ func (f *federatingDB) Undo(ctx context.Context, undo vocab.ActivityStreamsUndo) l.Debug("entering Undo") } - receivingAccount, _, fromFederatorChan := extractFromCtx(ctx) - if receivingAccount == nil || fromFederatorChan == nil { - // If the receiving account or federator channel wasn't set on the context, that means this request didn't pass + receivingAccount, _ := extractFromCtx(ctx) + if receivingAccount == nil { + // If the receiving account wasn't set on the context, that means this request didn't pass // through the API, but came from inside GtS as the result of another activity on this instance. That being so, // we can safely just ignore this activity, since we know we've already processed it elsewhere. return nil diff --git a/internal/federation/federatingdb/update.go b/internal/federation/federatingdb/update.go index 3a48eabac..7930cde12 100644 --- a/internal/federation/federatingdb/update.go +++ b/internal/federation/federatingdb/update.go @@ -57,9 +57,9 @@ func (f *federatingDB) Update(ctx context.Context, asType vocab.Type) error { l.Debug("entering Update") } - receivingAccount, _, fromFederatorChan := extractFromCtx(ctx) - if receivingAccount == nil || fromFederatorChan == nil { - // If the receiving account or federator channel wasn't set on the context, that means this request didn't pass + receivingAccount, _ := extractFromCtx(ctx) + if receivingAccount == nil { + // If the receiving account wasn't set on the context, that means this request didn't pass // through the API, but came from inside GtS as the result of another activity on this instance. That being so, // we can safely just ignore this activity, since we know we've already processed it elsewhere. return nil @@ -148,12 +148,12 @@ func (f *federatingDB) Update(ctx context.Context, asType vocab.Type) error { } // pass to the processor for further processing of eg., avatar/header - fromFederatorChan <- messages.FromFederator{ + f.fedWorker.Queue(messages.FromFederator{ APObjectType: ap.ObjectProfile, APActivityType: ap.ActivityUpdate, GTSModel: updatedAcct, ReceivingAccount: receivingAccount, - } + }) } return nil diff --git a/internal/federation/federatingdb/util.go b/internal/federation/federatingdb/util.go index 74262fea4..5a3a65a0c 100644 --- a/internal/federation/federatingdb/util.go +++ b/internal/federation/federatingdb/util.go @@ -34,7 +34,6 @@ import ( "github.com/superseriousbusiness/gotosocial/internal/db" "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" "github.com/superseriousbusiness/gotosocial/internal/id" - "github.com/superseriousbusiness/gotosocial/internal/messages" "github.com/superseriousbusiness/gotosocial/internal/uris" ) @@ -310,7 +309,7 @@ func (f *federatingDB) collectIRIs(ctx context.Context, iris []*url.URL) (vocab. // - The requesting account that posted to the inbox. // - A channel that messages for the processor can be placed into. // If a value is not present, nil will be returned for it. It's up to the caller to check this and respond appropriately. -func extractFromCtx(ctx context.Context) (receivingAccount, requestingAccount *gtsmodel.Account, fromFederatorChan chan messages.FromFederator) { +func extractFromCtx(ctx context.Context) (receivingAccount, requestingAccount *gtsmodel.Account) { receivingAccountI := ctx.Value(ap.ContextReceivingAccount) if receivingAccountI != nil { var ok bool @@ -329,15 +328,6 @@ func extractFromCtx(ctx context.Context) (receivingAccount, requestingAccount *g } } - fromFederatorChanI := ctx.Value(ap.ContextFromFederatorChan) - if fromFederatorChanI != nil { - var ok bool - fromFederatorChan, ok = fromFederatorChanI.(chan messages.FromFederator) - if !ok { - logrus.Panicf("extractFromCtx: context entry with key %s could not be asserted to chan messages.FromFederator", ap.ContextFromFederatorChan) - } - } - return } diff --git a/internal/federation/federator_test.go b/internal/federation/federator_test.go index 6dac76c05..220c3a193 100644 --- a/internal/federation/federator_test.go +++ b/internal/federation/federator_test.go @@ -34,7 +34,9 @@ import ( "github.com/superseriousbusiness/gotosocial/internal/db" "github.com/superseriousbusiness/gotosocial/internal/federation" "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" + "github.com/superseriousbusiness/gotosocial/internal/messages" "github.com/superseriousbusiness/gotosocial/internal/typeutils" + "github.com/superseriousbusiness/gotosocial/internal/worker" "github.com/superseriousbusiness/gotosocial/testrig" ) @@ -73,12 +75,14 @@ func (suite *ProtocolTestSuite) TestPostInboxRequestBodyHook() { // the activity we're gonna use activity := suite.activities["dm_for_zork"] + fedWorker := worker.New[messages.FromFederator](-1, -1) + // setup transport controller with a no-op client so we don't make external calls tc := testrig.NewTestTransportController(testrig.NewMockHTTPClient(func(req *http.Request) (*http.Response, error) { return nil, nil - }), suite.db) + }), suite.db, fedWorker) // setup module being tested - federator := federation.NewFederator(suite.db, testrig.NewTestFederatingDB(suite.db), tc, suite.typeConverter, testrig.NewTestMediaManager(suite.db, suite.storage)) + federator := federation.NewFederator(suite.db, testrig.NewTestFederatingDB(suite.db, fedWorker), tc, suite.typeConverter, testrig.NewTestMediaManager(suite.db, suite.storage)) // setup request ctx := context.Background() @@ -105,9 +109,11 @@ func (suite *ProtocolTestSuite) TestAuthenticatePostInbox() { sendingAccount := suite.accounts["remote_account_1"] inboxAccount := suite.accounts["local_account_1"] - tc := testrig.NewTestTransportController(testrig.NewMockHTTPClient(nil), suite.db) + fedWorker := worker.New[messages.FromFederator](-1, -1) + + tc := testrig.NewTestTransportController(testrig.NewMockHTTPClient(nil), suite.db, fedWorker) // now setup module being tested, with the mock transport controller - federator := federation.NewFederator(suite.db, testrig.NewTestFederatingDB(suite.db), tc, suite.typeConverter, testrig.NewTestMediaManager(suite.db, suite.storage)) + federator := federation.NewFederator(suite.db, testrig.NewTestFederatingDB(suite.db, fedWorker), tc, suite.typeConverter, testrig.NewTestMediaManager(suite.db, suite.storage)) request := httptest.NewRequest(http.MethodPost, "http://localhost:8080/users/the_mighty_zork/inbox", nil) // we need these headers for the request to be validated |