diff options
| author | 2022-09-28 18:30:40 +0100 | |
|---|---|---|
| committer | 2022-09-28 18:30:40 +0100 | |
| commit | a156188b3eb5cb3da44aa1b7452265f5fa38a607 (patch) | |
| tree | 7097fa48d56fbabc7c2c8750b1f3bc9321d71c0f /vendor/codeberg.org/gruf | |
| parent | [bugfix] Fix emphasis being added to emoji shortcodes with markdown parsing (... (diff) | |
| download | gotosocial-a156188b3eb5cb3da44aa1b7452265f5fa38a607.tar.xz | |
[chore] update dependencies, bump to Go 1.19.1 (#826)
* update dependencies, bump Go version to 1.19
* bump test image Go version
* update golangci-lint
* update gotosocial-drone-build
* sign
* linting, go fmt
* update swagger docs
* update swagger docs
* whitespace
* update contributing.md
* fuckin whoopsie doopsie
* linterino, linteroni
* fix followrequest test not starting processor
* fix other api/client tests not starting processor
* fix remaining tests where processor not started
* bump go-runners version
* don't check last-webfingered-at, processor may have updated this
* update swagger command
* update bun to latest version
* fix embed to work the same as before with new bun
Signed-off-by: kim <grufwub@gmail.com>
Co-authored-by: tsmethurst <tobi.smethurst@protonmail.com>
Diffstat (limited to 'vendor/codeberg.org/gruf')
| -rw-r--r-- | vendor/codeberg.org/gruf/go-cache/v2/scheduler.go | 7 | ||||
| -rw-r--r-- | vendor/codeberg.org/gruf/go-kv/field.go | 2 | ||||
| -rw-r--r-- | vendor/codeberg.org/gruf/go-kv/field_fmt.go | 33 | ||||
| -rw-r--r-- | vendor/codeberg.org/gruf/go-kv/field_format.go | 2 | ||||
| -rw-r--r-- | vendor/codeberg.org/gruf/go-kv/format/format.go | 4 | ||||
| -rw-r--r-- | vendor/codeberg.org/gruf/go-kv/format/util.go | 13 | ||||
| -rw-r--r-- | vendor/codeberg.org/gruf/go-kv/util.go | 12 | ||||
| -rw-r--r-- | vendor/codeberg.org/gruf/go-logger/v2/level/levels.go | 26 | ||||
| -rw-r--r-- | vendor/codeberg.org/gruf/go-runners/pool.go | 197 | ||||
| -rw-r--r-- | vendor/codeberg.org/gruf/go-runners/service.go | 29 | ||||
| -rw-r--r-- | vendor/codeberg.org/gruf/go-sched/job.go | 15 | ||||
| -rw-r--r-- | vendor/codeberg.org/gruf/go-sched/scheduler.go | 92 | 
12 files changed, 282 insertions, 150 deletions
diff --git a/vendor/codeberg.org/gruf/go-cache/v2/scheduler.go b/vendor/codeberg.org/gruf/go-cache/v2/scheduler.go index bc1d8074a..ca93a0fcd 100644 --- a/vendor/codeberg.org/gruf/go-cache/v2/scheduler.go +++ b/vendor/codeberg.org/gruf/go-cache/v2/scheduler.go @@ -8,10 +8,13 @@ import (  // scheduler is the global cache runtime scheduler  // for handling regular cache evictions. -var scheduler = sched.NewScheduler(5) +var scheduler sched.Scheduler  // schedule will given sweep  routine to the global scheduler, and start global scheduler.  func schedule(sweep func(time.Time), freq time.Duration) func() { -	go scheduler.Start() // does nothing if already running +	if !scheduler.Running() { +		// ensure running +		_ = scheduler.Start() +	}  	return scheduler.Schedule(sched.NewJob(sweep).Every(freq))  } diff --git a/vendor/codeberg.org/gruf/go-kv/field.go b/vendor/codeberg.org/gruf/go-kv/field.go index 7ff6f1ca3..86a53b173 100644 --- a/vendor/codeberg.org/gruf/go-kv/field.go +++ b/vendor/codeberg.org/gruf/go-kv/field.go @@ -75,7 +75,7 @@ type Field struct {  // Key returns the formatted key string of this Field.  func (f Field) Key() string {  	buf := byteutil.Buffer{B: make([]byte, 0, bufsize/2)} -	appendQuoteKey(&buf, f.K) +	AppendQuoteKey(&buf, f.K)  	return buf.String()  } diff --git a/vendor/codeberg.org/gruf/go-kv/field_fmt.go b/vendor/codeberg.org/gruf/go-kv/field_fmt.go index c62393fe0..fcdcc5e17 100644 --- a/vendor/codeberg.org/gruf/go-kv/field_fmt.go +++ b/vendor/codeberg.org/gruf/go-kv/field_fmt.go @@ -5,10 +5,18 @@ package kv  import (  	"fmt" +	"sync"  	"codeberg.org/gruf/go-byteutil"  ) +// bufPool is a memory pool of byte buffers. +var bufPool = sync.Pool{ +	New: func() interface{} { +		return &byteutil.Buffer{B: make([]byte, 0, 512)} +	}, +} +  // AppendFormat will append formatted format of Field to 'buf'. See .String() for details.  func (f Field) AppendFormat(buf *byteutil.Buffer, vbose bool) {  	var fmtstr string @@ -17,9 +25,9 @@ func (f Field) AppendFormat(buf *byteutil.Buffer, vbose bool) {  	} else /* regular */ {  		fmtstr = `%+v`  	} -	appendQuoteKey(buf, f.K) +	AppendQuoteKey(buf, f.K)  	buf.WriteByte('=') -	appendQuoteValue(buf, fmt.Sprintf(fmtstr, f.V)) +	appendValuef(buf, fmtstr, f.V)  }  // Value returns the formatted value string of this Field. @@ -31,6 +39,25 @@ func (f Field) Value(vbose bool) string {  		fmtstr = `%+v`  	}  	buf := byteutil.Buffer{B: make([]byte, 0, bufsize/2)} -	appendQuoteValue(&buf, fmt.Sprintf(fmtstr, f.V)) +	appendValuef(&buf, fmtstr, f.V)  	return buf.String()  } + +// appendValuef appends a quoted value string (formatted by fmt.Appendf) to 'buf'. +func appendValuef(buf *byteutil.Buffer, format string, args ...interface{}) { +	// Write format string to a byte buffer +	fmtbuf := bufPool.Get().(*byteutil.Buffer) +	fmtbuf.B = fmt.Appendf(fmtbuf.B, format, args...) + +	// Append quoted value to dst buffer +	AppendQuoteValue(buf, fmtbuf.String()) + +	// Drop overly large capacity buffers +	if fmtbuf.Cap() > int(^uint16(0)) { +		return +	} + +	// Replace buffer in pool +	fmtbuf.Reset() +	bufPool.Put(fmtbuf) +} diff --git a/vendor/codeberg.org/gruf/go-kv/field_format.go b/vendor/codeberg.org/gruf/go-kv/field_format.go index db1ffc721..4fa7a8dcf 100644 --- a/vendor/codeberg.org/gruf/go-kv/field_format.go +++ b/vendor/codeberg.org/gruf/go-kv/field_format.go @@ -16,7 +16,7 @@ func (f Field) AppendFormat(buf *byteutil.Buffer, vbose bool) {  	} else /* regular */ {  		fmtstr = "{:v}"  	} -	appendQuoteKey(buf, f.K) +	AppendQuoteKey(buf, f.K)  	buf.WriteByte('=')  	format.Appendf(buf, fmtstr, f.V)  } diff --git a/vendor/codeberg.org/gruf/go-kv/format/format.go b/vendor/codeberg.org/gruf/go-kv/format/format.go index df5a94b7c..7edc4475f 100644 --- a/vendor/codeberg.org/gruf/go-kv/format/format.go +++ b/vendor/codeberg.org/gruf/go-kv/format/format.go @@ -311,7 +311,7 @@ func (f format) AppendString(s string) {  }  func (f format) AppendStringKey(s string) { -	if !strconv.CanBackquote(s) { +	if len(s) > SingleTermLine || !strconv.CanBackquote(s) {  		// Requires quoting AND escaping  		f.Buffer.B = strconv.AppendQuote(f.Buffer.B, s)  	} else if ContainsDoubleQuote(s) { @@ -329,7 +329,7 @@ func (f format) AppendStringKey(s string) {  }  func (f format) AppendStringQuoted(s string) { -	if !strconv.CanBackquote(s) { +	if len(s) > SingleTermLine || !strconv.CanBackquote(s) {  		// Requires quoting AND escaping  		f.Buffer.B = strconv.AppendQuote(f.Buffer.B, s)  	} else if ContainsDoubleQuote(s) { diff --git a/vendor/codeberg.org/gruf/go-kv/format/util.go b/vendor/codeberg.org/gruf/go-kv/format/util.go index c4c42329e..674f81be5 100644 --- a/vendor/codeberg.org/gruf/go-kv/format/util.go +++ b/vendor/codeberg.org/gruf/go-kv/format/util.go @@ -5,6 +5,19 @@ import (  	"unsafe"  ) +const ( +	// SingleTermLine: beyond a certain length of string, all of the +	// extra checks to handle quoting/not-quoting add a significant +	// amount of extra processing time. Quoting in this manner only really +	// effects readability on a single line, so a max string length that +	// encompasses the maximum number of columns on *most* terminals was +	// selected. This was chosen using the metric that 1080p is one of the +	// most common display resolutions, and that a relatively small font size +	// of 7 requires 223 columns. So 256 should be >= $COLUMNS (fullscreen) +	// in 99% of usecases (these figures all pulled out of my ass). +	SingleTermLine = 256 +) +  // ContainsSpaceOrTab checks if "s" contains space or tabs.  func ContainsSpaceOrTab(s string) bool {  	if i := strings.IndexByte(s, ' '); i != -1 { diff --git a/vendor/codeberg.org/gruf/go-kv/util.go b/vendor/codeberg.org/gruf/go-kv/util.go index a9526bf3d..3d249e5d4 100644 --- a/vendor/codeberg.org/gruf/go-kv/util.go +++ b/vendor/codeberg.org/gruf/go-kv/util.go @@ -7,10 +7,10 @@ import (  	"codeberg.org/gruf/go-kv/format"  ) -// appendQuoteKey will append and escape/quote a formatted key string. -func appendQuoteKey(buf *byteutil.Buffer, str string) { +// AppendQuoteKey will append and escape/quote a formatted key string. +func AppendQuoteKey(buf *byteutil.Buffer, str string) {  	switch { -	case !strconv.CanBackquote(str): +	case len(str) > format.SingleTermLine || !strconv.CanBackquote(str):  		// Append quoted and escaped string  		buf.B = strconv.AppendQuote(buf.B, str)  	case format.ContainsDoubleQuote(str): @@ -27,10 +27,10 @@ func appendQuoteKey(buf *byteutil.Buffer, str string) {  	}  } -// appendQuoteValue will append and escape/quote a formatted value string. -func appendQuoteValue(buf *byteutil.Buffer, str string) { +// AppendQuoteValue will append and escape/quote a formatted value string. +func AppendQuoteValue(buf *byteutil.Buffer, str string) {  	switch { -	case !strconv.CanBackquote(str): +	case len(str) > format.SingleTermLine || !strconv.CanBackquote(str):  		// Append quoted and escaped string  		buf.B = strconv.AppendQuote(buf.B, str)  		return diff --git a/vendor/codeberg.org/gruf/go-logger/v2/level/levels.go b/vendor/codeberg.org/gruf/go-logger/v2/level/levels.go index 1804bdb23..3b1715a3c 100644 --- a/vendor/codeberg.org/gruf/go-logger/v2/level/levels.go +++ b/vendor/codeberg.org/gruf/go-logger/v2/level/levels.go @@ -1,5 +1,10 @@  package level +import ( +	"fmt" +	"strings" +) +  // LEVEL defines a level of logging.  type LEVEL uint8 @@ -34,12 +39,6 @@ func Default() Levels {  		ERROR: "ERROR",  		FATAL: "FATAL",  		PANIC: "PANIC", - -		// we set these just so that -		// it can be debugged when someone -		// attempts to log with ALL/UNSET -		ALL:   "{all}", -		UNSET: "{unset}",  	}  } @@ -47,3 +46,18 @@ func Default() Levels {  func (l Levels) Get(lvl LEVEL) string {  	return l[int(lvl)]  } + +// Parse will attempt to decode a LEVEL from given string, checking (case insensitive) against strings in Levels. +func (l Levels) Parse(s string) (LEVEL, error) { +	// Ensure consistent casing +	s = strings.ToUpper(s) + +	for lvl := LEVEL(0); int(lvl) < len(l); lvl++ { +		// Compare to eqach known level +		if strings.ToUpper(l[lvl]) == s { +			return lvl, nil +		} +	} + +	return 0, fmt.Errorf("unrecognized log level: %s", s) +} diff --git a/vendor/codeberg.org/gruf/go-runners/pool.go b/vendor/codeberg.org/gruf/go-runners/pool.go index ca8849f30..b6be57d0a 100644 --- a/vendor/codeberg.org/gruf/go-runners/pool.go +++ b/vendor/codeberg.org/gruf/go-runners/pool.go @@ -7,124 +7,105 @@ import (  )  // WorkerFunc represents a function processable by a worker in WorkerPool. Note -// that implementations absolutely MUST check whether passed context is Done() -// otherwise stopping the pool may block for large periods of time. +// that implementations absolutely MUST check whether passed context is <-ctx.Done() +// otherwise stopping the pool may block indefinitely.  type WorkerFunc func(context.Context)  // WorkerPool provides a means of enqueuing asynchronous work.  type WorkerPool struct { -	queue chan WorkerFunc -	free  chan struct{} -	wait  sync.WaitGroup -	svc   Service +	fns chan WorkerFunc +	svc Service  } -// NewWorkerPool returns a new WorkerPool with provided worker count and WorkerFunc queue size. -// The number of workers represents how many WorkerFuncs can be executed simultaneously, and the -// queue size represents the max number of WorkerFuncs that can be queued at any one time. -func NewWorkerPool(workers int, queue int) WorkerPool { +// Start will start the main WorkerPool management loop in a new goroutine, along +// with requested number of child worker goroutines. Returns false if already running. +func (pool *WorkerPool) Start(workers int, queue int) bool { +	// Attempt to start the svc +	ctx, ok := pool.svc.doStart() +	if !ok { +		return false +	} +  	if workers < 1 { +		// Use $GOMAXPROCS as default worker count  		workers = runtime.GOMAXPROCS(0)  	} -	if queue < 1 { + +	if queue < 0 { +		// Set a reasonable queue default  		queue = workers * 2  	} -	return WorkerPool{ -		queue: make(chan WorkerFunc, queue), -		free:  make(chan struct{}, workers), -	} -} -// Start will attempt to start the worker pool, asynchronously. Return is success state. -func (pool *WorkerPool) Start() bool { -	ok := true +	// Allocate pool queue of given size +	fns := make(chan WorkerFunc, queue) +	pool.fns = fns -	done := make(chan struct{})  	go func() { -		ok = pool.svc.Run(func(ctx context.Context) { -			close(done) -			pool.process(ctx) -		}) -		if !ok { -			close(done) -		} -	}() -	<-done - -	return ok -} - -// Stop will attempt to stop the worker pool, this will block until stopped. Return is success state. -func (pool *WorkerPool) Stop() bool { -	return pool.svc.Stop() -} +		defer func() { +			// unlock single wait +			pool.svc.wait.Unlock() -// Running returns whether the worker pool is running. -func (pool *WorkerPool) Running() bool { -	return pool.svc.Running() -} +			// ensure stopped +			pool.svc.Stop() +		}() -// execute will take a queued function and pass it to a free worker when available. -func (pool *WorkerPool) execute(ctx context.Context, fn WorkerFunc) { -	var acquired bool +		var wait sync.WaitGroup -	// Set as running -	pool.wait.Add(1) +		// Start goroutine worker functions +		for i := 0; i < workers; i++ { +			go func() { +				// Trigger start / stop +				wait.Add(1) +				defer wait.Done() -	select { -	// Pool context cancelled -	// (we fall through and let -	// the function execute). -	case <-ctx.Done(): +				// Keep workers running on panic +				for !workerstart(ctx, fns) { +				} +			}() +		} -	// Free worker acquired. -	case pool.free <- struct{}{}: -		acquired = true -	} +		// Set GC finalizer to stop pool on dealloc +		runtime.SetFinalizer(pool, func(pool *WorkerPool) { +			pool.svc.Stop() +		}) -	go func() { -		defer func() { -			// defer in case panic -			if acquired { -				<-pool.free -			} -			pool.wait.Done() -		}() +		// Wait on ctx +		<-ctx.Done() -		// Run queued -		fn(ctx) +		// Stop all workers +		close(pool.fns) +		wait.Wait()  	}() + +	return true  } -// process is the background processing routine that passes queued functions to workers. -func (pool *WorkerPool) process(ctx context.Context) { -	for { -		select { -		// Pool context cancelled -		case <-ctx.Done(): -			for { -				select { -				// Pop and execute queued -				case fn := <-pool.queue: -					fn(ctx) // ctx is closed - -				// Empty, wait for workers -				default: -					pool.wait.Wait() -					return -				} -			} +// workerstart is the main worker runner routine, accepting functions from 'fns' until it is closed. +func workerstart(ctx context.Context, fns <-chan WorkerFunc) bool { +	// Recover and drop any panic +	defer func() { recover() }() -		// Queued func received -		case fn := <-pool.queue: -			pool.execute(ctx, fn) +	for { +		// Wait on next func +		fn, ok := <-fns +		if !ok { +			return true  		} + +		// Run with ctx +		fn(ctx)  	}  } +// Stop will stop the WorkerPool management loop, blocking until stopped. +func (pool *WorkerPool) Stop() bool { +	return pool.svc.Stop() +} +  // Enqueue will add provided WorkerFunc to the queue to be performed when there is a free worker. -// This will block until the function has been queued. 'fn' will ALWAYS be executed, even on pool -// close, which can be determined via context <-ctx.Done(). WorkerFuncs MUST respect the passed context. +// This will block until function is queued or pool is stopped. In all cases, the WorkerFunc will be +// executed, with the state of the pool being indicated by <-ctx.Done() of the passed ctx. +// WorkerFuncs MUST respect the passed context.  func (pool *WorkerPool) Enqueue(fn WorkerFunc) {  	// Check valid fn  	if fn == nil { @@ -132,29 +113,50 @@ func (pool *WorkerPool) Enqueue(fn WorkerFunc) {  	}  	select { -	// Pool context cancelled +	// Pool ctx cancelled +	case <-pool.svc.Done(): +		fn(closedctx) + +	// Placed fn in queue +	case pool.fns <- fn: +	} +} + +// EnqueueCtx is functionally identical to WorkerPool.Enqueue() but returns early in the +// case that caller provided <-ctx.Done() is closed, WITHOUT running the WorkerFunc. +func (pool *WorkerPool) EnqueueCtx(ctx context.Context, fn WorkerFunc) { +	// Check valid fn +	if fn == nil { +		return +	} + +	select { +	// Caller ctx cancelled +	case <-ctx.Done(): + +	// Pool ctx cancelled  	case <-pool.svc.Done():  		fn(closedctx)  	// Placed fn in queue -	case pool.queue <- fn: +	case pool.fns <- fn:  	}  } -// EnqueueNoBlock attempts Enqueue but returns false if not executed. -func (pool *WorkerPool) EnqueueNoBlock(fn WorkerFunc) bool { +// EnqueueNow attempts Enqueue but returns false if not executed. +func (pool *WorkerPool) EnqueueNow(fn WorkerFunc) bool {  	// Check valid fn  	if fn == nil {  		return false  	}  	select { -	// Pool context cancelled +	// Pool ctx cancelled  	case <-pool.svc.Done():  		return false  	// Placed fn in queue -	case pool.queue <- fn: +	case pool.fns <- fn:  		return true  	// Queue is full @@ -165,10 +167,5 @@ func (pool *WorkerPool) EnqueueNoBlock(fn WorkerFunc) bool {  // Queue returns the number of currently queued WorkerFuncs.  func (pool *WorkerPool) Queue() int { -	return len(pool.queue) -} - -// Workers returns the number of currently active workers. -func (pool *WorkerPool) Workers() int { -	return len(pool.free) +	return len(pool.fns)  } diff --git a/vendor/codeberg.org/gruf/go-runners/service.go b/vendor/codeberg.org/gruf/go-runners/service.go index c0f878c45..68e8ea384 100644 --- a/vendor/codeberg.org/gruf/go-runners/service.go +++ b/vendor/codeberg.org/gruf/go-runners/service.go @@ -15,7 +15,7 @@ type Service struct {  	mu    sync.Mutex         // mu protects state changes  } -// Run will run the supplied function until completion, use given context to propagate cancel. +// Run will run the supplied function until completion, using given context to propagate cancel.  // Immediately returns false if the Service is already running, and true after completed run.  func (svc *Service) Run(fn func(context.Context)) bool {  	// Attempt to start the svc @@ -39,6 +39,33 @@ func (svc *Service) Run(fn func(context.Context)) bool {  	return true  } +// GoRun will run the supplied function until completion in a goroutine, using given context to +// propagate cancel. Immediately returns boolean indicating success, or that service is already running. +func (svc *Service) GoRun(fn func(context.Context)) bool { +	// Attempt to start the svc +	ctx, ok := svc.doStart() +	if !ok { +		return false +	} + +	go func() { +		defer func() { +			// unlock single wait +			svc.wait.Unlock() + +			// ensure stopped +			svc.Stop() +		}() + +		// Run user func +		if fn != nil { +			fn(ctx) +		} +	}() + +	return true +} +  // Stop will attempt to stop the service, cancelling the running function's context. Immediately  // returns false if not running, and true only after Service is fully stopped.  func (svc *Service) Stop() bool { diff --git a/vendor/codeberg.org/gruf/go-sched/job.go b/vendor/codeberg.org/gruf/go-sched/job.go index 7831a39bd..e94c00024 100644 --- a/vendor/codeberg.org/gruf/go-sched/job.go +++ b/vendor/codeberg.org/gruf/go-sched/job.go @@ -61,6 +61,11 @@ func (job *Job) With(t Timing) *Job {  		panic("nil Timing")  	} +	if job.id != 0 { +		// Cannot update scheduled job +		panic("job already scheduled") +	} +  	if job.timing == emptytiming {  		// Set new timing  		job.timing = t @@ -76,12 +81,18 @@ func (job *Job) With(t Timing) *Job {  	return job  } -// Panic specifics how this job handles panics, default is an actual panic. -func (job *Job) Panic(fn func(interface{})) *Job { +// OnPanic specifies how this job handles panics, default is an actual panic. +func (job *Job) OnPanic(fn func(interface{})) *Job {  	if fn == nil {  		// Ensure a function  		panic("nil func")  	} + +	if job.id != 0 { +		// Cannot update scheduled job +		panic("job already scheduled") +	} +  	job.panic = fn  	return job  } diff --git a/vendor/codeberg.org/gruf/go-sched/scheduler.go b/vendor/codeberg.org/gruf/go-sched/scheduler.go index 8d076fea0..bdf0a371d 100644 --- a/vendor/codeberg.org/gruf/go-sched/scheduler.go +++ b/vendor/codeberg.org/gruf/go-sched/scheduler.go @@ -2,13 +2,18 @@ package sched  import (  	"context" +	"runtime"  	"sort" +	"sync" +	"sync/atomic"  	"time" -	"codeberg.org/gruf/go-atomics"  	"codeberg.org/gruf/go-runners"  ) +// precision is the maximum time we can offer scheduler run-time precision down to. +const precision = time.Millisecond +  var (  	// neverticks is a timer channel that never ticks (it's starved).  	neverticks = make(chan time.Time) @@ -27,20 +32,41 @@ type Scheduler struct {  	jobs []*Job           // jobs is a list of tracked Jobs to be executed  	jch  chan interface{} // jch accepts either Jobs or job IDs to notify new/removed jobs  	svc  runners.Service  // svc manages the main scheduler routine -	jid  atomics.Uint64   // jid is used to iteratively generate unique IDs for jobs -} - -// New returns a new Scheduler instance with given job change queue size. -func NewScheduler(queue int) Scheduler { -	if queue < 0 { -		queue = 10 -	} -	return Scheduler{jch: make(chan interface{}, queue)} +	jid  atomic.Uint64    // jid is used to iteratively generate unique IDs for jobs  }  // Start will attempt to start the Scheduler. Immediately returns false if the Service is already running, and true after completed run.  func (sch *Scheduler) Start() bool { -	return sch.svc.Run(sch.run) +	var block sync.Mutex + +	// Use mutex to synchronize between started +	// goroutine and ourselves, to ensure that +	// we don't return before Scheduler init'd. +	block.Lock() +	defer block.Unlock() + +	ok := sch.svc.GoRun(func(ctx context.Context) { +		// Create Scheduler job channel +		sch.jch = make(chan interface{}) + +		// Unlock start routine +		block.Unlock() + +		// Set GC finalizer to ensure scheduler stopped +		runtime.SetFinalizer(sch, func(sch *Scheduler) { +			_ = sch.Stop() +		}) + +		// Enter main loop +		sch.run(ctx) +	}) + +	if ok { +		// Wait on goroutine +		block.Lock() +	} + +	return ok  }  // Stop will attempt to stop the Scheduler. Immediately returns false if not running, and true only after Scheduler is fully stopped. @@ -55,24 +81,41 @@ func (sch *Scheduler) Running() bool {  // Schedule will add provided Job to the Scheduler, returning a cancel function.  func (sch *Scheduler) Schedule(job *Job) (cancel func()) { -	if job == nil { -		// Ensure there's a job! +	switch { +	// Check a job was passed +	case job == nil:  		panic("nil job") + +	// Check we are running +	case sch.jch == nil: +		panic("scheduler not running")  	} -	// Get last known job ID +	// Calculate next job ID  	last := sch.jid.Load() - -	// Give this job an ID and check overflow -	if job.id = sch.jid.Add(1); job.id < last { -		panic("scheduler job id overflow") +	next := sch.jid.Add(1) +	if next < last { +		panic("job id overflow")  	}  	// Pass job to scheduler +	job.id = next  	sch.jch <- job +	// Take ptrs to current state chs +	ctx := sch.svc.Done() +	jch := sch.jch +  	// Return cancel function for job ID -	return func() { sch.jch <- job.id } +	return func() { +		select { +		// Sched stopped +		case <-ctx: + +		// Cancel this job +		case jch <- next: +		} +	}  }  // run is the main scheduler run routine, which runs for as long as ctx is valid. @@ -136,11 +179,8 @@ func (sch *Scheduler) run(ctx context.Context) {  			// don't bother sleeping. It's wasted cycles only  			// sleeping for some obscenely tiny amount of time  			// we can't guarantee precision for. -			const precision = time.Millisecond -  			if until := next.Sub(now); until <= precision/1e3 { -				// This job is behind schedule, -				// set timer to always tick +				// This job is behind schedule, set to always tick.  				tch = alwaysticks  			} else {  				// Reset timer to period @@ -216,13 +256,13 @@ func (sch *Scheduler) schedule(now time.Time) {  			return  		} +		// Pass job to runner +		go job.Run(now) +  		// Update the next call time  		next := job.timing.Next(now)  		job.next.Store(next) -		// Run this job async! -		go job.Run(now) -  		if next.IsZero() {  			// Zero time, this job is done and can be dropped  			sch.jobs = append(sch.jobs[:i], sch.jobs[i+1:]...)  | 
