From c9c0773f2c2363dcfa37e675b83ec3f0b49bd0d9 Mon Sep 17 00:00:00 2001 From: kim <89579420+NyaaaWhatsUpDoc@users.noreply.github.com> Date: Fri, 26 Apr 2024 13:50:46 +0100 Subject: [performance] update remaining worker pools to use queues (#2865) * start replacing client + federator + media workers with new worker + queue types * refactor federatingDB.Delete(), drop queued messages when deleting account / status * move all queue purging to the processor workers * undo toolchain updates * code comments, ensure dereferencer worker pool gets started * update gruf libraries in readme * start the job scheduler separately to the worker pools * reshuffle ordering or server.go + remove duplicate worker start / stop * update go-list version * fix vendoring * move queue invalidation to before wipeing / deletion, to ensure queued work not dropped * add logging to worker processing functions in testrig, don't start workers in unexpected places * update go-structr to add (+then rely on) QueueCtx{} type * ensure more worker pools get started properly in tests * fix remaining broken tests relying on worker queue logic * fix account test suite queue popping logic, ensure noop workers do not pull from queue * move back accidentally shuffled account deletion order * ensure error (non nil!!) gets passed in refactored federatingDB{}.Delete() * silently drop deletes from accounts not permitted to * don't warn log on forwarded deletes * make if else clauses easier to parse * use getFederatorMsg() * improved code comment * improved code comment re: requesting account delete checks * remove boolean result from worker start / stop since false = already running or already stopped * remove optional passed-in http.client * remove worker starting from the admin CLI commands (we don't need to handle side-effects) * update prune cli to start scheduler but not all of the workers * fix rebase issues * remove redundant return statements * i'm sorry sir linter --- cmd/gotosocial/action/admin/account/account.go | 1 - cmd/gotosocial/action/admin/media/prune/common.go | 5 +- cmd/gotosocial/action/server/server.go | 56 ++++++++++++----------- 3 files changed, 33 insertions(+), 29 deletions(-) (limited to 'cmd') diff --git a/cmd/gotosocial/action/admin/account/account.go b/cmd/gotosocial/action/admin/account/account.go index 42b324107..0693dd04e 100644 --- a/cmd/gotosocial/action/admin/account/account.go +++ b/cmd/gotosocial/action/admin/account/account.go @@ -39,7 +39,6 @@ func initState(ctx context.Context) (*state.State, error) { var state state.State state.Caches.Init() state.Caches.Start() - state.Workers.Start() // Set the state DB connection dbConn, err := bundb.NewBunDBService(ctx, &state) diff --git a/cmd/gotosocial/action/admin/media/prune/common.go b/cmd/gotosocial/action/admin/media/prune/common.go index ed272984b..fcdb2bcf2 100644 --- a/cmd/gotosocial/action/admin/media/prune/common.go +++ b/cmd/gotosocial/action/admin/media/prune/common.go @@ -44,7 +44,10 @@ func setupPrune(ctx context.Context) (*prune, error) { state.Caches.Init() state.Caches.Start() - state.Workers.Start() + // Scheduler is required for the + // claner, but no other workers + // are needed for this CLI action. + state.Workers.StartScheduler() dbService, err := bundb.NewBunDBService(ctx, &state) if err != nil { diff --git a/cmd/gotosocial/action/server/server.go b/cmd/gotosocial/action/server/server.go index 5aaccd1c4..22d8b3920 100644 --- a/cmd/gotosocial/action/server/server.go +++ b/cmd/gotosocial/action/server/server.go @@ -35,6 +35,7 @@ import ( "github.com/superseriousbusiness/gotosocial/internal/filter/spam" "github.com/superseriousbusiness/gotosocial/internal/filter/visibility" "github.com/superseriousbusiness/gotosocial/internal/gtserror" + "github.com/superseriousbusiness/gotosocial/internal/messages" "github.com/superseriousbusiness/gotosocial/internal/metrics" "github.com/superseriousbusiness/gotosocial/internal/middleware" tlprocessor "github.com/superseriousbusiness/gotosocial/internal/processing/timeline" @@ -128,25 +129,6 @@ var Start action.GTSAction = func(ctx context.Context) error { TLSInsecureSkipVerify: config.GetHTTPClientTLSInsecureSkipVerify(), }) - // Initialize delivery worker with http client. - state.Workers.Delivery.Init(client) - - // Initialize workers. - state.Workers.Start() - defer state.Workers.Stop() - - // Add a task to the scheduler to sweep caches. - // Frequency = 1 * minute - // Threshold = 80% capacity - _ = state.Workers.Scheduler.AddRecurring( - "@cachesweep", // id - time.Time{}, // start - time.Minute, // freq - func(context.Context, time.Time) { - state.Caches.Sweep(60) - }, - ) - // Build handlers used in later initializations. mediaManager := media.NewManager(&state) oauthServer := oauth.New(ctx, dbService) @@ -195,10 +177,27 @@ var Start action.GTSAction = func(ctx context.Context) error { return fmt.Errorf("error starting list timeline: %s", err) } - // Create a media cleaner using the given state. + // Start the job scheduler + // (this is required for cleaner). + state.Workers.StartScheduler() + + // Add a task to the scheduler to sweep caches. + // Frequency = 1 * minute + // Threshold = 60% capacity + _ = state.Workers.Scheduler.AddRecurring( + "@cachesweep", // id + time.Time{}, // start + time.Minute, // freq + func(context.Context, time.Time) { + state.Caches.Sweep(60) + }, + ) + + // Create background cleaner. cleaner := cleaner.New(&state) - // Create the processor using all the other services we've created so far. + // Create the processor using all the + // other services we've created so far. processor := processing.NewProcessor( cleaner, typeConverter, @@ -209,13 +208,16 @@ var Start action.GTSAction = func(ctx context.Context) error { emailSender, ) - // Set state client / federator asynchronous worker enqueue functions - state.Workers.EnqueueClientAPI = processor.Workers().EnqueueClientAPI - state.Workers.EnqueueFediAPI = processor.Workers().EnqueueFediAPI + // Initialize the specialized workers. + state.Workers.Client.Init(messages.ClientMsgIndices()) + state.Workers.Federator.Init(messages.FederatorMsgIndices()) + state.Workers.Delivery.Init(client) + state.Workers.Client.Process = processor.Workers().ProcessFromClientAPI + state.Workers.Federator.Process = processor.Workers().ProcessFromFediAPI - // Set state client / federator synchronous processing functions. - state.Workers.ProcessFromClientAPI = processor.Workers().ProcessFromClientAPI - state.Workers.ProcessFromFediAPI = processor.Workers().ProcessFromFediAPI + // Initialize workers. + state.Workers.Start() + defer state.Workers.Stop() // Schedule tasks for all existing poll expiries. if err := processor.Polls().ScheduleAll(ctx); err != nil { -- cgit v1.2.3