summaryrefslogtreecommitdiff
path: root/internal/concurrency/workers.go
diff options
context:
space:
mode:
authorLibravatar kim <89579420+NyaaaWhatsUpDoc@users.noreply.github.com>2022-11-08 09:35:24 +0000
committerLibravatar GitHub <noreply@github.com>2022-11-08 10:35:24 +0100
commit0e572460830ce7767562f08034f96d51e41b6349 (patch)
tree2ffad44c4d6f07b9d4c3bd494d6d1a3d3918692f /internal/concurrency/workers.go
parent[chore] update gruf libraries (#996) (diff)
downloadgotosocial-0e572460830ce7767562f08034f96d51e41b6349.tar.xz
[feature] various worker / request queue improvements (#995)
* greatly simplify httpclient request queuing Signed-off-by: kim <grufwub@gmail.com> * improved request queue mutex logic Signed-off-by: kim <grufwub@gmail.com> * use improved hashmap library Signed-off-by: kim <grufwub@gmail.com> * add warn logging when request queues are full Signed-off-by: kim <grufwub@gmail.com> * improve worker pool prefix var naming Signed-off-by: kim <grufwub@gmail.com> * improved worker pool error logging Signed-off-by: kim <grufwub@gmail.com> * move error message into separate field Signed-off-by: kim <grufwub@gmail.com> * remove old log statement Signed-off-by: kim <grufwub@gmail.com> * don't export worker message, it gets very spammy :') Signed-off-by: kim <grufwub@gmail.com> Signed-off-by: kim <grufwub@gmail.com>
Diffstat (limited to 'internal/concurrency/workers.go')
-rw-r--r--internal/concurrency/workers.go42
1 files changed, 29 insertions, 13 deletions
diff --git a/internal/concurrency/workers.go b/internal/concurrency/workers.go
index 279a0c3c1..377b9e041 100644
--- a/internal/concurrency/workers.go
+++ b/internal/concurrency/workers.go
@@ -26,6 +26,7 @@ import (
"reflect"
"runtime"
+ "codeberg.org/gruf/go-kv"
"codeberg.org/gruf/go-runners"
"github.com/superseriousbusiness/gotosocial/internal/log"
)
@@ -35,7 +36,7 @@ type WorkerPool[MsgType any] struct {
workers runners.WorkerPool
process func(context.Context, MsgType) error
nw, nq int
- prefix string // contains type prefix for logging
+ wtype string // contains worker type for logging
}
// New returns a new WorkerPool[MsgType] with given number of workers and queue ratio,
@@ -61,12 +62,12 @@ func NewWorkerPool[MsgType any](workers int, queueRatio int) *WorkerPool[MsgType
process: nil,
nw: workers,
nq: workers * queueRatio,
- prefix: fmt.Sprintf("worker.Worker[%s]", msgType),
+ wtype: fmt.Sprintf("worker.Worker[%s]", msgType),
}
- // Log new worker creation with type prefix
+ // Log new worker creation with worker type prefix
log.Infof("%s created with workers=%d queue=%d",
- w.prefix,
+ w.wtype,
workers,
workers*queueRatio,
)
@@ -76,7 +77,7 @@ func NewWorkerPool[MsgType any](workers int, queueRatio int) *WorkerPool[MsgType
// Start will attempt to start the underlying worker pool, or return error.
func (w *WorkerPool[MsgType]) Start() error {
- log.Infof("%s starting", w.prefix)
+ log.Infof("%s starting", w.wtype)
// Check processor was set
if w.process == nil {
@@ -93,7 +94,7 @@ func (w *WorkerPool[MsgType]) Start() error {
// Stop will attempt to stop the underlying worker pool, or return error.
func (w *WorkerPool[MsgType]) Stop() error {
- log.Infof("%s stopping", w.prefix)
+ log.Infof("%s stopping", w.wtype)
// Attempt to stop pool
if !w.workers.Stop() {
@@ -106,19 +107,34 @@ func (w *WorkerPool[MsgType]) Stop() error {
// SetProcessor will set the Worker's processor function, which is called for each queued message.
func (w *WorkerPool[MsgType]) SetProcessor(fn func(context.Context, MsgType) error) {
if w.process != nil {
- log.Fatalf("%s Worker.process is already set", w.prefix)
+ log.Panicf("%s Worker.process is already set", w.wtype)
}
w.process = fn
}
// Queue will queue provided message to be processed with there's a free worker.
func (w *WorkerPool[MsgType]) Queue(msg MsgType) {
- log.Tracef("%s queueing message (queue=%d): %+v",
- w.prefix, w.workers.Queue(), msg,
- )
- w.workers.Enqueue(func(ctx context.Context) {
+ log.Tracef("%s queueing message: %+v", w.wtype, msg)
+
+ // Create new process function for msg
+ process := func(ctx context.Context) {
if err := w.process(ctx, msg); err != nil {
- log.Errorf("%s %v", w.prefix, err)
+ log.WithFields(kv.Fields{
+ kv.Field{K: "type", V: w.wtype},
+ kv.Field{K: "error", V: err},
+ }...).Error("message processing error")
}
- })
+ }
+
+ // Attempt a fast-enqueue of process
+ if !w.workers.EnqueueNow(process) {
+ // No spot acquired, log warning
+ log.WithFields(kv.Fields{
+ kv.Field{K: "type", V: w.wtype},
+ kv.Field{K: "queue", V: w.workers.Queue()},
+ }...).Warn("full worker queue")
+
+ // Block on enqueuing process func
+ w.workers.Enqueue(process)
+ }
}