diff options
Diffstat (limited to 'internal/processing/account')
-rw-r--r-- | internal/processing/account/account_test.go | 16 | ||||
-rw-r--r-- | internal/processing/account/block.go | 14 | ||||
-rw-r--r-- | internal/processing/account/create.go | 4 | ||||
-rw-r--r-- | internal/processing/account/delete.go | 42 | ||||
-rw-r--r-- | internal/processing/account/follow.go | 24 | ||||
-rw-r--r-- | internal/processing/account/follow_request.go | 12 | ||||
-rw-r--r-- | internal/processing/account/follow_test.go | 14 | ||||
-rw-r--r-- | internal/processing/account/move.go | 6 | ||||
-rw-r--r-- | internal/processing/account/move_test.go | 38 | ||||
-rw-r--r-- | internal/processing/account/update.go | 4 | ||||
-rw-r--r-- | internal/processing/account/update_test.go | 75 |
11 files changed, 135 insertions, 114 deletions
diff --git a/internal/processing/account/account_test.go b/internal/processing/account/account_test.go index 244fc89d6..10d5f91e1 100644 --- a/internal/processing/account/account_test.go +++ b/internal/processing/account/account_test.go @@ -19,6 +19,7 @@ package account_test import ( "context" + "time" "github.com/stretchr/testify/suite" "github.com/superseriousbusiness/gotosocial/internal/db" @@ -48,7 +49,6 @@ type AccountStandardTestSuite struct { state state.State mediaManager *media.Manager oauthServer oauth.Server - fromClientAPIChan chan messages.FromClientAPI transportController transport.Controller federator *federation.Federator emailSender email.Sender @@ -68,6 +68,13 @@ type AccountStandardTestSuite struct { accountProcessor account.Processor } +func (suite *AccountStandardTestSuite) getClientMsg(timeout time.Duration) (*messages.FromClientAPI, bool) { + ctx := context.Background() + ctx, cncl := context.WithTimeout(ctx, timeout) + defer cncl() + return suite.state.Workers.Client.Queue.PopCtx(ctx) +} + func (suite *AccountStandardTestSuite) SetupSuite() { suite.testTokens = testrig.NewTestTokens() suite.testClients = testrig.NewTestClients() @@ -101,13 +108,6 @@ func (suite *AccountStandardTestSuite) SetupTest() { suite.mediaManager = testrig.NewTestMediaManager(&suite.state) suite.oauthServer = testrig.NewTestOauthServer(suite.db) - suite.fromClientAPIChan = make(chan messages.FromClientAPI, 100) - suite.state.Workers.EnqueueClientAPI = func(ctx context.Context, msgs ...messages.FromClientAPI) { - for _, msg := range msgs { - suite.fromClientAPIChan <- msg - } - } - suite.transportController = testrig.NewTestTransportController(&suite.state, testrig.NewMockHTTPClient(nil, "../../../testrig/media")) suite.federator = testrig.NewTestFederator(&suite.state, suite.transportController, suite.mediaManager) suite.sentEmails = make(map[string]string) diff --git a/internal/processing/account/block.go b/internal/processing/account/block.go index 49f81557d..d3904bffa 100644 --- a/internal/processing/account/block.go +++ b/internal/processing/account/block.go @@ -83,16 +83,16 @@ func (p *Processor) BlockCreate(ctx context.Context, requestingAccount *gtsmodel } // Process block side effects (federation etc). - msgs = append(msgs, messages.FromClientAPI{ + msgs = append(msgs, &messages.FromClientAPI{ APObjectType: ap.ActivityBlock, APActivityType: ap.ActivityCreate, GTSModel: block, - OriginAccount: requestingAccount, - TargetAccount: targetAccount, + Origin: requestingAccount, + Target: targetAccount, }) // Batch queue accreted client api messages. - p.state.Workers.EnqueueClientAPI(ctx, msgs...) + p.state.Workers.Client.Queue.Push(msgs...) return p.RelationshipGet(ctx, requestingAccount, targetAccountID) } @@ -120,12 +120,12 @@ func (p *Processor) BlockRemove(ctx context.Context, requestingAccount *gtsmodel existingBlock.TargetAccount = targetAccount // Process block removal side effects (federation etc). - p.state.Workers.EnqueueClientAPI(ctx, messages.FromClientAPI{ + p.state.Workers.Client.Queue.Push(&messages.FromClientAPI{ APObjectType: ap.ActivityBlock, APActivityType: ap.ActivityUndo, GTSModel: existingBlock, - OriginAccount: requestingAccount, - TargetAccount: targetAccount, + Origin: requestingAccount, + Target: targetAccount, }) return p.RelationshipGet(ctx, requestingAccount, targetAccountID) diff --git a/internal/processing/account/create.go b/internal/processing/account/create.go index 12b2d5e57..761165356 100644 --- a/internal/processing/account/create.go +++ b/internal/processing/account/create.go @@ -126,11 +126,11 @@ func (p *Processor) Create( // There are side effects for creating a new account // (confirmation emails etc), perform these async. - p.state.Workers.EnqueueClientAPI(ctx, messages.FromClientAPI{ + p.state.Workers.Client.Queue.Push(&messages.FromClientAPI{ APObjectType: ap.ObjectProfile, APActivityType: ap.ActivityCreate, GTSModel: user, - OriginAccount: user.Account, + Origin: user.Account, }) return user, nil diff --git a/internal/processing/account/delete.go b/internal/processing/account/delete.go index a900c566d..3f051edf0 100644 --- a/internal/processing/account/delete.go +++ b/internal/processing/account/delete.go @@ -102,16 +102,13 @@ func (p *Processor) Delete( // and the above Delete function will be called afterwards from the processor, to clear // out the account's bits and bobs, and stubbify it. func (p *Processor) DeleteSelf(ctx context.Context, account *gtsmodel.Account) gtserror.WithCode { - fromClientAPIMessage := messages.FromClientAPI{ + // Process the delete side effects asynchronously. + p.state.Workers.Client.Queue.Push(&messages.FromClientAPI{ APObjectType: ap.ActorPerson, APActivityType: ap.ActivityDelete, - OriginAccount: account, - TargetAccount: account, - } - - // Process the delete side effects asynchronously. - p.state.Workers.EnqueueClientAPI(ctx, fromClientAPIMessage) - + Origin: account, + Target: account, + }) return nil } @@ -193,7 +190,8 @@ func (p *Processor) deleteAccountFollows(ctx context.Context, account *gtsmodel. var ( // Use this slice to batch unfollow messages. - msgs = []messages.FromClientAPI{} + msgs = []*messages.FromClientAPI{} + // To avoid checking if account is local over + over // inside the subsequent loops, just generate static // side effects function once now. @@ -214,7 +212,7 @@ func (p *Processor) deleteAccountFollows(ctx context.Context, account *gtsmodel. } if msg := unfollowSideEffects(ctx, account, follow); msg != nil { // There was a side effect to process. - msgs = append(msgs, *msg) + msgs = append(msgs, msg) } } @@ -244,13 +242,13 @@ func (p *Processor) deleteAccountFollows(ctx context.Context, account *gtsmodel. if msg := unfollowSideEffects(ctx, account, follow); msg != nil { // There was a side effect to process. - msgs = append(msgs, *msg) + msgs = append(msgs, msg) } } // Process accreted messages in serial. for _, msg := range msgs { - if err := p.state.Workers.ProcessFromClientAPI(ctx, msg); err != nil { + if err := p.state.Workers.Client.Process(ctx, msg); err != nil { log.Errorf( ctx, "error processing %s of %s during Delete of account %s: %v", @@ -306,8 +304,8 @@ func (p *Processor) unfollowSideEffectsFunc(local bool) func( APObjectType: ap.ActivityFollow, APActivityType: ap.ActivityUndo, GTSModel: follow, - OriginAccount: account, - TargetAccount: follow.TargetAccount, + Origin: account, + Target: follow.TargetAccount, } } } @@ -337,7 +335,7 @@ func (p *Processor) deleteAccountStatuses( statuses []*gtsmodel.Status err error maxID string - msgs = []messages.FromClientAPI{} + msgs = []*messages.FromClientAPI{} ) statusLoop: @@ -404,29 +402,29 @@ statusLoop: continue } - msgs = append(msgs, messages.FromClientAPI{ + msgs = append(msgs, &messages.FromClientAPI{ APObjectType: ap.ActivityAnnounce, APActivityType: ap.ActivityUndo, GTSModel: status, - OriginAccount: boost.Account, - TargetAccount: account, + Origin: boost.Account, + Target: account, }) } // Now prepare to Delete status. - msgs = append(msgs, messages.FromClientAPI{ + msgs = append(msgs, &messages.FromClientAPI{ APObjectType: ap.ObjectNote, APActivityType: ap.ActivityDelete, GTSModel: status, - OriginAccount: account, - TargetAccount: account, + Origin: account, + Target: account, }) } } // Process accreted messages in serial. for _, msg := range msgs { - if err := p.state.Workers.ProcessFromClientAPI(ctx, msg); err != nil { + if err := p.state.Workers.Client.Process(ctx, msg); err != nil { log.Errorf( ctx, "error processing %s of %s during Delete of account %s: %v", diff --git a/internal/processing/account/follow.go b/internal/processing/account/follow.go index 7f28bb2c0..402c764e8 100644 --- a/internal/processing/account/follow.go +++ b/internal/processing/account/follow.go @@ -117,12 +117,12 @@ func (p *Processor) FollowCreate(ctx context.Context, requestingAccount *gtsmode } else { // Otherwise we leave the follow request as it is, // and we handle the rest of the process async. - p.state.Workers.EnqueueClientAPI(ctx, messages.FromClientAPI{ + p.state.Workers.Client.Queue.Push(&messages.FromClientAPI{ APObjectType: ap.ActivityFollow, APActivityType: ap.ActivityCreate, GTSModel: fr, - OriginAccount: requestingAccount, - TargetAccount: targetAccount, + Origin: requestingAccount, + Target: targetAccount, }) } @@ -143,7 +143,7 @@ func (p *Processor) FollowRemove(ctx context.Context, requestingAccount *gtsmode } // Batch queue accreted client api messages. - p.state.Workers.EnqueueClientAPI(ctx, msgs...) + p.state.Workers.Client.Queue.Push(msgs...) return p.RelationshipGet(ctx, requestingAccount, targetAccountID) } @@ -225,8 +225,8 @@ func (p *Processor) getFollowTarget(ctx context.Context, requester *gtsmodel.Acc // If a follow and/or follow request was removed this way, one or two // messages will be returned which should then be processed by a client // api worker. -func (p *Processor) unfollow(ctx context.Context, requestingAccount *gtsmodel.Account, targetAccount *gtsmodel.Account) ([]messages.FromClientAPI, error) { - var msgs []messages.FromClientAPI +func (p *Processor) unfollow(ctx context.Context, requestingAccount *gtsmodel.Account, targetAccount *gtsmodel.Account) ([]*messages.FromClientAPI, error) { + var msgs []*messages.FromClientAPI // Get follow from requesting account to target account. follow, err := p.state.DB.GetFollow(ctx, requestingAccount.ID, targetAccount.ID) @@ -251,7 +251,7 @@ func (p *Processor) unfollow(ctx context.Context, requestingAccount *gtsmodel.Ac } // Follow status changed, process side effects. - msgs = append(msgs, messages.FromClientAPI{ + msgs = append(msgs, &messages.FromClientAPI{ APObjectType: ap.ActivityFollow, APActivityType: ap.ActivityUndo, GTSModel: >smodel.Follow{ @@ -259,8 +259,8 @@ func (p *Processor) unfollow(ctx context.Context, requestingAccount *gtsmodel.Ac TargetAccountID: targetAccount.ID, URI: follow.URI, }, - OriginAccount: requestingAccount, - TargetAccount: targetAccount, + Origin: requestingAccount, + Target: targetAccount, }) } @@ -287,7 +287,7 @@ func (p *Processor) unfollow(ctx context.Context, requestingAccount *gtsmodel.Ac } // Follow status changed, process side effects. - msgs = append(msgs, messages.FromClientAPI{ + msgs = append(msgs, &messages.FromClientAPI{ APObjectType: ap.ActivityFollow, APActivityType: ap.ActivityUndo, GTSModel: >smodel.Follow{ @@ -295,8 +295,8 @@ func (p *Processor) unfollow(ctx context.Context, requestingAccount *gtsmodel.Ac TargetAccountID: targetAccount.ID, URI: followReq.URI, }, - OriginAccount: requestingAccount, - TargetAccount: targetAccount, + Origin: requestingAccount, + Target: targetAccount, }) } diff --git a/internal/processing/account/follow_request.go b/internal/processing/account/follow_request.go index c054637c8..6f6c7ba2d 100644 --- a/internal/processing/account/follow_request.go +++ b/internal/processing/account/follow_request.go @@ -40,12 +40,12 @@ func (p *Processor) FollowRequestAccept(ctx context.Context, requestingAccount * if follow.Account != nil { // Only enqueue work in the case we have a request creating account stored. // NOTE: due to how AcceptFollowRequest works, the inverse shouldn't be possible. - p.state.Workers.EnqueueClientAPI(ctx, messages.FromClientAPI{ + p.state.Workers.Client.Queue.Push(&messages.FromClientAPI{ APObjectType: ap.ActivityFollow, APActivityType: ap.ActivityAccept, GTSModel: follow, - OriginAccount: follow.Account, - TargetAccount: follow.TargetAccount, + Origin: follow.Account, + Target: follow.TargetAccount, }) } @@ -67,12 +67,12 @@ func (p *Processor) FollowRequestReject(ctx context.Context, requestingAccount * if followRequest.Account != nil { // Only enqueue work in the case we have a request creating account stored. // NOTE: due to how GetFollowRequest works, the inverse shouldn't be possible. - p.state.Workers.EnqueueClientAPI(ctx, messages.FromClientAPI{ + p.state.Workers.Client.Queue.Push(&messages.FromClientAPI{ APObjectType: ap.ActivityFollow, APActivityType: ap.ActivityReject, GTSModel: followRequest, - OriginAccount: followRequest.Account, - TargetAccount: followRequest.TargetAccount, + Origin: followRequest.Account, + Target: followRequest.TargetAccount, }) } diff --git a/internal/processing/account/follow_test.go b/internal/processing/account/follow_test.go index c269dc710..9ea8ce1b8 100644 --- a/internal/processing/account/follow_test.go +++ b/internal/processing/account/follow_test.go @@ -25,7 +25,6 @@ import ( "github.com/stretchr/testify/suite" "github.com/superseriousbusiness/gotosocial/internal/ap" apimodel "github.com/superseriousbusiness/gotosocial/internal/api/model" - "github.com/superseriousbusiness/gotosocial/internal/messages" "github.com/superseriousbusiness/gotosocial/internal/util" ) @@ -152,18 +151,11 @@ func (suite *FollowTestSuite) TestFollowRequestLocal() { } // There should be a message going to the worker. - var cMsg messages.FromClientAPI - select { - case cMsg = <-suite.fromClientAPIChan: - // No problem. - case <-time.After(5 * time.Second): - suite.FailNow("timed out waiting for message") - } - + cMsg, _ := suite.getClientMsg(5 * time.Second) suite.Equal(ap.ActivityCreate, cMsg.APActivityType) suite.Equal(ap.ActivityFollow, cMsg.APObjectType) - suite.Equal(requestingAccount.ID, cMsg.OriginAccount.ID) - suite.Equal(targetAccount.ID, cMsg.TargetAccount.ID) + suite.Equal(requestingAccount.ID, cMsg.Origin.ID) + suite.Equal(targetAccount.ID, cMsg.Target.ID) } func TestFollowTestS(t *testing.T) { diff --git a/internal/processing/account/move.go b/internal/processing/account/move.go index 602e8c021..63187dfd1 100644 --- a/internal/processing/account/move.go +++ b/internal/processing/account/move.go @@ -317,12 +317,12 @@ func (p *Processor) MoveSelf( } // Everything seems OK, process Move side effects async. - p.state.Workers.EnqueueClientAPI(ctx, messages.FromClientAPI{ + p.state.Workers.Client.Queue.Push(&messages.FromClientAPI{ APObjectType: ap.ActorPerson, APActivityType: ap.ActivityMove, GTSModel: move, - OriginAccount: originAcct, - TargetAccount: targetAcct, + Origin: originAcct, + Target: targetAcct, }) return nil diff --git a/internal/processing/account/move_test.go b/internal/processing/account/move_test.go index dfa0ea4e4..c1a931252 100644 --- a/internal/processing/account/move_test.go +++ b/internal/processing/account/move_test.go @@ -70,29 +70,23 @@ func (suite *MoveTestSuite) TestMoveAccountOK() { suite.FailNow(err.Error()) } - // There should be a msg heading back to fromClientAPI. - select { - case msg := <-suite.fromClientAPIChan: - move, ok := msg.GTSModel.(*gtsmodel.Move) - if !ok { - suite.FailNow("", "could not cast %T to *gtsmodel.Move", move) - } - - now := time.Now() - suite.WithinDuration(now, move.CreatedAt, 5*time.Second) - suite.WithinDuration(now, move.UpdatedAt, 5*time.Second) - suite.WithinDuration(now, move.AttemptedAt, 5*time.Second) - suite.Zero(move.SucceededAt) - suite.NotZero(move.ID) - suite.Equal(requestingAcct.URI, move.OriginURI) - suite.NotNil(move.Origin) - suite.Equal(targetAcct.URI, move.TargetURI) - suite.NotNil(move.Target) - suite.NotZero(move.URI) - - case <-time.After(5 * time.Second): - suite.FailNow("time out waiting for message") + // There should be a message going to the worker. + cMsg, _ := suite.getClientMsg(5 * time.Second) + move, ok := cMsg.GTSModel.(*gtsmodel.Move) + if !ok { + suite.FailNow("", "could not cast %T to *gtsmodel.Move", move) } + now := time.Now() + suite.WithinDuration(now, move.CreatedAt, 5*time.Second) + suite.WithinDuration(now, move.UpdatedAt, 5*time.Second) + suite.WithinDuration(now, move.AttemptedAt, 5*time.Second) + suite.Zero(move.SucceededAt) + suite.NotZero(move.ID) + suite.Equal(requestingAcct.URI, move.OriginURI) + suite.NotNil(move.Origin) + suite.Equal(targetAcct.URI, move.TargetURI) + suite.NotNil(move.Target) + suite.NotZero(move.URI) // Move should be in the database now. move, err := suite.state.DB.GetMoveByOriginTarget( diff --git a/internal/processing/account/update.go b/internal/processing/account/update.go index 670620e19..7f2749503 100644 --- a/internal/processing/account/update.go +++ b/internal/processing/account/update.go @@ -296,11 +296,11 @@ func (p *Processor) Update(ctx context.Context, account *gtsmodel.Account, form return nil, gtserror.NewErrorInternalError(fmt.Errorf("could not update account settings %s: %s", account.ID, err)) } - p.state.Workers.EnqueueClientAPI(ctx, messages.FromClientAPI{ + p.state.Workers.Client.Queue.Push(&messages.FromClientAPI{ APObjectType: ap.ObjectProfile, APActivityType: ap.ActivityUpdate, GTSModel: account, - OriginAccount: account, + Origin: account, }) acctSensitive, err := p.converter.AccountToAPIAccountSensitive(ctx, account) diff --git a/internal/processing/account/update_test.go b/internal/processing/account/update_test.go index 76ad3abe8..ad09ff25c 100644 --- a/internal/processing/account/update_test.go +++ b/internal/processing/account/update_test.go @@ -20,6 +20,7 @@ package account_test import ( "context" "testing" + "time" "github.com/stretchr/testify/suite" "github.com/superseriousbusiness/gotosocial/internal/ap" @@ -31,20 +32,6 @@ type AccountUpdateTestSuite struct { AccountStandardTestSuite } -func (suite *AccountStandardTestSuite) checkClientAPIChan(accountID string) { - msg := <-suite.fromClientAPIChan - - // Profile update. - suite.Equal(ap.ActivityUpdate, msg.APActivityType) - suite.Equal(ap.ObjectProfile, msg.APObjectType) - - // Correct account updated. - if msg.OriginAccount == nil { - suite.FailNow("expected msg.OriginAccount not to be nil") - } - suite.Equal(accountID, msg.OriginAccount.ID) -} - func (suite *AccountUpdateTestSuite) TestAccountUpdateSimple() { testAccount := >smodel.Account{} *testAccount = *suite.testAccounts["local_account_1"] @@ -73,7 +60,17 @@ func (suite *AccountUpdateTestSuite) TestAccountUpdateSimple() { suite.Equal(noteExpected, apiAccount.Note) // We should have an update in the client api channel. - suite.checkClientAPIChan(testAccount.ID) + msg, _ := suite.getClientMsg(5 * time.Second) + + // Profile update. + suite.Equal(ap.ActivityUpdate, msg.APActivityType) + suite.Equal(ap.ObjectProfile, msg.APObjectType) + + // Correct account updated. + if msg.Origin == nil { + suite.FailNow("expected msg.OriginAccount not to be nil") + } + suite.Equal(testAccount.ID, msg.Origin.ID) // Check database model of account as well. dbAccount, err := suite.db.GetAccountByID(ctx, testAccount.ID) @@ -113,7 +110,17 @@ func (suite *AccountUpdateTestSuite) TestAccountUpdateWithMention() { suite.Equal(noteExpected, apiAccount.Note) // We should have an update in the client api channel. - suite.checkClientAPIChan(testAccount.ID) + msg, _ := suite.getClientMsg(5 * time.Second) + + // Profile update. + suite.Equal(ap.ActivityUpdate, msg.APActivityType) + suite.Equal(ap.ObjectProfile, msg.APObjectType) + + // Correct account updated. + if msg.Origin == nil { + suite.FailNow("expected msg.OriginAccount not to be nil") + } + suite.Equal(testAccount.ID, msg.Origin.ID) // Check database model of account as well. dbAccount, err := suite.db.GetAccountByID(ctx, testAccount.ID) @@ -159,7 +166,17 @@ func (suite *AccountUpdateTestSuite) TestAccountUpdateWithMarkdownNote() { suite.Equal(noteExpected, apiAccount.Note) // We should have an update in the client api channel. - suite.checkClientAPIChan(testAccount.ID) + msg, _ := suite.getClientMsg(5 * time.Second) + + // Profile update. + suite.Equal(ap.ActivityUpdate, msg.APActivityType) + suite.Equal(ap.ObjectProfile, msg.APObjectType) + + // Correct account updated. + if msg.Origin == nil { + suite.FailNow("expected msg.OriginAccount not to be nil") + } + suite.Equal(testAccount.ID, msg.Origin.ID) // Check database model of account as well. dbAccount, err := suite.db.GetAccountByID(ctx, testAccount.ID) @@ -234,7 +251,17 @@ func (suite *AccountUpdateTestSuite) TestAccountUpdateWithFields() { suite.EqualValues(emojisExpected, apiAccount.Emojis) // We should have an update in the client api channel. - suite.checkClientAPIChan(testAccount.ID) + msg, _ := suite.getClientMsg(5 * time.Second) + + // Profile update. + suite.Equal(ap.ActivityUpdate, msg.APActivityType) + suite.Equal(ap.ObjectProfile, msg.APObjectType) + + // Correct account updated. + if msg.Origin == nil { + suite.FailNow("expected msg.OriginAccount not to be nil") + } + suite.Equal(testAccount.ID, msg.Origin.ID) // Check database model of account as well. dbAccount, err := suite.db.GetAccountByID(ctx, testAccount.ID) @@ -281,7 +308,17 @@ func (suite *AccountUpdateTestSuite) TestAccountUpdateNoteNotFields() { suite.Equal(fieldsBefore, len(apiAccount.Fields)) // We should have an update in the client api channel. - suite.checkClientAPIChan(testAccount.ID) + msg, _ := suite.getClientMsg(5 * time.Second) + + // Profile update. + suite.Equal(ap.ActivityUpdate, msg.APActivityType) + suite.Equal(ap.ObjectProfile, msg.APObjectType) + + // Correct account updated. + if msg.Origin == nil { + suite.FailNow("expected msg.OriginAccount not to be nil") + } + suite.Equal(testAccount.ID, msg.Origin.ID) // Check database model of account as well. dbAccount, err := suite.db.GetAccountByID(ctx, testAccount.ID) |