diff options
| author | 2025-10-17 17:36:24 +0200 | |
|---|---|---|
| committer | 2025-11-17 14:11:11 +0100 | |
| commit | f714b06fec5b93cf076d0f92eeb8aa7c32cfb531 (patch) | |
| tree | 8e1a89dd7b0db0f17b695557d03eede9055134ae /vendor/codeberg.org/gruf | |
| parent | [bugfix] recheck for just-processed-emoji within mutex lock before starting p... (diff) | |
| download | gotosocial-f714b06fec5b93cf076d0f92eeb8aa7c32cfb531.tar.xz | |
[chore] update dependencies (#4507)
- codeberg.org/gruf/go-runners: v1.6.3 -> v1.7.0
- codeberg.org/gruf/go-sched: v1.2.4 -> v1.3.0
- github.com/tdewolff/minify/v2: v2.24.3 -> v2.24.4
Reviewed-on: https://codeberg.org/superseriousbusiness/gotosocial/pulls/4507
Co-authored-by: kim <grufwub@gmail.com>
Co-committed-by: kim <grufwub@gmail.com>
Diffstat (limited to 'vendor/codeberg.org/gruf')
| -rw-r--r-- | vendor/codeberg.org/gruf/go-cache/v3/ttl/schedule.go | 2 | ||||
| -rw-r--r-- | vendor/codeberg.org/gruf/go-runners/context.go | 9 | ||||
| -rw-r--r-- | vendor/codeberg.org/gruf/go-runners/pointer.go | 22 | ||||
| -rw-r--r-- | vendor/codeberg.org/gruf/go-runners/pool.go | 292 | ||||
| -rw-r--r-- | vendor/codeberg.org/gruf/go-runners/process.go | 116 | ||||
| -rw-r--r-- | vendor/codeberg.org/gruf/go-runners/service.go | 311 | ||||
| -rw-r--r-- | vendor/codeberg.org/gruf/go-sched/README.md | 2 | ||||
| -rw-r--r-- | vendor/codeberg.org/gruf/go-sched/job.go | 64 | ||||
| -rw-r--r-- | vendor/codeberg.org/gruf/go-sched/scheduler.go | 225 | ||||
| -rw-r--r-- | vendor/codeberg.org/gruf/go-sched/timing.go | 25 |
10 files changed, 418 insertions, 650 deletions
diff --git a/vendor/codeberg.org/gruf/go-cache/v3/ttl/schedule.go b/vendor/codeberg.org/gruf/go-cache/v3/ttl/schedule.go index a12b33ab9..12faa86b2 100644 --- a/vendor/codeberg.org/gruf/go-cache/v3/ttl/schedule.go +++ b/vendor/codeberg.org/gruf/go-cache/v3/ttl/schedule.go @@ -14,7 +14,7 @@ var scheduler sched.Scheduler func schedule(sweep func(time.Time), freq time.Duration) func() { if !scheduler.Running() { // ensure sched running - _ = scheduler.Start(nil) + _ = scheduler.Start() } return scheduler.Schedule(sched.NewJob(sweep).Every(freq)) } diff --git a/vendor/codeberg.org/gruf/go-runners/context.go b/vendor/codeberg.org/gruf/go-runners/context.go index 12f7f1a10..e02dcab22 100644 --- a/vendor/codeberg.org/gruf/go-runners/context.go +++ b/vendor/codeberg.org/gruf/go-runners/context.go @@ -2,6 +2,7 @@ package runners import ( "context" + "sync/atomic" "time" ) @@ -19,9 +20,13 @@ func Closed() context.Context { // CtxWithCancel returns a new context.Context impl with cancel. func CtxWithCancel() (context.Context, context.CancelFunc) { + var once atomic.Uint32 ctx := make(chan struct{}) - cncl := func() { close(ctx) } - return CancelCtx(ctx), cncl + return CancelCtx(ctx), func() { + if once.CompareAndSwap(0, 1) { + close(ctx) + } + } } // CancelCtx is the simplest possible cancellable context. diff --git a/vendor/codeberg.org/gruf/go-runners/pointer.go b/vendor/codeberg.org/gruf/go-runners/pointer.go new file mode 100644 index 000000000..cc139309f --- /dev/null +++ b/vendor/codeberg.org/gruf/go-runners/pointer.go @@ -0,0 +1,22 @@ +package runners + +import ( + "sync/atomic" + "unsafe" +) + +// atomic_pointer wraps an unsafe.Pointer with +// receiver methods for their atomic counterparts. +type atomic_pointer struct{ p unsafe.Pointer } + +func (p *atomic_pointer) Load() unsafe.Pointer { + return atomic.LoadPointer(&p.p) +} + +func (p *atomic_pointer) Store(ptr unsafe.Pointer) { + atomic.StorePointer(&p.p, ptr) +} + +func (p *atomic_pointer) CAS(old, new unsafe.Pointer) bool { + return atomic.CompareAndSwapPointer(&p.p, old, new) +} diff --git a/vendor/codeberg.org/gruf/go-runners/pool.go b/vendor/codeberg.org/gruf/go-runners/pool.go deleted file mode 100644 index 644cde0b9..000000000 --- a/vendor/codeberg.org/gruf/go-runners/pool.go +++ /dev/null @@ -1,292 +0,0 @@ -package runners - -import ( - "context" - "fmt" - "os" - "runtime" - "sync" - - "codeberg.org/gruf/go-errors/v2" -) - -// WorkerFunc represents a function processable by a worker in WorkerPool. Note -// that implementations absolutely MUST check whether passed context is <-ctx.Done() -// otherwise stopping the pool may block indefinitely. -type WorkerFunc func(context.Context) - -// WorkerPool provides a means of enqueuing asynchronous work. -type WorkerPool struct { - fns chan WorkerFunc - svc Service -} - -// Start will start the main WorkerPool management loop in a new goroutine, along -// with requested number of child worker goroutines. Returns false if already running. -func (pool *WorkerPool) Start(workers int, queue int) bool { - // Attempt to start the svc - ctx, ok := pool.svc.doStart() - if !ok { - return false - } - - if workers <= 0 { - // Use $GOMAXPROCS as default. - workers = runtime.GOMAXPROCS(0) - } - - if queue < 0 { - // Use reasonable queue default. - queue = workers * 10 - } - - // Allocate pool queue of given size. - // - // This MUST be set BEFORE we return and NOT in - // the launched goroutine, or there is a risk that - // the pool may appear as closed for a short time - // until the main goroutine has been entered. - fns := make(chan WorkerFunc, queue) - pool.fns = fns - - go func() { - defer func() { - // unlock single wait - pool.svc.wait.Unlock() - - // ensure stopped - pool.svc.Stop() - }() - - var wait sync.WaitGroup - - // Start goroutine worker functions - for i := 0; i < workers; i++ { - wait.Add(1) - - go func() { - defer wait.Done() - - // Run worker function (retry on panic) - for !worker_run(CancelCtx(ctx), fns) { - } - }() - } - - // Wait on ctx - <-ctx - - // Drain function queue. - // - // All functions in the queue MUST be - // run, so we pass them a closed context. - // - // This mainly allows us to block until - // the function queue is empty, as worker - // functions will also continue draining in - // the background with the (now) closed ctx. - for !drain_queue(fns) { - // retry on panic - } - - // Now the queue is empty, we can - // safely close the channel signalling - // all of the workers to return. - close(fns) - wait.Wait() - }() - - return true -} - -// Stop will stop the WorkerPool management loop, blocking until stopped. -func (pool *WorkerPool) Stop() bool { - return pool.svc.Stop() -} - -// Running returns if WorkerPool management loop is running (i.e. NOT stopped / stopping). -func (pool *WorkerPool) Running() bool { - return pool.svc.Running() -} - -// Done returns a channel that's closed when WorkerPool.Stop() is called. It is the same channel provided to the currently running worker functions. -func (pool *WorkerPool) Done() <-chan struct{} { - return pool.svc.Done() -} - -// Enqueue will add provided WorkerFunc to the queue to be performed when there is a free worker. -// This will block until function is queued or pool is stopped. In all cases, the WorkerFunc will be -// executed, with the state of the pool being indicated by <-ctx.Done() of the passed ctx. -// WorkerFuncs MUST respect the passed context. -func (pool *WorkerPool) Enqueue(fn WorkerFunc) { - // Check valid fn - if fn == nil { - return - } - - select { - // Pool ctx cancelled - case <-pool.svc.Done(): - fn(closedctx) - - // Placed fn in queue - case pool.fns <- fn: - } -} - -// EnqueueCtx is functionally identical to WorkerPool.Enqueue() but returns early in the -// case that caller provided <-ctx.Done() is closed, WITHOUT running the WorkerFunc. -func (pool *WorkerPool) EnqueueCtx(ctx context.Context, fn WorkerFunc) bool { - // Check valid fn - if fn == nil { - return false - } - - select { - // Caller ctx cancelled - case <-ctx.Done(): - return false - - // Pool ctx cancelled - case <-pool.svc.Done(): - return false - - // Placed fn in queue - case pool.fns <- fn: - return true - } -} - -// MustEnqueueCtx functionally performs similarly to WorkerPool.EnqueueCtx(), but in the case -// that the provided <-ctx.Done() is closed, it is passed asynchronously to WorkerPool.Enqueue(). -// Return boolean indicates whether function was executed in time before <-ctx.Done() is closed. -func (pool *WorkerPool) MustEnqueueCtx(ctx context.Context, fn WorkerFunc) (ok bool) { - // Check valid fn - if fn == nil { - return false - } - - select { - case <-ctx.Done(): - // We failed to add this entry to the worker queue before the - // incoming context was cancelled. So to ensure processing - // we simply queue it asynchronously and return early to caller. - go pool.Enqueue(fn) - return false - - case <-pool.svc.Done(): - // Pool ctx cancelled - fn(closedctx) - return false - - case pool.fns <- fn: - // Placed fn in queue - return true - } -} - -// EnqueueNow attempts Enqueue but returns false if not executed. -func (pool *WorkerPool) EnqueueNow(fn WorkerFunc) bool { - // Check valid fn - if fn == nil { - return false - } - - select { - // Pool ctx cancelled - case <-pool.svc.Done(): - return false - - // Placed fn in queue - case pool.fns <- fn: - return true - - // Queue is full - default: - return false - } -} - -// Queue returns the number of currently queued WorkerFuncs. -func (pool *WorkerPool) Queue() int { - var l int - pool.svc.While(func() { - l = len(pool.fns) - }) - return l -} - -// worker_run is the main worker routine, accepting functions from 'fns' until it is closed. -func worker_run(ctx context.Context, fns <-chan WorkerFunc) bool { - defer func() { - // Recover and drop any panic - if r := recover(); r != nil { - - // Gather calling func frames. - pcs := make([]uintptr, 10) - n := runtime.Callers(3, pcs) - i := runtime.CallersFrames(pcs[:n]) - c := gatherFrames(i, n) - - const msg = "worker_run: recovered panic: %v\n\n%s\n" - fmt.Fprintf(os.Stderr, msg, r, c.String()) - } - }() - - for { - // Wait on next func - fn, ok := <-fns - if !ok { - return true - } - - // Run with ctx - fn(ctx) - } -} - -// drain_queue will drain and run all functions in worker queue, passing in a closed context. -func drain_queue(fns <-chan WorkerFunc) bool { - defer func() { - // Recover and drop any panic - if r := recover(); r != nil { - - // Gather calling func frames. - pcs := make([]uintptr, 10) - n := runtime.Callers(3, pcs) - i := runtime.CallersFrames(pcs[:n]) - c := gatherFrames(i, n) - - const msg = "worker_run: recovered panic: %v\n\n%s\n" - fmt.Fprintf(os.Stderr, msg, r, c.String()) - } - }() - - for { - select { - // Run with closed ctx - case fn := <-fns: - fn(closedctx) - - // Queue is empty - default: - return true - } - } -} - -// gatherFrames collates runtime frames from a frame iterator. -func gatherFrames(iter *runtime.Frames, n int) errors.Callers { - if iter == nil { - return nil - } - frames := make([]runtime.Frame, 0, n) - for { - f, ok := iter.Next() - if !ok { - break - } - frames = append(frames, f) - } - return frames -} diff --git a/vendor/codeberg.org/gruf/go-runners/process.go b/vendor/codeberg.org/gruf/go-runners/process.go index ca39ac0d0..3feeff258 100644 --- a/vendor/codeberg.org/gruf/go-runners/process.go +++ b/vendor/codeberg.org/gruf/go-runners/process.go @@ -2,80 +2,72 @@ package runners import ( "fmt" + "unsafe" + "sync" ) -// Processable defines a runnable process with error return -// that can be passed to a Processor instance for managed running. -type Processable func() error - // Processor acts similarly to a sync.Once object, except that it is reusable. After // the first call to Process(), any further calls before this first has returned will // block until the first call has returned, and return the same error. This ensures // that only a single instance of it is ever running at any one time. -type Processor struct { - mutex sync.Mutex - wait *sync.WaitGroup - err *error -} +type Processor struct{ p atomic_pointer } // Process will process the given function if first-call, else blocking until // the first function has returned, returning the same error result. -func (p *Processor) Process(proc Processable) (err error) { - // Acquire state lock. - p.mutex.Lock() - - if p.wait != nil { - // Already running. - // - // Get current ptrs. - waitPtr := p.wait - errPtr := p.err - - // Free state lock. - p.mutex.Unlock() - - // Wait for finish. - waitPtr.Wait() - return *errPtr - } - - // Alloc waiter for new process. - var wait sync.WaitGroup - - // No need to alloc new error as - // we use the alloc'd named error - // return required for panic handling. - - // Reset ptrs. - p.wait = &wait - p.err = &err - - // Set started. - wait.Add(1) - p.mutex.Unlock() - - defer func() { - if r := recover(); r != nil { - if err != nil { - rOld := r // wrap the panic so we don't lose existing returned error - r = fmt.Errorf("panic occured after error %q: %v", err.Error(), rOld) - } - - // Catch any panics and wrap as error. - err = fmt.Errorf("caught panic: %v", r) +func (p *Processor) Process(proc func() error) (err error) { + var i *proc_instance + + for { + // Attempt to load existing instance. + ptr := (*proc_instance)(p.p.Load()) + if ptr != nil { + + // Wait on existing. + ptr.wait.Wait() + err = ptr.err + return } - // Mark done. - wait.Done() + if i == nil { + // Allocate instance. + i = new(proc_instance) + i.wait.Add(1) + } - // Set stopped. - p.mutex.Lock() - p.wait = nil - p.mutex.Unlock() - }() + // Try to acquire start slot by + // setting ptr to *our* instance. + if p.p.CAS(nil, unsafe.Pointer(i)) { + defer func() { + if r := recover(); r != nil { + if i.err != nil { + rOld := r // wrap the panic so we don't lose existing returned error + r = fmt.Errorf("panic occured after error %q: %v", i.err.Error(), rOld) + } + + // Catch panics and wrap as error return. + i.err = fmt.Errorf("caught panic: %v", r) + } + + // Set return. + err = i.err + + // Release the + // goroutines. + i.wait.Done() + + // Free processor. + p.p.Store(nil) + }() + + // Run func. + i.err = proc() + return + } + } +} - // Run process. - err = proc() - return +type proc_instance struct { + wait sync.WaitGroup + err error } diff --git a/vendor/codeberg.org/gruf/go-runners/service.go b/vendor/codeberg.org/gruf/go-runners/service.go index 8a7c0051a..fe41807f9 100644 --- a/vendor/codeberg.org/gruf/go-runners/service.go +++ b/vendor/codeberg.org/gruf/go-runners/service.go @@ -3,215 +3,220 @@ package runners import ( "context" "sync" + "sync/atomic" + "unsafe" ) -// Service provides a means of tracking a single long-running service, provided protected state -// changes and preventing multiple instances running. Also providing service state information. -type Service struct { - state uint32 // 0=stopped, 1=running, 2=stopping - mutex sync.Mutex // mutex protects overall state changes - wait sync.Mutex // wait is used as a single-entity wait-group, only ever locked within 'mutex' - ctx chan struct{} // ctx is the current context for running function (or nil if not running) -} +// Service provides a means of tracking a single long-running Service, provided protected state +// changes and preventing multiple instances running. Also providing Service state information. +type Service struct{ p atomic_pointer } // Run will run the supplied function until completion, using given context to propagate cancel. // Immediately returns false if the Service is already running, and true after completed run. -func (svc *Service) Run(fn func(context.Context)) bool { - // Attempt to start the svc - ctx, ok := svc.doStart() +func (svc *Service) Run(fn func(context.Context)) (ok bool) { + var ptr *svc_instance + + // Attempt to start. + ptr, ok = svc.start() if !ok { - return false + return } - defer func() { - // unlock single wait - svc.wait.Unlock() - - // ensure stopped - _ = svc.Stop() - }() - - // Run with context. - fn(CancelCtx(ctx)) - - return true + // Run given function. + defer svc.on_done(ptr) + fn(CancelCtx(ptr.done)) + return } // GoRun will run the supplied function until completion in a goroutine, using given context to // propagate cancel. Immediately returns boolean indicating success, or that service is already running. -func (svc *Service) GoRun(fn func(context.Context)) bool { - // Attempt to start the svc - ctx, ok := svc.doStart() +func (svc *Service) GoRun(fn func(context.Context)) (ok bool) { + var ptr *svc_instance + + // Attempt to start. + ptr, ok = svc.start() if !ok { - return false + return } go func() { - defer func() { - // unlock single wait - svc.wait.Unlock() - - // ensure stopped - _ = svc.Stop() - }() - - // Run with context. - fn(CancelCtx(ctx)) + // Run given function. + defer svc.on_done(ptr) + fn(CancelCtx(ptr.done)) }() - return true + return } // RunWait is functionally the same as .Run(), but blocks until the first instance of .Run() returns. -func (svc *Service) RunWait(fn func(context.Context)) bool { - // Attempt to start the svc - ctx, ok := svc.doStart() +func (svc *Service) RunWait(fn func(context.Context)) (ok bool) { + var ptr *svc_instance + + // Attempt to start. + ptr, ok = svc.start() if !ok { - <-ctx // block - return false + <-ptr.done + return } - defer func() { - // unlock single wait - svc.wait.Unlock() + // Run given function. + defer svc.on_done(ptr) + fn(CancelCtx(ptr.done)) + return +} - // ensure stopped - _ = svc.Stop() - }() +// GoRunWait is functionally the same as .RunWait(), but blocks until the first instance of RunWait() returns. +func (svc *Service) GoRunWait(fn func(context.Context)) (ok bool) { + var ptr *svc_instance - // Run with context. - fn(CancelCtx(ctx)) + // Attempt to start. + ptr, ok = svc.start() + if !ok { + <-ptr.done + return + } + + go func() { + // Run given function. + defer svc.on_done(ptr) + fn(CancelCtx(ptr.done)) + }() - return true + return } // Stop will attempt to stop the service, cancelling the running function's context. Immediately // returns false if not running, and true only after Service is fully stopped. func (svc *Service) Stop() bool { - // Attempt to stop the svc - ctx, ok := svc.doStop() - if !ok { - return false - } - - defer func() { - // Get svc lock - svc.mutex.Lock() - - // Wait until stopped - svc.wait.Lock() - svc.wait.Unlock() + return svc.must_get().stop() +} - // Reset the svc - svc.ctx = nil - svc.state = 0 - svc.mutex.Unlock() - }() +// Running returns if Service is running (i.e. NOT fully stopped, but may be *stopping*). +func (svc *Service) Running() bool { + return svc.must_get().running() +} - // Cancel ctx - close(ctx) +// Done returns a channel that's closed when Service.Stop() is called. It is +// the same channel provided to the currently running service function. +func (svc *Service) Done() <-chan struct{} { + return svc.must_get().done +} - return true +func (svc *Service) start() (*svc_instance, bool) { + ptr := svc.must_get() + return ptr, ptr.start() } -// While allows you to execute given function guaranteed within current -// service state. Please note that this will hold the underlying service -// state change mutex open while executing the function. -func (svc *Service) While(fn func()) { - // Protect state change - svc.mutex.Lock() - defer svc.mutex.Unlock() +func (svc *Service) on_done(ptr *svc_instance) { + // Ensure stopped. + ptr.stop_private() - // Run - fn() + // Free service. + svc.p.Store(nil) } -// doStart will safely set Service state to started, returning a ptr to this context insance. -func (svc *Service) doStart() (chan struct{}, bool) { - // Protect startup - svc.mutex.Lock() +func (svc *Service) must_get() *svc_instance { + var newptr *svc_instance - if svc.ctx == nil { - // this will only have been allocated - // if svc.Done() was already called. - svc.ctx = make(chan struct{}) - } - - // Take our own ptr - ctx := svc.ctx + for { + // Try to load existing instance. + ptr := (*svc_instance)(svc.p.Load()) + if ptr != nil { + return ptr + } - if svc.state != 0 { - // State was not stopped. - svc.mutex.Unlock() - return ctx, false - } + if newptr == nil { + // Allocate new instance. + newptr = new(svc_instance) + newptr.done = make(chan struct{}) + } - // Set started. - svc.state = 1 + // Attempt to acquire slot by setting our ptr. + if !svc.p.CAS(nil, unsafe.Pointer(newptr)) { + continue + } - // Start waiter. - svc.wait.Lock() + return newptr + } +} - // Unlock and return - svc.mutex.Unlock() - return ctx, true +type svc_instance struct { + wait sync.WaitGroup + done chan struct{} + state atomic.Uint32 } -// doStop will safely set Service state to stopping, returning a ptr to this cancelfunc instance. -func (svc *Service) doStop() (chan struct{}, bool) { - // Protect stop - svc.mutex.Lock() +const ( + started_bit = uint32(1) << 0 + stopping_bit = uint32(1) << 1 + finished_bit = uint32(1) << 2 +) - if svc.state != 1 /* not started */ { - svc.mutex.Unlock() - return nil, false - } +func (i *svc_instance) start() (ok bool) { + // Acquire start by setting 'started' bit. + switch old := i.state.Or(started_bit); { - // state stopping - svc.state = 2 + case old&finished_bit != 0: + // Already finished. - // Take our own ptr - // and unlock state - ctx := svc.ctx - svc.mutex.Unlock() + case old&started_bit == 0: + // Successfully started! + i.wait.Add(1) + ok = true + } - return ctx, true + return } -// Running returns if Service is running (i.e. state NOT stopped / stopping). -func (svc *Service) Running() bool { - svc.mutex.Lock() - state := svc.state - svc.mutex.Unlock() - return (state == 1) -} +// NOTE: MAY ONLY BE CALLED BY STARTING GOROUTINE. +func (i *svc_instance) stop_private() { + // Attempt set both stopping and finished bits. + old := i.state.Or(stopping_bit | finished_bit) -// Done returns a channel that's closed when Service.Stop() is called. It is -// the same channel provided to the currently running service function. -func (svc *Service) Done() <-chan struct{} { - var done <-chan struct{} - - svc.mutex.Lock() - switch svc.state { - // stopped - case 0: - if svc.ctx == nil { - // here we create a new context so that the - // returned 'done' channel here will still - // be valid for when Service is next started. - svc.ctx = make(chan struct{}) - } - done = svc.ctx + // Only if we weren't already + // stopping do we close channel. + if old&stopping_bit == 0 { + close(i.done) + } - // started - case 1: - done = svc.ctx + // Release + // waiters. + i.wait.Done() +} - // stopping - case 2: - done = svc.ctx +func (i *svc_instance) stop() (ok bool) { + // Attempt to set the 'stopping' bit. + switch old := i.state.Or(stopping_bit); { + + case old&finished_bit != 0: + // Already finished. + return + + case old&started_bit == 0: + // This was never started + // to begin with, just mark + // as fully finished here. + _ = i.state.Or(finished_bit) + return + + case old&stopping_bit == 0: + // We succesfully stopped + // instance, close channel. + close(i.done) + ok = true } - svc.mutex.Unlock() - return done + // Wait on stop. + i.wait.Wait() + return +} + +// running returns whether service was started and +// is not yet finished. that indicates that it may +// have been started and not yet stopped, or that +// it was started, stopped and not yet returned. +func (i *svc_instance) running() bool { + val := i.state.Load() + return val&started_bit != 0 && + val&finished_bit == 0 } diff --git a/vendor/codeberg.org/gruf/go-sched/README.md b/vendor/codeberg.org/gruf/go-sched/README.md index d32a961ae..0a9439577 100644 --- a/vendor/codeberg.org/gruf/go-sched/README.md +++ b/vendor/codeberg.org/gruf/go-sched/README.md @@ -2,4 +2,4 @@ A simple job (both run-once and recurring) queueing library with down-to millisecond precision. -Precision estimates based on test output (running on i7-11800h): 1ms precision with 80% tolerance.
\ No newline at end of file +Precision estimates based on test output (running on AMD Ryzen 7 7840u): 2ms precision with 95% tolerance. diff --git a/vendor/codeberg.org/gruf/go-sched/job.go b/vendor/codeberg.org/gruf/go-sched/job.go index 2531769d6..f3bd869d2 100644 --- a/vendor/codeberg.org/gruf/go-sched/job.go +++ b/vendor/codeberg.org/gruf/go-sched/job.go @@ -14,10 +14,9 @@ import ( // holding onto a next execution time safely in a concurrent environment. type Job struct { id uint64 - next unsafe.Pointer // *time.Time + next atomic_time timing Timing call func(time.Time) - panic func(interface{}) } // NewJob returns a new Job to run given function. @@ -30,28 +29,31 @@ func NewJob(fn func(now time.Time)) *Job { j := &Job{ // set defaults timing: emptytiming, // i.e. fire immediately call: fn, - panic: func(i interface{}) { panic(i) }, } return j } -// At sets this Job to execute at time, by passing (*sched.Once)(&at) to .With(). See .With() for details. +// At sets this Job to execute at time, by passing +// (*sched.Once)(&at) to .With(). See .With() for details. func (job *Job) At(at time.Time) *Job { return job.With((*Once)(&at)) } -// Every sets this Job to execute every period, by passing sched.Period(period) to .With(). See .With() for details. +// Every sets this Job to execute every period, by passing +// sched.Period(period) to .With(). See .With() for details. func (job *Job) Every(period time.Duration) *Job { return job.With(Periodic(period)) } -// EveryAt sets this Job to execute every period starting at time, by passing &PeriodicAt{once: Once(at), period: Periodic(period)} to .With(). See .With() for details. +// EveryAt sets this Job to execute every period starting at time, by passing +// &PeriodicAt{once: Once(at), period: Periodic(period)} to .With(). See .With() for details. func (job *Job) EveryAt(at time.Time, period time.Duration) *Job { return job.With(&PeriodicAt{Once: Once(at), Period: Periodic(period)}) } -// With sets this Job's timing to given implementation, or if already set will wrap existing using sched.TimingWrap{}. +// With sets this Job's timing to given implementation, or +// if already set will wrap existing using sched.TimingWrap{}. func (job *Job) With(t Timing) *Job { if t == nil { // Ensure a timing @@ -78,44 +80,16 @@ func (job *Job) With(t Timing) *Job { return job } -// OnPanic specifies how this job handles panics, default is an actual panic. -func (job *Job) OnPanic(fn func(interface{})) *Job { - if fn == nil { - // Ensure a function - panic("nil func") - } - - if job.id != 0 { - // Cannot update scheduled job - panic("job already scheduled") - } - - job.panic = fn - return job -} - // Next returns the next time this Job is expected to run. func (job *Job) Next() time.Time { - return loadTime(&job.next) + return job.next.Load() } // Run will execute this Job and pass through given now time. -func (job *Job) Run(now time.Time) { - defer func() { - switch r := recover(); { - case r == nil: - // no panic - case job != nil && - job.panic != nil: - job.panic(r) - default: - panic(r) - } - }() - job.call(now) -} +func (job *Job) Run(now time.Time) { job.call(now) } -// String provides a debuggable string representation of Job including ID, next time and Timing type. +// String provides a debuggable string representation +// of Job including ID, next time and Timing type. func (job *Job) String() string { var buf strings.Builder buf.WriteByte('{') @@ -123,7 +97,7 @@ func (job *Job) String() string { buf.WriteString(strconv.FormatUint(job.id, 10)) buf.WriteByte(' ') buf.WriteString("next=") - buf.WriteString(loadTime(&job.next).Format(time.StampMicro)) + buf.WriteString(job.next.Load().Format(time.StampMicro)) buf.WriteByte(' ') buf.WriteString("timing=") buf.WriteString(reflect.TypeOf(job.timing).String()) @@ -131,13 +105,15 @@ func (job *Job) String() string { return buf.String() } -func loadTime(p *unsafe.Pointer) time.Time { - if p := atomic.LoadPointer(p); p != nil { +type atomic_time struct{ p unsafe.Pointer } + +func (t *atomic_time) Load() time.Time { + if p := atomic.LoadPointer(&t.p); p != nil { return *(*time.Time)(p) } return zerotime } -func storeTime(p *unsafe.Pointer, t time.Time) { - atomic.StorePointer(p, unsafe.Pointer(&t)) +func (t *atomic_time) Store(v time.Time) { + atomic.StorePointer(&t.p, unsafe.Pointer(&v)) } diff --git a/vendor/codeberg.org/gruf/go-sched/scheduler.go b/vendor/codeberg.org/gruf/go-sched/scheduler.go index 79913a9b3..9ab5b8bc8 100644 --- a/vendor/codeberg.org/gruf/go-sched/scheduler.go +++ b/vendor/codeberg.org/gruf/go-sched/scheduler.go @@ -6,18 +6,22 @@ import ( "sync" "sync/atomic" "time" + "unsafe" "codeberg.org/gruf/go-runners" ) -// precision is the maximum time we can offer scheduler run-time precision down to. -const precision = time.Millisecond +// Precision is the maximum time we can +// offer scheduler run-time precision down to. +const Precision = 2 * time.Millisecond var ( - // neverticks is a timer channel that never ticks (it's starved). + // neverticks is a timer channel + // that never ticks (it's starved). neverticks = make(chan time.Time) - // alwaysticks is a timer channel that always ticks (it's closed). + // alwaysticks is a timer channel + // that always ticks (it's closed). alwaysticks = func() chan time.Time { ch := make(chan time.Time) close(ch) @@ -28,48 +32,51 @@ var ( // Scheduler provides a means of running jobs at specific times and // regular intervals, all while sharing a single underlying timer. type Scheduler struct { - jobs []*Job // jobs is a list of tracked Jobs to be executed - jch chan interface{} // jch accepts either Jobs or job IDs to notify new/removed jobs - svc runners.Service // svc manages the main scheduler routine - jid atomic.Uint64 // jid is used to iteratively generate unique IDs for jobs - rgo func(func()) // goroutine runner, allows using goroutine pool to launch jobs + svc runners.Service // svc manages the main scheduler routine + jobs []*Job // jobs is a list of tracked Jobs to be executed + jch atomic_channel // jch accepts either Jobs or job IDs to notify new/removed jobs + jid atomic.Uint64 // jid is used to iteratively generate unique IDs for jobs } -// Start will attempt to start the Scheduler. Immediately returns false if the Service is already running, and true after completed run. -func (sch *Scheduler) Start(gorun func(func())) bool { - var block sync.Mutex +// Start will attempt to start the Scheduler. Immediately returns false +// if the Service is already running, and true after completed run. +func (sch *Scheduler) Start() bool { + var wait sync.WaitGroup - // Use mutex to synchronize between started + // Use waiter to synchronize between started // goroutine and ourselves, to ensure that // we don't return before Scheduler init'd. - block.Lock() - defer block.Unlock() + wait.Add(1) ok := sch.svc.GoRun(func(ctx context.Context) { - // Create Scheduler job channel - sch.jch = make(chan interface{}) - - // Set goroutine runner function - if sch.rgo = gorun; sch.rgo == nil { - sch.rgo = func(f func()) { go f() } - } - - // Unlock start routine - block.Unlock() - - // Enter main loop - sch.run(ctx) + // Prepare new channel. + ch := new(channel) + ch.ctx = ctx.Done() + ch.ch = make(chan interface{}) + sch.jch.Store(ch) + + // Release + // start fn + wait.Done() + + // Main loop + sch.run(ch) }) if ok { - // Wait on goroutine - block.Lock() + // Wait on + // goroutine + wait.Wait() + } else { + // Release + wait.Done() } return ok } -// Stop will attempt to stop the Scheduler. Immediately returns false if not running, and true only after Scheduler is fully stopped. +// Stop will attempt to stop the Scheduler. Immediately returns false +// if not running, and true only after Scheduler is fully stopped. func (sch *Scheduler) Stop() bool { return sch.svc.Stop() } @@ -86,45 +93,38 @@ func (sch *Scheduler) Done() <-chan struct{} { // Schedule will add provided Job to the Scheduler, returning a cancel function. func (sch *Scheduler) Schedule(job *Job) (cancel func()) { - switch { - // Check a job was passed - case job == nil: + if job == nil { panic("nil job") - - // Check we are running - case !sch.Running(): - panic("scheduler not running") } - // Calculate next job ID - last := sch.jid.Load() - next := sch.jid.Add(1) - if next < last { - panic("job id overflow") + // Load job channel. + ch := sch.jch.Load() + if ch == nil { + panic("not running") } - // Pass job to scheduler - job.id = next - sch.jch <- job + // Calculate next job ID. + job.id = sch.jid.Add(1) - // Take ptrs to current state chs - ctx := sch.svc.Done() - jch := sch.jch - - // Return cancel function for job ID - return func() { - select { - // Sched stopped - case <-ctx: - - // Cancel this job - case jch <- next: - } + // Pass job + // to channel. + if !ch.w(job) { + panic("not running") } + + // Return cancel function for job + return func() { ch.w(job.id) } } // run is the main scheduler run routine, which runs for as long as ctx is valid. -func (sch *Scheduler) run(ctx context.Context) { +func (sch *Scheduler) run(ch *channel) { + defer ch.close() + if ch == nil { + panic("nil channel") + } else if sch == nil { + panic("nil scheduler") + } + var ( // now stores the current time, and will only be // set when the timer channel is set to be the @@ -165,39 +165,43 @@ func (sch *Scheduler) run(ctx context.Context) { // Get now time. now = time.Now() - // Sort jobs by next occurring. + // Sort by next occurring. sort.Sort(byNext(sch.jobs)) // Get next job time. next := sch.jobs[0].Next() - // If this job is _just_ about to be ready, we don't bother + // If this job is *just* about to be ready, we don't bother // sleeping. It's wasted cycles only sleeping for some obscenely // tiny amount of time we can't guarantee precision for. - if until := next.Sub(now); until <= precision/1e3 { + if until := next.Sub(now); until <= Precision/1e3 { + // This job is behind, // set to always tick. tch = alwaysticks } else { + // Reset timer to period. timer.Reset(until) - tch = timer.C timerset = true + tch = timer.C } } else { + // Unset timer tch = neverticks } select { // Scheduler stopped - case <-ctx.Done(): + case <-ch.done(): stopdrain() return - // Timer ticked, run scheduled - case t := <-tch: - if !timerset { + // Timer ticked, + // run scheduled. + case t, ok := <-tch: + if !ok { // 'alwaysticks' returns zero // times, BUT 'now' will have // been set during above sort. @@ -205,8 +209,9 @@ func (sch *Scheduler) run(ctx context.Context) { } sch.schedule(t) - // Received update, handle job/id - case v := <-sch.jch: + // Received update, + // handle job/id. + case v := <-ch.r(): sch.handle(v) stopdrain() } @@ -220,21 +225,21 @@ func (sch *Scheduler) handle(v interface{}) { switch v := v.(type) { // New job added case *Job: - // Get current time + // Get current time. now := time.Now() - // Update the next call time + // Update next call time. next := v.timing.Next(now) - storeTime(&v.next, next) + v.next.Store(next) - // Append this job to queued + // Append this job to queued/ sch.jobs = append(sch.jobs, v) // Job removed case uint64: for i := 0; i < len(sch.jobs); i++ { if sch.jobs[i].id == v { - // This is the job we're looking for! Drop this + // This is the job we're looking for! Drop this. sch.jobs = append(sch.jobs[:i], sch.jobs[i+1:]...) return } @@ -242,29 +247,28 @@ func (sch *Scheduler) handle(v interface{}) { } } -// schedule will iterate through the scheduler jobs and execute those necessary, updating their next call time. +// schedule will iterate through the scheduler jobs and +// execute those necessary, updating their next call time. func (sch *Scheduler) schedule(now time.Time) { for i := 0; i < len(sch.jobs); { - // Scope our own var + // Scope our own var. job := sch.jobs[i] // We know these jobs are ordered by .Next(), so as soon - // as we reach one with .Next() after now, we can return + // as we reach one with .Next() after now, we can return. if job.Next().After(now) { return } - // Pass to runner - sch.rgo(func() { - job.Run(now) - }) + // Run the job. + go job.Run(now) - // Update the next call time + // Update the next call time. next := job.timing.Next(now) - storeTime(&job.next, next) + job.next.Store(next) if next.IsZero() { - // Zero time, this job is done and can be dropped + // Zero time, this job is done and can be dropped. sch.jobs = append(sch.jobs[:i], sch.jobs[i+1:]...) continue } @@ -274,7 +278,8 @@ func (sch *Scheduler) schedule(now time.Time) { } } -// byNext is an implementation of sort.Interface to sort Jobs by their .Next() time. +// byNext is an implementation of sort.Interface +// to sort Jobs by their .Next() time. type byNext []*Job func (by byNext) Len() int { @@ -288,3 +293,51 @@ func (by byNext) Less(i int, j int) bool { func (by byNext) Swap(i int, j int) { by[i], by[j] = by[j], by[i] } + +// atomic_channel wraps a *channel{} with atomic store / load. +type atomic_channel struct{ p unsafe.Pointer } + +func (c *atomic_channel) Load() *channel { + if p := atomic.LoadPointer(&c.p); p != nil { + return (*channel)(p) + } + return nil +} + +func (c *atomic_channel) Store(v *channel) { + atomic.StorePointer(&c.p, unsafe.Pointer(v)) +} + +// channel wraps both a context done +// channel and a generic interface channel +// to support safe writing to an underlying +// channel that correctly fails after close. +type channel struct { + ctx <-chan struct{} + ch chan interface{} +} + +// done returns internal context channel. +func (ch *channel) done() <-chan struct{} { + return ch.ctx +} + +// r returns internal channel for read. +func (ch *channel) r() chan interface{} { + return ch.ch +} + +// w writes 'v' to channel, or returns false if closed. +func (ch *channel) w(v interface{}) bool { + select { + case <-ch.ctx: + return false + case ch.ch <- v: + return true + } +} + +// close closes underlying channel. +func (ch *channel) close() { + close(ch.ch) +} diff --git a/vendor/codeberg.org/gruf/go-sched/timing.go b/vendor/codeberg.org/gruf/go-sched/timing.go index 33c230fa5..cb9b4925a 100644 --- a/vendor/codeberg.org/gruf/go-sched/timing.go +++ b/vendor/codeberg.org/gruf/go-sched/timing.go @@ -5,11 +5,13 @@ import ( ) var ( - // zerotime is zero time.Time (unix epoch). - zerotime = time.Time{} + // zerotime is zero + // time.Time (unix epoch). + zerotime time.Time - // emptytiming is a global timingempty to check against. - emptytiming = timingempty{} + // emptytiming is a global + // timingempty to check against. + emptytiming timingempty ) // Timing provides scheduling for a Job, determining the next time @@ -20,14 +22,16 @@ type Timing interface { Next(time.Time) time.Time } -// timingempty is a 'zero' Timing implementation that always returns zero time. +// timingempty is a 'zero' Timing implementation +// that always returns zero time. type timingempty struct{} func (timingempty) Next(time.Time) time.Time { return zerotime } -// Once implements Timing to provide a run-once Job execution. +// Once implements Timing to +// provide a run-once Job execution. type Once time.Time func (o *Once) Next(time.Time) time.Time { @@ -36,14 +40,16 @@ func (o *Once) Next(time.Time) time.Time { return ret } -// Periodic implements Timing to provide a recurring Job execution. +// Periodic implements Timing to +// provide a recurring Job execution. type Periodic time.Duration func (p Periodic) Next(now time.Time) time.Time { return now.Add(time.Duration(p)) } -// PeriodicAt implements Timing to provide a recurring Job execution starting at 'Once' time. +// PeriodicAt implements Timing to provide a +// recurring Job execution starting at 'Once' time. type PeriodicAt struct { Once Once Period Periodic @@ -56,7 +62,8 @@ func (p *PeriodicAt) Next(now time.Time) time.Time { return p.Period.Next(now) } -// TimingWrap allows combining two different Timing implementations. +// TimingWrap allows combining two +// different Timing implementations. type TimingWrap struct { Outer Timing Inner Timing |
