diff options
Diffstat (limited to 'vendor/codeberg.org/gruf/go-sched')
| -rw-r--r-- | vendor/codeberg.org/gruf/go-sched/README.md | 2 | ||||
| -rw-r--r-- | vendor/codeberg.org/gruf/go-sched/job.go | 64 | ||||
| -rw-r--r-- | vendor/codeberg.org/gruf/go-sched/scheduler.go | 225 | ||||
| -rw-r--r-- | vendor/codeberg.org/gruf/go-sched/timing.go | 25 |
4 files changed, 176 insertions, 140 deletions
diff --git a/vendor/codeberg.org/gruf/go-sched/README.md b/vendor/codeberg.org/gruf/go-sched/README.md index d32a961ae..0a9439577 100644 --- a/vendor/codeberg.org/gruf/go-sched/README.md +++ b/vendor/codeberg.org/gruf/go-sched/README.md @@ -2,4 +2,4 @@ A simple job (both run-once and recurring) queueing library with down-to millisecond precision. -Precision estimates based on test output (running on i7-11800h): 1ms precision with 80% tolerance.
\ No newline at end of file +Precision estimates based on test output (running on AMD Ryzen 7 7840u): 2ms precision with 95% tolerance. diff --git a/vendor/codeberg.org/gruf/go-sched/job.go b/vendor/codeberg.org/gruf/go-sched/job.go index 2531769d6..f3bd869d2 100644 --- a/vendor/codeberg.org/gruf/go-sched/job.go +++ b/vendor/codeberg.org/gruf/go-sched/job.go @@ -14,10 +14,9 @@ import ( // holding onto a next execution time safely in a concurrent environment. type Job struct { id uint64 - next unsafe.Pointer // *time.Time + next atomic_time timing Timing call func(time.Time) - panic func(interface{}) } // NewJob returns a new Job to run given function. @@ -30,28 +29,31 @@ func NewJob(fn func(now time.Time)) *Job { j := &Job{ // set defaults timing: emptytiming, // i.e. fire immediately call: fn, - panic: func(i interface{}) { panic(i) }, } return j } -// At sets this Job to execute at time, by passing (*sched.Once)(&at) to .With(). See .With() for details. +// At sets this Job to execute at time, by passing +// (*sched.Once)(&at) to .With(). See .With() for details. func (job *Job) At(at time.Time) *Job { return job.With((*Once)(&at)) } -// Every sets this Job to execute every period, by passing sched.Period(period) to .With(). See .With() for details. +// Every sets this Job to execute every period, by passing +// sched.Period(period) to .With(). See .With() for details. func (job *Job) Every(period time.Duration) *Job { return job.With(Periodic(period)) } -// EveryAt sets this Job to execute every period starting at time, by passing &PeriodicAt{once: Once(at), period: Periodic(period)} to .With(). See .With() for details. +// EveryAt sets this Job to execute every period starting at time, by passing +// &PeriodicAt{once: Once(at), period: Periodic(period)} to .With(). See .With() for details. func (job *Job) EveryAt(at time.Time, period time.Duration) *Job { return job.With(&PeriodicAt{Once: Once(at), Period: Periodic(period)}) } -// With sets this Job's timing to given implementation, or if already set will wrap existing using sched.TimingWrap{}. +// With sets this Job's timing to given implementation, or +// if already set will wrap existing using sched.TimingWrap{}. func (job *Job) With(t Timing) *Job { if t == nil { // Ensure a timing @@ -78,44 +80,16 @@ func (job *Job) With(t Timing) *Job { return job } -// OnPanic specifies how this job handles panics, default is an actual panic. -func (job *Job) OnPanic(fn func(interface{})) *Job { - if fn == nil { - // Ensure a function - panic("nil func") - } - - if job.id != 0 { - // Cannot update scheduled job - panic("job already scheduled") - } - - job.panic = fn - return job -} - // Next returns the next time this Job is expected to run. func (job *Job) Next() time.Time { - return loadTime(&job.next) + return job.next.Load() } // Run will execute this Job and pass through given now time. -func (job *Job) Run(now time.Time) { - defer func() { - switch r := recover(); { - case r == nil: - // no panic - case job != nil && - job.panic != nil: - job.panic(r) - default: - panic(r) - } - }() - job.call(now) -} +func (job *Job) Run(now time.Time) { job.call(now) } -// String provides a debuggable string representation of Job including ID, next time and Timing type. +// String provides a debuggable string representation +// of Job including ID, next time and Timing type. func (job *Job) String() string { var buf strings.Builder buf.WriteByte('{') @@ -123,7 +97,7 @@ func (job *Job) String() string { buf.WriteString(strconv.FormatUint(job.id, 10)) buf.WriteByte(' ') buf.WriteString("next=") - buf.WriteString(loadTime(&job.next).Format(time.StampMicro)) + buf.WriteString(job.next.Load().Format(time.StampMicro)) buf.WriteByte(' ') buf.WriteString("timing=") buf.WriteString(reflect.TypeOf(job.timing).String()) @@ -131,13 +105,15 @@ func (job *Job) String() string { return buf.String() } -func loadTime(p *unsafe.Pointer) time.Time { - if p := atomic.LoadPointer(p); p != nil { +type atomic_time struct{ p unsafe.Pointer } + +func (t *atomic_time) Load() time.Time { + if p := atomic.LoadPointer(&t.p); p != nil { return *(*time.Time)(p) } return zerotime } -func storeTime(p *unsafe.Pointer, t time.Time) { - atomic.StorePointer(p, unsafe.Pointer(&t)) +func (t *atomic_time) Store(v time.Time) { + atomic.StorePointer(&t.p, unsafe.Pointer(&v)) } diff --git a/vendor/codeberg.org/gruf/go-sched/scheduler.go b/vendor/codeberg.org/gruf/go-sched/scheduler.go index 79913a9b3..9ab5b8bc8 100644 --- a/vendor/codeberg.org/gruf/go-sched/scheduler.go +++ b/vendor/codeberg.org/gruf/go-sched/scheduler.go @@ -6,18 +6,22 @@ import ( "sync" "sync/atomic" "time" + "unsafe" "codeberg.org/gruf/go-runners" ) -// precision is the maximum time we can offer scheduler run-time precision down to. -const precision = time.Millisecond +// Precision is the maximum time we can +// offer scheduler run-time precision down to. +const Precision = 2 * time.Millisecond var ( - // neverticks is a timer channel that never ticks (it's starved). + // neverticks is a timer channel + // that never ticks (it's starved). neverticks = make(chan time.Time) - // alwaysticks is a timer channel that always ticks (it's closed). + // alwaysticks is a timer channel + // that always ticks (it's closed). alwaysticks = func() chan time.Time { ch := make(chan time.Time) close(ch) @@ -28,48 +32,51 @@ var ( // Scheduler provides a means of running jobs at specific times and // regular intervals, all while sharing a single underlying timer. type Scheduler struct { - jobs []*Job // jobs is a list of tracked Jobs to be executed - jch chan interface{} // jch accepts either Jobs or job IDs to notify new/removed jobs - svc runners.Service // svc manages the main scheduler routine - jid atomic.Uint64 // jid is used to iteratively generate unique IDs for jobs - rgo func(func()) // goroutine runner, allows using goroutine pool to launch jobs + svc runners.Service // svc manages the main scheduler routine + jobs []*Job // jobs is a list of tracked Jobs to be executed + jch atomic_channel // jch accepts either Jobs or job IDs to notify new/removed jobs + jid atomic.Uint64 // jid is used to iteratively generate unique IDs for jobs } -// Start will attempt to start the Scheduler. Immediately returns false if the Service is already running, and true after completed run. -func (sch *Scheduler) Start(gorun func(func())) bool { - var block sync.Mutex +// Start will attempt to start the Scheduler. Immediately returns false +// if the Service is already running, and true after completed run. +func (sch *Scheduler) Start() bool { + var wait sync.WaitGroup - // Use mutex to synchronize between started + // Use waiter to synchronize between started // goroutine and ourselves, to ensure that // we don't return before Scheduler init'd. - block.Lock() - defer block.Unlock() + wait.Add(1) ok := sch.svc.GoRun(func(ctx context.Context) { - // Create Scheduler job channel - sch.jch = make(chan interface{}) - - // Set goroutine runner function - if sch.rgo = gorun; sch.rgo == nil { - sch.rgo = func(f func()) { go f() } - } - - // Unlock start routine - block.Unlock() - - // Enter main loop - sch.run(ctx) + // Prepare new channel. + ch := new(channel) + ch.ctx = ctx.Done() + ch.ch = make(chan interface{}) + sch.jch.Store(ch) + + // Release + // start fn + wait.Done() + + // Main loop + sch.run(ch) }) if ok { - // Wait on goroutine - block.Lock() + // Wait on + // goroutine + wait.Wait() + } else { + // Release + wait.Done() } return ok } -// Stop will attempt to stop the Scheduler. Immediately returns false if not running, and true only after Scheduler is fully stopped. +// Stop will attempt to stop the Scheduler. Immediately returns false +// if not running, and true only after Scheduler is fully stopped. func (sch *Scheduler) Stop() bool { return sch.svc.Stop() } @@ -86,45 +93,38 @@ func (sch *Scheduler) Done() <-chan struct{} { // Schedule will add provided Job to the Scheduler, returning a cancel function. func (sch *Scheduler) Schedule(job *Job) (cancel func()) { - switch { - // Check a job was passed - case job == nil: + if job == nil { panic("nil job") - - // Check we are running - case !sch.Running(): - panic("scheduler not running") } - // Calculate next job ID - last := sch.jid.Load() - next := sch.jid.Add(1) - if next < last { - panic("job id overflow") + // Load job channel. + ch := sch.jch.Load() + if ch == nil { + panic("not running") } - // Pass job to scheduler - job.id = next - sch.jch <- job + // Calculate next job ID. + job.id = sch.jid.Add(1) - // Take ptrs to current state chs - ctx := sch.svc.Done() - jch := sch.jch - - // Return cancel function for job ID - return func() { - select { - // Sched stopped - case <-ctx: - - // Cancel this job - case jch <- next: - } + // Pass job + // to channel. + if !ch.w(job) { + panic("not running") } + + // Return cancel function for job + return func() { ch.w(job.id) } } // run is the main scheduler run routine, which runs for as long as ctx is valid. -func (sch *Scheduler) run(ctx context.Context) { +func (sch *Scheduler) run(ch *channel) { + defer ch.close() + if ch == nil { + panic("nil channel") + } else if sch == nil { + panic("nil scheduler") + } + var ( // now stores the current time, and will only be // set when the timer channel is set to be the @@ -165,39 +165,43 @@ func (sch *Scheduler) run(ctx context.Context) { // Get now time. now = time.Now() - // Sort jobs by next occurring. + // Sort by next occurring. sort.Sort(byNext(sch.jobs)) // Get next job time. next := sch.jobs[0].Next() - // If this job is _just_ about to be ready, we don't bother + // If this job is *just* about to be ready, we don't bother // sleeping. It's wasted cycles only sleeping for some obscenely // tiny amount of time we can't guarantee precision for. - if until := next.Sub(now); until <= precision/1e3 { + if until := next.Sub(now); until <= Precision/1e3 { + // This job is behind, // set to always tick. tch = alwaysticks } else { + // Reset timer to period. timer.Reset(until) - tch = timer.C timerset = true + tch = timer.C } } else { + // Unset timer tch = neverticks } select { // Scheduler stopped - case <-ctx.Done(): + case <-ch.done(): stopdrain() return - // Timer ticked, run scheduled - case t := <-tch: - if !timerset { + // Timer ticked, + // run scheduled. + case t, ok := <-tch: + if !ok { // 'alwaysticks' returns zero // times, BUT 'now' will have // been set during above sort. @@ -205,8 +209,9 @@ func (sch *Scheduler) run(ctx context.Context) { } sch.schedule(t) - // Received update, handle job/id - case v := <-sch.jch: + // Received update, + // handle job/id. + case v := <-ch.r(): sch.handle(v) stopdrain() } @@ -220,21 +225,21 @@ func (sch *Scheduler) handle(v interface{}) { switch v := v.(type) { // New job added case *Job: - // Get current time + // Get current time. now := time.Now() - // Update the next call time + // Update next call time. next := v.timing.Next(now) - storeTime(&v.next, next) + v.next.Store(next) - // Append this job to queued + // Append this job to queued/ sch.jobs = append(sch.jobs, v) // Job removed case uint64: for i := 0; i < len(sch.jobs); i++ { if sch.jobs[i].id == v { - // This is the job we're looking for! Drop this + // This is the job we're looking for! Drop this. sch.jobs = append(sch.jobs[:i], sch.jobs[i+1:]...) return } @@ -242,29 +247,28 @@ func (sch *Scheduler) handle(v interface{}) { } } -// schedule will iterate through the scheduler jobs and execute those necessary, updating their next call time. +// schedule will iterate through the scheduler jobs and +// execute those necessary, updating their next call time. func (sch *Scheduler) schedule(now time.Time) { for i := 0; i < len(sch.jobs); { - // Scope our own var + // Scope our own var. job := sch.jobs[i] // We know these jobs are ordered by .Next(), so as soon - // as we reach one with .Next() after now, we can return + // as we reach one with .Next() after now, we can return. if job.Next().After(now) { return } - // Pass to runner - sch.rgo(func() { - job.Run(now) - }) + // Run the job. + go job.Run(now) - // Update the next call time + // Update the next call time. next := job.timing.Next(now) - storeTime(&job.next, next) + job.next.Store(next) if next.IsZero() { - // Zero time, this job is done and can be dropped + // Zero time, this job is done and can be dropped. sch.jobs = append(sch.jobs[:i], sch.jobs[i+1:]...) continue } @@ -274,7 +278,8 @@ func (sch *Scheduler) schedule(now time.Time) { } } -// byNext is an implementation of sort.Interface to sort Jobs by their .Next() time. +// byNext is an implementation of sort.Interface +// to sort Jobs by their .Next() time. type byNext []*Job func (by byNext) Len() int { @@ -288,3 +293,51 @@ func (by byNext) Less(i int, j int) bool { func (by byNext) Swap(i int, j int) { by[i], by[j] = by[j], by[i] } + +// atomic_channel wraps a *channel{} with atomic store / load. +type atomic_channel struct{ p unsafe.Pointer } + +func (c *atomic_channel) Load() *channel { + if p := atomic.LoadPointer(&c.p); p != nil { + return (*channel)(p) + } + return nil +} + +func (c *atomic_channel) Store(v *channel) { + atomic.StorePointer(&c.p, unsafe.Pointer(v)) +} + +// channel wraps both a context done +// channel and a generic interface channel +// to support safe writing to an underlying +// channel that correctly fails after close. +type channel struct { + ctx <-chan struct{} + ch chan interface{} +} + +// done returns internal context channel. +func (ch *channel) done() <-chan struct{} { + return ch.ctx +} + +// r returns internal channel for read. +func (ch *channel) r() chan interface{} { + return ch.ch +} + +// w writes 'v' to channel, or returns false if closed. +func (ch *channel) w(v interface{}) bool { + select { + case <-ch.ctx: + return false + case ch.ch <- v: + return true + } +} + +// close closes underlying channel. +func (ch *channel) close() { + close(ch.ch) +} diff --git a/vendor/codeberg.org/gruf/go-sched/timing.go b/vendor/codeberg.org/gruf/go-sched/timing.go index 33c230fa5..cb9b4925a 100644 --- a/vendor/codeberg.org/gruf/go-sched/timing.go +++ b/vendor/codeberg.org/gruf/go-sched/timing.go @@ -5,11 +5,13 @@ import ( ) var ( - // zerotime is zero time.Time (unix epoch). - zerotime = time.Time{} + // zerotime is zero + // time.Time (unix epoch). + zerotime time.Time - // emptytiming is a global timingempty to check against. - emptytiming = timingempty{} + // emptytiming is a global + // timingempty to check against. + emptytiming timingempty ) // Timing provides scheduling for a Job, determining the next time @@ -20,14 +22,16 @@ type Timing interface { Next(time.Time) time.Time } -// timingempty is a 'zero' Timing implementation that always returns zero time. +// timingempty is a 'zero' Timing implementation +// that always returns zero time. type timingempty struct{} func (timingempty) Next(time.Time) time.Time { return zerotime } -// Once implements Timing to provide a run-once Job execution. +// Once implements Timing to +// provide a run-once Job execution. type Once time.Time func (o *Once) Next(time.Time) time.Time { @@ -36,14 +40,16 @@ func (o *Once) Next(time.Time) time.Time { return ret } -// Periodic implements Timing to provide a recurring Job execution. +// Periodic implements Timing to +// provide a recurring Job execution. type Periodic time.Duration func (p Periodic) Next(now time.Time) time.Time { return now.Add(time.Duration(p)) } -// PeriodicAt implements Timing to provide a recurring Job execution starting at 'Once' time. +// PeriodicAt implements Timing to provide a +// recurring Job execution starting at 'Once' time. type PeriodicAt struct { Once Once Period Periodic @@ -56,7 +62,8 @@ func (p *PeriodicAt) Next(now time.Time) time.Time { return p.Period.Next(now) } -// TimingWrap allows combining two different Timing implementations. +// TimingWrap allows combining two +// different Timing implementations. type TimingWrap struct { Outer Timing Inner Timing |
