diff options
Diffstat (limited to 'vendor/codeberg.org')
-rw-r--r-- | vendor/codeberg.org/gruf/go-runners/run.go | 40 | ||||
-rw-r--r-- | vendor/codeberg.org/gruf/go-sched/job.go | 19 | ||||
-rw-r--r-- | vendor/codeberg.org/gruf/go-sched/scheduler.go | 14 |
3 files changed, 48 insertions, 25 deletions
diff --git a/vendor/codeberg.org/gruf/go-runners/run.go b/vendor/codeberg.org/gruf/go-runners/run.go index 27f7fb9b8..67d19b40c 100644 --- a/vendor/codeberg.org/gruf/go-runners/run.go +++ b/vendor/codeberg.org/gruf/go-runners/run.go @@ -4,8 +4,9 @@ import ( "context" "errors" "fmt" - "sync" "time" + + "codeberg.org/gruf/go-atomics" ) // FuncRunner provides a means of managing long-running functions e.g. main logic loops. @@ -17,9 +18,8 @@ type FuncRunner struct { // provided function. This can be used both for logging, and for error filtering ErrorHandler func(err error) error - svc Service // underlying service to manage start/stop - err error // last-set error - mu sync.Mutex // protects err + svc Service // underlying service to manage start/stop + err atomics.Error } // Go will attempt to run 'fn' asynchronously. The provided context is used to propagate requested @@ -28,22 +28,20 @@ type FuncRunner struct { // time before considering the function as handed off. Returned bool is success state, i.e. returns true // if function is successfully handed off or returns within hand off time with nil error. func (r *FuncRunner) Go(fn func(ctx context.Context) error) bool { + var has bool + done := make(chan struct{}) go func() { var cancelled bool - has := r.svc.Run(func(ctx context.Context) { + has = r.svc.Run(func(ctx context.Context) { // reset error - r.mu.Lock() - r.err = nil - r.mu.Unlock() + r.err.Store(nil) // Run supplied func and set errror if returned if err := Run(func() error { return fn(ctx) }); err != nil { - r.mu.Lock() - r.err = err - r.mu.Unlock() + r.err.Store(err) } // signal done @@ -61,19 +59,18 @@ func (r *FuncRunner) Go(fn func(ctx context.Context) error) bool { switch has { // returned after starting case true: - r.mu.Lock() + // Load set error + err := r.err.Load() // filter out errors due FuncRunner.Stop() being called - if cancelled && errors.Is(r.err, context.Canceled) { + if cancelled && errors.Is(err, context.Canceled) { // filter out errors from FuncRunner.Stop() being called - r.err = nil - } else if r.err != nil && r.ErrorHandler != nil { + r.err.Store(nil) + } else if err != nil && r.ErrorHandler != nil { // pass any non-nil error to set handler - r.err = r.ErrorHandler(r.err) + r.err.Store(r.ErrorHandler(err)) } - r.mu.Unlock() - // already running case false: close(done) @@ -93,7 +90,7 @@ func (r *FuncRunner) Go(fn func(ctx context.Context) error) bool { // 'fn' returned, check error case <-done: - return (r.Err() == nil) + return has } } @@ -104,10 +101,7 @@ func (r *FuncRunner) Stop() bool { // Err returns the last-set error value. func (r *FuncRunner) Err() error { - r.mu.Lock() - err := r.err - r.mu.Unlock() - return err + return r.err.Load() } // Run will execute the supplied 'fn' catching any panics. Returns either function-returned error or formatted panic. diff --git a/vendor/codeberg.org/gruf/go-sched/job.go b/vendor/codeberg.org/gruf/go-sched/job.go index 66e24fe9a..7831a39bd 100644 --- a/vendor/codeberg.org/gruf/go-sched/job.go +++ b/vendor/codeberg.org/gruf/go-sched/job.go @@ -1,6 +1,9 @@ package sched import ( + "reflect" + "strconv" + "strings" "time" "codeberg.org/gruf/go-atomics" @@ -97,3 +100,19 @@ func (job *Job) Run(now time.Time) { }() job.call(now) } + +// String provides a debuggable string representation of Job including ID, next time and Timing type. +func (job *Job) String() string { + var buf strings.Builder + buf.WriteByte('{') + buf.WriteString("id=") + buf.WriteString(strconv.FormatUint(job.id, 10)) + buf.WriteByte(' ') + buf.WriteString("next=") + buf.WriteString(job.next.Load().Format(time.StampMicro)) + buf.WriteByte(' ') + buf.WriteString("timing=") + buf.WriteString(reflect.TypeOf(job.timing).String()) + buf.WriteByte('}') + return buf.String() +} diff --git a/vendor/codeberg.org/gruf/go-sched/scheduler.go b/vendor/codeberg.org/gruf/go-sched/scheduler.go index d017ddcf6..8d076fea0 100644 --- a/vendor/codeberg.org/gruf/go-sched/scheduler.go +++ b/vendor/codeberg.org/gruf/go-sched/scheduler.go @@ -132,7 +132,13 @@ func (sch *Scheduler) run(ctx context.Context) { // Get next job time next := sch.jobs[0].Next() - if until := next.Sub(now); until <= 0 { + // If this job is _just_ about to be ready, we + // don't bother sleeping. It's wasted cycles only + // sleeping for some obscenely tiny amount of time + // we can't guarantee precision for. + const precision = time.Millisecond + + if until := next.Sub(now); until <= precision/1e3 { // This job is behind schedule, // set timer to always tick tch = alwaysticks @@ -155,6 +161,10 @@ func (sch *Scheduler) run(ctx context.Context) { // Timer ticked, run scheduled case now := <-tch: + if !timerset { + // alwaysticks returns zero times + now = time.Now() + } sch.schedule(now) // Received update, handle job/id @@ -213,7 +223,7 @@ func (sch *Scheduler) schedule(now time.Time) { // Run this job async! go job.Run(now) - if job.Next().IsZero() { + if next.IsZero() { // Zero time, this job is done and can be dropped sch.jobs = append(sch.jobs[:i], sch.jobs[i+1:]...) continue |