diff options
author | 2023-11-14 15:57:25 +0100 | |
---|---|---|
committer | 2023-11-14 15:57:25 +0100 | |
commit | 4ee436e98a3351d9568c4a018bd2de34c218e9a6 (patch) | |
tree | 4b919150266fb327805f78663e9e705179598a7b /internal/processing | |
parent | [bugfix] Update poll delete/update db queries (#2361) (diff) | |
download | gotosocial-4ee436e98a3351d9568c4a018bd2de34c218e9a6.tar.xz |
[bugfix] process account delete side effects in serial, not in parallel (#2360)
* [bugfix] process account delete side effects in serial, not in parallel
* StartWorkers / StartNoopWorkers for tests
* undo testrig trace logging
* log errors instead of immediately returning
Diffstat (limited to 'internal/processing')
-rw-r--r-- | internal/processing/account/account_test.go | 2 | ||||
-rw-r--r-- | internal/processing/account/delete.go | 164 | ||||
-rw-r--r-- | internal/processing/admin/admin_test.go | 4 | ||||
-rw-r--r-- | internal/processing/polls/poll_test.go | 2 | ||||
-rw-r--r-- | internal/processing/processor_test.go | 2 | ||||
-rw-r--r-- | internal/processing/status/status_test.go | 2 | ||||
-rw-r--r-- | internal/processing/workers/workers_test.go | 3 |
7 files changed, 117 insertions, 62 deletions
diff --git a/internal/processing/account/account_test.go b/internal/processing/account/account_test.go index 7f259815a..d43b4c937 100644 --- a/internal/processing/account/account_test.go +++ b/internal/processing/account/account_test.go @@ -81,7 +81,7 @@ func (suite *AccountStandardTestSuite) SetupSuite() { func (suite *AccountStandardTestSuite) SetupTest() { suite.state.Caches.Init() - testrig.StartWorkers(&suite.state) + testrig.StartNoopWorkers(&suite.state) testrig.InitTestConfig() testrig.InitTestLog() diff --git a/internal/processing/account/delete.go b/internal/processing/account/delete.go index bf1ea2f44..bd320571f 100644 --- a/internal/processing/account/delete.go +++ b/internal/processing/account/delete.go @@ -27,6 +27,7 @@ import ( "github.com/google/uuid" "github.com/superseriousbusiness/gotosocial/internal/ap" "github.com/superseriousbusiness/gotosocial/internal/db" + "github.com/superseriousbusiness/gotosocial/internal/gtscontext" "github.com/superseriousbusiness/gotosocial/internal/gtserror" "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" "github.com/superseriousbusiness/gotosocial/internal/log" @@ -39,38 +40,45 @@ const deleteSelectLimit = 50 // Delete deletes an account, and all of that account's statuses, media, follows, notifications, etc etc etc. // The origin passed here should be either the ID of the account doing the delete (can be itself), or the ID of a domain block. -func (p *Processor) Delete(ctx context.Context, account *gtsmodel.Account, origin string) gtserror.WithCode { +func (p *Processor) Delete( + ctx context.Context, + account *gtsmodel.Account, + origin string, +) gtserror.WithCode { l := log.WithContext(ctx).WithFields(kv.Fields{ {"username", account.Username}, {"domain", account.Domain}, }...) l.Trace("beginning account delete process") - if err := p.deleteAccountFollows(ctx, account); err != nil { - return gtserror.NewErrorInternalError(err) + // Delete statuses *before* follows to ensure correct addressing + // of any outgoing fedi messages generated by deleting statuses. + if err := p.deleteAccountStatuses(ctx, account); err != nil { + l.Errorf("continuing after error during account delete: %v", err) } - if err := p.deleteAccountBlocks(ctx, account); err != nil { - return gtserror.NewErrorInternalError(err) + if err := p.deleteAccountFollows(ctx, account); err != nil { + l.Errorf("continuing after error during account delete: %v", err) } - if err := p.deleteAccountStatuses(ctx, account); err != nil { - return gtserror.NewErrorInternalError(err) + if err := p.deleteAccountBlocks(ctx, account); err != nil { + l.Errorf("continuing after error during account delete: %v", err) } if err := p.deleteAccountNotifications(ctx, account); err != nil { - return gtserror.NewErrorInternalError(err) + l.Errorf("continuing after error during account delete: %v", err) } if err := p.deleteAccountPeripheral(ctx, account); err != nil { - return gtserror.NewErrorInternalError(err) + l.Errorf("continuing after error during account delete: %v", err) } if account.IsLocal() { - // we tokens, applications and clients for account as one of the last - // stages during deletion, as other database models rely on these. + // We delete tokens, applications and clients for + // account as one of the last stages during deletion, + // as other database models rely on these. if err := p.deleteUserAndTokensForAccount(ctx, account); err != nil { - return gtserror.NewErrorInternalError(err) + l.Errorf("continuing after error during account delete: %v", err) } } @@ -83,7 +91,7 @@ func (p *Processor) Delete(ctx context.Context, account *gtsmodel.Account, origi return gtserror.NewErrorInternalError(err) } - l.Info("account deleted") + l.Info("account delete process complete") return nil } @@ -189,7 +197,7 @@ func (p *Processor) deleteAccountFollows(ctx context.Context, account *gtsmodel. // To avoid checking if account is local over + over // inside the subsequent loops, just generate static // side effects function once now. - unfollowSideEffects = p.unfollowSideEffectsFunc(account) + unfollowSideEffects = p.unfollowSideEffectsFunc(account.IsLocal()) ) // Delete follows originating from this account. @@ -240,31 +248,56 @@ func (p *Processor) deleteAccountFollows(ctx context.Context, account *gtsmodel. } } - // Process accreted messages asynchronously. - p.state.Workers.EnqueueClientAPI(ctx, msgs...) + // Process accreted messages in serial. + for _, msg := range msgs { + if err := p.state.Workers.ProcessFromClientAPI(ctx, msg); err != nil { + log.Errorf( + ctx, + "error processing %s of %s during Delete of account %s: %v", + msg.APActivityType, msg.APObjectType, account.ID, err, + ) + } + } return nil } -func (p *Processor) unfollowSideEffectsFunc(deletedAccount *gtsmodel.Account) func(ctx context.Context, account *gtsmodel.Account, follow *gtsmodel.Follow) *messages.FromClientAPI { - if !deletedAccount.IsLocal() { +func (p *Processor) unfollowSideEffectsFunc(local bool) func( + ctx context.Context, + account *gtsmodel.Account, + follow *gtsmodel.Follow, +) *messages.FromClientAPI { + if !local { // Don't try to process side effects // for accounts that aren't local. - return func(ctx context.Context, account *gtsmodel.Account, follow *gtsmodel.Follow) *messages.FromClientAPI { - return nil // noop + return func( + _ context.Context, + _ *gtsmodel.Account, + _ *gtsmodel.Follow, + ) *messages.FromClientAPI { + // noop + return nil } } - return func(ctx context.Context, account *gtsmodel.Account, follow *gtsmodel.Follow) *messages.FromClientAPI { + return func( + ctx context.Context, + account *gtsmodel.Account, + follow *gtsmodel.Follow, + ) *messages.FromClientAPI { if follow.TargetAccount == nil { // TargetAccount seems to have gone; // race condition? db corruption? - log.WithContext(ctx).WithField("follow", follow).Warn("follow had no TargetAccount, likely race condition") + log. + WithContext(ctx). + WithField("follow", follow). + Warn("follow had no TargetAccount, likely race condition") return nil } if follow.TargetAccount.IsLocal() { - // No side effects for local unfollows. + // No side effects + // for local unfollows. return nil } @@ -288,8 +321,11 @@ func (p *Processor) deleteAccountBlocks(ctx context.Context, account *gtsmodel.A // deleteAccountStatuses iterates through all statuses owned by // the given account, passing each discovered status (and boosts -// thereof) to the processor workers for further async processing. -func (p *Processor) deleteAccountStatuses(ctx context.Context, account *gtsmodel.Account) error { +// thereof) to the processor workers for further processing. +func (p *Processor) deleteAccountStatuses( + ctx context.Context, + account *gtsmodel.Account, +) error { // We'll select statuses 50 at a time so we don't wreck the db, // and pass them through to the client api worker to handle. // @@ -331,42 +367,43 @@ statusLoop: maxID = statuses[len(statuses)-1].ID for _, status := range statuses { - status.Account = account // ensure account is set - - // Pass the status delete through the client api worker for processing. - msgs = append(msgs, messages.FromClientAPI{ - APObjectType: ap.ObjectNote, - APActivityType: ap.ActivityDelete, - GTSModel: status, - OriginAccount: account, - TargetAccount: account, - }) + // Ensure account is set. + status.Account = account // Look for any boosts of this status in DB. - boosts, err := p.state.DB.GetStatusBoosts(ctx, status.ID) + // + // We put these in the msgs slice first so + // that they're handled first, before the + // parent status that's being boosted. + // + // Use a barebones context and just select the + // origin account separately. The rest will be + // populated later anyway, and we don't want to + // stop now because we couldn't get something. + boosts, err := p.state.DB.GetStatusBoosts( + gtscontext.SetBarebones(ctx), + status.ID, + ) if err != nil && !errors.Is(err, db.ErrNoEntries) { - return gtserror.Newf("error fetching status reblogs for %s: %w", status.ID, err) + return gtserror.Newf("error fetching status boosts for %s: %w", status.ID, err) } + // Prepare to Undo each boost. for _, boost := range boosts { - if boost.Account == nil { - // Fetch the relevant account for this status boost. - boostAcc, err := p.state.DB.GetAccountByID(ctx, boost.AccountID) - if err != nil { - if errors.Is(err, db.ErrNoEntries) { - // We don't have an account for this boost - // for some reason, so just skip processing. - log.WithContext(ctx).WithField("boost", boost).Warnf("no account found with id %s for boost %s", boost.AccountID, boost.ID) - continue - } - return gtserror.Newf("error fetching boosted status account for %s: %w", boost.AccountID, err) - } - - // Set account model - boost.Account = boostAcc + boost.Account, err = p.state.DB.GetAccountByID( + gtscontext.SetBarebones(ctx), + boost.AccountID, + ) + + if err != nil { + log.Warnf( + ctx, + "db error getting owner %s of status boost %s: %v", + boost.AccountID, boost.ID, err, + ) + continue } - // Pass the boost delete through the client api worker for processing. msgs = append(msgs, messages.FromClientAPI{ APObjectType: ap.ActivityAnnounce, APActivityType: ap.ActivityUndo, @@ -375,11 +412,28 @@ statusLoop: TargetAccount: account, }) } + + // Now prepare to Delete status. + msgs = append(msgs, messages.FromClientAPI{ + APObjectType: ap.ObjectNote, + APActivityType: ap.ActivityDelete, + GTSModel: status, + OriginAccount: account, + TargetAccount: account, + }) } } - // Batch process all accreted messages. - p.state.Workers.EnqueueClientAPI(ctx, msgs...) + // Process accreted messages in serial. + for _, msg := range msgs { + if err := p.state.Workers.ProcessFromClientAPI(ctx, msg); err != nil { + log.Errorf( + ctx, + "error processing %s of %s during Delete of account %s: %v", + msg.APActivityType, msg.APObjectType, account.ID, err, + ) + } + } return nil } diff --git a/internal/processing/admin/admin_test.go b/internal/processing/admin/admin_test.go index 614735ee1..367924664 100644 --- a/internal/processing/admin/admin_test.go +++ b/internal/processing/admin/admin_test.go @@ -80,7 +80,7 @@ func (suite *AdminStandardTestSuite) SetupSuite() { func (suite *AdminStandardTestSuite) SetupTest() { suite.state.Caches.Init() - testrig.StartWorkers(&suite.state) + testrig.InitTestConfig() testrig.InitTestLog() @@ -115,7 +115,7 @@ func (suite *AdminStandardTestSuite) SetupTest() { suite.emailSender, ) - suite.state.Workers.ProcessFromClientAPI = suite.processor.Workers().ProcessFromClientAPI + testrig.StartWorkers(&suite.state, suite.processor.Workers()) suite.adminProcessor = suite.processor.Admin() testrig.StandardDBSetup(suite.db, nil) diff --git a/internal/processing/polls/poll_test.go b/internal/processing/polls/poll_test.go index 15a1938a8..59e9aeb60 100644 --- a/internal/processing/polls/poll_test.go +++ b/internal/processing/polls/poll_test.go @@ -50,7 +50,7 @@ func (suite *PollTestSuite) SetupTest() { testrig.InitTestConfig() testrig.InitTestLog() suite.state.Caches.Init() - testrig.StartWorkers(&suite.state) + testrig.StartNoopWorkers(&suite.state) testrig.NewTestDB(&suite.state) converter := typeutils.NewConverter(&suite.state) controller := testrig.NewTestTransportController(&suite.state, nil) diff --git a/internal/processing/processor_test.go b/internal/processing/processor_test.go index 2e0baae96..148ec42ed 100644 --- a/internal/processing/processor_test.go +++ b/internal/processing/processor_test.go @@ -95,7 +95,7 @@ func (suite *ProcessingStandardTestSuite) SetupSuite() { func (suite *ProcessingStandardTestSuite) SetupTest() { suite.state.Caches.Init() - testrig.StartWorkers(&suite.state) + testrig.StartNoopWorkers(&suite.state) testrig.InitTestConfig() testrig.InitTestLog() diff --git a/internal/processing/status/status_test.go b/internal/processing/status/status_test.go index dd9ad00f8..2a47a205e 100644 --- a/internal/processing/status/status_test.go +++ b/internal/processing/status/status_test.go @@ -74,7 +74,7 @@ func (suite *StatusStandardTestSuite) SetupSuite() { func (suite *StatusStandardTestSuite) SetupTest() { suite.state.Caches.Init() - testrig.StartWorkers(&suite.state) + testrig.StartNoopWorkers(&suite.state) testrig.InitTestConfig() testrig.InitTestLog() diff --git a/internal/processing/workers/workers_test.go b/internal/processing/workers/workers_test.go index c97e9eeb8..cbdd10f9a 100644 --- a/internal/processing/workers/workers_test.go +++ b/internal/processing/workers/workers_test.go @@ -97,7 +97,6 @@ func (suite *WorkersTestSuite) SetupSuite() { func (suite *WorkersTestSuite) SetupTest() { suite.state.Caches.Init() - testrig.StartWorkers(&suite.state) testrig.InitTestConfig() testrig.InitTestLog() @@ -126,6 +125,8 @@ func (suite *WorkersTestSuite) SetupTest() { suite.emailSender = testrig.NewEmailSender("../../../web/template/", nil) suite.processor = processing.NewProcessor(cleaner.New(&suite.state), suite.typeconverter, suite.federator, suite.oauthServer, suite.mediaManager, &suite.state, suite.emailSender) + testrig.StartWorkers(&suite.state, suite.processor.Workers()) + suite.state.Workers.EnqueueClientAPI = suite.processor.Workers().EnqueueClientAPI suite.state.Workers.EnqueueFediAPI = suite.processor.Workers().EnqueueFediAPI |