diff options
Diffstat (limited to 'internal/concurrency/workers.go')
-rw-r--r-- | internal/concurrency/workers.go | 10 |
1 files changed, 6 insertions, 4 deletions
diff --git a/internal/concurrency/workers.go b/internal/concurrency/workers.go index d9669fcb3..279a0c3c1 100644 --- a/internal/concurrency/workers.go +++ b/internal/concurrency/workers.go @@ -34,6 +34,7 @@ import ( type WorkerPool[MsgType any] struct { workers runners.WorkerPool process func(context.Context, MsgType) error + nw, nq int prefix string // contains type prefix for logging } @@ -57,8 +58,9 @@ func NewWorkerPool[MsgType any](workers int, queueRatio int) *WorkerPool[MsgType _, msgType = path.Split(msgType) w := &WorkerPool[MsgType]{ - workers: runners.NewWorkerPool(workers, workers*queueRatio), process: nil, + nw: workers, + nq: workers * queueRatio, prefix: fmt.Sprintf("worker.Worker[%s]", msgType), } @@ -82,7 +84,7 @@ func (w *WorkerPool[MsgType]) Start() error { } // Attempt to start pool - if !w.workers.Start() { + if !w.workers.Start(w.nw, w.nq) { return errors.New("failed to start Worker pool") } @@ -111,8 +113,8 @@ func (w *WorkerPool[MsgType]) SetProcessor(fn func(context.Context, MsgType) err // Queue will queue provided message to be processed with there's a free worker. func (w *WorkerPool[MsgType]) Queue(msg MsgType) { - log.Tracef("%s queueing message (workers=%d queue=%d): %+v", - w.prefix, w.workers.Workers(), w.workers.Queue(), msg, + log.Tracef("%s queueing message (queue=%d): %+v", + w.prefix, w.workers.Queue(), msg, ) w.workers.Enqueue(func(ctx context.Context) { if err := w.process(ctx, msg); err != nil { |