path: root/vendor/codeberg.org/gruf/go-runners/pool.go
Diffstat (limited to 'vendor/codeberg.org/gruf/go-runners/pool.go')
-rw-r--r--  vendor/codeberg.org/gruf/go-runners/pool.go  292
1 file changed, 0 insertions, 292 deletions
diff --git a/vendor/codeberg.org/gruf/go-runners/pool.go b/vendor/codeberg.org/gruf/go-runners/pool.go
deleted file mode 100644
index 644cde0b9..000000000
--- a/vendor/codeberg.org/gruf/go-runners/pool.go
+++ /dev/null
@@ -1,292 +0,0 @@
-package runners
-
-import (
- "context"
- "fmt"
- "os"
- "runtime"
- "sync"
-
- "codeberg.org/gruf/go-errors/v2"
-)
-
-// WorkerFunc represents a function processable by a worker in WorkerPool. Note
-// that implementations absolutely MUST check whether the passed context has been
-// cancelled via <-ctx.Done(), otherwise stopping the pool may block indefinitely.
-type WorkerFunc func(context.Context)
-
-// WorkerPool provides a means of enqueuing asynchronous work.
-type WorkerPool struct {
- fns chan WorkerFunc
- svc Service
-}
-
-// Start will start the main WorkerPool management loop in a new goroutine, along
-// with the requested number of child worker goroutines. Returns false if already running.
-func (pool *WorkerPool) Start(workers int, queue int) bool {
- // Attempt to start the svc
- ctx, ok := pool.svc.doStart()
- if !ok {
- return false
- }
-
- if workers <= 0 {
- // Use $GOMAXPROCS as default.
- workers = runtime.GOMAXPROCS(0)
- }
-
- if queue < 0 {
- // Use reasonable queue default.
- queue = workers * 10
- }
-
- // Allocate pool queue of given size.
- //
- // This MUST be set BEFORE we return and NOT in
- // the launched goroutine, or there is a risk that
- // the pool may appear as closed for a short time
- // until the main goroutine has been entered.
- fns := make(chan WorkerFunc, queue)
- pool.fns = fns
-
- go func() {
- defer func() {
- // unlock single wait
- pool.svc.wait.Unlock()
-
- // ensure stopped
- pool.svc.Stop()
- }()
-
- var wait sync.WaitGroup
-
- // Start goroutine worker functions
- for i := 0; i < workers; i++ {
- wait.Add(1)
-
- go func() {
- defer wait.Done()
-
- // Run worker function (retry on panic)
- for !worker_run(CancelCtx(ctx), fns) {
- }
- }()
- }
-
- // Wait on ctx
- <-ctx
-
- // Drain function queue.
- //
- // All functions in the queue MUST be
- // run, so we pass them a closed context.
- //
- // This mainly allows us to block until
- // the function queue is empty, as worker
- // functions will also continue draining in
- // the background with the (now) closed ctx.
- for !drain_queue(fns) {
- // retry on panic
- }
-
- // Now the queue is empty, we can
- // safely close the channel signalling
- // all of the workers to return.
- close(fns)
- wait.Wait()
- }()
-
- return true
-}
-
-// Stop will stop the WorkerPool management loop, blocking until stopped.
-func (pool *WorkerPool) Stop() bool {
- return pool.svc.Stop()
-}
-
-// Running returns whether the WorkerPool management loop is running (i.e. NOT stopped / stopping).
-func (pool *WorkerPool) Running() bool {
- return pool.svc.Running()
-}
-
-// Done returns a channel that's closed when WorkerPool.Stop() is called. It is the same channel provided to the currently running worker functions.
-func (pool *WorkerPool) Done() <-chan struct{} {
- return pool.svc.Done()
-}
-
-// Enqueue will add the provided WorkerFunc to the queue to be performed when there is a free worker.
-// This will block until the function is queued or the pool is stopped. In either case the WorkerFunc
-// will be executed, with the state of the pool indicated by <-ctx.Done() of the passed ctx.
-// WorkerFuncs MUST respect the passed context.
-func (pool *WorkerPool) Enqueue(fn WorkerFunc) {
- // Check valid fn
- if fn == nil {
- return
- }
-
- select {
- // Pool ctx cancelled
- case <-pool.svc.Done():
- fn(closedctx)
-
- // Placed fn in queue
- case pool.fns <- fn:
- }
-}
-
-// EnqueueCtx is functionally identical to WorkerPool.Enqueue() but returns early in the
-// case that the caller-provided <-ctx.Done() is closed, WITHOUT running the WorkerFunc.
-func (pool *WorkerPool) EnqueueCtx(ctx context.Context, fn WorkerFunc) bool {
- // Check valid fn
- if fn == nil {
- return false
- }
-
- select {
- // Caller ctx cancelled
- case <-ctx.Done():
- return false
-
- // Pool ctx cancelled
- case <-pool.svc.Done():
- return false
-
- // Placed fn in queue
- case pool.fns <- fn:
- return true
- }
-}
-
-// MustEnqueueCtx functionally performs similarly to WorkerPool.EnqueueCtx(), but in the case
-// that the provided <-ctx.Done() is closed, the WorkerFunc is passed asynchronously to WorkerPool.Enqueue().
-// The returned boolean indicates whether the function was queued before <-ctx.Done() was closed.
-func (pool *WorkerPool) MustEnqueueCtx(ctx context.Context, fn WorkerFunc) (ok bool) {
- // Check valid fn
- if fn == nil {
- return false
- }
-
- select {
- case <-ctx.Done():
- // We failed to add this entry to the worker queue before the
- // incoming context was cancelled. So to ensure processing
- // we simply queue it asynchronously and return early to caller.
- go pool.Enqueue(fn)
- return false
-
- case <-pool.svc.Done():
- // Pool ctx cancelled
- fn(closedctx)
- return false
-
- case pool.fns <- fn:
- // Placed fn in queue
- return true
- }
-}
-
-// EnqueueNow attempts a non-blocking Enqueue, returning false if the queue is full
-// or the pool is stopped, in which case the WorkerFunc will NOT be executed.
-func (pool *WorkerPool) EnqueueNow(fn WorkerFunc) bool {
- // Check valid fn
- if fn == nil {
- return false
- }
-
- select {
- // Pool ctx cancelled
- case <-pool.svc.Done():
- return false
-
- // Placed fn in queue
- case pool.fns <- fn:
- return true
-
- // Queue is full
- default:
- return false
- }
-}
-
-// Queue returns the number of currently queued WorkerFuncs.
-func (pool *WorkerPool) Queue() int {
- var l int
- pool.svc.While(func() {
- l = len(pool.fns)
- })
- return l
-}
-
-// worker_run is the main worker routine, accepting functions from 'fns' until it is closed.
-func worker_run(ctx context.Context, fns <-chan WorkerFunc) bool {
- defer func() {
- // Recover and drop any panic
- if r := recover(); r != nil {
-
- // Gather calling func frames.
- pcs := make([]uintptr, 10)
- n := runtime.Callers(3, pcs)
- i := runtime.CallersFrames(pcs[:n])
- c := gatherFrames(i, n)
-
- const msg = "worker_run: recovered panic: %v\n\n%s\n"
- fmt.Fprintf(os.Stderr, msg, r, c.String())
- }
- }()
-
- for {
- // Wait on next func
- fn, ok := <-fns
- if !ok {
- return true
- }
-
- // Run with ctx
- fn(ctx)
- }
-}
-
-// drain_queue will drain and run all functions in the worker queue, passing in a closed context.
-func drain_queue(fns <-chan WorkerFunc) bool {
- defer func() {
- // Recover and drop any panic
- if r := recover(); r != nil {
-
- // Gather calling func frames.
- pcs := make([]uintptr, 10)
- n := runtime.Callers(3, pcs)
- i := runtime.CallersFrames(pcs[:n])
- c := gatherFrames(i, n)
-
- const msg = "drain_queue: recovered panic: %v\n\n%s\n"
- fmt.Fprintf(os.Stderr, msg, r, c.String())
- }
- }()
-
- for {
- select {
- // Run with closed ctx
- case fn := <-fns:
- fn(closedctx)
-
- // Queue is empty
- default:
- return true
- }
- }
-}
-
-// gatherFrames collates runtime frames from a frame iterator.
-func gatherFrames(iter *runtime.Frames, n int) errors.Callers {
- if iter == nil {
- return nil
- }
- frames := make([]runtime.Frame, 0, n)
- for {
- f, ok := iter.Next()
- if !ok {
- break
- }
- frames = append(frames, f)
- }
- return frames
-}
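
The file above was removed from the vendor tree. For reference, here is a minimal usage sketch of the deleted WorkerPool API, assuming the package is imported as "runners" from codeberg.org/gruf/go-runners and that the zero-value WorkerPool is usable as its doc comments describe; it is illustrative only and not part of this diff.

package main

import (
	"context"
	"fmt"
	"time"

	runners "codeberg.org/gruf/go-runners"
)

func main() {
	var pool runners.WorkerPool

	// Start 4 workers with a queue of 40. Per the deleted code, workers <= 0
	// would default to runtime.GOMAXPROCS(0) and queue < 0 to workers*10.
	if !pool.Start(4, 40) {
		panic("pool already running")
	}

	// Enqueue blocks until the function is queued (or the pool is stopped).
	// The WorkerFunc must watch <-ctx.Done() so that Stop() cannot block.
	pool.Enqueue(func(ctx context.Context) {
		select {
		case <-ctx.Done():
			fmt.Println("pool stopping, bailing early")
		case <-time.After(time.Second):
			fmt.Println("work done")
		}
	})

	// EnqueueCtx gives up WITHOUT running the function if the caller's
	// context is cancelled before the function can be queued.
	callerCtx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
	defer cancel()
	if !pool.EnqueueCtx(callerCtx, func(ctx context.Context) {
		fmt.Println("queued before caller deadline")
	}) {
		fmt.Println("not queued: caller ctx or pool done")
	}

	// Stop drains the queue (running any remaining functions with a closed
	// context) and blocks until all workers have returned.
	pool.Stop()
}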
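
A second sketch, under the same assumptions, contrasting the non-blocking EnqueueNow with MustEnqueueCtx, which falls back to an asynchronous blocking Enqueue when the caller's context expires first.

package main

import (
	"context"
	"fmt"
	"time"

	runners "codeberg.org/gruf/go-runners"
)

func main() {
	var pool runners.WorkerPool
	pool.Start(2, 2) // deliberately small queue for demonstration

	// EnqueueNow never blocks: it reports false (and drops the function)
	// if the queue is full or the pool is stopped.
	queued := pool.EnqueueNow(func(ctx context.Context) {
		fmt.Println("best-effort job ran")
	})
	fmt.Println("EnqueueNow queued:", queued)

	// MustEnqueueCtx still hands the function to the pool even if the
	// caller's deadline passes first: in that case it is queued
	// asynchronously via Enqueue and false is returned.
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()
	ok := pool.MustEnqueueCtx(ctx, func(ctx context.Context) {
		fmt.Println("must-run job ran (possibly after the caller deadline)")
	})
	fmt.Println("MustEnqueueCtx queued before deadline:", ok)

	// Give any asynchronous fallback a moment to queue, then stop; Stop
	// drains the queue, so anything already queued still runs.
	time.Sleep(100 * time.Millisecond)
	pool.Stop()
}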