summaryrefslogtreecommitdiff
path: root/vendor/codeberg.org/gruf/go-sched
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/codeberg.org/gruf/go-sched')
-rw-r--r--vendor/codeberg.org/gruf/go-sched/README.md2
-rw-r--r--vendor/codeberg.org/gruf/go-sched/job.go64
-rw-r--r--vendor/codeberg.org/gruf/go-sched/scheduler.go225
-rw-r--r--vendor/codeberg.org/gruf/go-sched/timing.go25
4 files changed, 176 insertions, 140 deletions
diff --git a/vendor/codeberg.org/gruf/go-sched/README.md b/vendor/codeberg.org/gruf/go-sched/README.md
index d32a961ae..0a9439577 100644
--- a/vendor/codeberg.org/gruf/go-sched/README.md
+++ b/vendor/codeberg.org/gruf/go-sched/README.md
@@ -2,4 +2,4 @@
A simple job (both run-once and recurring) queueing library with down-to millisecond precision.
-Precision estimates based on test output (running on i7-11800h): 1ms precision with 80% tolerance. \ No newline at end of file
+Precision estimates based on test output (running on AMD Ryzen 7 7840u): 2ms precision with 95% tolerance.
diff --git a/vendor/codeberg.org/gruf/go-sched/job.go b/vendor/codeberg.org/gruf/go-sched/job.go
index 2531769d6..f3bd869d2 100644
--- a/vendor/codeberg.org/gruf/go-sched/job.go
+++ b/vendor/codeberg.org/gruf/go-sched/job.go
@@ -14,10 +14,9 @@ import (
// holding onto a next execution time safely in a concurrent environment.
type Job struct {
id uint64
- next unsafe.Pointer // *time.Time
+ next atomic_time
timing Timing
call func(time.Time)
- panic func(interface{})
}
// NewJob returns a new Job to run given function.
@@ -30,28 +29,31 @@ func NewJob(fn func(now time.Time)) *Job {
j := &Job{ // set defaults
timing: emptytiming, // i.e. fire immediately
call: fn,
- panic: func(i interface{}) { panic(i) },
}
return j
}
-// At sets this Job to execute at time, by passing (*sched.Once)(&at) to .With(). See .With() for details.
+// At sets this Job to execute at time, by passing
+// (*sched.Once)(&at) to .With(). See .With() for details.
func (job *Job) At(at time.Time) *Job {
return job.With((*Once)(&at))
}
-// Every sets this Job to execute every period, by passing sched.Period(period) to .With(). See .With() for details.
+// Every sets this Job to execute every period, by passing
+// sched.Period(period) to .With(). See .With() for details.
func (job *Job) Every(period time.Duration) *Job {
return job.With(Periodic(period))
}
-// EveryAt sets this Job to execute every period starting at time, by passing &PeriodicAt{once: Once(at), period: Periodic(period)} to .With(). See .With() for details.
+// EveryAt sets this Job to execute every period starting at time, by passing
+// &PeriodicAt{once: Once(at), period: Periodic(period)} to .With(). See .With() for details.
func (job *Job) EveryAt(at time.Time, period time.Duration) *Job {
return job.With(&PeriodicAt{Once: Once(at), Period: Periodic(period)})
}
-// With sets this Job's timing to given implementation, or if already set will wrap existing using sched.TimingWrap{}.
+// With sets this Job's timing to given implementation, or
+// if already set will wrap existing using sched.TimingWrap{}.
func (job *Job) With(t Timing) *Job {
if t == nil {
// Ensure a timing
@@ -78,44 +80,16 @@ func (job *Job) With(t Timing) *Job {
return job
}
-// OnPanic specifies how this job handles panics, default is an actual panic.
-func (job *Job) OnPanic(fn func(interface{})) *Job {
- if fn == nil {
- // Ensure a function
- panic("nil func")
- }
-
- if job.id != 0 {
- // Cannot update scheduled job
- panic("job already scheduled")
- }
-
- job.panic = fn
- return job
-}
-
// Next returns the next time this Job is expected to run.
func (job *Job) Next() time.Time {
- return loadTime(&job.next)
+ return job.next.Load()
}
// Run will execute this Job and pass through given now time.
-func (job *Job) Run(now time.Time) {
- defer func() {
- switch r := recover(); {
- case r == nil:
- // no panic
- case job != nil &&
- job.panic != nil:
- job.panic(r)
- default:
- panic(r)
- }
- }()
- job.call(now)
-}
+func (job *Job) Run(now time.Time) { job.call(now) }
-// String provides a debuggable string representation of Job including ID, next time and Timing type.
+// String provides a debuggable string representation
+// of Job including ID, next time and Timing type.
func (job *Job) String() string {
var buf strings.Builder
buf.WriteByte('{')
@@ -123,7 +97,7 @@ func (job *Job) String() string {
buf.WriteString(strconv.FormatUint(job.id, 10))
buf.WriteByte(' ')
buf.WriteString("next=")
- buf.WriteString(loadTime(&job.next).Format(time.StampMicro))
+ buf.WriteString(job.next.Load().Format(time.StampMicro))
buf.WriteByte(' ')
buf.WriteString("timing=")
buf.WriteString(reflect.TypeOf(job.timing).String())
@@ -131,13 +105,15 @@ func (job *Job) String() string {
return buf.String()
}
-func loadTime(p *unsafe.Pointer) time.Time {
- if p := atomic.LoadPointer(p); p != nil {
+type atomic_time struct{ p unsafe.Pointer }
+
+func (t *atomic_time) Load() time.Time {
+ if p := atomic.LoadPointer(&t.p); p != nil {
return *(*time.Time)(p)
}
return zerotime
}
-func storeTime(p *unsafe.Pointer, t time.Time) {
- atomic.StorePointer(p, unsafe.Pointer(&t))
+func (t *atomic_time) Store(v time.Time) {
+ atomic.StorePointer(&t.p, unsafe.Pointer(&v))
}
diff --git a/vendor/codeberg.org/gruf/go-sched/scheduler.go b/vendor/codeberg.org/gruf/go-sched/scheduler.go
index 79913a9b3..9ab5b8bc8 100644
--- a/vendor/codeberg.org/gruf/go-sched/scheduler.go
+++ b/vendor/codeberg.org/gruf/go-sched/scheduler.go
@@ -6,18 +6,22 @@ import (
"sync"
"sync/atomic"
"time"
+ "unsafe"
"codeberg.org/gruf/go-runners"
)
-// precision is the maximum time we can offer scheduler run-time precision down to.
-const precision = time.Millisecond
+// Precision is the maximum time we can
+// offer scheduler run-time precision down to.
+const Precision = 2 * time.Millisecond
var (
- // neverticks is a timer channel that never ticks (it's starved).
+ // neverticks is a timer channel
+ // that never ticks (it's starved).
neverticks = make(chan time.Time)
- // alwaysticks is a timer channel that always ticks (it's closed).
+ // alwaysticks is a timer channel
+ // that always ticks (it's closed).
alwaysticks = func() chan time.Time {
ch := make(chan time.Time)
close(ch)
@@ -28,48 +32,51 @@ var (
// Scheduler provides a means of running jobs at specific times and
// regular intervals, all while sharing a single underlying timer.
type Scheduler struct {
- jobs []*Job // jobs is a list of tracked Jobs to be executed
- jch chan interface{} // jch accepts either Jobs or job IDs to notify new/removed jobs
- svc runners.Service // svc manages the main scheduler routine
- jid atomic.Uint64 // jid is used to iteratively generate unique IDs for jobs
- rgo func(func()) // goroutine runner, allows using goroutine pool to launch jobs
+ svc runners.Service // svc manages the main scheduler routine
+ jobs []*Job // jobs is a list of tracked Jobs to be executed
+ jch atomic_channel // jch accepts either Jobs or job IDs to notify new/removed jobs
+ jid atomic.Uint64 // jid is used to iteratively generate unique IDs for jobs
}
-// Start will attempt to start the Scheduler. Immediately returns false if the Service is already running, and true after completed run.
-func (sch *Scheduler) Start(gorun func(func())) bool {
- var block sync.Mutex
+// Start will attempt to start the Scheduler. Immediately returns false
+// if the Service is already running, and true after completed run.
+func (sch *Scheduler) Start() bool {
+ var wait sync.WaitGroup
- // Use mutex to synchronize between started
+ // Use waiter to synchronize between started
// goroutine and ourselves, to ensure that
// we don't return before Scheduler init'd.
- block.Lock()
- defer block.Unlock()
+ wait.Add(1)
ok := sch.svc.GoRun(func(ctx context.Context) {
- // Create Scheduler job channel
- sch.jch = make(chan interface{})
-
- // Set goroutine runner function
- if sch.rgo = gorun; sch.rgo == nil {
- sch.rgo = func(f func()) { go f() }
- }
-
- // Unlock start routine
- block.Unlock()
-
- // Enter main loop
- sch.run(ctx)
+ // Prepare new channel.
+ ch := new(channel)
+ ch.ctx = ctx.Done()
+ ch.ch = make(chan interface{})
+ sch.jch.Store(ch)
+
+ // Release
+ // start fn
+ wait.Done()
+
+ // Main loop
+ sch.run(ch)
})
if ok {
- // Wait on goroutine
- block.Lock()
+ // Wait on
+ // goroutine
+ wait.Wait()
+ } else {
+ // Release
+ wait.Done()
}
return ok
}
-// Stop will attempt to stop the Scheduler. Immediately returns false if not running, and true only after Scheduler is fully stopped.
+// Stop will attempt to stop the Scheduler. Immediately returns false
+// if not running, and true only after Scheduler is fully stopped.
func (sch *Scheduler) Stop() bool {
return sch.svc.Stop()
}
@@ -86,45 +93,38 @@ func (sch *Scheduler) Done() <-chan struct{} {
// Schedule will add provided Job to the Scheduler, returning a cancel function.
func (sch *Scheduler) Schedule(job *Job) (cancel func()) {
- switch {
- // Check a job was passed
- case job == nil:
+ if job == nil {
panic("nil job")
-
- // Check we are running
- case !sch.Running():
- panic("scheduler not running")
}
- // Calculate next job ID
- last := sch.jid.Load()
- next := sch.jid.Add(1)
- if next < last {
- panic("job id overflow")
+ // Load job channel.
+ ch := sch.jch.Load()
+ if ch == nil {
+ panic("not running")
}
- // Pass job to scheduler
- job.id = next
- sch.jch <- job
+ // Calculate next job ID.
+ job.id = sch.jid.Add(1)
- // Take ptrs to current state chs
- ctx := sch.svc.Done()
- jch := sch.jch
-
- // Return cancel function for job ID
- return func() {
- select {
- // Sched stopped
- case <-ctx:
-
- // Cancel this job
- case jch <- next:
- }
+ // Pass job
+ // to channel.
+ if !ch.w(job) {
+ panic("not running")
}
+
+ // Return cancel function for job
+ return func() { ch.w(job.id) }
}
// run is the main scheduler run routine, which runs for as long as ctx is valid.
-func (sch *Scheduler) run(ctx context.Context) {
+func (sch *Scheduler) run(ch *channel) {
+ defer ch.close()
+ if ch == nil {
+ panic("nil channel")
+ } else if sch == nil {
+ panic("nil scheduler")
+ }
+
var (
// now stores the current time, and will only be
// set when the timer channel is set to be the
@@ -165,39 +165,43 @@ func (sch *Scheduler) run(ctx context.Context) {
// Get now time.
now = time.Now()
- // Sort jobs by next occurring.
+ // Sort by next occurring.
sort.Sort(byNext(sch.jobs))
// Get next job time.
next := sch.jobs[0].Next()
- // If this job is _just_ about to be ready, we don't bother
+ // If this job is *just* about to be ready, we don't bother
// sleeping. It's wasted cycles only sleeping for some obscenely
// tiny amount of time we can't guarantee precision for.
- if until := next.Sub(now); until <= precision/1e3 {
+ if until := next.Sub(now); until <= Precision/1e3 {
+
// This job is behind,
// set to always tick.
tch = alwaysticks
} else {
+
// Reset timer to period.
timer.Reset(until)
- tch = timer.C
timerset = true
+ tch = timer.C
}
} else {
+
// Unset timer
tch = neverticks
}
select {
// Scheduler stopped
- case <-ctx.Done():
+ case <-ch.done():
stopdrain()
return
- // Timer ticked, run scheduled
- case t := <-tch:
- if !timerset {
+ // Timer ticked,
+ // run scheduled.
+ case t, ok := <-tch:
+ if !ok {
// 'alwaysticks' returns zero
// times, BUT 'now' will have
// been set during above sort.
@@ -205,8 +209,9 @@ func (sch *Scheduler) run(ctx context.Context) {
}
sch.schedule(t)
- // Received update, handle job/id
- case v := <-sch.jch:
+ // Received update,
+ // handle job/id.
+ case v := <-ch.r():
sch.handle(v)
stopdrain()
}
@@ -220,21 +225,21 @@ func (sch *Scheduler) handle(v interface{}) {
switch v := v.(type) {
// New job added
case *Job:
- // Get current time
+ // Get current time.
now := time.Now()
- // Update the next call time
+ // Update next call time.
next := v.timing.Next(now)
- storeTime(&v.next, next)
+ v.next.Store(next)
- // Append this job to queued
+ // Append this job to queued/
sch.jobs = append(sch.jobs, v)
// Job removed
case uint64:
for i := 0; i < len(sch.jobs); i++ {
if sch.jobs[i].id == v {
- // This is the job we're looking for! Drop this
+ // This is the job we're looking for! Drop this.
sch.jobs = append(sch.jobs[:i], sch.jobs[i+1:]...)
return
}
@@ -242,29 +247,28 @@ func (sch *Scheduler) handle(v interface{}) {
}
}
-// schedule will iterate through the scheduler jobs and execute those necessary, updating their next call time.
+// schedule will iterate through the scheduler jobs and
+// execute those necessary, updating their next call time.
func (sch *Scheduler) schedule(now time.Time) {
for i := 0; i < len(sch.jobs); {
- // Scope our own var
+ // Scope our own var.
job := sch.jobs[i]
// We know these jobs are ordered by .Next(), so as soon
- // as we reach one with .Next() after now, we can return
+ // as we reach one with .Next() after now, we can return.
if job.Next().After(now) {
return
}
- // Pass to runner
- sch.rgo(func() {
- job.Run(now)
- })
+ // Run the job.
+ go job.Run(now)
- // Update the next call time
+ // Update the next call time.
next := job.timing.Next(now)
- storeTime(&job.next, next)
+ job.next.Store(next)
if next.IsZero() {
- // Zero time, this job is done and can be dropped
+ // Zero time, this job is done and can be dropped.
sch.jobs = append(sch.jobs[:i], sch.jobs[i+1:]...)
continue
}
@@ -274,7 +278,8 @@ func (sch *Scheduler) schedule(now time.Time) {
}
}
-// byNext is an implementation of sort.Interface to sort Jobs by their .Next() time.
+// byNext is an implementation of sort.Interface
+// to sort Jobs by their .Next() time.
type byNext []*Job
func (by byNext) Len() int {
@@ -288,3 +293,51 @@ func (by byNext) Less(i int, j int) bool {
func (by byNext) Swap(i int, j int) {
by[i], by[j] = by[j], by[i]
}
+
+// atomic_channel wraps a *channel{} with atomic store / load.
+type atomic_channel struct{ p unsafe.Pointer }
+
+func (c *atomic_channel) Load() *channel {
+ if p := atomic.LoadPointer(&c.p); p != nil {
+ return (*channel)(p)
+ }
+ return nil
+}
+
+func (c *atomic_channel) Store(v *channel) {
+ atomic.StorePointer(&c.p, unsafe.Pointer(v))
+}
+
+// channel wraps both a context done
+// channel and a generic interface channel
+// to support safe writing to an underlying
+// channel that correctly fails after close.
+type channel struct {
+ ctx <-chan struct{}
+ ch chan interface{}
+}
+
+// done returns internal context channel.
+func (ch *channel) done() <-chan struct{} {
+ return ch.ctx
+}
+
+// r returns internal channel for read.
+func (ch *channel) r() chan interface{} {
+ return ch.ch
+}
+
+// w writes 'v' to channel, or returns false if closed.
+func (ch *channel) w(v interface{}) bool {
+ select {
+ case <-ch.ctx:
+ return false
+ case ch.ch <- v:
+ return true
+ }
+}
+
+// close closes underlying channel.
+func (ch *channel) close() {
+ close(ch.ch)
+}
diff --git a/vendor/codeberg.org/gruf/go-sched/timing.go b/vendor/codeberg.org/gruf/go-sched/timing.go
index 33c230fa5..cb9b4925a 100644
--- a/vendor/codeberg.org/gruf/go-sched/timing.go
+++ b/vendor/codeberg.org/gruf/go-sched/timing.go
@@ -5,11 +5,13 @@ import (
)
var (
- // zerotime is zero time.Time (unix epoch).
- zerotime = time.Time{}
+ // zerotime is zero
+ // time.Time (unix epoch).
+ zerotime time.Time
- // emptytiming is a global timingempty to check against.
- emptytiming = timingempty{}
+ // emptytiming is a global
+ // timingempty to check against.
+ emptytiming timingempty
)
// Timing provides scheduling for a Job, determining the next time
@@ -20,14 +22,16 @@ type Timing interface {
Next(time.Time) time.Time
}
-// timingempty is a 'zero' Timing implementation that always returns zero time.
+// timingempty is a 'zero' Timing implementation
+// that always returns zero time.
type timingempty struct{}
func (timingempty) Next(time.Time) time.Time {
return zerotime
}
-// Once implements Timing to provide a run-once Job execution.
+// Once implements Timing to
+// provide a run-once Job execution.
type Once time.Time
func (o *Once) Next(time.Time) time.Time {
@@ -36,14 +40,16 @@ func (o *Once) Next(time.Time) time.Time {
return ret
}
-// Periodic implements Timing to provide a recurring Job execution.
+// Periodic implements Timing to
+// provide a recurring Job execution.
type Periodic time.Duration
func (p Periodic) Next(now time.Time) time.Time {
return now.Add(time.Duration(p))
}
-// PeriodicAt implements Timing to provide a recurring Job execution starting at 'Once' time.
+// PeriodicAt implements Timing to provide a
+// recurring Job execution starting at 'Once' time.
type PeriodicAt struct {
Once Once
Period Periodic
@@ -56,7 +62,8 @@ func (p *PeriodicAt) Next(now time.Time) time.Time {
return p.Period.Next(now)
}
-// TimingWrap allows combining two different Timing implementations.
+// TimingWrap allows combining two
+// different Timing implementations.
type TimingWrap struct {
Outer Timing
Inner Timing