diff options
Diffstat (limited to 'internal/workers/workers.go')
-rw-r--r-- | internal/workers/workers.go | 108 |
1 files changed, 38 insertions, 70 deletions
diff --git a/internal/workers/workers.go b/internal/workers/workers.go index 17728c255..3f4156841 100644 --- a/internal/workers/workers.go +++ b/internal/workers/workers.go @@ -18,11 +18,8 @@ package workers import ( - "context" - "log" "runtime" - "codeberg.org/gruf/go-runners" "github.com/superseriousbusiness/gotosocial/internal/config" "github.com/superseriousbusiness/gotosocial/internal/messages" "github.com/superseriousbusiness/gotosocial/internal/scheduler" @@ -39,77 +36,49 @@ type Workers struct { // indexed queue of Delivery{} objects. Delivery delivery.WorkerPool - // ClientAPI provides a worker pool that handles both - // incoming client actions, and our own side-effects. - ClientAPI runners.WorkerPool - - // Federator provides a worker pool that handles both - // incoming federated actions, and our own side-effects. - Federator runners.WorkerPool - - // Enqueue functions for clientAPI / federator worker pools, - // these are pointers to Processor{}.Enqueue___() msg functions. - // This prevents dependency cycling as Processor depends on Workers. - EnqueueClientAPI func(context.Context, ...messages.FromClientAPI) - EnqueueFediAPI func(context.Context, ...messages.FromFediAPI) - - // Blocking processing functions for clientAPI / federator. - // These are pointers to Processor{}.Process___() msg functions. - // This prevents dependency cycling as Processor depends on Workers. - // - // Rather than queueing messages for asynchronous processing, these - // functions will process immediately and in a blocking manner, and - // will not use up a worker slot. - // - // As such, you should only call them in special cases where something - // synchronous needs to happen before you can do something else. - ProcessFromClientAPI func(context.Context, messages.FromClientAPI) error - ProcessFromFediAPI func(context.Context, messages.FromFediAPI) error - - // Media manager worker pools. - Media runners.WorkerPool + // Client provides a worker pool that handles + // incoming processing jobs from the client API. + Client MsgWorkerPool[*messages.FromClientAPI] + + // Federator provides a worker pool that handles + // incoming processing jobs from the fedi API. + Federator MsgWorkerPool[*messages.FromFediAPI] + + // Dereference provides a worker pool + // for asynchronous dereferencer jobs. + Dereference FnWorkerPool + + // Media provides a worker pool for + // asynchronous media processing jobs. + Media FnWorkerPool // prevent pass-by-value. _ nocopy } -// Start will start all of the contained -// worker pools (and global scheduler). +// StartScheduler starts the job scheduler. +func (w *Workers) StartScheduler() { + _ = w.Scheduler.Start() // false = already running +} + +// Start will start contained worker pools. func (w *Workers) Start() { - // Get currently set GOMAXPROCS. maxprocs := runtime.GOMAXPROCS(0) - - tryUntil("starting scheduler", 5, w.Scheduler.Start) - - tryUntil("start delivery workerpool", 5, func() bool { - n := config.GetAdvancedSenderMultiplier() - if n < 1 { - // clamp min senders to 1. - return w.Delivery.Start(1) - } - return w.Delivery.Start(n * maxprocs) - }) - - tryUntil("starting client API workerpool", 5, func() bool { - return w.ClientAPI.Start(4*maxprocs, 400*maxprocs) - }) - - tryUntil("starting federator workerpool", 5, func() bool { - return w.Federator.Start(4*maxprocs, 400*maxprocs) - }) - - tryUntil("starting media workerpool", 5, func() bool { - return w.Media.Start(8*maxprocs, 80*maxprocs) - }) + w.Delivery.Start(deliveryWorkers(maxprocs)) + w.Client.Start(4 * maxprocs) + w.Federator.Start(4 * maxprocs) + w.Dereference.Start(4 * maxprocs) + w.Media.Start(8 * maxprocs) } // Stop will stop all of the contained worker pools (and global scheduler). func (w *Workers) Stop() { - tryUntil("stopping scheduler", 5, w.Scheduler.Stop) - tryUntil("stopping delivery workerpool", 5, w.Delivery.Stop) - tryUntil("stopping client API workerpool", 5, w.ClientAPI.Stop) - tryUntil("stopping federator workerpool", 5, w.Federator.Stop) - tryUntil("stopping media workerpool", 5, w.Media.Stop) + _ = w.Scheduler.Stop() // false = not running + w.Delivery.Stop() + w.Client.Stop() + w.Federator.Stop() + w.Dereference.Stop() + w.Media.Stop() } // nocopy when embedded will signal linter to @@ -120,12 +89,11 @@ func (*nocopy) Lock() {} func (*nocopy) Unlock() {} -// tryUntil will attempt to call 'do' for 'count' attempts, before panicking with 'msg'. -func tryUntil(msg string, count int, do func() bool) { - for i := 0; i < count; i++ { - if do() { - return - } +func deliveryWorkers(maxprocs int) int { + n := config.GetAdvancedSenderMultiplier() + if n < 1 { + // clamp to 1 + return 1 } - log.Panicf("failed %s after %d tries", msg, count) + return n * maxprocs } |