diff options
Diffstat (limited to 'vendor/codeberg.org/gruf/go-runners/pool.go')
-rw-r--r-- | vendor/codeberg.org/gruf/go-runners/pool.go | 127 |
1 files changed, 93 insertions, 34 deletions
diff --git a/vendor/codeberg.org/gruf/go-runners/pool.go b/vendor/codeberg.org/gruf/go-runners/pool.go index b6be57d0a..1d83e85c7 100644 --- a/vendor/codeberg.org/gruf/go-runners/pool.go +++ b/vendor/codeberg.org/gruf/go-runners/pool.go @@ -2,8 +2,12 @@ package runners import ( "context" + "fmt" + "os" "runtime" "sync" + + "codeberg.org/gruf/go-errors/v2" ) // WorkerFunc represents a function processable by a worker in WorkerPool. Note @@ -26,17 +30,22 @@ func (pool *WorkerPool) Start(workers int, queue int) bool { return false } - if workers < 1 { - // Use $GOMAXPROCS as default worker count + if workers <= 0 { + // Use $GOMAXPROCS as default. workers = runtime.GOMAXPROCS(0) } if queue < 0 { - // Set a reasonable queue default - queue = workers * 2 + // Use reasonable queue default. + queue = workers * 10 } - // Allocate pool queue of given size + // Allocate pool queue of given size. + // + // This MUST be set BEFORE we return and NOT in + // the launched goroutine, or there is a risk that + // the pool may appear as closed for a short time + // until the main goroutine has been entered. fns := make(chan WorkerFunc, queue) pool.fns = fns @@ -53,50 +62,49 @@ func (pool *WorkerPool) Start(workers int, queue int) bool { // Start goroutine worker functions for i := 0; i < workers; i++ { + wait.Add(1) + go func() { - // Trigger start / stop - wait.Add(1) defer wait.Done() - // Keep workers running on panic - for !workerstart(ctx, fns) { + // Run worker function. + for !worker_run(ctx, fns) { + // retry on panic } }() } - // Set GC finalizer to stop pool on dealloc + // Set GC finalizer to stop pool on dealloc. runtime.SetFinalizer(pool, func(pool *WorkerPool) { - pool.svc.Stop() + _ = pool.svc.Stop() }) // Wait on ctx <-ctx.Done() - // Stop all workers - close(pool.fns) + // Drain function queue. + // + // All functions in the queue MUST be + // run, so we pass them a closed context. + // + // This mainly allows us to block until + // the function queue is empty, as worker + // functions will also continue draining in + // the background with the (now) closed ctx. + for !drain_queue(fns) { + // retry on panic + } + + // Now the queue is empty, we can + // safely close the channel signalling + // all of the workers to return. + close(fns) wait.Wait() }() return true } -// workerstart is the main worker runner routine, accepting functions from 'fns' until it is closed. -func workerstart(ctx context.Context, fns <-chan WorkerFunc) bool { - // Recover and drop any panic - defer func() { recover() }() - - for { - // Wait on next func - fn, ok := <-fns - if !ok { - return true - } - - // Run with ctx - fn(ctx) - } -} - // Stop will stop the WorkerPool management loop, blocking until stopped. func (pool *WorkerPool) Stop() bool { return pool.svc.Stop() @@ -124,22 +132,24 @@ func (pool *WorkerPool) Enqueue(fn WorkerFunc) { // EnqueueCtx is functionally identical to WorkerPool.Enqueue() but returns early in the // case that caller provided <-ctx.Done() is closed, WITHOUT running the WorkerFunc. -func (pool *WorkerPool) EnqueueCtx(ctx context.Context, fn WorkerFunc) { +func (pool *WorkerPool) EnqueueCtx(ctx context.Context, fn WorkerFunc) bool { // Check valid fn if fn == nil { - return + return false } select { // Caller ctx cancelled case <-ctx.Done(): + return false // Pool ctx cancelled case <-pool.svc.Done(): - fn(closedctx) + return false // Placed fn in queue case pool.fns <- fn: + return true } } @@ -167,5 +177,54 @@ func (pool *WorkerPool) EnqueueNow(fn WorkerFunc) bool { // Queue returns the number of currently queued WorkerFuncs. func (pool *WorkerPool) Queue() int { - return len(pool.fns) + var l int + pool.svc.While(func() { + l = len(pool.fns) + }) + return l +} + +// worker_run is the main worker routine, accepting functions from 'fns' until it is closed. +func worker_run(ctx context.Context, fns <-chan WorkerFunc) bool { + defer func() { + // Recover and drop any panic + if r := recover(); r != nil { + const msg = "worker_run: recovered panic: %v\n\n%s\n" + fmt.Fprintf(os.Stderr, msg, r, errors.GetCallers(2, 10)) + } + }() + + for { + // Wait on next func + fn, ok := <-fns + if !ok { + return true + } + + // Run with ctx + fn(ctx) + } +} + +// drain_queue will drain and run all functions in worker queue, passing in a closed context. +func drain_queue(fns <-chan WorkerFunc) bool { + defer func() { + // Recover and drop any panic + if r := recover(); r != nil { + const msg = "drain_queue: recovered panic: %v\n\n%s\n" + fmt.Fprintf(os.Stderr, msg, r, errors.GetCallers(2, 10)) + } + }() + + for { + select { + // Run with closed ctx + case fn := <-fns: + fn(closedctx) + + // Queue is empty + default: + return true + } + } } |