diff options
Diffstat (limited to 'vendor/codeberg.org/gruf/go-runners/pool.go')
-rw-r--r-- | vendor/codeberg.org/gruf/go-runners/pool.go | 197 |
1 files changed, 97 insertions, 100 deletions
diff --git a/vendor/codeberg.org/gruf/go-runners/pool.go b/vendor/codeberg.org/gruf/go-runners/pool.go index ca8849f30..b6be57d0a 100644 --- a/vendor/codeberg.org/gruf/go-runners/pool.go +++ b/vendor/codeberg.org/gruf/go-runners/pool.go @@ -7,124 +7,105 @@ import ( ) // WorkerFunc represents a function processable by a worker in WorkerPool. Note -// that implementations absolutely MUST check whether passed context is Done() -// otherwise stopping the pool may block for large periods of time. +// that implementations absolutely MUST check whether passed context is <-ctx.Done() +// otherwise stopping the pool may block indefinitely. type WorkerFunc func(context.Context) // WorkerPool provides a means of enqueuing asynchronous work. type WorkerPool struct { - queue chan WorkerFunc - free chan struct{} - wait sync.WaitGroup - svc Service + fns chan WorkerFunc + svc Service } -// NewWorkerPool returns a new WorkerPool with provided worker count and WorkerFunc queue size. -// The number of workers represents how many WorkerFuncs can be executed simultaneously, and the -// queue size represents the max number of WorkerFuncs that can be queued at any one time. -func NewWorkerPool(workers int, queue int) WorkerPool { +// Start will start the main WorkerPool management loop in a new goroutine, along +// with requested number of child worker goroutines. Returns false if already running. +func (pool *WorkerPool) Start(workers int, queue int) bool { + // Attempt to start the svc + ctx, ok := pool.svc.doStart() + if !ok { + return false + } + if workers < 1 { + // Use $GOMAXPROCS as default worker count workers = runtime.GOMAXPROCS(0) } - if queue < 1 { + + if queue < 0 { + // Set a reasonable queue default queue = workers * 2 } - return WorkerPool{ - queue: make(chan WorkerFunc, queue), - free: make(chan struct{}, workers), - } -} -// Start will attempt to start the worker pool, asynchronously. Return is success state. -func (pool *WorkerPool) Start() bool { - ok := true + // Allocate pool queue of given size + fns := make(chan WorkerFunc, queue) + pool.fns = fns - done := make(chan struct{}) go func() { - ok = pool.svc.Run(func(ctx context.Context) { - close(done) - pool.process(ctx) - }) - if !ok { - close(done) - } - }() - <-done - - return ok -} - -// Stop will attempt to stop the worker pool, this will block until stopped. Return is success state. -func (pool *WorkerPool) Stop() bool { - return pool.svc.Stop() -} + defer func() { + // unlock single wait + pool.svc.wait.Unlock() -// Running returns whether the worker pool is running. -func (pool *WorkerPool) Running() bool { - return pool.svc.Running() -} + // ensure stopped + pool.svc.Stop() + }() -// execute will take a queued function and pass it to a free worker when available. -func (pool *WorkerPool) execute(ctx context.Context, fn WorkerFunc) { - var acquired bool + var wait sync.WaitGroup - // Set as running - pool.wait.Add(1) + // Start goroutine worker functions + for i := 0; i < workers; i++ { + go func() { + // Trigger start / stop + wait.Add(1) + defer wait.Done() - select { - // Pool context cancelled - // (we fall through and let - // the function execute). - case <-ctx.Done(): + // Keep workers running on panic + for !workerstart(ctx, fns) { + } + }() + } - // Free worker acquired. - case pool.free <- struct{}{}: - acquired = true - } + // Set GC finalizer to stop pool on dealloc + runtime.SetFinalizer(pool, func(pool *WorkerPool) { + pool.svc.Stop() + }) - go func() { - defer func() { - // defer in case panic - if acquired { - <-pool.free - } - pool.wait.Done() - }() + // Wait on ctx + <-ctx.Done() - // Run queued - fn(ctx) + // Stop all workers + close(pool.fns) + wait.Wait() }() + + return true } -// process is the background processing routine that passes queued functions to workers. -func (pool *WorkerPool) process(ctx context.Context) { - for { - select { - // Pool context cancelled - case <-ctx.Done(): - for { - select { - // Pop and execute queued - case fn := <-pool.queue: - fn(ctx) // ctx is closed - - // Empty, wait for workers - default: - pool.wait.Wait() - return - } - } +// workerstart is the main worker runner routine, accepting functions from 'fns' until it is closed. +func workerstart(ctx context.Context, fns <-chan WorkerFunc) bool { + // Recover and drop any panic + defer func() { recover() }() - // Queued func received - case fn := <-pool.queue: - pool.execute(ctx, fn) + for { + // Wait on next func + fn, ok := <-fns + if !ok { + return true } + + // Run with ctx + fn(ctx) } } +// Stop will stop the WorkerPool management loop, blocking until stopped. +func (pool *WorkerPool) Stop() bool { + return pool.svc.Stop() +} + // Enqueue will add provided WorkerFunc to the queue to be performed when there is a free worker. -// This will block until the function has been queued. 'fn' will ALWAYS be executed, even on pool -// close, which can be determined via context <-ctx.Done(). WorkerFuncs MUST respect the passed context. +// This will block until function is queued or pool is stopped. In all cases, the WorkerFunc will be +// executed, with the state of the pool being indicated by <-ctx.Done() of the passed ctx. +// WorkerFuncs MUST respect the passed context. func (pool *WorkerPool) Enqueue(fn WorkerFunc) { // Check valid fn if fn == nil { @@ -132,29 +113,50 @@ func (pool *WorkerPool) Enqueue(fn WorkerFunc) { } select { - // Pool context cancelled + // Pool ctx cancelled + case <-pool.svc.Done(): + fn(closedctx) + + // Placed fn in queue + case pool.fns <- fn: + } +} + +// EnqueueCtx is functionally identical to WorkerPool.Enqueue() but returns early in the +// case that caller provided <-ctx.Done() is closed, WITHOUT running the WorkerFunc. +func (pool *WorkerPool) EnqueueCtx(ctx context.Context, fn WorkerFunc) { + // Check valid fn + if fn == nil { + return + } + + select { + // Caller ctx cancelled + case <-ctx.Done(): + + // Pool ctx cancelled case <-pool.svc.Done(): fn(closedctx) // Placed fn in queue - case pool.queue <- fn: + case pool.fns <- fn: } } -// EnqueueNoBlock attempts Enqueue but returns false if not executed. -func (pool *WorkerPool) EnqueueNoBlock(fn WorkerFunc) bool { +// EnqueueNow attempts Enqueue but returns false if not executed. +func (pool *WorkerPool) EnqueueNow(fn WorkerFunc) bool { // Check valid fn if fn == nil { return false } select { - // Pool context cancelled + // Pool ctx cancelled case <-pool.svc.Done(): return false // Placed fn in queue - case pool.queue <- fn: + case pool.fns <- fn: return true // Queue is full @@ -165,10 +167,5 @@ func (pool *WorkerPool) EnqueueNoBlock(fn WorkerFunc) bool { // Queue returns the number of currently queued WorkerFuncs. func (pool *WorkerPool) Queue() int { - return len(pool.queue) -} - -// Workers returns the number of currently active workers. -func (pool *WorkerPool) Workers() int { - return len(pool.free) + return len(pool.fns) } |