summaryrefslogtreecommitdiff
path: root/vendor/codeberg.org
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/codeberg.org')
-rw-r--r--vendor/codeberg.org/gruf/go-cache/v2/scheduler.go7
-rw-r--r--vendor/codeberg.org/gruf/go-kv/field.go2
-rw-r--r--vendor/codeberg.org/gruf/go-kv/field_fmt.go33
-rw-r--r--vendor/codeberg.org/gruf/go-kv/field_format.go2
-rw-r--r--vendor/codeberg.org/gruf/go-kv/format/format.go4
-rw-r--r--vendor/codeberg.org/gruf/go-kv/format/util.go13
-rw-r--r--vendor/codeberg.org/gruf/go-kv/util.go12
-rw-r--r--vendor/codeberg.org/gruf/go-logger/v2/level/levels.go26
-rw-r--r--vendor/codeberg.org/gruf/go-runners/pool.go197
-rw-r--r--vendor/codeberg.org/gruf/go-runners/service.go29
-rw-r--r--vendor/codeberg.org/gruf/go-sched/job.go15
-rw-r--r--vendor/codeberg.org/gruf/go-sched/scheduler.go92
12 files changed, 282 insertions, 150 deletions
diff --git a/vendor/codeberg.org/gruf/go-cache/v2/scheduler.go b/vendor/codeberg.org/gruf/go-cache/v2/scheduler.go
index bc1d8074a..ca93a0fcd 100644
--- a/vendor/codeberg.org/gruf/go-cache/v2/scheduler.go
+++ b/vendor/codeberg.org/gruf/go-cache/v2/scheduler.go
@@ -8,10 +8,13 @@ import (
// scheduler is the global cache runtime scheduler
// for handling regular cache evictions.
-var scheduler = sched.NewScheduler(5)
+var scheduler sched.Scheduler
// schedule will given sweep routine to the global scheduler, and start global scheduler.
func schedule(sweep func(time.Time), freq time.Duration) func() {
- go scheduler.Start() // does nothing if already running
+ if !scheduler.Running() {
+ // ensure running
+ _ = scheduler.Start()
+ }
return scheduler.Schedule(sched.NewJob(sweep).Every(freq))
}
diff --git a/vendor/codeberg.org/gruf/go-kv/field.go b/vendor/codeberg.org/gruf/go-kv/field.go
index 7ff6f1ca3..86a53b173 100644
--- a/vendor/codeberg.org/gruf/go-kv/field.go
+++ b/vendor/codeberg.org/gruf/go-kv/field.go
@@ -75,7 +75,7 @@ type Field struct {
// Key returns the formatted key string of this Field.
func (f Field) Key() string {
buf := byteutil.Buffer{B: make([]byte, 0, bufsize/2)}
- appendQuoteKey(&buf, f.K)
+ AppendQuoteKey(&buf, f.K)
return buf.String()
}
diff --git a/vendor/codeberg.org/gruf/go-kv/field_fmt.go b/vendor/codeberg.org/gruf/go-kv/field_fmt.go
index c62393fe0..fcdcc5e17 100644
--- a/vendor/codeberg.org/gruf/go-kv/field_fmt.go
+++ b/vendor/codeberg.org/gruf/go-kv/field_fmt.go
@@ -5,10 +5,18 @@ package kv
import (
"fmt"
+ "sync"
"codeberg.org/gruf/go-byteutil"
)
+// bufPool is a memory pool of byte buffers.
+var bufPool = sync.Pool{
+ New: func() interface{} {
+ return &byteutil.Buffer{B: make([]byte, 0, 512)}
+ },
+}
+
// AppendFormat will append formatted format of Field to 'buf'. See .String() for details.
func (f Field) AppendFormat(buf *byteutil.Buffer, vbose bool) {
var fmtstr string
@@ -17,9 +25,9 @@ func (f Field) AppendFormat(buf *byteutil.Buffer, vbose bool) {
} else /* regular */ {
fmtstr = `%+v`
}
- appendQuoteKey(buf, f.K)
+ AppendQuoteKey(buf, f.K)
buf.WriteByte('=')
- appendQuoteValue(buf, fmt.Sprintf(fmtstr, f.V))
+ appendValuef(buf, fmtstr, f.V)
}
// Value returns the formatted value string of this Field.
@@ -31,6 +39,25 @@ func (f Field) Value(vbose bool) string {
fmtstr = `%+v`
}
buf := byteutil.Buffer{B: make([]byte, 0, bufsize/2)}
- appendQuoteValue(&buf, fmt.Sprintf(fmtstr, f.V))
+ appendValuef(&buf, fmtstr, f.V)
return buf.String()
}
+
+// appendValuef appends a quoted value string (formatted by fmt.Appendf) to 'buf'.
+func appendValuef(buf *byteutil.Buffer, format string, args ...interface{}) {
+ // Write format string to a byte buffer
+ fmtbuf := bufPool.Get().(*byteutil.Buffer)
+ fmtbuf.B = fmt.Appendf(fmtbuf.B, format, args...)
+
+ // Append quoted value to dst buffer
+ AppendQuoteValue(buf, fmtbuf.String())
+
+ // Drop overly large capacity buffers
+ if fmtbuf.Cap() > int(^uint16(0)) {
+ return
+ }
+
+ // Replace buffer in pool
+ fmtbuf.Reset()
+ bufPool.Put(fmtbuf)
+}
diff --git a/vendor/codeberg.org/gruf/go-kv/field_format.go b/vendor/codeberg.org/gruf/go-kv/field_format.go
index db1ffc721..4fa7a8dcf 100644
--- a/vendor/codeberg.org/gruf/go-kv/field_format.go
+++ b/vendor/codeberg.org/gruf/go-kv/field_format.go
@@ -16,7 +16,7 @@ func (f Field) AppendFormat(buf *byteutil.Buffer, vbose bool) {
} else /* regular */ {
fmtstr = "{:v}"
}
- appendQuoteKey(buf, f.K)
+ AppendQuoteKey(buf, f.K)
buf.WriteByte('=')
format.Appendf(buf, fmtstr, f.V)
}
diff --git a/vendor/codeberg.org/gruf/go-kv/format/format.go b/vendor/codeberg.org/gruf/go-kv/format/format.go
index df5a94b7c..7edc4475f 100644
--- a/vendor/codeberg.org/gruf/go-kv/format/format.go
+++ b/vendor/codeberg.org/gruf/go-kv/format/format.go
@@ -311,7 +311,7 @@ func (f format) AppendString(s string) {
}
func (f format) AppendStringKey(s string) {
- if !strconv.CanBackquote(s) {
+ if len(s) > SingleTermLine || !strconv.CanBackquote(s) {
// Requires quoting AND escaping
f.Buffer.B = strconv.AppendQuote(f.Buffer.B, s)
} else if ContainsDoubleQuote(s) {
@@ -329,7 +329,7 @@ func (f format) AppendStringKey(s string) {
}
func (f format) AppendStringQuoted(s string) {
- if !strconv.CanBackquote(s) {
+ if len(s) > SingleTermLine || !strconv.CanBackquote(s) {
// Requires quoting AND escaping
f.Buffer.B = strconv.AppendQuote(f.Buffer.B, s)
} else if ContainsDoubleQuote(s) {
diff --git a/vendor/codeberg.org/gruf/go-kv/format/util.go b/vendor/codeberg.org/gruf/go-kv/format/util.go
index c4c42329e..674f81be5 100644
--- a/vendor/codeberg.org/gruf/go-kv/format/util.go
+++ b/vendor/codeberg.org/gruf/go-kv/format/util.go
@@ -5,6 +5,19 @@ import (
"unsafe"
)
+const (
+ // SingleTermLine: beyond a certain length of string, all of the
+ // extra checks to handle quoting/not-quoting add a significant
+ // amount of extra processing time. Quoting in this manner only really
+ // effects readability on a single line, so a max string length that
+ // encompasses the maximum number of columns on *most* terminals was
+ // selected. This was chosen using the metric that 1080p is one of the
+ // most common display resolutions, and that a relatively small font size
+ // of 7 requires 223 columns. So 256 should be >= $COLUMNS (fullscreen)
+ // in 99% of usecases (these figures all pulled out of my ass).
+ SingleTermLine = 256
+)
+
// ContainsSpaceOrTab checks if "s" contains space or tabs.
func ContainsSpaceOrTab(s string) bool {
if i := strings.IndexByte(s, ' '); i != -1 {
diff --git a/vendor/codeberg.org/gruf/go-kv/util.go b/vendor/codeberg.org/gruf/go-kv/util.go
index a9526bf3d..3d249e5d4 100644
--- a/vendor/codeberg.org/gruf/go-kv/util.go
+++ b/vendor/codeberg.org/gruf/go-kv/util.go
@@ -7,10 +7,10 @@ import (
"codeberg.org/gruf/go-kv/format"
)
-// appendQuoteKey will append and escape/quote a formatted key string.
-func appendQuoteKey(buf *byteutil.Buffer, str string) {
+// AppendQuoteKey will append and escape/quote a formatted key string.
+func AppendQuoteKey(buf *byteutil.Buffer, str string) {
switch {
- case !strconv.CanBackquote(str):
+ case len(str) > format.SingleTermLine || !strconv.CanBackquote(str):
// Append quoted and escaped string
buf.B = strconv.AppendQuote(buf.B, str)
case format.ContainsDoubleQuote(str):
@@ -27,10 +27,10 @@ func appendQuoteKey(buf *byteutil.Buffer, str string) {
}
}
-// appendQuoteValue will append and escape/quote a formatted value string.
-func appendQuoteValue(buf *byteutil.Buffer, str string) {
+// AppendQuoteValue will append and escape/quote a formatted value string.
+func AppendQuoteValue(buf *byteutil.Buffer, str string) {
switch {
- case !strconv.CanBackquote(str):
+ case len(str) > format.SingleTermLine || !strconv.CanBackquote(str):
// Append quoted and escaped string
buf.B = strconv.AppendQuote(buf.B, str)
return
diff --git a/vendor/codeberg.org/gruf/go-logger/v2/level/levels.go b/vendor/codeberg.org/gruf/go-logger/v2/level/levels.go
index 1804bdb23..3b1715a3c 100644
--- a/vendor/codeberg.org/gruf/go-logger/v2/level/levels.go
+++ b/vendor/codeberg.org/gruf/go-logger/v2/level/levels.go
@@ -1,5 +1,10 @@
package level
+import (
+ "fmt"
+ "strings"
+)
+
// LEVEL defines a level of logging.
type LEVEL uint8
@@ -34,12 +39,6 @@ func Default() Levels {
ERROR: "ERROR",
FATAL: "FATAL",
PANIC: "PANIC",
-
- // we set these just so that
- // it can be debugged when someone
- // attempts to log with ALL/UNSET
- ALL: "{all}",
- UNSET: "{unset}",
}
}
@@ -47,3 +46,18 @@ func Default() Levels {
func (l Levels) Get(lvl LEVEL) string {
return l[int(lvl)]
}
+
+// Parse will attempt to decode a LEVEL from given string, checking (case insensitive) against strings in Levels.
+func (l Levels) Parse(s string) (LEVEL, error) {
+ // Ensure consistent casing
+ s = strings.ToUpper(s)
+
+ for lvl := LEVEL(0); int(lvl) < len(l); lvl++ {
+ // Compare to eqach known level
+ if strings.ToUpper(l[lvl]) == s {
+ return lvl, nil
+ }
+ }
+
+ return 0, fmt.Errorf("unrecognized log level: %s", s)
+}
diff --git a/vendor/codeberg.org/gruf/go-runners/pool.go b/vendor/codeberg.org/gruf/go-runners/pool.go
index ca8849f30..b6be57d0a 100644
--- a/vendor/codeberg.org/gruf/go-runners/pool.go
+++ b/vendor/codeberg.org/gruf/go-runners/pool.go
@@ -7,124 +7,105 @@ import (
)
// WorkerFunc represents a function processable by a worker in WorkerPool. Note
-// that implementations absolutely MUST check whether passed context is Done()
-// otherwise stopping the pool may block for large periods of time.
+// that implementations absolutely MUST check whether passed context is <-ctx.Done()
+// otherwise stopping the pool may block indefinitely.
type WorkerFunc func(context.Context)
// WorkerPool provides a means of enqueuing asynchronous work.
type WorkerPool struct {
- queue chan WorkerFunc
- free chan struct{}
- wait sync.WaitGroup
- svc Service
+ fns chan WorkerFunc
+ svc Service
}
-// NewWorkerPool returns a new WorkerPool with provided worker count and WorkerFunc queue size.
-// The number of workers represents how many WorkerFuncs can be executed simultaneously, and the
-// queue size represents the max number of WorkerFuncs that can be queued at any one time.
-func NewWorkerPool(workers int, queue int) WorkerPool {
+// Start will start the main WorkerPool management loop in a new goroutine, along
+// with requested number of child worker goroutines. Returns false if already running.
+func (pool *WorkerPool) Start(workers int, queue int) bool {
+ // Attempt to start the svc
+ ctx, ok := pool.svc.doStart()
+ if !ok {
+ return false
+ }
+
if workers < 1 {
+ // Use $GOMAXPROCS as default worker count
workers = runtime.GOMAXPROCS(0)
}
- if queue < 1 {
+
+ if queue < 0 {
+ // Set a reasonable queue default
queue = workers * 2
}
- return WorkerPool{
- queue: make(chan WorkerFunc, queue),
- free: make(chan struct{}, workers),
- }
-}
-// Start will attempt to start the worker pool, asynchronously. Return is success state.
-func (pool *WorkerPool) Start() bool {
- ok := true
+ // Allocate pool queue of given size
+ fns := make(chan WorkerFunc, queue)
+ pool.fns = fns
- done := make(chan struct{})
go func() {
- ok = pool.svc.Run(func(ctx context.Context) {
- close(done)
- pool.process(ctx)
- })
- if !ok {
- close(done)
- }
- }()
- <-done
-
- return ok
-}
-
-// Stop will attempt to stop the worker pool, this will block until stopped. Return is success state.
-func (pool *WorkerPool) Stop() bool {
- return pool.svc.Stop()
-}
+ defer func() {
+ // unlock single wait
+ pool.svc.wait.Unlock()
-// Running returns whether the worker pool is running.
-func (pool *WorkerPool) Running() bool {
- return pool.svc.Running()
-}
+ // ensure stopped
+ pool.svc.Stop()
+ }()
-// execute will take a queued function and pass it to a free worker when available.
-func (pool *WorkerPool) execute(ctx context.Context, fn WorkerFunc) {
- var acquired bool
+ var wait sync.WaitGroup
- // Set as running
- pool.wait.Add(1)
+ // Start goroutine worker functions
+ for i := 0; i < workers; i++ {
+ go func() {
+ // Trigger start / stop
+ wait.Add(1)
+ defer wait.Done()
- select {
- // Pool context cancelled
- // (we fall through and let
- // the function execute).
- case <-ctx.Done():
+ // Keep workers running on panic
+ for !workerstart(ctx, fns) {
+ }
+ }()
+ }
- // Free worker acquired.
- case pool.free <- struct{}{}:
- acquired = true
- }
+ // Set GC finalizer to stop pool on dealloc
+ runtime.SetFinalizer(pool, func(pool *WorkerPool) {
+ pool.svc.Stop()
+ })
- go func() {
- defer func() {
- // defer in case panic
- if acquired {
- <-pool.free
- }
- pool.wait.Done()
- }()
+ // Wait on ctx
+ <-ctx.Done()
- // Run queued
- fn(ctx)
+ // Stop all workers
+ close(pool.fns)
+ wait.Wait()
}()
+
+ return true
}
-// process is the background processing routine that passes queued functions to workers.
-func (pool *WorkerPool) process(ctx context.Context) {
- for {
- select {
- // Pool context cancelled
- case <-ctx.Done():
- for {
- select {
- // Pop and execute queued
- case fn := <-pool.queue:
- fn(ctx) // ctx is closed
-
- // Empty, wait for workers
- default:
- pool.wait.Wait()
- return
- }
- }
+// workerstart is the main worker runner routine, accepting functions from 'fns' until it is closed.
+func workerstart(ctx context.Context, fns <-chan WorkerFunc) bool {
+ // Recover and drop any panic
+ defer func() { recover() }()
- // Queued func received
- case fn := <-pool.queue:
- pool.execute(ctx, fn)
+ for {
+ // Wait on next func
+ fn, ok := <-fns
+ if !ok {
+ return true
}
+
+ // Run with ctx
+ fn(ctx)
}
}
+// Stop will stop the WorkerPool management loop, blocking until stopped.
+func (pool *WorkerPool) Stop() bool {
+ return pool.svc.Stop()
+}
+
// Enqueue will add provided WorkerFunc to the queue to be performed when there is a free worker.
-// This will block until the function has been queued. 'fn' will ALWAYS be executed, even on pool
-// close, which can be determined via context <-ctx.Done(). WorkerFuncs MUST respect the passed context.
+// This will block until function is queued or pool is stopped. In all cases, the WorkerFunc will be
+// executed, with the state of the pool being indicated by <-ctx.Done() of the passed ctx.
+// WorkerFuncs MUST respect the passed context.
func (pool *WorkerPool) Enqueue(fn WorkerFunc) {
// Check valid fn
if fn == nil {
@@ -132,29 +113,50 @@ func (pool *WorkerPool) Enqueue(fn WorkerFunc) {
}
select {
- // Pool context cancelled
+ // Pool ctx cancelled
+ case <-pool.svc.Done():
+ fn(closedctx)
+
+ // Placed fn in queue
+ case pool.fns <- fn:
+ }
+}
+
+// EnqueueCtx is functionally identical to WorkerPool.Enqueue() but returns early in the
+// case that caller provided <-ctx.Done() is closed, WITHOUT running the WorkerFunc.
+func (pool *WorkerPool) EnqueueCtx(ctx context.Context, fn WorkerFunc) {
+ // Check valid fn
+ if fn == nil {
+ return
+ }
+
+ select {
+ // Caller ctx cancelled
+ case <-ctx.Done():
+
+ // Pool ctx cancelled
case <-pool.svc.Done():
fn(closedctx)
// Placed fn in queue
- case pool.queue <- fn:
+ case pool.fns <- fn:
}
}
-// EnqueueNoBlock attempts Enqueue but returns false if not executed.
-func (pool *WorkerPool) EnqueueNoBlock(fn WorkerFunc) bool {
+// EnqueueNow attempts Enqueue but returns false if not executed.
+func (pool *WorkerPool) EnqueueNow(fn WorkerFunc) bool {
// Check valid fn
if fn == nil {
return false
}
select {
- // Pool context cancelled
+ // Pool ctx cancelled
case <-pool.svc.Done():
return false
// Placed fn in queue
- case pool.queue <- fn:
+ case pool.fns <- fn:
return true
// Queue is full
@@ -165,10 +167,5 @@ func (pool *WorkerPool) EnqueueNoBlock(fn WorkerFunc) bool {
// Queue returns the number of currently queued WorkerFuncs.
func (pool *WorkerPool) Queue() int {
- return len(pool.queue)
-}
-
-// Workers returns the number of currently active workers.
-func (pool *WorkerPool) Workers() int {
- return len(pool.free)
+ return len(pool.fns)
}
diff --git a/vendor/codeberg.org/gruf/go-runners/service.go b/vendor/codeberg.org/gruf/go-runners/service.go
index c0f878c45..68e8ea384 100644
--- a/vendor/codeberg.org/gruf/go-runners/service.go
+++ b/vendor/codeberg.org/gruf/go-runners/service.go
@@ -15,7 +15,7 @@ type Service struct {
mu sync.Mutex // mu protects state changes
}
-// Run will run the supplied function until completion, use given context to propagate cancel.
+// Run will run the supplied function until completion, using given context to propagate cancel.
// Immediately returns false if the Service is already running, and true after completed run.
func (svc *Service) Run(fn func(context.Context)) bool {
// Attempt to start the svc
@@ -39,6 +39,33 @@ func (svc *Service) Run(fn func(context.Context)) bool {
return true
}
+// GoRun will run the supplied function until completion in a goroutine, using given context to
+// propagate cancel. Immediately returns boolean indicating success, or that service is already running.
+func (svc *Service) GoRun(fn func(context.Context)) bool {
+ // Attempt to start the svc
+ ctx, ok := svc.doStart()
+ if !ok {
+ return false
+ }
+
+ go func() {
+ defer func() {
+ // unlock single wait
+ svc.wait.Unlock()
+
+ // ensure stopped
+ svc.Stop()
+ }()
+
+ // Run user func
+ if fn != nil {
+ fn(ctx)
+ }
+ }()
+
+ return true
+}
+
// Stop will attempt to stop the service, cancelling the running function's context. Immediately
// returns false if not running, and true only after Service is fully stopped.
func (svc *Service) Stop() bool {
diff --git a/vendor/codeberg.org/gruf/go-sched/job.go b/vendor/codeberg.org/gruf/go-sched/job.go
index 7831a39bd..e94c00024 100644
--- a/vendor/codeberg.org/gruf/go-sched/job.go
+++ b/vendor/codeberg.org/gruf/go-sched/job.go
@@ -61,6 +61,11 @@ func (job *Job) With(t Timing) *Job {
panic("nil Timing")
}
+ if job.id != 0 {
+ // Cannot update scheduled job
+ panic("job already scheduled")
+ }
+
if job.timing == emptytiming {
// Set new timing
job.timing = t
@@ -76,12 +81,18 @@ func (job *Job) With(t Timing) *Job {
return job
}
-// Panic specifics how this job handles panics, default is an actual panic.
-func (job *Job) Panic(fn func(interface{})) *Job {
+// OnPanic specifies how this job handles panics, default is an actual panic.
+func (job *Job) OnPanic(fn func(interface{})) *Job {
if fn == nil {
// Ensure a function
panic("nil func")
}
+
+ if job.id != 0 {
+ // Cannot update scheduled job
+ panic("job already scheduled")
+ }
+
job.panic = fn
return job
}
diff --git a/vendor/codeberg.org/gruf/go-sched/scheduler.go b/vendor/codeberg.org/gruf/go-sched/scheduler.go
index 8d076fea0..bdf0a371d 100644
--- a/vendor/codeberg.org/gruf/go-sched/scheduler.go
+++ b/vendor/codeberg.org/gruf/go-sched/scheduler.go
@@ -2,13 +2,18 @@ package sched
import (
"context"
+ "runtime"
"sort"
+ "sync"
+ "sync/atomic"
"time"
- "codeberg.org/gruf/go-atomics"
"codeberg.org/gruf/go-runners"
)
+// precision is the maximum time we can offer scheduler run-time precision down to.
+const precision = time.Millisecond
+
var (
// neverticks is a timer channel that never ticks (it's starved).
neverticks = make(chan time.Time)
@@ -27,20 +32,41 @@ type Scheduler struct {
jobs []*Job // jobs is a list of tracked Jobs to be executed
jch chan interface{} // jch accepts either Jobs or job IDs to notify new/removed jobs
svc runners.Service // svc manages the main scheduler routine
- jid atomics.Uint64 // jid is used to iteratively generate unique IDs for jobs
-}
-
-// New returns a new Scheduler instance with given job change queue size.
-func NewScheduler(queue int) Scheduler {
- if queue < 0 {
- queue = 10
- }
- return Scheduler{jch: make(chan interface{}, queue)}
+ jid atomic.Uint64 // jid is used to iteratively generate unique IDs for jobs
}
// Start will attempt to start the Scheduler. Immediately returns false if the Service is already running, and true after completed run.
func (sch *Scheduler) Start() bool {
- return sch.svc.Run(sch.run)
+ var block sync.Mutex
+
+ // Use mutex to synchronize between started
+ // goroutine and ourselves, to ensure that
+ // we don't return before Scheduler init'd.
+ block.Lock()
+ defer block.Unlock()
+
+ ok := sch.svc.GoRun(func(ctx context.Context) {
+ // Create Scheduler job channel
+ sch.jch = make(chan interface{})
+
+ // Unlock start routine
+ block.Unlock()
+
+ // Set GC finalizer to ensure scheduler stopped
+ runtime.SetFinalizer(sch, func(sch *Scheduler) {
+ _ = sch.Stop()
+ })
+
+ // Enter main loop
+ sch.run(ctx)
+ })
+
+ if ok {
+ // Wait on goroutine
+ block.Lock()
+ }
+
+ return ok
}
// Stop will attempt to stop the Scheduler. Immediately returns false if not running, and true only after Scheduler is fully stopped.
@@ -55,24 +81,41 @@ func (sch *Scheduler) Running() bool {
// Schedule will add provided Job to the Scheduler, returning a cancel function.
func (sch *Scheduler) Schedule(job *Job) (cancel func()) {
- if job == nil {
- // Ensure there's a job!
+ switch {
+ // Check a job was passed
+ case job == nil:
panic("nil job")
+
+ // Check we are running
+ case sch.jch == nil:
+ panic("scheduler not running")
}
- // Get last known job ID
+ // Calculate next job ID
last := sch.jid.Load()
-
- // Give this job an ID and check overflow
- if job.id = sch.jid.Add(1); job.id < last {
- panic("scheduler job id overflow")
+ next := sch.jid.Add(1)
+ if next < last {
+ panic("job id overflow")
}
// Pass job to scheduler
+ job.id = next
sch.jch <- job
+ // Take ptrs to current state chs
+ ctx := sch.svc.Done()
+ jch := sch.jch
+
// Return cancel function for job ID
- return func() { sch.jch <- job.id }
+ return func() {
+ select {
+ // Sched stopped
+ case <-ctx:
+
+ // Cancel this job
+ case jch <- next:
+ }
+ }
}
// run is the main scheduler run routine, which runs for as long as ctx is valid.
@@ -136,11 +179,8 @@ func (sch *Scheduler) run(ctx context.Context) {
// don't bother sleeping. It's wasted cycles only
// sleeping for some obscenely tiny amount of time
// we can't guarantee precision for.
- const precision = time.Millisecond
-
if until := next.Sub(now); until <= precision/1e3 {
- // This job is behind schedule,
- // set timer to always tick
+ // This job is behind schedule, set to always tick.
tch = alwaysticks
} else {
// Reset timer to period
@@ -216,13 +256,13 @@ func (sch *Scheduler) schedule(now time.Time) {
return
}
+ // Pass job to runner
+ go job.Run(now)
+
// Update the next call time
next := job.timing.Next(now)
job.next.Store(next)
- // Run this job async!
- go job.Run(now)
-
if next.IsZero() {
// Zero time, this job is done and can be dropped
sch.jobs = append(sch.jobs[:i], sch.jobs[i+1:]...)