diff options
Diffstat (limited to 'internal/worker/workers.go')
-rw-r--r-- | internal/worker/workers.go | 44 |
1 files changed, 29 insertions, 15 deletions
diff --git a/internal/worker/workers.go b/internal/worker/workers.go index ac329f8f7..6adf9ad30 100644 --- a/internal/worker/workers.go +++ b/internal/worker/workers.go @@ -3,6 +3,8 @@ package worker import ( "context" "errors" + "fmt" + "path" "reflect" "runtime" @@ -17,34 +19,44 @@ type Worker[MsgType any] struct { prefix string // contains type prefix for logging } -// New returns a new Worker[MsgType] with given number of workers and queue size -// (see runners.WorkerPool for more information on args). If args < 1 then suitable -// defaults are determined from the runtime's GOMAXPROCS variable. -func New[MsgType any](workers int, queue int) *Worker[MsgType] { +// New returns a new Worker[MsgType] with given number of workers and queue ratio, +// where the queue ratio is multiplied by no. workers to get queue size. If args < 1 +// then suitable defaults are determined from the runtime's GOMAXPROCS variable. +func New[MsgType any](workers int, queueRatio int) *Worker[MsgType] { + var zero MsgType + if workers < 1 { // ensure sensible workers workers = runtime.GOMAXPROCS(0) } - if queue < 1 { - // ensure sensible queue - queue = workers * 100 + if queueRatio < 1 { + // ensure sensible ratio + queueRatio = 100 } + // Calculate the short type string for the msg type + msgType := reflect.TypeOf(zero).String() + _, msgType = path.Split(msgType) + w := &Worker[MsgType]{ - workers: runners.NewWorkerPool(workers, queue), + workers: runners.NewWorkerPool(workers, workers*queueRatio), process: nil, - prefix: reflect.TypeOf(Worker[MsgType]{}).String(), //nolint + prefix: fmt.Sprintf("worker.Worker[%s]", msgType), } // Log new worker creation with type prefix - logrus.Infof("%s created with workers=%d queue=%d", w.prefix, workers, queue) + logrus.Infof("%s created with workers=%d queue=%d", + w.prefix, + workers, + workers*queueRatio, + ) return w } // Start will attempt to start the underlying worker pool, or return error. func (w *Worker[MsgType]) Start() error { - logrus.Info(w.prefix, "starting") + logrus.Infof("%s starting", w.prefix) // Check processor was set if w.process == nil { @@ -61,7 +73,7 @@ func (w *Worker[MsgType]) Start() error { // Stop will attempt to stop the underlying worker pool, or return error. func (w *Worker[MsgType]) Stop() error { - logrus.Info(w.prefix, "stopping") + logrus.Infof("%s stopping", w.prefix) // Attempt to stop pool if !w.workers.Stop() { @@ -74,17 +86,19 @@ func (w *Worker[MsgType]) Stop() error { // SetProcessor will set the Worker's processor function, which is called for each queued message. func (w *Worker[MsgType]) SetProcessor(fn func(context.Context, MsgType) error) { if w.process != nil { - logrus.Panic(w.prefix, "Worker.process is already set") + logrus.Panicf("%s Worker.process is already set", w.prefix) } w.process = fn } // Queue will queue provided message to be processed with there's a free worker. func (w *Worker[MsgType]) Queue(msg MsgType) { - logrus.Tracef("%s queueing message: %+v", w.prefix, msg) + logrus.Tracef("%s queueing message (workers=%d queue=%d): %+v", + w.prefix, w.workers.Workers(), w.workers.Queue(), msg, + ) w.workers.Enqueue(func(ctx context.Context) { if err := w.process(ctx, msg); err != nil { - logrus.Error(err) + logrus.Errorf("%s %v", w.prefix, err) } }) } |