From c9c0773f2c2363dcfa37e675b83ec3f0b49bd0d9 Mon Sep 17 00:00:00 2001 From: kim <89579420+NyaaaWhatsUpDoc@users.noreply.github.com> Date: Fri, 26 Apr 2024 13:50:46 +0100 Subject: [performance] update remaining worker pools to use queues (#2865) * start replacing client + federator + media workers with new worker + queue types * refactor federatingDB.Delete(), drop queued messages when deleting account / status * move all queue purging to the processor workers * undo toolchain updates * code comments, ensure dereferencer worker pool gets started * update gruf libraries in readme * start the job scheduler separately to the worker pools * reshuffle ordering or server.go + remove duplicate worker start / stop * update go-list version * fix vendoring * move queue invalidation to before wipeing / deletion, to ensure queued work not dropped * add logging to worker processing functions in testrig, don't start workers in unexpected places * update go-structr to add (+then rely on) QueueCtx{} type * ensure more worker pools get started properly in tests * fix remaining broken tests relying on worker queue logic * fix account test suite queue popping logic, ensure noop workers do not pull from queue * move back accidentally shuffled account deletion order * ensure error (non nil!!) gets passed in refactored federatingDB{}.Delete() * silently drop deletes from accounts not permitted to * don't warn log on forwarded deletes * make if else clauses easier to parse * use getFederatorMsg() * improved code comment * improved code comment re: requesting account delete checks * remove boolean result from worker start / stop since false = already running or already stopped * remove optional passed-in http.client * remove worker starting from the admin CLI commands (we don't need to handle side-effects) * update prune cli to start scheduler but not all of the workers * fix rebase issues * remove redundant return statements * i'm sorry sir linter --- internal/workers/workers.go | 108 ++++++++++++++++---------------------------- 1 file changed, 38 insertions(+), 70 deletions(-) (limited to 'internal/workers/workers.go') diff --git a/internal/workers/workers.go b/internal/workers/workers.go index 17728c255..3f4156841 100644 --- a/internal/workers/workers.go +++ b/internal/workers/workers.go @@ -18,11 +18,8 @@ package workers import ( - "context" - "log" "runtime" - "codeberg.org/gruf/go-runners" "github.com/superseriousbusiness/gotosocial/internal/config" "github.com/superseriousbusiness/gotosocial/internal/messages" "github.com/superseriousbusiness/gotosocial/internal/scheduler" @@ -39,77 +36,49 @@ type Workers struct { // indexed queue of Delivery{} objects. Delivery delivery.WorkerPool - // ClientAPI provides a worker pool that handles both - // incoming client actions, and our own side-effects. - ClientAPI runners.WorkerPool - - // Federator provides a worker pool that handles both - // incoming federated actions, and our own side-effects. - Federator runners.WorkerPool - - // Enqueue functions for clientAPI / federator worker pools, - // these are pointers to Processor{}.Enqueue___() msg functions. - // This prevents dependency cycling as Processor depends on Workers. - EnqueueClientAPI func(context.Context, ...messages.FromClientAPI) - EnqueueFediAPI func(context.Context, ...messages.FromFediAPI) - - // Blocking processing functions for clientAPI / federator. - // These are pointers to Processor{}.Process___() msg functions. - // This prevents dependency cycling as Processor depends on Workers. - // - // Rather than queueing messages for asynchronous processing, these - // functions will process immediately and in a blocking manner, and - // will not use up a worker slot. - // - // As such, you should only call them in special cases where something - // synchronous needs to happen before you can do something else. - ProcessFromClientAPI func(context.Context, messages.FromClientAPI) error - ProcessFromFediAPI func(context.Context, messages.FromFediAPI) error - - // Media manager worker pools. - Media runners.WorkerPool + // Client provides a worker pool that handles + // incoming processing jobs from the client API. + Client MsgWorkerPool[*messages.FromClientAPI] + + // Federator provides a worker pool that handles + // incoming processing jobs from the fedi API. + Federator MsgWorkerPool[*messages.FromFediAPI] + + // Dereference provides a worker pool + // for asynchronous dereferencer jobs. + Dereference FnWorkerPool + + // Media provides a worker pool for + // asynchronous media processing jobs. + Media FnWorkerPool // prevent pass-by-value. _ nocopy } -// Start will start all of the contained -// worker pools (and global scheduler). +// StartScheduler starts the job scheduler. +func (w *Workers) StartScheduler() { + _ = w.Scheduler.Start() // false = already running +} + +// Start will start contained worker pools. func (w *Workers) Start() { - // Get currently set GOMAXPROCS. maxprocs := runtime.GOMAXPROCS(0) - - tryUntil("starting scheduler", 5, w.Scheduler.Start) - - tryUntil("start delivery workerpool", 5, func() bool { - n := config.GetAdvancedSenderMultiplier() - if n < 1 { - // clamp min senders to 1. - return w.Delivery.Start(1) - } - return w.Delivery.Start(n * maxprocs) - }) - - tryUntil("starting client API workerpool", 5, func() bool { - return w.ClientAPI.Start(4*maxprocs, 400*maxprocs) - }) - - tryUntil("starting federator workerpool", 5, func() bool { - return w.Federator.Start(4*maxprocs, 400*maxprocs) - }) - - tryUntil("starting media workerpool", 5, func() bool { - return w.Media.Start(8*maxprocs, 80*maxprocs) - }) + w.Delivery.Start(deliveryWorkers(maxprocs)) + w.Client.Start(4 * maxprocs) + w.Federator.Start(4 * maxprocs) + w.Dereference.Start(4 * maxprocs) + w.Media.Start(8 * maxprocs) } // Stop will stop all of the contained worker pools (and global scheduler). func (w *Workers) Stop() { - tryUntil("stopping scheduler", 5, w.Scheduler.Stop) - tryUntil("stopping delivery workerpool", 5, w.Delivery.Stop) - tryUntil("stopping client API workerpool", 5, w.ClientAPI.Stop) - tryUntil("stopping federator workerpool", 5, w.Federator.Stop) - tryUntil("stopping media workerpool", 5, w.Media.Stop) + _ = w.Scheduler.Stop() // false = not running + w.Delivery.Stop() + w.Client.Stop() + w.Federator.Stop() + w.Dereference.Stop() + w.Media.Stop() } // nocopy when embedded will signal linter to @@ -120,12 +89,11 @@ func (*nocopy) Lock() {} func (*nocopy) Unlock() {} -// tryUntil will attempt to call 'do' for 'count' attempts, before panicking with 'msg'. -func tryUntil(msg string, count int, do func() bool) { - for i := 0; i < count; i++ { - if do() { - return - } +func deliveryWorkers(maxprocs int) int { + n := config.GetAdvancedSenderMultiplier() + if n < 1 { + // clamp to 1 + return 1 } - log.Panicf("failed %s after %d tries", msg, count) + return n * maxprocs } -- cgit v1.2.3