diff options
Diffstat (limited to 'internal/worker/workers.go')
-rw-r--r-- | internal/worker/workers.go | 69 |
1 files changed, 69 insertions, 0 deletions
diff --git a/internal/worker/workers.go b/internal/worker/workers.go new file mode 100644 index 000000000..d3d6197ed --- /dev/null +++ b/internal/worker/workers.go @@ -0,0 +1,69 @@ +package worker + +import ( + "context" + "errors" + "runtime" + + "codeberg.org/gruf/go-runners" + "github.com/sirupsen/logrus" +) + +// Worker represents a proccessor for MsgType objects, using a worker pool to allocate resources. +type Worker[MsgType any] struct { + workers runners.WorkerPool + process func(context.Context, MsgType) error +} + +// New returns a new Worker[MsgType] with given number of workers and queue size +// (see runners.WorkerPool for more information on args). If args < 1 then suitable +// defaults are determined from the runtime's GOMAXPROCS variable. +func New[MsgType any](workers int, queue int) *Worker[MsgType] { + if workers < 1 { + workers = runtime.GOMAXPROCS(0) + } + if queue < 1 { + queue = workers * 100 + } + return &Worker[MsgType]{ + workers: runners.NewWorkerPool(workers, queue), + process: nil, + } +} + +// Start will attempt to start the underlying worker pool, or return error. +func (w *Worker[MsgType]) Start() error { + if w.process == nil { + return errors.New("nil Worker.process function") + } + if !w.workers.Start() { + return errors.New("failed to start Worker pool") + } + return nil +} + +// Stop will attempt to stop the underlying worker pool, or return error. +func (w *Worker[MsgType]) Stop() error { + if !w.workers.Stop() { + return errors.New("failed to stop Worker pool") + } + return nil +} + +// SetProcessor will set the Worker's processor function, which is called for each queued message. +func (w *Worker[MsgType]) SetProcessor(fn func(context.Context, MsgType) error) { + if w.process != nil { + logrus.Panic("Worker.process is already set") + } + w.process = fn +} + +// Queue will queue provided message to be processed with there's a free worker. +func (w *Worker[MsgType]) Queue(msg MsgType) { + logrus.Tracef("queueing %[1]T message; %+[1]v", msg) + w.workers.Enqueue(func(ctx context.Context) { + if err := w.process(ctx, msg); err != nil { + logrus.Error(err) + } + }) +} |