summaryrefslogtreecommitdiff
path: root/vendor/codeberg.org
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/codeberg.org')
-rw-r--r--vendor/codeberg.org/gruf/go-cache/v3/result/cache.go22
-rw-r--r--vendor/codeberg.org/gruf/go-cache/v3/result/key.go106
-rw-r--r--vendor/codeberg.org/gruf/go-cache/v3/ttl/schedule.go8
-rw-r--r--vendor/codeberg.org/gruf/go-mangler/README.md2
-rw-r--r--vendor/codeberg.org/gruf/go-mangler/load.go38
-rw-r--r--vendor/codeberg.org/gruf/go-mangler/mangle.go64
-rw-r--r--vendor/codeberg.org/gruf/go-runners/context.go20
-rw-r--r--vendor/codeberg.org/gruf/go-runners/pool.go127
-rw-r--r--vendor/codeberg.org/gruf/go-runners/service.go105
-rw-r--r--vendor/codeberg.org/gruf/go-sched/scheduler.go35
10 files changed, 313 insertions, 214 deletions
diff --git a/vendor/codeberg.org/gruf/go-cache/v3/result/cache.go b/vendor/codeberg.org/gruf/go-cache/v3/result/cache.go
index 6c50e60b1..91ca2255e 100644
--- a/vendor/codeberg.org/gruf/go-cache/v3/result/cache.go
+++ b/vendor/codeberg.org/gruf/go-cache/v3/result/cache.go
@@ -56,8 +56,8 @@ func New[Value any](lookups []Lookup, copy func(Value) Value, cap int) *Cache[Va
c.lookups = make([]structKey, len(lookups))
for i, lookup := range lookups {
- // Generate keyed field info for lookup
- c.lookups[i] = genStructKey(lookup, t)
+ // Create keyed field info for lookup
+ c.lookups[i] = newStructKey(lookup, t)
}
// Create and initialize underlying cache
@@ -159,7 +159,7 @@ func (c *Cache[Value]) Load(lookup string, load func() (Value, error), keyParts
keyInfo := c.lookups.get(lookup)
// Generate cache key string.
- ckey := genKey(keyParts...)
+ ckey := keyInfo.genKey(keyParts)
// Acquire cache lock
c.cache.Lock()
@@ -248,17 +248,17 @@ func (c *Cache[Value]) Store(value Value, store func() error) error {
func (c *Cache[Value]) Has(lookup string, keyParts ...any) bool {
var res result[Value]
- // Get lookup key type by name.
- keyType := c.lookups.get(lookup)
+ // Get lookup key info by name.
+ keyInfo := c.lookups.get(lookup)
// Generate cache key string.
- ckey := genKey(keyParts...)
+ ckey := keyInfo.genKey(keyParts)
// Acquire cache lock
c.cache.Lock()
// Look for primary key for cache key
- pkey, ok := keyType.pkeys[ckey]
+ pkey, ok := keyInfo.pkeys[ckey]
if ok {
// Fetch the result for primary key
@@ -275,15 +275,15 @@ func (c *Cache[Value]) Has(lookup string, keyParts ...any) bool {
// Invalidate will invalidate any result from the cache found under given lookup and key parts.
func (c *Cache[Value]) Invalidate(lookup string, keyParts ...any) {
- // Get lookup key type by name.
- keyType := c.lookups.get(lookup)
+ // Get lookup key info by name.
+ keyInfo := c.lookups.get(lookup)
// Generate cache key string.
- ckey := genKey(keyParts...)
+ ckey := keyInfo.genKey(keyParts)
// Look for primary key for cache key
c.cache.Lock()
- pkey, ok := keyType.pkeys[ckey]
+ pkey, ok := keyInfo.pkeys[ckey]
c.cache.Unlock()
if !ok {
diff --git a/vendor/codeberg.org/gruf/go-cache/v3/result/key.go b/vendor/codeberg.org/gruf/go-cache/v3/result/key.go
index b7e75b6b8..6be316c2b 100644
--- a/vendor/codeberg.org/gruf/go-cache/v3/result/key.go
+++ b/vendor/codeberg.org/gruf/go-cache/v3/result/key.go
@@ -1,6 +1,7 @@
package result
import (
+ "fmt"
"reflect"
"strings"
"sync"
@@ -51,10 +52,10 @@ func (sk structKeys) generate(a any) []cacheKey {
buf.B = buf.B[:0]
// Append each field value to buffer.
- for _, idx := range sk[i].fields {
- fv := v.Field(idx)
+ for _, field := range sk[i].fields {
+ fv := v.Field(field.index)
fi := fv.Interface()
- buf.B = mangler.Append(buf.B, fi)
+ buf.B = field.mangle(buf.B, fi)
buf.B = append(buf.B, '.')
}
@@ -123,17 +124,58 @@ type structKey struct {
// fields is a slice of runtime struct field
// indices, of the fields encompassed by this key.
- fields []int
+
+ fields []structField
// pkeys is a lookup of stored struct key values
// to the primary cache lookup key (int64).
pkeys map[string]int64
}
-// genStructKey will generate a structKey{} information object for user-given lookup
+type structField struct {
+ // index is the reflect index of this struct field.
+ index int
+
+ // mangle is the mangler function for
+ // serializing values of this struct field.
+ mangle mangler.Mangler
+}
+
+// genKey generates a cache key string for given key parts (i.e. serializes them using "go-mangler").
+func (sk structKey) genKey(parts []any) string {
+ // Check this expected no. key parts.
+ if len(parts) != len(sk.fields) {
+ panic(fmt.Sprintf("incorrect no. key parts provided: want=%d received=%d", len(parts), len(sk.fields)))
+ }
+
+ // Acquire byte buffer
+ buf := getBuf()
+ defer putBuf(buf)
+ buf.Reset()
+
+ // Encode each key part
+ for i, part := range parts {
+ buf.B = sk.fields[i].mangle(buf.B, part)
+ buf.B = append(buf.B, '.')
+ }
+
+ // Drop last '.'
+ buf.Truncate(1)
+
+ // Return string copy
+ return string(buf.B)
+}
+
+// newStructKey will generate a structKey{} information object for user-given lookup
// key information, and the receiving generic paramter's type information. Panics on error.
-func genStructKey(lk Lookup, t reflect.Type) structKey {
- var zeros []any
+func newStructKey(lk Lookup, t reflect.Type) structKey {
+ var (
+ sk structKey
+ zeros []any
+ )
+
+ // Set the lookup name
+ sk.name = lk.Name
// Split dot-separated lookup to get
// the individual struct field names
@@ -142,8 +184,8 @@ func genStructKey(lk Lookup, t reflect.Type) structKey {
panic("no key fields specified")
}
- // Pre-allocate slice of expected length
- fields := make([]int, len(names))
+ // Allocate the mangler and field indices slice.
+ sk.fields = make([]structField, len(names))
for i, name := range names {
// Get field info for given name
@@ -158,60 +200,30 @@ func genStructKey(lk Lookup, t reflect.Type) structKey {
}
// Set the runtime field index
- fields[i] = ft.Index[0]
+ sk.fields[i].index = ft.Index[0]
// Allocate new instance of field
v := reflect.New(ft.Type)
v = v.Elem()
+ // Fetch mangler for field type.
+ sk.fields[i].mangle = mangler.Get(ft.Type)
+
if !lk.AllowZero {
// Append the zero value interface
zeros = append(zeros, v.Interface())
}
}
- var zvalue string
-
if len(zeros) > 0 {
// Generate zero value string
- zvalue = genKey(zeros...)
+ sk.zero = sk.genKey(zeros)
}
- return structKey{
- name: lk.Name,
- zero: zvalue,
- fields: fields,
- pkeys: make(map[string]int64),
- }
-}
+ // Allocate primary lookup map
+ sk.pkeys = make(map[string]int64)
-// genKey generates a cache key for given key values.
-func genKey(parts ...any) string {
- if len(parts) == 0 {
- // Panic to prevent annoying usecase
- // where user forgets to pass lookup
- // and instead only passes a key part,
- // e.g. cache.Get("key")
- // which then always returns false.
- panic("no key parts provided")
- }
-
- // Acquire byte buffer
- buf := getBuf()
- defer putBuf(buf)
- buf.Reset()
-
- // Encode each key part
- for _, part := range parts {
- buf.B = mangler.Append(buf.B, part)
- buf.B = append(buf.B, '.')
- }
-
- // Drop last '.'
- buf.Truncate(1)
-
- // Return string copy
- return string(buf.B)
+ return sk
}
// isExported checks whether function name is exported.
diff --git a/vendor/codeberg.org/gruf/go-cache/v3/ttl/schedule.go b/vendor/codeberg.org/gruf/go-cache/v3/ttl/schedule.go
index 111de0757..a12b33ab9 100644
--- a/vendor/codeberg.org/gruf/go-cache/v3/ttl/schedule.go
+++ b/vendor/codeberg.org/gruf/go-cache/v3/ttl/schedule.go
@@ -6,15 +6,15 @@ import (
"codeberg.org/gruf/go-sched"
)
-// scheduler is the global cache runtime scheduler
-// for handling regular cache evictions.
+// scheduler is the global cache runtime
+// scheduler for handling cache evictions.
var scheduler sched.Scheduler
// schedule will given sweep routine to the global scheduler, and start global scheduler.
func schedule(sweep func(time.Time), freq time.Duration) func() {
if !scheduler.Running() {
- // ensure running
- _ = scheduler.Start()
+ // ensure sched running
+ _ = scheduler.Start(nil)
}
return scheduler.Schedule(sched.NewJob(sweep).Every(freq))
}
diff --git a/vendor/codeberg.org/gruf/go-mangler/README.md b/vendor/codeberg.org/gruf/go-mangler/README.md
index 15bbf57c4..c79b87546 100644
--- a/vendor/codeberg.org/gruf/go-mangler/README.md
+++ b/vendor/codeberg.org/gruf/go-mangler/README.md
@@ -19,8 +19,6 @@ pkg: codeberg.org/gruf/go-mangler
cpu: 11th Gen Intel(R) Core(TM) i7-1185G7 @ 3.00GHz
BenchmarkMangle
BenchmarkMangle-8 723278 1593 ns/op 1168 B/op 120 allocs/op
-BenchmarkMangleHash
-BenchmarkMangleHash-8 405380 2788 ns/op 4496 B/op 214 allocs/op
BenchmarkJSON
BenchmarkJSON-8 199360 6116 ns/op 4243 B/op 142 allocs/op
BenchmarkBinary
diff --git a/vendor/codeberg.org/gruf/go-mangler/load.go b/vendor/codeberg.org/gruf/go-mangler/load.go
index fd742c17b..20850f55c 100644
--- a/vendor/codeberg.org/gruf/go-mangler/load.go
+++ b/vendor/codeberg.org/gruf/go-mangler/load.go
@@ -118,20 +118,16 @@ func loadReflect(t reflect.Type) (Mangler, rMangler) {
reflect.Uintptr:
return mangle_platform_int, nil
- case reflect.Int8,
- reflect.Uint8:
+ case reflect.Int8, reflect.Uint8:
return mangle_8bit, nil
- case reflect.Int16,
- reflect.Uint16:
+ case reflect.Int16, reflect.Uint16:
return mangle_16bit, nil
- case reflect.Int32,
- reflect.Uint32:
+ case reflect.Int32, reflect.Uint32:
return mangle_32bit, nil
- case reflect.Int64,
- reflect.Uint64:
+ case reflect.Int64, reflect.Uint64:
return mangle_64bit, nil
case reflect.Float32:
@@ -214,20 +210,16 @@ func loadReflectKnownPtr(et reflect.Type) Mangler {
reflect.Uintptr:
return mangle_platform_int_ptr
- case reflect.Int8,
- reflect.Uint8:
+ case reflect.Int8, reflect.Uint8:
return mangle_8bit_ptr
- case reflect.Int16,
- reflect.Uint16:
+ case reflect.Int16, reflect.Uint16:
return mangle_16bit_ptr
- case reflect.Int32,
- reflect.Uint32:
+ case reflect.Int32, reflect.Uint32:
return mangle_32bit_ptr
- case reflect.Int64,
- reflect.Uint64:
+ case reflect.Int64, reflect.Uint64:
return mangle_64bit_ptr
case reflect.Float32:
@@ -261,20 +253,16 @@ func loadReflectKnownSlice(et reflect.Type) Mangler {
reflect.Uintptr:
return mangle_platform_int_slice
- case reflect.Int8,
- reflect.Uint8:
+ case reflect.Int8, reflect.Uint8:
return mangle_8bit_slice
- case reflect.Int16,
- reflect.Uint16:
+ case reflect.Int16, reflect.Uint16:
return mangle_16bit_slice
- case reflect.Int32,
- reflect.Uint32:
+ case reflect.Int32, reflect.Uint32:
return mangle_32bit_slice
- case reflect.Int64,
- reflect.Uint64:
+ case reflect.Int64, reflect.Uint64:
return mangle_64bit_slice
case reflect.Float32:
@@ -305,7 +293,7 @@ func loadReflectArray(et reflect.Type) rMangler {
return nil
}
-// loadReflectMap ...
+// loadReflectMap loads an rMangler function for a map of given key and value types.
func loadReflectMap(kt, vt reflect.Type) rMangler {
var kmng, vmng rMangler
diff --git a/vendor/codeberg.org/gruf/go-mangler/mangle.go b/vendor/codeberg.org/gruf/go-mangler/mangle.go
index 7158893ae..983216003 100644
--- a/vendor/codeberg.org/gruf/go-mangler/mangle.go
+++ b/vendor/codeberg.org/gruf/go-mangler/mangle.go
@@ -3,15 +3,13 @@ package mangler
import (
"encoding/binary"
"reflect"
+ "sync"
"unsafe"
-
- "github.com/cespare/xxhash"
- "github.com/cornelk/hashmap"
)
var (
// manglers is a map of runtime type ptrs => Mangler functions.
- manglers = hashmap.New[uintptr, Mangler]()
+ manglers = sync.Map{}
// bin is a short-hand for our chosen byteorder encoding.
bin = binary.LittleEndian
@@ -36,12 +34,38 @@ type Mangler func(buf []byte, value any) []byte
type rMangler func(buf []byte, value reflect.Value) []byte
// Get will fetch the Mangler function for given runtime type.
-func Get(t reflect.Type) (Mangler, bool) {
- if t == nil {
- return nil, false
- }
+// Note that the returned mangler will be a no-op in the case
+// that an incorrect type is passed as the value argument.
+func Get(t reflect.Type) Mangler {
+ var mng Mangler
+
+ // Get raw runtime type ptr
uptr := uintptr(iface_value(t))
- return manglers.Get(uptr)
+
+ // Look for a cached mangler
+ v, ok := manglers.Load(uptr)
+
+ if !ok {
+ // Load mangler function
+ mng = loadMangler(nil, t)
+ } else {
+ // cast cached value
+ mng = v.(Mangler)
+ }
+
+ return func(buf []byte, value any) []byte {
+ // Type check passed value against original arg type.
+ if vt := reflect.TypeOf(value); vt != t {
+ return buf
+ }
+
+ // First write the type ptr (this adds
+ // a unique prefix for each runtime type).
+ buf = mangle_platform_int(buf, uptr)
+
+ // Finally, mangle value
+ return mng(buf, value)
+ }
}
// Register will register the given Mangler function for use with vars of given runtime type. This allows
@@ -57,17 +81,19 @@ func Register(t reflect.Type, m Mangler) {
uptr := uintptr(iface_value(t))
// Ensure this is a unique encoder
- if _, ok := manglers.Get(uptr); ok {
+ if _, ok := manglers.Load(uptr); ok {
panic("already registered mangler for type: " + t.String())
}
// Cache this encoder func
- manglers.Set(uptr, m)
+ manglers.Store(uptr, m)
}
// Append will append the mangled form of input value 'a' to buffer 'b'.
// See mangler.String() for more information on mangled output.
func Append(b []byte, a any) []byte {
+ var mng Mangler
+
// Get reflect type of 'a'
t := reflect.TypeOf(a)
@@ -75,12 +101,15 @@ func Append(b []byte, a any) []byte {
uptr := uintptr(iface_value(t))
// Look for a cached mangler
- mng, ok := manglers.Get(uptr)
+ v, ok := manglers.Load(uptr)
if !ok {
// Load mangler into cache
- mng = loadMangler(a, t)
- manglers.Set(uptr, mng)
+ mng = loadMangler(nil, t)
+ manglers.Store(uptr, mng)
+ } else {
+ // cast cached value
+ mng = v.(Mangler)
}
// First write the type ptr (this adds
@@ -123,10 +152,3 @@ func String(a any) string {
b := Append(make([]byte, 0, 32), a)
return *(*string)(unsafe.Pointer(&b))
}
-
-// Hash returns the xxHash digest of the result of mangler.Append(nil, 'a').
-func Hash(a any) uint64 {
- b := make([]byte, 0, 32)
- b = Append(b, a)
- return xxhash.Sum64(b)
-}
diff --git a/vendor/codeberg.org/gruf/go-runners/context.go b/vendor/codeberg.org/gruf/go-runners/context.go
index 6a0c509cb..9cb6aa5f7 100644
--- a/vendor/codeberg.org/gruf/go-runners/context.go
+++ b/vendor/codeberg.org/gruf/go-runners/context.go
@@ -12,6 +12,11 @@ var closedctx = func() context.Context {
return ctx
}()
+// Closed returns an always closed context.
+func Closed() context.Context {
+ return closedctx
+}
+
// ContextWithCancel returns a new context.Context impl with cancel.
func ContextWithCancel() (context.Context, context.CancelFunc) {
ctx := make(cancelctx)
@@ -41,3 +46,18 @@ func (ctx cancelctx) Err() error {
func (cancelctx) Value(key interface{}) interface{} {
return nil
}
+
+func (ctx cancelctx) String() string {
+ var state string
+ select {
+ case <-ctx:
+ state = "closed"
+ default:
+ state = "open"
+ }
+ return "cancelctx{state:" + state + "}"
+}
+
+func (ctx cancelctx) GoString() string {
+ return "runners." + ctx.String()
+}
diff --git a/vendor/codeberg.org/gruf/go-runners/pool.go b/vendor/codeberg.org/gruf/go-runners/pool.go
index b6be57d0a..1d83e85c7 100644
--- a/vendor/codeberg.org/gruf/go-runners/pool.go
+++ b/vendor/codeberg.org/gruf/go-runners/pool.go
@@ -2,8 +2,12 @@ package runners
import (
"context"
+ "fmt"
+ "os"
"runtime"
"sync"
+
+ "codeberg.org/gruf/go-errors/v2"
)
// WorkerFunc represents a function processable by a worker in WorkerPool. Note
@@ -26,17 +30,22 @@ func (pool *WorkerPool) Start(workers int, queue int) bool {
return false
}
- if workers < 1 {
- // Use $GOMAXPROCS as default worker count
+ if workers <= 0 {
+ // Use $GOMAXPROCS as default.
workers = runtime.GOMAXPROCS(0)
}
if queue < 0 {
- // Set a reasonable queue default
- queue = workers * 2
+ // Use reasonable queue default.
+ queue = workers * 10
}
- // Allocate pool queue of given size
+ // Allocate pool queue of given size.
+ //
+ // This MUST be set BEFORE we return and NOT in
+ // the launched goroutine, or there is a risk that
+ // the pool may appear as closed for a short time
+ // until the main goroutine has been entered.
fns := make(chan WorkerFunc, queue)
pool.fns = fns
@@ -53,50 +62,49 @@ func (pool *WorkerPool) Start(workers int, queue int) bool {
// Start goroutine worker functions
for i := 0; i < workers; i++ {
+ wait.Add(1)
+
go func() {
- // Trigger start / stop
- wait.Add(1)
defer wait.Done()
- // Keep workers running on panic
- for !workerstart(ctx, fns) {
+ // Run worker function.
+ for !worker_run(ctx, fns) {
+ // retry on panic
}
}()
}
- // Set GC finalizer to stop pool on dealloc
+ // Set GC finalizer to stop pool on dealloc.
runtime.SetFinalizer(pool, func(pool *WorkerPool) {
- pool.svc.Stop()
+ _ = pool.svc.Stop()
})
// Wait on ctx
<-ctx.Done()
- // Stop all workers
- close(pool.fns)
+ // Drain function queue.
+ //
+ // All functions in the queue MUST be
+ // run, so we pass them a closed context.
+ //
+ // This mainly allows us to block until
+ // the function queue is empty, as worker
+ // functions will also continue draining in
+ // the background with the (now) closed ctx.
+ for !drain_queue(fns) {
+ // retry on panic
+ }
+
+ // Now the queue is empty, we can
+ // safely close the channel signalling
+ // all of the workers to return.
+ close(fns)
wait.Wait()
}()
return true
}
-// workerstart is the main worker runner routine, accepting functions from 'fns' until it is closed.
-func workerstart(ctx context.Context, fns <-chan WorkerFunc) bool {
- // Recover and drop any panic
- defer func() { recover() }()
-
- for {
- // Wait on next func
- fn, ok := <-fns
- if !ok {
- return true
- }
-
- // Run with ctx
- fn(ctx)
- }
-}
-
// Stop will stop the WorkerPool management loop, blocking until stopped.
func (pool *WorkerPool) Stop() bool {
return pool.svc.Stop()
@@ -124,22 +132,24 @@ func (pool *WorkerPool) Enqueue(fn WorkerFunc) {
// EnqueueCtx is functionally identical to WorkerPool.Enqueue() but returns early in the
// case that caller provided <-ctx.Done() is closed, WITHOUT running the WorkerFunc.
-func (pool *WorkerPool) EnqueueCtx(ctx context.Context, fn WorkerFunc) {
+func (pool *WorkerPool) EnqueueCtx(ctx context.Context, fn WorkerFunc) bool {
// Check valid fn
if fn == nil {
- return
+ return false
}
select {
// Caller ctx cancelled
case <-ctx.Done():
+ return false
// Pool ctx cancelled
case <-pool.svc.Done():
- fn(closedctx)
+ return false
// Placed fn in queue
case pool.fns <- fn:
+ return true
}
}
@@ -167,5 +177,54 @@ func (pool *WorkerPool) EnqueueNow(fn WorkerFunc) bool {
// Queue returns the number of currently queued WorkerFuncs.
func (pool *WorkerPool) Queue() int {
- return len(pool.fns)
+ var l int
+ pool.svc.While(func() {
+ l = len(pool.fns)
+ })
+ return l
+}
+
+// worker_run is the main worker routine, accepting functions from 'fns' until it is closed.
+func worker_run(ctx context.Context, fns <-chan WorkerFunc) bool {
+ defer func() {
+ // Recover and drop any panic
+ if r := recover(); r != nil {
+ const msg = "worker_run: recovered panic: %v\n\n%s\n"
+ fmt.Fprintf(os.Stderr, msg, r, errors.GetCallers(2, 10))
+ }
+ }()
+
+ for {
+ // Wait on next func
+ fn, ok := <-fns
+ if !ok {
+ return true
+ }
+
+ // Run with ctx
+ fn(ctx)
+ }
+}
+
+// drain_queue will drain and run all functions in worker queue, passing in a closed context.
+func drain_queue(fns <-chan WorkerFunc) bool {
+ defer func() {
+ // Recover and drop any panic
+ if r := recover(); r != nil {
+ const msg = "drain_queue: recovered panic: %v\n\n%s\n"
+ fmt.Fprintf(os.Stderr, msg, r, errors.GetCallers(2, 10))
+ }
+ }()
+
+ for {
+ select {
+ // Run with closed ctx
+ case fn := <-fns:
+ fn(closedctx)
+
+ // Queue is empty
+ default:
+ return true
+ }
+ }
}
diff --git a/vendor/codeberg.org/gruf/go-runners/service.go b/vendor/codeberg.org/gruf/go-runners/service.go
index 68e8ea384..2c9be8225 100644
--- a/vendor/codeberg.org/gruf/go-runners/service.go
+++ b/vendor/codeberg.org/gruf/go-runners/service.go
@@ -8,11 +8,10 @@ import (
// Service provides a means of tracking a single long-running service, provided protected state
// changes and preventing multiple instances running. Also providing service state information.
type Service struct {
- state uint32 // 0=stopped, 1=running, 2=stopping
- wait sync.Mutex // wait is the mutex used as a single-entity wait-group, i.e. just a "wait" :p
- cncl context.CancelFunc // cncl is the cancel function set for the current context
- ctx context.Context // ctx is the current context for running function (or nil if not running)
- mu sync.Mutex // mu protects state changes
+ state uint32 // 0=stopped, 1=running, 2=stopping
+ mutex sync.Mutex // mutext protects overall state changes
+ wait sync.Mutex // wait is used as a single-entity wait-group, only ever locked within 'mutex'
+ ctx cancelctx // ctx is the current context for running function (or nil if not running)
}
// Run will run the supplied function until completion, using given context to propagate cancel.
@@ -29,13 +28,12 @@ func (svc *Service) Run(fn func(context.Context)) bool {
svc.wait.Unlock()
// ensure stopped
- svc.Stop()
+ _ = svc.Stop()
}()
- // Run user func
- if fn != nil {
- fn(ctx)
- }
+ // Run
+ fn(ctx)
+
return true
}
@@ -54,13 +52,11 @@ func (svc *Service) GoRun(fn func(context.Context)) bool {
svc.wait.Unlock()
// ensure stopped
- svc.Stop()
+ _ = svc.Stop()
}()
- // Run user func
- if fn != nil {
- fn(ctx)
- }
+ // Run
+ fn(ctx)
}()
return true
@@ -70,14 +66,14 @@ func (svc *Service) GoRun(fn func(context.Context)) bool {
// returns false if not running, and true only after Service is fully stopped.
func (svc *Service) Stop() bool {
// Attempt to stop the svc
- cncl, ok := svc.doStop()
+ ctx, ok := svc.doStop()
if !ok {
return false
}
defer func() {
// Get svc lock
- svc.mu.Lock()
+ svc.mutex.Lock()
// Wait until stopped
svc.wait.Lock()
@@ -85,53 +81,65 @@ func (svc *Service) Stop() bool {
// Reset the svc
svc.ctx = nil
- svc.cncl = nil
svc.state = 0
- svc.mu.Unlock()
+ svc.mutex.Unlock()
}()
- cncl() // cancel ctx
+ // Cancel ctx
+ close(ctx)
+
return true
}
+// While allows you to execute given function guaranteed within current
+// service state. Please note that this will hold the underlying service
+// state change mutex open while executing the function.
+func (svc *Service) While(fn func()) {
+ // Protect state change
+ svc.mutex.Lock()
+ defer svc.mutex.Unlock()
+
+ // Run
+ fn()
+}
+
// doStart will safely set Service state to started, returning a ptr to this context insance.
-func (svc *Service) doStart() (context.Context, bool) {
+func (svc *Service) doStart() (cancelctx, bool) {
// Protect startup
- svc.mu.Lock()
+ svc.mutex.Lock()
if svc.state != 0 /* not stopped */ {
- svc.mu.Unlock()
+ svc.mutex.Unlock()
return nil, false
}
// state started
svc.state = 1
- // Take our own ptr
- var ctx context.Context
-
if svc.ctx == nil {
- // Context required allocating
- svc.ctx, svc.cncl = ContextWithCancel()
+ // this will only have been allocated
+ // if svc.Done() was already called.
+ svc.ctx = make(cancelctx)
}
// Start the waiter
svc.wait.Lock()
- // Set our ptr + unlock
- ctx = svc.ctx
- svc.mu.Unlock()
+ // Take our own ptr
+ // and unlock state
+ ctx := svc.ctx
+ svc.mutex.Unlock()
return ctx, true
}
// doStop will safely set Service state to stopping, returning a ptr to this cancelfunc instance.
-func (svc *Service) doStop() (context.CancelFunc, bool) {
+func (svc *Service) doStop() (cancelctx, bool) {
// Protect stop
- svc.mu.Lock()
+ svc.mutex.Lock()
if svc.state != 1 /* not started */ {
- svc.mu.Unlock()
+ svc.mutex.Unlock()
return nil, false
}
@@ -140,17 +148,17 @@ func (svc *Service) doStop() (context.CancelFunc, bool) {
// Take our own ptr
// and unlock state
- cncl := svc.cncl
- svc.mu.Unlock()
+ ctx := svc.ctx
+ svc.mutex.Unlock()
- return cncl, true
+ return ctx, true
}
// Running returns if Service is running (i.e. state NOT stopped / stopping).
func (svc *Service) Running() bool {
- svc.mu.Lock()
+ svc.mutex.Lock()
state := svc.state
- svc.mu.Unlock()
+ svc.mutex.Unlock()
return (state == 1)
}
@@ -159,28 +167,27 @@ func (svc *Service) Running() bool {
func (svc *Service) Done() <-chan struct{} {
var done <-chan struct{}
- svc.mu.Lock()
+ svc.mutex.Lock()
switch svc.state {
// stopped
- // (here we create a new context so that the
- // returned 'done' channel here will still
- // be valid for when Service is next started)
case 0:
if svc.ctx == nil {
- // need to allocate new context
- svc.ctx, svc.cncl = ContextWithCancel()
+ // here we create a new context so that the
+ // returned 'done' channel here will still
+ // be valid for when Service is next started.
+ svc.ctx = make(cancelctx)
}
- done = svc.ctx.Done()
+ done = svc.ctx
// started
case 1:
- done = svc.ctx.Done()
+ done = svc.ctx
// stopping
case 2:
- done = svc.ctx.Done()
+ done = svc.ctx
}
- svc.mu.Unlock()
+ svc.mutex.Unlock()
return done
}
diff --git a/vendor/codeberg.org/gruf/go-sched/scheduler.go b/vendor/codeberg.org/gruf/go-sched/scheduler.go
index bdf0a371d..df19cf18b 100644
--- a/vendor/codeberg.org/gruf/go-sched/scheduler.go
+++ b/vendor/codeberg.org/gruf/go-sched/scheduler.go
@@ -33,10 +33,11 @@ type Scheduler struct {
jch chan interface{} // jch accepts either Jobs or job IDs to notify new/removed jobs
svc runners.Service // svc manages the main scheduler routine
jid atomic.Uint64 // jid is used to iteratively generate unique IDs for jobs
+ rgo func(func()) // goroutine runner, allows using goroutine pool to launch jobs
}
// Start will attempt to start the Scheduler. Immediately returns false if the Service is already running, and true after completed run.
-func (sch *Scheduler) Start() bool {
+func (sch *Scheduler) Start(gorun func(func())) bool {
var block sync.Mutex
// Use mutex to synchronize between started
@@ -49,14 +50,19 @@ func (sch *Scheduler) Start() bool {
// Create Scheduler job channel
sch.jch = make(chan interface{})
- // Unlock start routine
- block.Unlock()
+ // Set goroutine runner function
+ if sch.rgo = gorun; sch.rgo == nil {
+ sch.rgo = func(f func()) { go f() }
+ }
// Set GC finalizer to ensure scheduler stopped
runtime.SetFinalizer(sch, func(sch *Scheduler) {
_ = sch.Stop()
})
+ // Unlock start routine
+ block.Unlock()
+
// Enter main loop
sch.run(ctx)
})
@@ -87,7 +93,7 @@ func (sch *Scheduler) Schedule(job *Job) (cancel func()) {
panic("nil job")
// Check we are running
- case sch.jch == nil:
+ case !sch.Running():
panic("scheduler not running")
}
@@ -142,21 +148,6 @@ func (sch *Scheduler) run(ctx context.Context) {
}
)
- for {
- select {
- // Handle received job/id
- case v := <-sch.jch:
- sch.handle(v)
- continue
-
- // No more
- default:
- }
-
- // Done
- break
- }
-
// Create a stopped timer
timer = time.NewTimer(1)
<-timer.C
@@ -256,8 +247,10 @@ func (sch *Scheduler) schedule(now time.Time) {
return
}
- // Pass job to runner
- go job.Run(now)
+ // Pass to runner
+ sch.rgo(func() {
+ job.Run(now)
+ })
// Update the next call time
next := job.timing.Next(now)