diff options
Diffstat (limited to 'internal/federation')
| -rw-r--r-- | internal/federation/dereferencing/account.go | 8 | ||||
| -rw-r--r-- | internal/federation/dereferencing/status.go | 2 | ||||
| -rw-r--r-- | internal/federation/dereferencing/thread.go | 4 | ||||
| -rw-r--r-- | internal/federation/federatingdb/accept.go | 26 | ||||
| -rw-r--r-- | internal/federation/federatingdb/announce.go | 12 | ||||
| -rw-r--r-- | internal/federation/federatingdb/announce_test.go | 8 | ||||
| -rw-r--r-- | internal/federation/federatingdb/create.go | 92 | ||||
| -rw-r--r-- | internal/federation/federatingdb/create_test.go | 13 | ||||
| -rw-r--r-- | internal/federation/federatingdb/delete.go | 148 | ||||
| -rw-r--r-- | internal/federation/federatingdb/federatingdb_test.go | 31 | ||||
| -rw-r--r-- | internal/federation/federatingdb/move.go | 12 | ||||
| -rw-r--r-- | internal/federation/federatingdb/move_test.go | 23 | ||||
| -rw-r--r-- | internal/federation/federatingdb/reject_test.go | 3 | ||||
| -rw-r--r-- | internal/federation/federatingdb/update.go | 28 | 
14 files changed, 239 insertions, 171 deletions
| diff --git a/internal/federation/dereferencing/account.go b/internal/federation/dereferencing/account.go index e8d32f58a..94df9538a 100644 --- a/internal/federation/dereferencing/account.go +++ b/internal/federation/dereferencing/account.go @@ -104,7 +104,7 @@ func (d *Dereferencer) GetAccountByURI(ctx context.Context, requestUser string,  	if accountable != nil {  		// This account was updated, enqueue re-dereference featured posts. -		d.state.Workers.Federator.MustEnqueueCtx(ctx, func(ctx context.Context) { +		d.state.Workers.Dereference.Queue.Push(func(ctx context.Context) {  			if err := d.dereferenceAccountFeatured(ctx, requestUser, account); err != nil {  				log.Errorf(ctx, "error fetching account featured collection: %v", err)  			} @@ -201,7 +201,7 @@ func (d *Dereferencer) GetAccountByUsernameDomain(ctx context.Context, requestUs  	if accountable != nil {  		// This account was updated, enqueue re-dereference featured posts. -		d.state.Workers.Federator.MustEnqueueCtx(ctx, func(ctx context.Context) { +		d.state.Workers.Dereference.Queue.Push(func(ctx context.Context) {  			if err := d.dereferenceAccountFeatured(ctx, requestUser, account); err != nil {  				log.Errorf(ctx, "error fetching account featured collection: %v", err)  			} @@ -322,7 +322,7 @@ func (d *Dereferencer) RefreshAccount(  	if accountable != nil {  		// This account was updated, enqueue re-dereference featured posts. -		d.state.Workers.Federator.MustEnqueueCtx(ctx, func(ctx context.Context) { +		d.state.Workers.Dereference.Queue.Push(func(ctx context.Context) {  			if err := d.dereferenceAccountFeatured(ctx, requestUser, latest); err != nil {  				log.Errorf(ctx, "error fetching account featured collection: %v", err)  			} @@ -362,7 +362,7 @@ func (d *Dereferencer) RefreshAccountAsync(  	}  	// Enqueue a worker function to enrich this account async. -	d.state.Workers.Federator.MustEnqueueCtx(ctx, func(ctx context.Context) { +	d.state.Workers.Dereference.Queue.Push(func(ctx context.Context) {  		latest, accountable, err := d.enrichAccountSafely(ctx, requestUser, uri, account, accountable)  		if err != nil {  			log.Errorf(ctx, "error enriching remote account: %v", err) diff --git a/internal/federation/dereferencing/status.go b/internal/federation/dereferencing/status.go index 769539a2f..bd50a08fd 100644 --- a/internal/federation/dereferencing/status.go +++ b/internal/federation/dereferencing/status.go @@ -255,7 +255,7 @@ func (d *Dereferencer) RefreshStatusAsync(  	}  	// Enqueue a worker function to re-fetch this status entirely async. -	d.state.Workers.Federator.MustEnqueueCtx(ctx, func(ctx context.Context) { +	d.state.Workers.Dereference.Queue.Push(func(ctx context.Context) {  		latest, statusable, _, err := d.enrichStatusSafely(ctx,  			requestUser,  			uri, diff --git a/internal/federation/dereferencing/thread.go b/internal/federation/dereferencing/thread.go index e528581c9..ed2c1a43f 100644 --- a/internal/federation/dereferencing/thread.go +++ b/internal/federation/dereferencing/thread.go @@ -56,14 +56,14 @@ func (d *Dereferencer) dereferenceThread(  		}  		// Enqueue dereferencing remaining status thread, (children), asychronously . -		d.state.Workers.Federator.MustEnqueueCtx(ctx, func(ctx context.Context) { +		d.state.Workers.Dereference.Queue.Push(func(ctx context.Context) {  			if err := d.DereferenceStatusDescendants(ctx, requestUser, uri, statusable); err != nil {  				log.Error(ctx, err)  			}  		})  	} else {  		// This is an existing status, dereference the WHOLE thread asynchronously. -		d.state.Workers.Federator.MustEnqueueCtx(ctx, func(ctx context.Context) { +		d.state.Workers.Dereference.Queue.Push(func(ctx context.Context) {  			if err := d.DereferenceStatusAncestors(ctx, requestUser, status); err != nil {  				log.Error(ctx, err)  			} diff --git a/internal/federation/federatingdb/accept.go b/internal/federation/federatingdb/accept.go index 50a7c2db1..e26e5955b 100644 --- a/internal/federation/federatingdb/accept.go +++ b/internal/federation/federatingdb/accept.go @@ -89,13 +89,12 @@ func (f *federatingDB) Accept(ctx context.Context, accept vocab.ActivityStreamsA  					return err  				} -				// Process side effects asynchronously. -				f.state.Workers.EnqueueFediAPI(ctx, messages.FromFediAPI{ -					APObjectType:      ap.ActivityFollow, -					APActivityType:    ap.ActivityAccept, -					GTSModel:          follow, -					ReceivingAccount:  receivingAcct, -					RequestingAccount: requestingAcct, +				f.state.Workers.Federator.Queue.Push(&messages.FromFediAPI{ +					APObjectType:   ap.ActivityFollow, +					APActivityType: ap.ActivityAccept, +					GTSModel:       follow, +					Receiving:      receivingAcct, +					Requesting:     requestingAcct,  				})  			} @@ -138,13 +137,12 @@ func (f *federatingDB) Accept(ctx context.Context, accept vocab.ActivityStreamsA  				return err  			} -			// Process side effects asynchronously. -			f.state.Workers.EnqueueFediAPI(ctx, messages.FromFediAPI{ -				APObjectType:      ap.ActivityFollow, -				APActivityType:    ap.ActivityAccept, -				GTSModel:          follow, -				ReceivingAccount:  receivingAcct, -				RequestingAccount: requestingAcct, +			f.state.Workers.Federator.Queue.Push(&messages.FromFediAPI{ +				APObjectType:   ap.ActivityFollow, +				APActivityType: ap.ActivityAccept, +				GTSModel:       follow, +				Receiving:      receivingAcct, +				Requesting:     requestingAcct,  			})  			continue diff --git a/internal/federation/federatingdb/announce.go b/internal/federation/federatingdb/announce.go index 2f5950a30..3a3b91236 100644 --- a/internal/federation/federatingdb/announce.go +++ b/internal/federation/federatingdb/announce.go @@ -81,12 +81,12 @@ func (f *federatingDB) Announce(ctx context.Context, announce vocab.ActivityStre  	}  	// This is a new boost. Process side effects asynchronously. -	f.state.Workers.EnqueueFediAPI(ctx, messages.FromFediAPI{ -		APObjectType:      ap.ActivityAnnounce, -		APActivityType:    ap.ActivityCreate, -		GTSModel:          boost, -		ReceivingAccount:  receivingAcct, -		RequestingAccount: requestingAcct, +	f.state.Workers.Federator.Queue.Push(&messages.FromFediAPI{ +		APObjectType:   ap.ActivityAnnounce, +		APActivityType: ap.ActivityCreate, +		GTSModel:       boost, +		Receiving:      receivingAcct, +		Requesting:     requestingAcct,  	})  	return nil diff --git a/internal/federation/federatingdb/announce_test.go b/internal/federation/federatingdb/announce_test.go index 8dd5ce9da..2833c04c4 100644 --- a/internal/federation/federatingdb/announce_test.go +++ b/internal/federation/federatingdb/announce_test.go @@ -19,6 +19,7 @@ package federatingdb_test  import (  	"testing" +	"time"  	"github.com/stretchr/testify/suite"  	"github.com/superseriousbusiness/activity/streams/vocab" @@ -42,7 +43,7 @@ func (suite *AnnounceTestSuite) TestNewAnnounce() {  	suite.NoError(err)  	// should be a message heading to the processor now, which we can intercept here -	msg := <-suite.fromFederator +	msg, _ := suite.getFederatorMsg(5 * time.Second)  	suite.Equal(ap.ActivityAnnounce, msg.APObjectType)  	suite.Equal(ap.ActivityCreate, msg.APActivityType) @@ -69,7 +70,7 @@ func (suite *AnnounceTestSuite) TestAnnounceTwice() {  	suite.NoError(err)  	// should be a message heading to the processor now, which we can intercept here -	msg := <-suite.fromFederator +	msg, _ := suite.getFederatorMsg(5 * time.Second)  	suite.Equal(ap.ActivityAnnounce, msg.APObjectType)  	suite.Equal(ap.ActivityCreate, msg.APActivityType)  	boost, ok := msg.GTSModel.(*gtsmodel.Status) @@ -94,7 +95,8 @@ func (suite *AnnounceTestSuite) TestAnnounceTwice() {  	// since this is a repeat announce with the same URI, just delivered to a different inbox,  	// we should have nothing in the messages channel... -	suite.Empty(suite.fromFederator) +	_, ok = suite.getFederatorMsg(time.Second) +	suite.False(ok)  }  func TestAnnounceTestSuite(t *testing.T) { diff --git a/internal/federation/federatingdb/create.go b/internal/federation/federatingdb/create.go index 94261526e..44f3cd98c 100644 --- a/internal/federation/federatingdb/create.go +++ b/internal/federation/federatingdb/create.go @@ -99,7 +99,7 @@ func (f *federatingDB) Create(ctx context.Context, asType vocab.Type) error {  	BLOCK HANDLERS  */ -func (f *federatingDB) activityBlock(ctx context.Context, asType vocab.Type, receiving *gtsmodel.Account, requestingAccount *gtsmodel.Account) error { +func (f *federatingDB) activityBlock(ctx context.Context, asType vocab.Type, receiving *gtsmodel.Account, requesting *gtsmodel.Account) error {  	blockable, ok := asType.(vocab.ActivityStreamsBlock)  	if !ok {  		return errors.New("activityBlock: could not convert type to block") @@ -110,10 +110,10 @@ func (f *federatingDB) activityBlock(ctx context.Context, asType vocab.Type, rec  		return fmt.Errorf("activityBlock: could not convert Block to gts model block")  	} -	if block.AccountID != requestingAccount.ID { +	if block.AccountID != requesting.ID {  		return fmt.Errorf(  			"activityBlock: requestingAccount %s is not Block actor account %s", -			requestingAccount.URI, block.Account.URI, +			requesting.URI, block.Account.URI,  		)  	} @@ -130,12 +130,12 @@ func (f *federatingDB) activityBlock(ctx context.Context, asType vocab.Type, rec  		return fmt.Errorf("activityBlock: database error inserting block: %s", err)  	} -	f.state.Workers.EnqueueFediAPI(ctx, messages.FromFediAPI{ -		APObjectType:      ap.ActivityBlock, -		APActivityType:    ap.ActivityCreate, -		GTSModel:          block, -		ReceivingAccount:  receiving, -		RequestingAccount: requestingAccount, +	f.state.Workers.Federator.Queue.Push(&messages.FromFediAPI{ +		APObjectType:   ap.ActivityBlock, +		APActivityType: ap.ActivityCreate, +		GTSModel:       block, +		Receiving:      receiving, +		Requesting:     requesting,  	})  	return nil @@ -297,7 +297,7 @@ func (f *federatingDB) createPollOptionables(  	}  	// Enqueue message to the fedi API worker with poll vote(s). -	f.state.Workers.EnqueueFediAPI(ctx, messages.FromFediAPI{ +	f.state.Workers.Federator.Queue.Push(&messages.FromFediAPI{  		APActivityType: ap.ActivityCreate,  		APObjectType:   ap.ActivityQuestion,  		GTSModel: >smodel.PollVote{ @@ -308,8 +308,8 @@ func (f *federatingDB) createPollOptionables(  			PollID:    inReplyTo.PollID,  			Poll:      inReplyTo.Poll,  		}, -		ReceivingAccount:  receiver, -		RequestingAccount: requester, +		Receiving:  receiver, +		Requesting: requester,  	})  	return nil @@ -377,28 +377,28 @@ func (f *federatingDB) createStatusable(  		// Pass the statusable URI (APIri) into the processor  		// worker and do the rest of the processing asynchronously. -		f.state.Workers.EnqueueFediAPI(ctx, messages.FromFediAPI{ -			APObjectType:      ap.ObjectNote, -			APActivityType:    ap.ActivityCreate, -			APIri:             ap.GetJSONLDId(statusable), -			APObjectModel:     nil, -			GTSModel:          nil, -			ReceivingAccount:  receiver, -			RequestingAccount: requester, +		f.state.Workers.Federator.Queue.Push(&messages.FromFediAPI{ +			APObjectType:   ap.ObjectNote, +			APActivityType: ap.ActivityCreate, +			APIRI:          ap.GetJSONLDId(statusable), +			APObject:       nil, +			GTSModel:       nil, +			Receiving:      receiver, +			Requesting:     requester,  		})  		return nil  	}  	// Do the rest of the processing asynchronously. The processor  	// will handle inserting/updating + further dereferencing the status. -	f.state.Workers.EnqueueFediAPI(ctx, messages.FromFediAPI{ -		APObjectType:      ap.ObjectNote, -		APActivityType:    ap.ActivityCreate, -		APIri:             nil, -		GTSModel:          nil, -		APObjectModel:     statusable, -		ReceivingAccount:  receiver, -		RequestingAccount: requester, +	f.state.Workers.Federator.Queue.Push(&messages.FromFediAPI{ +		APObjectType:   ap.ObjectNote, +		APActivityType: ap.ActivityCreate, +		APIRI:          nil, +		GTSModel:       nil, +		APObject:       statusable, +		Receiving:      receiver, +		Requesting:     requester,  	})  	return nil @@ -439,12 +439,12 @@ func (f *federatingDB) activityFollow(ctx context.Context, asType vocab.Type, re  		return fmt.Errorf("activityFollow: database error inserting follow request: %s", err)  	} -	f.state.Workers.EnqueueFediAPI(ctx, messages.FromFediAPI{ -		APObjectType:      ap.ActivityFollow, -		APActivityType:    ap.ActivityCreate, -		GTSModel:          followRequest, -		ReceivingAccount:  receivingAccount, -		RequestingAccount: requestingAccount, +	f.state.Workers.Federator.Queue.Push(&messages.FromFediAPI{ +		APObjectType:   ap.ActivityFollow, +		APActivityType: ap.ActivityCreate, +		GTSModel:       followRequest, +		Receiving:      receivingAccount, +		Requesting:     requestingAccount,  	})  	return nil @@ -484,12 +484,12 @@ func (f *federatingDB) activityLike(ctx context.Context, asType vocab.Type, rece  		return fmt.Errorf("activityLike: database error inserting fave: %w", err)  	} -	f.state.Workers.EnqueueFediAPI(ctx, messages.FromFediAPI{ -		APObjectType:      ap.ActivityLike, -		APActivityType:    ap.ActivityCreate, -		GTSModel:          fave, -		ReceivingAccount:  receivingAccount, -		RequestingAccount: requestingAccount, +	f.state.Workers.Federator.Queue.Push(&messages.FromFediAPI{ +		APObjectType:   ap.ActivityLike, +		APActivityType: ap.ActivityCreate, +		GTSModel:       fave, +		Receiving:      receivingAccount, +		Requesting:     requestingAccount,  	})  	return nil @@ -536,12 +536,12 @@ func (f *federatingDB) activityFlag(ctx context.Context, asType vocab.Type, rece  		return fmt.Errorf("activityFlag: database error inserting report: %w", err)  	} -	f.state.Workers.EnqueueFediAPI(ctx, messages.FromFediAPI{ -		APObjectType:      ap.ActivityFlag, -		APActivityType:    ap.ActivityCreate, -		GTSModel:          report, -		ReceivingAccount:  receivingAccount, -		RequestingAccount: requestingAccount, +	f.state.Workers.Federator.Queue.Push(&messages.FromFediAPI{ +		APObjectType:   ap.ActivityFlag, +		APActivityType: ap.ActivityCreate, +		GTSModel:       report, +		Receiving:      receivingAccount, +		Requesting:     requestingAccount,  	})  	return nil diff --git a/internal/federation/federatingdb/create_test.go b/internal/federation/federatingdb/create_test.go index 5f80812bf..fffee1432 100644 --- a/internal/federation/federatingdb/create_test.go +++ b/internal/federation/federatingdb/create_test.go @@ -21,6 +21,7 @@ import (  	"context"  	"encoding/json"  	"testing" +	"time"  	"github.com/stretchr/testify/suite"  	"github.com/superseriousbusiness/activity/streams" @@ -48,10 +49,10 @@ func (suite *CreateTestSuite) TestCreateNote() {  	suite.NoError(err)  	// should be a message heading to the processor now, which we can intercept here -	msg := <-suite.fromFederator +	msg, _ := suite.getFederatorMsg(5 * time.Second)  	suite.Equal(ap.ObjectNote, msg.APObjectType)  	suite.Equal(ap.ActivityCreate, msg.APActivityType) -	suite.Equal(note, msg.APObjectModel) +	suite.Equal(note, msg.APObject)  }  func (suite *CreateTestSuite) TestCreateNoteForward() { @@ -79,15 +80,15 @@ func (suite *CreateTestSuite) TestCreateNoteForward() {  	suite.NoError(err)  	// should be a message heading to the processor now, which we can intercept here -	msg := <-suite.fromFederator +	msg, _ := suite.getFederatorMsg(5 * time.Second)  	suite.Equal(ap.ObjectNote, msg.APObjectType)  	suite.Equal(ap.ActivityCreate, msg.APActivityType)  	// nothing should be set as the model since this is a forward -	suite.Nil(msg.APObjectModel) +	suite.Nil(msg.APObject)  	// but we should have a uri set -	suite.Equal("http://example.org/users/Some_User/statuses/afaba698-5740-4e32-a702-af61aa543bc1", msg.APIri.String()) +	suite.Equal("http://example.org/users/Some_User/statuses/afaba698-5740-4e32-a702-af61aa543bc1", msg.APIRI.String())  }  func (suite *CreateTestSuite) TestCreateFlag1() { @@ -120,7 +121,7 @@ func (suite *CreateTestSuite) TestCreateFlag1() {  	}  	// should be a message heading to the processor now, which we can intercept here -	msg := <-suite.fromFederator +	msg, _ := suite.getFederatorMsg(5 * time.Second)  	suite.Equal(ap.ActivityFlag, msg.APObjectType)  	suite.Equal(ap.ActivityCreate, msg.APActivityType) diff --git a/internal/federation/federatingdb/delete.go b/internal/federation/federatingdb/delete.go index 622ef6d3d..7e9b66c5a 100644 --- a/internal/federation/federatingdb/delete.go +++ b/internal/federation/federatingdb/delete.go @@ -19,10 +19,13 @@ package federatingdb  import (  	"context" +	"errors"  	"net/url" -	"codeberg.org/gruf/go-kv"  	"github.com/superseriousbusiness/gotosocial/internal/ap" +	"github.com/superseriousbusiness/gotosocial/internal/db" +	"github.com/superseriousbusiness/gotosocial/internal/gtserror" +	"github.com/superseriousbusiness/gotosocial/internal/gtsmodel"  	"github.com/superseriousbusiness/gotosocial/internal/log"  	"github.com/superseriousbusiness/gotosocial/internal/messages"  ) @@ -34,43 +37,130 @@ import (  //  // The library makes this call only after acquiring a lock first.  func (f *federatingDB) Delete(ctx context.Context, id *url.URL) error { -	l := log.WithContext(ctx). -		WithFields(kv.Fields{ -			{"id", id}, -		}...) -	l.Debug("entering Delete") -  	activityContext := getActivityContext(ctx)  	if activityContext.internal {  		return nil // Already processed.  	} -	requestingAcct := activityContext.requestingAcct -	receivingAcct := activityContext.receivingAcct - -	// in a delete we only get the URI, we can't know if we have a status or a profile or something else, -	// so we have to try a few different things... -	if s, err := f.state.DB.GetStatusByURI(ctx, id.String()); err == nil && requestingAcct.ID == s.AccountID { -		l.Debugf("deleting status: %s", s.ID) -		f.state.Workers.EnqueueFediAPI(ctx, messages.FromFediAPI{ -			APObjectType:      ap.ObjectNote, -			APActivityType:    ap.ActivityDelete, -			GTSModel:          s, -			ReceivingAccount:  receivingAcct, -			RequestingAccount: requestingAcct, +	// Extract receiving / requesting accounts. +	requesting := activityContext.requestingAcct +	receiving := activityContext.receivingAcct + +	// Serialize deleted ID URI. +	// (may be status OR account) +	uriStr := id.String() + +	var ( +		ok  bool +		err error +	) + +	// Try delete as an account URI. +	ok, err = f.deleteAccount(ctx, +		requesting, +		receiving, +		uriStr, +	) +	if err != nil { +		return err +	} else if ok { +		// success! +		return nil +	} + +	// Try delete as a status URI. +	ok, err = f.deleteStatus(ctx, +		requesting, +		receiving, +		uriStr, +	) +	if err != nil { +		return err +	} else if ok { +		// success! +		return nil +	} + +	// Log at debug level, as lots of these could indicate federation +	// issues between remote and this instance, or help with debugging. +	log.Debugf(ctx, "received delete for unknown target: %s", uriStr) +	return nil +} + +func (f *federatingDB) deleteAccount( +	ctx context.Context, +	requesting *gtsmodel.Account, +	receiving *gtsmodel.Account, +	uri string, // target account +) ( +	bool, // success? +	error, // any error +) { +	account, err := f.state.DB.GetAccountByURI(ctx, uri) +	if err != nil && !errors.Is(err, db.ErrNoEntries) { +		return false, gtserror.Newf("error getting account: %w", err) +	} + +	if account != nil { +		// Ensure requesting account is +		// only trying to delete itself. +		if account.ID != requesting.ID { + +			// TODO: handled forwarded deletes, +			// for now we silently drop this. +			return true, nil +		} + +		log.Debugf(ctx, "deleting account: %s", account.URI) +		f.state.Workers.Federator.Queue.Push(&messages.FromFediAPI{ +			APObjectType:   ap.ObjectProfile, +			APActivityType: ap.ActivityDelete, +			GTSModel:       account, +			Receiving:      receiving, +			Requesting:     requesting,  		}) + +		return true, nil +	} + +	return false, nil +} + +func (f *federatingDB) deleteStatus( +	ctx context.Context, +	requesting *gtsmodel.Account, +	receiving *gtsmodel.Account, +	uri string, // target status +) ( +	bool, // success? +	error, // any error +) { +	status, err := f.state.DB.GetStatusByURI(ctx, uri) +	if err != nil && !errors.Is(err, db.ErrNoEntries) { +		return false, gtserror.Newf("error getting status: %w", err)  	} -	if a, err := f.state.DB.GetAccountByURI(ctx, id.String()); err == nil && requestingAcct.ID == a.ID { -		l.Debugf("deleting account: %s", a.ID) -		f.state.Workers.EnqueueFediAPI(ctx, messages.FromFediAPI{ -			APObjectType:      ap.ObjectProfile, -			APActivityType:    ap.ActivityDelete, -			GTSModel:          a, -			ReceivingAccount:  receivingAcct, -			RequestingAccount: requestingAcct, +	if status != nil { +		// Ensure requesting account is only +		// trying to delete its own statuses. +		if status.AccountID != requesting.ID { + +			// TODO: handled forwarded deletes, +			// for now we silently drop this. +			return true, nil +		} + +		log.Debugf(ctx, "deleting status: %s", status.URI) +		f.state.Workers.Federator.Queue.Push(&messages.FromFediAPI{ +			APObjectType:   ap.ObjectNote, +			APActivityType: ap.ActivityDelete, +			GTSModel:       status, +			Receiving:      receiving, +			Requesting:     requesting,  		}) + +		return true, nil  	} -	return nil +	return false, nil  } diff --git a/internal/federation/federatingdb/federatingdb_test.go b/internal/federation/federatingdb/federatingdb_test.go index 0f227164d..360094887 100644 --- a/internal/federation/federatingdb/federatingdb_test.go +++ b/internal/federation/federatingdb/federatingdb_test.go @@ -19,6 +19,7 @@ package federatingdb_test  import (  	"context" +	"time"  	"github.com/stretchr/testify/suite"  	"github.com/superseriousbusiness/gotosocial/internal/db" @@ -34,11 +35,10 @@ import (  type FederatingDBTestSuite struct {  	suite.Suite -	db            db.DB -	tc            *typeutils.Converter -	fromFederator chan messages.FromFediAPI -	federatingDB  federatingdb.DB -	state         state.State +	db           db.DB +	tc           *typeutils.Converter +	federatingDB federatingdb.DB +	state        state.State  	testTokens       map[string]*gtsmodel.Token  	testClients      map[string]*gtsmodel.Client @@ -51,6 +51,13 @@ type FederatingDBTestSuite struct {  	testActivities   map[string]testrig.ActivityWithSignature  } +func (suite *FederatingDBTestSuite) getFederatorMsg(timeout time.Duration) (*messages.FromFediAPI, bool) { +	ctx := context.Background() +	ctx, cncl := context.WithTimeout(ctx, timeout) +	defer cncl() +	return suite.state.Workers.Federator.Queue.PopCtx(ctx) +} +  func (suite *FederatingDBTestSuite) SetupSuite() {  	suite.testTokens = testrig.NewTestTokens()  	suite.testClients = testrig.NewTestClients() @@ -69,13 +76,6 @@ func (suite *FederatingDBTestSuite) SetupTest() {  	suite.state.Caches.Init()  	testrig.StartNoopWorkers(&suite.state) -	suite.fromFederator = make(chan messages.FromFediAPI, 10) -	suite.state.Workers.EnqueueFediAPI = func(ctx context.Context, msgs ...messages.FromFediAPI) { -		for _, msg := range msgs { -			suite.fromFederator <- msg -		} -	} -  	suite.db = testrig.NewTestDB(&suite.state)  	suite.testActivities = testrig.NewTestActivities(suite.testAccounts) @@ -96,13 +96,6 @@ func (suite *FederatingDBTestSuite) SetupTest() {  func (suite *FederatingDBTestSuite) TearDownTest() {  	testrig.StandardDBTeardown(suite.db)  	testrig.StopWorkers(&suite.state) -	for suite.fromFederator != nil { -		select { -		case <-suite.fromFederator: -		default: -			return -		} -	}  }  func createTestContext(receivingAccount *gtsmodel.Account, requestingAccount *gtsmodel.Account) context.Context { diff --git a/internal/federation/federatingdb/move.go b/internal/federation/federatingdb/move.go index 2e8049e08..59dc2529c 100644 --- a/internal/federation/federatingdb/move.go +++ b/internal/federation/federatingdb/move.go @@ -170,12 +170,12 @@ func (f *federatingDB) Move(ctx context.Context, move vocab.ActivityStreamsMove)  	// We had a Move already or stored a new Move.  	// Pass back to a worker for async processing. -	f.state.Workers.EnqueueFediAPI(ctx, messages.FromFediAPI{ -		APObjectType:      ap.ObjectProfile, -		APActivityType:    ap.ActivityMove, -		GTSModel:          stubMove, -		RequestingAccount: requestingAcct, -		ReceivingAccount:  receivingAcct, +	f.state.Workers.Federator.Queue.Push(&messages.FromFediAPI{ +		APObjectType:   ap.ObjectProfile, +		APActivityType: ap.ActivityMove, +		GTSModel:       stubMove, +		Requesting:     requestingAcct, +		Receiving:      receivingAcct,  	})  	return nil diff --git a/internal/federation/federatingdb/move_test.go b/internal/federation/federatingdb/move_test.go index 006dcf0dc..3e35dc97a 100644 --- a/internal/federation/federatingdb/move_test.go +++ b/internal/federation/federatingdb/move_test.go @@ -27,7 +27,6 @@ import (  	"github.com/superseriousbusiness/activity/streams/vocab"  	"github.com/superseriousbusiness/gotosocial/internal/ap"  	"github.com/superseriousbusiness/gotosocial/internal/gtsmodel" -	"github.com/superseriousbusiness/gotosocial/internal/messages"  )  type MoveTestSuite struct { @@ -78,13 +77,7 @@ func (suite *MoveTestSuite) TestMove() {  	suite.move(receivingAcct, requestingAcct, moveStr1)  	// Should be a message heading to the processor. -	var msg messages.FromFediAPI -	select { -	case msg = <-suite.fromFederator: -		// Fine. -	case <-time.After(5 * time.Second): -		suite.FailNow("", "timeout waiting for suite.fromFederator") -	} +	msg, _ := suite.getFederatorMsg(5 * time.Second)  	suite.Equal(ap.ObjectProfile, msg.APObjectType)  	suite.Equal(ap.ActivityMove, msg.APActivityType) @@ -101,12 +94,7 @@ func (suite *MoveTestSuite) TestMove() {  	// Should be a message heading to the processor  	// since this is just a straight up retry. -	select { -	case msg = <-suite.fromFederator: -		// Fine. -	case <-time.After(5 * time.Second): -		suite.FailNow("", "timeout waiting for suite.fromFederator") -	} +	msg, _ = suite.getFederatorMsg(5 * time.Second)  	suite.Equal(ap.ObjectProfile, msg.APObjectType)  	suite.Equal(ap.ActivityMove, msg.APActivityType) @@ -126,12 +114,7 @@ func (suite *MoveTestSuite) TestMove() {  	// Should be a message heading to the processor  	// since this is just a retry with a different ID. -	select { -	case msg = <-suite.fromFederator: -		// Fine. -	case <-time.After(5 * time.Second): -		suite.FailNow("", "timeout waiting for suite.fromFederator") -	} +	msg, _ = suite.getFederatorMsg(5 * time.Second)  	suite.Equal(ap.ObjectProfile, msg.APObjectType)  	suite.Equal(ap.ActivityMove, msg.APActivityType)  } diff --git a/internal/federation/federatingdb/reject_test.go b/internal/federation/federatingdb/reject_test.go index d4c537a92..f51ffaf56 100644 --- a/internal/federation/federatingdb/reject_test.go +++ b/internal/federation/federatingdb/reject_test.go @@ -81,7 +81,8 @@ func (suite *RejectTestSuite) TestRejectFollowRequest() {  	suite.NoError(err)  	// there should be nothing in the federator channel since nothing needs to be passed -	suite.Empty(suite.fromFederator) +	_, ok := suite.getFederatorMsg(time.Second) +	suite.False(ok)  	// the follow request should not be in the database anymore -- it's been rejected  	err = suite.db.GetByID(ctx, fr.ID, >smodel.FollowRequest{}) diff --git a/internal/federation/federatingdb/update.go b/internal/federation/federatingdb/update.go index 733abba0d..2f00e0867 100644 --- a/internal/federation/federatingdb/update.go +++ b/internal/federation/federatingdb/update.go @@ -98,13 +98,13 @@ func (f *federatingDB) updateAccountable(ctx context.Context, receivingAcct *gts  	// was delivered along with the Update, for further asynchronous  	// updating of eg., avatar/header, emojis, etc. The actual db  	// inserts/updates will take place there. -	f.state.Workers.EnqueueFediAPI(ctx, messages.FromFediAPI{ -		APObjectType:      ap.ObjectProfile, -		APActivityType:    ap.ActivityUpdate, -		GTSModel:          requestingAcct, -		APObjectModel:     accountable, -		ReceivingAccount:  receivingAcct, -		RequestingAccount: requestingAcct, +	f.state.Workers.Federator.Queue.Push(&messages.FromFediAPI{ +		APObjectType:   ap.ObjectProfile, +		APActivityType: ap.ActivityUpdate, +		GTSModel:       requestingAcct, +		APObject:       accountable, +		Receiving:      receivingAcct, +		Requesting:     requestingAcct,  	})  	return nil @@ -155,13 +155,13 @@ func (f *federatingDB) updateStatusable(ctx context.Context, receivingAcct *gtsm  	// Queue an UPDATE NOTE activity to our fedi API worker,  	// this will handle necessary database insertions, etc. -	f.state.Workers.EnqueueFediAPI(ctx, messages.FromFediAPI{ -		APObjectType:      ap.ObjectNote, -		APActivityType:    ap.ActivityUpdate, -		GTSModel:          status, // original status -		APObjectModel:     (ap.Statusable)(statusable), -		ReceivingAccount:  receivingAcct, -		RequestingAccount: requestingAcct, +	f.state.Workers.Federator.Queue.Push(&messages.FromFediAPI{ +		APObjectType:   ap.ObjectNote, +		APActivityType: ap.ActivityUpdate, +		GTSModel:       status, // original status +		APObject:       (ap.Statusable)(statusable), +		Receiving:      receivingAcct, +		Requesting:     requestingAcct,  	})  	return nil | 
