diff options
Diffstat (limited to 'vendor/codeberg.org/gruf/go-sched/scheduler.go')
-rw-r--r-- | vendor/codeberg.org/gruf/go-sched/scheduler.go | 92 |
1 files changed, 66 insertions, 26 deletions
diff --git a/vendor/codeberg.org/gruf/go-sched/scheduler.go b/vendor/codeberg.org/gruf/go-sched/scheduler.go index 8d076fea0..bdf0a371d 100644 --- a/vendor/codeberg.org/gruf/go-sched/scheduler.go +++ b/vendor/codeberg.org/gruf/go-sched/scheduler.go @@ -2,13 +2,18 @@ package sched import ( "context" + "runtime" "sort" + "sync" + "sync/atomic" "time" - "codeberg.org/gruf/go-atomics" "codeberg.org/gruf/go-runners" ) +// precision is the maximum time we can offer scheduler run-time precision down to. +const precision = time.Millisecond + var ( // neverticks is a timer channel that never ticks (it's starved). neverticks = make(chan time.Time) @@ -27,20 +32,41 @@ type Scheduler struct { jobs []*Job // jobs is a list of tracked Jobs to be executed jch chan interface{} // jch accepts either Jobs or job IDs to notify new/removed jobs svc runners.Service // svc manages the main scheduler routine - jid atomics.Uint64 // jid is used to iteratively generate unique IDs for jobs -} - -// New returns a new Scheduler instance with given job change queue size. -func NewScheduler(queue int) Scheduler { - if queue < 0 { - queue = 10 - } - return Scheduler{jch: make(chan interface{}, queue)} + jid atomic.Uint64 // jid is used to iteratively generate unique IDs for jobs } // Start will attempt to start the Scheduler. Immediately returns false if the Service is already running, and true after completed run. func (sch *Scheduler) Start() bool { - return sch.svc.Run(sch.run) + var block sync.Mutex + + // Use mutex to synchronize between started + // goroutine and ourselves, to ensure that + // we don't return before Scheduler init'd. + block.Lock() + defer block.Unlock() + + ok := sch.svc.GoRun(func(ctx context.Context) { + // Create Scheduler job channel + sch.jch = make(chan interface{}) + + // Unlock start routine + block.Unlock() + + // Set GC finalizer to ensure scheduler stopped + runtime.SetFinalizer(sch, func(sch *Scheduler) { + _ = sch.Stop() + }) + + // Enter main loop + sch.run(ctx) + }) + + if ok { + // Wait on goroutine + block.Lock() + } + + return ok } // Stop will attempt to stop the Scheduler. Immediately returns false if not running, and true only after Scheduler is fully stopped. @@ -55,24 +81,41 @@ func (sch *Scheduler) Running() bool { // Schedule will add provided Job to the Scheduler, returning a cancel function. func (sch *Scheduler) Schedule(job *Job) (cancel func()) { - if job == nil { - // Ensure there's a job! + switch { + // Check a job was passed + case job == nil: panic("nil job") + + // Check we are running + case sch.jch == nil: + panic("scheduler not running") } - // Get last known job ID + // Calculate next job ID last := sch.jid.Load() - - // Give this job an ID and check overflow - if job.id = sch.jid.Add(1); job.id < last { - panic("scheduler job id overflow") + next := sch.jid.Add(1) + if next < last { + panic("job id overflow") } // Pass job to scheduler + job.id = next sch.jch <- job + // Take ptrs to current state chs + ctx := sch.svc.Done() + jch := sch.jch + // Return cancel function for job ID - return func() { sch.jch <- job.id } + return func() { + select { + // Sched stopped + case <-ctx: + + // Cancel this job + case jch <- next: + } + } } // run is the main scheduler run routine, which runs for as long as ctx is valid. @@ -136,11 +179,8 @@ func (sch *Scheduler) run(ctx context.Context) { // don't bother sleeping. It's wasted cycles only // sleeping for some obscenely tiny amount of time // we can't guarantee precision for. - const precision = time.Millisecond - if until := next.Sub(now); until <= precision/1e3 { - // This job is behind schedule, - // set timer to always tick + // This job is behind schedule, set to always tick. tch = alwaysticks } else { // Reset timer to period @@ -216,13 +256,13 @@ func (sch *Scheduler) schedule(now time.Time) { return } + // Pass job to runner + go job.Run(now) + // Update the next call time next := job.timing.Next(now) job.next.Store(next) - // Run this job async! - go job.Run(now) - if next.IsZero() { // Zero time, this job is done and can be dropped sch.jobs = append(sch.jobs[:i], sch.jobs[i+1:]...) |