summaryrefslogtreecommitdiff
path: root/vendor/codeberg.org/gruf/go-sched/scheduler.go
diff options
context:
space:
mode:
authorLibravatar kim <grufwub@gmail.com>2025-10-17 17:36:24 +0200
committerLibravatar tobi <tobi.smethurst@protonmail.com>2025-11-17 14:11:11 +0100
commitf714b06fec5b93cf076d0f92eeb8aa7c32cfb531 (patch)
tree8e1a89dd7b0db0f17b695557d03eede9055134ae /vendor/codeberg.org/gruf/go-sched/scheduler.go
parent[bugfix] recheck for just-processed-emoji within mutex lock before starting p... (diff)
downloadgotosocial-f714b06fec5b93cf076d0f92eeb8aa7c32cfb531.tar.xz
[chore] update dependencies (#4507)
- codeberg.org/gruf/go-runners: v1.6.3 -> v1.7.0 - codeberg.org/gruf/go-sched: v1.2.4 -> v1.3.0 - github.com/tdewolff/minify/v2: v2.24.3 -> v2.24.4 Reviewed-on: https://codeberg.org/superseriousbusiness/gotosocial/pulls/4507 Co-authored-by: kim <grufwub@gmail.com> Co-committed-by: kim <grufwub@gmail.com>
Diffstat (limited to 'vendor/codeberg.org/gruf/go-sched/scheduler.go')
-rw-r--r--vendor/codeberg.org/gruf/go-sched/scheduler.go225
1 files changed, 139 insertions, 86 deletions
diff --git a/vendor/codeberg.org/gruf/go-sched/scheduler.go b/vendor/codeberg.org/gruf/go-sched/scheduler.go
index 79913a9b3..9ab5b8bc8 100644
--- a/vendor/codeberg.org/gruf/go-sched/scheduler.go
+++ b/vendor/codeberg.org/gruf/go-sched/scheduler.go
@@ -6,18 +6,22 @@ import (
"sync"
"sync/atomic"
"time"
+ "unsafe"
"codeberg.org/gruf/go-runners"
)
-// precision is the maximum time we can offer scheduler run-time precision down to.
-const precision = time.Millisecond
+// Precision is the maximum time we can
+// offer scheduler run-time precision down to.
+const Precision = 2 * time.Millisecond
var (
- // neverticks is a timer channel that never ticks (it's starved).
+ // neverticks is a timer channel
+ // that never ticks (it's starved).
neverticks = make(chan time.Time)
- // alwaysticks is a timer channel that always ticks (it's closed).
+ // alwaysticks is a timer channel
+ // that always ticks (it's closed).
alwaysticks = func() chan time.Time {
ch := make(chan time.Time)
close(ch)
@@ -28,48 +32,51 @@ var (
// Scheduler provides a means of running jobs at specific times and
// regular intervals, all while sharing a single underlying timer.
type Scheduler struct {
- jobs []*Job // jobs is a list of tracked Jobs to be executed
- jch chan interface{} // jch accepts either Jobs or job IDs to notify new/removed jobs
- svc runners.Service // svc manages the main scheduler routine
- jid atomic.Uint64 // jid is used to iteratively generate unique IDs for jobs
- rgo func(func()) // goroutine runner, allows using goroutine pool to launch jobs
+ svc runners.Service // svc manages the main scheduler routine
+ jobs []*Job // jobs is a list of tracked Jobs to be executed
+ jch atomic_channel // jch accepts either Jobs or job IDs to notify new/removed jobs
+ jid atomic.Uint64 // jid is used to iteratively generate unique IDs for jobs
}
-// Start will attempt to start the Scheduler. Immediately returns false if the Service is already running, and true after completed run.
-func (sch *Scheduler) Start(gorun func(func())) bool {
- var block sync.Mutex
+// Start will attempt to start the Scheduler. Immediately returns false
+// if the Service is already running, and true after completed run.
+func (sch *Scheduler) Start() bool {
+ var wait sync.WaitGroup
- // Use mutex to synchronize between started
+ // Use waiter to synchronize between started
// goroutine and ourselves, to ensure that
// we don't return before Scheduler init'd.
- block.Lock()
- defer block.Unlock()
+ wait.Add(1)
ok := sch.svc.GoRun(func(ctx context.Context) {
- // Create Scheduler job channel
- sch.jch = make(chan interface{})
-
- // Set goroutine runner function
- if sch.rgo = gorun; sch.rgo == nil {
- sch.rgo = func(f func()) { go f() }
- }
-
- // Unlock start routine
- block.Unlock()
-
- // Enter main loop
- sch.run(ctx)
+ // Prepare new channel.
+ ch := new(channel)
+ ch.ctx = ctx.Done()
+ ch.ch = make(chan interface{})
+ sch.jch.Store(ch)
+
+ // Release
+ // start fn
+ wait.Done()
+
+ // Main loop
+ sch.run(ch)
})
if ok {
- // Wait on goroutine
- block.Lock()
+ // Wait on
+ // goroutine
+ wait.Wait()
+ } else {
+ // Release
+ wait.Done()
}
return ok
}
-// Stop will attempt to stop the Scheduler. Immediately returns false if not running, and true only after Scheduler is fully stopped.
+// Stop will attempt to stop the Scheduler. Immediately returns false
+// if not running, and true only after Scheduler is fully stopped.
func (sch *Scheduler) Stop() bool {
return sch.svc.Stop()
}
@@ -86,45 +93,38 @@ func (sch *Scheduler) Done() <-chan struct{} {
// Schedule will add provided Job to the Scheduler, returning a cancel function.
func (sch *Scheduler) Schedule(job *Job) (cancel func()) {
- switch {
- // Check a job was passed
- case job == nil:
+ if job == nil {
panic("nil job")
-
- // Check we are running
- case !sch.Running():
- panic("scheduler not running")
}
- // Calculate next job ID
- last := sch.jid.Load()
- next := sch.jid.Add(1)
- if next < last {
- panic("job id overflow")
+ // Load job channel.
+ ch := sch.jch.Load()
+ if ch == nil {
+ panic("not running")
}
- // Pass job to scheduler
- job.id = next
- sch.jch <- job
+ // Calculate next job ID.
+ job.id = sch.jid.Add(1)
- // Take ptrs to current state chs
- ctx := sch.svc.Done()
- jch := sch.jch
-
- // Return cancel function for job ID
- return func() {
- select {
- // Sched stopped
- case <-ctx:
-
- // Cancel this job
- case jch <- next:
- }
+ // Pass job
+ // to channel.
+ if !ch.w(job) {
+ panic("not running")
}
+
+ // Return cancel function for job
+ return func() { ch.w(job.id) }
}
// run is the main scheduler run routine, which runs for as long as ctx is valid.
-func (sch *Scheduler) run(ctx context.Context) {
+func (sch *Scheduler) run(ch *channel) {
+ defer ch.close()
+ if ch == nil {
+ panic("nil channel")
+ } else if sch == nil {
+ panic("nil scheduler")
+ }
+
var (
// now stores the current time, and will only be
// set when the timer channel is set to be the
@@ -165,39 +165,43 @@ func (sch *Scheduler) run(ctx context.Context) {
// Get now time.
now = time.Now()
- // Sort jobs by next occurring.
+ // Sort by next occurring.
sort.Sort(byNext(sch.jobs))
// Get next job time.
next := sch.jobs[0].Next()
- // If this job is _just_ about to be ready, we don't bother
+ // If this job is *just* about to be ready, we don't bother
// sleeping. It's wasted cycles only sleeping for some obscenely
// tiny amount of time we can't guarantee precision for.
- if until := next.Sub(now); until <= precision/1e3 {
+ if until := next.Sub(now); until <= Precision/1e3 {
+
// This job is behind,
// set to always tick.
tch = alwaysticks
} else {
+
// Reset timer to period.
timer.Reset(until)
- tch = timer.C
timerset = true
+ tch = timer.C
}
} else {
+
// Unset timer
tch = neverticks
}
select {
// Scheduler stopped
- case <-ctx.Done():
+ case <-ch.done():
stopdrain()
return
- // Timer ticked, run scheduled
- case t := <-tch:
- if !timerset {
+ // Timer ticked,
+ // run scheduled.
+ case t, ok := <-tch:
+ if !ok {
// 'alwaysticks' returns zero
// times, BUT 'now' will have
// been set during above sort.
@@ -205,8 +209,9 @@ func (sch *Scheduler) run(ctx context.Context) {
}
sch.schedule(t)
- // Received update, handle job/id
- case v := <-sch.jch:
+ // Received update,
+ // handle job/id.
+ case v := <-ch.r():
sch.handle(v)
stopdrain()
}
@@ -220,21 +225,21 @@ func (sch *Scheduler) handle(v interface{}) {
switch v := v.(type) {
// New job added
case *Job:
- // Get current time
+ // Get current time.
now := time.Now()
- // Update the next call time
+ // Update next call time.
next := v.timing.Next(now)
- storeTime(&v.next, next)
+ v.next.Store(next)
- // Append this job to queued
+ // Append this job to queued/
sch.jobs = append(sch.jobs, v)
// Job removed
case uint64:
for i := 0; i < len(sch.jobs); i++ {
if sch.jobs[i].id == v {
- // This is the job we're looking for! Drop this
+ // This is the job we're looking for! Drop this.
sch.jobs = append(sch.jobs[:i], sch.jobs[i+1:]...)
return
}
@@ -242,29 +247,28 @@ func (sch *Scheduler) handle(v interface{}) {
}
}
-// schedule will iterate through the scheduler jobs and execute those necessary, updating their next call time.
+// schedule will iterate through the scheduler jobs and
+// execute those necessary, updating their next call time.
func (sch *Scheduler) schedule(now time.Time) {
for i := 0; i < len(sch.jobs); {
- // Scope our own var
+ // Scope our own var.
job := sch.jobs[i]
// We know these jobs are ordered by .Next(), so as soon
- // as we reach one with .Next() after now, we can return
+ // as we reach one with .Next() after now, we can return.
if job.Next().After(now) {
return
}
- // Pass to runner
- sch.rgo(func() {
- job.Run(now)
- })
+ // Run the job.
+ go job.Run(now)
- // Update the next call time
+ // Update the next call time.
next := job.timing.Next(now)
- storeTime(&job.next, next)
+ job.next.Store(next)
if next.IsZero() {
- // Zero time, this job is done and can be dropped
+ // Zero time, this job is done and can be dropped.
sch.jobs = append(sch.jobs[:i], sch.jobs[i+1:]...)
continue
}
@@ -274,7 +278,8 @@ func (sch *Scheduler) schedule(now time.Time) {
}
}
-// byNext is an implementation of sort.Interface to sort Jobs by their .Next() time.
+// byNext is an implementation of sort.Interface
+// to sort Jobs by their .Next() time.
type byNext []*Job
func (by byNext) Len() int {
@@ -288,3 +293,51 @@ func (by byNext) Less(i int, j int) bool {
func (by byNext) Swap(i int, j int) {
by[i], by[j] = by[j], by[i]
}
+
+// atomic_channel wraps a *channel{} with atomic store / load.
+type atomic_channel struct{ p unsafe.Pointer }
+
+func (c *atomic_channel) Load() *channel {
+ if p := atomic.LoadPointer(&c.p); p != nil {
+ return (*channel)(p)
+ }
+ return nil
+}
+
+func (c *atomic_channel) Store(v *channel) {
+ atomic.StorePointer(&c.p, unsafe.Pointer(v))
+}
+
+// channel wraps both a context done
+// channel and a generic interface channel
+// to support safe writing to an underlying
+// channel that correctly fails after close.
+type channel struct {
+ ctx <-chan struct{}
+ ch chan interface{}
+}
+
+// done returns internal context channel.
+func (ch *channel) done() <-chan struct{} {
+ return ch.ctx
+}
+
+// r returns internal channel for read.
+func (ch *channel) r() chan interface{} {
+ return ch.ch
+}
+
+// w writes 'v' to channel, or returns false if closed.
+func (ch *channel) w(v interface{}) bool {
+ select {
+ case <-ch.ctx:
+ return false
+ case ch.ch <- v:
+ return true
+ }
+}
+
+// close closes underlying channel.
+func (ch *channel) close() {
+ close(ch.ch)
+}