diff options
| author | 2022-04-28 13:23:11 +0100 | |
|---|---|---|
| committer | 2022-04-28 13:23:11 +0100 | |
| commit | 420e2fb22bc7aa4967ddadb11e444079efdf5117 (patch) | |
| tree | 413842c5df646c30a8079671ade5e677e3825fb8 /internal/worker | |
| parent | [bugfix] Fix possible race condition in federatingdb (#490) (diff) | |
| download | gotosocial-420e2fb22bc7aa4967ddadb11e444079efdf5117.tar.xz | |
replace async client API / federator msg processing with worker pools (#497)
* replace async client API / federator msg processing with worker pools
* appease our lord-and-saviour, the linter
Diffstat (limited to 'internal/worker')
| -rw-r--r-- | internal/worker/workers.go | 69 | 
1 files changed, 69 insertions, 0 deletions
| diff --git a/internal/worker/workers.go b/internal/worker/workers.go new file mode 100644 index 000000000..d3d6197ed --- /dev/null +++ b/internal/worker/workers.go @@ -0,0 +1,69 @@ +package worker + +import ( +	"context" +	"errors" +	"runtime" + +	"codeberg.org/gruf/go-runners" +	"github.com/sirupsen/logrus" +) + +// Worker represents a proccessor for MsgType objects, using a worker pool to allocate resources. +type Worker[MsgType any] struct { +	workers runners.WorkerPool +	process func(context.Context, MsgType) error +} + +// New returns a new Worker[MsgType] with given number of workers and queue size +// (see runners.WorkerPool for more information on args). If args < 1 then suitable +// defaults are determined from the runtime's GOMAXPROCS variable. +func New[MsgType any](workers int, queue int) *Worker[MsgType] { +	if workers < 1 { +		workers = runtime.GOMAXPROCS(0) +	} +	if queue < 1 { +		queue = workers * 100 +	} +	return &Worker[MsgType]{ +		workers: runners.NewWorkerPool(workers, queue), +		process: nil, +	} +} + +// Start will attempt to start the underlying worker pool, or return error. +func (w *Worker[MsgType]) Start() error { +	if w.process == nil { +		return errors.New("nil Worker.process function") +	} +	if !w.workers.Start() { +		return errors.New("failed to start Worker pool") +	} +	return nil +} + +// Stop will attempt to stop the underlying worker pool, or return error. +func (w *Worker[MsgType]) Stop() error { +	if !w.workers.Stop() { +		return errors.New("failed to stop Worker pool") +	} +	return nil +} + +// SetProcessor will set the Worker's processor function, which is called for each queued message. +func (w *Worker[MsgType]) SetProcessor(fn func(context.Context, MsgType) error) { +	if w.process != nil { +		logrus.Panic("Worker.process is already set") +	} +	w.process = fn +} + +// Queue will queue provided message to be processed with there's a free worker. +func (w *Worker[MsgType]) Queue(msg MsgType) { +	logrus.Tracef("queueing %[1]T message; %+[1]v", msg) +	w.workers.Enqueue(func(ctx context.Context) { +		if err := w.process(ctx, msg); err != nil { +			logrus.Error(err) +		} +	}) +} | 
