diff options
Diffstat (limited to 'vendor/codeberg.org/gruf/go-sched/scheduler.go')
| -rw-r--r-- | vendor/codeberg.org/gruf/go-sched/scheduler.go | 225 |
1 files changed, 139 insertions, 86 deletions
diff --git a/vendor/codeberg.org/gruf/go-sched/scheduler.go b/vendor/codeberg.org/gruf/go-sched/scheduler.go index 79913a9b3..9ab5b8bc8 100644 --- a/vendor/codeberg.org/gruf/go-sched/scheduler.go +++ b/vendor/codeberg.org/gruf/go-sched/scheduler.go @@ -6,18 +6,22 @@ import ( "sync" "sync/atomic" "time" + "unsafe" "codeberg.org/gruf/go-runners" ) -// precision is the maximum time we can offer scheduler run-time precision down to. -const precision = time.Millisecond +// Precision is the maximum time we can +// offer scheduler run-time precision down to. +const Precision = 2 * time.Millisecond var ( - // neverticks is a timer channel that never ticks (it's starved). + // neverticks is a timer channel + // that never ticks (it's starved). neverticks = make(chan time.Time) - // alwaysticks is a timer channel that always ticks (it's closed). + // alwaysticks is a timer channel + // that always ticks (it's closed). alwaysticks = func() chan time.Time { ch := make(chan time.Time) close(ch) @@ -28,48 +32,51 @@ var ( // Scheduler provides a means of running jobs at specific times and // regular intervals, all while sharing a single underlying timer. type Scheduler struct { - jobs []*Job // jobs is a list of tracked Jobs to be executed - jch chan interface{} // jch accepts either Jobs or job IDs to notify new/removed jobs - svc runners.Service // svc manages the main scheduler routine - jid atomic.Uint64 // jid is used to iteratively generate unique IDs for jobs - rgo func(func()) // goroutine runner, allows using goroutine pool to launch jobs + svc runners.Service // svc manages the main scheduler routine + jobs []*Job // jobs is a list of tracked Jobs to be executed + jch atomic_channel // jch accepts either Jobs or job IDs to notify new/removed jobs + jid atomic.Uint64 // jid is used to iteratively generate unique IDs for jobs } -// Start will attempt to start the Scheduler. Immediately returns false if the Service is already running, and true after completed run. -func (sch *Scheduler) Start(gorun func(func())) bool { - var block sync.Mutex +// Start will attempt to start the Scheduler. Immediately returns false +// if the Service is already running, and true after completed run. +func (sch *Scheduler) Start() bool { + var wait sync.WaitGroup - // Use mutex to synchronize between started + // Use waiter to synchronize between started // goroutine and ourselves, to ensure that // we don't return before Scheduler init'd. - block.Lock() - defer block.Unlock() + wait.Add(1) ok := sch.svc.GoRun(func(ctx context.Context) { - // Create Scheduler job channel - sch.jch = make(chan interface{}) - - // Set goroutine runner function - if sch.rgo = gorun; sch.rgo == nil { - sch.rgo = func(f func()) { go f() } - } - - // Unlock start routine - block.Unlock() - - // Enter main loop - sch.run(ctx) + // Prepare new channel. + ch := new(channel) + ch.ctx = ctx.Done() + ch.ch = make(chan interface{}) + sch.jch.Store(ch) + + // Release + // start fn + wait.Done() + + // Main loop + sch.run(ch) }) if ok { - // Wait on goroutine - block.Lock() + // Wait on + // goroutine + wait.Wait() + } else { + // Release + wait.Done() } return ok } -// Stop will attempt to stop the Scheduler. Immediately returns false if not running, and true only after Scheduler is fully stopped. +// Stop will attempt to stop the Scheduler. Immediately returns false +// if not running, and true only after Scheduler is fully stopped. func (sch *Scheduler) Stop() bool { return sch.svc.Stop() } @@ -86,45 +93,38 @@ func (sch *Scheduler) Done() <-chan struct{} { // Schedule will add provided Job to the Scheduler, returning a cancel function. func (sch *Scheduler) Schedule(job *Job) (cancel func()) { - switch { - // Check a job was passed - case job == nil: + if job == nil { panic("nil job") - - // Check we are running - case !sch.Running(): - panic("scheduler not running") } - // Calculate next job ID - last := sch.jid.Load() - next := sch.jid.Add(1) - if next < last { - panic("job id overflow") + // Load job channel. + ch := sch.jch.Load() + if ch == nil { + panic("not running") } - // Pass job to scheduler - job.id = next - sch.jch <- job + // Calculate next job ID. + job.id = sch.jid.Add(1) - // Take ptrs to current state chs - ctx := sch.svc.Done() - jch := sch.jch - - // Return cancel function for job ID - return func() { - select { - // Sched stopped - case <-ctx: - - // Cancel this job - case jch <- next: - } + // Pass job + // to channel. + if !ch.w(job) { + panic("not running") } + + // Return cancel function for job + return func() { ch.w(job.id) } } // run is the main scheduler run routine, which runs for as long as ctx is valid. -func (sch *Scheduler) run(ctx context.Context) { +func (sch *Scheduler) run(ch *channel) { + defer ch.close() + if ch == nil { + panic("nil channel") + } else if sch == nil { + panic("nil scheduler") + } + var ( // now stores the current time, and will only be // set when the timer channel is set to be the @@ -165,39 +165,43 @@ func (sch *Scheduler) run(ctx context.Context) { // Get now time. now = time.Now() - // Sort jobs by next occurring. + // Sort by next occurring. sort.Sort(byNext(sch.jobs)) // Get next job time. next := sch.jobs[0].Next() - // If this job is _just_ about to be ready, we don't bother + // If this job is *just* about to be ready, we don't bother // sleeping. It's wasted cycles only sleeping for some obscenely // tiny amount of time we can't guarantee precision for. - if until := next.Sub(now); until <= precision/1e3 { + if until := next.Sub(now); until <= Precision/1e3 { + // This job is behind, // set to always tick. tch = alwaysticks } else { + // Reset timer to period. timer.Reset(until) - tch = timer.C timerset = true + tch = timer.C } } else { + // Unset timer tch = neverticks } select { // Scheduler stopped - case <-ctx.Done(): + case <-ch.done(): stopdrain() return - // Timer ticked, run scheduled - case t := <-tch: - if !timerset { + // Timer ticked, + // run scheduled. + case t, ok := <-tch: + if !ok { // 'alwaysticks' returns zero // times, BUT 'now' will have // been set during above sort. @@ -205,8 +209,9 @@ func (sch *Scheduler) run(ctx context.Context) { } sch.schedule(t) - // Received update, handle job/id - case v := <-sch.jch: + // Received update, + // handle job/id. + case v := <-ch.r(): sch.handle(v) stopdrain() } @@ -220,21 +225,21 @@ func (sch *Scheduler) handle(v interface{}) { switch v := v.(type) { // New job added case *Job: - // Get current time + // Get current time. now := time.Now() - // Update the next call time + // Update next call time. next := v.timing.Next(now) - storeTime(&v.next, next) + v.next.Store(next) - // Append this job to queued + // Append this job to queued/ sch.jobs = append(sch.jobs, v) // Job removed case uint64: for i := 0; i < len(sch.jobs); i++ { if sch.jobs[i].id == v { - // This is the job we're looking for! Drop this + // This is the job we're looking for! Drop this. sch.jobs = append(sch.jobs[:i], sch.jobs[i+1:]...) return } @@ -242,29 +247,28 @@ func (sch *Scheduler) handle(v interface{}) { } } -// schedule will iterate through the scheduler jobs and execute those necessary, updating their next call time. +// schedule will iterate through the scheduler jobs and +// execute those necessary, updating their next call time. func (sch *Scheduler) schedule(now time.Time) { for i := 0; i < len(sch.jobs); { - // Scope our own var + // Scope our own var. job := sch.jobs[i] // We know these jobs are ordered by .Next(), so as soon - // as we reach one with .Next() after now, we can return + // as we reach one with .Next() after now, we can return. if job.Next().After(now) { return } - // Pass to runner - sch.rgo(func() { - job.Run(now) - }) + // Run the job. + go job.Run(now) - // Update the next call time + // Update the next call time. next := job.timing.Next(now) - storeTime(&job.next, next) + job.next.Store(next) if next.IsZero() { - // Zero time, this job is done and can be dropped + // Zero time, this job is done and can be dropped. sch.jobs = append(sch.jobs[:i], sch.jobs[i+1:]...) continue } @@ -274,7 +278,8 @@ func (sch *Scheduler) schedule(now time.Time) { } } -// byNext is an implementation of sort.Interface to sort Jobs by their .Next() time. +// byNext is an implementation of sort.Interface +// to sort Jobs by their .Next() time. type byNext []*Job func (by byNext) Len() int { @@ -288,3 +293,51 @@ func (by byNext) Less(i int, j int) bool { func (by byNext) Swap(i int, j int) { by[i], by[j] = by[j], by[i] } + +// atomic_channel wraps a *channel{} with atomic store / load. +type atomic_channel struct{ p unsafe.Pointer } + +func (c *atomic_channel) Load() *channel { + if p := atomic.LoadPointer(&c.p); p != nil { + return (*channel)(p) + } + return nil +} + +func (c *atomic_channel) Store(v *channel) { + atomic.StorePointer(&c.p, unsafe.Pointer(v)) +} + +// channel wraps both a context done +// channel and a generic interface channel +// to support safe writing to an underlying +// channel that correctly fails after close. +type channel struct { + ctx <-chan struct{} + ch chan interface{} +} + +// done returns internal context channel. +func (ch *channel) done() <-chan struct{} { + return ch.ctx +} + +// r returns internal channel for read. +func (ch *channel) r() chan interface{} { + return ch.ch +} + +// w writes 'v' to channel, or returns false if closed. +func (ch *channel) w(v interface{}) bool { + select { + case <-ch.ctx: + return false + case ch.ch <- v: + return true + } +} + +// close closes underlying channel. +func (ch *channel) close() { + close(ch.ch) +} |
