diff options
Diffstat (limited to 'vendor/codeberg.org/gruf/go-sched/scheduler.go')
-rw-r--r-- | vendor/codeberg.org/gruf/go-sched/scheduler.go | 240 |
1 files changed, 240 insertions, 0 deletions
diff --git a/vendor/codeberg.org/gruf/go-sched/scheduler.go b/vendor/codeberg.org/gruf/go-sched/scheduler.go new file mode 100644 index 000000000..d017ddcf6 --- /dev/null +++ b/vendor/codeberg.org/gruf/go-sched/scheduler.go @@ -0,0 +1,240 @@ +package sched + +import ( + "context" + "sort" + "time" + + "codeberg.org/gruf/go-atomics" + "codeberg.org/gruf/go-runners" +) + +var ( + // neverticks is a timer channel that never ticks (it's starved). + neverticks = make(chan time.Time) + + // alwaysticks is a timer channel that always ticks (it's closed). + alwaysticks = func() chan time.Time { + ch := make(chan time.Time) + close(ch) + return ch + }() +) + +// Scheduler provides a means of running jobs at specific times and +// regular intervals, all while sharing a single underlying timer. +type Scheduler struct { + jobs []*Job // jobs is a list of tracked Jobs to be executed + jch chan interface{} // jch accepts either Jobs or job IDs to notify new/removed jobs + svc runners.Service // svc manages the main scheduler routine + jid atomics.Uint64 // jid is used to iteratively generate unique IDs for jobs +} + +// New returns a new Scheduler instance with given job change queue size. +func NewScheduler(queue int) Scheduler { + if queue < 0 { + queue = 10 + } + return Scheduler{jch: make(chan interface{}, queue)} +} + +// Start will attempt to start the Scheduler. Immediately returns false if the Service is already running, and true after completed run. +func (sch *Scheduler) Start() bool { + return sch.svc.Run(sch.run) +} + +// Stop will attempt to stop the Scheduler. Immediately returns false if not running, and true only after Scheduler is fully stopped. +func (sch *Scheduler) Stop() bool { + return sch.svc.Stop() +} + +// Running will return whether Scheduler is running. +func (sch *Scheduler) Running() bool { + return sch.svc.Running() +} + +// Schedule will add provided Job to the Scheduler, returning a cancel function. +func (sch *Scheduler) Schedule(job *Job) (cancel func()) { + if job == nil { + // Ensure there's a job! + panic("nil job") + } + + // Get last known job ID + last := sch.jid.Load() + + // Give this job an ID and check overflow + if job.id = sch.jid.Add(1); job.id < last { + panic("scheduler job id overflow") + } + + // Pass job to scheduler + sch.jch <- job + + // Return cancel function for job ID + return func() { sch.jch <- job.id } +} + +// run is the main scheduler run routine, which runs for as long as ctx is valid. +func (sch *Scheduler) run(ctx context.Context) { + var ( + // timerset represents whether timer was running + // for a particular run of the loop. false means + // that tch == neverticks || tch == alwaysticks + timerset bool + + // timer tick channel (or a never-tick channel) + tch <-chan time.Time + + // timer notifies this main routine to wake when + // the job queued needs to be checked for executions + timer *time.Timer + + // stopdrain will stop and drain the timer + // if it has been running (i.e. timerset == true) + stopdrain = func() { + if timerset && !timer.Stop() { + <-timer.C + } + } + ) + + for { + select { + // Handle received job/id + case v := <-sch.jch: + sch.handle(v) + continue + + // No more + default: + } + + // Done + break + } + + // Create a stopped timer + timer = time.NewTimer(1) + <-timer.C + + for { + // Reset timer state + timerset = false + + if len(sch.jobs) > 0 { + // Sort jobs by next occurring + sort.Sort(byNext(sch.jobs)) + + // Get execution time + now := time.Now() + + // Get next job time + next := sch.jobs[0].Next() + + if until := next.Sub(now); until <= 0 { + // This job is behind schedule, + // set timer to always tick + tch = alwaysticks + } else { + // Reset timer to period + timer.Reset(until) + tch = timer.C + timerset = true + } + } else { + // Unset timer + tch = neverticks + } + + select { + // Scheduler stopped + case <-ctx.Done(): + stopdrain() + return + + // Timer ticked, run scheduled + case now := <-tch: + sch.schedule(now) + + // Received update, handle job/id + case v := <-sch.jch: + sch.handle(v) + stopdrain() + } + } +} + +// handle takes an interfaces received from Scheduler.jch and handles either: +// - Job --> new job to add. +// - uint64 --> job ID to remove. +func (sch *Scheduler) handle(v interface{}) { + switch v := v.(type) { + // New job added + case *Job: + // Get current time + now := time.Now() + + // Update the next call time + next := v.timing.Next(now) + v.next.Store(next) + + // Append this job to queued + sch.jobs = append(sch.jobs, v) + + // Job removed + case uint64: + for i := 0; i < len(sch.jobs); i++ { + if sch.jobs[i].id == v { + // This is the job we're looking for! Drop this + sch.jobs = append(sch.jobs[:i], sch.jobs[i+1:]...) + return + } + } + } +} + +// schedule will iterate through the scheduler jobs and execute those necessary, updating their next call time. +func (sch *Scheduler) schedule(now time.Time) { + for i := 0; i < len(sch.jobs); { + // Scope our own var + job := sch.jobs[i] + + // We know these jobs are ordered by .Next(), so as soon + // as we reach one with .Next() after now, we can return + if job.Next().After(now) { + return + } + + // Update the next call time + next := job.timing.Next(now) + job.next.Store(next) + + // Run this job async! + go job.Run(now) + + if job.Next().IsZero() { + // Zero time, this job is done and can be dropped + sch.jobs = append(sch.jobs[:i], sch.jobs[i+1:]...) + continue + } + + // Iter + i++ + } +} + +// byNext is an implementation of sort.Interface to sort Jobs by their .Next() time. +type byNext []*Job + +func (by byNext) Len() int { + return len(by) +} + +func (by byNext) Less(i int, j int) bool { + return by[i].Next().Before(by[j].Next()) +} + +func (by byNext) Swap(i int, j int) { + by[i], by[j] = by[j], by[i] +} |