summaryrefslogtreecommitdiff
path: root/vendor/codeberg.org/gruf/go-runners/pool.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/codeberg.org/gruf/go-runners/pool.go')
-rw-r--r--vendor/codeberg.org/gruf/go-runners/pool.go127
1 files changed, 93 insertions, 34 deletions
diff --git a/vendor/codeberg.org/gruf/go-runners/pool.go b/vendor/codeberg.org/gruf/go-runners/pool.go
index b6be57d0a..1d83e85c7 100644
--- a/vendor/codeberg.org/gruf/go-runners/pool.go
+++ b/vendor/codeberg.org/gruf/go-runners/pool.go
@@ -2,8 +2,12 @@ package runners
import (
"context"
+ "fmt"
+ "os"
"runtime"
"sync"
+
+ "codeberg.org/gruf/go-errors/v2"
)
// WorkerFunc represents a function processable by a worker in WorkerPool. Note
@@ -26,17 +30,22 @@ func (pool *WorkerPool) Start(workers int, queue int) bool {
return false
}
- if workers < 1 {
- // Use $GOMAXPROCS as default worker count
+ if workers <= 0 {
+ // Use $GOMAXPROCS as default.
workers = runtime.GOMAXPROCS(0)
}
if queue < 0 {
- // Set a reasonable queue default
- queue = workers * 2
+ // Use reasonable queue default.
+ queue = workers * 10
}
- // Allocate pool queue of given size
+ // Allocate pool queue of given size.
+ //
+ // This MUST be set BEFORE we return and NOT in
+ // the launched goroutine, or there is a risk that
+ // the pool may appear as closed for a short time
+ // until the main goroutine has been entered.
fns := make(chan WorkerFunc, queue)
pool.fns = fns
@@ -53,50 +62,49 @@ func (pool *WorkerPool) Start(workers int, queue int) bool {
// Start goroutine worker functions
for i := 0; i < workers; i++ {
+ wait.Add(1)
+
go func() {
- // Trigger start / stop
- wait.Add(1)
defer wait.Done()
- // Keep workers running on panic
- for !workerstart(ctx, fns) {
+ // Run worker function.
+ for !worker_run(ctx, fns) {
+ // retry on panic
}
}()
}
- // Set GC finalizer to stop pool on dealloc
+ // Set GC finalizer to stop pool on dealloc.
runtime.SetFinalizer(pool, func(pool *WorkerPool) {
- pool.svc.Stop()
+ _ = pool.svc.Stop()
})
// Wait on ctx
<-ctx.Done()
- // Stop all workers
- close(pool.fns)
+ // Drain function queue.
+ //
+ // All functions in the queue MUST be
+ // run, so we pass them a closed context.
+ //
+ // This mainly allows us to block until
+ // the function queue is empty, as worker
+ // functions will also continue draining in
+ // the background with the (now) closed ctx.
+ for !drain_queue(fns) {
+ // retry on panic
+ }
+
+ // Now the queue is empty, we can
+ // safely close the channel signalling
+ // all of the workers to return.
+ close(fns)
wait.Wait()
}()
return true
}
-// workerstart is the main worker runner routine, accepting functions from 'fns' until it is closed.
-func workerstart(ctx context.Context, fns <-chan WorkerFunc) bool {
- // Recover and drop any panic
- defer func() { recover() }()
-
- for {
- // Wait on next func
- fn, ok := <-fns
- if !ok {
- return true
- }
-
- // Run with ctx
- fn(ctx)
- }
-}
-
// Stop will stop the WorkerPool management loop, blocking until stopped.
func (pool *WorkerPool) Stop() bool {
return pool.svc.Stop()
@@ -124,22 +132,24 @@ func (pool *WorkerPool) Enqueue(fn WorkerFunc) {
// EnqueueCtx is functionally identical to WorkerPool.Enqueue() but returns early in the
// case that caller provided <-ctx.Done() is closed, WITHOUT running the WorkerFunc.
-func (pool *WorkerPool) EnqueueCtx(ctx context.Context, fn WorkerFunc) {
+func (pool *WorkerPool) EnqueueCtx(ctx context.Context, fn WorkerFunc) bool {
// Check valid fn
if fn == nil {
- return
+ return false
}
select {
// Caller ctx cancelled
case <-ctx.Done():
+ return false
// Pool ctx cancelled
case <-pool.svc.Done():
- fn(closedctx)
+ return false
// Placed fn in queue
case pool.fns <- fn:
+ return true
}
}
@@ -167,5 +177,54 @@ func (pool *WorkerPool) EnqueueNow(fn WorkerFunc) bool {
// Queue returns the number of currently queued WorkerFuncs.
func (pool *WorkerPool) Queue() int {
- return len(pool.fns)
+ var l int
+ pool.svc.While(func() {
+ l = len(pool.fns)
+ })
+ return l
+}
+
+// worker_run is the main worker routine, accepting functions from 'fns' until it is closed.
+func worker_run(ctx context.Context, fns <-chan WorkerFunc) bool {
+ defer func() {
+ // Recover and drop any panic
+ if r := recover(); r != nil {
+ const msg = "worker_run: recovered panic: %v\n\n%s\n"
+ fmt.Fprintf(os.Stderr, msg, r, errors.GetCallers(2, 10))
+ }
+ }()
+
+ for {
+ // Wait on next func
+ fn, ok := <-fns
+ if !ok {
+ return true
+ }
+
+ // Run with ctx
+ fn(ctx)
+ }
+}
+
+// drain_queue will drain and run all functions in worker queue, passing in a closed context.
+func drain_queue(fns <-chan WorkerFunc) bool {
+ defer func() {
+ // Recover and drop any panic
+ if r := recover(); r != nil {
+ const msg = "drain_queue: recovered panic: %v\n\n%s\n"
+ fmt.Fprintf(os.Stderr, msg, r, errors.GetCallers(2, 10))
+ }
+ }()
+
+ for {
+ select {
+ // Run with closed ctx
+ case fn := <-fns:
+ fn(closedctx)
+
+ // Queue is empty
+ default:
+ return true
+ }
+ }
}