Diffstat (limited to 'vendor/codeberg.org/gruf/go-runners/pool.go')
-rw-r--r--  vendor/codeberg.org/gruf/go-runners/pool.go  197
1 file changed, 97 insertions(+), 100 deletions(-)
diff --git a/vendor/codeberg.org/gruf/go-runners/pool.go b/vendor/codeberg.org/gruf/go-runners/pool.go
index ca8849f30..b6be57d0a 100644
--- a/vendor/codeberg.org/gruf/go-runners/pool.go
+++ b/vendor/codeberg.org/gruf/go-runners/pool.go
@@ -7,124 +7,105 @@ import (
)
// WorkerFunc represents a function processable by a worker in WorkerPool. Note
-// that implementations absolutely MUST check whether passed context is Done()
-// otherwise stopping the pool may block for large periods of time.
+// that implementations absolutely MUST check whether the passed context is done
+// (via <-ctx.Done()), otherwise stopping the pool may block indefinitely.
type WorkerFunc func(context.Context)
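For illustration, a WorkerFunc honouring this contract might look like the sketch below (assuming the package is imported as runners; processJob is a hypothetical unit of work):

    var fn runners.WorkerFunc = func(ctx context.Context) {
        select {
        case <-ctx.Done():
            // Pool is stopping: return promptly rather
            // than blocking shutdown on fresh work.
            return
        default:
            processJob(ctx) // hypothetical; should also watch ctx
        }
    }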
// WorkerPool provides a means of enqueuing asynchronous work.
type WorkerPool struct {
- queue chan WorkerFunc
- free chan struct{}
- wait sync.WaitGroup
- svc Service
+ fns chan WorkerFunc
+ svc Service
}
-// NewWorkerPool returns a new WorkerPool with provided worker count and WorkerFunc queue size.
-// The number of workers represents how many WorkerFuncs can be executed simultaneously, and the
-// queue size represents the max number of WorkerFuncs that can be queued at any one time.
-func NewWorkerPool(workers int, queue int) WorkerPool {
+// Start will start the main WorkerPool management loop in a new goroutine, along
+// with the requested number of child worker goroutines. Returns false if already running.
+func (pool *WorkerPool) Start(workers int, queue int) bool {
+ // Attempt to start the svc
+ ctx, ok := pool.svc.doStart()
+ if !ok {
+ return false
+ }
+
if workers < 1 {
+ // Use GOMAXPROCS as the default worker count
workers = runtime.GOMAXPROCS(0)
}
- if queue < 1 {
+
+ if queue < 0 {
+ // Set a reasonable queue default
queue = workers * 2
}
- return WorkerPool{
- queue: make(chan WorkerFunc, queue),
- free: make(chan struct{}, workers),
- }
-}
-// Start will attempt to start the worker pool, asynchronously. Return is success state.
-func (pool *WorkerPool) Start() bool {
- ok := true
+ // Allocate pool queue of given size
+ fns := make(chan WorkerFunc, queue)
+ pool.fns = fns
- done := make(chan struct{})
go func() {
- ok = pool.svc.Run(func(ctx context.Context) {
- close(done)
- pool.process(ctx)
- })
- if !ok {
- close(done)
- }
- }()
- <-done
-
- return ok
-}
-
-// Stop will attempt to stop the worker pool, this will block until stopped. Return is success state.
-func (pool *WorkerPool) Stop() bool {
- return pool.svc.Stop()
-}
+ defer func() {
+ // unlock single wait
+ pool.svc.wait.Unlock()
-// Running returns whether the worker pool is running.
-func (pool *WorkerPool) Running() bool {
- return pool.svc.Running()
-}
+ // ensure stopped
+ pool.svc.Stop()
+ }()
-// execute will take a queued function and pass it to a free worker when available.
-func (pool *WorkerPool) execute(ctx context.Context, fn WorkerFunc) {
- var acquired bool
+ var wait sync.WaitGroup
- // Set as running
- pool.wait.Add(1)
+ // Start goroutine worker functions
+ for i := 0; i < workers; i++ {
+ // Register worker in group before
+ // spawning, so Wait() cannot race Add()
+ wait.Add(1)
+ go func() {
+ defer wait.Done()
- select {
- // Pool context cancelled
- // (we fall through and let
- // the function execute).
- case <-ctx.Done():
+ // Keep workers running on panic
+ for !workerstart(ctx, fns) {
+ }
+ }()
+ }
- // Free worker acquired.
- case pool.free <- struct{}{}:
- acquired = true
- }
+ // Set GC finalizer to stop pool on dealloc
+ runtime.SetFinalizer(pool, func(pool *WorkerPool) {
+ pool.svc.Stop()
+ })
- go func() {
- defer func() {
- // defer in case panic
- if acquired {
- <-pool.free
- }
- pool.wait.Done()
- }()
+ // Wait on ctx
+ <-ctx.Done()
- // Run queued
- fn(ctx)
+ // Stop all workers
+ close(pool.fns)
+ wait.Wait()
}()
+
+ return true
}
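As a rough usage sketch (sizes arbitrary, package assumed imported as runners): passing workers < 1 falls back to GOMAXPROCS, and a negative queue to twice the worker count, per the branches above.

    var pool runners.WorkerPool
    if !pool.Start(4, 100) { // 4 workers, up to 100 queued funcs
        // Start returns false: pool was already running.
    }
    defer pool.Stop()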
-// process is the background processing routine that passes queued functions to workers.
-func (pool *WorkerPool) process(ctx context.Context) {
- for {
- select {
- // Pool context cancelled
- case <-ctx.Done():
- for {
- select {
- // Pop and execute queued
- case fn := <-pool.queue:
- fn(ctx) // ctx is closed
-
- // Empty, wait for workers
- default:
- pool.wait.Wait()
- return
- }
- }
+// workerstart is the main worker runner routine, accepting functions from 'fns' until the
+// channel is closed. Returns true on a clean exit, or false when recovering from a panic.
+func workerstart(ctx context.Context, fns <-chan WorkerFunc) bool {
+ // Recover and drop any panic
+ defer func() { recover() }()
- // Queued func received
- case fn := <-pool.queue:
- pool.execute(ctx, fn)
+ for {
+ // Wait on next func
+ fn, ok := <-fns
+ if !ok {
+ return true
}
+
+ // Run with ctx
+ fn(ctx)
}
}
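The recover/restart pairing can be reduced to a standalone sketch: a panic leaves the named return at its zero value false, and the caller loops until a clean exit (mayPanic is hypothetical):

    run := func(work func()) (ok bool) {
        // Recover and drop any panic, leaving ok == false
        defer func() { recover() }()
        work()
        return true
    }
    for !run(mayPanic) {
        // worker restarted after a panic
    }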
+// Stop will stop the WorkerPool management loop, blocking until stopped.
+func (pool *WorkerPool) Stop() bool {
+ return pool.svc.Stop()
+}
+
// Enqueue will add provided WorkerFunc to the queue to be performed when there is a free worker.
-// This will block until the function has been queued. 'fn' will ALWAYS be executed, even on pool
-// close, which can be determined via context <-ctx.Done(). WorkerFuncs MUST respect the passed context.
+// This will block until the function is queued or the pool is stopped. In either case the
+// WorkerFunc WILL be executed, with the state of the pool indicated by the passed context's
+// <-ctx.Done(). WorkerFuncs MUST respect the passed context.
func (pool *WorkerPool) Enqueue(fn WorkerFunc) {
// Check valid fn
if fn == nil {
@@ -132,29 +113,50 @@ func (pool *WorkerPool) Enqueue(fn WorkerFunc) {
}
select {
- // Pool context cancelled
+ // Pool ctx cancelled
+ case <-pool.svc.Done():
+ fn(closedctx)
+
+ // Placed fn in queue
+ case pool.fns <- fn:
+ }
+}
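Since the queued function runs even when the pool has stopped, a caller can only distinguish the two paths through the context, e.g.:

    pool.Enqueue(func(ctx context.Context) {
        select {
        case <-ctx.Done():
            // Pool stopped: fn was invoked with the closed context.
        default:
            // Normal execution on a worker.
        }
    })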
+
+// EnqueueCtx is functionally identical to WorkerPool.Enqueue() but returns early if the
+// caller-provided ctx is cancelled, WITHOUT running the WorkerFunc.
+func (pool *WorkerPool) EnqueueCtx(ctx context.Context, fn WorkerFunc) {
+ // Check valid fn
+ if fn == nil {
+ return
+ }
+
+ select {
+ // Caller ctx cancelled
+ case <-ctx.Done():
+
+ // Pool ctx cancelled
case <-pool.svc.Done():
fn(closedctx)
// Placed fn in queue
- case pool.queue <- fn:
+ case pool.fns <- fn:
}
}
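Unlike Enqueue, a cancelled caller context here means fn may never run at all, which suits deadline-bound submissions; a sketch assuming a one-second budget:

    // fn is NOT executed if the deadline expires before queueing.
    ctx, cancel := context.WithTimeout(context.Background(), time.Second)
    defer cancel()
    pool.EnqueueCtx(ctx, fn)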
-// EnqueueNoBlock attempts Enqueue but returns false if not executed.
-func (pool *WorkerPool) EnqueueNoBlock(fn WorkerFunc) bool {
+// EnqueueNow attempts Enqueue but returns false if the function could not be
+// queued, i.e. the pool is stopped or the queue is full.
+func (pool *WorkerPool) EnqueueNow(fn WorkerFunc) bool {
// Check valid fn
if fn == nil {
return false
}
select {
- // Pool context cancelled
+ // Pool ctx cancelled
case <-pool.svc.Done():
return false
// Placed fn in queue
- case pool.queue <- fn:
+ case pool.fns <- fn:
return true
// Queue is full
@@ -165,10 +167,5 @@ func (pool *WorkerPool) EnqueueNoBlock(fn WorkerFunc) bool {
// Queue returns the number of currently queued WorkerFuncs.
func (pool *WorkerPool) Queue() int {
- return len(pool.queue)
-}
-
-// Workers returns the number of currently active workers.
-func (pool *WorkerPool) Workers() int {
- return len(pool.free)
+ return len(pool.fns)
}
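Tying the non-blocking path together, a caller might drop work and report the backlog when the queue is saturated (logging purely illustrative):

    if !pool.EnqueueNow(fn) {
        // Pool stopped or queue full: fn was not queued.
        log.Printf("job dropped; %d funcs still queued", pool.Queue())
    }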