summaryrefslogtreecommitdiff
path: root/vendor/codeberg.org/gruf/go-sched/scheduler.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/codeberg.org/gruf/go-sched/scheduler.go')
-rw-r--r--vendor/codeberg.org/gruf/go-sched/scheduler.go250
1 files changed, 0 insertions, 250 deletions
diff --git a/vendor/codeberg.org/gruf/go-sched/scheduler.go b/vendor/codeberg.org/gruf/go-sched/scheduler.go
deleted file mode 100644
index 8d076fea0..000000000
--- a/vendor/codeberg.org/gruf/go-sched/scheduler.go
+++ /dev/null
@@ -1,250 +0,0 @@
-package sched
-
-import (
- "context"
- "sort"
- "time"
-
- "codeberg.org/gruf/go-atomics"
- "codeberg.org/gruf/go-runners"
-)
-
-var (
- // neverticks is a timer channel that never ticks (it's starved).
- neverticks = make(chan time.Time)
-
- // alwaysticks is a timer channel that always ticks (it's closed).
- alwaysticks = func() chan time.Time {
- ch := make(chan time.Time)
- close(ch)
- return ch
- }()
-)
-
-// Scheduler provides a means of running jobs at specific times and
-// regular intervals, all while sharing a single underlying timer.
-type Scheduler struct {
- jobs []*Job // jobs is a list of tracked Jobs to be executed
- jch chan interface{} // jch accepts either Jobs or job IDs to notify new/removed jobs
- svc runners.Service // svc manages the main scheduler routine
- jid atomics.Uint64 // jid is used to iteratively generate unique IDs for jobs
-}
-
-// New returns a new Scheduler instance with given job change queue size.
-func NewScheduler(queue int) Scheduler {
- if queue < 0 {
- queue = 10
- }
- return Scheduler{jch: make(chan interface{}, queue)}
-}
-
-// Start will attempt to start the Scheduler. Immediately returns false if the Service is already running, and true after completed run.
-func (sch *Scheduler) Start() bool {
- return sch.svc.Run(sch.run)
-}
-
-// Stop will attempt to stop the Scheduler. Immediately returns false if not running, and true only after Scheduler is fully stopped.
-func (sch *Scheduler) Stop() bool {
- return sch.svc.Stop()
-}
-
-// Running will return whether Scheduler is running.
-func (sch *Scheduler) Running() bool {
- return sch.svc.Running()
-}
-
-// Schedule will add provided Job to the Scheduler, returning a cancel function.
-func (sch *Scheduler) Schedule(job *Job) (cancel func()) {
- if job == nil {
- // Ensure there's a job!
- panic("nil job")
- }
-
- // Get last known job ID
- last := sch.jid.Load()
-
- // Give this job an ID and check overflow
- if job.id = sch.jid.Add(1); job.id < last {
- panic("scheduler job id overflow")
- }
-
- // Pass job to scheduler
- sch.jch <- job
-
- // Return cancel function for job ID
- return func() { sch.jch <- job.id }
-}
-
-// run is the main scheduler run routine, which runs for as long as ctx is valid.
-func (sch *Scheduler) run(ctx context.Context) {
- var (
- // timerset represents whether timer was running
- // for a particular run of the loop. false means
- // that tch == neverticks || tch == alwaysticks
- timerset bool
-
- // timer tick channel (or a never-tick channel)
- tch <-chan time.Time
-
- // timer notifies this main routine to wake when
- // the job queued needs to be checked for executions
- timer *time.Timer
-
- // stopdrain will stop and drain the timer
- // if it has been running (i.e. timerset == true)
- stopdrain = func() {
- if timerset && !timer.Stop() {
- <-timer.C
- }
- }
- )
-
- for {
- select {
- // Handle received job/id
- case v := <-sch.jch:
- sch.handle(v)
- continue
-
- // No more
- default:
- }
-
- // Done
- break
- }
-
- // Create a stopped timer
- timer = time.NewTimer(1)
- <-timer.C
-
- for {
- // Reset timer state
- timerset = false
-
- if len(sch.jobs) > 0 {
- // Sort jobs by next occurring
- sort.Sort(byNext(sch.jobs))
-
- // Get execution time
- now := time.Now()
-
- // Get next job time
- next := sch.jobs[0].Next()
-
- // If this job is _just_ about to be ready, we
- // don't bother sleeping. It's wasted cycles only
- // sleeping for some obscenely tiny amount of time
- // we can't guarantee precision for.
- const precision = time.Millisecond
-
- if until := next.Sub(now); until <= precision/1e3 {
- // This job is behind schedule,
- // set timer to always tick
- tch = alwaysticks
- } else {
- // Reset timer to period
- timer.Reset(until)
- tch = timer.C
- timerset = true
- }
- } else {
- // Unset timer
- tch = neverticks
- }
-
- select {
- // Scheduler stopped
- case <-ctx.Done():
- stopdrain()
- return
-
- // Timer ticked, run scheduled
- case now := <-tch:
- if !timerset {
- // alwaysticks returns zero times
- now = time.Now()
- }
- sch.schedule(now)
-
- // Received update, handle job/id
- case v := <-sch.jch:
- sch.handle(v)
- stopdrain()
- }
- }
-}
-
-// handle takes an interfaces received from Scheduler.jch and handles either:
-// - Job --> new job to add.
-// - uint64 --> job ID to remove.
-func (sch *Scheduler) handle(v interface{}) {
- switch v := v.(type) {
- // New job added
- case *Job:
- // Get current time
- now := time.Now()
-
- // Update the next call time
- next := v.timing.Next(now)
- v.next.Store(next)
-
- // Append this job to queued
- sch.jobs = append(sch.jobs, v)
-
- // Job removed
- case uint64:
- for i := 0; i < len(sch.jobs); i++ {
- if sch.jobs[i].id == v {
- // This is the job we're looking for! Drop this
- sch.jobs = append(sch.jobs[:i], sch.jobs[i+1:]...)
- return
- }
- }
- }
-}
-
-// schedule will iterate through the scheduler jobs and execute those necessary, updating their next call time.
-func (sch *Scheduler) schedule(now time.Time) {
- for i := 0; i < len(sch.jobs); {
- // Scope our own var
- job := sch.jobs[i]
-
- // We know these jobs are ordered by .Next(), so as soon
- // as we reach one with .Next() after now, we can return
- if job.Next().After(now) {
- return
- }
-
- // Update the next call time
- next := job.timing.Next(now)
- job.next.Store(next)
-
- // Run this job async!
- go job.Run(now)
-
- if next.IsZero() {
- // Zero time, this job is done and can be dropped
- sch.jobs = append(sch.jobs[:i], sch.jobs[i+1:]...)
- continue
- }
-
- // Iter
- i++
- }
-}
-
-// byNext is an implementation of sort.Interface to sort Jobs by their .Next() time.
-type byNext []*Job
-
-func (by byNext) Len() int {
- return len(by)
-}
-
-func (by byNext) Less(i int, j int) bool {
- return by[i].Next().Before(by[j].Next())
-}
-
-func (by byNext) Swap(i int, j int) {
- by[i], by[j] = by[j], by[i]
-}