diff options
Diffstat (limited to 'internal/concurrency')
| -rw-r--r-- | internal/concurrency/workers.go | 42 | 
1 files changed, 29 insertions, 13 deletions
| diff --git a/internal/concurrency/workers.go b/internal/concurrency/workers.go index 279a0c3c1..377b9e041 100644 --- a/internal/concurrency/workers.go +++ b/internal/concurrency/workers.go @@ -26,6 +26,7 @@ import (  	"reflect"  	"runtime" +	"codeberg.org/gruf/go-kv"  	"codeberg.org/gruf/go-runners"  	"github.com/superseriousbusiness/gotosocial/internal/log"  ) @@ -35,7 +36,7 @@ type WorkerPool[MsgType any] struct {  	workers runners.WorkerPool  	process func(context.Context, MsgType) error  	nw, nq  int -	prefix  string // contains type prefix for logging +	wtype   string // contains worker type for logging  }  // New returns a new WorkerPool[MsgType] with given number of workers and queue ratio, @@ -61,12 +62,12 @@ func NewWorkerPool[MsgType any](workers int, queueRatio int) *WorkerPool[MsgType  		process: nil,  		nw:      workers,  		nq:      workers * queueRatio, -		prefix:  fmt.Sprintf("worker.Worker[%s]", msgType), +		wtype:   fmt.Sprintf("worker.Worker[%s]", msgType),  	} -	// Log new worker creation with type prefix +	// Log new worker creation with worker type prefix  	log.Infof("%s created with workers=%d queue=%d", -		w.prefix, +		w.wtype,  		workers,  		workers*queueRatio,  	) @@ -76,7 +77,7 @@ func NewWorkerPool[MsgType any](workers int, queueRatio int) *WorkerPool[MsgType  // Start will attempt to start the underlying worker pool, or return error.  func (w *WorkerPool[MsgType]) Start() error { -	log.Infof("%s starting", w.prefix) +	log.Infof("%s starting", w.wtype)  	// Check processor was set  	if w.process == nil { @@ -93,7 +94,7 @@ func (w *WorkerPool[MsgType]) Start() error {  // Stop will attempt to stop the underlying worker pool, or return error.  func (w *WorkerPool[MsgType]) Stop() error { -	log.Infof("%s stopping", w.prefix) +	log.Infof("%s stopping", w.wtype)  	// Attempt to stop pool  	if !w.workers.Stop() { @@ -106,19 +107,34 @@ func (w *WorkerPool[MsgType]) Stop() error {  // SetProcessor will set the Worker's processor function, which is called for each queued message.  func (w *WorkerPool[MsgType]) SetProcessor(fn func(context.Context, MsgType) error) {  	if w.process != nil { -		log.Fatalf("%s Worker.process is already set", w.prefix) +		log.Panicf("%s Worker.process is already set", w.wtype)  	}  	w.process = fn  }  // Queue will queue provided message to be processed with there's a free worker.  func (w *WorkerPool[MsgType]) Queue(msg MsgType) { -	log.Tracef("%s queueing message (queue=%d): %+v", -		w.prefix, w.workers.Queue(), msg, -	) -	w.workers.Enqueue(func(ctx context.Context) { +	log.Tracef("%s queueing message: %+v", w.wtype, msg) + +	// Create new process function for msg +	process := func(ctx context.Context) {  		if err := w.process(ctx, msg); err != nil { -			log.Errorf("%s %v", w.prefix, err) +			log.WithFields(kv.Fields{ +				kv.Field{K: "type", V: w.wtype}, +				kv.Field{K: "error", V: err}, +			}...).Error("message processing error")  		} -	}) +	} + +	// Attempt a fast-enqueue of process +	if !w.workers.EnqueueNow(process) { +		// No spot acquired, log warning +		log.WithFields(kv.Fields{ +			kv.Field{K: "type", V: w.wtype}, +			kv.Field{K: "queue", V: w.workers.Queue()}, +		}...).Warn("full worker queue") + +		// Block on enqueuing process func +		w.workers.Enqueue(process) +	}  } | 
