diff options
Diffstat (limited to 'vendor/codeberg.org/gruf/go-runners')
| -rw-r--r-- | vendor/codeberg.org/gruf/go-runners/context.go | 9 | ||||
| -rw-r--r-- | vendor/codeberg.org/gruf/go-runners/pointer.go | 22 | ||||
| -rw-r--r-- | vendor/codeberg.org/gruf/go-runners/pool.go | 292 | ||||
| -rw-r--r-- | vendor/codeberg.org/gruf/go-runners/process.go | 116 | ||||
| -rw-r--r-- | vendor/codeberg.org/gruf/go-runners/service.go | 311 |
5 files changed, 241 insertions, 509 deletions
diff --git a/vendor/codeberg.org/gruf/go-runners/context.go b/vendor/codeberg.org/gruf/go-runners/context.go index 12f7f1a10..e02dcab22 100644 --- a/vendor/codeberg.org/gruf/go-runners/context.go +++ b/vendor/codeberg.org/gruf/go-runners/context.go @@ -2,6 +2,7 @@ package runners import ( "context" + "sync/atomic" "time" ) @@ -19,9 +20,13 @@ func Closed() context.Context { // CtxWithCancel returns a new context.Context impl with cancel. func CtxWithCancel() (context.Context, context.CancelFunc) { + var once atomic.Uint32 ctx := make(chan struct{}) - cncl := func() { close(ctx) } - return CancelCtx(ctx), cncl + return CancelCtx(ctx), func() { + if once.CompareAndSwap(0, 1) { + close(ctx) + } + } } // CancelCtx is the simplest possible cancellable context. diff --git a/vendor/codeberg.org/gruf/go-runners/pointer.go b/vendor/codeberg.org/gruf/go-runners/pointer.go new file mode 100644 index 000000000..cc139309f --- /dev/null +++ b/vendor/codeberg.org/gruf/go-runners/pointer.go @@ -0,0 +1,22 @@ +package runners + +import ( + "sync/atomic" + "unsafe" +) + +// atomic_pointer wraps an unsafe.Pointer with +// receiver methods for their atomic counterparts. +type atomic_pointer struct{ p unsafe.Pointer } + +func (p *atomic_pointer) Load() unsafe.Pointer { + return atomic.LoadPointer(&p.p) +} + +func (p *atomic_pointer) Store(ptr unsafe.Pointer) { + atomic.StorePointer(&p.p, ptr) +} + +func (p *atomic_pointer) CAS(old, new unsafe.Pointer) bool { + return atomic.CompareAndSwapPointer(&p.p, old, new) +} diff --git a/vendor/codeberg.org/gruf/go-runners/pool.go b/vendor/codeberg.org/gruf/go-runners/pool.go deleted file mode 100644 index 644cde0b9..000000000 --- a/vendor/codeberg.org/gruf/go-runners/pool.go +++ /dev/null @@ -1,292 +0,0 @@ -package runners - -import ( - "context" - "fmt" - "os" - "runtime" - "sync" - - "codeberg.org/gruf/go-errors/v2" -) - -// WorkerFunc represents a function processable by a worker in WorkerPool. Note -// that implementations absolutely MUST check whether passed context is <-ctx.Done() -// otherwise stopping the pool may block indefinitely. -type WorkerFunc func(context.Context) - -// WorkerPool provides a means of enqueuing asynchronous work. -type WorkerPool struct { - fns chan WorkerFunc - svc Service -} - -// Start will start the main WorkerPool management loop in a new goroutine, along -// with requested number of child worker goroutines. Returns false if already running. -func (pool *WorkerPool) Start(workers int, queue int) bool { - // Attempt to start the svc - ctx, ok := pool.svc.doStart() - if !ok { - return false - } - - if workers <= 0 { - // Use $GOMAXPROCS as default. - workers = runtime.GOMAXPROCS(0) - } - - if queue < 0 { - // Use reasonable queue default. - queue = workers * 10 - } - - // Allocate pool queue of given size. - // - // This MUST be set BEFORE we return and NOT in - // the launched goroutine, or there is a risk that - // the pool may appear as closed for a short time - // until the main goroutine has been entered. - fns := make(chan WorkerFunc, queue) - pool.fns = fns - - go func() { - defer func() { - // unlock single wait - pool.svc.wait.Unlock() - - // ensure stopped - pool.svc.Stop() - }() - - var wait sync.WaitGroup - - // Start goroutine worker functions - for i := 0; i < workers; i++ { - wait.Add(1) - - go func() { - defer wait.Done() - - // Run worker function (retry on panic) - for !worker_run(CancelCtx(ctx), fns) { - } - }() - } - - // Wait on ctx - <-ctx - - // Drain function queue. - // - // All functions in the queue MUST be - // run, so we pass them a closed context. - // - // This mainly allows us to block until - // the function queue is empty, as worker - // functions will also continue draining in - // the background with the (now) closed ctx. - for !drain_queue(fns) { - // retry on panic - } - - // Now the queue is empty, we can - // safely close the channel signalling - // all of the workers to return. - close(fns) - wait.Wait() - }() - - return true -} - -// Stop will stop the WorkerPool management loop, blocking until stopped. -func (pool *WorkerPool) Stop() bool { - return pool.svc.Stop() -} - -// Running returns if WorkerPool management loop is running (i.e. NOT stopped / stopping). -func (pool *WorkerPool) Running() bool { - return pool.svc.Running() -} - -// Done returns a channel that's closed when WorkerPool.Stop() is called. It is the same channel provided to the currently running worker functions. -func (pool *WorkerPool) Done() <-chan struct{} { - return pool.svc.Done() -} - -// Enqueue will add provided WorkerFunc to the queue to be performed when there is a free worker. -// This will block until function is queued or pool is stopped. In all cases, the WorkerFunc will be -// executed, with the state of the pool being indicated by <-ctx.Done() of the passed ctx. -// WorkerFuncs MUST respect the passed context. -func (pool *WorkerPool) Enqueue(fn WorkerFunc) { - // Check valid fn - if fn == nil { - return - } - - select { - // Pool ctx cancelled - case <-pool.svc.Done(): - fn(closedctx) - - // Placed fn in queue - case pool.fns <- fn: - } -} - -// EnqueueCtx is functionally identical to WorkerPool.Enqueue() but returns early in the -// case that caller provided <-ctx.Done() is closed, WITHOUT running the WorkerFunc. -func (pool *WorkerPool) EnqueueCtx(ctx context.Context, fn WorkerFunc) bool { - // Check valid fn - if fn == nil { - return false - } - - select { - // Caller ctx cancelled - case <-ctx.Done(): - return false - - // Pool ctx cancelled - case <-pool.svc.Done(): - return false - - // Placed fn in queue - case pool.fns <- fn: - return true - } -} - -// MustEnqueueCtx functionally performs similarly to WorkerPool.EnqueueCtx(), but in the case -// that the provided <-ctx.Done() is closed, it is passed asynchronously to WorkerPool.Enqueue(). -// Return boolean indicates whether function was executed in time before <-ctx.Done() is closed. -func (pool *WorkerPool) MustEnqueueCtx(ctx context.Context, fn WorkerFunc) (ok bool) { - // Check valid fn - if fn == nil { - return false - } - - select { - case <-ctx.Done(): - // We failed to add this entry to the worker queue before the - // incoming context was cancelled. So to ensure processing - // we simply queue it asynchronously and return early to caller. - go pool.Enqueue(fn) - return false - - case <-pool.svc.Done(): - // Pool ctx cancelled - fn(closedctx) - return false - - case pool.fns <- fn: - // Placed fn in queue - return true - } -} - -// EnqueueNow attempts Enqueue but returns false if not executed. -func (pool *WorkerPool) EnqueueNow(fn WorkerFunc) bool { - // Check valid fn - if fn == nil { - return false - } - - select { - // Pool ctx cancelled - case <-pool.svc.Done(): - return false - - // Placed fn in queue - case pool.fns <- fn: - return true - - // Queue is full - default: - return false - } -} - -// Queue returns the number of currently queued WorkerFuncs. -func (pool *WorkerPool) Queue() int { - var l int - pool.svc.While(func() { - l = len(pool.fns) - }) - return l -} - -// worker_run is the main worker routine, accepting functions from 'fns' until it is closed. -func worker_run(ctx context.Context, fns <-chan WorkerFunc) bool { - defer func() { - // Recover and drop any panic - if r := recover(); r != nil { - - // Gather calling func frames. - pcs := make([]uintptr, 10) - n := runtime.Callers(3, pcs) - i := runtime.CallersFrames(pcs[:n]) - c := gatherFrames(i, n) - - const msg = "worker_run: recovered panic: %v\n\n%s\n" - fmt.Fprintf(os.Stderr, msg, r, c.String()) - } - }() - - for { - // Wait on next func - fn, ok := <-fns - if !ok { - return true - } - - // Run with ctx - fn(ctx) - } -} - -// drain_queue will drain and run all functions in worker queue, passing in a closed context. -func drain_queue(fns <-chan WorkerFunc) bool { - defer func() { - // Recover and drop any panic - if r := recover(); r != nil { - - // Gather calling func frames. - pcs := make([]uintptr, 10) - n := runtime.Callers(3, pcs) - i := runtime.CallersFrames(pcs[:n]) - c := gatherFrames(i, n) - - const msg = "worker_run: recovered panic: %v\n\n%s\n" - fmt.Fprintf(os.Stderr, msg, r, c.String()) - } - }() - - for { - select { - // Run with closed ctx - case fn := <-fns: - fn(closedctx) - - // Queue is empty - default: - return true - } - } -} - -// gatherFrames collates runtime frames from a frame iterator. -func gatherFrames(iter *runtime.Frames, n int) errors.Callers { - if iter == nil { - return nil - } - frames := make([]runtime.Frame, 0, n) - for { - f, ok := iter.Next() - if !ok { - break - } - frames = append(frames, f) - } - return frames -} diff --git a/vendor/codeberg.org/gruf/go-runners/process.go b/vendor/codeberg.org/gruf/go-runners/process.go index ca39ac0d0..3feeff258 100644 --- a/vendor/codeberg.org/gruf/go-runners/process.go +++ b/vendor/codeberg.org/gruf/go-runners/process.go @@ -2,80 +2,72 @@ package runners import ( "fmt" + "unsafe" + "sync" ) -// Processable defines a runnable process with error return -// that can be passed to a Processor instance for managed running. -type Processable func() error - // Processor acts similarly to a sync.Once object, except that it is reusable. After // the first call to Process(), any further calls before this first has returned will // block until the first call has returned, and return the same error. This ensures // that only a single instance of it is ever running at any one time. -type Processor struct { - mutex sync.Mutex - wait *sync.WaitGroup - err *error -} +type Processor struct{ p atomic_pointer } // Process will process the given function if first-call, else blocking until // the first function has returned, returning the same error result. -func (p *Processor) Process(proc Processable) (err error) { - // Acquire state lock. - p.mutex.Lock() - - if p.wait != nil { - // Already running. - // - // Get current ptrs. - waitPtr := p.wait - errPtr := p.err - - // Free state lock. - p.mutex.Unlock() - - // Wait for finish. - waitPtr.Wait() - return *errPtr - } - - // Alloc waiter for new process. - var wait sync.WaitGroup - - // No need to alloc new error as - // we use the alloc'd named error - // return required for panic handling. - - // Reset ptrs. - p.wait = &wait - p.err = &err - - // Set started. - wait.Add(1) - p.mutex.Unlock() - - defer func() { - if r := recover(); r != nil { - if err != nil { - rOld := r // wrap the panic so we don't lose existing returned error - r = fmt.Errorf("panic occured after error %q: %v", err.Error(), rOld) - } - - // Catch any panics and wrap as error. - err = fmt.Errorf("caught panic: %v", r) +func (p *Processor) Process(proc func() error) (err error) { + var i *proc_instance + + for { + // Attempt to load existing instance. + ptr := (*proc_instance)(p.p.Load()) + if ptr != nil { + + // Wait on existing. + ptr.wait.Wait() + err = ptr.err + return } - // Mark done. - wait.Done() + if i == nil { + // Allocate instance. + i = new(proc_instance) + i.wait.Add(1) + } - // Set stopped. - p.mutex.Lock() - p.wait = nil - p.mutex.Unlock() - }() + // Try to acquire start slot by + // setting ptr to *our* instance. + if p.p.CAS(nil, unsafe.Pointer(i)) { + defer func() { + if r := recover(); r != nil { + if i.err != nil { + rOld := r // wrap the panic so we don't lose existing returned error + r = fmt.Errorf("panic occured after error %q: %v", i.err.Error(), rOld) + } + + // Catch panics and wrap as error return. + i.err = fmt.Errorf("caught panic: %v", r) + } + + // Set return. + err = i.err + + // Release the + // goroutines. + i.wait.Done() + + // Free processor. + p.p.Store(nil) + }() + + // Run func. + i.err = proc() + return + } + } +} - // Run process. - err = proc() - return +type proc_instance struct { + wait sync.WaitGroup + err error } diff --git a/vendor/codeberg.org/gruf/go-runners/service.go b/vendor/codeberg.org/gruf/go-runners/service.go index 8a7c0051a..fe41807f9 100644 --- a/vendor/codeberg.org/gruf/go-runners/service.go +++ b/vendor/codeberg.org/gruf/go-runners/service.go @@ -3,215 +3,220 @@ package runners import ( "context" "sync" + "sync/atomic" + "unsafe" ) -// Service provides a means of tracking a single long-running service, provided protected state -// changes and preventing multiple instances running. Also providing service state information. -type Service struct { - state uint32 // 0=stopped, 1=running, 2=stopping - mutex sync.Mutex // mutex protects overall state changes - wait sync.Mutex // wait is used as a single-entity wait-group, only ever locked within 'mutex' - ctx chan struct{} // ctx is the current context for running function (or nil if not running) -} +// Service provides a means of tracking a single long-running Service, provided protected state +// changes and preventing multiple instances running. Also providing Service state information. +type Service struct{ p atomic_pointer } // Run will run the supplied function until completion, using given context to propagate cancel. // Immediately returns false if the Service is already running, and true after completed run. -func (svc *Service) Run(fn func(context.Context)) bool { - // Attempt to start the svc - ctx, ok := svc.doStart() +func (svc *Service) Run(fn func(context.Context)) (ok bool) { + var ptr *svc_instance + + // Attempt to start. + ptr, ok = svc.start() if !ok { - return false + return } - defer func() { - // unlock single wait - svc.wait.Unlock() - - // ensure stopped - _ = svc.Stop() - }() - - // Run with context. - fn(CancelCtx(ctx)) - - return true + // Run given function. + defer svc.on_done(ptr) + fn(CancelCtx(ptr.done)) + return } // GoRun will run the supplied function until completion in a goroutine, using given context to // propagate cancel. Immediately returns boolean indicating success, or that service is already running. -func (svc *Service) GoRun(fn func(context.Context)) bool { - // Attempt to start the svc - ctx, ok := svc.doStart() +func (svc *Service) GoRun(fn func(context.Context)) (ok bool) { + var ptr *svc_instance + + // Attempt to start. + ptr, ok = svc.start() if !ok { - return false + return } go func() { - defer func() { - // unlock single wait - svc.wait.Unlock() - - // ensure stopped - _ = svc.Stop() - }() - - // Run with context. - fn(CancelCtx(ctx)) + // Run given function. + defer svc.on_done(ptr) + fn(CancelCtx(ptr.done)) }() - return true + return } // RunWait is functionally the same as .Run(), but blocks until the first instance of .Run() returns. -func (svc *Service) RunWait(fn func(context.Context)) bool { - // Attempt to start the svc - ctx, ok := svc.doStart() +func (svc *Service) RunWait(fn func(context.Context)) (ok bool) { + var ptr *svc_instance + + // Attempt to start. + ptr, ok = svc.start() if !ok { - <-ctx // block - return false + <-ptr.done + return } - defer func() { - // unlock single wait - svc.wait.Unlock() + // Run given function. + defer svc.on_done(ptr) + fn(CancelCtx(ptr.done)) + return +} - // ensure stopped - _ = svc.Stop() - }() +// GoRunWait is functionally the same as .RunWait(), but blocks until the first instance of RunWait() returns. +func (svc *Service) GoRunWait(fn func(context.Context)) (ok bool) { + var ptr *svc_instance - // Run with context. - fn(CancelCtx(ctx)) + // Attempt to start. + ptr, ok = svc.start() + if !ok { + <-ptr.done + return + } + + go func() { + // Run given function. + defer svc.on_done(ptr) + fn(CancelCtx(ptr.done)) + }() - return true + return } // Stop will attempt to stop the service, cancelling the running function's context. Immediately // returns false if not running, and true only after Service is fully stopped. func (svc *Service) Stop() bool { - // Attempt to stop the svc - ctx, ok := svc.doStop() - if !ok { - return false - } - - defer func() { - // Get svc lock - svc.mutex.Lock() - - // Wait until stopped - svc.wait.Lock() - svc.wait.Unlock() + return svc.must_get().stop() +} - // Reset the svc - svc.ctx = nil - svc.state = 0 - svc.mutex.Unlock() - }() +// Running returns if Service is running (i.e. NOT fully stopped, but may be *stopping*). +func (svc *Service) Running() bool { + return svc.must_get().running() +} - // Cancel ctx - close(ctx) +// Done returns a channel that's closed when Service.Stop() is called. It is +// the same channel provided to the currently running service function. +func (svc *Service) Done() <-chan struct{} { + return svc.must_get().done +} - return true +func (svc *Service) start() (*svc_instance, bool) { + ptr := svc.must_get() + return ptr, ptr.start() } -// While allows you to execute given function guaranteed within current -// service state. Please note that this will hold the underlying service -// state change mutex open while executing the function. -func (svc *Service) While(fn func()) { - // Protect state change - svc.mutex.Lock() - defer svc.mutex.Unlock() +func (svc *Service) on_done(ptr *svc_instance) { + // Ensure stopped. + ptr.stop_private() - // Run - fn() + // Free service. + svc.p.Store(nil) } -// doStart will safely set Service state to started, returning a ptr to this context insance. -func (svc *Service) doStart() (chan struct{}, bool) { - // Protect startup - svc.mutex.Lock() +func (svc *Service) must_get() *svc_instance { + var newptr *svc_instance - if svc.ctx == nil { - // this will only have been allocated - // if svc.Done() was already called. - svc.ctx = make(chan struct{}) - } - - // Take our own ptr - ctx := svc.ctx + for { + // Try to load existing instance. + ptr := (*svc_instance)(svc.p.Load()) + if ptr != nil { + return ptr + } - if svc.state != 0 { - // State was not stopped. - svc.mutex.Unlock() - return ctx, false - } + if newptr == nil { + // Allocate new instance. + newptr = new(svc_instance) + newptr.done = make(chan struct{}) + } - // Set started. - svc.state = 1 + // Attempt to acquire slot by setting our ptr. + if !svc.p.CAS(nil, unsafe.Pointer(newptr)) { + continue + } - // Start waiter. - svc.wait.Lock() + return newptr + } +} - // Unlock and return - svc.mutex.Unlock() - return ctx, true +type svc_instance struct { + wait sync.WaitGroup + done chan struct{} + state atomic.Uint32 } -// doStop will safely set Service state to stopping, returning a ptr to this cancelfunc instance. -func (svc *Service) doStop() (chan struct{}, bool) { - // Protect stop - svc.mutex.Lock() +const ( + started_bit = uint32(1) << 0 + stopping_bit = uint32(1) << 1 + finished_bit = uint32(1) << 2 +) - if svc.state != 1 /* not started */ { - svc.mutex.Unlock() - return nil, false - } +func (i *svc_instance) start() (ok bool) { + // Acquire start by setting 'started' bit. + switch old := i.state.Or(started_bit); { - // state stopping - svc.state = 2 + case old&finished_bit != 0: + // Already finished. - // Take our own ptr - // and unlock state - ctx := svc.ctx - svc.mutex.Unlock() + case old&started_bit == 0: + // Successfully started! + i.wait.Add(1) + ok = true + } - return ctx, true + return } -// Running returns if Service is running (i.e. state NOT stopped / stopping). -func (svc *Service) Running() bool { - svc.mutex.Lock() - state := svc.state - svc.mutex.Unlock() - return (state == 1) -} +// NOTE: MAY ONLY BE CALLED BY STARTING GOROUTINE. +func (i *svc_instance) stop_private() { + // Attempt set both stopping and finished bits. + old := i.state.Or(stopping_bit | finished_bit) -// Done returns a channel that's closed when Service.Stop() is called. It is -// the same channel provided to the currently running service function. -func (svc *Service) Done() <-chan struct{} { - var done <-chan struct{} - - svc.mutex.Lock() - switch svc.state { - // stopped - case 0: - if svc.ctx == nil { - // here we create a new context so that the - // returned 'done' channel here will still - // be valid for when Service is next started. - svc.ctx = make(chan struct{}) - } - done = svc.ctx + // Only if we weren't already + // stopping do we close channel. + if old&stopping_bit == 0 { + close(i.done) + } - // started - case 1: - done = svc.ctx + // Release + // waiters. + i.wait.Done() +} - // stopping - case 2: - done = svc.ctx +func (i *svc_instance) stop() (ok bool) { + // Attempt to set the 'stopping' bit. + switch old := i.state.Or(stopping_bit); { + + case old&finished_bit != 0: + // Already finished. + return + + case old&started_bit == 0: + // This was never started + // to begin with, just mark + // as fully finished here. + _ = i.state.Or(finished_bit) + return + + case old&stopping_bit == 0: + // We succesfully stopped + // instance, close channel. + close(i.done) + ok = true } - svc.mutex.Unlock() - return done + // Wait on stop. + i.wait.Wait() + return +} + +// running returns whether service was started and +// is not yet finished. that indicates that it may +// have been started and not yet stopped, or that +// it was started, stopped and not yet returned. +func (i *svc_instance) running() bool { + val := i.state.Load() + return val&started_bit != 0 && + val&finished_bit == 0 } |
