diff options
author | 2025-03-09 17:47:56 +0100 | |
---|---|---|
committer | 2025-03-10 01:59:49 +0100 | |
commit | 3ac1ee16f377d31a0fb80c8dae28b6239ac4229e (patch) | |
tree | f61faa581feaaeaba2542b9f2b8234a590684413 /vendor/codeberg.org/gruf/go-sched/scheduler.go | |
parent | [chore] update URLs to forked source (diff) | |
download | gotosocial-3ac1ee16f377d31a0fb80c8dae28b6239ac4229e.tar.xz |
[chore] remove vendor
Diffstat (limited to 'vendor/codeberg.org/gruf/go-sched/scheduler.go')
-rw-r--r-- | vendor/codeberg.org/gruf/go-sched/scheduler.go | 290 |
1 files changed, 0 insertions, 290 deletions
diff --git a/vendor/codeberg.org/gruf/go-sched/scheduler.go b/vendor/codeberg.org/gruf/go-sched/scheduler.go deleted file mode 100644 index 79913a9b3..000000000 --- a/vendor/codeberg.org/gruf/go-sched/scheduler.go +++ /dev/null @@ -1,290 +0,0 @@ -package sched - -import ( - "context" - "sort" - "sync" - "sync/atomic" - "time" - - "codeberg.org/gruf/go-runners" -) - -// precision is the maximum time we can offer scheduler run-time precision down to. -const precision = time.Millisecond - -var ( - // neverticks is a timer channel that never ticks (it's starved). - neverticks = make(chan time.Time) - - // alwaysticks is a timer channel that always ticks (it's closed). - alwaysticks = func() chan time.Time { - ch := make(chan time.Time) - close(ch) - return ch - }() -) - -// Scheduler provides a means of running jobs at specific times and -// regular intervals, all while sharing a single underlying timer. -type Scheduler struct { - jobs []*Job // jobs is a list of tracked Jobs to be executed - jch chan interface{} // jch accepts either Jobs or job IDs to notify new/removed jobs - svc runners.Service // svc manages the main scheduler routine - jid atomic.Uint64 // jid is used to iteratively generate unique IDs for jobs - rgo func(func()) // goroutine runner, allows using goroutine pool to launch jobs -} - -// Start will attempt to start the Scheduler. Immediately returns false if the Service is already running, and true after completed run. -func (sch *Scheduler) Start(gorun func(func())) bool { - var block sync.Mutex - - // Use mutex to synchronize between started - // goroutine and ourselves, to ensure that - // we don't return before Scheduler init'd. - block.Lock() - defer block.Unlock() - - ok := sch.svc.GoRun(func(ctx context.Context) { - // Create Scheduler job channel - sch.jch = make(chan interface{}) - - // Set goroutine runner function - if sch.rgo = gorun; sch.rgo == nil { - sch.rgo = func(f func()) { go f() } - } - - // Unlock start routine - block.Unlock() - - // Enter main loop - sch.run(ctx) - }) - - if ok { - // Wait on goroutine - block.Lock() - } - - return ok -} - -// Stop will attempt to stop the Scheduler. Immediately returns false if not running, and true only after Scheduler is fully stopped. -func (sch *Scheduler) Stop() bool { - return sch.svc.Stop() -} - -// Running will return whether Scheduler is running (i.e. NOT stopped / stopping). -func (sch *Scheduler) Running() bool { - return sch.svc.Running() -} - -// Done returns a channel that's closed when Scheduler.Stop() is called. -func (sch *Scheduler) Done() <-chan struct{} { - return sch.svc.Done() -} - -// Schedule will add provided Job to the Scheduler, returning a cancel function. -func (sch *Scheduler) Schedule(job *Job) (cancel func()) { - switch { - // Check a job was passed - case job == nil: - panic("nil job") - - // Check we are running - case !sch.Running(): - panic("scheduler not running") - } - - // Calculate next job ID - last := sch.jid.Load() - next := sch.jid.Add(1) - if next < last { - panic("job id overflow") - } - - // Pass job to scheduler - job.id = next - sch.jch <- job - - // Take ptrs to current state chs - ctx := sch.svc.Done() - jch := sch.jch - - // Return cancel function for job ID - return func() { - select { - // Sched stopped - case <-ctx: - - // Cancel this job - case jch <- next: - } - } -} - -// run is the main scheduler run routine, which runs for as long as ctx is valid. -func (sch *Scheduler) run(ctx context.Context) { - var ( - // now stores the current time, and will only be - // set when the timer channel is set to be the - // 'alwaysticks' channel. this allows minimizing - // the number of calls required to time.Now(). - now time.Time - - // timerset represents whether timer was running - // for a particular run of the loop. false means - // that tch == neverticks || tch == alwaysticks. - timerset bool - - // timer tick channel (or always / never ticks). - tch <-chan time.Time - - // timer notifies this main routine to wake when - // the job queued needs to be checked for executions. - timer *time.Timer - - // stopdrain will stop and drain the timer - // if it has been running (i.e. timerset == true). - stopdrain = func() { - if timerset && !timer.Stop() { - <-timer.C - } - } - ) - - // Create a stopped timer. - timer = time.NewTimer(1) - <-timer.C - - for { - // Reset timer state. - timerset = false - - if len(sch.jobs) > 0 { - // Get now time. - now = time.Now() - - // Sort jobs by next occurring. - sort.Sort(byNext(sch.jobs)) - - // Get next job time. - next := sch.jobs[0].Next() - - // If this job is _just_ about to be ready, we don't bother - // sleeping. It's wasted cycles only sleeping for some obscenely - // tiny amount of time we can't guarantee precision for. - if until := next.Sub(now); until <= precision/1e3 { - // This job is behind, - // set to always tick. - tch = alwaysticks - } else { - // Reset timer to period. - timer.Reset(until) - tch = timer.C - timerset = true - } - } else { - // Unset timer - tch = neverticks - } - - select { - // Scheduler stopped - case <-ctx.Done(): - stopdrain() - return - - // Timer ticked, run scheduled - case t := <-tch: - if !timerset { - // 'alwaysticks' returns zero - // times, BUT 'now' will have - // been set during above sort. - t = now - } - sch.schedule(t) - - // Received update, handle job/id - case v := <-sch.jch: - sch.handle(v) - stopdrain() - } - } -} - -// handle takes an interfaces received from Scheduler.jch and handles either: -// - Job --> new job to add. -// - uint64 --> job ID to remove. -func (sch *Scheduler) handle(v interface{}) { - switch v := v.(type) { - // New job added - case *Job: - // Get current time - now := time.Now() - - // Update the next call time - next := v.timing.Next(now) - storeTime(&v.next, next) - - // Append this job to queued - sch.jobs = append(sch.jobs, v) - - // Job removed - case uint64: - for i := 0; i < len(sch.jobs); i++ { - if sch.jobs[i].id == v { - // This is the job we're looking for! Drop this - sch.jobs = append(sch.jobs[:i], sch.jobs[i+1:]...) - return - } - } - } -} - -// schedule will iterate through the scheduler jobs and execute those necessary, updating their next call time. -func (sch *Scheduler) schedule(now time.Time) { - for i := 0; i < len(sch.jobs); { - // Scope our own var - job := sch.jobs[i] - - // We know these jobs are ordered by .Next(), so as soon - // as we reach one with .Next() after now, we can return - if job.Next().After(now) { - return - } - - // Pass to runner - sch.rgo(func() { - job.Run(now) - }) - - // Update the next call time - next := job.timing.Next(now) - storeTime(&job.next, next) - - if next.IsZero() { - // Zero time, this job is done and can be dropped - sch.jobs = append(sch.jobs[:i], sch.jobs[i+1:]...) - continue - } - - // Iter - i++ - } -} - -// byNext is an implementation of sort.Interface to sort Jobs by their .Next() time. -type byNext []*Job - -func (by byNext) Len() int { - return len(by) -} - -func (by byNext) Less(i int, j int) bool { - return by[i].Next().Before(by[j].Next()) -} - -func (by byNext) Swap(i int, j int) { - by[i], by[j] = by[j], by[i] -} |