diff options
Diffstat (limited to 'vendor/codeberg.org/gruf/go-sched/scheduler.go')
| -rw-r--r-- | vendor/codeberg.org/gruf/go-sched/scheduler.go | 343 |
1 files changed, 0 insertions, 343 deletions
diff --git a/vendor/codeberg.org/gruf/go-sched/scheduler.go b/vendor/codeberg.org/gruf/go-sched/scheduler.go deleted file mode 100644 index 9ab5b8bc8..000000000 --- a/vendor/codeberg.org/gruf/go-sched/scheduler.go +++ /dev/null @@ -1,343 +0,0 @@ -package sched - -import ( - "context" - "sort" - "sync" - "sync/atomic" - "time" - "unsafe" - - "codeberg.org/gruf/go-runners" -) - -// Precision is the maximum time we can -// offer scheduler run-time precision down to. -const Precision = 2 * time.Millisecond - -var ( - // neverticks is a timer channel - // that never ticks (it's starved). - neverticks = make(chan time.Time) - - // alwaysticks is a timer channel - // that always ticks (it's closed). - alwaysticks = func() chan time.Time { - ch := make(chan time.Time) - close(ch) - return ch - }() -) - -// Scheduler provides a means of running jobs at specific times and -// regular intervals, all while sharing a single underlying timer. -type Scheduler struct { - svc runners.Service // svc manages the main scheduler routine - jobs []*Job // jobs is a list of tracked Jobs to be executed - jch atomic_channel // jch accepts either Jobs or job IDs to notify new/removed jobs - jid atomic.Uint64 // jid is used to iteratively generate unique IDs for jobs -} - -// Start will attempt to start the Scheduler. Immediately returns false -// if the Service is already running, and true after completed run. -func (sch *Scheduler) Start() bool { - var wait sync.WaitGroup - - // Use waiter to synchronize between started - // goroutine and ourselves, to ensure that - // we don't return before Scheduler init'd. - wait.Add(1) - - ok := sch.svc.GoRun(func(ctx context.Context) { - // Prepare new channel. - ch := new(channel) - ch.ctx = ctx.Done() - ch.ch = make(chan interface{}) - sch.jch.Store(ch) - - // Release - // start fn - wait.Done() - - // Main loop - sch.run(ch) - }) - - if ok { - // Wait on - // goroutine - wait.Wait() - } else { - // Release - wait.Done() - } - - return ok -} - -// Stop will attempt to stop the Scheduler. Immediately returns false -// if not running, and true only after Scheduler is fully stopped. -func (sch *Scheduler) Stop() bool { - return sch.svc.Stop() -} - -// Running will return whether Scheduler is running (i.e. NOT stopped / stopping). -func (sch *Scheduler) Running() bool { - return sch.svc.Running() -} - -// Done returns a channel that's closed when Scheduler.Stop() is called. -func (sch *Scheduler) Done() <-chan struct{} { - return sch.svc.Done() -} - -// Schedule will add provided Job to the Scheduler, returning a cancel function. -func (sch *Scheduler) Schedule(job *Job) (cancel func()) { - if job == nil { - panic("nil job") - } - - // Load job channel. - ch := sch.jch.Load() - if ch == nil { - panic("not running") - } - - // Calculate next job ID. - job.id = sch.jid.Add(1) - - // Pass job - // to channel. - if !ch.w(job) { - panic("not running") - } - - // Return cancel function for job - return func() { ch.w(job.id) } -} - -// run is the main scheduler run routine, which runs for as long as ctx is valid. -func (sch *Scheduler) run(ch *channel) { - defer ch.close() - if ch == nil { - panic("nil channel") - } else if sch == nil { - panic("nil scheduler") - } - - var ( - // now stores the current time, and will only be - // set when the timer channel is set to be the - // 'alwaysticks' channel. this allows minimizing - // the number of calls required to time.Now(). - now time.Time - - // timerset represents whether timer was running - // for a particular run of the loop. false means - // that tch == neverticks || tch == alwaysticks. - timerset bool - - // timer tick channel (or always / never ticks). - tch <-chan time.Time - - // timer notifies this main routine to wake when - // the job queued needs to be checked for executions. - timer *time.Timer - - // stopdrain will stop and drain the timer - // if it has been running (i.e. timerset == true). - stopdrain = func() { - if timerset && !timer.Stop() { - <-timer.C - } - } - ) - - // Create a stopped timer. - timer = time.NewTimer(1) - <-timer.C - - for { - // Reset timer state. - timerset = false - - if len(sch.jobs) > 0 { - // Get now time. - now = time.Now() - - // Sort by next occurring. - sort.Sort(byNext(sch.jobs)) - - // Get next job time. - next := sch.jobs[0].Next() - - // If this job is *just* about to be ready, we don't bother - // sleeping. It's wasted cycles only sleeping for some obscenely - // tiny amount of time we can't guarantee precision for. - if until := next.Sub(now); until <= Precision/1e3 { - - // This job is behind, - // set to always tick. - tch = alwaysticks - } else { - - // Reset timer to period. - timer.Reset(until) - timerset = true - tch = timer.C - } - } else { - - // Unset timer - tch = neverticks - } - - select { - // Scheduler stopped - case <-ch.done(): - stopdrain() - return - - // Timer ticked, - // run scheduled. - case t, ok := <-tch: - if !ok { - // 'alwaysticks' returns zero - // times, BUT 'now' will have - // been set during above sort. - t = now - } - sch.schedule(t) - - // Received update, - // handle job/id. - case v := <-ch.r(): - sch.handle(v) - stopdrain() - } - } -} - -// handle takes an interfaces received from Scheduler.jch and handles either: -// - Job --> new job to add. -// - uint64 --> job ID to remove. -func (sch *Scheduler) handle(v interface{}) { - switch v := v.(type) { - // New job added - case *Job: - // Get current time. - now := time.Now() - - // Update next call time. - next := v.timing.Next(now) - v.next.Store(next) - - // Append this job to queued/ - sch.jobs = append(sch.jobs, v) - - // Job removed - case uint64: - for i := 0; i < len(sch.jobs); i++ { - if sch.jobs[i].id == v { - // This is the job we're looking for! Drop this. - sch.jobs = append(sch.jobs[:i], sch.jobs[i+1:]...) - return - } - } - } -} - -// schedule will iterate through the scheduler jobs and -// execute those necessary, updating their next call time. -func (sch *Scheduler) schedule(now time.Time) { - for i := 0; i < len(sch.jobs); { - // Scope our own var. - job := sch.jobs[i] - - // We know these jobs are ordered by .Next(), so as soon - // as we reach one with .Next() after now, we can return. - if job.Next().After(now) { - return - } - - // Run the job. - go job.Run(now) - - // Update the next call time. - next := job.timing.Next(now) - job.next.Store(next) - - if next.IsZero() { - // Zero time, this job is done and can be dropped. - sch.jobs = append(sch.jobs[:i], sch.jobs[i+1:]...) - continue - } - - // Iter - i++ - } -} - -// byNext is an implementation of sort.Interface -// to sort Jobs by their .Next() time. -type byNext []*Job - -func (by byNext) Len() int { - return len(by) -} - -func (by byNext) Less(i int, j int) bool { - return by[i].Next().Before(by[j].Next()) -} - -func (by byNext) Swap(i int, j int) { - by[i], by[j] = by[j], by[i] -} - -// atomic_channel wraps a *channel{} with atomic store / load. -type atomic_channel struct{ p unsafe.Pointer } - -func (c *atomic_channel) Load() *channel { - if p := atomic.LoadPointer(&c.p); p != nil { - return (*channel)(p) - } - return nil -} - -func (c *atomic_channel) Store(v *channel) { - atomic.StorePointer(&c.p, unsafe.Pointer(v)) -} - -// channel wraps both a context done -// channel and a generic interface channel -// to support safe writing to an underlying -// channel that correctly fails after close. -type channel struct { - ctx <-chan struct{} - ch chan interface{} -} - -// done returns internal context channel. -func (ch *channel) done() <-chan struct{} { - return ch.ctx -} - -// r returns internal channel for read. -func (ch *channel) r() chan interface{} { - return ch.ch -} - -// w writes 'v' to channel, or returns false if closed. -func (ch *channel) w(v interface{}) bool { - select { - case <-ch.ctx: - return false - case ch.ch <- v: - return true - } -} - -// close closes underlying channel. -func (ch *channel) close() { - close(ch.ch) -} |
