summaryrefslogtreecommitdiff
path: root/internal/concurrency/workers.go
diff options
context:
space:
mode:
Diffstat (limited to 'internal/concurrency/workers.go')
-rw-r--r--internal/concurrency/workers.go10
1 files changed, 6 insertions, 4 deletions
diff --git a/internal/concurrency/workers.go b/internal/concurrency/workers.go
index d9669fcb3..279a0c3c1 100644
--- a/internal/concurrency/workers.go
+++ b/internal/concurrency/workers.go
@@ -34,6 +34,7 @@ import (
type WorkerPool[MsgType any] struct {
workers runners.WorkerPool
process func(context.Context, MsgType) error
+ nw, nq int
prefix string // contains type prefix for logging
}
@@ -57,8 +58,9 @@ func NewWorkerPool[MsgType any](workers int, queueRatio int) *WorkerPool[MsgType
_, msgType = path.Split(msgType)
w := &WorkerPool[MsgType]{
- workers: runners.NewWorkerPool(workers, workers*queueRatio),
process: nil,
+ nw: workers,
+ nq: workers * queueRatio,
prefix: fmt.Sprintf("worker.Worker[%s]", msgType),
}
@@ -82,7 +84,7 @@ func (w *WorkerPool[MsgType]) Start() error {
}
// Attempt to start pool
- if !w.workers.Start() {
+ if !w.workers.Start(w.nw, w.nq) {
return errors.New("failed to start Worker pool")
}
@@ -111,8 +113,8 @@ func (w *WorkerPool[MsgType]) SetProcessor(fn func(context.Context, MsgType) err
// Queue will queue provided message to be processed with there's a free worker.
func (w *WorkerPool[MsgType]) Queue(msg MsgType) {
- log.Tracef("%s queueing message (workers=%d queue=%d): %+v",
- w.prefix, w.workers.Workers(), w.workers.Queue(), msg,
+ log.Tracef("%s queueing message (queue=%d): %+v",
+ w.prefix, w.workers.Queue(), msg,
)
w.workers.Enqueue(func(ctx context.Context) {
if err := w.process(ctx, msg); err != nil {