summaryrefslogtreecommitdiff
path: root/internal/federation
diff options
context:
space:
mode:
Diffstat (limited to 'internal/federation')
-rw-r--r--internal/federation/dereferencing/account.go8
-rw-r--r--internal/federation/dereferencing/status.go2
-rw-r--r--internal/federation/dereferencing/thread.go4
-rw-r--r--internal/federation/federatingdb/accept.go26
-rw-r--r--internal/federation/federatingdb/announce.go12
-rw-r--r--internal/federation/federatingdb/announce_test.go8
-rw-r--r--internal/federation/federatingdb/create.go92
-rw-r--r--internal/federation/federatingdb/create_test.go13
-rw-r--r--internal/federation/federatingdb/delete.go148
-rw-r--r--internal/federation/federatingdb/federatingdb_test.go31
-rw-r--r--internal/federation/federatingdb/move.go12
-rw-r--r--internal/federation/federatingdb/move_test.go23
-rw-r--r--internal/federation/federatingdb/reject_test.go3
-rw-r--r--internal/federation/federatingdb/update.go28
14 files changed, 239 insertions, 171 deletions
diff --git a/internal/federation/dereferencing/account.go b/internal/federation/dereferencing/account.go
index e8d32f58a..94df9538a 100644
--- a/internal/federation/dereferencing/account.go
+++ b/internal/federation/dereferencing/account.go
@@ -104,7 +104,7 @@ func (d *Dereferencer) GetAccountByURI(ctx context.Context, requestUser string,
if accountable != nil {
// This account was updated, enqueue re-dereference featured posts.
- d.state.Workers.Federator.MustEnqueueCtx(ctx, func(ctx context.Context) {
+ d.state.Workers.Dereference.Queue.Push(func(ctx context.Context) {
if err := d.dereferenceAccountFeatured(ctx, requestUser, account); err != nil {
log.Errorf(ctx, "error fetching account featured collection: %v", err)
}
@@ -201,7 +201,7 @@ func (d *Dereferencer) GetAccountByUsernameDomain(ctx context.Context, requestUs
if accountable != nil {
// This account was updated, enqueue re-dereference featured posts.
- d.state.Workers.Federator.MustEnqueueCtx(ctx, func(ctx context.Context) {
+ d.state.Workers.Dereference.Queue.Push(func(ctx context.Context) {
if err := d.dereferenceAccountFeatured(ctx, requestUser, account); err != nil {
log.Errorf(ctx, "error fetching account featured collection: %v", err)
}
@@ -322,7 +322,7 @@ func (d *Dereferencer) RefreshAccount(
if accountable != nil {
// This account was updated, enqueue re-dereference featured posts.
- d.state.Workers.Federator.MustEnqueueCtx(ctx, func(ctx context.Context) {
+ d.state.Workers.Dereference.Queue.Push(func(ctx context.Context) {
if err := d.dereferenceAccountFeatured(ctx, requestUser, latest); err != nil {
log.Errorf(ctx, "error fetching account featured collection: %v", err)
}
@@ -362,7 +362,7 @@ func (d *Dereferencer) RefreshAccountAsync(
}
// Enqueue a worker function to enrich this account async.
- d.state.Workers.Federator.MustEnqueueCtx(ctx, func(ctx context.Context) {
+ d.state.Workers.Dereference.Queue.Push(func(ctx context.Context) {
latest, accountable, err := d.enrichAccountSafely(ctx, requestUser, uri, account, accountable)
if err != nil {
log.Errorf(ctx, "error enriching remote account: %v", err)
diff --git a/internal/federation/dereferencing/status.go b/internal/federation/dereferencing/status.go
index 769539a2f..bd50a08fd 100644
--- a/internal/federation/dereferencing/status.go
+++ b/internal/federation/dereferencing/status.go
@@ -255,7 +255,7 @@ func (d *Dereferencer) RefreshStatusAsync(
}
// Enqueue a worker function to re-fetch this status entirely async.
- d.state.Workers.Federator.MustEnqueueCtx(ctx, func(ctx context.Context) {
+ d.state.Workers.Dereference.Queue.Push(func(ctx context.Context) {
latest, statusable, _, err := d.enrichStatusSafely(ctx,
requestUser,
uri,
diff --git a/internal/federation/dereferencing/thread.go b/internal/federation/dereferencing/thread.go
index e528581c9..ed2c1a43f 100644
--- a/internal/federation/dereferencing/thread.go
+++ b/internal/federation/dereferencing/thread.go
@@ -56,14 +56,14 @@ func (d *Dereferencer) dereferenceThread(
}
// Enqueue dereferencing remaining status thread, (children), asychronously .
- d.state.Workers.Federator.MustEnqueueCtx(ctx, func(ctx context.Context) {
+ d.state.Workers.Dereference.Queue.Push(func(ctx context.Context) {
if err := d.DereferenceStatusDescendants(ctx, requestUser, uri, statusable); err != nil {
log.Error(ctx, err)
}
})
} else {
// This is an existing status, dereference the WHOLE thread asynchronously.
- d.state.Workers.Federator.MustEnqueueCtx(ctx, func(ctx context.Context) {
+ d.state.Workers.Dereference.Queue.Push(func(ctx context.Context) {
if err := d.DereferenceStatusAncestors(ctx, requestUser, status); err != nil {
log.Error(ctx, err)
}
diff --git a/internal/federation/federatingdb/accept.go b/internal/federation/federatingdb/accept.go
index 50a7c2db1..e26e5955b 100644
--- a/internal/federation/federatingdb/accept.go
+++ b/internal/federation/federatingdb/accept.go
@@ -89,13 +89,12 @@ func (f *federatingDB) Accept(ctx context.Context, accept vocab.ActivityStreamsA
return err
}
- // Process side effects asynchronously.
- f.state.Workers.EnqueueFediAPI(ctx, messages.FromFediAPI{
- APObjectType: ap.ActivityFollow,
- APActivityType: ap.ActivityAccept,
- GTSModel: follow,
- ReceivingAccount: receivingAcct,
- RequestingAccount: requestingAcct,
+ f.state.Workers.Federator.Queue.Push(&messages.FromFediAPI{
+ APObjectType: ap.ActivityFollow,
+ APActivityType: ap.ActivityAccept,
+ GTSModel: follow,
+ Receiving: receivingAcct,
+ Requesting: requestingAcct,
})
}
@@ -138,13 +137,12 @@ func (f *federatingDB) Accept(ctx context.Context, accept vocab.ActivityStreamsA
return err
}
- // Process side effects asynchronously.
- f.state.Workers.EnqueueFediAPI(ctx, messages.FromFediAPI{
- APObjectType: ap.ActivityFollow,
- APActivityType: ap.ActivityAccept,
- GTSModel: follow,
- ReceivingAccount: receivingAcct,
- RequestingAccount: requestingAcct,
+ f.state.Workers.Federator.Queue.Push(&messages.FromFediAPI{
+ APObjectType: ap.ActivityFollow,
+ APActivityType: ap.ActivityAccept,
+ GTSModel: follow,
+ Receiving: receivingAcct,
+ Requesting: requestingAcct,
})
continue
diff --git a/internal/federation/federatingdb/announce.go b/internal/federation/federatingdb/announce.go
index 2f5950a30..3a3b91236 100644
--- a/internal/federation/federatingdb/announce.go
+++ b/internal/federation/federatingdb/announce.go
@@ -81,12 +81,12 @@ func (f *federatingDB) Announce(ctx context.Context, announce vocab.ActivityStre
}
// This is a new boost. Process side effects asynchronously.
- f.state.Workers.EnqueueFediAPI(ctx, messages.FromFediAPI{
- APObjectType: ap.ActivityAnnounce,
- APActivityType: ap.ActivityCreate,
- GTSModel: boost,
- ReceivingAccount: receivingAcct,
- RequestingAccount: requestingAcct,
+ f.state.Workers.Federator.Queue.Push(&messages.FromFediAPI{
+ APObjectType: ap.ActivityAnnounce,
+ APActivityType: ap.ActivityCreate,
+ GTSModel: boost,
+ Receiving: receivingAcct,
+ Requesting: requestingAcct,
})
return nil
diff --git a/internal/federation/federatingdb/announce_test.go b/internal/federation/federatingdb/announce_test.go
index 8dd5ce9da..2833c04c4 100644
--- a/internal/federation/federatingdb/announce_test.go
+++ b/internal/federation/federatingdb/announce_test.go
@@ -19,6 +19,7 @@ package federatingdb_test
import (
"testing"
+ "time"
"github.com/stretchr/testify/suite"
"github.com/superseriousbusiness/activity/streams/vocab"
@@ -42,7 +43,7 @@ func (suite *AnnounceTestSuite) TestNewAnnounce() {
suite.NoError(err)
// should be a message heading to the processor now, which we can intercept here
- msg := <-suite.fromFederator
+ msg, _ := suite.getFederatorMsg(5 * time.Second)
suite.Equal(ap.ActivityAnnounce, msg.APObjectType)
suite.Equal(ap.ActivityCreate, msg.APActivityType)
@@ -69,7 +70,7 @@ func (suite *AnnounceTestSuite) TestAnnounceTwice() {
suite.NoError(err)
// should be a message heading to the processor now, which we can intercept here
- msg := <-suite.fromFederator
+ msg, _ := suite.getFederatorMsg(5 * time.Second)
suite.Equal(ap.ActivityAnnounce, msg.APObjectType)
suite.Equal(ap.ActivityCreate, msg.APActivityType)
boost, ok := msg.GTSModel.(*gtsmodel.Status)
@@ -94,7 +95,8 @@ func (suite *AnnounceTestSuite) TestAnnounceTwice() {
// since this is a repeat announce with the same URI, just delivered to a different inbox,
// we should have nothing in the messages channel...
- suite.Empty(suite.fromFederator)
+ _, ok = suite.getFederatorMsg(time.Second)
+ suite.False(ok)
}
func TestAnnounceTestSuite(t *testing.T) {
diff --git a/internal/federation/federatingdb/create.go b/internal/federation/federatingdb/create.go
index 94261526e..44f3cd98c 100644
--- a/internal/federation/federatingdb/create.go
+++ b/internal/federation/federatingdb/create.go
@@ -99,7 +99,7 @@ func (f *federatingDB) Create(ctx context.Context, asType vocab.Type) error {
BLOCK HANDLERS
*/
-func (f *federatingDB) activityBlock(ctx context.Context, asType vocab.Type, receiving *gtsmodel.Account, requestingAccount *gtsmodel.Account) error {
+func (f *federatingDB) activityBlock(ctx context.Context, asType vocab.Type, receiving *gtsmodel.Account, requesting *gtsmodel.Account) error {
blockable, ok := asType.(vocab.ActivityStreamsBlock)
if !ok {
return errors.New("activityBlock: could not convert type to block")
@@ -110,10 +110,10 @@ func (f *federatingDB) activityBlock(ctx context.Context, asType vocab.Type, rec
return fmt.Errorf("activityBlock: could not convert Block to gts model block")
}
- if block.AccountID != requestingAccount.ID {
+ if block.AccountID != requesting.ID {
return fmt.Errorf(
"activityBlock: requestingAccount %s is not Block actor account %s",
- requestingAccount.URI, block.Account.URI,
+ requesting.URI, block.Account.URI,
)
}
@@ -130,12 +130,12 @@ func (f *federatingDB) activityBlock(ctx context.Context, asType vocab.Type, rec
return fmt.Errorf("activityBlock: database error inserting block: %s", err)
}
- f.state.Workers.EnqueueFediAPI(ctx, messages.FromFediAPI{
- APObjectType: ap.ActivityBlock,
- APActivityType: ap.ActivityCreate,
- GTSModel: block,
- ReceivingAccount: receiving,
- RequestingAccount: requestingAccount,
+ f.state.Workers.Federator.Queue.Push(&messages.FromFediAPI{
+ APObjectType: ap.ActivityBlock,
+ APActivityType: ap.ActivityCreate,
+ GTSModel: block,
+ Receiving: receiving,
+ Requesting: requesting,
})
return nil
@@ -297,7 +297,7 @@ func (f *federatingDB) createPollOptionables(
}
// Enqueue message to the fedi API worker with poll vote(s).
- f.state.Workers.EnqueueFediAPI(ctx, messages.FromFediAPI{
+ f.state.Workers.Federator.Queue.Push(&messages.FromFediAPI{
APActivityType: ap.ActivityCreate,
APObjectType: ap.ActivityQuestion,
GTSModel: &gtsmodel.PollVote{
@@ -308,8 +308,8 @@ func (f *federatingDB) createPollOptionables(
PollID: inReplyTo.PollID,
Poll: inReplyTo.Poll,
},
- ReceivingAccount: receiver,
- RequestingAccount: requester,
+ Receiving: receiver,
+ Requesting: requester,
})
return nil
@@ -377,28 +377,28 @@ func (f *federatingDB) createStatusable(
// Pass the statusable URI (APIri) into the processor
// worker and do the rest of the processing asynchronously.
- f.state.Workers.EnqueueFediAPI(ctx, messages.FromFediAPI{
- APObjectType: ap.ObjectNote,
- APActivityType: ap.ActivityCreate,
- APIri: ap.GetJSONLDId(statusable),
- APObjectModel: nil,
- GTSModel: nil,
- ReceivingAccount: receiver,
- RequestingAccount: requester,
+ f.state.Workers.Federator.Queue.Push(&messages.FromFediAPI{
+ APObjectType: ap.ObjectNote,
+ APActivityType: ap.ActivityCreate,
+ APIRI: ap.GetJSONLDId(statusable),
+ APObject: nil,
+ GTSModel: nil,
+ Receiving: receiver,
+ Requesting: requester,
})
return nil
}
// Do the rest of the processing asynchronously. The processor
// will handle inserting/updating + further dereferencing the status.
- f.state.Workers.EnqueueFediAPI(ctx, messages.FromFediAPI{
- APObjectType: ap.ObjectNote,
- APActivityType: ap.ActivityCreate,
- APIri: nil,
- GTSModel: nil,
- APObjectModel: statusable,
- ReceivingAccount: receiver,
- RequestingAccount: requester,
+ f.state.Workers.Federator.Queue.Push(&messages.FromFediAPI{
+ APObjectType: ap.ObjectNote,
+ APActivityType: ap.ActivityCreate,
+ APIRI: nil,
+ GTSModel: nil,
+ APObject: statusable,
+ Receiving: receiver,
+ Requesting: requester,
})
return nil
@@ -439,12 +439,12 @@ func (f *federatingDB) activityFollow(ctx context.Context, asType vocab.Type, re
return fmt.Errorf("activityFollow: database error inserting follow request: %s", err)
}
- f.state.Workers.EnqueueFediAPI(ctx, messages.FromFediAPI{
- APObjectType: ap.ActivityFollow,
- APActivityType: ap.ActivityCreate,
- GTSModel: followRequest,
- ReceivingAccount: receivingAccount,
- RequestingAccount: requestingAccount,
+ f.state.Workers.Federator.Queue.Push(&messages.FromFediAPI{
+ APObjectType: ap.ActivityFollow,
+ APActivityType: ap.ActivityCreate,
+ GTSModel: followRequest,
+ Receiving: receivingAccount,
+ Requesting: requestingAccount,
})
return nil
@@ -484,12 +484,12 @@ func (f *federatingDB) activityLike(ctx context.Context, asType vocab.Type, rece
return fmt.Errorf("activityLike: database error inserting fave: %w", err)
}
- f.state.Workers.EnqueueFediAPI(ctx, messages.FromFediAPI{
- APObjectType: ap.ActivityLike,
- APActivityType: ap.ActivityCreate,
- GTSModel: fave,
- ReceivingAccount: receivingAccount,
- RequestingAccount: requestingAccount,
+ f.state.Workers.Federator.Queue.Push(&messages.FromFediAPI{
+ APObjectType: ap.ActivityLike,
+ APActivityType: ap.ActivityCreate,
+ GTSModel: fave,
+ Receiving: receivingAccount,
+ Requesting: requestingAccount,
})
return nil
@@ -536,12 +536,12 @@ func (f *federatingDB) activityFlag(ctx context.Context, asType vocab.Type, rece
return fmt.Errorf("activityFlag: database error inserting report: %w", err)
}
- f.state.Workers.EnqueueFediAPI(ctx, messages.FromFediAPI{
- APObjectType: ap.ActivityFlag,
- APActivityType: ap.ActivityCreate,
- GTSModel: report,
- ReceivingAccount: receivingAccount,
- RequestingAccount: requestingAccount,
+ f.state.Workers.Federator.Queue.Push(&messages.FromFediAPI{
+ APObjectType: ap.ActivityFlag,
+ APActivityType: ap.ActivityCreate,
+ GTSModel: report,
+ Receiving: receivingAccount,
+ Requesting: requestingAccount,
})
return nil
diff --git a/internal/federation/federatingdb/create_test.go b/internal/federation/federatingdb/create_test.go
index 5f80812bf..fffee1432 100644
--- a/internal/federation/federatingdb/create_test.go
+++ b/internal/federation/federatingdb/create_test.go
@@ -21,6 +21,7 @@ import (
"context"
"encoding/json"
"testing"
+ "time"
"github.com/stretchr/testify/suite"
"github.com/superseriousbusiness/activity/streams"
@@ -48,10 +49,10 @@ func (suite *CreateTestSuite) TestCreateNote() {
suite.NoError(err)
// should be a message heading to the processor now, which we can intercept here
- msg := <-suite.fromFederator
+ msg, _ := suite.getFederatorMsg(5 * time.Second)
suite.Equal(ap.ObjectNote, msg.APObjectType)
suite.Equal(ap.ActivityCreate, msg.APActivityType)
- suite.Equal(note, msg.APObjectModel)
+ suite.Equal(note, msg.APObject)
}
func (suite *CreateTestSuite) TestCreateNoteForward() {
@@ -79,15 +80,15 @@ func (suite *CreateTestSuite) TestCreateNoteForward() {
suite.NoError(err)
// should be a message heading to the processor now, which we can intercept here
- msg := <-suite.fromFederator
+ msg, _ := suite.getFederatorMsg(5 * time.Second)
suite.Equal(ap.ObjectNote, msg.APObjectType)
suite.Equal(ap.ActivityCreate, msg.APActivityType)
// nothing should be set as the model since this is a forward
- suite.Nil(msg.APObjectModel)
+ suite.Nil(msg.APObject)
// but we should have a uri set
- suite.Equal("http://example.org/users/Some_User/statuses/afaba698-5740-4e32-a702-af61aa543bc1", msg.APIri.String())
+ suite.Equal("http://example.org/users/Some_User/statuses/afaba698-5740-4e32-a702-af61aa543bc1", msg.APIRI.String())
}
func (suite *CreateTestSuite) TestCreateFlag1() {
@@ -120,7 +121,7 @@ func (suite *CreateTestSuite) TestCreateFlag1() {
}
// should be a message heading to the processor now, which we can intercept here
- msg := <-suite.fromFederator
+ msg, _ := suite.getFederatorMsg(5 * time.Second)
suite.Equal(ap.ActivityFlag, msg.APObjectType)
suite.Equal(ap.ActivityCreate, msg.APActivityType)
diff --git a/internal/federation/federatingdb/delete.go b/internal/federation/federatingdb/delete.go
index 622ef6d3d..7e9b66c5a 100644
--- a/internal/federation/federatingdb/delete.go
+++ b/internal/federation/federatingdb/delete.go
@@ -19,10 +19,13 @@ package federatingdb
import (
"context"
+ "errors"
"net/url"
- "codeberg.org/gruf/go-kv"
"github.com/superseriousbusiness/gotosocial/internal/ap"
+ "github.com/superseriousbusiness/gotosocial/internal/db"
+ "github.com/superseriousbusiness/gotosocial/internal/gtserror"
+ "github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
"github.com/superseriousbusiness/gotosocial/internal/log"
"github.com/superseriousbusiness/gotosocial/internal/messages"
)
@@ -34,43 +37,130 @@ import (
//
// The library makes this call only after acquiring a lock first.
func (f *federatingDB) Delete(ctx context.Context, id *url.URL) error {
- l := log.WithContext(ctx).
- WithFields(kv.Fields{
- {"id", id},
- }...)
- l.Debug("entering Delete")
-
activityContext := getActivityContext(ctx)
if activityContext.internal {
return nil // Already processed.
}
- requestingAcct := activityContext.requestingAcct
- receivingAcct := activityContext.receivingAcct
-
- // in a delete we only get the URI, we can't know if we have a status or a profile or something else,
- // so we have to try a few different things...
- if s, err := f.state.DB.GetStatusByURI(ctx, id.String()); err == nil && requestingAcct.ID == s.AccountID {
- l.Debugf("deleting status: %s", s.ID)
- f.state.Workers.EnqueueFediAPI(ctx, messages.FromFediAPI{
- APObjectType: ap.ObjectNote,
- APActivityType: ap.ActivityDelete,
- GTSModel: s,
- ReceivingAccount: receivingAcct,
- RequestingAccount: requestingAcct,
+ // Extract receiving / requesting accounts.
+ requesting := activityContext.requestingAcct
+ receiving := activityContext.receivingAcct
+
+ // Serialize deleted ID URI.
+ // (may be status OR account)
+ uriStr := id.String()
+
+ var (
+ ok bool
+ err error
+ )
+
+ // Try delete as an account URI.
+ ok, err = f.deleteAccount(ctx,
+ requesting,
+ receiving,
+ uriStr,
+ )
+ if err != nil {
+ return err
+ } else if ok {
+ // success!
+ return nil
+ }
+
+ // Try delete as a status URI.
+ ok, err = f.deleteStatus(ctx,
+ requesting,
+ receiving,
+ uriStr,
+ )
+ if err != nil {
+ return err
+ } else if ok {
+ // success!
+ return nil
+ }
+
+ // Log at debug level, as lots of these could indicate federation
+ // issues between remote and this instance, or help with debugging.
+ log.Debugf(ctx, "received delete for unknown target: %s", uriStr)
+ return nil
+}
+
+func (f *federatingDB) deleteAccount(
+ ctx context.Context,
+ requesting *gtsmodel.Account,
+ receiving *gtsmodel.Account,
+ uri string, // target account
+) (
+ bool, // success?
+ error, // any error
+) {
+ account, err := f.state.DB.GetAccountByURI(ctx, uri)
+ if err != nil && !errors.Is(err, db.ErrNoEntries) {
+ return false, gtserror.Newf("error getting account: %w", err)
+ }
+
+ if account != nil {
+ // Ensure requesting account is
+ // only trying to delete itself.
+ if account.ID != requesting.ID {
+
+ // TODO: handled forwarded deletes,
+ // for now we silently drop this.
+ return true, nil
+ }
+
+ log.Debugf(ctx, "deleting account: %s", account.URI)
+ f.state.Workers.Federator.Queue.Push(&messages.FromFediAPI{
+ APObjectType: ap.ObjectProfile,
+ APActivityType: ap.ActivityDelete,
+ GTSModel: account,
+ Receiving: receiving,
+ Requesting: requesting,
})
+
+ return true, nil
+ }
+
+ return false, nil
+}
+
+func (f *federatingDB) deleteStatus(
+ ctx context.Context,
+ requesting *gtsmodel.Account,
+ receiving *gtsmodel.Account,
+ uri string, // target status
+) (
+ bool, // success?
+ error, // any error
+) {
+ status, err := f.state.DB.GetStatusByURI(ctx, uri)
+ if err != nil && !errors.Is(err, db.ErrNoEntries) {
+ return false, gtserror.Newf("error getting status: %w", err)
}
- if a, err := f.state.DB.GetAccountByURI(ctx, id.String()); err == nil && requestingAcct.ID == a.ID {
- l.Debugf("deleting account: %s", a.ID)
- f.state.Workers.EnqueueFediAPI(ctx, messages.FromFediAPI{
- APObjectType: ap.ObjectProfile,
- APActivityType: ap.ActivityDelete,
- GTSModel: a,
- ReceivingAccount: receivingAcct,
- RequestingAccount: requestingAcct,
+ if status != nil {
+ // Ensure requesting account is only
+ // trying to delete its own statuses.
+ if status.AccountID != requesting.ID {
+
+ // TODO: handled forwarded deletes,
+ // for now we silently drop this.
+ return true, nil
+ }
+
+ log.Debugf(ctx, "deleting status: %s", status.URI)
+ f.state.Workers.Federator.Queue.Push(&messages.FromFediAPI{
+ APObjectType: ap.ObjectNote,
+ APActivityType: ap.ActivityDelete,
+ GTSModel: status,
+ Receiving: receiving,
+ Requesting: requesting,
})
+
+ return true, nil
}
- return nil
+ return false, nil
}
diff --git a/internal/federation/federatingdb/federatingdb_test.go b/internal/federation/federatingdb/federatingdb_test.go
index 0f227164d..360094887 100644
--- a/internal/federation/federatingdb/federatingdb_test.go
+++ b/internal/federation/federatingdb/federatingdb_test.go
@@ -19,6 +19,7 @@ package federatingdb_test
import (
"context"
+ "time"
"github.com/stretchr/testify/suite"
"github.com/superseriousbusiness/gotosocial/internal/db"
@@ -34,11 +35,10 @@ import (
type FederatingDBTestSuite struct {
suite.Suite
- db db.DB
- tc *typeutils.Converter
- fromFederator chan messages.FromFediAPI
- federatingDB federatingdb.DB
- state state.State
+ db db.DB
+ tc *typeutils.Converter
+ federatingDB federatingdb.DB
+ state state.State
testTokens map[string]*gtsmodel.Token
testClients map[string]*gtsmodel.Client
@@ -51,6 +51,13 @@ type FederatingDBTestSuite struct {
testActivities map[string]testrig.ActivityWithSignature
}
+func (suite *FederatingDBTestSuite) getFederatorMsg(timeout time.Duration) (*messages.FromFediAPI, bool) {
+ ctx := context.Background()
+ ctx, cncl := context.WithTimeout(ctx, timeout)
+ defer cncl()
+ return suite.state.Workers.Federator.Queue.PopCtx(ctx)
+}
+
func (suite *FederatingDBTestSuite) SetupSuite() {
suite.testTokens = testrig.NewTestTokens()
suite.testClients = testrig.NewTestClients()
@@ -69,13 +76,6 @@ func (suite *FederatingDBTestSuite) SetupTest() {
suite.state.Caches.Init()
testrig.StartNoopWorkers(&suite.state)
- suite.fromFederator = make(chan messages.FromFediAPI, 10)
- suite.state.Workers.EnqueueFediAPI = func(ctx context.Context, msgs ...messages.FromFediAPI) {
- for _, msg := range msgs {
- suite.fromFederator <- msg
- }
- }
-
suite.db = testrig.NewTestDB(&suite.state)
suite.testActivities = testrig.NewTestActivities(suite.testAccounts)
@@ -96,13 +96,6 @@ func (suite *FederatingDBTestSuite) SetupTest() {
func (suite *FederatingDBTestSuite) TearDownTest() {
testrig.StandardDBTeardown(suite.db)
testrig.StopWorkers(&suite.state)
- for suite.fromFederator != nil {
- select {
- case <-suite.fromFederator:
- default:
- return
- }
- }
}
func createTestContext(receivingAccount *gtsmodel.Account, requestingAccount *gtsmodel.Account) context.Context {
diff --git a/internal/federation/federatingdb/move.go b/internal/federation/federatingdb/move.go
index 2e8049e08..59dc2529c 100644
--- a/internal/federation/federatingdb/move.go
+++ b/internal/federation/federatingdb/move.go
@@ -170,12 +170,12 @@ func (f *federatingDB) Move(ctx context.Context, move vocab.ActivityStreamsMove)
// We had a Move already or stored a new Move.
// Pass back to a worker for async processing.
- f.state.Workers.EnqueueFediAPI(ctx, messages.FromFediAPI{
- APObjectType: ap.ObjectProfile,
- APActivityType: ap.ActivityMove,
- GTSModel: stubMove,
- RequestingAccount: requestingAcct,
- ReceivingAccount: receivingAcct,
+ f.state.Workers.Federator.Queue.Push(&messages.FromFediAPI{
+ APObjectType: ap.ObjectProfile,
+ APActivityType: ap.ActivityMove,
+ GTSModel: stubMove,
+ Requesting: requestingAcct,
+ Receiving: receivingAcct,
})
return nil
diff --git a/internal/federation/federatingdb/move_test.go b/internal/federation/federatingdb/move_test.go
index 006dcf0dc..3e35dc97a 100644
--- a/internal/federation/federatingdb/move_test.go
+++ b/internal/federation/federatingdb/move_test.go
@@ -27,7 +27,6 @@ import (
"github.com/superseriousbusiness/activity/streams/vocab"
"github.com/superseriousbusiness/gotosocial/internal/ap"
"github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
- "github.com/superseriousbusiness/gotosocial/internal/messages"
)
type MoveTestSuite struct {
@@ -78,13 +77,7 @@ func (suite *MoveTestSuite) TestMove() {
suite.move(receivingAcct, requestingAcct, moveStr1)
// Should be a message heading to the processor.
- var msg messages.FromFediAPI
- select {
- case msg = <-suite.fromFederator:
- // Fine.
- case <-time.After(5 * time.Second):
- suite.FailNow("", "timeout waiting for suite.fromFederator")
- }
+ msg, _ := suite.getFederatorMsg(5 * time.Second)
suite.Equal(ap.ObjectProfile, msg.APObjectType)
suite.Equal(ap.ActivityMove, msg.APActivityType)
@@ -101,12 +94,7 @@ func (suite *MoveTestSuite) TestMove() {
// Should be a message heading to the processor
// since this is just a straight up retry.
- select {
- case msg = <-suite.fromFederator:
- // Fine.
- case <-time.After(5 * time.Second):
- suite.FailNow("", "timeout waiting for suite.fromFederator")
- }
+ msg, _ = suite.getFederatorMsg(5 * time.Second)
suite.Equal(ap.ObjectProfile, msg.APObjectType)
suite.Equal(ap.ActivityMove, msg.APActivityType)
@@ -126,12 +114,7 @@ func (suite *MoveTestSuite) TestMove() {
// Should be a message heading to the processor
// since this is just a retry with a different ID.
- select {
- case msg = <-suite.fromFederator:
- // Fine.
- case <-time.After(5 * time.Second):
- suite.FailNow("", "timeout waiting for suite.fromFederator")
- }
+ msg, _ = suite.getFederatorMsg(5 * time.Second)
suite.Equal(ap.ObjectProfile, msg.APObjectType)
suite.Equal(ap.ActivityMove, msg.APActivityType)
}
diff --git a/internal/federation/federatingdb/reject_test.go b/internal/federation/federatingdb/reject_test.go
index d4c537a92..f51ffaf56 100644
--- a/internal/federation/federatingdb/reject_test.go
+++ b/internal/federation/federatingdb/reject_test.go
@@ -81,7 +81,8 @@ func (suite *RejectTestSuite) TestRejectFollowRequest() {
suite.NoError(err)
// there should be nothing in the federator channel since nothing needs to be passed
- suite.Empty(suite.fromFederator)
+ _, ok := suite.getFederatorMsg(time.Second)
+ suite.False(ok)
// the follow request should not be in the database anymore -- it's been rejected
err = suite.db.GetByID(ctx, fr.ID, &gtsmodel.FollowRequest{})
diff --git a/internal/federation/federatingdb/update.go b/internal/federation/federatingdb/update.go
index 733abba0d..2f00e0867 100644
--- a/internal/federation/federatingdb/update.go
+++ b/internal/federation/federatingdb/update.go
@@ -98,13 +98,13 @@ func (f *federatingDB) updateAccountable(ctx context.Context, receivingAcct *gts
// was delivered along with the Update, for further asynchronous
// updating of eg., avatar/header, emojis, etc. The actual db
// inserts/updates will take place there.
- f.state.Workers.EnqueueFediAPI(ctx, messages.FromFediAPI{
- APObjectType: ap.ObjectProfile,
- APActivityType: ap.ActivityUpdate,
- GTSModel: requestingAcct,
- APObjectModel: accountable,
- ReceivingAccount: receivingAcct,
- RequestingAccount: requestingAcct,
+ f.state.Workers.Federator.Queue.Push(&messages.FromFediAPI{
+ APObjectType: ap.ObjectProfile,
+ APActivityType: ap.ActivityUpdate,
+ GTSModel: requestingAcct,
+ APObject: accountable,
+ Receiving: receivingAcct,
+ Requesting: requestingAcct,
})
return nil
@@ -155,13 +155,13 @@ func (f *federatingDB) updateStatusable(ctx context.Context, receivingAcct *gtsm
// Queue an UPDATE NOTE activity to our fedi API worker,
// this will handle necessary database insertions, etc.
- f.state.Workers.EnqueueFediAPI(ctx, messages.FromFediAPI{
- APObjectType: ap.ObjectNote,
- APActivityType: ap.ActivityUpdate,
- GTSModel: status, // original status
- APObjectModel: (ap.Statusable)(statusable),
- ReceivingAccount: receivingAcct,
- RequestingAccount: requestingAcct,
+ f.state.Workers.Federator.Queue.Push(&messages.FromFediAPI{
+ APObjectType: ap.ObjectNote,
+ APActivityType: ap.ActivityUpdate,
+ GTSModel: status, // original status
+ APObject: (ap.Statusable)(statusable),
+ Receiving: receivingAcct,
+ Requesting: requestingAcct,
})
return nil