diff options
author | 2023-01-06 10:16:09 +0000 | |
---|---|---|
committer | 2023-01-06 11:16:09 +0100 | |
commit | adbc87700a5bc7a95883ba5b9688d8b946a8db48 (patch) | |
tree | 6030ff70d3eb0b9a0b8fc7d5fca378a77033d546 /vendor/codeberg.org/gruf/go-sched/scheduler.go | |
parent | [chore] Update/add license headers for 2023 (#1304) (diff) | |
download | gotosocial-adbc87700a5bc7a95883ba5b9688d8b946a8db48.tar.xz |
[chore] pull in latest go-cache, go-runners versions (#1306)
Signed-off-by: kim <grufwub@gmail.com>
Signed-off-by: kim <grufwub@gmail.com>
Diffstat (limited to 'vendor/codeberg.org/gruf/go-sched/scheduler.go')
-rw-r--r-- | vendor/codeberg.org/gruf/go-sched/scheduler.go | 35 |
1 files changed, 14 insertions, 21 deletions
diff --git a/vendor/codeberg.org/gruf/go-sched/scheduler.go b/vendor/codeberg.org/gruf/go-sched/scheduler.go index bdf0a371d..df19cf18b 100644 --- a/vendor/codeberg.org/gruf/go-sched/scheduler.go +++ b/vendor/codeberg.org/gruf/go-sched/scheduler.go @@ -33,10 +33,11 @@ type Scheduler struct { jch chan interface{} // jch accepts either Jobs or job IDs to notify new/removed jobs svc runners.Service // svc manages the main scheduler routine jid atomic.Uint64 // jid is used to iteratively generate unique IDs for jobs + rgo func(func()) // goroutine runner, allows using goroutine pool to launch jobs } // Start will attempt to start the Scheduler. Immediately returns false if the Service is already running, and true after completed run. -func (sch *Scheduler) Start() bool { +func (sch *Scheduler) Start(gorun func(func())) bool { var block sync.Mutex // Use mutex to synchronize between started @@ -49,14 +50,19 @@ func (sch *Scheduler) Start() bool { // Create Scheduler job channel sch.jch = make(chan interface{}) - // Unlock start routine - block.Unlock() + // Set goroutine runner function + if sch.rgo = gorun; sch.rgo == nil { + sch.rgo = func(f func()) { go f() } + } // Set GC finalizer to ensure scheduler stopped runtime.SetFinalizer(sch, func(sch *Scheduler) { _ = sch.Stop() }) + // Unlock start routine + block.Unlock() + // Enter main loop sch.run(ctx) }) @@ -87,7 +93,7 @@ func (sch *Scheduler) Schedule(job *Job) (cancel func()) { panic("nil job") // Check we are running - case sch.jch == nil: + case !sch.Running(): panic("scheduler not running") } @@ -142,21 +148,6 @@ func (sch *Scheduler) run(ctx context.Context) { } ) - for { - select { - // Handle received job/id - case v := <-sch.jch: - sch.handle(v) - continue - - // No more - default: - } - - // Done - break - } - // Create a stopped timer timer = time.NewTimer(1) <-timer.C @@ -256,8 +247,10 @@ func (sch *Scheduler) schedule(now time.Time) { return } - // Pass job to runner - go job.Run(now) + // Pass to runner + sch.rgo(func() { + job.Run(now) + }) // Update the next call time next := job.timing.Next(now) |