summaryrefslogtreecommitdiff
path: root/vendor/codeberg.org/gruf/go-sched/scheduler.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/codeberg.org/gruf/go-sched/scheduler.go')
-rw-r--r--vendor/codeberg.org/gruf/go-sched/scheduler.go290
1 files changed, 0 insertions, 290 deletions
diff --git a/vendor/codeberg.org/gruf/go-sched/scheduler.go b/vendor/codeberg.org/gruf/go-sched/scheduler.go
deleted file mode 100644
index 79913a9b3..000000000
--- a/vendor/codeberg.org/gruf/go-sched/scheduler.go
+++ /dev/null
@@ -1,290 +0,0 @@
-package sched
-
-import (
- "context"
- "sort"
- "sync"
- "sync/atomic"
- "time"
-
- "codeberg.org/gruf/go-runners"
-)
-
-// precision is the maximum time we can offer scheduler run-time precision down to.
-const precision = time.Millisecond
-
-var (
- // neverticks is a timer channel that never ticks (it's starved).
- neverticks = make(chan time.Time)
-
- // alwaysticks is a timer channel that always ticks (it's closed).
- alwaysticks = func() chan time.Time {
- ch := make(chan time.Time)
- close(ch)
- return ch
- }()
-)
-
-// Scheduler provides a means of running jobs at specific times and
-// regular intervals, all while sharing a single underlying timer.
-type Scheduler struct {
- jobs []*Job // jobs is a list of tracked Jobs to be executed
- jch chan interface{} // jch accepts either Jobs or job IDs to notify new/removed jobs
- svc runners.Service // svc manages the main scheduler routine
- jid atomic.Uint64 // jid is used to iteratively generate unique IDs for jobs
- rgo func(func()) // goroutine runner, allows using goroutine pool to launch jobs
-}
-
-// Start will attempt to start the Scheduler. Immediately returns false if the Service is already running, and true after completed run.
-func (sch *Scheduler) Start(gorun func(func())) bool {
- var block sync.Mutex
-
- // Use mutex to synchronize between started
- // goroutine and ourselves, to ensure that
- // we don't return before Scheduler init'd.
- block.Lock()
- defer block.Unlock()
-
- ok := sch.svc.GoRun(func(ctx context.Context) {
- // Create Scheduler job channel
- sch.jch = make(chan interface{})
-
- // Set goroutine runner function
- if sch.rgo = gorun; sch.rgo == nil {
- sch.rgo = func(f func()) { go f() }
- }
-
- // Unlock start routine
- block.Unlock()
-
- // Enter main loop
- sch.run(ctx)
- })
-
- if ok {
- // Wait on goroutine
- block.Lock()
- }
-
- return ok
-}
-
-// Stop will attempt to stop the Scheduler. Immediately returns false if not running, and true only after Scheduler is fully stopped.
-func (sch *Scheduler) Stop() bool {
- return sch.svc.Stop()
-}
-
-// Running will return whether Scheduler is running (i.e. NOT stopped / stopping).
-func (sch *Scheduler) Running() bool {
- return sch.svc.Running()
-}
-
-// Done returns a channel that's closed when Scheduler.Stop() is called.
-func (sch *Scheduler) Done() <-chan struct{} {
- return sch.svc.Done()
-}
-
-// Schedule will add provided Job to the Scheduler, returning a cancel function.
-func (sch *Scheduler) Schedule(job *Job) (cancel func()) {
- switch {
- // Check a job was passed
- case job == nil:
- panic("nil job")
-
- // Check we are running
- case !sch.Running():
- panic("scheduler not running")
- }
-
- // Calculate next job ID
- last := sch.jid.Load()
- next := sch.jid.Add(1)
- if next < last {
- panic("job id overflow")
- }
-
- // Pass job to scheduler
- job.id = next
- sch.jch <- job
-
- // Take ptrs to current state chs
- ctx := sch.svc.Done()
- jch := sch.jch
-
- // Return cancel function for job ID
- return func() {
- select {
- // Sched stopped
- case <-ctx:
-
- // Cancel this job
- case jch <- next:
- }
- }
-}
-
-// run is the main scheduler run routine, which runs for as long as ctx is valid.
-func (sch *Scheduler) run(ctx context.Context) {
- var (
- // now stores the current time, and will only be
- // set when the timer channel is set to be the
- // 'alwaysticks' channel. this allows minimizing
- // the number of calls required to time.Now().
- now time.Time
-
- // timerset represents whether timer was running
- // for a particular run of the loop. false means
- // that tch == neverticks || tch == alwaysticks.
- timerset bool
-
- // timer tick channel (or always / never ticks).
- tch <-chan time.Time
-
- // timer notifies this main routine to wake when
- // the job queued needs to be checked for executions.
- timer *time.Timer
-
- // stopdrain will stop and drain the timer
- // if it has been running (i.e. timerset == true).
- stopdrain = func() {
- if timerset && !timer.Stop() {
- <-timer.C
- }
- }
- )
-
- // Create a stopped timer.
- timer = time.NewTimer(1)
- <-timer.C
-
- for {
- // Reset timer state.
- timerset = false
-
- if len(sch.jobs) > 0 {
- // Get now time.
- now = time.Now()
-
- // Sort jobs by next occurring.
- sort.Sort(byNext(sch.jobs))
-
- // Get next job time.
- next := sch.jobs[0].Next()
-
- // If this job is _just_ about to be ready, we don't bother
- // sleeping. It's wasted cycles only sleeping for some obscenely
- // tiny amount of time we can't guarantee precision for.
- if until := next.Sub(now); until <= precision/1e3 {
- // This job is behind,
- // set to always tick.
- tch = alwaysticks
- } else {
- // Reset timer to period.
- timer.Reset(until)
- tch = timer.C
- timerset = true
- }
- } else {
- // Unset timer
- tch = neverticks
- }
-
- select {
- // Scheduler stopped
- case <-ctx.Done():
- stopdrain()
- return
-
- // Timer ticked, run scheduled
- case t := <-tch:
- if !timerset {
- // 'alwaysticks' returns zero
- // times, BUT 'now' will have
- // been set during above sort.
- t = now
- }
- sch.schedule(t)
-
- // Received update, handle job/id
- case v := <-sch.jch:
- sch.handle(v)
- stopdrain()
- }
- }
-}
-
-// handle takes an interfaces received from Scheduler.jch and handles either:
-// - Job --> new job to add.
-// - uint64 --> job ID to remove.
-func (sch *Scheduler) handle(v interface{}) {
- switch v := v.(type) {
- // New job added
- case *Job:
- // Get current time
- now := time.Now()
-
- // Update the next call time
- next := v.timing.Next(now)
- storeTime(&v.next, next)
-
- // Append this job to queued
- sch.jobs = append(sch.jobs, v)
-
- // Job removed
- case uint64:
- for i := 0; i < len(sch.jobs); i++ {
- if sch.jobs[i].id == v {
- // This is the job we're looking for! Drop this
- sch.jobs = append(sch.jobs[:i], sch.jobs[i+1:]...)
- return
- }
- }
- }
-}
-
-// schedule will iterate through the scheduler jobs and execute those necessary, updating their next call time.
-func (sch *Scheduler) schedule(now time.Time) {
- for i := 0; i < len(sch.jobs); {
- // Scope our own var
- job := sch.jobs[i]
-
- // We know these jobs are ordered by .Next(), so as soon
- // as we reach one with .Next() after now, we can return
- if job.Next().After(now) {
- return
- }
-
- // Pass to runner
- sch.rgo(func() {
- job.Run(now)
- })
-
- // Update the next call time
- next := job.timing.Next(now)
- storeTime(&job.next, next)
-
- if next.IsZero() {
- // Zero time, this job is done and can be dropped
- sch.jobs = append(sch.jobs[:i], sch.jobs[i+1:]...)
- continue
- }
-
- // Iter
- i++
- }
-}
-
-// byNext is an implementation of sort.Interface to sort Jobs by their .Next() time.
-type byNext []*Job
-
-func (by byNext) Len() int {
- return len(by)
-}
-
-func (by byNext) Less(i int, j int) bool {
- return by[i].Next().Before(by[j].Next())
-}
-
-func (by byNext) Swap(i int, j int) {
- by[i], by[j] = by[j], by[i]
-}