summaryrefslogtreecommitdiff
path: root/vendor/codeberg.org/gruf/go-runners/pool.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/codeberg.org/gruf/go-runners/pool.go')
-rw-r--r--vendor/codeberg.org/gruf/go-runners/pool.go26
1 files changed, 20 insertions, 6 deletions
diff --git a/vendor/codeberg.org/gruf/go-runners/pool.go b/vendor/codeberg.org/gruf/go-runners/pool.go
index 49fc22038..ca8849f30 100644
--- a/vendor/codeberg.org/gruf/go-runners/pool.go
+++ b/vendor/codeberg.org/gruf/go-runners/pool.go
@@ -2,6 +2,7 @@ package runners
import (
"context"
+ "runtime"
"sync"
)
@@ -22,6 +23,12 @@ type WorkerPool struct {
// The number of workers represents how many WorkerFuncs can be executed simultaneously, and the
// queue size represents the max number of WorkerFuncs that can be queued at any one time.
func NewWorkerPool(workers int, queue int) WorkerPool {
+ if workers < 1 {
+ workers = runtime.GOMAXPROCS(0)
+ }
+ if queue < 1 {
+ queue = workers * 2
+ }
return WorkerPool{
queue: make(chan WorkerFunc, queue),
free: make(chan struct{}, workers),
@@ -59,22 +66,28 @@ func (pool *WorkerPool) Running() bool {
// execute will take a queued function and pass it to a free worker when available.
func (pool *WorkerPool) execute(ctx context.Context, fn WorkerFunc) {
+ var acquired bool
+
// Set as running
pool.wait.Add(1)
select {
// Pool context cancelled
+ // (we fall through and let
+ // the function execute).
case <-ctx.Done():
- pool.wait.Done()
- // Free worker acquired
+ // Free worker acquired.
case pool.free <- struct{}{}:
+ acquired = true
}
go func() {
defer func() {
// defer in case panic
- <-pool.free
+ if acquired {
+ <-pool.free
+ }
pool.wait.Done()
}()
@@ -110,8 +123,8 @@ func (pool *WorkerPool) process(ctx context.Context) {
}
// Enqueue will add provided WorkerFunc to the queue to be performed when there is a free worker.
-// Note that 'fn' will ALWAYS be executed, and the supplied context will specify whether this 'fn'
-// is being executed during normal pool execution, or if the pool has been stopped with <-ctx.Done().
+// This will block until the function has been queued. 'fn' will ALWAYS be executed, even on pool
+// close, which can be determined via context <-ctx.Done(). WorkerFuncs MUST respect the passed context.
func (pool *WorkerPool) Enqueue(fn WorkerFunc) {
// Check valid fn
if fn == nil {
@@ -121,13 +134,14 @@ func (pool *WorkerPool) Enqueue(fn WorkerFunc) {
select {
// Pool context cancelled
case <-pool.svc.Done():
+ fn(closedctx)
// Placed fn in queue
case pool.queue <- fn:
}
}
-// EnqueueNoBlock performs Enqueue but returns false if queue size is at max. Else, true.
+// EnqueueNoBlock attempts Enqueue but returns false if not executed.
func (pool *WorkerPool) EnqueueNoBlock(fn WorkerFunc) bool {
// Check valid fn
if fn == nil {