diff options
Diffstat (limited to 'vendor/codeberg.org/gruf/go-runners')
| -rw-r--r-- | vendor/codeberg.org/gruf/go-runners/pool.go | 28 | ||||
| -rw-r--r-- | vendor/codeberg.org/gruf/go-runners/process.go | 75 | ||||
| -rw-r--r-- | vendor/codeberg.org/gruf/go-runners/run.go | 124 | ||||
| -rw-r--r-- | vendor/codeberg.org/gruf/go-runners/service.go | 52 | 
4 files changed, 141 insertions, 138 deletions
diff --git a/vendor/codeberg.org/gruf/go-runners/pool.go b/vendor/codeberg.org/gruf/go-runners/pool.go index 16222b2e1..3d9105986 100644 --- a/vendor/codeberg.org/gruf/go-runners/pool.go +++ b/vendor/codeberg.org/gruf/go-runners/pool.go @@ -157,6 +157,34 @@ func (pool *WorkerPool) EnqueueCtx(ctx context.Context, fn WorkerFunc) bool {  	}  } +// MustEnqueueCtx functionally performs similarly to WorkerPool.EnqueueCtx(), but in the case +// that the provided <-ctx.Done() is closed, it is passed asynchronously to WorkerPool.Enqueue(). +// Return boolean indicates whether function was executed in time before <-ctx.Done() is closed. +func (pool *WorkerPool) MustEnqueueCtx(ctx context.Context, fn WorkerFunc) (ok bool) { +	// Check valid fn +	if fn == nil { +		return false +	} + +	select { +	case <-ctx.Done(): +		// We failed to add this entry to the worker queue before the +		// incoming context was cancelled. So to ensure processing +		// we simply queue it asynchronously and return early to caller. +		go pool.Enqueue(fn) +		return false + +	case <-pool.svc.Done(): +		// Pool ctx cancelled +		fn(closedctx) +		return false + +	case pool.fns <- fn: +		// Placed fn in queue +		return true +	} +} +  // EnqueueNow attempts Enqueue but returns false if not executed.  func (pool *WorkerPool) EnqueueNow(fn WorkerFunc) bool {  	// Check valid fn diff --git a/vendor/codeberg.org/gruf/go-runners/process.go b/vendor/codeberg.org/gruf/go-runners/process.go new file mode 100644 index 000000000..908e6edca --- /dev/null +++ b/vendor/codeberg.org/gruf/go-runners/process.go @@ -0,0 +1,75 @@ +package runners + +import ( +	"fmt" +	"sync" +) + +// Processable defines a runnable process with error return +// that can be passed to a Processor instance for managed running. +type Processable func() error + +// Processor acts similarly to a sync.Once object, except that it is reusable. After +// the first call to Process(), any further calls before this first has returned will +// block until the first call has returned, and return the same error. This ensures +// that only a single instance of it is ever running at any one time. +type Processor struct { +	mutex sync.Mutex +	state uint32 +	wait  sync.WaitGroup +	err   *error +} + +// Process will process the given function if first-call, else blocking until +// the first function has returned, returning the same error result. +func (p *Processor) Process(proc Processable) (err error) { +	// Acquire state lock. +	p.mutex.Lock() + +	if p.state != 0 { +		// Already running. +		// +		// Get current err ptr. +		errPtr := p.err + +		// Wait until finish. +		p.mutex.Unlock() +		p.wait.Wait() +		return *errPtr +	} + +	// Reset error ptr. +	p.err = new(error) + +	// Set started. +	p.wait.Add(1) +	p.state = 1 +	p.mutex.Unlock() + +	defer func() { +		if r := recover(); r != nil { +			if err != nil { +				rOld := r // wrap the panic so we don't lose existing returned error +				r = fmt.Errorf("panic occured after error %q: %v", err.Error(), rOld) +			} + +			// Catch any panics and wrap as error. +			err = fmt.Errorf("caught panic: %v", r) +		} + +		// Store error. +		*p.err = err + +		// Mark done. +		p.wait.Done() + +		// Set stopped. +		p.mutex.Lock() +		p.state = 0 +		p.mutex.Unlock() +	}() + +	// Run process. +	err = proc() +	return +} diff --git a/vendor/codeberg.org/gruf/go-runners/run.go b/vendor/codeberg.org/gruf/go-runners/run.go deleted file mode 100644 index 67d19b40c..000000000 --- a/vendor/codeberg.org/gruf/go-runners/run.go +++ /dev/null @@ -1,124 +0,0 @@ -package runners - -import ( -	"context" -	"errors" -	"fmt" -	"time" - -	"codeberg.org/gruf/go-atomics" -) - -// FuncRunner provides a means of managing long-running functions e.g. main logic loops. -type FuncRunner struct { -	// HandOff is the time after which a blocking function will be considered handed off -	HandOff time.Duration - -	// ErrorHandler is the function that errors are passed to when encountered by the -	// provided function. This can be used both for logging, and for error filtering -	ErrorHandler func(err error) error - -	svc Service // underlying service to manage start/stop -	err atomics.Error -} - -// Go will attempt to run 'fn' asynchronously. The provided context is used to propagate requested -// cancel if FuncRunner.Stop() is called. Any returned error will be passed to FuncRunner.ErrorHandler -// for filtering/logging/etc. Any blocking functions will be waited on for FuncRunner.HandOff amount of -// time before considering the function as handed off. Returned bool is success state, i.e. returns true -// if function is successfully handed off or returns within hand off time with nil error. -func (r *FuncRunner) Go(fn func(ctx context.Context) error) bool { -	var has bool - -	done := make(chan struct{}) - -	go func() { -		var cancelled bool - -		has = r.svc.Run(func(ctx context.Context) { -			// reset error -			r.err.Store(nil) - -			// Run supplied func and set errror if returned -			if err := Run(func() error { return fn(ctx) }); err != nil { -				r.err.Store(err) -			} - -			// signal done -			close(done) - -			// Check if cancelled -			select { -			case <-ctx.Done(): -				cancelled = true -			default: -				cancelled = false -			} -		}) - -		switch has { -		// returned after starting -		case true: -			// Load set error -			err := r.err.Load() - -			// filter out errors due FuncRunner.Stop() being called -			if cancelled && errors.Is(err, context.Canceled) { -				// filter out errors from FuncRunner.Stop() being called -				r.err.Store(nil) -			} else if err != nil && r.ErrorHandler != nil { -				// pass any non-nil error to set handler -				r.err.Store(r.ErrorHandler(err)) -			} - -		// already running -		case false: -			close(done) -		} -	}() - -	// get valid handoff to use -	handoff := r.HandOff -	if handoff < 1 { -		handoff = time.Second * 5 -	} - -	select { -	// handed off (long-run successful) -	case <-time.After(handoff): -		return true - -	// 'fn' returned, check error -	case <-done: -		return has -	} -} - -// Stop will cancel the context supplied to the running function. -func (r *FuncRunner) Stop() bool { -	return r.svc.Stop() -} - -// Err returns the last-set error value. -func (r *FuncRunner) Err() error { -	return r.err.Load() -} - -// Run will execute the supplied 'fn' catching any panics. Returns either function-returned error or formatted panic. -func Run(fn func() error) (err error) { -	defer func() { -		if r := recover(); r != nil { -			if e, ok := r.(error); ok { -				// wrap and preserve existing error -				err = fmt.Errorf("caught panic: %w", e) -			} else { -				// simply create new error fromt iface -				err = fmt.Errorf("caught panic: %v", r) -			} -		} -	}() - -	// run supplied func -	err = fn() -	return -} diff --git a/vendor/codeberg.org/gruf/go-runners/service.go b/vendor/codeberg.org/gruf/go-runners/service.go index c019a10f6..8a7c0051a 100644 --- a/vendor/codeberg.org/gruf/go-runners/service.go +++ b/vendor/codeberg.org/gruf/go-runners/service.go @@ -9,7 +9,7 @@ import (  // changes and preventing multiple instances running. Also providing service state information.  type Service struct {  	state uint32        // 0=stopped, 1=running, 2=stopping -	mutex sync.Mutex    // mutext protects overall state changes +	mutex sync.Mutex    // mutex protects overall state changes  	wait  sync.Mutex    // wait is used as a single-entity wait-group, only ever locked within 'mutex'  	ctx   chan struct{} // ctx is the current context for running function (or nil if not running)  } @@ -62,6 +62,29 @@ func (svc *Service) GoRun(fn func(context.Context)) bool {  	return true  } +// RunWait is functionally the same as .Run(), but blocks until the first instance of .Run() returns. +func (svc *Service) RunWait(fn func(context.Context)) bool { +	// Attempt to start the svc +	ctx, ok := svc.doStart() +	if !ok { +		<-ctx // block +		return false +	} + +	defer func() { +		// unlock single wait +		svc.wait.Unlock() + +		// ensure stopped +		_ = svc.Stop() +	}() + +	// Run with context. +	fn(CancelCtx(ctx)) + +	return true +} +  // Stop will attempt to stop the service, cancelling the running function's context. Immediately  // returns false if not running, and true only after Service is fully stopped.  func (svc *Service) Stop() bool { @@ -108,28 +131,29 @@ func (svc *Service) doStart() (chan struct{}, bool) {  	// Protect startup  	svc.mutex.Lock() -	if svc.state != 0 /* not stopped */ { -		svc.mutex.Unlock() -		return nil, false -	} - -	// state started -	svc.state = 1 -  	if svc.ctx == nil {  		// this will only have been allocated  		// if svc.Done() was already called.  		svc.ctx = make(chan struct{})  	} -	// Start the waiter -	svc.wait.Lock() -  	// Take our own ptr -	// and unlock state  	ctx := svc.ctx -	svc.mutex.Unlock() +	if svc.state != 0 { +		// State was not stopped. +		svc.mutex.Unlock() +		return ctx, false +	} + +	// Set started. +	svc.state = 1 + +	// Start waiter. +	svc.wait.Lock() + +	// Unlock and return +	svc.mutex.Unlock()  	return ctx, true  }  | 
