summaryrefslogtreecommitdiff
path: root/vendor/codeberg.org/gruf/go-sched/scheduler.go
diff options
context:
space:
mode:
authorLibravatar kim <89579420+NyaaaWhatsUpDoc@users.noreply.github.com>2023-01-06 10:16:09 +0000
committerLibravatar GitHub <noreply@github.com>2023-01-06 11:16:09 +0100
commitadbc87700a5bc7a95883ba5b9688d8b946a8db48 (patch)
tree6030ff70d3eb0b9a0b8fc7d5fca378a77033d546 /vendor/codeberg.org/gruf/go-sched/scheduler.go
parent[chore] Update/add license headers for 2023 (#1304) (diff)
downloadgotosocial-adbc87700a5bc7a95883ba5b9688d8b946a8db48.tar.xz
[chore] pull in latest go-cache, go-runners versions (#1306)
Signed-off-by: kim <grufwub@gmail.com> Signed-off-by: kim <grufwub@gmail.com>
Diffstat (limited to 'vendor/codeberg.org/gruf/go-sched/scheduler.go')
-rw-r--r--vendor/codeberg.org/gruf/go-sched/scheduler.go35
1 files changed, 14 insertions, 21 deletions
diff --git a/vendor/codeberg.org/gruf/go-sched/scheduler.go b/vendor/codeberg.org/gruf/go-sched/scheduler.go
index bdf0a371d..df19cf18b 100644
--- a/vendor/codeberg.org/gruf/go-sched/scheduler.go
+++ b/vendor/codeberg.org/gruf/go-sched/scheduler.go
@@ -33,10 +33,11 @@ type Scheduler struct {
jch chan interface{} // jch accepts either Jobs or job IDs to notify new/removed jobs
svc runners.Service // svc manages the main scheduler routine
jid atomic.Uint64 // jid is used to iteratively generate unique IDs for jobs
+ rgo func(func()) // goroutine runner, allows using goroutine pool to launch jobs
}
// Start will attempt to start the Scheduler. Immediately returns false if the Service is already running, and true after completed run.
-func (sch *Scheduler) Start() bool {
+func (sch *Scheduler) Start(gorun func(func())) bool {
var block sync.Mutex
// Use mutex to synchronize between started
@@ -49,14 +50,19 @@ func (sch *Scheduler) Start() bool {
// Create Scheduler job channel
sch.jch = make(chan interface{})
- // Unlock start routine
- block.Unlock()
+ // Set goroutine runner function
+ if sch.rgo = gorun; sch.rgo == nil {
+ sch.rgo = func(f func()) { go f() }
+ }
// Set GC finalizer to ensure scheduler stopped
runtime.SetFinalizer(sch, func(sch *Scheduler) {
_ = sch.Stop()
})
+ // Unlock start routine
+ block.Unlock()
+
// Enter main loop
sch.run(ctx)
})
@@ -87,7 +93,7 @@ func (sch *Scheduler) Schedule(job *Job) (cancel func()) {
panic("nil job")
// Check we are running
- case sch.jch == nil:
+ case !sch.Running():
panic("scheduler not running")
}
@@ -142,21 +148,6 @@ func (sch *Scheduler) run(ctx context.Context) {
}
)
- for {
- select {
- // Handle received job/id
- case v := <-sch.jch:
- sch.handle(v)
- continue
-
- // No more
- default:
- }
-
- // Done
- break
- }
-
// Create a stopped timer
timer = time.NewTimer(1)
<-timer.C
@@ -256,8 +247,10 @@ func (sch *Scheduler) schedule(now time.Time) {
return
}
- // Pass job to runner
- go job.Run(now)
+ // Pass to runner
+ sch.rgo(func() {
+ job.Run(now)
+ })
// Update the next call time
next := job.timing.Next(now)