diff options
Diffstat (limited to 'internal/workers')
-rw-r--r-- | internal/workers/worker_fn.go | 142 | ||||
-rw-r--r-- | internal/workers/worker_msg.go | 157 | ||||
-rw-r--r-- | internal/workers/workers.go | 108 |
3 files changed, 337 insertions, 70 deletions
diff --git a/internal/workers/worker_fn.go b/internal/workers/worker_fn.go new file mode 100644 index 000000000..c41bee2b0 --- /dev/null +++ b/internal/workers/worker_fn.go @@ -0,0 +1,142 @@ +// GoToSocial +// Copyright (C) GoToSocial Authors admin@gotosocial.org +// SPDX-License-Identifier: AGPL-3.0-or-later +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <http://www.gnu.org/licenses/>. + +package workers + +import ( + "context" + + "codeberg.org/gruf/go-runners" + "github.com/superseriousbusiness/gotosocial/internal/log" + "github.com/superseriousbusiness/gotosocial/internal/queue" + "github.com/superseriousbusiness/gotosocial/internal/util" +) + +// FnWorkerPool wraps multiple FnWorker{}s in +// a singular struct for easy multi start / stop. +type FnWorkerPool struct { + + // Queue is embedded queue.SimpleQueue{} + // passed to each of the pool Worker{}s. + Queue queue.SimpleQueue[func(context.Context)] + + // internal fields. + workers []*FnWorker +} + +// Start will attempt to start 'n' FnWorker{}s. +func (p *FnWorkerPool) Start(n int) { + // Check whether workers are + // set (is already running). + ok := (len(p.workers) > 0) + if ok { + return + } + + // Allocate new workers slice. + p.workers = make([]*FnWorker, n) + for i := range p.workers { + + // Allocate new FnWorker{}. + p.workers[i] = new(FnWorker) + p.workers[i].Queue = &p.Queue + + // Attempt to start worker. + // Return bool not useful + // here, as true = started, + // false = already running. + _ = p.workers[i].Start() + } +} + +// Stop will attempt to stop contained FnWorker{}s. +func (p *FnWorkerPool) Stop() { + // Check whether workers are + // set (is currently running). + ok := (len(p.workers) == 0) + if ok { + return + } + + // Stop all running workers. + for i := range p.workers { + + // return bool not useful + // here, as true = stopped, + // false = never running. + _ = p.workers[i].Stop() + } + + // Unset workers slice. + p.workers = p.workers[:0] +} + +// FnWorker wraps a queue.SimpleQueue{} which +// it feeds from to provide it with function +// tasks to execute. It does so in a single +// goroutine with state management utilities. +type FnWorker struct { + + // Queue is the fn queue that FnWorker + // will feed from for upcoming tasks. + Queue *queue.SimpleQueue[func(context.Context)] + + // internal fields. + service runners.Service +} + +// Start will attempt to start the Worker{}. +func (w *FnWorker) Start() bool { + return w.service.GoRun(w.run) +} + +// Stop will attempt to stop the Worker{}. +func (w *FnWorker) Stop() bool { + return w.service.Stop() +} + +// run wraps process to restart on any panic. +func (w *FnWorker) run(ctx context.Context) { + if w.Queue == nil { + panic("not yet initialized") + } + log.Infof(ctx, "%p: starting worker", w) + defer log.Infof(ctx, "%p: stopped worker", w) + util.Must(func() { w.process(ctx) }) +} + +// process is the main delivery worker processing routine. +func (w *FnWorker) process(ctx context.Context) { + if w.Queue == nil { + // we perform this check here just + // to ensure the compiler knows these + // variables aren't nil in the loop, + // even if already checked by caller. + panic("not yet initialized") + } + + for { + // Block until pop next func. + fn, ok := w.Queue.PopCtx(ctx) + if !ok { + return + } + + // run! + fn(ctx) + } +} diff --git a/internal/workers/worker_msg.go b/internal/workers/worker_msg.go new file mode 100644 index 000000000..1920c964e --- /dev/null +++ b/internal/workers/worker_msg.go @@ -0,0 +1,157 @@ +// GoToSocial +// Copyright (C) GoToSocial Authors admin@gotosocial.org +// SPDX-License-Identifier: AGPL-3.0-or-later +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <http://www.gnu.org/licenses/>. + +package workers + +import ( + "context" + + "codeberg.org/gruf/go-runners" + "codeberg.org/gruf/go-structr" + "github.com/superseriousbusiness/gotosocial/internal/log" + "github.com/superseriousbusiness/gotosocial/internal/queue" + "github.com/superseriousbusiness/gotosocial/internal/util" +) + +// MsgWorkerPool wraps multiple MsgWorker{}s in +// a singular struct for easy multi start / stop. +type MsgWorkerPool[Msg any] struct { + + // Process handles queued message types. + Process func(context.Context, Msg) error + + // Queue is embedded queue.StructQueue{} + // passed to each of the pool Worker{}s. + Queue queue.StructQueue[Msg] + + // internal fields. + workers []*MsgWorker[Msg] +} + +// Init will initialize the worker pool queue with given struct indices. +func (p *MsgWorkerPool[T]) Init(indices []structr.IndexConfig) { + p.Queue.Init(structr.QueueConfig[T]{Indices: indices}) +} + +// Start will attempt to start 'n' Worker{}s. +func (p *MsgWorkerPool[T]) Start(n int) { + // Check whether workers are + // set (is already running). + ok := (len(p.workers) > 0) + if ok { + return + } + + // Allocate new msg workers slice. + p.workers = make([]*MsgWorker[T], n) + for i := range p.workers { + + // Allocate new MsgWorker[T]{}. + p.workers[i] = new(MsgWorker[T]) + p.workers[i].Process = p.Process + p.workers[i].Queue = &p.Queue + + // Attempt to start worker. + // Return bool not useful + // here, as true = started, + // false = already running. + _ = p.workers[i].Start() + } +} + +// Stop will attempt to stop contained Worker{}s. +func (p *MsgWorkerPool[T]) Stop() { + // Check whether workers are + // set (is currently running). + ok := (len(p.workers) == 0) + if ok { + return + } + + // Stop all running workers. + for i := range p.workers { + + // return bool not useful + // here, as true = stopped, + // false = never running. + _ = p.workers[i].Stop() + } + + // Unset workers slice. + p.workers = p.workers[:0] +} + +// MsgWorker wraps a processing function to +// feed from a queue.StructQueue{} for messages +// to process. It does so in a single goroutine +// with state management utilities. +type MsgWorker[Msg any] struct { + + // Process handles queued message types. + Process func(context.Context, Msg) error + + // Queue is the Delivery{} message queue + // that delivery worker will feed from. + Queue *queue.StructQueue[Msg] + + // internal fields. + service runners.Service +} + +// Start will attempt to start the Worker{}. +func (w *MsgWorker[T]) Start() bool { + return w.service.GoRun(w.run) +} + +// Stop will attempt to stop the Worker{}. +func (w *MsgWorker[T]) Stop() bool { + return w.service.Stop() +} + +// run wraps process to restart on any panic. +func (w *MsgWorker[T]) run(ctx context.Context) { + if w.Process == nil || w.Queue == nil { + panic("not yet initialized") + } + log.Infof(ctx, "%p: starting worker", w) + defer log.Infof(ctx, "%p: stopped worker", w) + util.Must(func() { w.process(ctx) }) +} + +// process is the main delivery worker processing routine. +func (w *MsgWorker[T]) process(ctx context.Context) { + if w.Process == nil || w.Queue == nil { + // we perform this check here just + // to ensure the compiler knows these + // variables aren't nil in the loop, + // even if already checked by caller. + panic("not yet initialized") + } + + for { + // Block until pop next message. + msg, ok := w.Queue.PopCtx(ctx) + if !ok { + return + } + + // Attempt to process popped message type. + if err := w.Process(ctx, msg); err != nil { + log.Errorf(ctx, "%p: error processing: %v", w, err) + } + } +} diff --git a/internal/workers/workers.go b/internal/workers/workers.go index 17728c255..3f4156841 100644 --- a/internal/workers/workers.go +++ b/internal/workers/workers.go @@ -18,11 +18,8 @@ package workers import ( - "context" - "log" "runtime" - "codeberg.org/gruf/go-runners" "github.com/superseriousbusiness/gotosocial/internal/config" "github.com/superseriousbusiness/gotosocial/internal/messages" "github.com/superseriousbusiness/gotosocial/internal/scheduler" @@ -39,77 +36,49 @@ type Workers struct { // indexed queue of Delivery{} objects. Delivery delivery.WorkerPool - // ClientAPI provides a worker pool that handles both - // incoming client actions, and our own side-effects. - ClientAPI runners.WorkerPool - - // Federator provides a worker pool that handles both - // incoming federated actions, and our own side-effects. - Federator runners.WorkerPool - - // Enqueue functions for clientAPI / federator worker pools, - // these are pointers to Processor{}.Enqueue___() msg functions. - // This prevents dependency cycling as Processor depends on Workers. - EnqueueClientAPI func(context.Context, ...messages.FromClientAPI) - EnqueueFediAPI func(context.Context, ...messages.FromFediAPI) - - // Blocking processing functions for clientAPI / federator. - // These are pointers to Processor{}.Process___() msg functions. - // This prevents dependency cycling as Processor depends on Workers. - // - // Rather than queueing messages for asynchronous processing, these - // functions will process immediately and in a blocking manner, and - // will not use up a worker slot. - // - // As such, you should only call them in special cases where something - // synchronous needs to happen before you can do something else. - ProcessFromClientAPI func(context.Context, messages.FromClientAPI) error - ProcessFromFediAPI func(context.Context, messages.FromFediAPI) error - - // Media manager worker pools. - Media runners.WorkerPool + // Client provides a worker pool that handles + // incoming processing jobs from the client API. + Client MsgWorkerPool[*messages.FromClientAPI] + + // Federator provides a worker pool that handles + // incoming processing jobs from the fedi API. + Federator MsgWorkerPool[*messages.FromFediAPI] + + // Dereference provides a worker pool + // for asynchronous dereferencer jobs. + Dereference FnWorkerPool + + // Media provides a worker pool for + // asynchronous media processing jobs. + Media FnWorkerPool // prevent pass-by-value. _ nocopy } -// Start will start all of the contained -// worker pools (and global scheduler). +// StartScheduler starts the job scheduler. +func (w *Workers) StartScheduler() { + _ = w.Scheduler.Start() // false = already running +} + +// Start will start contained worker pools. func (w *Workers) Start() { - // Get currently set GOMAXPROCS. maxprocs := runtime.GOMAXPROCS(0) - - tryUntil("starting scheduler", 5, w.Scheduler.Start) - - tryUntil("start delivery workerpool", 5, func() bool { - n := config.GetAdvancedSenderMultiplier() - if n < 1 { - // clamp min senders to 1. - return w.Delivery.Start(1) - } - return w.Delivery.Start(n * maxprocs) - }) - - tryUntil("starting client API workerpool", 5, func() bool { - return w.ClientAPI.Start(4*maxprocs, 400*maxprocs) - }) - - tryUntil("starting federator workerpool", 5, func() bool { - return w.Federator.Start(4*maxprocs, 400*maxprocs) - }) - - tryUntil("starting media workerpool", 5, func() bool { - return w.Media.Start(8*maxprocs, 80*maxprocs) - }) + w.Delivery.Start(deliveryWorkers(maxprocs)) + w.Client.Start(4 * maxprocs) + w.Federator.Start(4 * maxprocs) + w.Dereference.Start(4 * maxprocs) + w.Media.Start(8 * maxprocs) } // Stop will stop all of the contained worker pools (and global scheduler). func (w *Workers) Stop() { - tryUntil("stopping scheduler", 5, w.Scheduler.Stop) - tryUntil("stopping delivery workerpool", 5, w.Delivery.Stop) - tryUntil("stopping client API workerpool", 5, w.ClientAPI.Stop) - tryUntil("stopping federator workerpool", 5, w.Federator.Stop) - tryUntil("stopping media workerpool", 5, w.Media.Stop) + _ = w.Scheduler.Stop() // false = not running + w.Delivery.Stop() + w.Client.Stop() + w.Federator.Stop() + w.Dereference.Stop() + w.Media.Stop() } // nocopy when embedded will signal linter to @@ -120,12 +89,11 @@ func (*nocopy) Lock() {} func (*nocopy) Unlock() {} -// tryUntil will attempt to call 'do' for 'count' attempts, before panicking with 'msg'. -func tryUntil(msg string, count int, do func() bool) { - for i := 0; i < count; i++ { - if do() { - return - } +func deliveryWorkers(maxprocs int) int { + n := config.GetAdvancedSenderMultiplier() + if n < 1 { + // clamp to 1 + return 1 } - log.Panicf("failed %s after %d tries", msg, count) + return n * maxprocs } |