diff options
Diffstat (limited to 'vendor/codeberg.org/gruf/go-runners')
| -rw-r--r-- | vendor/codeberg.org/gruf/go-runners/context.go | 31 | ||||
| -rw-r--r-- | vendor/codeberg.org/gruf/go-runners/pool.go | 22 | ||||
| -rw-r--r-- | vendor/codeberg.org/gruf/go-runners/service.go | 24 | 
3 files changed, 41 insertions, 36 deletions
| diff --git a/vendor/codeberg.org/gruf/go-runners/context.go b/vendor/codeberg.org/gruf/go-runners/context.go index 9cb6aa5f7..12f7f1a10 100644 --- a/vendor/codeberg.org/gruf/go-runners/context.go +++ b/vendor/codeberg.org/gruf/go-runners/context.go @@ -7,9 +7,9 @@ import (  // closedctx is an always closed context.  var closedctx = func() context.Context { -	ctx := make(cancelctx) +	ctx := make(chan struct{})  	close(ctx) -	return ctx +	return CancelCtx(ctx)  }()  // Closed returns an always closed context. @@ -17,24 +17,25 @@ func Closed() context.Context {  	return closedctx  } -// ContextWithCancel returns a new context.Context impl with cancel. -func ContextWithCancel() (context.Context, context.CancelFunc) { -	ctx := make(cancelctx) -	return ctx, func() { close(ctx) } +// CtxWithCancel returns a new context.Context impl with cancel. +func CtxWithCancel() (context.Context, context.CancelFunc) { +	ctx := make(chan struct{}) +	cncl := func() { close(ctx) } +	return CancelCtx(ctx), cncl  } -// cancelctx is the simplest possible cancellable context. -type cancelctx (chan struct{}) +// CancelCtx is the simplest possible cancellable context. +type CancelCtx (<-chan struct{}) -func (cancelctx) Deadline() (time.Time, bool) { +func (CancelCtx) Deadline() (time.Time, bool) {  	return time.Time{}, false  } -func (ctx cancelctx) Done() <-chan struct{} { +func (ctx CancelCtx) Done() <-chan struct{} {  	return ctx  } -func (ctx cancelctx) Err() error { +func (ctx CancelCtx) Err() error {  	select {  	case <-ctx:  		return context.Canceled @@ -43,11 +44,11 @@ func (ctx cancelctx) Err() error {  	}  } -func (cancelctx) Value(key interface{}) interface{} { +func (CancelCtx) Value(key interface{}) interface{} {  	return nil  } -func (ctx cancelctx) String() string { +func (ctx CancelCtx) String() string {  	var state string  	select {  	case <-ctx: @@ -55,9 +56,9 @@ func (ctx cancelctx) String() string {  	default:  		state = "open"  	} -	return "cancelctx{state:" + state + "}" +	return "CancelCtx{state:" + state + "}"  } -func (ctx cancelctx) GoString() string { +func (ctx CancelCtx) GoString() string {  	return "runners." + ctx.String()  } diff --git a/vendor/codeberg.org/gruf/go-runners/pool.go b/vendor/codeberg.org/gruf/go-runners/pool.go index 1d83e85c7..16222b2e1 100644 --- a/vendor/codeberg.org/gruf/go-runners/pool.go +++ b/vendor/codeberg.org/gruf/go-runners/pool.go @@ -67,20 +67,14 @@ func (pool *WorkerPool) Start(workers int, queue int) bool {  			go func() {  				defer wait.Done() -				// Run worker function. -				for !worker_run(ctx, fns) { -					// retry on panic +				// Run worker function (retry on panic) +				for !worker_run(CancelCtx(ctx), fns) {  				}  			}()  		} -		// Set GC finalizer to stop pool on dealloc. -		runtime.SetFinalizer(pool, func(pool *WorkerPool) { -			_ = pool.svc.Stop() -		}) -  		// Wait on ctx -		<-ctx.Done() +		<-ctx  		// Drain function queue.  		// @@ -110,6 +104,16 @@ func (pool *WorkerPool) Stop() bool {  	return pool.svc.Stop()  } +// Running returns if WorkerPool management loop is running (i.e. NOT stopped / stopping). +func (pool *WorkerPool) Running() bool { +	return pool.svc.Running() +} + +// Done returns a channel that's closed when WorkerPool.Stop() is called. It is the same channel provided to the currently running worker functions. +func (pool *WorkerPool) Done() <-chan struct{} { +	return pool.svc.Done() +} +  // Enqueue will add provided WorkerFunc to the queue to be performed when there is a free worker.  // This will block until function is queued or pool is stopped. In all cases, the WorkerFunc will be  // executed, with the state of the pool being indicated by <-ctx.Done() of the passed ctx. diff --git a/vendor/codeberg.org/gruf/go-runners/service.go b/vendor/codeberg.org/gruf/go-runners/service.go index 2c9be8225..c019a10f6 100644 --- a/vendor/codeberg.org/gruf/go-runners/service.go +++ b/vendor/codeberg.org/gruf/go-runners/service.go @@ -8,10 +8,10 @@ import (  // Service provides a means of tracking a single long-running service, provided protected state  // changes and preventing multiple instances running. Also providing service state information.  type Service struct { -	state uint32     // 0=stopped, 1=running, 2=stopping -	mutex sync.Mutex // mutext protects overall state changes -	wait  sync.Mutex // wait is used as a single-entity wait-group, only ever locked within 'mutex' -	ctx   cancelctx  // ctx is the current context for running function (or nil if not running) +	state uint32        // 0=stopped, 1=running, 2=stopping +	mutex sync.Mutex    // mutext protects overall state changes +	wait  sync.Mutex    // wait is used as a single-entity wait-group, only ever locked within 'mutex' +	ctx   chan struct{} // ctx is the current context for running function (or nil if not running)  }  // Run will run the supplied function until completion, using given context to propagate cancel. @@ -31,8 +31,8 @@ func (svc *Service) Run(fn func(context.Context)) bool {  		_ = svc.Stop()  	}() -	// Run -	fn(ctx) +	// Run with context. +	fn(CancelCtx(ctx))  	return true  } @@ -55,8 +55,8 @@ func (svc *Service) GoRun(fn func(context.Context)) bool {  			_ = svc.Stop()  		}() -		// Run -		fn(ctx) +		// Run with context. +		fn(CancelCtx(ctx))  	}()  	return true @@ -104,7 +104,7 @@ func (svc *Service) While(fn func()) {  }  // doStart will safely set Service state to started, returning a ptr to this context insance. -func (svc *Service) doStart() (cancelctx, bool) { +func (svc *Service) doStart() (chan struct{}, bool) {  	// Protect startup  	svc.mutex.Lock() @@ -119,7 +119,7 @@ func (svc *Service) doStart() (cancelctx, bool) {  	if svc.ctx == nil {  		// this will only have been allocated  		// if svc.Done() was already called. -		svc.ctx = make(cancelctx) +		svc.ctx = make(chan struct{})  	}  	// Start the waiter @@ -134,7 +134,7 @@ func (svc *Service) doStart() (cancelctx, bool) {  }  // doStop will safely set Service state to stopping, returning a ptr to this cancelfunc instance. -func (svc *Service) doStop() (cancelctx, bool) { +func (svc *Service) doStop() (chan struct{}, bool) {  	// Protect stop  	svc.mutex.Lock() @@ -175,7 +175,7 @@ func (svc *Service) Done() <-chan struct{} {  			// here we create a new context so that the  			// returned 'done' channel here will still  			// be valid for when Service is next started. -			svc.ctx = make(cancelctx) +			svc.ctx = make(chan struct{})  		}  		done = svc.ctx | 
