diff options
Diffstat (limited to 'vendor/codeberg.org/gruf/go-runners/pool.go')
-rw-r--r-- | vendor/codeberg.org/gruf/go-runners/pool.go | 26 |
1 files changed, 20 insertions, 6 deletions
diff --git a/vendor/codeberg.org/gruf/go-runners/pool.go b/vendor/codeberg.org/gruf/go-runners/pool.go index 49fc22038..ca8849f30 100644 --- a/vendor/codeberg.org/gruf/go-runners/pool.go +++ b/vendor/codeberg.org/gruf/go-runners/pool.go @@ -2,6 +2,7 @@ package runners import ( "context" + "runtime" "sync" ) @@ -22,6 +23,12 @@ type WorkerPool struct { // The number of workers represents how many WorkerFuncs can be executed simultaneously, and the // queue size represents the max number of WorkerFuncs that can be queued at any one time. func NewWorkerPool(workers int, queue int) WorkerPool { + if workers < 1 { + workers = runtime.GOMAXPROCS(0) + } + if queue < 1 { + queue = workers * 2 + } return WorkerPool{ queue: make(chan WorkerFunc, queue), free: make(chan struct{}, workers), @@ -59,22 +66,28 @@ func (pool *WorkerPool) Running() bool { // execute will take a queued function and pass it to a free worker when available. func (pool *WorkerPool) execute(ctx context.Context, fn WorkerFunc) { + var acquired bool + // Set as running pool.wait.Add(1) select { // Pool context cancelled + // (we fall through and let + // the function execute). case <-ctx.Done(): - pool.wait.Done() - // Free worker acquired + // Free worker acquired. case pool.free <- struct{}{}: + acquired = true } go func() { defer func() { // defer in case panic - <-pool.free + if acquired { + <-pool.free + } pool.wait.Done() }() @@ -110,8 +123,8 @@ func (pool *WorkerPool) process(ctx context.Context) { } // Enqueue will add provided WorkerFunc to the queue to be performed when there is a free worker. -// Note that 'fn' will ALWAYS be executed, and the supplied context will specify whether this 'fn' -// is being executed during normal pool execution, or if the pool has been stopped with <-ctx.Done(). +// This will block until the function has been queued. 'fn' will ALWAYS be executed, even on pool +// close, which can be determined via context <-ctx.Done(). WorkerFuncs MUST respect the passed context. func (pool *WorkerPool) Enqueue(fn WorkerFunc) { // Check valid fn if fn == nil { @@ -121,13 +134,14 @@ func (pool *WorkerPool) Enqueue(fn WorkerFunc) { select { // Pool context cancelled case <-pool.svc.Done(): + fn(closedctx) // Placed fn in queue case pool.queue <- fn: } } -// EnqueueNoBlock performs Enqueue but returns false if queue size is at max. Else, true. +// EnqueueNoBlock attempts Enqueue but returns false if not executed. func (pool *WorkerPool) EnqueueNoBlock(fn WorkerFunc) bool { // Check valid fn if fn == nil { |