diff options
Diffstat (limited to 'internal')
| -rw-r--r-- | internal/worker/workers.go | 27 | 
1 files changed, 24 insertions, 3 deletions
diff --git a/internal/worker/workers.go b/internal/worker/workers.go index d3d6197ed..ac329f8f7 100644 --- a/internal/worker/workers.go +++ b/internal/worker/workers.go @@ -3,6 +3,7 @@ package worker  import (  	"context"  	"errors" +	"reflect"  	"runtime"  	"codeberg.org/gruf/go-runners" @@ -13,6 +14,7 @@ import (  type Worker[MsgType any] struct {  	workers runners.WorkerPool  	process func(context.Context, MsgType) error +	prefix  string // contains type prefix for logging  }  // New returns a new Worker[MsgType] with given number of workers and queue size @@ -20,47 +22,66 @@ type Worker[MsgType any] struct {  // defaults are determined from the runtime's GOMAXPROCS variable.  func New[MsgType any](workers int, queue int) *Worker[MsgType] {  	if workers < 1 { +		// ensure sensible workers  		workers = runtime.GOMAXPROCS(0)  	}  	if queue < 1 { +		// ensure sensible queue  		queue = workers * 100  	} -	return &Worker[MsgType]{ + +	w := &Worker[MsgType]{  		workers: runners.NewWorkerPool(workers, queue),  		process: nil, +		prefix:  reflect.TypeOf(Worker[MsgType]{}).String(), //nolint  	} + +	// Log new worker creation with type prefix +	logrus.Infof("%s created with workers=%d queue=%d", w.prefix, workers, queue) + +	return w  }  // Start will attempt to start the underlying worker pool, or return error.  func (w *Worker[MsgType]) Start() error { +	logrus.Info(w.prefix, "starting") + +	// Check processor was set  	if w.process == nil {  		return errors.New("nil Worker.process function")  	} + +	// Attempt to start pool  	if !w.workers.Start() {  		return errors.New("failed to start Worker pool")  	} +  	return nil  }  // Stop will attempt to stop the underlying worker pool, or return error.  func (w *Worker[MsgType]) Stop() error { +	logrus.Info(w.prefix, "stopping") + +	// Attempt to stop pool  	if !w.workers.Stop() {  		return errors.New("failed to stop Worker pool")  	} +  	return nil  }  // SetProcessor will set the Worker's processor function, which is called for each queued message.  func (w *Worker[MsgType]) SetProcessor(fn func(context.Context, MsgType) error) {  	if w.process != nil { -		logrus.Panic("Worker.process is already set") +		logrus.Panic(w.prefix, "Worker.process is already set")  	}  	w.process = fn  }  // Queue will queue provided message to be processed with there's a free worker.  func (w *Worker[MsgType]) Queue(msg MsgType) { -	logrus.Tracef("queueing %[1]T message; %+[1]v", msg) +	logrus.Tracef("%s queueing message: %+v", w.prefix, msg)  	w.workers.Enqueue(func(ctx context.Context) {  		if err := w.process(ctx, msg); err != nil {  			logrus.Error(err)  | 
