summaryrefslogtreecommitdiff
path: root/vendor/codeberg.org
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/codeberg.org')
-rw-r--r--vendor/codeberg.org/gruf/go-runners/context.go31
-rw-r--r--vendor/codeberg.org/gruf/go-runners/pool.go22
-rw-r--r--vendor/codeberg.org/gruf/go-runners/service.go24
3 files changed, 41 insertions, 36 deletions
diff --git a/vendor/codeberg.org/gruf/go-runners/context.go b/vendor/codeberg.org/gruf/go-runners/context.go
index 9cb6aa5f7..12f7f1a10 100644
--- a/vendor/codeberg.org/gruf/go-runners/context.go
+++ b/vendor/codeberg.org/gruf/go-runners/context.go
@@ -7,9 +7,9 @@ import (
// closedctx is an always closed context.
var closedctx = func() context.Context {
- ctx := make(cancelctx)
+ ctx := make(chan struct{})
close(ctx)
- return ctx
+ return CancelCtx(ctx)
}()
// Closed returns an always closed context.
@@ -17,24 +17,25 @@ func Closed() context.Context {
return closedctx
}
-// ContextWithCancel returns a new context.Context impl with cancel.
-func ContextWithCancel() (context.Context, context.CancelFunc) {
- ctx := make(cancelctx)
- return ctx, func() { close(ctx) }
+// CtxWithCancel returns a new context.Context impl with cancel.
+func CtxWithCancel() (context.Context, context.CancelFunc) {
+ ctx := make(chan struct{})
+ cncl := func() { close(ctx) }
+ return CancelCtx(ctx), cncl
}
-// cancelctx is the simplest possible cancellable context.
-type cancelctx (chan struct{})
+// CancelCtx is the simplest possible cancellable context.
+type CancelCtx (<-chan struct{})
-func (cancelctx) Deadline() (time.Time, bool) {
+func (CancelCtx) Deadline() (time.Time, bool) {
return time.Time{}, false
}
-func (ctx cancelctx) Done() <-chan struct{} {
+func (ctx CancelCtx) Done() <-chan struct{} {
return ctx
}
-func (ctx cancelctx) Err() error {
+func (ctx CancelCtx) Err() error {
select {
case <-ctx:
return context.Canceled
@@ -43,11 +44,11 @@ func (ctx cancelctx) Err() error {
}
}
-func (cancelctx) Value(key interface{}) interface{} {
+func (CancelCtx) Value(key interface{}) interface{} {
return nil
}
-func (ctx cancelctx) String() string {
+func (ctx CancelCtx) String() string {
var state string
select {
case <-ctx:
@@ -55,9 +56,9 @@ func (ctx cancelctx) String() string {
default:
state = "open"
}
- return "cancelctx{state:" + state + "}"
+ return "CancelCtx{state:" + state + "}"
}
-func (ctx cancelctx) GoString() string {
+func (ctx CancelCtx) GoString() string {
return "runners." + ctx.String()
}
diff --git a/vendor/codeberg.org/gruf/go-runners/pool.go b/vendor/codeberg.org/gruf/go-runners/pool.go
index 1d83e85c7..16222b2e1 100644
--- a/vendor/codeberg.org/gruf/go-runners/pool.go
+++ b/vendor/codeberg.org/gruf/go-runners/pool.go
@@ -67,20 +67,14 @@ func (pool *WorkerPool) Start(workers int, queue int) bool {
go func() {
defer wait.Done()
- // Run worker function.
- for !worker_run(ctx, fns) {
- // retry on panic
+ // Run worker function (retry on panic)
+ for !worker_run(CancelCtx(ctx), fns) {
}
}()
}
- // Set GC finalizer to stop pool on dealloc.
- runtime.SetFinalizer(pool, func(pool *WorkerPool) {
- _ = pool.svc.Stop()
- })
-
// Wait on ctx
- <-ctx.Done()
+ <-ctx
// Drain function queue.
//
@@ -110,6 +104,16 @@ func (pool *WorkerPool) Stop() bool {
return pool.svc.Stop()
}
+// Running returns if WorkerPool management loop is running (i.e. NOT stopped / stopping).
+func (pool *WorkerPool) Running() bool {
+ return pool.svc.Running()
+}
+
+// Done returns a channel that's closed when WorkerPool.Stop() is called. It is the same channel provided to the currently running worker functions.
+func (pool *WorkerPool) Done() <-chan struct{} {
+ return pool.svc.Done()
+}
+
// Enqueue will add provided WorkerFunc to the queue to be performed when there is a free worker.
// This will block until function is queued or pool is stopped. In all cases, the WorkerFunc will be
// executed, with the state of the pool being indicated by <-ctx.Done() of the passed ctx.
diff --git a/vendor/codeberg.org/gruf/go-runners/service.go b/vendor/codeberg.org/gruf/go-runners/service.go
index 2c9be8225..c019a10f6 100644
--- a/vendor/codeberg.org/gruf/go-runners/service.go
+++ b/vendor/codeberg.org/gruf/go-runners/service.go
@@ -8,10 +8,10 @@ import (
// Service provides a means of tracking a single long-running service, provided protected state
// changes and preventing multiple instances running. Also providing service state information.
type Service struct {
- state uint32 // 0=stopped, 1=running, 2=stopping
- mutex sync.Mutex // mutext protects overall state changes
- wait sync.Mutex // wait is used as a single-entity wait-group, only ever locked within 'mutex'
- ctx cancelctx // ctx is the current context for running function (or nil if not running)
+ state uint32 // 0=stopped, 1=running, 2=stopping
+ mutex sync.Mutex // mutext protects overall state changes
+ wait sync.Mutex // wait is used as a single-entity wait-group, only ever locked within 'mutex'
+ ctx chan struct{} // ctx is the current context for running function (or nil if not running)
}
// Run will run the supplied function until completion, using given context to propagate cancel.
@@ -31,8 +31,8 @@ func (svc *Service) Run(fn func(context.Context)) bool {
_ = svc.Stop()
}()
- // Run
- fn(ctx)
+ // Run with context.
+ fn(CancelCtx(ctx))
return true
}
@@ -55,8 +55,8 @@ func (svc *Service) GoRun(fn func(context.Context)) bool {
_ = svc.Stop()
}()
- // Run
- fn(ctx)
+ // Run with context.
+ fn(CancelCtx(ctx))
}()
return true
@@ -104,7 +104,7 @@ func (svc *Service) While(fn func()) {
}
// doStart will safely set Service state to started, returning a ptr to this context insance.
-func (svc *Service) doStart() (cancelctx, bool) {
+func (svc *Service) doStart() (chan struct{}, bool) {
// Protect startup
svc.mutex.Lock()
@@ -119,7 +119,7 @@ func (svc *Service) doStart() (cancelctx, bool) {
if svc.ctx == nil {
// this will only have been allocated
// if svc.Done() was already called.
- svc.ctx = make(cancelctx)
+ svc.ctx = make(chan struct{})
}
// Start the waiter
@@ -134,7 +134,7 @@ func (svc *Service) doStart() (cancelctx, bool) {
}
// doStop will safely set Service state to stopping, returning a ptr to this cancelfunc instance.
-func (svc *Service) doStop() (cancelctx, bool) {
+func (svc *Service) doStop() (chan struct{}, bool) {
// Protect stop
svc.mutex.Lock()
@@ -175,7 +175,7 @@ func (svc *Service) Done() <-chan struct{} {
// here we create a new context so that the
// returned 'done' channel here will still
// be valid for when Service is next started.
- svc.ctx = make(cancelctx)
+ svc.ctx = make(chan struct{})
}
done = svc.ctx