summaryrefslogtreecommitdiff
path: root/vendor/codeberg.org/gruf/go-runners
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/codeberg.org/gruf/go-runners')
-rw-r--r--vendor/codeberg.org/gruf/go-runners/LICENSE9
-rw-r--r--vendor/codeberg.org/gruf/go-runners/README.md3
-rw-r--r--vendor/codeberg.org/gruf/go-runners/context.go64
-rw-r--r--vendor/codeberg.org/gruf/go-runners/pool.go292
-rw-r--r--vendor/codeberg.org/gruf/go-runners/process.go81
-rw-r--r--vendor/codeberg.org/gruf/go-runners/service.go217
6 files changed, 0 insertions, 666 deletions
diff --git a/vendor/codeberg.org/gruf/go-runners/LICENSE b/vendor/codeberg.org/gruf/go-runners/LICENSE
deleted file mode 100644
index b7c4417ac..000000000
--- a/vendor/codeberg.org/gruf/go-runners/LICENSE
+++ /dev/null
@@ -1,9 +0,0 @@
-MIT License
-
-Copyright (c) 2021 gruf
-
-Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
-
-The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
-
-THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
diff --git a/vendor/codeberg.org/gruf/go-runners/README.md b/vendor/codeberg.org/gruf/go-runners/README.md
deleted file mode 100644
index 91cc1528d..000000000
--- a/vendor/codeberg.org/gruf/go-runners/README.md
+++ /dev/null
@@ -1,3 +0,0 @@
-# go-runners
-
-Provides a means a simple means of managing long-running functions and services \ No newline at end of file
diff --git a/vendor/codeberg.org/gruf/go-runners/context.go b/vendor/codeberg.org/gruf/go-runners/context.go
deleted file mode 100644
index 12f7f1a10..000000000
--- a/vendor/codeberg.org/gruf/go-runners/context.go
+++ /dev/null
@@ -1,64 +0,0 @@
-package runners
-
-import (
- "context"
- "time"
-)
-
-// closedctx is an always closed context.
-var closedctx = func() context.Context {
- ctx := make(chan struct{})
- close(ctx)
- return CancelCtx(ctx)
-}()
-
-// Closed returns an always closed context.
-func Closed() context.Context {
- return closedctx
-}
-
-// CtxWithCancel returns a new context.Context impl with cancel.
-func CtxWithCancel() (context.Context, context.CancelFunc) {
- ctx := make(chan struct{})
- cncl := func() { close(ctx) }
- return CancelCtx(ctx), cncl
-}
-
-// CancelCtx is the simplest possible cancellable context.
-type CancelCtx (<-chan struct{})
-
-func (CancelCtx) Deadline() (time.Time, bool) {
- return time.Time{}, false
-}
-
-func (ctx CancelCtx) Done() <-chan struct{} {
- return ctx
-}
-
-func (ctx CancelCtx) Err() error {
- select {
- case <-ctx:
- return context.Canceled
- default:
- return nil
- }
-}
-
-func (CancelCtx) Value(key interface{}) interface{} {
- return nil
-}
-
-func (ctx CancelCtx) String() string {
- var state string
- select {
- case <-ctx:
- state = "closed"
- default:
- state = "open"
- }
- return "CancelCtx{state:" + state + "}"
-}
-
-func (ctx CancelCtx) GoString() string {
- return "runners." + ctx.String()
-}
diff --git a/vendor/codeberg.org/gruf/go-runners/pool.go b/vendor/codeberg.org/gruf/go-runners/pool.go
deleted file mode 100644
index 644cde0b9..000000000
--- a/vendor/codeberg.org/gruf/go-runners/pool.go
+++ /dev/null
@@ -1,292 +0,0 @@
-package runners
-
-import (
- "context"
- "fmt"
- "os"
- "runtime"
- "sync"
-
- "codeberg.org/gruf/go-errors/v2"
-)
-
-// WorkerFunc represents a function processable by a worker in WorkerPool. Note
-// that implementations absolutely MUST check whether passed context is <-ctx.Done()
-// otherwise stopping the pool may block indefinitely.
-type WorkerFunc func(context.Context)
-
-// WorkerPool provides a means of enqueuing asynchronous work.
-type WorkerPool struct {
- fns chan WorkerFunc
- svc Service
-}
-
-// Start will start the main WorkerPool management loop in a new goroutine, along
-// with requested number of child worker goroutines. Returns false if already running.
-func (pool *WorkerPool) Start(workers int, queue int) bool {
- // Attempt to start the svc
- ctx, ok := pool.svc.doStart()
- if !ok {
- return false
- }
-
- if workers <= 0 {
- // Use $GOMAXPROCS as default.
- workers = runtime.GOMAXPROCS(0)
- }
-
- if queue < 0 {
- // Use reasonable queue default.
- queue = workers * 10
- }
-
- // Allocate pool queue of given size.
- //
- // This MUST be set BEFORE we return and NOT in
- // the launched goroutine, or there is a risk that
- // the pool may appear as closed for a short time
- // until the main goroutine has been entered.
- fns := make(chan WorkerFunc, queue)
- pool.fns = fns
-
- go func() {
- defer func() {
- // unlock single wait
- pool.svc.wait.Unlock()
-
- // ensure stopped
- pool.svc.Stop()
- }()
-
- var wait sync.WaitGroup
-
- // Start goroutine worker functions
- for i := 0; i < workers; i++ {
- wait.Add(1)
-
- go func() {
- defer wait.Done()
-
- // Run worker function (retry on panic)
- for !worker_run(CancelCtx(ctx), fns) {
- }
- }()
- }
-
- // Wait on ctx
- <-ctx
-
- // Drain function queue.
- //
- // All functions in the queue MUST be
- // run, so we pass them a closed context.
- //
- // This mainly allows us to block until
- // the function queue is empty, as worker
- // functions will also continue draining in
- // the background with the (now) closed ctx.
- for !drain_queue(fns) {
- // retry on panic
- }
-
- // Now the queue is empty, we can
- // safely close the channel signalling
- // all of the workers to return.
- close(fns)
- wait.Wait()
- }()
-
- return true
-}
-
-// Stop will stop the WorkerPool management loop, blocking until stopped.
-func (pool *WorkerPool) Stop() bool {
- return pool.svc.Stop()
-}
-
-// Running returns if WorkerPool management loop is running (i.e. NOT stopped / stopping).
-func (pool *WorkerPool) Running() bool {
- return pool.svc.Running()
-}
-
-// Done returns a channel that's closed when WorkerPool.Stop() is called. It is the same channel provided to the currently running worker functions.
-func (pool *WorkerPool) Done() <-chan struct{} {
- return pool.svc.Done()
-}
-
-// Enqueue will add provided WorkerFunc to the queue to be performed when there is a free worker.
-// This will block until function is queued or pool is stopped. In all cases, the WorkerFunc will be
-// executed, with the state of the pool being indicated by <-ctx.Done() of the passed ctx.
-// WorkerFuncs MUST respect the passed context.
-func (pool *WorkerPool) Enqueue(fn WorkerFunc) {
- // Check valid fn
- if fn == nil {
- return
- }
-
- select {
- // Pool ctx cancelled
- case <-pool.svc.Done():
- fn(closedctx)
-
- // Placed fn in queue
- case pool.fns <- fn:
- }
-}
-
-// EnqueueCtx is functionally identical to WorkerPool.Enqueue() but returns early in the
-// case that caller provided <-ctx.Done() is closed, WITHOUT running the WorkerFunc.
-func (pool *WorkerPool) EnqueueCtx(ctx context.Context, fn WorkerFunc) bool {
- // Check valid fn
- if fn == nil {
- return false
- }
-
- select {
- // Caller ctx cancelled
- case <-ctx.Done():
- return false
-
- // Pool ctx cancelled
- case <-pool.svc.Done():
- return false
-
- // Placed fn in queue
- case pool.fns <- fn:
- return true
- }
-}
-
-// MustEnqueueCtx functionally performs similarly to WorkerPool.EnqueueCtx(), but in the case
-// that the provided <-ctx.Done() is closed, it is passed asynchronously to WorkerPool.Enqueue().
-// Return boolean indicates whether function was executed in time before <-ctx.Done() is closed.
-func (pool *WorkerPool) MustEnqueueCtx(ctx context.Context, fn WorkerFunc) (ok bool) {
- // Check valid fn
- if fn == nil {
- return false
- }
-
- select {
- case <-ctx.Done():
- // We failed to add this entry to the worker queue before the
- // incoming context was cancelled. So to ensure processing
- // we simply queue it asynchronously and return early to caller.
- go pool.Enqueue(fn)
- return false
-
- case <-pool.svc.Done():
- // Pool ctx cancelled
- fn(closedctx)
- return false
-
- case pool.fns <- fn:
- // Placed fn in queue
- return true
- }
-}
-
-// EnqueueNow attempts Enqueue but returns false if not executed.
-func (pool *WorkerPool) EnqueueNow(fn WorkerFunc) bool {
- // Check valid fn
- if fn == nil {
- return false
- }
-
- select {
- // Pool ctx cancelled
- case <-pool.svc.Done():
- return false
-
- // Placed fn in queue
- case pool.fns <- fn:
- return true
-
- // Queue is full
- default:
- return false
- }
-}
-
-// Queue returns the number of currently queued WorkerFuncs.
-func (pool *WorkerPool) Queue() int {
- var l int
- pool.svc.While(func() {
- l = len(pool.fns)
- })
- return l
-}
-
-// worker_run is the main worker routine, accepting functions from 'fns' until it is closed.
-func worker_run(ctx context.Context, fns <-chan WorkerFunc) bool {
- defer func() {
- // Recover and drop any panic
- if r := recover(); r != nil {
-
- // Gather calling func frames.
- pcs := make([]uintptr, 10)
- n := runtime.Callers(3, pcs)
- i := runtime.CallersFrames(pcs[:n])
- c := gatherFrames(i, n)
-
- const msg = "worker_run: recovered panic: %v\n\n%s\n"
- fmt.Fprintf(os.Stderr, msg, r, c.String())
- }
- }()
-
- for {
- // Wait on next func
- fn, ok := <-fns
- if !ok {
- return true
- }
-
- // Run with ctx
- fn(ctx)
- }
-}
-
-// drain_queue will drain and run all functions in worker queue, passing in a closed context.
-func drain_queue(fns <-chan WorkerFunc) bool {
- defer func() {
- // Recover and drop any panic
- if r := recover(); r != nil {
-
- // Gather calling func frames.
- pcs := make([]uintptr, 10)
- n := runtime.Callers(3, pcs)
- i := runtime.CallersFrames(pcs[:n])
- c := gatherFrames(i, n)
-
- const msg = "worker_run: recovered panic: %v\n\n%s\n"
- fmt.Fprintf(os.Stderr, msg, r, c.String())
- }
- }()
-
- for {
- select {
- // Run with closed ctx
- case fn := <-fns:
- fn(closedctx)
-
- // Queue is empty
- default:
- return true
- }
- }
-}
-
-// gatherFrames collates runtime frames from a frame iterator.
-func gatherFrames(iter *runtime.Frames, n int) errors.Callers {
- if iter == nil {
- return nil
- }
- frames := make([]runtime.Frame, 0, n)
- for {
- f, ok := iter.Next()
- if !ok {
- break
- }
- frames = append(frames, f)
- }
- return frames
-}
diff --git a/vendor/codeberg.org/gruf/go-runners/process.go b/vendor/codeberg.org/gruf/go-runners/process.go
deleted file mode 100644
index ca39ac0d0..000000000
--- a/vendor/codeberg.org/gruf/go-runners/process.go
+++ /dev/null
@@ -1,81 +0,0 @@
-package runners
-
-import (
- "fmt"
- "sync"
-)
-
-// Processable defines a runnable process with error return
-// that can be passed to a Processor instance for managed running.
-type Processable func() error
-
-// Processor acts similarly to a sync.Once object, except that it is reusable. After
-// the first call to Process(), any further calls before this first has returned will
-// block until the first call has returned, and return the same error. This ensures
-// that only a single instance of it is ever running at any one time.
-type Processor struct {
- mutex sync.Mutex
- wait *sync.WaitGroup
- err *error
-}
-
-// Process will process the given function if first-call, else blocking until
-// the first function has returned, returning the same error result.
-func (p *Processor) Process(proc Processable) (err error) {
- // Acquire state lock.
- p.mutex.Lock()
-
- if p.wait != nil {
- // Already running.
- //
- // Get current ptrs.
- waitPtr := p.wait
- errPtr := p.err
-
- // Free state lock.
- p.mutex.Unlock()
-
- // Wait for finish.
- waitPtr.Wait()
- return *errPtr
- }
-
- // Alloc waiter for new process.
- var wait sync.WaitGroup
-
- // No need to alloc new error as
- // we use the alloc'd named error
- // return required for panic handling.
-
- // Reset ptrs.
- p.wait = &wait
- p.err = &err
-
- // Set started.
- wait.Add(1)
- p.mutex.Unlock()
-
- defer func() {
- if r := recover(); r != nil {
- if err != nil {
- rOld := r // wrap the panic so we don't lose existing returned error
- r = fmt.Errorf("panic occured after error %q: %v", err.Error(), rOld)
- }
-
- // Catch any panics and wrap as error.
- err = fmt.Errorf("caught panic: %v", r)
- }
-
- // Mark done.
- wait.Done()
-
- // Set stopped.
- p.mutex.Lock()
- p.wait = nil
- p.mutex.Unlock()
- }()
-
- // Run process.
- err = proc()
- return
-}
diff --git a/vendor/codeberg.org/gruf/go-runners/service.go b/vendor/codeberg.org/gruf/go-runners/service.go
deleted file mode 100644
index 8a7c0051a..000000000
--- a/vendor/codeberg.org/gruf/go-runners/service.go
+++ /dev/null
@@ -1,217 +0,0 @@
-package runners
-
-import (
- "context"
- "sync"
-)
-
-// Service provides a means of tracking a single long-running service, provided protected state
-// changes and preventing multiple instances running. Also providing service state information.
-type Service struct {
- state uint32 // 0=stopped, 1=running, 2=stopping
- mutex sync.Mutex // mutex protects overall state changes
- wait sync.Mutex // wait is used as a single-entity wait-group, only ever locked within 'mutex'
- ctx chan struct{} // ctx is the current context for running function (or nil if not running)
-}
-
-// Run will run the supplied function until completion, using given context to propagate cancel.
-// Immediately returns false if the Service is already running, and true after completed run.
-func (svc *Service) Run(fn func(context.Context)) bool {
- // Attempt to start the svc
- ctx, ok := svc.doStart()
- if !ok {
- return false
- }
-
- defer func() {
- // unlock single wait
- svc.wait.Unlock()
-
- // ensure stopped
- _ = svc.Stop()
- }()
-
- // Run with context.
- fn(CancelCtx(ctx))
-
- return true
-}
-
-// GoRun will run the supplied function until completion in a goroutine, using given context to
-// propagate cancel. Immediately returns boolean indicating success, or that service is already running.
-func (svc *Service) GoRun(fn func(context.Context)) bool {
- // Attempt to start the svc
- ctx, ok := svc.doStart()
- if !ok {
- return false
- }
-
- go func() {
- defer func() {
- // unlock single wait
- svc.wait.Unlock()
-
- // ensure stopped
- _ = svc.Stop()
- }()
-
- // Run with context.
- fn(CancelCtx(ctx))
- }()
-
- return true
-}
-
-// RunWait is functionally the same as .Run(), but blocks until the first instance of .Run() returns.
-func (svc *Service) RunWait(fn func(context.Context)) bool {
- // Attempt to start the svc
- ctx, ok := svc.doStart()
- if !ok {
- <-ctx // block
- return false
- }
-
- defer func() {
- // unlock single wait
- svc.wait.Unlock()
-
- // ensure stopped
- _ = svc.Stop()
- }()
-
- // Run with context.
- fn(CancelCtx(ctx))
-
- return true
-}
-
-// Stop will attempt to stop the service, cancelling the running function's context. Immediately
-// returns false if not running, and true only after Service is fully stopped.
-func (svc *Service) Stop() bool {
- // Attempt to stop the svc
- ctx, ok := svc.doStop()
- if !ok {
- return false
- }
-
- defer func() {
- // Get svc lock
- svc.mutex.Lock()
-
- // Wait until stopped
- svc.wait.Lock()
- svc.wait.Unlock()
-
- // Reset the svc
- svc.ctx = nil
- svc.state = 0
- svc.mutex.Unlock()
- }()
-
- // Cancel ctx
- close(ctx)
-
- return true
-}
-
-// While allows you to execute given function guaranteed within current
-// service state. Please note that this will hold the underlying service
-// state change mutex open while executing the function.
-func (svc *Service) While(fn func()) {
- // Protect state change
- svc.mutex.Lock()
- defer svc.mutex.Unlock()
-
- // Run
- fn()
-}
-
-// doStart will safely set Service state to started, returning a ptr to this context insance.
-func (svc *Service) doStart() (chan struct{}, bool) {
- // Protect startup
- svc.mutex.Lock()
-
- if svc.ctx == nil {
- // this will only have been allocated
- // if svc.Done() was already called.
- svc.ctx = make(chan struct{})
- }
-
- // Take our own ptr
- ctx := svc.ctx
-
- if svc.state != 0 {
- // State was not stopped.
- svc.mutex.Unlock()
- return ctx, false
- }
-
- // Set started.
- svc.state = 1
-
- // Start waiter.
- svc.wait.Lock()
-
- // Unlock and return
- svc.mutex.Unlock()
- return ctx, true
-}
-
-// doStop will safely set Service state to stopping, returning a ptr to this cancelfunc instance.
-func (svc *Service) doStop() (chan struct{}, bool) {
- // Protect stop
- svc.mutex.Lock()
-
- if svc.state != 1 /* not started */ {
- svc.mutex.Unlock()
- return nil, false
- }
-
- // state stopping
- svc.state = 2
-
- // Take our own ptr
- // and unlock state
- ctx := svc.ctx
- svc.mutex.Unlock()
-
- return ctx, true
-}
-
-// Running returns if Service is running (i.e. state NOT stopped / stopping).
-func (svc *Service) Running() bool {
- svc.mutex.Lock()
- state := svc.state
- svc.mutex.Unlock()
- return (state == 1)
-}
-
-// Done returns a channel that's closed when Service.Stop() is called. It is
-// the same channel provided to the currently running service function.
-func (svc *Service) Done() <-chan struct{} {
- var done <-chan struct{}
-
- svc.mutex.Lock()
- switch svc.state {
- // stopped
- case 0:
- if svc.ctx == nil {
- // here we create a new context so that the
- // returned 'done' channel here will still
- // be valid for when Service is next started.
- svc.ctx = make(chan struct{})
- }
- done = svc.ctx
-
- // started
- case 1:
- done = svc.ctx
-
- // stopping
- case 2:
- done = svc.ctx
- }
- svc.mutex.Unlock()
-
- return done
-}