summaryrefslogtreecommitdiff
path: root/vendor/codeberg.org/gruf/go-sched/scheduler.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/codeberg.org/gruf/go-sched/scheduler.go')
-rw-r--r--vendor/codeberg.org/gruf/go-sched/scheduler.go92
1 files changed, 66 insertions, 26 deletions
diff --git a/vendor/codeberg.org/gruf/go-sched/scheduler.go b/vendor/codeberg.org/gruf/go-sched/scheduler.go
index 8d076fea0..bdf0a371d 100644
--- a/vendor/codeberg.org/gruf/go-sched/scheduler.go
+++ b/vendor/codeberg.org/gruf/go-sched/scheduler.go
@@ -2,13 +2,18 @@ package sched
import (
"context"
+ "runtime"
"sort"
+ "sync"
+ "sync/atomic"
"time"
- "codeberg.org/gruf/go-atomics"
"codeberg.org/gruf/go-runners"
)
+// precision is the maximum time we can offer scheduler run-time precision down to.
+const precision = time.Millisecond
+
var (
// neverticks is a timer channel that never ticks (it's starved).
neverticks = make(chan time.Time)
@@ -27,20 +32,41 @@ type Scheduler struct {
jobs []*Job // jobs is a list of tracked Jobs to be executed
jch chan interface{} // jch accepts either Jobs or job IDs to notify new/removed jobs
svc runners.Service // svc manages the main scheduler routine
- jid atomics.Uint64 // jid is used to iteratively generate unique IDs for jobs
-}
-
-// New returns a new Scheduler instance with given job change queue size.
-func NewScheduler(queue int) Scheduler {
- if queue < 0 {
- queue = 10
- }
- return Scheduler{jch: make(chan interface{}, queue)}
+ jid atomic.Uint64 // jid is used to iteratively generate unique IDs for jobs
}
// Start will attempt to start the Scheduler. Immediately returns false if the Service is already running, and true after completed run.
func (sch *Scheduler) Start() bool {
- return sch.svc.Run(sch.run)
+ var block sync.Mutex
+
+ // Use mutex to synchronize between started
+ // goroutine and ourselves, to ensure that
+ // we don't return before Scheduler init'd.
+ block.Lock()
+ defer block.Unlock()
+
+ ok := sch.svc.GoRun(func(ctx context.Context) {
+ // Create Scheduler job channel
+ sch.jch = make(chan interface{})
+
+ // Unlock start routine
+ block.Unlock()
+
+ // Set GC finalizer to ensure scheduler stopped
+ runtime.SetFinalizer(sch, func(sch *Scheduler) {
+ _ = sch.Stop()
+ })
+
+ // Enter main loop
+ sch.run(ctx)
+ })
+
+ if ok {
+ // Wait on goroutine
+ block.Lock()
+ }
+
+ return ok
}
// Stop will attempt to stop the Scheduler. Immediately returns false if not running, and true only after Scheduler is fully stopped.
@@ -55,24 +81,41 @@ func (sch *Scheduler) Running() bool {
// Schedule will add provided Job to the Scheduler, returning a cancel function.
func (sch *Scheduler) Schedule(job *Job) (cancel func()) {
- if job == nil {
- // Ensure there's a job!
+ switch {
+ // Check a job was passed
+ case job == nil:
panic("nil job")
+
+ // Check we are running
+ case sch.jch == nil:
+ panic("scheduler not running")
}
- // Get last known job ID
+ // Calculate next job ID
last := sch.jid.Load()
-
- // Give this job an ID and check overflow
- if job.id = sch.jid.Add(1); job.id < last {
- panic("scheduler job id overflow")
+ next := sch.jid.Add(1)
+ if next < last {
+ panic("job id overflow")
}
// Pass job to scheduler
+ job.id = next
sch.jch <- job
+ // Take ptrs to current state chs
+ ctx := sch.svc.Done()
+ jch := sch.jch
+
// Return cancel function for job ID
- return func() { sch.jch <- job.id }
+ return func() {
+ select {
+ // Sched stopped
+ case <-ctx:
+
+ // Cancel this job
+ case jch <- next:
+ }
+ }
}
// run is the main scheduler run routine, which runs for as long as ctx is valid.
@@ -136,11 +179,8 @@ func (sch *Scheduler) run(ctx context.Context) {
// don't bother sleeping. It's wasted cycles only
// sleeping for some obscenely tiny amount of time
// we can't guarantee precision for.
- const precision = time.Millisecond
-
if until := next.Sub(now); until <= precision/1e3 {
- // This job is behind schedule,
- // set timer to always tick
+ // This job is behind schedule, set to always tick.
tch = alwaysticks
} else {
// Reset timer to period
@@ -216,13 +256,13 @@ func (sch *Scheduler) schedule(now time.Time) {
return
}
+ // Pass job to runner
+ go job.Run(now)
+
// Update the next call time
next := job.timing.Next(now)
job.next.Store(next)
- // Run this job async!
- go job.Run(now)
-
if next.IsZero() {
// Zero time, this job is done and can be dropped
sch.jobs = append(sch.jobs[:i], sch.jobs[i+1:]...)