diff options
| author | 2023-01-06 10:16:09 +0000 | |
|---|---|---|
| committer | 2023-01-06 11:16:09 +0100 | |
| commit | adbc87700a5bc7a95883ba5b9688d8b946a8db48 (patch) | |
| tree | 6030ff70d3eb0b9a0b8fc7d5fca378a77033d546 /vendor/codeberg.org/gruf/go-sched/scheduler.go | |
| parent | [chore] Update/add license headers for 2023 (#1304) (diff) | |
| download | gotosocial-adbc87700a5bc7a95883ba5b9688d8b946a8db48.tar.xz | |
[chore] pull in latest go-cache, go-runners versions (#1306)
Signed-off-by: kim <grufwub@gmail.com>
Signed-off-by: kim <grufwub@gmail.com>
Diffstat (limited to 'vendor/codeberg.org/gruf/go-sched/scheduler.go')
| -rw-r--r-- | vendor/codeberg.org/gruf/go-sched/scheduler.go | 35 | 
1 files changed, 14 insertions, 21 deletions
| diff --git a/vendor/codeberg.org/gruf/go-sched/scheduler.go b/vendor/codeberg.org/gruf/go-sched/scheduler.go index bdf0a371d..df19cf18b 100644 --- a/vendor/codeberg.org/gruf/go-sched/scheduler.go +++ b/vendor/codeberg.org/gruf/go-sched/scheduler.go @@ -33,10 +33,11 @@ type Scheduler struct {  	jch  chan interface{} // jch accepts either Jobs or job IDs to notify new/removed jobs  	svc  runners.Service  // svc manages the main scheduler routine  	jid  atomic.Uint64    // jid is used to iteratively generate unique IDs for jobs +	rgo  func(func())     // goroutine runner, allows using goroutine pool to launch jobs  }  // Start will attempt to start the Scheduler. Immediately returns false if the Service is already running, and true after completed run. -func (sch *Scheduler) Start() bool { +func (sch *Scheduler) Start(gorun func(func())) bool {  	var block sync.Mutex  	// Use mutex to synchronize between started @@ -49,14 +50,19 @@ func (sch *Scheduler) Start() bool {  		// Create Scheduler job channel  		sch.jch = make(chan interface{}) -		// Unlock start routine -		block.Unlock() +		// Set goroutine runner function +		if sch.rgo = gorun; sch.rgo == nil { +			sch.rgo = func(f func()) { go f() } +		}  		// Set GC finalizer to ensure scheduler stopped  		runtime.SetFinalizer(sch, func(sch *Scheduler) {  			_ = sch.Stop()  		}) +		// Unlock start routine +		block.Unlock() +  		// Enter main loop  		sch.run(ctx)  	}) @@ -87,7 +93,7 @@ func (sch *Scheduler) Schedule(job *Job) (cancel func()) {  		panic("nil job")  	// Check we are running -	case sch.jch == nil: +	case !sch.Running():  		panic("scheduler not running")  	} @@ -142,21 +148,6 @@ func (sch *Scheduler) run(ctx context.Context) {  		}  	) -	for { -		select { -		// Handle received job/id -		case v := <-sch.jch: -			sch.handle(v) -			continue - -		// No more -		default: -		} - -		// Done -		break -	} -  	// Create a stopped timer  	timer = time.NewTimer(1)  	<-timer.C @@ -256,8 +247,10 @@ func (sch *Scheduler) schedule(now time.Time) {  			return  		} -		// Pass job to runner -		go job.Run(now) +		// Pass to runner +		sch.rgo(func() { +			job.Run(now) +		})  		// Update the next call time  		next := job.timing.Next(now) | 
