From a156188b3eb5cb3da44aa1b7452265f5fa38a607 Mon Sep 17 00:00:00 2001 From: kim <89579420+NyaaaWhatsUpDoc@users.noreply.github.com> Date: Wed, 28 Sep 2022 18:30:40 +0100 Subject: [chore] update dependencies, bump to Go 1.19.1 (#826) * update dependencies, bump Go version to 1.19 * bump test image Go version * update golangci-lint * update gotosocial-drone-build * sign * linting, go fmt * update swagger docs * update swagger docs * whitespace * update contributing.md * fuckin whoopsie doopsie * linterino, linteroni * fix followrequest test not starting processor * fix other api/client tests not starting processor * fix remaining tests where processor not started * bump go-runners version * don't check last-webfingered-at, processor may have updated this * update swagger command * update bun to latest version * fix embed to work the same as before with new bun Signed-off-by: kim Co-authored-by: tsmethurst --- internal/concurrency/workers.go | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) (limited to 'internal/concurrency/workers.go') diff --git a/internal/concurrency/workers.go b/internal/concurrency/workers.go index d9669fcb3..279a0c3c1 100644 --- a/internal/concurrency/workers.go +++ b/internal/concurrency/workers.go @@ -34,6 +34,7 @@ import ( type WorkerPool[MsgType any] struct { workers runners.WorkerPool process func(context.Context, MsgType) error + nw, nq int prefix string // contains type prefix for logging } @@ -57,8 +58,9 @@ func NewWorkerPool[MsgType any](workers int, queueRatio int) *WorkerPool[MsgType _, msgType = path.Split(msgType) w := &WorkerPool[MsgType]{ - workers: runners.NewWorkerPool(workers, workers*queueRatio), process: nil, + nw: workers, + nq: workers * queueRatio, prefix: fmt.Sprintf("worker.Worker[%s]", msgType), } @@ -82,7 +84,7 @@ func (w *WorkerPool[MsgType]) Start() error { } // Attempt to start pool - if !w.workers.Start() { + if !w.workers.Start(w.nw, w.nq) { return errors.New("failed to start Worker pool") } @@ -111,8 +113,8 @@ func (w *WorkerPool[MsgType]) SetProcessor(fn func(context.Context, MsgType) err // Queue will queue provided message to be processed with there's a free worker. func (w *WorkerPool[MsgType]) Queue(msg MsgType) { - log.Tracef("%s queueing message (workers=%d queue=%d): %+v", - w.prefix, w.workers.Workers(), w.workers.Queue(), msg, + log.Tracef("%s queueing message (queue=%d): %+v", + w.prefix, w.workers.Queue(), msg, ) w.workers.Enqueue(func(ctx context.Context) { if err := w.process(ctx, msg); err != nil { -- cgit v1.2.3