diff options
Diffstat (limited to 'vendor/codeberg.org/gruf/go-sched/scheduler.go')
-rw-r--r-- | vendor/codeberg.org/gruf/go-sched/scheduler.go | 250 |
1 files changed, 0 insertions, 250 deletions
diff --git a/vendor/codeberg.org/gruf/go-sched/scheduler.go b/vendor/codeberg.org/gruf/go-sched/scheduler.go deleted file mode 100644 index 8d076fea0..000000000 --- a/vendor/codeberg.org/gruf/go-sched/scheduler.go +++ /dev/null @@ -1,250 +0,0 @@ -package sched - -import ( - "context" - "sort" - "time" - - "codeberg.org/gruf/go-atomics" - "codeberg.org/gruf/go-runners" -) - -var ( - // neverticks is a timer channel that never ticks (it's starved). - neverticks = make(chan time.Time) - - // alwaysticks is a timer channel that always ticks (it's closed). - alwaysticks = func() chan time.Time { - ch := make(chan time.Time) - close(ch) - return ch - }() -) - -// Scheduler provides a means of running jobs at specific times and -// regular intervals, all while sharing a single underlying timer. -type Scheduler struct { - jobs []*Job // jobs is a list of tracked Jobs to be executed - jch chan interface{} // jch accepts either Jobs or job IDs to notify new/removed jobs - svc runners.Service // svc manages the main scheduler routine - jid atomics.Uint64 // jid is used to iteratively generate unique IDs for jobs -} - -// New returns a new Scheduler instance with given job change queue size. -func NewScheduler(queue int) Scheduler { - if queue < 0 { - queue = 10 - } - return Scheduler{jch: make(chan interface{}, queue)} -} - -// Start will attempt to start the Scheduler. Immediately returns false if the Service is already running, and true after completed run. -func (sch *Scheduler) Start() bool { - return sch.svc.Run(sch.run) -} - -// Stop will attempt to stop the Scheduler. Immediately returns false if not running, and true only after Scheduler is fully stopped. -func (sch *Scheduler) Stop() bool { - return sch.svc.Stop() -} - -// Running will return whether Scheduler is running. -func (sch *Scheduler) Running() bool { - return sch.svc.Running() -} - -// Schedule will add provided Job to the Scheduler, returning a cancel function. -func (sch *Scheduler) Schedule(job *Job) (cancel func()) { - if job == nil { - // Ensure there's a job! - panic("nil job") - } - - // Get last known job ID - last := sch.jid.Load() - - // Give this job an ID and check overflow - if job.id = sch.jid.Add(1); job.id < last { - panic("scheduler job id overflow") - } - - // Pass job to scheduler - sch.jch <- job - - // Return cancel function for job ID - return func() { sch.jch <- job.id } -} - -// run is the main scheduler run routine, which runs for as long as ctx is valid. -func (sch *Scheduler) run(ctx context.Context) { - var ( - // timerset represents whether timer was running - // for a particular run of the loop. false means - // that tch == neverticks || tch == alwaysticks - timerset bool - - // timer tick channel (or a never-tick channel) - tch <-chan time.Time - - // timer notifies this main routine to wake when - // the job queued needs to be checked for executions - timer *time.Timer - - // stopdrain will stop and drain the timer - // if it has been running (i.e. timerset == true) - stopdrain = func() { - if timerset && !timer.Stop() { - <-timer.C - } - } - ) - - for { - select { - // Handle received job/id - case v := <-sch.jch: - sch.handle(v) - continue - - // No more - default: - } - - // Done - break - } - - // Create a stopped timer - timer = time.NewTimer(1) - <-timer.C - - for { - // Reset timer state - timerset = false - - if len(sch.jobs) > 0 { - // Sort jobs by next occurring - sort.Sort(byNext(sch.jobs)) - - // Get execution time - now := time.Now() - - // Get next job time - next := sch.jobs[0].Next() - - // If this job is _just_ about to be ready, we - // don't bother sleeping. It's wasted cycles only - // sleeping for some obscenely tiny amount of time - // we can't guarantee precision for. - const precision = time.Millisecond - - if until := next.Sub(now); until <= precision/1e3 { - // This job is behind schedule, - // set timer to always tick - tch = alwaysticks - } else { - // Reset timer to period - timer.Reset(until) - tch = timer.C - timerset = true - } - } else { - // Unset timer - tch = neverticks - } - - select { - // Scheduler stopped - case <-ctx.Done(): - stopdrain() - return - - // Timer ticked, run scheduled - case now := <-tch: - if !timerset { - // alwaysticks returns zero times - now = time.Now() - } - sch.schedule(now) - - // Received update, handle job/id - case v := <-sch.jch: - sch.handle(v) - stopdrain() - } - } -} - -// handle takes an interfaces received from Scheduler.jch and handles either: -// - Job --> new job to add. -// - uint64 --> job ID to remove. -func (sch *Scheduler) handle(v interface{}) { - switch v := v.(type) { - // New job added - case *Job: - // Get current time - now := time.Now() - - // Update the next call time - next := v.timing.Next(now) - v.next.Store(next) - - // Append this job to queued - sch.jobs = append(sch.jobs, v) - - // Job removed - case uint64: - for i := 0; i < len(sch.jobs); i++ { - if sch.jobs[i].id == v { - // This is the job we're looking for! Drop this - sch.jobs = append(sch.jobs[:i], sch.jobs[i+1:]...) - return - } - } - } -} - -// schedule will iterate through the scheduler jobs and execute those necessary, updating their next call time. -func (sch *Scheduler) schedule(now time.Time) { - for i := 0; i < len(sch.jobs); { - // Scope our own var - job := sch.jobs[i] - - // We know these jobs are ordered by .Next(), so as soon - // as we reach one with .Next() after now, we can return - if job.Next().After(now) { - return - } - - // Update the next call time - next := job.timing.Next(now) - job.next.Store(next) - - // Run this job async! - go job.Run(now) - - if next.IsZero() { - // Zero time, this job is done and can be dropped - sch.jobs = append(sch.jobs[:i], sch.jobs[i+1:]...) - continue - } - - // Iter - i++ - } -} - -// byNext is an implementation of sort.Interface to sort Jobs by their .Next() time. -type byNext []*Job - -func (by byNext) Len() int { - return len(by) -} - -func (by byNext) Less(i int, j int) bool { - return by[i].Next().Before(by[j].Next()) -} - -func (by byNext) Swap(i int, j int) { - by[i], by[j] = by[j], by[i] -} |