diff options
author | 2024-04-26 13:50:46 +0100 | |
---|---|---|
committer | 2024-04-26 13:50:46 +0100 | |
commit | c9c0773f2c2363dcfa37e675b83ec3f0b49bd0d9 (patch) | |
tree | dbd3409070765d5ca81448a574ccd32b4da1ffe6 /internal/processing/workers/fromclientapi.go | |
parent | [chore] update Docker container to use new go swagger hash (#2872) (diff) | |
download | gotosocial-c9c0773f2c2363dcfa37e675b83ec3f0b49bd0d9.tar.xz |
[performance] update remaining worker pools to use queues (#2865)
* start replacing client + federator + media workers with new worker + queue types
* refactor federatingDB.Delete(), drop queued messages when deleting account / status
* move all queue purging to the processor workers
* undo toolchain updates
* code comments, ensure dereferencer worker pool gets started
* update gruf libraries in readme
* start the job scheduler separately to the worker pools
* reshuffle ordering or server.go + remove duplicate worker start / stop
* update go-list version
* fix vendoring
* move queue invalidation to before wipeing / deletion, to ensure queued work not dropped
* add logging to worker processing functions in testrig, don't start workers in unexpected places
* update go-structr to add (+then rely on) QueueCtx{} type
* ensure more worker pools get started properly in tests
* fix remaining broken tests relying on worker queue logic
* fix account test suite queue popping logic, ensure noop workers do not pull from queue
* move back accidentally shuffled account deletion order
* ensure error (non nil!!) gets passed in refactored federatingDB{}.Delete()
* silently drop deletes from accounts not permitted to
* don't warn log on forwarded deletes
* make if else clauses easier to parse
* use getFederatorMsg()
* improved code comment
* improved code comment re: requesting account delete checks
* remove boolean result from worker start / stop since false = already running or already stopped
* remove optional passed-in http.client
* remove worker starting from the admin CLI commands (we don't need to handle side-effects)
* update prune cli to start scheduler but not all of the workers
* fix rebase issues
* remove redundant return statements
* i'm sorry sir linter
Diffstat (limited to 'internal/processing/workers/fromclientapi.go')
-rw-r--r-- | internal/processing/workers/fromclientapi.go | 143 |
1 files changed, 81 insertions, 62 deletions
diff --git a/internal/processing/workers/fromclientapi.go b/internal/processing/workers/fromclientapi.go index 1412ea003..4564afbb9 100644 --- a/internal/processing/workers/fromclientapi.go +++ b/internal/processing/workers/fromclientapi.go @@ -25,7 +25,6 @@ import ( "codeberg.org/gruf/go-logger/v2/level" "github.com/superseriousbusiness/gotosocial/internal/ap" "github.com/superseriousbusiness/gotosocial/internal/db" - "github.com/superseriousbusiness/gotosocial/internal/gtscontext" "github.com/superseriousbusiness/gotosocial/internal/gtserror" "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" "github.com/superseriousbusiness/gotosocial/internal/log" @@ -45,29 +44,15 @@ type clientAPI struct { surface *surface federate *federate account *account.Processor - utilF *utilF + utils *utils } -func (p *Processor) EnqueueClientAPI(cctx context.Context, msgs ...messages.FromClientAPI) { - _ = p.workers.ClientAPI.MustEnqueueCtx(cctx, func(wctx context.Context) { - // Copy caller ctx values to worker's. - wctx = gtscontext.WithValues(wctx, cctx) - - // Process worker messages. - for _, msg := range msgs { - if err := p.ProcessFromClientAPI(wctx, msg); err != nil { - log.Errorf(wctx, "error processing client API message: %v", err) - } - } - }) -} - -func (p *Processor) ProcessFromClientAPI(ctx context.Context, cMsg messages.FromClientAPI) error { +func (p *Processor) ProcessFromClientAPI(ctx context.Context, cMsg *messages.FromClientAPI) error { // Allocate new log fields slice fields := make([]kv.Field, 3, 4) fields[0] = kv.Field{"activityType", cMsg.APActivityType} fields[1] = kv.Field{"objectType", cMsg.APObjectType} - fields[2] = kv.Field{"fromAccount", cMsg.OriginAccount.Username} + fields[2] = kv.Field{"fromAccount", cMsg.Origin.Username} // Include GTSModel in logs if appropriate. if cMsg.GTSModel != nil && @@ -217,7 +202,7 @@ func (p *Processor) ProcessFromClientAPI(ctx context.Context, cMsg messages.From return gtserror.Newf("unhandled: %s %s", cMsg.APActivityType, cMsg.APObjectType) } -func (p *clientAPI) CreateAccount(ctx context.Context, cMsg messages.FromClientAPI) error { +func (p *clientAPI) CreateAccount(ctx context.Context, cMsg *messages.FromClientAPI) error { newUser, ok := cMsg.GTSModel.(*gtsmodel.User) if !ok { return gtserror.Newf("%T not parseable as *gtsmodel.User", cMsg.GTSModel) @@ -241,14 +226,14 @@ func (p *clientAPI) CreateAccount(ctx context.Context, cMsg messages.FromClientA return nil } -func (p *clientAPI) CreateStatus(ctx context.Context, cMsg messages.FromClientAPI) error { +func (p *clientAPI) CreateStatus(ctx context.Context, cMsg *messages.FromClientAPI) error { status, ok := cMsg.GTSModel.(*gtsmodel.Status) if !ok { return gtserror.Newf("%T not parseable as *gtsmodel.Status", cMsg.GTSModel) } // Update stats for the actor account. - if err := p.utilF.incrementStatusesCount(ctx, cMsg.OriginAccount, status); err != nil { + if err := p.utils.incrementStatusesCount(ctx, cMsg.Origin, status); err != nil { log.Errorf(ctx, "error updating account stats: %v", err) } @@ -269,7 +254,7 @@ func (p *clientAPI) CreateStatus(ctx context.Context, cMsg messages.FromClientAP return nil } -func (p *clientAPI) CreatePollVote(ctx context.Context, cMsg messages.FromClientAPI) error { +func (p *clientAPI) CreatePollVote(ctx context.Context, cMsg *messages.FromClientAPI) error { // Cast the create poll vote attached to message. vote, ok := cMsg.GTSModel.(*gtsmodel.PollVote) if !ok { @@ -310,14 +295,14 @@ func (p *clientAPI) CreatePollVote(ctx context.Context, cMsg messages.FromClient return nil } -func (p *clientAPI) CreateFollowReq(ctx context.Context, cMsg messages.FromClientAPI) error { +func (p *clientAPI) CreateFollowReq(ctx context.Context, cMsg *messages.FromClientAPI) error { followRequest, ok := cMsg.GTSModel.(*gtsmodel.FollowRequest) if !ok { return gtserror.Newf("%T not parseable as *gtsmodel.FollowRequest", cMsg.GTSModel) } // Update stats for the target account. - if err := p.utilF.incrementFollowRequestsCount(ctx, cMsg.TargetAccount); err != nil { + if err := p.utils.incrementFollowRequestsCount(ctx, cMsg.Target); err != nil { log.Errorf(ctx, "error updating account stats: %v", err) } @@ -338,7 +323,7 @@ func (p *clientAPI) CreateFollowReq(ctx context.Context, cMsg messages.FromClien return nil } -func (p *clientAPI) CreateLike(ctx context.Context, cMsg messages.FromClientAPI) error { +func (p *clientAPI) CreateLike(ctx context.Context, cMsg *messages.FromClientAPI) error { fave, ok := cMsg.GTSModel.(*gtsmodel.StatusFave) if !ok { return gtserror.Newf("%T not parseable as *gtsmodel.StatusFave", cMsg.GTSModel) @@ -364,14 +349,14 @@ func (p *clientAPI) CreateLike(ctx context.Context, cMsg messages.FromClientAPI) return nil } -func (p *clientAPI) CreateAnnounce(ctx context.Context, cMsg messages.FromClientAPI) error { +func (p *clientAPI) CreateAnnounce(ctx context.Context, cMsg *messages.FromClientAPI) error { boost, ok := cMsg.GTSModel.(*gtsmodel.Status) if !ok { return gtserror.Newf("%T not parseable as *gtsmodel.Status", cMsg.GTSModel) } // Update stats for the actor account. - if err := p.utilF.incrementStatusesCount(ctx, cMsg.OriginAccount, boost); err != nil { + if err := p.utils.incrementStatusesCount(ctx, cMsg.Origin, boost); err != nil { log.Errorf(ctx, "error updating account stats: %v", err) } @@ -396,7 +381,7 @@ func (p *clientAPI) CreateAnnounce(ctx context.Context, cMsg messages.FromClient return nil } -func (p *clientAPI) CreateBlock(ctx context.Context, cMsg messages.FromClientAPI) error { +func (p *clientAPI) CreateBlock(ctx context.Context, cMsg *messages.FromClientAPI) error { block, ok := cMsg.GTSModel.(*gtsmodel.Block) if !ok { return gtserror.Newf("%T not parseable as *gtsmodel.Block", cMsg.GTSModel) @@ -430,7 +415,7 @@ func (p *clientAPI) CreateBlock(ctx context.Context, cMsg messages.FromClientAPI return nil } -func (p *clientAPI) UpdateStatus(ctx context.Context, cMsg messages.FromClientAPI) error { +func (p *clientAPI) UpdateStatus(ctx context.Context, cMsg *messages.FromClientAPI) error { // Cast the updated Status model attached to msg. status, ok := cMsg.GTSModel.(*gtsmodel.Status) if !ok { @@ -462,7 +447,7 @@ func (p *clientAPI) UpdateStatus(ctx context.Context, cMsg messages.FromClientAP return nil } -func (p *clientAPI) UpdateAccount(ctx context.Context, cMsg messages.FromClientAPI) error { +func (p *clientAPI) UpdateAccount(ctx context.Context, cMsg *messages.FromClientAPI) error { account, ok := cMsg.GTSModel.(*gtsmodel.Account) if !ok { return gtserror.Newf("cannot cast %T -> *gtsmodel.Account", cMsg.GTSModel) @@ -475,7 +460,7 @@ func (p *clientAPI) UpdateAccount(ctx context.Context, cMsg messages.FromClientA return nil } -func (p *clientAPI) UpdateReport(ctx context.Context, cMsg messages.FromClientAPI) error { +func (p *clientAPI) UpdateReport(ctx context.Context, cMsg *messages.FromClientAPI) error { report, ok := cMsg.GTSModel.(*gtsmodel.Report) if !ok { return gtserror.Newf("%T not parseable as *gtsmodel.Report", cMsg.GTSModel) @@ -494,23 +479,23 @@ func (p *clientAPI) UpdateReport(ctx context.Context, cMsg messages.FromClientAP return nil } -func (p *clientAPI) AcceptFollow(ctx context.Context, cMsg messages.FromClientAPI) error { +func (p *clientAPI) AcceptFollow(ctx context.Context, cMsg *messages.FromClientAPI) error { follow, ok := cMsg.GTSModel.(*gtsmodel.Follow) if !ok { return gtserror.Newf("%T not parseable as *gtsmodel.Follow", cMsg.GTSModel) } // Update stats for the target account. - if err := p.utilF.decrementFollowRequestsCount(ctx, cMsg.TargetAccount); err != nil { + if err := p.utils.decrementFollowRequestsCount(ctx, cMsg.Target); err != nil { log.Errorf(ctx, "error updating account stats: %v", err) } - if err := p.utilF.incrementFollowersCount(ctx, cMsg.TargetAccount); err != nil { + if err := p.utils.incrementFollowersCount(ctx, cMsg.Target); err != nil { log.Errorf(ctx, "error updating account stats: %v", err) } // Update stats for the origin account. - if err := p.utilF.incrementFollowingCount(ctx, cMsg.OriginAccount); err != nil { + if err := p.utils.incrementFollowingCount(ctx, cMsg.Origin); err != nil { log.Errorf(ctx, "error updating account stats: %v", err) } @@ -525,14 +510,14 @@ func (p *clientAPI) AcceptFollow(ctx context.Context, cMsg messages.FromClientAP return nil } -func (p *clientAPI) RejectFollowRequest(ctx context.Context, cMsg messages.FromClientAPI) error { +func (p *clientAPI) RejectFollowRequest(ctx context.Context, cMsg *messages.FromClientAPI) error { followReq, ok := cMsg.GTSModel.(*gtsmodel.FollowRequest) if !ok { return gtserror.Newf("%T not parseable as *gtsmodel.FollowRequest", cMsg.GTSModel) } // Update stats for the target account. - if err := p.utilF.decrementFollowRequestsCount(ctx, cMsg.TargetAccount); err != nil { + if err := p.utils.decrementFollowRequestsCount(ctx, cMsg.Target); err != nil { log.Errorf(ctx, "error updating account stats: %v", err) } @@ -546,19 +531,19 @@ func (p *clientAPI) RejectFollowRequest(ctx context.Context, cMsg messages.FromC return nil } -func (p *clientAPI) UndoFollow(ctx context.Context, cMsg messages.FromClientAPI) error { +func (p *clientAPI) UndoFollow(ctx context.Context, cMsg *messages.FromClientAPI) error { follow, ok := cMsg.GTSModel.(*gtsmodel.Follow) if !ok { return gtserror.Newf("%T not parseable as *gtsmodel.Follow", cMsg.GTSModel) } // Update stats for the origin account. - if err := p.utilF.decrementFollowingCount(ctx, cMsg.OriginAccount); err != nil { + if err := p.utils.decrementFollowingCount(ctx, cMsg.Origin); err != nil { log.Errorf(ctx, "error updating account stats: %v", err) } // Update stats for the target account. - if err := p.utilF.decrementFollowersCount(ctx, cMsg.TargetAccount); err != nil { + if err := p.utils.decrementFollowersCount(ctx, cMsg.Target); err != nil { log.Errorf(ctx, "error updating account stats: %v", err) } @@ -569,7 +554,7 @@ func (p *clientAPI) UndoFollow(ctx context.Context, cMsg messages.FromClientAPI) return nil } -func (p *clientAPI) UndoBlock(ctx context.Context, cMsg messages.FromClientAPI) error { +func (p *clientAPI) UndoBlock(ctx context.Context, cMsg *messages.FromClientAPI) error { block, ok := cMsg.GTSModel.(*gtsmodel.Block) if !ok { return gtserror.Newf("%T not parseable as *gtsmodel.Block", cMsg.GTSModel) @@ -582,7 +567,7 @@ func (p *clientAPI) UndoBlock(ctx context.Context, cMsg messages.FromClientAPI) return nil } -func (p *clientAPI) UndoFave(ctx context.Context, cMsg messages.FromClientAPI) error { +func (p *clientAPI) UndoFave(ctx context.Context, cMsg *messages.FromClientAPI) error { statusFave, ok := cMsg.GTSModel.(*gtsmodel.StatusFave) if !ok { return gtserror.Newf("%T not parseable as *gtsmodel.StatusFave", cMsg.GTSModel) @@ -599,7 +584,7 @@ func (p *clientAPI) UndoFave(ctx context.Context, cMsg messages.FromClientAPI) e return nil } -func (p *clientAPI) UndoAnnounce(ctx context.Context, cMsg messages.FromClientAPI) error { +func (p *clientAPI) UndoAnnounce(ctx context.Context, cMsg *messages.FromClientAPI) error { status, ok := cMsg.GTSModel.(*gtsmodel.Status) if !ok { return gtserror.Newf("%T not parseable as *gtsmodel.Status", cMsg.GTSModel) @@ -610,7 +595,7 @@ func (p *clientAPI) UndoAnnounce(ctx context.Context, cMsg messages.FromClientAP } // Update stats for the origin account. - if err := p.utilF.decrementStatusesCount(ctx, cMsg.OriginAccount); err != nil { + if err := p.utils.decrementStatusesCount(ctx, cMsg.Origin); err != nil { log.Errorf(ctx, "error updating account stats: %v", err) } @@ -629,7 +614,7 @@ func (p *clientAPI) UndoAnnounce(ctx context.Context, cMsg messages.FromClientAP return nil } -func (p *clientAPI) DeleteStatus(ctx context.Context, cMsg messages.FromClientAPI) error { +func (p *clientAPI) DeleteStatus(ctx context.Context, cMsg *messages.FromClientAPI) error { // Don't delete attachments, just unattach them: // this request comes from the client API and the // poster may want to use attachments again later. @@ -648,12 +633,26 @@ func (p *clientAPI) DeleteStatus(ctx context.Context, cMsg messages.FromClientAP return gtserror.Newf("db error populating status: %w", err) } - if err := p.utilF.wipeStatus(ctx, status, deleteAttachments); err != nil { + // Drop any outgoing queued AP requests about / targeting + // this status, (stops queued likes, boosts, creates etc). + p.state.Workers.Delivery.Queue.Delete("ObjectID", status.URI) + p.state.Workers.Delivery.Queue.Delete("TargetID", status.URI) + + // Drop any incoming queued client messages about / targeting + // status, (stops processing of local origin data for status). + p.state.Workers.Client.Queue.Delete("TargetURI", status.URI) + + // Drop any incoming queued federator messages targeting status, + // (stops processing of remote origin data targeting this status). + p.state.Workers.Federator.Queue.Delete("TargetURI", status.URI) + + // First perform the actual status deletion. + if err := p.utils.wipeStatus(ctx, status, deleteAttachments); err != nil { log.Errorf(ctx, "error wiping status: %v", err) } // Update stats for the origin account. - if err := p.utilF.decrementStatusesCount(ctx, cMsg.OriginAccount); err != nil { + if err := p.utils.decrementStatusesCount(ctx, cMsg.Origin); err != nil { log.Errorf(ctx, "error updating account stats: %v", err) } @@ -670,7 +669,7 @@ func (p *clientAPI) DeleteStatus(ctx context.Context, cMsg messages.FromClientAP return nil } -func (p *clientAPI) DeleteAccount(ctx context.Context, cMsg messages.FromClientAPI) error { +func (p *clientAPI) DeleteAccount(ctx context.Context, cMsg *messages.FromClientAPI) error { // The originID of the delete, one of: // - ID of a domain block, for which // this account delete is a side effect. @@ -684,21 +683,41 @@ func (p *clientAPI) DeleteAccount(ctx context.Context, cMsg messages.FromClientA } else { // Origin is whichever account // originated this message. - originID = cMsg.OriginAccount.ID + originID = cMsg.Origin.ID } - if err := p.federate.DeleteAccount(ctx, cMsg.TargetAccount); err != nil { + // Extract target account. + account := cMsg.Target + + // Drop any outgoing queued AP requests to / from / targeting + // this account, (stops queued likes, boosts, creates etc). + p.state.Workers.Delivery.Queue.Delete("ActorID", account.URI) + p.state.Workers.Delivery.Queue.Delete("ObjectID", account.URI) + p.state.Workers.Delivery.Queue.Delete("TargetID", account.URI) + + // Drop any incoming queued client messages to / from this + // account, (stops processing of local origin data for acccount). + p.state.Workers.Client.Queue.Delete("Origin.ID", account.ID) + p.state.Workers.Client.Queue.Delete("Target.ID", account.ID) + p.state.Workers.Client.Queue.Delete("TargetURI", account.URI) + + // Drop any incoming queued federator messages to this account, + // (stops processing of remote origin data targeting this account). + p.state.Workers.Federator.Queue.Delete("Receiving.ID", account.ID) + p.state.Workers.Federator.Queue.Delete("TargetURI", account.URI) + + if err := p.federate.DeleteAccount(ctx, cMsg.Target); err != nil { log.Errorf(ctx, "error federating account delete: %v", err) } - if err := p.account.Delete(ctx, cMsg.TargetAccount, originID); err != nil { + if err := p.account.Delete(ctx, cMsg.Target, originID); err != nil { log.Errorf(ctx, "error deleting account: %v", err) } return nil } -func (p *clientAPI) ReportAccount(ctx context.Context, cMsg messages.FromClientAPI) error { +func (p *clientAPI) ReportAccount(ctx context.Context, cMsg *messages.FromClientAPI) error { report, ok := cMsg.GTSModel.(*gtsmodel.Report) if !ok { return gtserror.Newf("%T not parseable as *gtsmodel.Report", cMsg.GTSModel) @@ -719,28 +738,28 @@ func (p *clientAPI) ReportAccount(ctx context.Context, cMsg messages.FromClientA return nil } -func (p *clientAPI) MoveAccount(ctx context.Context, cMsg messages.FromClientAPI) error { +func (p *clientAPI) MoveAccount(ctx context.Context, cMsg *messages.FromClientAPI) error { // Redirect each local follower of // OriginAccount to follow move target. - p.utilF.redirectFollowers(ctx, cMsg.OriginAccount, cMsg.TargetAccount) + p.utils.redirectFollowers(ctx, cMsg.Origin, cMsg.Target) // At this point, we know OriginAccount has the // Move set on it. Just make sure it's populated. - if err := p.state.DB.PopulateMove(ctx, cMsg.OriginAccount.Move); err != nil { + if err := p.state.DB.PopulateMove(ctx, cMsg.Origin.Move); err != nil { return gtserror.Newf("error populating Move: %w", err) } // Now send the Move message out to // OriginAccount's (remote) followers. - if err := p.federate.MoveAccount(ctx, cMsg.OriginAccount); err != nil { + if err := p.federate.MoveAccount(ctx, cMsg.Origin); err != nil { return gtserror.Newf("error federating account move: %w", err) } // Mark the move attempt as successful. - cMsg.OriginAccount.Move.SucceededAt = cMsg.OriginAccount.Move.AttemptedAt + cMsg.Origin.Move.SucceededAt = cMsg.Origin.Move.AttemptedAt if err := p.state.DB.UpdateMove( ctx, - cMsg.OriginAccount.Move, + cMsg.Origin.Move, "succeeded_at", ); err != nil { return gtserror.Newf("error marking move as successful: %w", err) @@ -749,7 +768,7 @@ func (p *clientAPI) MoveAccount(ctx context.Context, cMsg messages.FromClientAPI return nil } -func (p *clientAPI) AcceptAccount(ctx context.Context, cMsg messages.FromClientAPI) error { +func (p *clientAPI) AcceptAccount(ctx context.Context, cMsg *messages.FromClientAPI) error { newUser, ok := cMsg.GTSModel.(*gtsmodel.User) if !ok { return gtserror.Newf("%T not parseable as *gtsmodel.User", cMsg.GTSModel) @@ -772,17 +791,17 @@ func (p *clientAPI) AcceptAccount(ctx context.Context, cMsg messages.FromClientA return nil } -func (p *clientAPI) RejectAccount(ctx context.Context, cMsg messages.FromClientAPI) error { +func (p *clientAPI) RejectAccount(ctx context.Context, cMsg *messages.FromClientAPI) error { deniedUser, ok := cMsg.GTSModel.(*gtsmodel.DeniedUser) if !ok { return gtserror.Newf("%T not parseable as *gtsmodel.DeniedUser", cMsg.GTSModel) } // Remove the account. - if err := p.state.DB.DeleteAccount(ctx, cMsg.TargetAccount.ID); err != nil { + if err := p.state.DB.DeleteAccount(ctx, cMsg.Target.ID); err != nil { log.Errorf(ctx, "db error deleting account %s: %v", - cMsg.TargetAccount.ID, err, + cMsg.Target.ID, err, ) } |