diff options
| author | 2022-04-28 13:23:11 +0100 | |
|---|---|---|
| committer | 2022-04-28 13:23:11 +0100 | |
| commit | 420e2fb22bc7aa4967ddadb11e444079efdf5117 (patch) | |
| tree | 413842c5df646c30a8079671ade5e677e3825fb8 /internal/federation | |
| parent | [bugfix] Fix possible race condition in federatingdb (#490) (diff) | |
| download | gotosocial-420e2fb22bc7aa4967ddadb11e444079efdf5117.tar.xz | |
replace async client API / federator msg processing with worker pools (#497)
* replace async client API / federator msg processing with worker pools
* appease our lord-and-saviour, the linter
Diffstat (limited to 'internal/federation')
| -rw-r--r-- | internal/federation/dereferencing/dereferencer_test.go | 5 | ||||
| -rw-r--r-- | internal/federation/federatingdb/accept.go | 14 | ||||
| -rw-r--r-- | internal/federation/federatingdb/announce.go | 10 | ||||
| -rw-r--r-- | internal/federation/federatingdb/create.go | 46 | ||||
| -rw-r--r-- | internal/federation/federatingdb/create_test.go | 11 | ||||
| -rw-r--r-- | internal/federation/federatingdb/db.go | 6 | ||||
| -rw-r--r-- | internal/federation/federatingdb/delete.go | 14 | ||||
| -rw-r--r-- | internal/federation/federatingdb/federatingdb_test.go | 21 | ||||
| -rw-r--r-- | internal/federation/federatingdb/reject.go | 4 | ||||
| -rw-r--r-- | internal/federation/federatingdb/reject_test.go | 6 | ||||
| -rw-r--r-- | internal/federation/federatingdb/undo.go | 6 | ||||
| -rw-r--r-- | internal/federation/federatingdb/update.go | 10 | ||||
| -rw-r--r-- | internal/federation/federatingdb/util.go | 12 | ||||
| -rw-r--r-- | internal/federation/federator_test.go | 14 | 
14 files changed, 93 insertions, 86 deletions
| diff --git a/internal/federation/dereferencing/dereferencer_test.go b/internal/federation/dereferencing/dereferencer_test.go index cabb3d6a8..441019866 100644 --- a/internal/federation/dereferencing/dereferencer_test.go +++ b/internal/federation/dereferencing/dereferencer_test.go @@ -32,7 +32,9 @@ import (  	"github.com/superseriousbusiness/gotosocial/internal/db"  	"github.com/superseriousbusiness/gotosocial/internal/federation/dereferencing"  	"github.com/superseriousbusiness/gotosocial/internal/gtsmodel" +	"github.com/superseriousbusiness/gotosocial/internal/messages"  	"github.com/superseriousbusiness/gotosocial/internal/transport" +	"github.com/superseriousbusiness/gotosocial/internal/worker"  	"github.com/superseriousbusiness/gotosocial/testrig"  ) @@ -148,6 +150,7 @@ func (suite *DereferencerStandardTestSuite) mockTransportController() transport.  		return response, nil  	} +	fedWorker := worker.New[messages.FromFederator](-1, -1)  	mockClient := testrig.NewMockHTTPClient(do) -	return testrig.NewTestTransportController(mockClient, suite.db) +	return testrig.NewTestTransportController(mockClient, suite.db, fedWorker)  } diff --git a/internal/federation/federatingdb/accept.go b/internal/federation/federatingdb/accept.go index 25dd2bce0..f22db38a5 100644 --- a/internal/federation/federatingdb/accept.go +++ b/internal/federation/federatingdb/accept.go @@ -48,9 +48,9 @@ func (f *federatingDB) Accept(ctx context.Context, accept vocab.ActivityStreamsA  		l.Debug("entering Accept")  	} -	receivingAccount, _, fromFederatorChan := extractFromCtx(ctx) -	if receivingAccount == nil || fromFederatorChan == nil { -		// If the receiving account or federator channel wasn't set on the context, that means this request didn't pass +	receivingAccount, _ := extractFromCtx(ctx) +	if receivingAccount == nil { +		// If the receiving account  wasn't set on the context, that means this request didn't pass  		// through the API, but came from inside GtS as the result of another activity on this instance. That being so,  		// we can safely just ignore this activity, since we know we've already processed it elsewhere.  		return nil @@ -82,12 +82,12 @@ func (f *federatingDB) Accept(ctx context.Context, accept vocab.ActivityStreamsA  					return err  				} -				fromFederatorChan <- messages.FromFederator{ +				f.fedWorker.Queue(messages.FromFederator{  					APObjectType:     ap.ActivityFollow,  					APActivityType:   ap.ActivityAccept,  					GTSModel:         follow,  					ReceivingAccount: receivingAccount, -				} +				})  				return nil  			} @@ -117,12 +117,12 @@ func (f *federatingDB) Accept(ctx context.Context, accept vocab.ActivityStreamsA  				return err  			} -			fromFederatorChan <- messages.FromFederator{ +			f.fedWorker.Queue(messages.FromFederator{  				APObjectType:     ap.ActivityFollow,  				APActivityType:   ap.ActivityAccept,  				GTSModel:         follow,  				ReceivingAccount: receivingAccount, -			} +			})  			return nil  		} diff --git a/internal/federation/federatingdb/announce.go b/internal/federation/federatingdb/announce.go index 61a85bbc6..b70fa1913 100644 --- a/internal/federation/federatingdb/announce.go +++ b/internal/federation/federatingdb/announce.go @@ -44,9 +44,9 @@ func (f *federatingDB) Announce(ctx context.Context, announce vocab.ActivityStre  		l.Debug("entering Announce")  	} -	receivingAccount, _, fromFederatorChan := extractFromCtx(ctx) -	if receivingAccount == nil || fromFederatorChan == nil { -		// If the receiving account or federator channel wasn't set on the context, that means this request didn't pass +	receivingAccount, _ := extractFromCtx(ctx) +	if receivingAccount == nil { +		// If the receiving account wasn't set on the context, that means this request didn't pass  		// through the API, but came from inside GtS as the result of another activity on this instance. That being so,  		// we can safely just ignore this activity, since we know we've already processed it elsewhere.  		return nil @@ -63,12 +63,12 @@ func (f *federatingDB) Announce(ctx context.Context, announce vocab.ActivityStre  	}  	// it's a new announce so pass it back to the processor async for dereferencing etc -	fromFederatorChan <- messages.FromFederator{ +	f.fedWorker.Queue(messages.FromFederator{  		APObjectType:     ap.ActivityAnnounce,  		APActivityType:   ap.ActivityCreate,  		GTSModel:         boost,  		ReceivingAccount: receivingAccount, -	} +	})  	return nil  } diff --git a/internal/federation/federatingdb/create.go b/internal/federation/federatingdb/create.go index 6c86151f3..625d75603 100644 --- a/internal/federation/federatingdb/create.go +++ b/internal/federation/federatingdb/create.go @@ -61,9 +61,9 @@ func (f *federatingDB) Create(ctx context.Context, asType vocab.Type) error {  		l.Debug("entering Create")  	} -	receivingAccount, requestingAccount, fromFederatorChan := extractFromCtx(ctx) -	if receivingAccount == nil || fromFederatorChan == nil { -		// If the receiving account or federator channel wasn't set on the context, that means this request didn't pass +	receivingAccount, requestingAccount := extractFromCtx(ctx) +	if receivingAccount == nil { +		// If the receiving account wasn't set on the context, that means this request didn't pass  		// through the API, but came from inside GtS as the result of another activity on this instance. That being so,  		// we can safely just ignore this activity, since we know we've already processed it elsewhere.  		return nil @@ -72,16 +72,16 @@ func (f *federatingDB) Create(ctx context.Context, asType vocab.Type) error {  	switch asType.GetTypeName() {  	case ap.ActivityBlock:  		// BLOCK SOMETHING -		return f.activityBlock(ctx, asType, receivingAccount, requestingAccount, fromFederatorChan) +		return f.activityBlock(ctx, asType, receivingAccount, requestingAccount)  	case ap.ActivityCreate:  		// CREATE SOMETHING -		return f.activityCreate(ctx, asType, receivingAccount, requestingAccount, fromFederatorChan) +		return f.activityCreate(ctx, asType, receivingAccount, requestingAccount)  	case ap.ActivityFollow:  		// FOLLOW SOMETHING -		return f.activityFollow(ctx, asType, receivingAccount, requestingAccount, fromFederatorChan) +		return f.activityFollow(ctx, asType, receivingAccount, requestingAccount)  	case ap.ActivityLike:  		// LIKE SOMETHING -		return f.activityLike(ctx, asType, receivingAccount, requestingAccount, fromFederatorChan) +		return f.activityLike(ctx, asType, receivingAccount, requestingAccount)  	}  	return nil  } @@ -90,7 +90,7 @@ func (f *federatingDB) Create(ctx context.Context, asType vocab.Type) error {  	BLOCK HANDLERS  */ -func (f *federatingDB) activityBlock(ctx context.Context, asType vocab.Type, receiving *gtsmodel.Account, requestingAccount *gtsmodel.Account, fromFederatorChan chan messages.FromFederator) error { +func (f *federatingDB) activityBlock(ctx context.Context, asType vocab.Type, receiving *gtsmodel.Account, requestingAccount *gtsmodel.Account) error {  	blockable, ok := asType.(vocab.ActivityStreamsBlock)  	if !ok {  		return errors.New("activityBlock: could not convert type to block") @@ -111,12 +111,12 @@ func (f *federatingDB) activityBlock(ctx context.Context, asType vocab.Type, rec  		return fmt.Errorf("activityBlock: database error inserting block: %s", err)  	} -	fromFederatorChan <- messages.FromFederator{ +	f.fedWorker.Queue(messages.FromFederator{  		APObjectType:     ap.ActivityBlock,  		APActivityType:   ap.ActivityCreate,  		GTSModel:         block,  		ReceivingAccount: receiving, -	} +	})  	return nil  } @@ -124,7 +124,7 @@ func (f *federatingDB) activityBlock(ctx context.Context, asType vocab.Type, rec  	CREATE HANDLERS  */ -func (f *federatingDB) activityCreate(ctx context.Context, asType vocab.Type, receivingAccount *gtsmodel.Account, requestingAccount *gtsmodel.Account, fromFederatorChan chan messages.FromFederator) error { +func (f *federatingDB) activityCreate(ctx context.Context, asType vocab.Type, receivingAccount *gtsmodel.Account, requestingAccount *gtsmodel.Account) error {  	create, ok := asType.(vocab.ActivityStreamsCreate)  	if !ok {  		return errors.New("activityCreate: could not convert type to create") @@ -152,7 +152,7 @@ func (f *federatingDB) activityCreate(ctx context.Context, asType vocab.Type, re  		switch asObjectTypeName {  		case ap.ObjectNote:  			// CREATE A NOTE -			if err := f.createNote(ctx, objectIter.GetActivityStreamsNote(), receivingAccount, requestingAccount, fromFederatorChan); err != nil { +			if err := f.createNote(ctx, objectIter.GetActivityStreamsNote(), receivingAccount, requestingAccount); err != nil {  				errs = append(errs, err.Error())  			}  		default: @@ -168,7 +168,7 @@ func (f *federatingDB) activityCreate(ctx context.Context, asType vocab.Type, re  }  // createNote handles a Create activity with a Note type. -func (f *federatingDB) createNote(ctx context.Context, note vocab.ActivityStreamsNote, receivingAccount *gtsmodel.Account, requestingAccount *gtsmodel.Account, fromFederatorChan chan messages.FromFederator) error { +func (f *federatingDB) createNote(ctx context.Context, note vocab.ActivityStreamsNote, receivingAccount *gtsmodel.Account, requestingAccount *gtsmodel.Account) error {  	l := logrus.WithFields(logrus.Fields{  		"func":              "createNote",  		"receivingAccount":  receivingAccount.URI, @@ -206,13 +206,13 @@ func (f *federatingDB) createNote(ctx context.Context, note vocab.ActivityStream  			return nil  		}  		// pass the note iri into the processor and have it do the dereferencing instead of doing it here -		fromFederatorChan <- messages.FromFederator{ +		f.fedWorker.Queue(messages.FromFederator{  			APObjectType:     ap.ObjectNote,  			APActivityType:   ap.ActivityCreate,  			APIri:            id.GetIRI(),  			GTSModel:         nil,  			ReceivingAccount: receivingAccount, -		} +		})  		return nil  	} @@ -241,12 +241,12 @@ func (f *federatingDB) createNote(ctx context.Context, note vocab.ActivityStream  		return fmt.Errorf("createNote: database error inserting status: %s", err)  	} -	fromFederatorChan <- messages.FromFederator{ +	f.fedWorker.Queue(messages.FromFederator{  		APObjectType:     ap.ObjectNote,  		APActivityType:   ap.ActivityCreate,  		GTSModel:         status,  		ReceivingAccount: receivingAccount, -	} +	})  	return nil  } @@ -255,7 +255,7 @@ func (f *federatingDB) createNote(ctx context.Context, note vocab.ActivityStream  	FOLLOW HANDLERS  */ -func (f *federatingDB) activityFollow(ctx context.Context, asType vocab.Type, receivingAccount *gtsmodel.Account, requestingAccount *gtsmodel.Account, fromFederatorChan chan messages.FromFederator) error { +func (f *federatingDB) activityFollow(ctx context.Context, asType vocab.Type, receivingAccount *gtsmodel.Account, requestingAccount *gtsmodel.Account) error {  	follow, ok := asType.(vocab.ActivityStreamsFollow)  	if !ok {  		return errors.New("activityFollow: could not convert type to follow") @@ -276,12 +276,12 @@ func (f *federatingDB) activityFollow(ctx context.Context, asType vocab.Type, re  		return fmt.Errorf("activityFollow: database error inserting follow request: %s", err)  	} -	fromFederatorChan <- messages.FromFederator{ +	f.fedWorker.Queue(messages.FromFederator{  		APObjectType:     ap.ActivityFollow,  		APActivityType:   ap.ActivityCreate,  		GTSModel:         followRequest,  		ReceivingAccount: receivingAccount, -	} +	})  	return nil  } @@ -290,7 +290,7 @@ func (f *federatingDB) activityFollow(ctx context.Context, asType vocab.Type, re  	LIKE HANDLERS  */ -func (f *federatingDB) activityLike(ctx context.Context, asType vocab.Type, receivingAccount *gtsmodel.Account, requestingAccount *gtsmodel.Account, fromFederatorChan chan messages.FromFederator) error { +func (f *federatingDB) activityLike(ctx context.Context, asType vocab.Type, receivingAccount *gtsmodel.Account, requestingAccount *gtsmodel.Account) error {  	like, ok := asType.(vocab.ActivityStreamsLike)  	if !ok {  		return errors.New("activityLike: could not convert type to like") @@ -311,12 +311,12 @@ func (f *federatingDB) activityLike(ctx context.Context, asType vocab.Type, rece  		return fmt.Errorf("activityLike: database error inserting fave: %s", err)  	} -	fromFederatorChan <- messages.FromFederator{ +	f.fedWorker.Queue(messages.FromFederator{  		APObjectType:     ap.ActivityLike,  		APActivityType:   ap.ActivityCreate,  		GTSModel:         fave,  		ReceivingAccount: receivingAccount, -	} +	})  	return nil  } diff --git a/internal/federation/federatingdb/create_test.go b/internal/federation/federatingdb/create_test.go index 3be19c45f..d4f277c37 100644 --- a/internal/federation/federatingdb/create_test.go +++ b/internal/federation/federatingdb/create_test.go @@ -25,7 +25,6 @@ import (  	"github.com/stretchr/testify/suite"  	"github.com/superseriousbusiness/gotosocial/internal/ap"  	"github.com/superseriousbusiness/gotosocial/internal/gtsmodel" -	"github.com/superseriousbusiness/gotosocial/internal/messages"  )  type CreateTestSuite struct { @@ -35,9 +34,8 @@ type CreateTestSuite struct {  func (suite *CreateTestSuite) TestCreateNote() {  	receivingAccount := suite.testAccounts["local_account_1"]  	requestingAccount := suite.testAccounts["remote_account_1"] -	fromFederatorChan := make(chan messages.FromFederator, 10) -	ctx := createTestContext(receivingAccount, requestingAccount, fromFederatorChan) +	ctx := createTestContext(receivingAccount, requestingAccount)  	create := suite.testActivities["dm_for_zork"].Activity @@ -45,7 +43,7 @@ func (suite *CreateTestSuite) TestCreateNote() {  	suite.NoError(err)  	// should be a message heading to the processor now, which we can intercept here -	msg := <-fromFederatorChan +	msg := <-suite.fromFederator  	suite.Equal(ap.ObjectNote, msg.APObjectType)  	suite.Equal(ap.ActivityCreate, msg.APActivityType) @@ -65,9 +63,8 @@ func (suite *CreateTestSuite) TestCreateNote() {  func (suite *CreateTestSuite) TestCreateNoteForward() {  	receivingAccount := suite.testAccounts["local_account_1"]  	requestingAccount := suite.testAccounts["remote_account_1"] -	fromFederatorChan := make(chan messages.FromFederator, 10) -	ctx := createTestContext(receivingAccount, requestingAccount, fromFederatorChan) +	ctx := createTestContext(receivingAccount, requestingAccount)  	create := suite.testActivities["forwarded_message"].Activity @@ -75,7 +72,7 @@ func (suite *CreateTestSuite) TestCreateNoteForward() {  	suite.NoError(err)  	// should be a message heading to the processor now, which we can intercept here -	msg := <-fromFederatorChan +	msg := <-suite.fromFederator  	suite.Equal(ap.ObjectNote, msg.APObjectType)  	suite.Equal(ap.ActivityCreate, msg.APActivityType) diff --git a/internal/federation/federatingdb/db.go b/internal/federation/federatingdb/db.go index 36df2593f..60f09b909 100644 --- a/internal/federation/federatingdb/db.go +++ b/internal/federation/federatingdb/db.go @@ -25,7 +25,9 @@ import (  	"github.com/superseriousbusiness/activity/pub"  	"github.com/superseriousbusiness/activity/streams/vocab"  	"github.com/superseriousbusiness/gotosocial/internal/db" +	"github.com/superseriousbusiness/gotosocial/internal/messages"  	"github.com/superseriousbusiness/gotosocial/internal/typeutils" +	"github.com/superseriousbusiness/gotosocial/internal/worker"  )  // DB wraps the pub.Database interface with a couple of custom functions for GoToSocial. @@ -42,14 +44,16 @@ type DB interface {  type federatingDB struct {  	locks         mutexes.MutexMap  	db            db.DB +	fedWorker     *worker.Worker[messages.FromFederator]  	typeConverter typeutils.TypeConverter  }  // New returns a DB interface using the given database and config -func New(db db.DB) DB { +func New(db db.DB, fedWorker *worker.Worker[messages.FromFederator]) DB {  	fdb := federatingDB{  		locks:         mutexes.NewMap(-1, -1), // use defaults  		db:            db, +		fedWorker:     fedWorker,  		typeConverter: typeutils.NewConverter(db),  	}  	return &fdb diff --git a/internal/federation/federatingdb/delete.go b/internal/federation/federatingdb/delete.go index 7293701aa..bd0184f76 100644 --- a/internal/federation/federatingdb/delete.go +++ b/internal/federation/federatingdb/delete.go @@ -44,9 +44,9 @@ func (f *federatingDB) Delete(ctx context.Context, id *url.URL) error {  	)  	l.Debug("entering Delete") -	receivingAccount, _, fromFederatorChan := extractFromCtx(ctx) -	if receivingAccount == nil || fromFederatorChan == nil { -		// If the receiving account or federator channel wasn't set on the context, that means this request didn't pass +	receivingAccount, _ := extractFromCtx(ctx) +	if receivingAccount == nil { +		// If the receiving account wasn't set on the context, that means this request didn't pass  		// through the API, but came from inside GtS as the result of another activity on this instance. That being so,  		// we can safely just ignore this activity, since we know we've already processed it elsewhere.  		return nil @@ -61,24 +61,24 @@ func (f *federatingDB) Delete(ctx context.Context, id *url.URL) error {  		if err := f.db.DeleteByID(ctx, s.ID, >smodel.Status{}); err != nil {  			return fmt.Errorf("DELETE: err deleting status: %s", err)  		} -		fromFederatorChan <- messages.FromFederator{ +		f.fedWorker.Queue(messages.FromFederator{  			APObjectType:     ap.ObjectNote,  			APActivityType:   ap.ActivityDelete,  			GTSModel:         s,  			ReceivingAccount: receivingAccount, -		} +		})  	}  	a, err := f.db.GetAccountByURI(ctx, id.String())  	if err == nil {  		// it's an account  		l.Debugf("uri is for an account with id %s, passing delete message to the processor", a.ID) -		fromFederatorChan <- messages.FromFederator{ +		f.fedWorker.Queue(messages.FromFederator{  			APObjectType:     ap.ObjectProfile,  			APActivityType:   ap.ActivityDelete,  			GTSModel:         a,  			ReceivingAccount: receivingAccount, -		} +		})  	}  	return nil diff --git a/internal/federation/federatingdb/federatingdb_test.go b/internal/federation/federatingdb/federatingdb_test.go index 3f1af7d78..d53294c1c 100644 --- a/internal/federation/federatingdb/federatingdb_test.go +++ b/internal/federation/federatingdb/federatingdb_test.go @@ -28,14 +28,17 @@ import (  	"github.com/superseriousbusiness/gotosocial/internal/gtsmodel"  	"github.com/superseriousbusiness/gotosocial/internal/messages"  	"github.com/superseriousbusiness/gotosocial/internal/typeutils" +	"github.com/superseriousbusiness/gotosocial/internal/worker"  	"github.com/superseriousbusiness/gotosocial/testrig"  )  type FederatingDBTestSuite struct {  	suite.Suite -	db           db.DB -	tc           typeutils.TypeConverter -	federatingDB federatingdb.DB +	db            db.DB +	tc            typeutils.TypeConverter +	fedWorker     *worker.Worker[messages.FromFederator] +	fromFederator chan messages.FromFederator +	federatingDB  federatingdb.DB  	testTokens       map[string]*gtsmodel.Token  	testClients      map[string]*gtsmodel.Client @@ -62,10 +65,17 @@ func (suite *FederatingDBTestSuite) SetupSuite() {  func (suite *FederatingDBTestSuite) SetupTest() {  	testrig.InitTestLog()  	testrig.InitTestConfig() +	suite.fedWorker = worker.New[messages.FromFederator](-1, -1) +	suite.fromFederator = make(chan messages.FromFederator, 10) +	suite.fedWorker.SetProcessor(func(ctx context.Context, msg messages.FromFederator) error { +		suite.fromFederator <- msg +		return nil +	}) +	_ = suite.fedWorker.Start()  	suite.db = testrig.NewTestDB()  	suite.testActivities = testrig.NewTestActivities(suite.testAccounts)  	suite.tc = testrig.NewTestTypeConverter(suite.db) -	suite.federatingDB = testrig.NewTestFederatingDB(suite.db) +	suite.federatingDB = testrig.NewTestFederatingDB(suite.db, suite.fedWorker)  	testrig.StandardDBSetup(suite.db, suite.testAccounts)  } @@ -73,10 +83,9 @@ func (suite *FederatingDBTestSuite) TearDownTest() {  	testrig.StandardDBTeardown(suite.db)  } -func createTestContext(receivingAccount *gtsmodel.Account, requestingAccount *gtsmodel.Account, fromFederatorChan chan messages.FromFederator) context.Context { +func createTestContext(receivingAccount *gtsmodel.Account, requestingAccount *gtsmodel.Account) context.Context {  	ctx := context.Background()  	ctx = context.WithValue(ctx, ap.ContextReceivingAccount, receivingAccount)  	ctx = context.WithValue(ctx, ap.ContextRequestingAccount, requestingAccount) -	ctx = context.WithValue(ctx, ap.ContextFromFederatorChan, fromFederatorChan)  	return ctx  } diff --git a/internal/federation/federatingdb/reject.go b/internal/federation/federatingdb/reject.go index 8df1b853a..9cb81c267 100644 --- a/internal/federation/federatingdb/reject.go +++ b/internal/federation/federatingdb/reject.go @@ -47,8 +47,8 @@ func (f *federatingDB) Reject(ctx context.Context, reject vocab.ActivityStreamsR  		l.Debug("entering Reject")  	} -	receivingAccount, _, fromFederatorChan := extractFromCtx(ctx) -	if receivingAccount == nil || fromFederatorChan == nil { +	receivingAccount, _ := extractFromCtx(ctx) +	if receivingAccount == nil {  		// If the receiving account or federator channel wasn't set on the context, that means this request didn't pass  		// through the API, but came from inside GtS as the result of another activity on this instance. That being so,  		// we can safely just ignore this activity, since we know we've already processed it elsewhere. diff --git a/internal/federation/federatingdb/reject_test.go b/internal/federation/federatingdb/reject_test.go index 825ff92b3..52730925d 100644 --- a/internal/federation/federatingdb/reject_test.go +++ b/internal/federation/federatingdb/reject_test.go @@ -26,7 +26,6 @@ import (  	"github.com/superseriousbusiness/activity/streams"  	"github.com/superseriousbusiness/gotosocial/internal/db"  	"github.com/superseriousbusiness/gotosocial/internal/gtsmodel" -	"github.com/superseriousbusiness/gotosocial/internal/messages"  	"github.com/superseriousbusiness/gotosocial/internal/uris"  	"github.com/superseriousbusiness/gotosocial/testrig"  ) @@ -40,8 +39,7 @@ func (suite *RejectTestSuite) TestRejectFollowRequest() {  	// remote_account_2 rejects the follow request  	followingAccount := suite.testAccounts["local_account_1"]  	followedAccount := suite.testAccounts["remote_account_2"] -	fromFederatorChan := make(chan messages.FromFederator, 10) -	ctx := createTestContext(followingAccount, followedAccount, fromFederatorChan) +	ctx := createTestContext(followingAccount, followedAccount)  	// put the follow request in the database  	fr := >smodel.FollowRequest{ @@ -84,7 +82,7 @@ func (suite *RejectTestSuite) TestRejectFollowRequest() {  	suite.NoError(err)  	// there should be nothing in the federator channel since nothing needs to be passed -	suite.Empty(fromFederatorChan) +	suite.Empty(suite.fromFederator)  	// the follow request should not be in the database anymore -- it's been rejected  	err = suite.db.GetByID(ctx, fr.ID, >smodel.FollowRequest{}) diff --git a/internal/federation/federatingdb/undo.go b/internal/federation/federatingdb/undo.go index 94a0d74ce..92f24f315 100644 --- a/internal/federation/federatingdb/undo.go +++ b/internal/federation/federatingdb/undo.go @@ -46,9 +46,9 @@ func (f *federatingDB) Undo(ctx context.Context, undo vocab.ActivityStreamsUndo)  		l.Debug("entering Undo")  	} -	receivingAccount, _, fromFederatorChan := extractFromCtx(ctx) -	if receivingAccount == nil || fromFederatorChan == nil { -		// If the receiving account or federator channel wasn't set on the context, that means this request didn't pass +	receivingAccount, _ := extractFromCtx(ctx) +	if receivingAccount == nil { +		// If the receiving account wasn't set on the context, that means this request didn't pass  		// through the API, but came from inside GtS as the result of another activity on this instance. That being so,  		// we can safely just ignore this activity, since we know we've already processed it elsewhere.  		return nil diff --git a/internal/federation/federatingdb/update.go b/internal/federation/federatingdb/update.go index 3a48eabac..7930cde12 100644 --- a/internal/federation/federatingdb/update.go +++ b/internal/federation/federatingdb/update.go @@ -57,9 +57,9 @@ func (f *federatingDB) Update(ctx context.Context, asType vocab.Type) error {  		l.Debug("entering Update")  	} -	receivingAccount, _, fromFederatorChan := extractFromCtx(ctx) -	if receivingAccount == nil || fromFederatorChan == nil { -		// If the receiving account or federator channel wasn't set on the context, that means this request didn't pass +	receivingAccount, _ := extractFromCtx(ctx) +	if receivingAccount == nil { +		// If the receiving account wasn't set on the context, that means this request didn't pass  		// through the API, but came from inside GtS as the result of another activity on this instance. That being so,  		// we can safely just ignore this activity, since we know we've already processed it elsewhere.  		return nil @@ -148,12 +148,12 @@ func (f *federatingDB) Update(ctx context.Context, asType vocab.Type) error {  		}  		// pass to the processor for further processing of eg., avatar/header -		fromFederatorChan <- messages.FromFederator{ +		f.fedWorker.Queue(messages.FromFederator{  			APObjectType:     ap.ObjectProfile,  			APActivityType:   ap.ActivityUpdate,  			GTSModel:         updatedAcct,  			ReceivingAccount: receivingAccount, -		} +		})  	}  	return nil diff --git a/internal/federation/federatingdb/util.go b/internal/federation/federatingdb/util.go index 74262fea4..5a3a65a0c 100644 --- a/internal/federation/federatingdb/util.go +++ b/internal/federation/federatingdb/util.go @@ -34,7 +34,6 @@ import (  	"github.com/superseriousbusiness/gotosocial/internal/db"  	"github.com/superseriousbusiness/gotosocial/internal/gtsmodel"  	"github.com/superseriousbusiness/gotosocial/internal/id" -	"github.com/superseriousbusiness/gotosocial/internal/messages"  	"github.com/superseriousbusiness/gotosocial/internal/uris"  ) @@ -310,7 +309,7 @@ func (f *federatingDB) collectIRIs(ctx context.Context, iris []*url.URL) (vocab.  //   - The requesting account that posted to the inbox.  //   - A channel that messages for the processor can be placed into.  // If a value is not present, nil will be returned for it. It's up to the caller to check this and respond appropriately. -func extractFromCtx(ctx context.Context) (receivingAccount, requestingAccount *gtsmodel.Account, fromFederatorChan chan messages.FromFederator) { +func extractFromCtx(ctx context.Context) (receivingAccount, requestingAccount *gtsmodel.Account) {  	receivingAccountI := ctx.Value(ap.ContextReceivingAccount)  	if receivingAccountI != nil {  		var ok bool @@ -329,15 +328,6 @@ func extractFromCtx(ctx context.Context) (receivingAccount, requestingAccount *g  		}  	} -	fromFederatorChanI := ctx.Value(ap.ContextFromFederatorChan) -	if fromFederatorChanI != nil { -		var ok bool -		fromFederatorChan, ok = fromFederatorChanI.(chan messages.FromFederator) -		if !ok { -			logrus.Panicf("extractFromCtx: context entry with key %s could not be asserted to chan messages.FromFederator", ap.ContextFromFederatorChan) -		} -	} -  	return  } diff --git a/internal/federation/federator_test.go b/internal/federation/federator_test.go index 6dac76c05..220c3a193 100644 --- a/internal/federation/federator_test.go +++ b/internal/federation/federator_test.go @@ -34,7 +34,9 @@ import (  	"github.com/superseriousbusiness/gotosocial/internal/db"  	"github.com/superseriousbusiness/gotosocial/internal/federation"  	"github.com/superseriousbusiness/gotosocial/internal/gtsmodel" +	"github.com/superseriousbusiness/gotosocial/internal/messages"  	"github.com/superseriousbusiness/gotosocial/internal/typeutils" +	"github.com/superseriousbusiness/gotosocial/internal/worker"  	"github.com/superseriousbusiness/gotosocial/testrig"  ) @@ -73,12 +75,14 @@ func (suite *ProtocolTestSuite) TestPostInboxRequestBodyHook() {  	// the activity we're gonna use  	activity := suite.activities["dm_for_zork"] +	fedWorker := worker.New[messages.FromFederator](-1, -1) +  	// setup transport controller with a no-op client so we don't make external calls  	tc := testrig.NewTestTransportController(testrig.NewMockHTTPClient(func(req *http.Request) (*http.Response, error) {  		return nil, nil -	}), suite.db) +	}), suite.db, fedWorker)  	// setup module being tested -	federator := federation.NewFederator(suite.db, testrig.NewTestFederatingDB(suite.db), tc, suite.typeConverter, testrig.NewTestMediaManager(suite.db, suite.storage)) +	federator := federation.NewFederator(suite.db, testrig.NewTestFederatingDB(suite.db, fedWorker), tc, suite.typeConverter, testrig.NewTestMediaManager(suite.db, suite.storage))  	// setup request  	ctx := context.Background() @@ -105,9 +109,11 @@ func (suite *ProtocolTestSuite) TestAuthenticatePostInbox() {  	sendingAccount := suite.accounts["remote_account_1"]  	inboxAccount := suite.accounts["local_account_1"] -	tc := testrig.NewTestTransportController(testrig.NewMockHTTPClient(nil), suite.db) +	fedWorker := worker.New[messages.FromFederator](-1, -1) + +	tc := testrig.NewTestTransportController(testrig.NewMockHTTPClient(nil), suite.db, fedWorker)  	// now setup module being tested, with the mock transport controller -	federator := federation.NewFederator(suite.db, testrig.NewTestFederatingDB(suite.db), tc, suite.typeConverter, testrig.NewTestMediaManager(suite.db, suite.storage)) +	federator := federation.NewFederator(suite.db, testrig.NewTestFederatingDB(suite.db, fedWorker), tc, suite.typeConverter, testrig.NewTestMediaManager(suite.db, suite.storage))  	request := httptest.NewRequest(http.MethodPost, "http://localhost:8080/users/the_mighty_zork/inbox", nil)  	// we need these headers for the request to be validated | 
