diff options
Diffstat (limited to 'vendor/codeberg.org/gruf/go-sched/scheduler.go')
-rw-r--r-- | vendor/codeberg.org/gruf/go-sched/scheduler.go | 63 |
1 files changed, 35 insertions, 28 deletions
diff --git a/vendor/codeberg.org/gruf/go-sched/scheduler.go b/vendor/codeberg.org/gruf/go-sched/scheduler.go index df19cf18b..537e588fe 100644 --- a/vendor/codeberg.org/gruf/go-sched/scheduler.go +++ b/vendor/codeberg.org/gruf/go-sched/scheduler.go @@ -2,7 +2,6 @@ package sched import ( "context" - "runtime" "sort" "sync" "sync/atomic" @@ -55,11 +54,6 @@ func (sch *Scheduler) Start(gorun func(func())) bool { sch.rgo = func(f func()) { go f() } } - // Set GC finalizer to ensure scheduler stopped - runtime.SetFinalizer(sch, func(sch *Scheduler) { - _ = sch.Stop() - }) - // Unlock start routine block.Unlock() @@ -80,11 +74,16 @@ func (sch *Scheduler) Stop() bool { return sch.svc.Stop() } -// Running will return whether Scheduler is running. +// Running will return whether Scheduler is running (i.e. NOT stopped / stopping). func (sch *Scheduler) Running() bool { return sch.svc.Running() } +// Done returns a channel that's closed when Scheduler.Stop() is called. +func (sch *Scheduler) Done() <-chan struct{} { + return sch.svc.Done() +} + // Schedule will add provided Job to the Scheduler, returning a cancel function. func (sch *Scheduler) Schedule(job *Job) (cancel func()) { switch { @@ -127,20 +126,26 @@ func (sch *Scheduler) Schedule(job *Job) (cancel func()) { // run is the main scheduler run routine, which runs for as long as ctx is valid. func (sch *Scheduler) run(ctx context.Context) { var ( + // now stores the current time, and will only be + // set when the timer channel is set to be the + // 'alwaysticks' channel. this allows minimizing + // the number of calls required to time.Now(). + now time.Time + // timerset represents whether timer was running // for a particular run of the loop. false means - // that tch == neverticks || tch == alwaysticks + // that tch == neverticks || tch == alwaysticks. timerset bool - // timer tick channel (or a never-tick channel) + // timer tick channel (or always / never ticks). tch <-chan time.Time // timer notifies this main routine to wake when - // the job queued needs to be checked for executions + // the job queued needs to be checked for executions. timer *time.Timer // stopdrain will stop and drain the timer - // if it has been running (i.e. timerset == true) + // if it has been running (i.e. timerset == true). stopdrain = func() { if timerset && !timer.Stop() { <-timer.C @@ -148,33 +153,33 @@ func (sch *Scheduler) run(ctx context.Context) { } ) - // Create a stopped timer + // Create a stopped timer. timer = time.NewTimer(1) <-timer.C for { - // Reset timer state + // Reset timer state. timerset = false if len(sch.jobs) > 0 { - // Sort jobs by next occurring - sort.Sort(byNext(sch.jobs)) + // Get now time. + now = time.Now() - // Get execution time - now := time.Now() + // Sort jobs by next occurring. + sort.Sort(byNext(sch.jobs)) - // Get next job time + // Get next job time. next := sch.jobs[0].Next() - // If this job is _just_ about to be ready, we - // don't bother sleeping. It's wasted cycles only - // sleeping for some obscenely tiny amount of time - // we can't guarantee precision for. + // If this job is _just_ about to be ready, we don't bother + // sleeping. It's wasted cycles only sleeping for some obscenely + // tiny amount of time we can't guarantee precision for. if until := next.Sub(now); until <= precision/1e3 { - // This job is behind schedule, set to always tick. + // This job is behind, + // set to always tick. tch = alwaysticks } else { - // Reset timer to period + // Reset timer to period. timer.Reset(until) tch = timer.C timerset = true @@ -191,12 +196,14 @@ func (sch *Scheduler) run(ctx context.Context) { return // Timer ticked, run scheduled - case now := <-tch: + case t := <-tch: if !timerset { - // alwaysticks returns zero times - now = time.Now() + // 'alwaysticks' returns zero + // times, BUT 'now' will have + // been set during above sort. + t = now } - sch.schedule(now) + sch.schedule(t) // Received update, handle job/id case v := <-sch.jch: |