diff options
Diffstat (limited to 'internal/concurrency/workers.go')
-rw-r--r-- | internal/concurrency/workers.go | 42 |
1 files changed, 29 insertions, 13 deletions
diff --git a/internal/concurrency/workers.go b/internal/concurrency/workers.go index 279a0c3c1..377b9e041 100644 --- a/internal/concurrency/workers.go +++ b/internal/concurrency/workers.go @@ -26,6 +26,7 @@ import ( "reflect" "runtime" + "codeberg.org/gruf/go-kv" "codeberg.org/gruf/go-runners" "github.com/superseriousbusiness/gotosocial/internal/log" ) @@ -35,7 +36,7 @@ type WorkerPool[MsgType any] struct { workers runners.WorkerPool process func(context.Context, MsgType) error nw, nq int - prefix string // contains type prefix for logging + wtype string // contains worker type for logging } // New returns a new WorkerPool[MsgType] with given number of workers and queue ratio, @@ -61,12 +62,12 @@ func NewWorkerPool[MsgType any](workers int, queueRatio int) *WorkerPool[MsgType process: nil, nw: workers, nq: workers * queueRatio, - prefix: fmt.Sprintf("worker.Worker[%s]", msgType), + wtype: fmt.Sprintf("worker.Worker[%s]", msgType), } - // Log new worker creation with type prefix + // Log new worker creation with worker type prefix log.Infof("%s created with workers=%d queue=%d", - w.prefix, + w.wtype, workers, workers*queueRatio, ) @@ -76,7 +77,7 @@ func NewWorkerPool[MsgType any](workers int, queueRatio int) *WorkerPool[MsgType // Start will attempt to start the underlying worker pool, or return error. func (w *WorkerPool[MsgType]) Start() error { - log.Infof("%s starting", w.prefix) + log.Infof("%s starting", w.wtype) // Check processor was set if w.process == nil { @@ -93,7 +94,7 @@ func (w *WorkerPool[MsgType]) Start() error { // Stop will attempt to stop the underlying worker pool, or return error. func (w *WorkerPool[MsgType]) Stop() error { - log.Infof("%s stopping", w.prefix) + log.Infof("%s stopping", w.wtype) // Attempt to stop pool if !w.workers.Stop() { @@ -106,19 +107,34 @@ func (w *WorkerPool[MsgType]) Stop() error { // SetProcessor will set the Worker's processor function, which is called for each queued message. func (w *WorkerPool[MsgType]) SetProcessor(fn func(context.Context, MsgType) error) { if w.process != nil { - log.Fatalf("%s Worker.process is already set", w.prefix) + log.Panicf("%s Worker.process is already set", w.wtype) } w.process = fn } // Queue will queue provided message to be processed with there's a free worker. func (w *WorkerPool[MsgType]) Queue(msg MsgType) { - log.Tracef("%s queueing message (queue=%d): %+v", - w.prefix, w.workers.Queue(), msg, - ) - w.workers.Enqueue(func(ctx context.Context) { + log.Tracef("%s queueing message: %+v", w.wtype, msg) + + // Create new process function for msg + process := func(ctx context.Context) { if err := w.process(ctx, msg); err != nil { - log.Errorf("%s %v", w.prefix, err) + log.WithFields(kv.Fields{ + kv.Field{K: "type", V: w.wtype}, + kv.Field{K: "error", V: err}, + }...).Error("message processing error") } - }) + } + + // Attempt a fast-enqueue of process + if !w.workers.EnqueueNow(process) { + // No spot acquired, log warning + log.WithFields(kv.Fields{ + kv.Field{K: "type", V: w.wtype}, + kv.Field{K: "queue", V: w.workers.Queue()}, + }...).Warn("full worker queue") + + // Block on enqueuing process func + w.workers.Enqueue(process) + } } |