summaryrefslogtreecommitdiff
path: root/internal/federation
diff options
context:
space:
mode:
authorLibravatar kim <89579420+NyaaaWhatsUpDoc@users.noreply.github.com>2022-04-28 13:23:11 +0100
committerLibravatar GitHub <noreply@github.com>2022-04-28 13:23:11 +0100
commit420e2fb22bc7aa4967ddadb11e444079efdf5117 (patch)
tree413842c5df646c30a8079671ade5e677e3825fb8 /internal/federation
parent[bugfix] Fix possible race condition in federatingdb (#490) (diff)
downloadgotosocial-420e2fb22bc7aa4967ddadb11e444079efdf5117.tar.xz
replace async client API / federator msg processing with worker pools (#497)
* replace async client API / federator msg processing with worker pools * appease our lord-and-saviour, the linter
Diffstat (limited to 'internal/federation')
-rw-r--r--internal/federation/dereferencing/dereferencer_test.go5
-rw-r--r--internal/federation/federatingdb/accept.go14
-rw-r--r--internal/federation/federatingdb/announce.go10
-rw-r--r--internal/federation/federatingdb/create.go46
-rw-r--r--internal/federation/federatingdb/create_test.go11
-rw-r--r--internal/federation/federatingdb/db.go6
-rw-r--r--internal/federation/federatingdb/delete.go14
-rw-r--r--internal/federation/federatingdb/federatingdb_test.go21
-rw-r--r--internal/federation/federatingdb/reject.go4
-rw-r--r--internal/federation/federatingdb/reject_test.go6
-rw-r--r--internal/federation/federatingdb/undo.go6
-rw-r--r--internal/federation/federatingdb/update.go10
-rw-r--r--internal/federation/federatingdb/util.go12
-rw-r--r--internal/federation/federator_test.go14
14 files changed, 93 insertions, 86 deletions
diff --git a/internal/federation/dereferencing/dereferencer_test.go b/internal/federation/dereferencing/dereferencer_test.go
index cabb3d6a8..441019866 100644
--- a/internal/federation/dereferencing/dereferencer_test.go
+++ b/internal/federation/dereferencing/dereferencer_test.go
@@ -32,7 +32,9 @@ import (
"github.com/superseriousbusiness/gotosocial/internal/db"
"github.com/superseriousbusiness/gotosocial/internal/federation/dereferencing"
"github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
+ "github.com/superseriousbusiness/gotosocial/internal/messages"
"github.com/superseriousbusiness/gotosocial/internal/transport"
+ "github.com/superseriousbusiness/gotosocial/internal/worker"
"github.com/superseriousbusiness/gotosocial/testrig"
)
@@ -148,6 +150,7 @@ func (suite *DereferencerStandardTestSuite) mockTransportController() transport.
return response, nil
}
+ fedWorker := worker.New[messages.FromFederator](-1, -1)
mockClient := testrig.NewMockHTTPClient(do)
- return testrig.NewTestTransportController(mockClient, suite.db)
+ return testrig.NewTestTransportController(mockClient, suite.db, fedWorker)
}
diff --git a/internal/federation/federatingdb/accept.go b/internal/federation/federatingdb/accept.go
index 25dd2bce0..f22db38a5 100644
--- a/internal/federation/federatingdb/accept.go
+++ b/internal/federation/federatingdb/accept.go
@@ -48,9 +48,9 @@ func (f *federatingDB) Accept(ctx context.Context, accept vocab.ActivityStreamsA
l.Debug("entering Accept")
}
- receivingAccount, _, fromFederatorChan := extractFromCtx(ctx)
- if receivingAccount == nil || fromFederatorChan == nil {
- // If the receiving account or federator channel wasn't set on the context, that means this request didn't pass
+ receivingAccount, _ := extractFromCtx(ctx)
+ if receivingAccount == nil {
+ // If the receiving account wasn't set on the context, that means this request didn't pass
// through the API, but came from inside GtS as the result of another activity on this instance. That being so,
// we can safely just ignore this activity, since we know we've already processed it elsewhere.
return nil
@@ -82,12 +82,12 @@ func (f *federatingDB) Accept(ctx context.Context, accept vocab.ActivityStreamsA
return err
}
- fromFederatorChan <- messages.FromFederator{
+ f.fedWorker.Queue(messages.FromFederator{
APObjectType: ap.ActivityFollow,
APActivityType: ap.ActivityAccept,
GTSModel: follow,
ReceivingAccount: receivingAccount,
- }
+ })
return nil
}
@@ -117,12 +117,12 @@ func (f *federatingDB) Accept(ctx context.Context, accept vocab.ActivityStreamsA
return err
}
- fromFederatorChan <- messages.FromFederator{
+ f.fedWorker.Queue(messages.FromFederator{
APObjectType: ap.ActivityFollow,
APActivityType: ap.ActivityAccept,
GTSModel: follow,
ReceivingAccount: receivingAccount,
- }
+ })
return nil
}
diff --git a/internal/federation/federatingdb/announce.go b/internal/federation/federatingdb/announce.go
index 61a85bbc6..b70fa1913 100644
--- a/internal/federation/federatingdb/announce.go
+++ b/internal/federation/federatingdb/announce.go
@@ -44,9 +44,9 @@ func (f *federatingDB) Announce(ctx context.Context, announce vocab.ActivityStre
l.Debug("entering Announce")
}
- receivingAccount, _, fromFederatorChan := extractFromCtx(ctx)
- if receivingAccount == nil || fromFederatorChan == nil {
- // If the receiving account or federator channel wasn't set on the context, that means this request didn't pass
+ receivingAccount, _ := extractFromCtx(ctx)
+ if receivingAccount == nil {
+ // If the receiving account wasn't set on the context, that means this request didn't pass
// through the API, but came from inside GtS as the result of another activity on this instance. That being so,
// we can safely just ignore this activity, since we know we've already processed it elsewhere.
return nil
@@ -63,12 +63,12 @@ func (f *federatingDB) Announce(ctx context.Context, announce vocab.ActivityStre
}
// it's a new announce so pass it back to the processor async for dereferencing etc
- fromFederatorChan <- messages.FromFederator{
+ f.fedWorker.Queue(messages.FromFederator{
APObjectType: ap.ActivityAnnounce,
APActivityType: ap.ActivityCreate,
GTSModel: boost,
ReceivingAccount: receivingAccount,
- }
+ })
return nil
}
diff --git a/internal/federation/federatingdb/create.go b/internal/federation/federatingdb/create.go
index 6c86151f3..625d75603 100644
--- a/internal/federation/federatingdb/create.go
+++ b/internal/federation/federatingdb/create.go
@@ -61,9 +61,9 @@ func (f *federatingDB) Create(ctx context.Context, asType vocab.Type) error {
l.Debug("entering Create")
}
- receivingAccount, requestingAccount, fromFederatorChan := extractFromCtx(ctx)
- if receivingAccount == nil || fromFederatorChan == nil {
- // If the receiving account or federator channel wasn't set on the context, that means this request didn't pass
+ receivingAccount, requestingAccount := extractFromCtx(ctx)
+ if receivingAccount == nil {
+ // If the receiving account wasn't set on the context, that means this request didn't pass
// through the API, but came from inside GtS as the result of another activity on this instance. That being so,
// we can safely just ignore this activity, since we know we've already processed it elsewhere.
return nil
@@ -72,16 +72,16 @@ func (f *federatingDB) Create(ctx context.Context, asType vocab.Type) error {
switch asType.GetTypeName() {
case ap.ActivityBlock:
// BLOCK SOMETHING
- return f.activityBlock(ctx, asType, receivingAccount, requestingAccount, fromFederatorChan)
+ return f.activityBlock(ctx, asType, receivingAccount, requestingAccount)
case ap.ActivityCreate:
// CREATE SOMETHING
- return f.activityCreate(ctx, asType, receivingAccount, requestingAccount, fromFederatorChan)
+ return f.activityCreate(ctx, asType, receivingAccount, requestingAccount)
case ap.ActivityFollow:
// FOLLOW SOMETHING
- return f.activityFollow(ctx, asType, receivingAccount, requestingAccount, fromFederatorChan)
+ return f.activityFollow(ctx, asType, receivingAccount, requestingAccount)
case ap.ActivityLike:
// LIKE SOMETHING
- return f.activityLike(ctx, asType, receivingAccount, requestingAccount, fromFederatorChan)
+ return f.activityLike(ctx, asType, receivingAccount, requestingAccount)
}
return nil
}
@@ -90,7 +90,7 @@ func (f *federatingDB) Create(ctx context.Context, asType vocab.Type) error {
BLOCK HANDLERS
*/
-func (f *federatingDB) activityBlock(ctx context.Context, asType vocab.Type, receiving *gtsmodel.Account, requestingAccount *gtsmodel.Account, fromFederatorChan chan messages.FromFederator) error {
+func (f *federatingDB) activityBlock(ctx context.Context, asType vocab.Type, receiving *gtsmodel.Account, requestingAccount *gtsmodel.Account) error {
blockable, ok := asType.(vocab.ActivityStreamsBlock)
if !ok {
return errors.New("activityBlock: could not convert type to block")
@@ -111,12 +111,12 @@ func (f *federatingDB) activityBlock(ctx context.Context, asType vocab.Type, rec
return fmt.Errorf("activityBlock: database error inserting block: %s", err)
}
- fromFederatorChan <- messages.FromFederator{
+ f.fedWorker.Queue(messages.FromFederator{
APObjectType: ap.ActivityBlock,
APActivityType: ap.ActivityCreate,
GTSModel: block,
ReceivingAccount: receiving,
- }
+ })
return nil
}
@@ -124,7 +124,7 @@ func (f *federatingDB) activityBlock(ctx context.Context, asType vocab.Type, rec
CREATE HANDLERS
*/
-func (f *federatingDB) activityCreate(ctx context.Context, asType vocab.Type, receivingAccount *gtsmodel.Account, requestingAccount *gtsmodel.Account, fromFederatorChan chan messages.FromFederator) error {
+func (f *federatingDB) activityCreate(ctx context.Context, asType vocab.Type, receivingAccount *gtsmodel.Account, requestingAccount *gtsmodel.Account) error {
create, ok := asType.(vocab.ActivityStreamsCreate)
if !ok {
return errors.New("activityCreate: could not convert type to create")
@@ -152,7 +152,7 @@ func (f *federatingDB) activityCreate(ctx context.Context, asType vocab.Type, re
switch asObjectTypeName {
case ap.ObjectNote:
// CREATE A NOTE
- if err := f.createNote(ctx, objectIter.GetActivityStreamsNote(), receivingAccount, requestingAccount, fromFederatorChan); err != nil {
+ if err := f.createNote(ctx, objectIter.GetActivityStreamsNote(), receivingAccount, requestingAccount); err != nil {
errs = append(errs, err.Error())
}
default:
@@ -168,7 +168,7 @@ func (f *federatingDB) activityCreate(ctx context.Context, asType vocab.Type, re
}
// createNote handles a Create activity with a Note type.
-func (f *federatingDB) createNote(ctx context.Context, note vocab.ActivityStreamsNote, receivingAccount *gtsmodel.Account, requestingAccount *gtsmodel.Account, fromFederatorChan chan messages.FromFederator) error {
+func (f *federatingDB) createNote(ctx context.Context, note vocab.ActivityStreamsNote, receivingAccount *gtsmodel.Account, requestingAccount *gtsmodel.Account) error {
l := logrus.WithFields(logrus.Fields{
"func": "createNote",
"receivingAccount": receivingAccount.URI,
@@ -206,13 +206,13 @@ func (f *federatingDB) createNote(ctx context.Context, note vocab.ActivityStream
return nil
}
// pass the note iri into the processor and have it do the dereferencing instead of doing it here
- fromFederatorChan <- messages.FromFederator{
+ f.fedWorker.Queue(messages.FromFederator{
APObjectType: ap.ObjectNote,
APActivityType: ap.ActivityCreate,
APIri: id.GetIRI(),
GTSModel: nil,
ReceivingAccount: receivingAccount,
- }
+ })
return nil
}
@@ -241,12 +241,12 @@ func (f *federatingDB) createNote(ctx context.Context, note vocab.ActivityStream
return fmt.Errorf("createNote: database error inserting status: %s", err)
}
- fromFederatorChan <- messages.FromFederator{
+ f.fedWorker.Queue(messages.FromFederator{
APObjectType: ap.ObjectNote,
APActivityType: ap.ActivityCreate,
GTSModel: status,
ReceivingAccount: receivingAccount,
- }
+ })
return nil
}
@@ -255,7 +255,7 @@ func (f *federatingDB) createNote(ctx context.Context, note vocab.ActivityStream
FOLLOW HANDLERS
*/
-func (f *federatingDB) activityFollow(ctx context.Context, asType vocab.Type, receivingAccount *gtsmodel.Account, requestingAccount *gtsmodel.Account, fromFederatorChan chan messages.FromFederator) error {
+func (f *federatingDB) activityFollow(ctx context.Context, asType vocab.Type, receivingAccount *gtsmodel.Account, requestingAccount *gtsmodel.Account) error {
follow, ok := asType.(vocab.ActivityStreamsFollow)
if !ok {
return errors.New("activityFollow: could not convert type to follow")
@@ -276,12 +276,12 @@ func (f *federatingDB) activityFollow(ctx context.Context, asType vocab.Type, re
return fmt.Errorf("activityFollow: database error inserting follow request: %s", err)
}
- fromFederatorChan <- messages.FromFederator{
+ f.fedWorker.Queue(messages.FromFederator{
APObjectType: ap.ActivityFollow,
APActivityType: ap.ActivityCreate,
GTSModel: followRequest,
ReceivingAccount: receivingAccount,
- }
+ })
return nil
}
@@ -290,7 +290,7 @@ func (f *federatingDB) activityFollow(ctx context.Context, asType vocab.Type, re
LIKE HANDLERS
*/
-func (f *federatingDB) activityLike(ctx context.Context, asType vocab.Type, receivingAccount *gtsmodel.Account, requestingAccount *gtsmodel.Account, fromFederatorChan chan messages.FromFederator) error {
+func (f *federatingDB) activityLike(ctx context.Context, asType vocab.Type, receivingAccount *gtsmodel.Account, requestingAccount *gtsmodel.Account) error {
like, ok := asType.(vocab.ActivityStreamsLike)
if !ok {
return errors.New("activityLike: could not convert type to like")
@@ -311,12 +311,12 @@ func (f *federatingDB) activityLike(ctx context.Context, asType vocab.Type, rece
return fmt.Errorf("activityLike: database error inserting fave: %s", err)
}
- fromFederatorChan <- messages.FromFederator{
+ f.fedWorker.Queue(messages.FromFederator{
APObjectType: ap.ActivityLike,
APActivityType: ap.ActivityCreate,
GTSModel: fave,
ReceivingAccount: receivingAccount,
- }
+ })
return nil
}
diff --git a/internal/federation/federatingdb/create_test.go b/internal/federation/federatingdb/create_test.go
index 3be19c45f..d4f277c37 100644
--- a/internal/federation/federatingdb/create_test.go
+++ b/internal/federation/federatingdb/create_test.go
@@ -25,7 +25,6 @@ import (
"github.com/stretchr/testify/suite"
"github.com/superseriousbusiness/gotosocial/internal/ap"
"github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
- "github.com/superseriousbusiness/gotosocial/internal/messages"
)
type CreateTestSuite struct {
@@ -35,9 +34,8 @@ type CreateTestSuite struct {
func (suite *CreateTestSuite) TestCreateNote() {
receivingAccount := suite.testAccounts["local_account_1"]
requestingAccount := suite.testAccounts["remote_account_1"]
- fromFederatorChan := make(chan messages.FromFederator, 10)
- ctx := createTestContext(receivingAccount, requestingAccount, fromFederatorChan)
+ ctx := createTestContext(receivingAccount, requestingAccount)
create := suite.testActivities["dm_for_zork"].Activity
@@ -45,7 +43,7 @@ func (suite *CreateTestSuite) TestCreateNote() {
suite.NoError(err)
// should be a message heading to the processor now, which we can intercept here
- msg := <-fromFederatorChan
+ msg := <-suite.fromFederator
suite.Equal(ap.ObjectNote, msg.APObjectType)
suite.Equal(ap.ActivityCreate, msg.APActivityType)
@@ -65,9 +63,8 @@ func (suite *CreateTestSuite) TestCreateNote() {
func (suite *CreateTestSuite) TestCreateNoteForward() {
receivingAccount := suite.testAccounts["local_account_1"]
requestingAccount := suite.testAccounts["remote_account_1"]
- fromFederatorChan := make(chan messages.FromFederator, 10)
- ctx := createTestContext(receivingAccount, requestingAccount, fromFederatorChan)
+ ctx := createTestContext(receivingAccount, requestingAccount)
create := suite.testActivities["forwarded_message"].Activity
@@ -75,7 +72,7 @@ func (suite *CreateTestSuite) TestCreateNoteForward() {
suite.NoError(err)
// should be a message heading to the processor now, which we can intercept here
- msg := <-fromFederatorChan
+ msg := <-suite.fromFederator
suite.Equal(ap.ObjectNote, msg.APObjectType)
suite.Equal(ap.ActivityCreate, msg.APActivityType)
diff --git a/internal/federation/federatingdb/db.go b/internal/federation/federatingdb/db.go
index 36df2593f..60f09b909 100644
--- a/internal/federation/federatingdb/db.go
+++ b/internal/federation/federatingdb/db.go
@@ -25,7 +25,9 @@ import (
"github.com/superseriousbusiness/activity/pub"
"github.com/superseriousbusiness/activity/streams/vocab"
"github.com/superseriousbusiness/gotosocial/internal/db"
+ "github.com/superseriousbusiness/gotosocial/internal/messages"
"github.com/superseriousbusiness/gotosocial/internal/typeutils"
+ "github.com/superseriousbusiness/gotosocial/internal/worker"
)
// DB wraps the pub.Database interface with a couple of custom functions for GoToSocial.
@@ -42,14 +44,16 @@ type DB interface {
type federatingDB struct {
locks mutexes.MutexMap
db db.DB
+ fedWorker *worker.Worker[messages.FromFederator]
typeConverter typeutils.TypeConverter
}
// New returns a DB interface using the given database and config
-func New(db db.DB) DB {
+func New(db db.DB, fedWorker *worker.Worker[messages.FromFederator]) DB {
fdb := federatingDB{
locks: mutexes.NewMap(-1, -1), // use defaults
db: db,
+ fedWorker: fedWorker,
typeConverter: typeutils.NewConverter(db),
}
return &fdb
diff --git a/internal/federation/federatingdb/delete.go b/internal/federation/federatingdb/delete.go
index 7293701aa..bd0184f76 100644
--- a/internal/federation/federatingdb/delete.go
+++ b/internal/federation/federatingdb/delete.go
@@ -44,9 +44,9 @@ func (f *federatingDB) Delete(ctx context.Context, id *url.URL) error {
)
l.Debug("entering Delete")
- receivingAccount, _, fromFederatorChan := extractFromCtx(ctx)
- if receivingAccount == nil || fromFederatorChan == nil {
- // If the receiving account or federator channel wasn't set on the context, that means this request didn't pass
+ receivingAccount, _ := extractFromCtx(ctx)
+ if receivingAccount == nil {
+ // If the receiving account wasn't set on the context, that means this request didn't pass
// through the API, but came from inside GtS as the result of another activity on this instance. That being so,
// we can safely just ignore this activity, since we know we've already processed it elsewhere.
return nil
@@ -61,24 +61,24 @@ func (f *federatingDB) Delete(ctx context.Context, id *url.URL) error {
if err := f.db.DeleteByID(ctx, s.ID, &gtsmodel.Status{}); err != nil {
return fmt.Errorf("DELETE: err deleting status: %s", err)
}
- fromFederatorChan <- messages.FromFederator{
+ f.fedWorker.Queue(messages.FromFederator{
APObjectType: ap.ObjectNote,
APActivityType: ap.ActivityDelete,
GTSModel: s,
ReceivingAccount: receivingAccount,
- }
+ })
}
a, err := f.db.GetAccountByURI(ctx, id.String())
if err == nil {
// it's an account
l.Debugf("uri is for an account with id %s, passing delete message to the processor", a.ID)
- fromFederatorChan <- messages.FromFederator{
+ f.fedWorker.Queue(messages.FromFederator{
APObjectType: ap.ObjectProfile,
APActivityType: ap.ActivityDelete,
GTSModel: a,
ReceivingAccount: receivingAccount,
- }
+ })
}
return nil
diff --git a/internal/federation/federatingdb/federatingdb_test.go b/internal/federation/federatingdb/federatingdb_test.go
index 3f1af7d78..d53294c1c 100644
--- a/internal/federation/federatingdb/federatingdb_test.go
+++ b/internal/federation/federatingdb/federatingdb_test.go
@@ -28,14 +28,17 @@ import (
"github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
"github.com/superseriousbusiness/gotosocial/internal/messages"
"github.com/superseriousbusiness/gotosocial/internal/typeutils"
+ "github.com/superseriousbusiness/gotosocial/internal/worker"
"github.com/superseriousbusiness/gotosocial/testrig"
)
type FederatingDBTestSuite struct {
suite.Suite
- db db.DB
- tc typeutils.TypeConverter
- federatingDB federatingdb.DB
+ db db.DB
+ tc typeutils.TypeConverter
+ fedWorker *worker.Worker[messages.FromFederator]
+ fromFederator chan messages.FromFederator
+ federatingDB federatingdb.DB
testTokens map[string]*gtsmodel.Token
testClients map[string]*gtsmodel.Client
@@ -62,10 +65,17 @@ func (suite *FederatingDBTestSuite) SetupSuite() {
func (suite *FederatingDBTestSuite) SetupTest() {
testrig.InitTestLog()
testrig.InitTestConfig()
+ suite.fedWorker = worker.New[messages.FromFederator](-1, -1)
+ suite.fromFederator = make(chan messages.FromFederator, 10)
+ suite.fedWorker.SetProcessor(func(ctx context.Context, msg messages.FromFederator) error {
+ suite.fromFederator <- msg
+ return nil
+ })
+ _ = suite.fedWorker.Start()
suite.db = testrig.NewTestDB()
suite.testActivities = testrig.NewTestActivities(suite.testAccounts)
suite.tc = testrig.NewTestTypeConverter(suite.db)
- suite.federatingDB = testrig.NewTestFederatingDB(suite.db)
+ suite.federatingDB = testrig.NewTestFederatingDB(suite.db, suite.fedWorker)
testrig.StandardDBSetup(suite.db, suite.testAccounts)
}
@@ -73,10 +83,9 @@ func (suite *FederatingDBTestSuite) TearDownTest() {
testrig.StandardDBTeardown(suite.db)
}
-func createTestContext(receivingAccount *gtsmodel.Account, requestingAccount *gtsmodel.Account, fromFederatorChan chan messages.FromFederator) context.Context {
+func createTestContext(receivingAccount *gtsmodel.Account, requestingAccount *gtsmodel.Account) context.Context {
ctx := context.Background()
ctx = context.WithValue(ctx, ap.ContextReceivingAccount, receivingAccount)
ctx = context.WithValue(ctx, ap.ContextRequestingAccount, requestingAccount)
- ctx = context.WithValue(ctx, ap.ContextFromFederatorChan, fromFederatorChan)
return ctx
}
diff --git a/internal/federation/federatingdb/reject.go b/internal/federation/federatingdb/reject.go
index 8df1b853a..9cb81c267 100644
--- a/internal/federation/federatingdb/reject.go
+++ b/internal/federation/federatingdb/reject.go
@@ -47,8 +47,8 @@ func (f *federatingDB) Reject(ctx context.Context, reject vocab.ActivityStreamsR
l.Debug("entering Reject")
}
- receivingAccount, _, fromFederatorChan := extractFromCtx(ctx)
- if receivingAccount == nil || fromFederatorChan == nil {
+ receivingAccount, _ := extractFromCtx(ctx)
+ if receivingAccount == nil {
// If the receiving account or federator channel wasn't set on the context, that means this request didn't pass
// through the API, but came from inside GtS as the result of another activity on this instance. That being so,
// we can safely just ignore this activity, since we know we've already processed it elsewhere.
diff --git a/internal/federation/federatingdb/reject_test.go b/internal/federation/federatingdb/reject_test.go
index 825ff92b3..52730925d 100644
--- a/internal/federation/federatingdb/reject_test.go
+++ b/internal/federation/federatingdb/reject_test.go
@@ -26,7 +26,6 @@ import (
"github.com/superseriousbusiness/activity/streams"
"github.com/superseriousbusiness/gotosocial/internal/db"
"github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
- "github.com/superseriousbusiness/gotosocial/internal/messages"
"github.com/superseriousbusiness/gotosocial/internal/uris"
"github.com/superseriousbusiness/gotosocial/testrig"
)
@@ -40,8 +39,7 @@ func (suite *RejectTestSuite) TestRejectFollowRequest() {
// remote_account_2 rejects the follow request
followingAccount := suite.testAccounts["local_account_1"]
followedAccount := suite.testAccounts["remote_account_2"]
- fromFederatorChan := make(chan messages.FromFederator, 10)
- ctx := createTestContext(followingAccount, followedAccount, fromFederatorChan)
+ ctx := createTestContext(followingAccount, followedAccount)
// put the follow request in the database
fr := &gtsmodel.FollowRequest{
@@ -84,7 +82,7 @@ func (suite *RejectTestSuite) TestRejectFollowRequest() {
suite.NoError(err)
// there should be nothing in the federator channel since nothing needs to be passed
- suite.Empty(fromFederatorChan)
+ suite.Empty(suite.fromFederator)
// the follow request should not be in the database anymore -- it's been rejected
err = suite.db.GetByID(ctx, fr.ID, &gtsmodel.FollowRequest{})
diff --git a/internal/federation/federatingdb/undo.go b/internal/federation/federatingdb/undo.go
index 94a0d74ce..92f24f315 100644
--- a/internal/federation/federatingdb/undo.go
+++ b/internal/federation/federatingdb/undo.go
@@ -46,9 +46,9 @@ func (f *federatingDB) Undo(ctx context.Context, undo vocab.ActivityStreamsUndo)
l.Debug("entering Undo")
}
- receivingAccount, _, fromFederatorChan := extractFromCtx(ctx)
- if receivingAccount == nil || fromFederatorChan == nil {
- // If the receiving account or federator channel wasn't set on the context, that means this request didn't pass
+ receivingAccount, _ := extractFromCtx(ctx)
+ if receivingAccount == nil {
+ // If the receiving account wasn't set on the context, that means this request didn't pass
// through the API, but came from inside GtS as the result of another activity on this instance. That being so,
// we can safely just ignore this activity, since we know we've already processed it elsewhere.
return nil
diff --git a/internal/federation/federatingdb/update.go b/internal/federation/federatingdb/update.go
index 3a48eabac..7930cde12 100644
--- a/internal/federation/federatingdb/update.go
+++ b/internal/federation/federatingdb/update.go
@@ -57,9 +57,9 @@ func (f *federatingDB) Update(ctx context.Context, asType vocab.Type) error {
l.Debug("entering Update")
}
- receivingAccount, _, fromFederatorChan := extractFromCtx(ctx)
- if receivingAccount == nil || fromFederatorChan == nil {
- // If the receiving account or federator channel wasn't set on the context, that means this request didn't pass
+ receivingAccount, _ := extractFromCtx(ctx)
+ if receivingAccount == nil {
+ // If the receiving account wasn't set on the context, that means this request didn't pass
// through the API, but came from inside GtS as the result of another activity on this instance. That being so,
// we can safely just ignore this activity, since we know we've already processed it elsewhere.
return nil
@@ -148,12 +148,12 @@ func (f *federatingDB) Update(ctx context.Context, asType vocab.Type) error {
}
// pass to the processor for further processing of eg., avatar/header
- fromFederatorChan <- messages.FromFederator{
+ f.fedWorker.Queue(messages.FromFederator{
APObjectType: ap.ObjectProfile,
APActivityType: ap.ActivityUpdate,
GTSModel: updatedAcct,
ReceivingAccount: receivingAccount,
- }
+ })
}
return nil
diff --git a/internal/federation/federatingdb/util.go b/internal/federation/federatingdb/util.go
index 74262fea4..5a3a65a0c 100644
--- a/internal/federation/federatingdb/util.go
+++ b/internal/federation/federatingdb/util.go
@@ -34,7 +34,6 @@ import (
"github.com/superseriousbusiness/gotosocial/internal/db"
"github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
"github.com/superseriousbusiness/gotosocial/internal/id"
- "github.com/superseriousbusiness/gotosocial/internal/messages"
"github.com/superseriousbusiness/gotosocial/internal/uris"
)
@@ -310,7 +309,7 @@ func (f *federatingDB) collectIRIs(ctx context.Context, iris []*url.URL) (vocab.
// - The requesting account that posted to the inbox.
// - A channel that messages for the processor can be placed into.
// If a value is not present, nil will be returned for it. It's up to the caller to check this and respond appropriately.
-func extractFromCtx(ctx context.Context) (receivingAccount, requestingAccount *gtsmodel.Account, fromFederatorChan chan messages.FromFederator) {
+func extractFromCtx(ctx context.Context) (receivingAccount, requestingAccount *gtsmodel.Account) {
receivingAccountI := ctx.Value(ap.ContextReceivingAccount)
if receivingAccountI != nil {
var ok bool
@@ -329,15 +328,6 @@ func extractFromCtx(ctx context.Context) (receivingAccount, requestingAccount *g
}
}
- fromFederatorChanI := ctx.Value(ap.ContextFromFederatorChan)
- if fromFederatorChanI != nil {
- var ok bool
- fromFederatorChan, ok = fromFederatorChanI.(chan messages.FromFederator)
- if !ok {
- logrus.Panicf("extractFromCtx: context entry with key %s could not be asserted to chan messages.FromFederator", ap.ContextFromFederatorChan)
- }
- }
-
return
}
diff --git a/internal/federation/federator_test.go b/internal/federation/federator_test.go
index 6dac76c05..220c3a193 100644
--- a/internal/federation/federator_test.go
+++ b/internal/federation/federator_test.go
@@ -34,7 +34,9 @@ import (
"github.com/superseriousbusiness/gotosocial/internal/db"
"github.com/superseriousbusiness/gotosocial/internal/federation"
"github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
+ "github.com/superseriousbusiness/gotosocial/internal/messages"
"github.com/superseriousbusiness/gotosocial/internal/typeutils"
+ "github.com/superseriousbusiness/gotosocial/internal/worker"
"github.com/superseriousbusiness/gotosocial/testrig"
)
@@ -73,12 +75,14 @@ func (suite *ProtocolTestSuite) TestPostInboxRequestBodyHook() {
// the activity we're gonna use
activity := suite.activities["dm_for_zork"]
+ fedWorker := worker.New[messages.FromFederator](-1, -1)
+
// setup transport controller with a no-op client so we don't make external calls
tc := testrig.NewTestTransportController(testrig.NewMockHTTPClient(func(req *http.Request) (*http.Response, error) {
return nil, nil
- }), suite.db)
+ }), suite.db, fedWorker)
// setup module being tested
- federator := federation.NewFederator(suite.db, testrig.NewTestFederatingDB(suite.db), tc, suite.typeConverter, testrig.NewTestMediaManager(suite.db, suite.storage))
+ federator := federation.NewFederator(suite.db, testrig.NewTestFederatingDB(suite.db, fedWorker), tc, suite.typeConverter, testrig.NewTestMediaManager(suite.db, suite.storage))
// setup request
ctx := context.Background()
@@ -105,9 +109,11 @@ func (suite *ProtocolTestSuite) TestAuthenticatePostInbox() {
sendingAccount := suite.accounts["remote_account_1"]
inboxAccount := suite.accounts["local_account_1"]
- tc := testrig.NewTestTransportController(testrig.NewMockHTTPClient(nil), suite.db)
+ fedWorker := worker.New[messages.FromFederator](-1, -1)
+
+ tc := testrig.NewTestTransportController(testrig.NewMockHTTPClient(nil), suite.db, fedWorker)
// now setup module being tested, with the mock transport controller
- federator := federation.NewFederator(suite.db, testrig.NewTestFederatingDB(suite.db), tc, suite.typeConverter, testrig.NewTestMediaManager(suite.db, suite.storage))
+ federator := federation.NewFederator(suite.db, testrig.NewTestFederatingDB(suite.db, fedWorker), tc, suite.typeConverter, testrig.NewTestMediaManager(suite.db, suite.storage))
request := httptest.NewRequest(http.MethodPost, "http://localhost:8080/users/the_mighty_zork/inbox", nil)
// we need these headers for the request to be validated