summaryrefslogtreecommitdiff
path: root/internal/concurrency/workers.go
diff options
context:
space:
mode:
authorLibravatar kim <89579420+NyaaaWhatsUpDoc@users.noreply.github.com>2022-09-28 18:30:40 +0100
committerLibravatar GitHub <noreply@github.com>2022-09-28 18:30:40 +0100
commita156188b3eb5cb3da44aa1b7452265f5fa38a607 (patch)
tree7097fa48d56fbabc7c2c8750b1f3bc9321d71c0f /internal/concurrency/workers.go
parent[bugfix] Fix emphasis being added to emoji shortcodes with markdown parsing (... (diff)
downloadgotosocial-a156188b3eb5cb3da44aa1b7452265f5fa38a607.tar.xz
[chore] update dependencies, bump to Go 1.19.1 (#826)
* update dependencies, bump Go version to 1.19 * bump test image Go version * update golangci-lint * update gotosocial-drone-build * sign * linting, go fmt * update swagger docs * update swagger docs * whitespace * update contributing.md * fuckin whoopsie doopsie * linterino, linteroni * fix followrequest test not starting processor * fix other api/client tests not starting processor * fix remaining tests where processor not started * bump go-runners version * don't check last-webfingered-at, processor may have updated this * update swagger command * update bun to latest version * fix embed to work the same as before with new bun Signed-off-by: kim <grufwub@gmail.com> Co-authored-by: tsmethurst <tobi.smethurst@protonmail.com>
Diffstat (limited to 'internal/concurrency/workers.go')
-rw-r--r--internal/concurrency/workers.go10
1 files changed, 6 insertions, 4 deletions
diff --git a/internal/concurrency/workers.go b/internal/concurrency/workers.go
index d9669fcb3..279a0c3c1 100644
--- a/internal/concurrency/workers.go
+++ b/internal/concurrency/workers.go
@@ -34,6 +34,7 @@ import (
type WorkerPool[MsgType any] struct {
workers runners.WorkerPool
process func(context.Context, MsgType) error
+ nw, nq int
prefix string // contains type prefix for logging
}
@@ -57,8 +58,9 @@ func NewWorkerPool[MsgType any](workers int, queueRatio int) *WorkerPool[MsgType
_, msgType = path.Split(msgType)
w := &WorkerPool[MsgType]{
- workers: runners.NewWorkerPool(workers, workers*queueRatio),
process: nil,
+ nw: workers,
+ nq: workers * queueRatio,
prefix: fmt.Sprintf("worker.Worker[%s]", msgType),
}
@@ -82,7 +84,7 @@ func (w *WorkerPool[MsgType]) Start() error {
}
// Attempt to start pool
- if !w.workers.Start() {
+ if !w.workers.Start(w.nw, w.nq) {
return errors.New("failed to start Worker pool")
}
@@ -111,8 +113,8 @@ func (w *WorkerPool[MsgType]) SetProcessor(fn func(context.Context, MsgType) err
// Queue will queue provided message to be processed with there's a free worker.
func (w *WorkerPool[MsgType]) Queue(msg MsgType) {
- log.Tracef("%s queueing message (workers=%d queue=%d): %+v",
- w.prefix, w.workers.Workers(), w.workers.Queue(), msg,
+ log.Tracef("%s queueing message (queue=%d): %+v",
+ w.prefix, w.workers.Queue(), msg,
)
w.workers.Enqueue(func(ctx context.Context) {
if err := w.process(ctx, msg); err != nil {