diff options
author | 2024-04-26 13:50:46 +0100 | |
---|---|---|
committer | 2024-04-26 13:50:46 +0100 | |
commit | c9c0773f2c2363dcfa37e675b83ec3f0b49bd0d9 (patch) | |
tree | dbd3409070765d5ca81448a574ccd32b4da1ffe6 /cmd | |
parent | [chore] update Docker container to use new go swagger hash (#2872) (diff) | |
download | gotosocial-c9c0773f2c2363dcfa37e675b83ec3f0b49bd0d9.tar.xz |
[performance] update remaining worker pools to use queues (#2865)
* start replacing client + federator + media workers with new worker + queue types
* refactor federatingDB.Delete(), drop queued messages when deleting account / status
* move all queue purging to the processor workers
* undo toolchain updates
* code comments, ensure dereferencer worker pool gets started
* update gruf libraries in readme
* start the job scheduler separately to the worker pools
* reshuffle ordering or server.go + remove duplicate worker start / stop
* update go-list version
* fix vendoring
* move queue invalidation to before wipeing / deletion, to ensure queued work not dropped
* add logging to worker processing functions in testrig, don't start workers in unexpected places
* update go-structr to add (+then rely on) QueueCtx{} type
* ensure more worker pools get started properly in tests
* fix remaining broken tests relying on worker queue logic
* fix account test suite queue popping logic, ensure noop workers do not pull from queue
* move back accidentally shuffled account deletion order
* ensure error (non nil!!) gets passed in refactored federatingDB{}.Delete()
* silently drop deletes from accounts not permitted to
* don't warn log on forwarded deletes
* make if else clauses easier to parse
* use getFederatorMsg()
* improved code comment
* improved code comment re: requesting account delete checks
* remove boolean result from worker start / stop since false = already running or already stopped
* remove optional passed-in http.client
* remove worker starting from the admin CLI commands (we don't need to handle side-effects)
* update prune cli to start scheduler but not all of the workers
* fix rebase issues
* remove redundant return statements
* i'm sorry sir linter
Diffstat (limited to 'cmd')
-rw-r--r-- | cmd/gotosocial/action/admin/account/account.go | 1 | ||||
-rw-r--r-- | cmd/gotosocial/action/admin/media/prune/common.go | 5 | ||||
-rw-r--r-- | cmd/gotosocial/action/server/server.go | 56 |
3 files changed, 33 insertions, 29 deletions
diff --git a/cmd/gotosocial/action/admin/account/account.go b/cmd/gotosocial/action/admin/account/account.go index 42b324107..0693dd04e 100644 --- a/cmd/gotosocial/action/admin/account/account.go +++ b/cmd/gotosocial/action/admin/account/account.go @@ -39,7 +39,6 @@ func initState(ctx context.Context) (*state.State, error) { var state state.State state.Caches.Init() state.Caches.Start() - state.Workers.Start() // Set the state DB connection dbConn, err := bundb.NewBunDBService(ctx, &state) diff --git a/cmd/gotosocial/action/admin/media/prune/common.go b/cmd/gotosocial/action/admin/media/prune/common.go index ed272984b..fcdb2bcf2 100644 --- a/cmd/gotosocial/action/admin/media/prune/common.go +++ b/cmd/gotosocial/action/admin/media/prune/common.go @@ -44,7 +44,10 @@ func setupPrune(ctx context.Context) (*prune, error) { state.Caches.Init() state.Caches.Start() - state.Workers.Start() + // Scheduler is required for the + // claner, but no other workers + // are needed for this CLI action. + state.Workers.StartScheduler() dbService, err := bundb.NewBunDBService(ctx, &state) if err != nil { diff --git a/cmd/gotosocial/action/server/server.go b/cmd/gotosocial/action/server/server.go index 5aaccd1c4..22d8b3920 100644 --- a/cmd/gotosocial/action/server/server.go +++ b/cmd/gotosocial/action/server/server.go @@ -35,6 +35,7 @@ import ( "github.com/superseriousbusiness/gotosocial/internal/filter/spam" "github.com/superseriousbusiness/gotosocial/internal/filter/visibility" "github.com/superseriousbusiness/gotosocial/internal/gtserror" + "github.com/superseriousbusiness/gotosocial/internal/messages" "github.com/superseriousbusiness/gotosocial/internal/metrics" "github.com/superseriousbusiness/gotosocial/internal/middleware" tlprocessor "github.com/superseriousbusiness/gotosocial/internal/processing/timeline" @@ -128,25 +129,6 @@ var Start action.GTSAction = func(ctx context.Context) error { TLSInsecureSkipVerify: config.GetHTTPClientTLSInsecureSkipVerify(), }) - // Initialize delivery worker with http client. - state.Workers.Delivery.Init(client) - - // Initialize workers. - state.Workers.Start() - defer state.Workers.Stop() - - // Add a task to the scheduler to sweep caches. - // Frequency = 1 * minute - // Threshold = 80% capacity - _ = state.Workers.Scheduler.AddRecurring( - "@cachesweep", // id - time.Time{}, // start - time.Minute, // freq - func(context.Context, time.Time) { - state.Caches.Sweep(60) - }, - ) - // Build handlers used in later initializations. mediaManager := media.NewManager(&state) oauthServer := oauth.New(ctx, dbService) @@ -195,10 +177,27 @@ var Start action.GTSAction = func(ctx context.Context) error { return fmt.Errorf("error starting list timeline: %s", err) } - // Create a media cleaner using the given state. + // Start the job scheduler + // (this is required for cleaner). + state.Workers.StartScheduler() + + // Add a task to the scheduler to sweep caches. + // Frequency = 1 * minute + // Threshold = 60% capacity + _ = state.Workers.Scheduler.AddRecurring( + "@cachesweep", // id + time.Time{}, // start + time.Minute, // freq + func(context.Context, time.Time) { + state.Caches.Sweep(60) + }, + ) + + // Create background cleaner. cleaner := cleaner.New(&state) - // Create the processor using all the other services we've created so far. + // Create the processor using all the + // other services we've created so far. processor := processing.NewProcessor( cleaner, typeConverter, @@ -209,13 +208,16 @@ var Start action.GTSAction = func(ctx context.Context) error { emailSender, ) - // Set state client / federator asynchronous worker enqueue functions - state.Workers.EnqueueClientAPI = processor.Workers().EnqueueClientAPI - state.Workers.EnqueueFediAPI = processor.Workers().EnqueueFediAPI + // Initialize the specialized workers. + state.Workers.Client.Init(messages.ClientMsgIndices()) + state.Workers.Federator.Init(messages.FederatorMsgIndices()) + state.Workers.Delivery.Init(client) + state.Workers.Client.Process = processor.Workers().ProcessFromClientAPI + state.Workers.Federator.Process = processor.Workers().ProcessFromFediAPI - // Set state client / federator synchronous processing functions. - state.Workers.ProcessFromClientAPI = processor.Workers().ProcessFromClientAPI - state.Workers.ProcessFromFediAPI = processor.Workers().ProcessFromFediAPI + // Initialize workers. + state.Workers.Start() + defer state.Workers.Stop() // Schedule tasks for all existing poll expiries. if err := processor.Polls().ScheduleAll(ctx); err != nil { |