diff options
author | 2022-09-28 18:30:40 +0100 | |
---|---|---|
committer | 2022-09-28 18:30:40 +0100 | |
commit | a156188b3eb5cb3da44aa1b7452265f5fa38a607 (patch) | |
tree | 7097fa48d56fbabc7c2c8750b1f3bc9321d71c0f /vendor/codeberg.org | |
parent | [bugfix] Fix emphasis being added to emoji shortcodes with markdown parsing (... (diff) | |
download | gotosocial-a156188b3eb5cb3da44aa1b7452265f5fa38a607.tar.xz |
[chore] update dependencies, bump to Go 1.19.1 (#826)
* update dependencies, bump Go version to 1.19
* bump test image Go version
* update golangci-lint
* update gotosocial-drone-build
* sign
* linting, go fmt
* update swagger docs
* update swagger docs
* whitespace
* update contributing.md
* fuckin whoopsie doopsie
* linterino, linteroni
* fix followrequest test not starting processor
* fix other api/client tests not starting processor
* fix remaining tests where processor not started
* bump go-runners version
* don't check last-webfingered-at, processor may have updated this
* update swagger command
* update bun to latest version
* fix embed to work the same as before with new bun
Signed-off-by: kim <grufwub@gmail.com>
Co-authored-by: tsmethurst <tobi.smethurst@protonmail.com>
Diffstat (limited to 'vendor/codeberg.org')
-rw-r--r-- | vendor/codeberg.org/gruf/go-cache/v2/scheduler.go | 7 | ||||
-rw-r--r-- | vendor/codeberg.org/gruf/go-kv/field.go | 2 | ||||
-rw-r--r-- | vendor/codeberg.org/gruf/go-kv/field_fmt.go | 33 | ||||
-rw-r--r-- | vendor/codeberg.org/gruf/go-kv/field_format.go | 2 | ||||
-rw-r--r-- | vendor/codeberg.org/gruf/go-kv/format/format.go | 4 | ||||
-rw-r--r-- | vendor/codeberg.org/gruf/go-kv/format/util.go | 13 | ||||
-rw-r--r-- | vendor/codeberg.org/gruf/go-kv/util.go | 12 | ||||
-rw-r--r-- | vendor/codeberg.org/gruf/go-logger/v2/level/levels.go | 26 | ||||
-rw-r--r-- | vendor/codeberg.org/gruf/go-runners/pool.go | 197 | ||||
-rw-r--r-- | vendor/codeberg.org/gruf/go-runners/service.go | 29 | ||||
-rw-r--r-- | vendor/codeberg.org/gruf/go-sched/job.go | 15 | ||||
-rw-r--r-- | vendor/codeberg.org/gruf/go-sched/scheduler.go | 92 |
12 files changed, 282 insertions, 150 deletions
diff --git a/vendor/codeberg.org/gruf/go-cache/v2/scheduler.go b/vendor/codeberg.org/gruf/go-cache/v2/scheduler.go index bc1d8074a..ca93a0fcd 100644 --- a/vendor/codeberg.org/gruf/go-cache/v2/scheduler.go +++ b/vendor/codeberg.org/gruf/go-cache/v2/scheduler.go @@ -8,10 +8,13 @@ import ( // scheduler is the global cache runtime scheduler // for handling regular cache evictions. -var scheduler = sched.NewScheduler(5) +var scheduler sched.Scheduler // schedule will given sweep routine to the global scheduler, and start global scheduler. func schedule(sweep func(time.Time), freq time.Duration) func() { - go scheduler.Start() // does nothing if already running + if !scheduler.Running() { + // ensure running + _ = scheduler.Start() + } return scheduler.Schedule(sched.NewJob(sweep).Every(freq)) } diff --git a/vendor/codeberg.org/gruf/go-kv/field.go b/vendor/codeberg.org/gruf/go-kv/field.go index 7ff6f1ca3..86a53b173 100644 --- a/vendor/codeberg.org/gruf/go-kv/field.go +++ b/vendor/codeberg.org/gruf/go-kv/field.go @@ -75,7 +75,7 @@ type Field struct { // Key returns the formatted key string of this Field. func (f Field) Key() string { buf := byteutil.Buffer{B: make([]byte, 0, bufsize/2)} - appendQuoteKey(&buf, f.K) + AppendQuoteKey(&buf, f.K) return buf.String() } diff --git a/vendor/codeberg.org/gruf/go-kv/field_fmt.go b/vendor/codeberg.org/gruf/go-kv/field_fmt.go index c62393fe0..fcdcc5e17 100644 --- a/vendor/codeberg.org/gruf/go-kv/field_fmt.go +++ b/vendor/codeberg.org/gruf/go-kv/field_fmt.go @@ -5,10 +5,18 @@ package kv import ( "fmt" + "sync" "codeberg.org/gruf/go-byteutil" ) +// bufPool is a memory pool of byte buffers. +var bufPool = sync.Pool{ + New: func() interface{} { + return &byteutil.Buffer{B: make([]byte, 0, 512)} + }, +} + // AppendFormat will append formatted format of Field to 'buf'. See .String() for details. func (f Field) AppendFormat(buf *byteutil.Buffer, vbose bool) { var fmtstr string @@ -17,9 +25,9 @@ func (f Field) AppendFormat(buf *byteutil.Buffer, vbose bool) { } else /* regular */ { fmtstr = `%+v` } - appendQuoteKey(buf, f.K) + AppendQuoteKey(buf, f.K) buf.WriteByte('=') - appendQuoteValue(buf, fmt.Sprintf(fmtstr, f.V)) + appendValuef(buf, fmtstr, f.V) } // Value returns the formatted value string of this Field. @@ -31,6 +39,25 @@ func (f Field) Value(vbose bool) string { fmtstr = `%+v` } buf := byteutil.Buffer{B: make([]byte, 0, bufsize/2)} - appendQuoteValue(&buf, fmt.Sprintf(fmtstr, f.V)) + appendValuef(&buf, fmtstr, f.V) return buf.String() } + +// appendValuef appends a quoted value string (formatted by fmt.Appendf) to 'buf'. +func appendValuef(buf *byteutil.Buffer, format string, args ...interface{}) { + // Write format string to a byte buffer + fmtbuf := bufPool.Get().(*byteutil.Buffer) + fmtbuf.B = fmt.Appendf(fmtbuf.B, format, args...) + + // Append quoted value to dst buffer + AppendQuoteValue(buf, fmtbuf.String()) + + // Drop overly large capacity buffers + if fmtbuf.Cap() > int(^uint16(0)) { + return + } + + // Replace buffer in pool + fmtbuf.Reset() + bufPool.Put(fmtbuf) +} diff --git a/vendor/codeberg.org/gruf/go-kv/field_format.go b/vendor/codeberg.org/gruf/go-kv/field_format.go index db1ffc721..4fa7a8dcf 100644 --- a/vendor/codeberg.org/gruf/go-kv/field_format.go +++ b/vendor/codeberg.org/gruf/go-kv/field_format.go @@ -16,7 +16,7 @@ func (f Field) AppendFormat(buf *byteutil.Buffer, vbose bool) { } else /* regular */ { fmtstr = "{:v}" } - appendQuoteKey(buf, f.K) + AppendQuoteKey(buf, f.K) buf.WriteByte('=') format.Appendf(buf, fmtstr, f.V) } diff --git a/vendor/codeberg.org/gruf/go-kv/format/format.go b/vendor/codeberg.org/gruf/go-kv/format/format.go index df5a94b7c..7edc4475f 100644 --- a/vendor/codeberg.org/gruf/go-kv/format/format.go +++ b/vendor/codeberg.org/gruf/go-kv/format/format.go @@ -311,7 +311,7 @@ func (f format) AppendString(s string) { } func (f format) AppendStringKey(s string) { - if !strconv.CanBackquote(s) { + if len(s) > SingleTermLine || !strconv.CanBackquote(s) { // Requires quoting AND escaping f.Buffer.B = strconv.AppendQuote(f.Buffer.B, s) } else if ContainsDoubleQuote(s) { @@ -329,7 +329,7 @@ func (f format) AppendStringKey(s string) { } func (f format) AppendStringQuoted(s string) { - if !strconv.CanBackquote(s) { + if len(s) > SingleTermLine || !strconv.CanBackquote(s) { // Requires quoting AND escaping f.Buffer.B = strconv.AppendQuote(f.Buffer.B, s) } else if ContainsDoubleQuote(s) { diff --git a/vendor/codeberg.org/gruf/go-kv/format/util.go b/vendor/codeberg.org/gruf/go-kv/format/util.go index c4c42329e..674f81be5 100644 --- a/vendor/codeberg.org/gruf/go-kv/format/util.go +++ b/vendor/codeberg.org/gruf/go-kv/format/util.go @@ -5,6 +5,19 @@ import ( "unsafe" ) +const ( + // SingleTermLine: beyond a certain length of string, all of the + // extra checks to handle quoting/not-quoting add a significant + // amount of extra processing time. Quoting in this manner only really + // effects readability on a single line, so a max string length that + // encompasses the maximum number of columns on *most* terminals was + // selected. This was chosen using the metric that 1080p is one of the + // most common display resolutions, and that a relatively small font size + // of 7 requires 223 columns. So 256 should be >= $COLUMNS (fullscreen) + // in 99% of usecases (these figures all pulled out of my ass). + SingleTermLine = 256 +) + // ContainsSpaceOrTab checks if "s" contains space or tabs. func ContainsSpaceOrTab(s string) bool { if i := strings.IndexByte(s, ' '); i != -1 { diff --git a/vendor/codeberg.org/gruf/go-kv/util.go b/vendor/codeberg.org/gruf/go-kv/util.go index a9526bf3d..3d249e5d4 100644 --- a/vendor/codeberg.org/gruf/go-kv/util.go +++ b/vendor/codeberg.org/gruf/go-kv/util.go @@ -7,10 +7,10 @@ import ( "codeberg.org/gruf/go-kv/format" ) -// appendQuoteKey will append and escape/quote a formatted key string. -func appendQuoteKey(buf *byteutil.Buffer, str string) { +// AppendQuoteKey will append and escape/quote a formatted key string. +func AppendQuoteKey(buf *byteutil.Buffer, str string) { switch { - case !strconv.CanBackquote(str): + case len(str) > format.SingleTermLine || !strconv.CanBackquote(str): // Append quoted and escaped string buf.B = strconv.AppendQuote(buf.B, str) case format.ContainsDoubleQuote(str): @@ -27,10 +27,10 @@ func appendQuoteKey(buf *byteutil.Buffer, str string) { } } -// appendQuoteValue will append and escape/quote a formatted value string. -func appendQuoteValue(buf *byteutil.Buffer, str string) { +// AppendQuoteValue will append and escape/quote a formatted value string. +func AppendQuoteValue(buf *byteutil.Buffer, str string) { switch { - case !strconv.CanBackquote(str): + case len(str) > format.SingleTermLine || !strconv.CanBackquote(str): // Append quoted and escaped string buf.B = strconv.AppendQuote(buf.B, str) return diff --git a/vendor/codeberg.org/gruf/go-logger/v2/level/levels.go b/vendor/codeberg.org/gruf/go-logger/v2/level/levels.go index 1804bdb23..3b1715a3c 100644 --- a/vendor/codeberg.org/gruf/go-logger/v2/level/levels.go +++ b/vendor/codeberg.org/gruf/go-logger/v2/level/levels.go @@ -1,5 +1,10 @@ package level +import ( + "fmt" + "strings" +) + // LEVEL defines a level of logging. type LEVEL uint8 @@ -34,12 +39,6 @@ func Default() Levels { ERROR: "ERROR", FATAL: "FATAL", PANIC: "PANIC", - - // we set these just so that - // it can be debugged when someone - // attempts to log with ALL/UNSET - ALL: "{all}", - UNSET: "{unset}", } } @@ -47,3 +46,18 @@ func Default() Levels { func (l Levels) Get(lvl LEVEL) string { return l[int(lvl)] } + +// Parse will attempt to decode a LEVEL from given string, checking (case insensitive) against strings in Levels. +func (l Levels) Parse(s string) (LEVEL, error) { + // Ensure consistent casing + s = strings.ToUpper(s) + + for lvl := LEVEL(0); int(lvl) < len(l); lvl++ { + // Compare to eqach known level + if strings.ToUpper(l[lvl]) == s { + return lvl, nil + } + } + + return 0, fmt.Errorf("unrecognized log level: %s", s) +} diff --git a/vendor/codeberg.org/gruf/go-runners/pool.go b/vendor/codeberg.org/gruf/go-runners/pool.go index ca8849f30..b6be57d0a 100644 --- a/vendor/codeberg.org/gruf/go-runners/pool.go +++ b/vendor/codeberg.org/gruf/go-runners/pool.go @@ -7,124 +7,105 @@ import ( ) // WorkerFunc represents a function processable by a worker in WorkerPool. Note -// that implementations absolutely MUST check whether passed context is Done() -// otherwise stopping the pool may block for large periods of time. +// that implementations absolutely MUST check whether passed context is <-ctx.Done() +// otherwise stopping the pool may block indefinitely. type WorkerFunc func(context.Context) // WorkerPool provides a means of enqueuing asynchronous work. type WorkerPool struct { - queue chan WorkerFunc - free chan struct{} - wait sync.WaitGroup - svc Service + fns chan WorkerFunc + svc Service } -// NewWorkerPool returns a new WorkerPool with provided worker count and WorkerFunc queue size. -// The number of workers represents how many WorkerFuncs can be executed simultaneously, and the -// queue size represents the max number of WorkerFuncs that can be queued at any one time. -func NewWorkerPool(workers int, queue int) WorkerPool { +// Start will start the main WorkerPool management loop in a new goroutine, along +// with requested number of child worker goroutines. Returns false if already running. +func (pool *WorkerPool) Start(workers int, queue int) bool { + // Attempt to start the svc + ctx, ok := pool.svc.doStart() + if !ok { + return false + } + if workers < 1 { + // Use $GOMAXPROCS as default worker count workers = runtime.GOMAXPROCS(0) } - if queue < 1 { + + if queue < 0 { + // Set a reasonable queue default queue = workers * 2 } - return WorkerPool{ - queue: make(chan WorkerFunc, queue), - free: make(chan struct{}, workers), - } -} -// Start will attempt to start the worker pool, asynchronously. Return is success state. -func (pool *WorkerPool) Start() bool { - ok := true + // Allocate pool queue of given size + fns := make(chan WorkerFunc, queue) + pool.fns = fns - done := make(chan struct{}) go func() { - ok = pool.svc.Run(func(ctx context.Context) { - close(done) - pool.process(ctx) - }) - if !ok { - close(done) - } - }() - <-done - - return ok -} - -// Stop will attempt to stop the worker pool, this will block until stopped. Return is success state. -func (pool *WorkerPool) Stop() bool { - return pool.svc.Stop() -} + defer func() { + // unlock single wait + pool.svc.wait.Unlock() -// Running returns whether the worker pool is running. -func (pool *WorkerPool) Running() bool { - return pool.svc.Running() -} + // ensure stopped + pool.svc.Stop() + }() -// execute will take a queued function and pass it to a free worker when available. -func (pool *WorkerPool) execute(ctx context.Context, fn WorkerFunc) { - var acquired bool + var wait sync.WaitGroup - // Set as running - pool.wait.Add(1) + // Start goroutine worker functions + for i := 0; i < workers; i++ { + go func() { + // Trigger start / stop + wait.Add(1) + defer wait.Done() - select { - // Pool context cancelled - // (we fall through and let - // the function execute). - case <-ctx.Done(): + // Keep workers running on panic + for !workerstart(ctx, fns) { + } + }() + } - // Free worker acquired. - case pool.free <- struct{}{}: - acquired = true - } + // Set GC finalizer to stop pool on dealloc + runtime.SetFinalizer(pool, func(pool *WorkerPool) { + pool.svc.Stop() + }) - go func() { - defer func() { - // defer in case panic - if acquired { - <-pool.free - } - pool.wait.Done() - }() + // Wait on ctx + <-ctx.Done() - // Run queued - fn(ctx) + // Stop all workers + close(pool.fns) + wait.Wait() }() + + return true } -// process is the background processing routine that passes queued functions to workers. -func (pool *WorkerPool) process(ctx context.Context) { - for { - select { - // Pool context cancelled - case <-ctx.Done(): - for { - select { - // Pop and execute queued - case fn := <-pool.queue: - fn(ctx) // ctx is closed - - // Empty, wait for workers - default: - pool.wait.Wait() - return - } - } +// workerstart is the main worker runner routine, accepting functions from 'fns' until it is closed. +func workerstart(ctx context.Context, fns <-chan WorkerFunc) bool { + // Recover and drop any panic + defer func() { recover() }() - // Queued func received - case fn := <-pool.queue: - pool.execute(ctx, fn) + for { + // Wait on next func + fn, ok := <-fns + if !ok { + return true } + + // Run with ctx + fn(ctx) } } +// Stop will stop the WorkerPool management loop, blocking until stopped. +func (pool *WorkerPool) Stop() bool { + return pool.svc.Stop() +} + // Enqueue will add provided WorkerFunc to the queue to be performed when there is a free worker. -// This will block until the function has been queued. 'fn' will ALWAYS be executed, even on pool -// close, which can be determined via context <-ctx.Done(). WorkerFuncs MUST respect the passed context. +// This will block until function is queued or pool is stopped. In all cases, the WorkerFunc will be +// executed, with the state of the pool being indicated by <-ctx.Done() of the passed ctx. +// WorkerFuncs MUST respect the passed context. func (pool *WorkerPool) Enqueue(fn WorkerFunc) { // Check valid fn if fn == nil { @@ -132,29 +113,50 @@ func (pool *WorkerPool) Enqueue(fn WorkerFunc) { } select { - // Pool context cancelled + // Pool ctx cancelled + case <-pool.svc.Done(): + fn(closedctx) + + // Placed fn in queue + case pool.fns <- fn: + } +} + +// EnqueueCtx is functionally identical to WorkerPool.Enqueue() but returns early in the +// case that caller provided <-ctx.Done() is closed, WITHOUT running the WorkerFunc. +func (pool *WorkerPool) EnqueueCtx(ctx context.Context, fn WorkerFunc) { + // Check valid fn + if fn == nil { + return + } + + select { + // Caller ctx cancelled + case <-ctx.Done(): + + // Pool ctx cancelled case <-pool.svc.Done(): fn(closedctx) // Placed fn in queue - case pool.queue <- fn: + case pool.fns <- fn: } } -// EnqueueNoBlock attempts Enqueue but returns false if not executed. -func (pool *WorkerPool) EnqueueNoBlock(fn WorkerFunc) bool { +// EnqueueNow attempts Enqueue but returns false if not executed. +func (pool *WorkerPool) EnqueueNow(fn WorkerFunc) bool { // Check valid fn if fn == nil { return false } select { - // Pool context cancelled + // Pool ctx cancelled case <-pool.svc.Done(): return false // Placed fn in queue - case pool.queue <- fn: + case pool.fns <- fn: return true // Queue is full @@ -165,10 +167,5 @@ func (pool *WorkerPool) EnqueueNoBlock(fn WorkerFunc) bool { // Queue returns the number of currently queued WorkerFuncs. func (pool *WorkerPool) Queue() int { - return len(pool.queue) -} - -// Workers returns the number of currently active workers. -func (pool *WorkerPool) Workers() int { - return len(pool.free) + return len(pool.fns) } diff --git a/vendor/codeberg.org/gruf/go-runners/service.go b/vendor/codeberg.org/gruf/go-runners/service.go index c0f878c45..68e8ea384 100644 --- a/vendor/codeberg.org/gruf/go-runners/service.go +++ b/vendor/codeberg.org/gruf/go-runners/service.go @@ -15,7 +15,7 @@ type Service struct { mu sync.Mutex // mu protects state changes } -// Run will run the supplied function until completion, use given context to propagate cancel. +// Run will run the supplied function until completion, using given context to propagate cancel. // Immediately returns false if the Service is already running, and true after completed run. func (svc *Service) Run(fn func(context.Context)) bool { // Attempt to start the svc @@ -39,6 +39,33 @@ func (svc *Service) Run(fn func(context.Context)) bool { return true } +// GoRun will run the supplied function until completion in a goroutine, using given context to +// propagate cancel. Immediately returns boolean indicating success, or that service is already running. +func (svc *Service) GoRun(fn func(context.Context)) bool { + // Attempt to start the svc + ctx, ok := svc.doStart() + if !ok { + return false + } + + go func() { + defer func() { + // unlock single wait + svc.wait.Unlock() + + // ensure stopped + svc.Stop() + }() + + // Run user func + if fn != nil { + fn(ctx) + } + }() + + return true +} + // Stop will attempt to stop the service, cancelling the running function's context. Immediately // returns false if not running, and true only after Service is fully stopped. func (svc *Service) Stop() bool { diff --git a/vendor/codeberg.org/gruf/go-sched/job.go b/vendor/codeberg.org/gruf/go-sched/job.go index 7831a39bd..e94c00024 100644 --- a/vendor/codeberg.org/gruf/go-sched/job.go +++ b/vendor/codeberg.org/gruf/go-sched/job.go @@ -61,6 +61,11 @@ func (job *Job) With(t Timing) *Job { panic("nil Timing") } + if job.id != 0 { + // Cannot update scheduled job + panic("job already scheduled") + } + if job.timing == emptytiming { // Set new timing job.timing = t @@ -76,12 +81,18 @@ func (job *Job) With(t Timing) *Job { return job } -// Panic specifics how this job handles panics, default is an actual panic. -func (job *Job) Panic(fn func(interface{})) *Job { +// OnPanic specifies how this job handles panics, default is an actual panic. +func (job *Job) OnPanic(fn func(interface{})) *Job { if fn == nil { // Ensure a function panic("nil func") } + + if job.id != 0 { + // Cannot update scheduled job + panic("job already scheduled") + } + job.panic = fn return job } diff --git a/vendor/codeberg.org/gruf/go-sched/scheduler.go b/vendor/codeberg.org/gruf/go-sched/scheduler.go index 8d076fea0..bdf0a371d 100644 --- a/vendor/codeberg.org/gruf/go-sched/scheduler.go +++ b/vendor/codeberg.org/gruf/go-sched/scheduler.go @@ -2,13 +2,18 @@ package sched import ( "context" + "runtime" "sort" + "sync" + "sync/atomic" "time" - "codeberg.org/gruf/go-atomics" "codeberg.org/gruf/go-runners" ) +// precision is the maximum time we can offer scheduler run-time precision down to. +const precision = time.Millisecond + var ( // neverticks is a timer channel that never ticks (it's starved). neverticks = make(chan time.Time) @@ -27,20 +32,41 @@ type Scheduler struct { jobs []*Job // jobs is a list of tracked Jobs to be executed jch chan interface{} // jch accepts either Jobs or job IDs to notify new/removed jobs svc runners.Service // svc manages the main scheduler routine - jid atomics.Uint64 // jid is used to iteratively generate unique IDs for jobs -} - -// New returns a new Scheduler instance with given job change queue size. -func NewScheduler(queue int) Scheduler { - if queue < 0 { - queue = 10 - } - return Scheduler{jch: make(chan interface{}, queue)} + jid atomic.Uint64 // jid is used to iteratively generate unique IDs for jobs } // Start will attempt to start the Scheduler. Immediately returns false if the Service is already running, and true after completed run. func (sch *Scheduler) Start() bool { - return sch.svc.Run(sch.run) + var block sync.Mutex + + // Use mutex to synchronize between started + // goroutine and ourselves, to ensure that + // we don't return before Scheduler init'd. + block.Lock() + defer block.Unlock() + + ok := sch.svc.GoRun(func(ctx context.Context) { + // Create Scheduler job channel + sch.jch = make(chan interface{}) + + // Unlock start routine + block.Unlock() + + // Set GC finalizer to ensure scheduler stopped + runtime.SetFinalizer(sch, func(sch *Scheduler) { + _ = sch.Stop() + }) + + // Enter main loop + sch.run(ctx) + }) + + if ok { + // Wait on goroutine + block.Lock() + } + + return ok } // Stop will attempt to stop the Scheduler. Immediately returns false if not running, and true only after Scheduler is fully stopped. @@ -55,24 +81,41 @@ func (sch *Scheduler) Running() bool { // Schedule will add provided Job to the Scheduler, returning a cancel function. func (sch *Scheduler) Schedule(job *Job) (cancel func()) { - if job == nil { - // Ensure there's a job! + switch { + // Check a job was passed + case job == nil: panic("nil job") + + // Check we are running + case sch.jch == nil: + panic("scheduler not running") } - // Get last known job ID + // Calculate next job ID last := sch.jid.Load() - - // Give this job an ID and check overflow - if job.id = sch.jid.Add(1); job.id < last { - panic("scheduler job id overflow") + next := sch.jid.Add(1) + if next < last { + panic("job id overflow") } // Pass job to scheduler + job.id = next sch.jch <- job + // Take ptrs to current state chs + ctx := sch.svc.Done() + jch := sch.jch + // Return cancel function for job ID - return func() { sch.jch <- job.id } + return func() { + select { + // Sched stopped + case <-ctx: + + // Cancel this job + case jch <- next: + } + } } // run is the main scheduler run routine, which runs for as long as ctx is valid. @@ -136,11 +179,8 @@ func (sch *Scheduler) run(ctx context.Context) { // don't bother sleeping. It's wasted cycles only // sleeping for some obscenely tiny amount of time // we can't guarantee precision for. - const precision = time.Millisecond - if until := next.Sub(now); until <= precision/1e3 { - // This job is behind schedule, - // set timer to always tick + // This job is behind schedule, set to always tick. tch = alwaysticks } else { // Reset timer to period @@ -216,13 +256,13 @@ func (sch *Scheduler) schedule(now time.Time) { return } + // Pass job to runner + go job.Run(now) + // Update the next call time next := job.timing.Next(now) job.next.Store(next) - // Run this job async! - go job.Run(now) - if next.IsZero() { // Zero time, this job is done and can be dropped sch.jobs = append(sch.jobs[:i], sch.jobs[i+1:]...) |