Diffstat (limited to 'vendor/codeberg.org/gruf/go-runners')
-rw-r--r--  vendor/codeberg.org/gruf/go-runners/context.go  |   9
-rw-r--r--  vendor/codeberg.org/gruf/go-runners/pointer.go  |  22
-rw-r--r--  vendor/codeberg.org/gruf/go-runners/pool.go     | 292
-rw-r--r--  vendor/codeberg.org/gruf/go-runners/process.go  | 116
-rw-r--r--  vendor/codeberg.org/gruf/go-runners/service.go  | 311
5 files changed, 241 insertions(+), 509 deletions(-)
diff --git a/vendor/codeberg.org/gruf/go-runners/context.go b/vendor/codeberg.org/gruf/go-runners/context.go
index 12f7f1a10..e02dcab22 100644
--- a/vendor/codeberg.org/gruf/go-runners/context.go
+++ b/vendor/codeberg.org/gruf/go-runners/context.go
@@ -2,6 +2,7 @@ package runners
import (
"context"
+ "sync/atomic"
"time"
)
@@ -19,9 +20,13 @@ func Closed() context.Context {
// CtxWithCancel returns a new context.Context impl with cancel.
func CtxWithCancel() (context.Context, context.CancelFunc) {
+ var once atomic.Uint32
ctx := make(chan struct{})
- cncl := func() { close(ctx) }
- return CancelCtx(ctx), cncl
+ return CancelCtx(ctx), func() {
+ if once.CompareAndSwap(0, 1) {
+ close(ctx)
+ }
+ }
}
// CancelCtx is the simplest possible cancellable context.
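With this change the returned cancel function becomes idempotent: the atomic compare-and-swap ensures the channel is closed exactly once, where previously a second call would panic with "close of closed channel". A minimal standalone sketch of the same guarded-close pattern (not the package's API, just the idea):

package main

import (
    "fmt"
    "sync/atomic"
)

func main() {
    var once atomic.Uint32
    done := make(chan struct{})

    cancel := func() {
        // Only the caller that wins the CAS closes the
        // channel; every later call is a harmless no-op.
        if once.CompareAndSwap(0, 1) {
            close(done)
        }
    }

    cancel()
    cancel() // safe now; would have panicked before this change

    _, open := <-done
    fmt.Println("channel still open:", open) // false
}
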
diff --git a/vendor/codeberg.org/gruf/go-runners/pointer.go b/vendor/codeberg.org/gruf/go-runners/pointer.go
new file mode 100644
index 000000000..cc139309f
--- /dev/null
+++ b/vendor/codeberg.org/gruf/go-runners/pointer.go
@@ -0,0 +1,22 @@
+package runners
+
+import (
+ "sync/atomic"
+ "unsafe"
+)
+
+// atomic_pointer wraps an unsafe.Pointer with
+// receiver methods for its atomic counterparts.
+type atomic_pointer struct{ p unsafe.Pointer }
+
+func (p *atomic_pointer) Load() unsafe.Pointer {
+ return atomic.LoadPointer(&p.p)
+}
+
+func (p *atomic_pointer) Store(ptr unsafe.Pointer) {
+ atomic.StorePointer(&p.p, ptr)
+}
+
+func (p *atomic_pointer) CAS(old, new unsafe.Pointer) bool {
+ return atomic.CompareAndSwapPointer(&p.p, old, new)
+}
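The new helper mirrors what the standard library's generic atomic.Pointer[T] (Go 1.19+) offers, but type-erased through unsafe.Pointer so one wrapper serves every pointer type in the package. A standalone usage sketch, with the wrapper reproduced locally since atomic_pointer is unexported (the instance type is purely illustrative):

package main

import (
    "fmt"
    "sync/atomic"
    "unsafe"
)

// Local mirror of the vendored helper so this sketch compiles.
type atomicPointer struct{ p unsafe.Pointer }

func (p *atomicPointer) Load() unsafe.Pointer   { return atomic.LoadPointer(&p.p) }
func (p *atomicPointer) Store(v unsafe.Pointer) { atomic.StorePointer(&p.p, v) }
func (p *atomicPointer) CAS(old, new unsafe.Pointer) bool {
    return atomic.CompareAndSwapPointer(&p.p, old, new)
}

type instance struct{ id int }

func main() {
    var p atomicPointer

    // Publish only if the slot is currently empty.
    if p.CAS(nil, unsafe.Pointer(&instance{id: 1})) {
        fmt.Println("installed")
    }

    // Readers convert back to the concrete pointer type.
    got := (*instance)(p.Load())
    fmt.Println("id:", got.id)

    p.Store(nil) // clear, so the next CAS(nil, ...) can succeed
}

This slot-acquire/clear cycle is exactly how the rewritten Processor and Service below use the type.
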
diff --git a/vendor/codeberg.org/gruf/go-runners/pool.go b/vendor/codeberg.org/gruf/go-runners/pool.go
deleted file mode 100644
index 644cde0b9..000000000
--- a/vendor/codeberg.org/gruf/go-runners/pool.go
+++ /dev/null
@@ -1,292 +0,0 @@
-package runners
-
-import (
- "context"
- "fmt"
- "os"
- "runtime"
- "sync"
-
- "codeberg.org/gruf/go-errors/v2"
-)
-
-// WorkerFunc represents a function processable by a worker in WorkerPool. Note
-// that implementations absolutely MUST check whether the passed context is done
-// (<-ctx.Done()), otherwise stopping the pool may block indefinitely.
-type WorkerFunc func(context.Context)
-
-// WorkerPool provides a means of enqueuing asynchronous work.
-type WorkerPool struct {
- fns chan WorkerFunc
- svc Service
-}
-
-// Start will start the main WorkerPool management loop in a new goroutine, along
-// with requested number of child worker goroutines. Returns false if already running.
-func (pool *WorkerPool) Start(workers int, queue int) bool {
- // Attempt to start the svc
- ctx, ok := pool.svc.doStart()
- if !ok {
- return false
- }
-
- if workers <= 0 {
- // Use $GOMAXPROCS as default.
- workers = runtime.GOMAXPROCS(0)
- }
-
- if queue < 0 {
- // Use reasonable queue default.
- queue = workers * 10
- }
-
- // Allocate pool queue of given size.
- //
- // This MUST be set BEFORE we return and NOT in
- // the launched goroutine, or there is a risk that
- // the pool may appear as closed for a short time
- // until the main goroutine has been entered.
- fns := make(chan WorkerFunc, queue)
- pool.fns = fns
-
- go func() {
- defer func() {
- // unlock single wait
- pool.svc.wait.Unlock()
-
- // ensure stopped
- pool.svc.Stop()
- }()
-
- var wait sync.WaitGroup
-
- // Start goroutine worker functions
- for i := 0; i < workers; i++ {
- wait.Add(1)
-
- go func() {
- defer wait.Done()
-
- // Run worker function (retry on panic)
- for !worker_run(CancelCtx(ctx), fns) {
- }
- }()
- }
-
- // Wait on ctx
- <-ctx
-
- // Drain function queue.
- //
- // All functions in the queue MUST be
- // run, so we pass them a closed context.
- //
- // This mainly allows us to block until
- // the function queue is empty, as worker
- // functions will also continue draining in
- // the background with the (now) closed ctx.
- for !drain_queue(fns) {
- // retry on panic
- }
-
- // Now the queue is empty, we can
- // safely close the channel signalling
- // all of the workers to return.
- close(fns)
- wait.Wait()
- }()
-
- return true
-}
-
-// Stop will stop the WorkerPool management loop, blocking until stopped.
-func (pool *WorkerPool) Stop() bool {
- return pool.svc.Stop()
-}
-
-// Running returns if WorkerPool management loop is running (i.e. NOT stopped / stopping).
-func (pool *WorkerPool) Running() bool {
- return pool.svc.Running()
-}
-
-// Done returns a channel that's closed when WorkerPool.Stop() is called. It is the same channel provided to the currently running worker functions.
-func (pool *WorkerPool) Done() <-chan struct{} {
- return pool.svc.Done()
-}
-
-// Enqueue will add provided WorkerFunc to the queue to be performed when there is a free worker.
-// This will block until function is queued or pool is stopped. In all cases, the WorkerFunc will be
-// executed, with the state of the pool being indicated by <-ctx.Done() of the passed ctx.
-// WorkerFuncs MUST respect the passed context.
-func (pool *WorkerPool) Enqueue(fn WorkerFunc) {
- // Check valid fn
- if fn == nil {
- return
- }
-
- select {
- // Pool ctx cancelled
- case <-pool.svc.Done():
- fn(closedctx)
-
- // Placed fn in queue
- case pool.fns <- fn:
- }
-}
-
-// EnqueueCtx is functionally identical to WorkerPool.Enqueue() but returns early in the
-// case that caller provided <-ctx.Done() is closed, WITHOUT running the WorkerFunc.
-func (pool *WorkerPool) EnqueueCtx(ctx context.Context, fn WorkerFunc) bool {
- // Check valid fn
- if fn == nil {
- return false
- }
-
- select {
- // Caller ctx cancelled
- case <-ctx.Done():
- return false
-
- // Pool ctx cancelled
- case <-pool.svc.Done():
- return false
-
- // Placed fn in queue
- case pool.fns <- fn:
- return true
- }
-}
-
-// MustEnqueueCtx functionally performs similarly to WorkerPool.EnqueueCtx(), but in the case
-// that the provided <-ctx.Done() is closed, it is passed asynchronously to WorkerPool.Enqueue().
-// Return boolean indicates whether function was executed in time before <-ctx.Done() is closed.
-func (pool *WorkerPool) MustEnqueueCtx(ctx context.Context, fn WorkerFunc) (ok bool) {
- // Check valid fn
- if fn == nil {
- return false
- }
-
- select {
- case <-ctx.Done():
- // We failed to add this entry to the worker queue before the
- // incoming context was cancelled. So to ensure processing
- // we simply queue it asynchronously and return early to caller.
- go pool.Enqueue(fn)
- return false
-
- case <-pool.svc.Done():
- // Pool ctx cancelled
- fn(closedctx)
- return false
-
- case pool.fns <- fn:
- // Placed fn in queue
- return true
- }
-}
-
-// EnqueueNow attempts Enqueue but returns false if not executed.
-func (pool *WorkerPool) EnqueueNow(fn WorkerFunc) bool {
- // Check valid fn
- if fn == nil {
- return false
- }
-
- select {
- // Pool ctx cancelled
- case <-pool.svc.Done():
- return false
-
- // Placed fn in queue
- case pool.fns <- fn:
- return true
-
- // Queue is full
- default:
- return false
- }
-}
-
-// Queue returns the number of currently queued WorkerFuncs.
-func (pool *WorkerPool) Queue() int {
- var l int
- pool.svc.While(func() {
- l = len(pool.fns)
- })
- return l
-}
-
-// worker_run is the main worker routine, accepting functions from 'fns' until it is closed.
-func worker_run(ctx context.Context, fns <-chan WorkerFunc) bool {
- defer func() {
- // Recover and drop any panic
- if r := recover(); r != nil {
-
- // Gather calling func frames.
- pcs := make([]uintptr, 10)
- n := runtime.Callers(3, pcs)
- i := runtime.CallersFrames(pcs[:n])
- c := gatherFrames(i, n)
-
- const msg = "worker_run: recovered panic: %v\n\n%s\n"
- fmt.Fprintf(os.Stderr, msg, r, c.String())
- }
- }()
-
- for {
- // Wait on next func
- fn, ok := <-fns
- if !ok {
- return true
- }
-
- // Run with ctx
- fn(ctx)
- }
-}
-
-// drain_queue will drain and run all functions in worker queue, passing in a closed context.
-func drain_queue(fns <-chan WorkerFunc) bool {
- defer func() {
- // Recover and drop any panic
- if r := recover(); r != nil {
-
- // Gather calling func frames.
- pcs := make([]uintptr, 10)
- n := runtime.Callers(3, pcs)
- i := runtime.CallersFrames(pcs[:n])
- c := gatherFrames(i, n)
-
- const msg = "worker_run: recovered panic: %v\n\n%s\n"
- fmt.Fprintf(os.Stderr, msg, r, c.String())
- }
- }()
-
- for {
- select {
- // Run with closed ctx
- case fn := <-fns:
- fn(closedctx)
-
- // Queue is empty
- default:
- return true
- }
- }
-}
-
-// gatherFrames collates runtime frames from a frame iterator.
-func gatherFrames(iter *runtime.Frames, n int) errors.Callers {
- if iter == nil {
- return nil
- }
- frames := make([]runtime.Frame, 0, n)
- for {
- f, ok := iter.Next()
- if !ok {
- break
- }
- frames = append(frames, f)
- }
- return frames
-}
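Before its removal, the pool's shutdown encoded an ordering worth noting: after cancellation every queued WorkerFunc is still run, but with an already-closed context, and only once the queue is drained is the channel closed to release the workers. A condensed sketch of that drain-then-close ordering, using only the standard library (one worker, made-up work functions):

package main

import (
    "context"
    "fmt"
    "sync"
)

func main() {
    fns := make(chan func(context.Context), 8)

    // Work queued before the pool "stops".
    for i := 0; i < 3; i++ {
        i := i
        fns <- func(ctx context.Context) {
            fmt.Println("ran", i, "ctx closed:", ctx.Err() != nil)
        }
    }

    // A pre-cancelled context, standing in for the closed
    // context the pool hands to functions after shutdown.
    ctx, cancel := context.WithCancel(context.Background())
    cancel()

    var wg sync.WaitGroup
    wg.Add(1)
    go func() {
        defer wg.Done()
        for fn := range fns { // exits only once fns is closed
            fn(ctx)
        }
    }()

    // Drain: everything already queued still runs ...
drain:
    for {
        select {
        case fn := <-fns:
            fn(ctx)
        default:
            break drain
        }
    }
    close(fns) // ... and only then are the workers released.
    wg.Wait()
}

As the deleted comments explain, draining first lets Stop block until the queue is empty while guaranteeing every queued function still runs.
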
diff --git a/vendor/codeberg.org/gruf/go-runners/process.go b/vendor/codeberg.org/gruf/go-runners/process.go
index ca39ac0d0..3feeff258 100644
--- a/vendor/codeberg.org/gruf/go-runners/process.go
+++ b/vendor/codeberg.org/gruf/go-runners/process.go
@@ -2,80 +2,72 @@ package runners
import (
"fmt"
+ "unsafe"
+
"sync"
)
-// Processable defines a runnable process with error return
-// that can be passed to a Processor instance for managed running.
-type Processable func() error
-
// Processor acts similarly to a sync.Once object, except that it is reusable. After
// the first call to Process(), any further calls before the first has returned will
// block until the first call has returned, and return the same error. This ensures
// that only a single instance is ever running at any one time.
-type Processor struct {
- mutex sync.Mutex
- wait *sync.WaitGroup
- err *error
-}
+type Processor struct{ p atomic_pointer }
// Process will process the given function if first-call, else blocking until
// the first function has returned, returning the same error result.
-func (p *Processor) Process(proc Processable) (err error) {
- // Acquire state lock.
- p.mutex.Lock()
-
- if p.wait != nil {
- // Already running.
- //
- // Get current ptrs.
- waitPtr := p.wait
- errPtr := p.err
-
- // Free state lock.
- p.mutex.Unlock()
-
- // Wait for finish.
- waitPtr.Wait()
- return *errPtr
- }
-
- // Alloc waiter for new process.
- var wait sync.WaitGroup
-
- // No need to alloc new error as
- // we use the alloc'd named error
- // return required for panic handling.
-
- // Reset ptrs.
- p.wait = &wait
- p.err = &err
-
- // Set started.
- wait.Add(1)
- p.mutex.Unlock()
-
- defer func() {
- if r := recover(); r != nil {
- if err != nil {
- rOld := r // wrap the panic so we don't lose existing returned error
- r = fmt.Errorf("panic occurred after error %q: %v", err.Error(), rOld)
- }
-
- // Catch any panics and wrap as error.
- err = fmt.Errorf("caught panic: %v", r)
+func (p *Processor) Process(proc func() error) (err error) {
+ var i *proc_instance
+
+ for {
+ // Attempt to load existing instance.
+ ptr := (*proc_instance)(p.p.Load())
+ if ptr != nil {
+
+ // Wait on existing.
+ ptr.wait.Wait()
+ err = ptr.err
+ return
}
- // Mark done.
- wait.Done()
+ if i == nil {
+ // Allocate instance.
+ i = new(proc_instance)
+ i.wait.Add(1)
+ }
- // Set stopped.
- p.mutex.Lock()
- p.wait = nil
- p.mutex.Unlock()
- }()
+ // Try to acquire start slot by
+ // setting ptr to *our* instance.
+ if p.p.CAS(nil, unsafe.Pointer(i)) {
+ defer func() {
+ if r := recover(); r != nil {
+ if i.err != nil {
+ rOld := r // wrap the panic so we don't lose existing returned error
+ r = fmt.Errorf("panic occurred after error %q: %v", i.err.Error(), rOld)
+ }
+
+ // Catch panics and wrap as error return.
+ i.err = fmt.Errorf("caught panic: %v", r)
+ }
+
+ // Set return.
+ err = i.err
+
+ // Release the
+ // goroutines.
+ i.wait.Done()
+
+ // Free processor.
+ p.p.Store(nil)
+ }()
+
+ // Run func.
+ i.err = proc()
+ return
+ }
+ }
+}
- // Run process.
- err = proc()
- return
+type proc_instance struct {
+ wait sync.WaitGroup
+ err error
}
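The rewrite keeps Processor's documented contract while replacing the mutex with a single CAS slot: callers that arrive while a run is in flight wait on its WaitGroup and receive the same error; once the slot is cleared, the next call starts a fresh run. A usage sketch against that contract (errExpensive and the sleep are made up for illustration):

package main

import (
    "errors"
    "fmt"
    "sync"
    "time"

    "codeberg.org/gruf/go-runners"
)

func main() {
    var p runners.Processor
    errExpensive := errors.New("expensive failed")

    var wg sync.WaitGroup
    for i := 0; i < 4; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            // Calls that overlap the first in-flight run share
            // its single execution and its error value.
            err := p.Process(func() error {
                time.Sleep(10 * time.Millisecond)
                return errExpensive
            })
            fmt.Println(errors.Is(err, errExpensive)) // true
        }()
    }
    wg.Wait()
}
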
diff --git a/vendor/codeberg.org/gruf/go-runners/service.go b/vendor/codeberg.org/gruf/go-runners/service.go
index 8a7c0051a..fe41807f9 100644
--- a/vendor/codeberg.org/gruf/go-runners/service.go
+++ b/vendor/codeberg.org/gruf/go-runners/service.go
@@ -3,215 +3,220 @@ package runners
import (
"context"
"sync"
+ "sync/atomic"
+ "unsafe"
)
-// Service provides a means of tracking a single long-running service, provided protected state
-// changes and preventing multiple instances running. Also providing service state information.
-type Service struct {
- state uint32 // 0=stopped, 1=running, 2=stopping
- mutex sync.Mutex // mutex protects overall state changes
- wait sync.Mutex // wait is used as a single-entity wait-group, only ever locked within 'mutex'
- ctx chan struct{} // ctx is the current context for running function (or nil if not running)
-}
+// Service provides a means of tracking a single long-running Service, providing protected state
+// changes and preventing multiple instances running. Also provides Service state information.
+type Service struct{ p atomic_pointer }
// Run will run the supplied function until completion, using given context to propagate cancel.
// Immediately returns false if the Service is already running, and true after completed run.
-func (svc *Service) Run(fn func(context.Context)) bool {
- // Attempt to start the svc
- ctx, ok := svc.doStart()
+func (svc *Service) Run(fn func(context.Context)) (ok bool) {
+ var ptr *svc_instance
+
+ // Attempt to start.
+ ptr, ok = svc.start()
if !ok {
- return false
+ return
}
- defer func() {
- // unlock single wait
- svc.wait.Unlock()
-
- // ensure stopped
- _ = svc.Stop()
- }()
-
- // Run with context.
- fn(CancelCtx(ctx))
-
- return true
+ // Run given function.
+ defer svc.on_done(ptr)
+ fn(CancelCtx(ptr.done))
+ return
}
// GoRun will run the supplied function until completion in a goroutine, using given context to
// propagate cancel. Immediately returns a boolean indicating success (false means the service was already running).
-func (svc *Service) GoRun(fn func(context.Context)) bool {
- // Attempt to start the svc
- ctx, ok := svc.doStart()
+func (svc *Service) GoRun(fn func(context.Context)) (ok bool) {
+ var ptr *svc_instance
+
+ // Attempt to start.
+ ptr, ok = svc.start()
if !ok {
- return false
+ return
}
go func() {
- defer func() {
- // unlock single wait
- svc.wait.Unlock()
-
- // ensure stopped
- _ = svc.Stop()
- }()
-
- // Run with context.
- fn(CancelCtx(ctx))
+ // Run given function.
+ defer svc.on_done(ptr)
+ fn(CancelCtx(ptr.done))
}()
- return true
+ return
}
// RunWait is functionally the same as .Run(), but blocks until the first instance of .Run() returns.
-func (svc *Service) RunWait(fn func(context.Context)) bool {
- // Attempt to start the svc
- ctx, ok := svc.doStart()
+func (svc *Service) RunWait(fn func(context.Context)) (ok bool) {
+ var ptr *svc_instance
+
+ // Attempt to start.
+ ptr, ok = svc.start()
if !ok {
- <-ctx // block
- return false
+ <-ptr.done
+ return
}
- defer func() {
- // unlock single wait
- svc.wait.Unlock()
+ // Run given function.
+ defer svc.on_done(ptr)
+ fn(CancelCtx(ptr.done))
+ return
+}
- // ensure stopped
- _ = svc.Stop()
- }()
+// GoRunWait is functionally the same as .RunWait(), but blocks until the first instance of RunWait() returns.
+func (svc *Service) GoRunWait(fn func(context.Context)) (ok bool) {
+ var ptr *svc_instance
- // Run with context.
- fn(CancelCtx(ctx))
+ // Attempt to start.
+ ptr, ok = svc.start()
+ if !ok {
+ <-ptr.done
+ return
+ }
+
+ go func() {
+ // Run given function.
+ defer svc.on_done(ptr)
+ fn(CancelCtx(ptr.done))
+ }()
- return true
+ return
}
// Stop will attempt to stop the service, cancelling the running function's context. Immediately
// returns false if not running, and true only after Service is fully stopped.
func (svc *Service) Stop() bool {
- // Attempt to stop the svc
- ctx, ok := svc.doStop()
- if !ok {
- return false
- }
-
- defer func() {
- // Get svc lock
- svc.mutex.Lock()
-
- // Wait until stopped
- svc.wait.Lock()
- svc.wait.Unlock()
+ return svc.must_get().stop()
+}
- // Reset the svc
- svc.ctx = nil
- svc.state = 0
- svc.mutex.Unlock()
- }()
+// Running returns if Service is running (i.e. NOT fully stopped, but may be *stopping*).
+func (svc *Service) Running() bool {
+ return svc.must_get().running()
+}
- // Cancel ctx
- close(ctx)
+// Done returns a channel that's closed when Service.Stop() is called. It is
+// the same channel provided to the currently running service function.
+func (svc *Service) Done() <-chan struct{} {
+ return svc.must_get().done
+}
- return true
+func (svc *Service) start() (*svc_instance, bool) {
+ ptr := svc.must_get()
+ return ptr, ptr.start()
}
-// While allows you to execute given function guaranteed within current
-// service state. Please note that this will hold the underlying service
-// state change mutex open while executing the function.
-func (svc *Service) While(fn func()) {
- // Protect state change
- svc.mutex.Lock()
- defer svc.mutex.Unlock()
+func (svc *Service) on_done(ptr *svc_instance) {
+ // Ensure stopped.
+ ptr.stop_private()
- // Run
- fn()
+ // Free service.
+ svc.p.Store(nil)
}
-// doStart will safely set Service state to started, returning a ptr to this context instance.
-func (svc *Service) doStart() (chan struct{}, bool) {
- // Protect startup
- svc.mutex.Lock()
+func (svc *Service) must_get() *svc_instance {
+ var newptr *svc_instance
- if svc.ctx == nil {
- // this will only have been allocated
- // if svc.Done() was already called.
- svc.ctx = make(chan struct{})
- }
-
- // Take our own ptr
- ctx := svc.ctx
+ for {
+ // Try to load existing instance.
+ ptr := (*svc_instance)(svc.p.Load())
+ if ptr != nil {
+ return ptr
+ }
- if svc.state != 0 {
- // State was not stopped.
- svc.mutex.Unlock()
- return ctx, false
- }
+ if newptr == nil {
+ // Allocate new instance.
+ newptr = new(svc_instance)
+ newptr.done = make(chan struct{})
+ }
- // Set started.
- svc.state = 1
+ // Attempt to acquire slot by setting our ptr.
+ if !svc.p.CAS(nil, unsafe.Pointer(newptr)) {
+ continue
+ }
- // Start waiter.
- svc.wait.Lock()
+ return newptr
+ }
+}
- // Unlock and return
- svc.mutex.Unlock()
- return ctx, true
+type svc_instance struct {
+ wait sync.WaitGroup
+ done chan struct{}
+ state atomic.Uint32
}
-// doStop will safely set Service state to stopping, returning a ptr to this cancelfunc instance.
-func (svc *Service) doStop() (chan struct{}, bool) {
- // Protect stop
- svc.mutex.Lock()
+const (
+ started_bit = uint32(1) << 0
+ stopping_bit = uint32(1) << 1
+ finished_bit = uint32(1) << 2
+)
- if svc.state != 1 /* not started */ {
- svc.mutex.Unlock()
- return nil, false
- }
+func (i *svc_instance) start() (ok bool) {
+ // Acquire start by setting 'started' bit.
+ switch old := i.state.Or(started_bit); {
- // state stopping
- svc.state = 2
+ case old&finished_bit != 0:
+ // Already finished.
- // Take our own ptr
- // and unlock state
- ctx := svc.ctx
- svc.mutex.Unlock()
+ case old&started_bit == 0:
+ // Successfully started!
+ i.wait.Add(1)
+ ok = true
+ }
- return ctx, true
+ return
}
-// Running returns if Service is running (i.e. state NOT stopped / stopping).
-func (svc *Service) Running() bool {
- svc.mutex.Lock()
- state := svc.state
- svc.mutex.Unlock()
- return (state == 1)
-}
+// NOTE: MAY ONLY BE CALLED BY STARTING GOROUTINE.
+func (i *svc_instance) stop_private() {
+ // Attempt set both stopping and finished bits.
+ old := i.state.Or(stopping_bit | finished_bit)
-// Done returns a channel that's closed when Service.Stop() is called. It is
-// the same channel provided to the currently running service function.
-func (svc *Service) Done() <-chan struct{} {
- var done <-chan struct{}
-
- svc.mutex.Lock()
- switch svc.state {
- // stopped
- case 0:
- if svc.ctx == nil {
- // here we create a new context so that the
- // returned 'done' channel here will still
- // be valid for when Service is next started.
- svc.ctx = make(chan struct{})
- }
- done = svc.ctx
+ // Only if we weren't already
+ // stopping do we close channel.
+ if old&stopping_bit == 0 {
+ close(i.done)
+ }
- // started
- case 1:
- done = svc.ctx
+ // Release
+ // waiters.
+ i.wait.Done()
+}
- // stopping
- case 2:
- done = svc.ctx
+func (i *svc_instance) stop() (ok bool) {
+ // Attempt to set the 'stopping' bit.
+ switch old := i.state.Or(stopping_bit); {
+
+ case old&finished_bit != 0:
+ // Already finished.
+ return
+
+ case old&started_bit == 0:
+ // This was never started
+ // to begin with, just mark
+ // as fully finished here.
+ _ = i.state.Or(finished_bit)
+ return
+
+ case old&stopping_bit == 0:
+ // We successfully stopped
+ // instance, close channel.
+ close(i.done)
+ ok = true
}
- svc.mutex.Unlock()
- return done
+ // Wait on stop.
+ i.wait.Wait()
+ return
+}
+
+// running returns whether the service was started and
+// is not yet finished. That indicates it may have been
+// started and not yet stopped, or that it was started,
+// stopped, and not yet returned.
+func (i *svc_instance) running() bool {
+ val := i.state.Load()
+ return val&started_bit != 0 &&
+ val&finished_bit == 0
}
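Externally the Service behaviour is meant to be unchanged by the rewrite: a started Service refuses a second start, Done exposes the cancellation channel handed to the running function, and Stop closes that channel and blocks until the function has returned. A brief usage sketch against that contract:

package main

import (
    "context"
    "fmt"
    "time"

    "codeberg.org/gruf/go-runners"
)

func main() {
    var svc runners.Service

    ok := svc.GoRun(func(ctx context.Context) {
        <-ctx.Done() // block until Stop() cancels us
        fmt.Println("service function returning")
    })
    fmt.Println("started:", ok) // true

    // Refused while the first run is still live.
    fmt.Println("second start:", svc.GoRun(func(context.Context) {})) // false

    time.Sleep(10 * time.Millisecond)
    fmt.Println("stopped:", svc.Stop())     // true, once fn has returned
    fmt.Println("running:", svc.Running()) // false
}

Internally, the mutex-and-state-integer pair is replaced by a per-run svc_instance whose lifecycle is tracked by three bits ORed into one atomic word, using the same slot-acquire pattern as Processor above.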