author     2025-10-17 17:36:24 +0200
committer  2025-11-17 14:11:11 +0100
commit     f714b06fec5b93cf076d0f92eeb8aa7c32cfb531 (patch)
tree       8e1a89dd7b0db0f17b695557d03eede9055134ae
parent     [bugfix] recheck for just-processed-emoji within mutex lock before starting p... (diff)
[chore] update dependencies (#4507)
- codeberg.org/gruf/go-runners: v1.6.3 -> v1.7.0
- codeberg.org/gruf/go-sched: v1.2.4 -> v1.3.0
- github.com/tdewolff/minify/v2: v2.24.3 -> v2.24.4
Reviewed-on: https://codeberg.org/superseriousbusiness/gotosocial/pulls/4507
Co-authored-by: kim <grufwub@gmail.com>
Co-committed-by: kim <grufwub@gmail.com>
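
Migration note for callers: go-sched v1.3.0 drops the goroutine-runner argument from `Scheduler.Start` (jobs are now always launched via plain `go`, see the vendored `scheduler.go` below), which is why the `internal/scheduler` and `ttl/schedule.go` call sites change from `Start(nil)` to `Start()`. A minimal sketch of the call-site migration; the job body and durations are hypothetical:

```go
package main

import (
	"fmt"
	"time"

	"codeberg.org/gruf/go-sched"
)

func main() {
	var s sched.Scheduler

	// v1.2.x took an optional goroutine-runner func: s.Start(nil).
	// v1.3.0 takes no arguments; returns false if already running.
	if !s.Start() {
		panic("scheduler already running")
	}

	// Job setup is unchanged; timing setters chain off NewJob.
	cancel := s.Schedule(sched.NewJob(func(now time.Time) {
		fmt.Println("tick at", now) // hypothetical job body
	}).Every(time.Second))

	time.Sleep(1500 * time.Millisecond)
	cancel()     // deregister the job
	_ = s.Stop() // stop the scheduler
}
```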
-rw-r--r--  go.mod                                                |  10
-rw-r--r--  go.sum                                                |  20
-rw-r--r--  internal/scheduler/scheduler.go                       |  12
-rw-r--r--  vendor/codeberg.org/gruf/go-cache/v3/ttl/schedule.go  |   2
-rw-r--r--  vendor/codeberg.org/gruf/go-runners/context.go        |   9
-rw-r--r--  vendor/codeberg.org/gruf/go-runners/pointer.go        |  22
-rw-r--r--  vendor/codeberg.org/gruf/go-runners/pool.go           | 292
-rw-r--r--  vendor/codeberg.org/gruf/go-runners/process.go        | 116
-rw-r--r--  vendor/codeberg.org/gruf/go-runners/service.go        | 311
-rw-r--r--  vendor/codeberg.org/gruf/go-sched/README.md           |   2
-rw-r--r--  vendor/codeberg.org/gruf/go-sched/job.go              |  64
-rw-r--r--  vendor/codeberg.org/gruf/go-sched/scheduler.go        | 225
-rw-r--r--  vendor/codeberg.org/gruf/go-sched/timing.go           |  25
-rw-r--r--  vendor/github.com/tdewolff/parse/v2/binary.go         |  64
-rw-r--r--  vendor/github.com/tdewolff/parse/v2/binary_unix.go    |  20
-rw-r--r--  vendor/github.com/tdewolff/parse/v2/html/lex.go       |  10
-rw-r--r--  vendor/modules.txt                                    |  10
17 files changed, 495 insertions(+), 719 deletions(-)
diff --git a/go.mod b/go.mod
--- a/go.mod
+++ b/go.mod
@@ -15,7 +15,7 @@ require (
 	code.superseriousbusiness.org/oauth2/v4 v4.5.4-0.20250812115401-3961e46a7384
 	codeberg.org/gruf/go-bytesize v1.0.4
 	codeberg.org/gruf/go-byteutil v1.3.0
-	codeberg.org/gruf/go-cache/v3 v3.6.1
+	codeberg.org/gruf/go-cache/v3 v3.6.2
 	codeberg.org/gruf/go-caller v0.0.0-20250806133437-db8d0b1f71cf
 	codeberg.org/gruf/go-debug v1.3.0
 	codeberg.org/gruf/go-errors/v2 v2.3.2
@@ -27,8 +27,8 @@ require (
 	codeberg.org/gruf/go-list v0.0.0-20240425093752-494db03d641f
 	codeberg.org/gruf/go-mempool v0.0.0-20251003110531-b54adae66253
 	codeberg.org/gruf/go-mutexes v1.5.8
-	codeberg.org/gruf/go-runners v1.6.3
-	codeberg.org/gruf/go-sched v1.2.4
+	codeberg.org/gruf/go-runners v1.7.0
+	codeberg.org/gruf/go-sched v1.3.0
 	codeberg.org/gruf/go-split v1.2.0
 	codeberg.org/gruf/go-storage v0.3.1
 	codeberg.org/gruf/go-structr v0.9.13
@@ -62,7 +62,7 @@ require (
 	github.com/spf13/pflag v1.0.10
 	github.com/spf13/viper v1.21.0
 	github.com/stretchr/testify v1.11.1
-	github.com/tdewolff/minify/v2 v2.24.3
+	github.com/tdewolff/minify/v2 v2.24.4
 	github.com/temoto/robotstxt v1.1.2
 	github.com/tetratelabs/wazero v1.9.0
 	github.com/tomnomnom/linkheader v0.0.0-20250811210735-e5fe3b51442e
@@ -201,7 +201,7 @@ require (
 	github.com/sourcegraph/conc v0.3.1-0.20240121214520-5f936abd7ae8 // indirect
 	github.com/spf13/afero v1.15.0 // indirect
 	github.com/subosito/gotenv v1.6.0 // indirect
-	github.com/tdewolff/parse/v2 v2.8.3 // indirect
+	github.com/tdewolff/parse/v2 v2.8.4 // indirect
 	github.com/tinylib/msgp v1.3.0 // indirect
 	github.com/tmthrgd/go-hex v0.0.0-20190904060850-447a3041c3bc // indirect
 	github.com/toqueteos/webbrowser v1.2.0 // indirect
diff --git a/go.sum b/go.sum
--- a/go.sum
+++ b/go.sum
@@ -14,8 +14,8 @@ codeberg.org/gruf/go-bytesize v1.0.4 h1:LEojK46lUoE748Om7yldx6kLe6jCCuiytz5IZ8vH
 codeberg.org/gruf/go-bytesize v1.0.4/go.mod h1:n/GU8HzL9f3UNp/mUKyr1qVmTlj7+xacpp0OHfkvLPs=
 codeberg.org/gruf/go-byteutil v1.3.0 h1:nRqJnCcRQ7xbfU6azw7zOzJrSMDIJHBqX6FL9vEMYmU=
 codeberg.org/gruf/go-byteutil v1.3.0/go.mod h1:chgnZz1LUcfaObaIFglxF5MRYQkJGjQf4WwVz95ccCM=
-codeberg.org/gruf/go-cache/v3 v3.6.1 h1:sY1XhYeskjZAuYeMm5R0o4Qymru5taNbzmZPSn1oXLE=
-codeberg.org/gruf/go-cache/v3 v3.6.1/go.mod h1:JUNjc4E8gRccn3t+B99akxURFrU6NTDkvFVcwiZirnw=
+codeberg.org/gruf/go-cache/v3 v3.6.2 h1:fQn7Dkj5gQpSNjlnoFeotXHKwbAh1PTH4qrD5BMm5ZA=
+codeberg.org/gruf/go-cache/v3 v3.6.2/go.mod h1:yjmrOyda2K8B5sAfCpPPAAqI3oewf4mNQmvxygmP+g8=
 codeberg.org/gruf/go-caller v0.0.0-20250806133437-db8d0b1f71cf h1:Rzu7WLpscj2w1N+ClIHlJoTYf9SuqZrZ7E4f9T7jGdw=
 codeberg.org/gruf/go-caller v0.0.0-20250806133437-db8d0b1f71cf/go.mod h1:jEyYiqCzH1TaxfclSFYthE32oI0dsMnRS6EHqy6y0uo=
 codeberg.org/gruf/go-debug v1.3.0 h1:PIRxQiWUFKtGOGZFdZ3Y0pqyfI0Xr87j224IYe2snZs=
@@ -46,10 +46,10 @@ codeberg.org/gruf/go-mempool v0.0.0-20251003110531-b54adae66253 h1:qPAY72xCWlySV
 codeberg.org/gruf/go-mempool v0.0.0-20251003110531-b54adae66253/go.mod h1:761koiXmqfgzvu5mez2Rk7YlwWilpqJ/zv5hIA6NoNI=
 codeberg.org/gruf/go-mutexes v1.5.8 h1:HRGnvT4COb3jX9xdeoSUUbjPgmk5kXPuDfld9ksUJKA=
 codeberg.org/gruf/go-mutexes v1.5.8/go.mod h1:21sy/hWH8dDQBk7ocsxqo2GNpWiIir+e82RG3hjnN20=
-codeberg.org/gruf/go-runners v1.6.3 h1:To/AX7eTrWuXrTkA3RA01YTP5zha1VZ68LQ+0D4RY7E=
-codeberg.org/gruf/go-runners v1.6.3/go.mod h1:oXAaUmG2VxoKttpCqZGv5nQBeSvZSR2BzIk7h1yTRlU=
-codeberg.org/gruf/go-sched v1.2.4 h1:ddBB9o0D/2oU8NbQ0ldN5aWxogpXPRBATWi58+p++Hw=
-codeberg.org/gruf/go-sched v1.2.4/go.mod h1:wad6l+OcYGWMA2TzNLMmLObsrbBDxdJfEy5WvTgBjNk=
+codeberg.org/gruf/go-runners v1.7.0 h1:Z+8Qne4H9nAdZZbA4cij0PWhhJxtigUGA4Mp7griYes=
+codeberg.org/gruf/go-runners v1.7.0/go.mod h1:1xBodiyuPfosJga+NYTfeepQYUrlBGCAa4NuQTbtiBw=
+codeberg.org/gruf/go-sched v1.3.0 h1:3Y+Vb6p+rt05iUC6Oj3TDFc9GQZCDImDfTKSUKUI9WA=
+codeberg.org/gruf/go-sched v1.3.0/go.mod h1:qL9MdPdBFaNXzSfumpQ18TStBAwnRddCTD+wlrZUEgI=
 codeberg.org/gruf/go-split v1.2.0 h1:PmzL23nVEVHm8VxjsJmv4m4wGQz2bGgQw52dgSSj65c=
 codeberg.org/gruf/go-split v1.2.0/go.mod h1:0rejWJpqvOoFAd7nwm5tIXYKaAqjtFGOXmTqQV+VO38=
 codeberg.org/gruf/go-storage v0.3.1 h1:g66UIM/xXnEk9ejT+W0T9s/PODBZhXa/8ajzeY/MELI=
@@ -426,10 +426,10 @@ github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu
 github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
 github.com/subosito/gotenv v1.6.0 h1:9NlTDc1FTs4qu0DDq7AEtTPNw6SVm7uBMsUCUjABIf8=
 github.com/subosito/gotenv v1.6.0/go.mod h1:Dk4QP5c2W3ibzajGcXpNraDfq2IrhjMIvMSWPKKo0FU=
-github.com/tdewolff/minify/v2 v2.24.3 h1:BaKgWSFLKbKDiUskbeRgbe2n5d1Ci1x3cN/eXna8zOA=
-github.com/tdewolff/minify/v2 v2.24.3/go.mod h1:1JrCtoZXaDbqioQZfk3Jdmr0GPJKiU7c1Apmb+7tCeE=
-github.com/tdewolff/parse/v2 v2.8.3 h1:5VbvtJ83cfb289A1HzRA9sf02iT8YyUwN84ezjkdY1I=
-github.com/tdewolff/parse/v2 v2.8.3/go.mod h1:Hwlni2tiVNKyzR1o6nUs4FOF07URA+JLBLd6dlIXYqo=
+github.com/tdewolff/minify/v2 v2.24.4 h1:pQyr6eWDa+RXtAoZg+6wurh0jB9ojqw/qc5LlU7/z6c=
+github.com/tdewolff/minify/v2 v2.24.4/go.mod h1:iD9Qn7/brhKY9d0KLKMkZrqS8/bqxSxRKruBi7V6m+w=
+github.com/tdewolff/parse/v2 v2.8.4 h1:A6slgBLGGDPBMGA28KQZfHpaKffuNvhOe7zSag+x/rw=
+github.com/tdewolff/parse/v2 v2.8.4/go.mod h1:Hwlni2tiVNKyzR1o6nUs4FOF07URA+JLBLd6dlIXYqo=
 github.com/tdewolff/test v1.0.11 h1:FdLbwQVHxqG16SlkGveC0JVyrJN62COWTRyUFzfbtBE=
 github.com/tdewolff/test v1.0.11/go.mod h1:XPuWBzvdUzhCuxWO1ojpXsyzsA5bFoS3tO/Q3kFuTG8=
 github.com/temoto/robotstxt v1.1.2 h1:W2pOjSJ6SWvldyEuiFXNxz3xZ8aiWX5LbfDiOFd7Fxg=
diff --git a/internal/scheduler/scheduler.go b/internal/scheduler/scheduler.go
index 8ef595dc4..b021cc137 100644
--- a/internal/scheduler/scheduler.go
+++ b/internal/scheduler/scheduler.go
@@ -37,7 +37,7 @@ type Scheduler struct {
 
 // Start attempts to start the scheduler. Returns false if already running.
 func (sch *Scheduler) Start() bool {
-	if sch.sch.Start(nil) {
+	if sch.sch.Start() {
 		sch.ts = make(map[string]*task)
 		return true
 	}
@@ -89,7 +89,7 @@ func (sch *Scheduler) schedule(id string, fn func(context.Context, time.Time), t
 		panic("nil function")
 	}
 
-	// Perform within lock.
+	// Acquire lock.
 	sch.mu.Lock()
 	defer sch.mu.Unlock()
 
@@ -99,16 +99,14 @@ func (sch *Scheduler) schedule(id string, fn func(context.Context, time.Time), t
 		return false
 	}
 
-	// Extract current sched context.
-	doneCh := sch.sch.Done()
-	ctx := runners.CancelCtx(doneCh)
+	// Extract current scheduler context.
+	ctx := runners.CancelCtx(sch.sch.Done())
 
 	// Create a new job to hold task function with
 	// timing, passing in the current sched context.
 	job := sched.NewJob(func(now time.Time) {
 		fn(ctx, now)
-	})
-	job.With(t)
+	}).With(t)
 
 	// Queue job with the scheduler,
 	// and store a new encompassing task.
diff --git a/vendor/codeberg.org/gruf/go-cache/v3/ttl/schedule.go b/vendor/codeberg.org/gruf/go-cache/v3/ttl/schedule.go
index a12b33ab9..12faa86b2 100644
--- a/vendor/codeberg.org/gruf/go-cache/v3/ttl/schedule.go
+++ b/vendor/codeberg.org/gruf/go-cache/v3/ttl/schedule.go
@@ -14,7 +14,7 @@ var scheduler sched.Scheduler
 func schedule(sweep func(time.Time), freq time.Duration) func() {
 	if !scheduler.Running() {
 		// ensure sched running
-		_ = scheduler.Start(nil)
+		_ = scheduler.Start()
 	}
 	return scheduler.Schedule(sched.NewJob(sweep).Every(freq))
 }
diff --git a/vendor/codeberg.org/gruf/go-runners/context.go b/vendor/codeberg.org/gruf/go-runners/context.go
index 12f7f1a10..e02dcab22 100644
--- a/vendor/codeberg.org/gruf/go-runners/context.go
+++ b/vendor/codeberg.org/gruf/go-runners/context.go
@@ -2,6 +2,7 @@ package runners
 
 import (
 	"context"
+	"sync/atomic"
 	"time"
 )
 
@@ -19,9 +20,13 @@ func Closed() context.Context {
 
 // CtxWithCancel returns a new context.Context impl with cancel.
 func CtxWithCancel() (context.Context, context.CancelFunc) {
+	var once atomic.Uint32
 	ctx := make(chan struct{})
-	cncl := func() { close(ctx) }
-	return CancelCtx(ctx), cncl
+	return CancelCtx(ctx), func() {
+		if once.CompareAndSwap(0, 1) {
+			close(ctx)
+		}
+	}
 }
 
 // CancelCtx is the simplest possible cancellable context.
diff --git a/vendor/codeberg.org/gruf/go-runners/pointer.go b/vendor/codeberg.org/gruf/go-runners/pointer.go
new file mode 100644
index 000000000..cc139309f
--- /dev/null
+++ b/vendor/codeberg.org/gruf/go-runners/pointer.go
@@ -0,0 +1,22 @@
+package runners
+
+import (
+	"sync/atomic"
+	"unsafe"
+)
+
+// atomic_pointer wraps an unsafe.Pointer with
+// receiver methods for their atomic counterparts.
+type atomic_pointer struct{ p unsafe.Pointer }
+
+func (p *atomic_pointer) Load() unsafe.Pointer {
+	return atomic.LoadPointer(&p.p)
+}
+
+func (p *atomic_pointer) Store(ptr unsafe.Pointer) {
+	atomic.StorePointer(&p.p, ptr)
+}
+
+func (p *atomic_pointer) CAS(old, new unsafe.Pointer) bool {
+	return atomic.CompareAndSwapPointer(&p.p, old, new)
+}
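
Reviewer note on the `context.go` change above: `CtxWithCancel` now guards the channel close behind an atomic flag, so the returned cancel func becomes idempotent (a second call previously panicked on double-close). A small sketch of the new behaviour, assuming the package API as vendored:

```go
package main

import (
	"fmt"

	"codeberg.org/gruf/go-runners"
)

func main() {
	ctx, cancel := runners.CtxWithCancel()

	cancel()
	cancel() // safe: the CompareAndSwap makes repeat calls no-ops

	select {
	case <-ctx.Done():
		fmt.Println("context cancelled exactly once")
	default:
		fmt.Println("unreachable: cancel closed the channel")
	}
}
```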
diff --git a/vendor/codeberg.org/gruf/go-runners/pool.go b/vendor/codeberg.org/gruf/go-runners/pool.go
deleted file mode 100644
index 644cde0b9..000000000
--- a/vendor/codeberg.org/gruf/go-runners/pool.go
+++ /dev/null
@@ -1,292 +0,0 @@
-package runners
-
-import (
-	"context"
-	"fmt"
-	"os"
-	"runtime"
-	"sync"
-
-	"codeberg.org/gruf/go-errors/v2"
-)
-
-// WorkerFunc represents a function processable by a worker in WorkerPool. Note
-// that implementations absolutely MUST check whether passed context is <-ctx.Done()
-// otherwise stopping the pool may block indefinitely.
-type WorkerFunc func(context.Context)
-
-// WorkerPool provides a means of enqueuing asynchronous work.
-type WorkerPool struct {
-	fns chan WorkerFunc
-	svc Service
-}
-
-// Start will start the main WorkerPool management loop in a new goroutine, along
-// with requested number of child worker goroutines. Returns false if already running.
-func (pool *WorkerPool) Start(workers int, queue int) bool {
-	// Attempt to start the svc
-	ctx, ok := pool.svc.doStart()
-	if !ok {
-		return false
-	}
-
-	if workers <= 0 {
-		// Use $GOMAXPROCS as default.
-		workers = runtime.GOMAXPROCS(0)
-	}
-
-	if queue < 0 {
-		// Use reasonable queue default.
-		queue = workers * 10
-	}
-
-	// Allocate pool queue of given size.
-	//
-	// This MUST be set BEFORE we return and NOT in
-	// the launched goroutine, or there is a risk that
-	// the pool may appear as closed for a short time
-	// until the main goroutine has been entered.
-	fns := make(chan WorkerFunc, queue)
-	pool.fns = fns
-
-	go func() {
-		defer func() {
-			// unlock single wait
-			pool.svc.wait.Unlock()
-
-			// ensure stopped
-			pool.svc.Stop()
-		}()
-
-		var wait sync.WaitGroup
-
-		// Start goroutine worker functions
-		for i := 0; i < workers; i++ {
-			wait.Add(1)
-
-			go func() {
-				defer wait.Done()
-
-				// Run worker function (retry on panic)
-				for !worker_run(CancelCtx(ctx), fns) {
-				}
-			}()
-		}
-
-		// Wait on ctx
-		<-ctx
-
-		// Drain function queue.
-		//
-		// All functions in the queue MUST be
-		// run, so we pass them a closed context.
-		//
-		// This mainly allows us to block until
-		// the function queue is empty, as worker
-		// functions will also continue draining in
-		// the background with the (now) closed ctx.
-		for !drain_queue(fns) {
-			// retry on panic
-		}
-
-		// Now the queue is empty, we can
-		// safely close the channel signalling
-		// all of the workers to return.
-		close(fns)
-		wait.Wait()
-	}()
-
-	return true
-}
-
-// Stop will stop the WorkerPool management loop, blocking until stopped.
-func (pool *WorkerPool) Stop() bool {
-	return pool.svc.Stop()
-}
-
-// Running returns if WorkerPool management loop is running (i.e. NOT stopped / stopping).
-func (pool *WorkerPool) Running() bool {
-	return pool.svc.Running()
-}
-
-// Done returns a channel that's closed when WorkerPool.Stop() is called. It is the same channel provided to the currently running worker functions.
-func (pool *WorkerPool) Done() <-chan struct{} {
-	return pool.svc.Done()
-}
-
-// Enqueue will add provided WorkerFunc to the queue to be performed when there is a free worker.
-// This will block until function is queued or pool is stopped. In all cases, the WorkerFunc will be
-// executed, with the state of the pool being indicated by <-ctx.Done() of the passed ctx.
-// WorkerFuncs MUST respect the passed context.
-func (pool *WorkerPool) Enqueue(fn WorkerFunc) {
-	// Check valid fn
-	if fn == nil {
-		return
-	}
-
-	select {
-	// Pool ctx cancelled
-	case <-pool.svc.Done():
-		fn(closedctx)
-
-	// Placed fn in queue
-	case pool.fns <- fn:
-	}
-}
-
-// EnqueueCtx is functionally identical to WorkerPool.Enqueue() but returns early in the
-// case that caller provided <-ctx.Done() is closed, WITHOUT running the WorkerFunc.
-func (pool *WorkerPool) EnqueueCtx(ctx context.Context, fn WorkerFunc) bool {
-	// Check valid fn
-	if fn == nil {
-		return false
-	}
-
-	select {
-	// Caller ctx cancelled
-	case <-ctx.Done():
-		return false
-
-	// Pool ctx cancelled
-	case <-pool.svc.Done():
-		return false
-
-	// Placed fn in queue
-	case pool.fns <- fn:
-		return true
-	}
-}
-
-// MustEnqueueCtx functionally performs similarly to WorkerPool.EnqueueCtx(), but in the case
-// that the provided <-ctx.Done() is closed, it is passed asynchronously to WorkerPool.Enqueue().
-// Return boolean indicates whether function was executed in time before <-ctx.Done() is closed.
-func (pool *WorkerPool) MustEnqueueCtx(ctx context.Context, fn WorkerFunc) (ok bool) {
-	// Check valid fn
-	if fn == nil {
-		return false
-	}
-
-	select {
-	case <-ctx.Done():
-		// We failed to add this entry to the worker queue before the
-		// incoming context was cancelled. So to ensure processing
-		// we simply queue it asynchronously and return early to caller.
-		go pool.Enqueue(fn)
-		return false
-
-	case <-pool.svc.Done():
-		// Pool ctx cancelled
-		fn(closedctx)
-		return false
-
-	case pool.fns <- fn:
-		// Placed fn in queue
-		return true
-	}
-}
-
-// EnqueueNow attempts Enqueue but returns false if not executed.
-func (pool *WorkerPool) EnqueueNow(fn WorkerFunc) bool {
-	// Check valid fn
-	if fn == nil {
-		return false
-	}
-
-	select {
-	// Pool ctx cancelled
-	case <-pool.svc.Done():
-		return false
-
-	// Placed fn in queue
-	case pool.fns <- fn:
-		return true
-
-	// Queue is full
-	default:
-		return false
-	}
-}
-
-// Queue returns the number of currently queued WorkerFuncs.
-func (pool *WorkerPool) Queue() int {
-	var l int
-	pool.svc.While(func() {
-		l = len(pool.fns)
-	})
-	return l
-}
-
-// worker_run is the main worker routine, accepting functions from 'fns' until it is closed.
-func worker_run(ctx context.Context, fns <-chan WorkerFunc) bool {
-	defer func() {
-		// Recover and drop any panic
-		if r := recover(); r != nil {
-
-			// Gather calling func frames.
-			pcs := make([]uintptr, 10)
-			n := runtime.Callers(3, pcs)
-			i := runtime.CallersFrames(pcs[:n])
-			c := gatherFrames(i, n)
-
-			const msg = "worker_run: recovered panic: %v\n\n%s\n"
-			fmt.Fprintf(os.Stderr, msg, r, c.String())
-		}
-	}()
-
-	for {
-		// Wait on next func
-		fn, ok := <-fns
-		if !ok {
-			return true
-		}
-
-		// Run with ctx
-		fn(ctx)
-	}
-}
-
-// drain_queue will drain and run all functions in worker queue, passing in a closed context.
-func drain_queue(fns <-chan WorkerFunc) bool {
-	defer func() {
-		// Recover and drop any panic
-		if r := recover(); r != nil {
-
-			// Gather calling func frames.
-			pcs := make([]uintptr, 10)
-			n := runtime.Callers(3, pcs)
-			i := runtime.CallersFrames(pcs[:n])
-			c := gatherFrames(i, n)
-
-			const msg = "worker_run: recovered panic: %v\n\n%s\n"
-			fmt.Fprintf(os.Stderr, msg, r, c.String())
-		}
-	}()
-
-	for {
-		select {
-		// Run with closed ctx
-		case fn := <-fns:
-			fn(closedctx)
-
-		// Queue is empty
-		default:
-			return true
-		}
-	}
-}
-
-// gatherFrames collates runtime frames from a frame iterator.
-func gatherFrames(iter *runtime.Frames, n int) errors.Callers {
-	if iter == nil {
-		return nil
-	}
-	frames := make([]runtime.Frame, 0, n)
-	for {
-		f, ok := iter.Next()
-		if !ok {
-			break
-		}
-		frames = append(frames, f)
-	}
-	return frames
-}
diff --git a/vendor/codeberg.org/gruf/go-runners/process.go b/vendor/codeberg.org/gruf/go-runners/process.go
index ca39ac0d0..3feeff258 100644
--- a/vendor/codeberg.org/gruf/go-runners/process.go
+++ b/vendor/codeberg.org/gruf/go-runners/process.go
@@ -2,80 +2,72 @@ package runners
 
 import (
 	"fmt"
+	"unsafe"
+
 	"sync"
 )
 
-// Processable defines a runnable process with error return
-// that can be passed to a Processor instance for managed running.
-type Processable func() error
-
 // Processor acts similarly to a sync.Once object, except that it is reusable. After
 // the first call to Process(), any further calls before this first has returned will
 // block until the first call has returned, and return the same error. This ensures
 // that only a single instance of it is ever running at any one time.
-type Processor struct {
-	mutex sync.Mutex
-	wait  *sync.WaitGroup
-	err   *error
-}
+type Processor struct{ p atomic_pointer }
 
 // Process will process the given function if first-call, else blocking until
 // the first function has returned, returning the same error result.
-func (p *Processor) Process(proc Processable) (err error) {
-	// Acquire state lock.
-	p.mutex.Lock()
-
-	if p.wait != nil {
-		// Already running.
-		//
-		// Get current ptrs.
-		waitPtr := p.wait
-		errPtr := p.err
-
-		// Free state lock.
-		p.mutex.Unlock()
-
-		// Wait for finish.
-		waitPtr.Wait()
-		return *errPtr
-	}
-
-	// Alloc waiter for new process.
-	var wait sync.WaitGroup
-
-	// No need to alloc new error as
-	// we use the alloc'd named error
-	// return required for panic handling.
-
-	// Reset ptrs.
-	p.wait = &wait
-	p.err = &err
-
-	// Set started.
-	wait.Add(1)
-	p.mutex.Unlock()
-
-	defer func() {
-		if r := recover(); r != nil {
-			if err != nil {
-				rOld := r // wrap the panic so we don't lose existing returned error
-				r = fmt.Errorf("panic occured after error %q: %v", err.Error(), rOld)
-			}
-
-			// Catch any panics and wrap as error.
-			err = fmt.Errorf("caught panic: %v", r)
+func (p *Processor) Process(proc func() error) (err error) {
+	var i *proc_instance
+
+	for {
+		// Attempt to load existing instance.
+		ptr := (*proc_instance)(p.p.Load())
+		if ptr != nil {
+
+			// Wait on existing.
+			ptr.wait.Wait()
+			err = ptr.err
+			return
 		}
 
-		// Mark done.
-		wait.Done()
+		if i == nil {
+			// Allocate instance.
+			i = new(proc_instance)
+			i.wait.Add(1)
+		}
 
-		// Set stopped.
-		p.mutex.Lock()
-		p.wait = nil
-		p.mutex.Unlock()
-	}()
+		// Try to acquire start slot by
+		// setting ptr to *our* instance.
+		if p.p.CAS(nil, unsafe.Pointer(i)) {
+			defer func() {
+				if r := recover(); r != nil {
+					if i.err != nil {
+						rOld := r // wrap the panic so we don't lose existing returned error
+						r = fmt.Errorf("panic occured after error %q: %v", i.err.Error(), rOld)
+					}
+
+					// Catch panics and wrap as error return.
+					i.err = fmt.Errorf("caught panic: %v", r)
+				}
+
+				// Set return.
+				err = i.err
+
+				// Release the
+				// goroutines.
+				i.wait.Done()
+
+				// Free processor.
+				p.p.Store(nil)
+			}()
+
+			// Run func.
+			i.err = proc()
+			return
+		}
+	}
+}
 
-	// Run process.
-	err = proc()
-	return
+type proc_instance struct {
+	wait sync.WaitGroup
+	err  error
 }
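
The mutex-based `Processor` above is replaced by a CAS over an atomically-held `proc_instance`, but the documented contract is unchanged: callers arriving while a run is in flight block on it and receive that run's error, and the `Processor` is reusable afterwards, unlike `sync.Once`. A sketch of the observable behaviour, assuming the API as shown (timings are illustrative):

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"time"

	"codeberg.org/gruf/go-runners"
)

func main() {
	var p runners.Processor
	var wg sync.WaitGroup

	shared := errors.New("first run's error")

	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// Calls overlapping the first run wait on it
			// and return the same error value.
			err := p.Process(func() error {
				time.Sleep(10 * time.Millisecond)
				return shared
			})
			fmt.Println(err)
		}()
	}
	wg.Wait()

	// Reusable once freed: a later call runs its own function.
	fmt.Println(p.Process(func() error { return nil })) // <nil>
}
```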
diff --git a/vendor/codeberg.org/gruf/go-runners/service.go b/vendor/codeberg.org/gruf/go-runners/service.go
index 8a7c0051a..fe41807f9 100644
--- a/vendor/codeberg.org/gruf/go-runners/service.go
+++ b/vendor/codeberg.org/gruf/go-runners/service.go
@@ -3,215 +3,220 @@ package runners
 import (
 	"context"
 	"sync"
+	"sync/atomic"
+	"unsafe"
 )
 
-// Service provides a means of tracking a single long-running service, provided protected state
-// changes and preventing multiple instances running. Also providing service state information.
-type Service struct {
-	state uint32        // 0=stopped, 1=running, 2=stopping
-	mutex sync.Mutex    // mutex protects overall state changes
-	wait  sync.Mutex    // wait is used as a single-entity wait-group, only ever locked within 'mutex'
-	ctx   chan struct{} // ctx is the current context for running function (or nil if not running)
-}
+// Service provides a means of tracking a single long-running Service, provided protected state
+// changes and preventing multiple instances running. Also providing Service state information.
+type Service struct{ p atomic_pointer }
 
 // Run will run the supplied function until completion, using given context to propagate cancel.
 // Immediately returns false if the Service is already running, and true after completed run.
-func (svc *Service) Run(fn func(context.Context)) bool {
-	// Attempt to start the svc
-	ctx, ok := svc.doStart()
+func (svc *Service) Run(fn func(context.Context)) (ok bool) {
+	var ptr *svc_instance
+
+	// Attempt to start.
+	ptr, ok = svc.start()
 	if !ok {
-		return false
+		return
 	}
 
-	defer func() {
-		// unlock single wait
-		svc.wait.Unlock()
-
-		// ensure stopped
-		_ = svc.Stop()
-	}()
-
-	// Run with context.
-	fn(CancelCtx(ctx))
-
-	return true
+	// Run given function.
+	defer svc.on_done(ptr)
+	fn(CancelCtx(ptr.done))
+	return
 }
 
 // GoRun will run the supplied function until completion in a goroutine, using given context to
 // propagate cancel. Immediately returns boolean indicating success, or that service is already running.
-func (svc *Service) GoRun(fn func(context.Context)) bool {
-	// Attempt to start the svc
-	ctx, ok := svc.doStart()
+func (svc *Service) GoRun(fn func(context.Context)) (ok bool) {
+	var ptr *svc_instance
+
+	// Attempt to start.
+	ptr, ok = svc.start()
 	if !ok {
-		return false
+		return
 	}
 
 	go func() {
-		defer func() {
-			// unlock single wait
-			svc.wait.Unlock()
-
-			// ensure stopped
-			_ = svc.Stop()
-		}()
-
-		// Run with context.
-		fn(CancelCtx(ctx))
+		// Run given function.
+		defer svc.on_done(ptr)
+		fn(CancelCtx(ptr.done))
	}()
 
-	return true
+	return
 }
 
 // RunWait is functionally the same as .Run(), but blocks until the first instance of .Run() returns.
-func (svc *Service) RunWait(fn func(context.Context)) bool {
-	// Attempt to start the svc
-	ctx, ok := svc.doStart()
+func (svc *Service) RunWait(fn func(context.Context)) (ok bool) {
+	var ptr *svc_instance
+
+	// Attempt to start.
+	ptr, ok = svc.start()
 	if !ok {
-		<-ctx // block
-		return false
+		<-ptr.done
+		return
 	}
 
-	defer func() {
-		// unlock single wait
-		svc.wait.Unlock()
+	// Run given function.
+	defer svc.on_done(ptr)
+	fn(CancelCtx(ptr.done))
+	return
+}
 
-		// ensure stopped
-		_ = svc.Stop()
-	}()
+// GoRunWait is functionally the same as .RunWait(), but blocks until the first instance of RunWait() returns.
+func (svc *Service) GoRunWait(fn func(context.Context)) (ok bool) {
+	var ptr *svc_instance
 
-	// Run with context.
-	fn(CancelCtx(ctx))
+	// Attempt to start.
+	ptr, ok = svc.start()
+	if !ok {
+		<-ptr.done
+		return
+	}
+
+	go func() {
+		// Run given function.
+		defer svc.on_done(ptr)
+		fn(CancelCtx(ptr.done))
+	}()
 
-	return true
 }
 
 // Stop will attempt to stop the service, cancelling the running function's context. Immediately
 // returns false if not running, and true only after Service is fully stopped.
 func (svc *Service) Stop() bool {
-	// Attempt to stop the svc
-	ctx, ok := svc.doStop()
-	if !ok {
-		return false
-	}
-
-	defer func() {
-		// Get svc lock
-		svc.mutex.Lock()
-
-		// Wait until stopped
-		svc.wait.Lock()
-		svc.wait.Unlock()
+	return svc.must_get().stop()
+}
 
-		// Reset the svc
-		svc.ctx = nil
-		svc.state = 0
-		svc.mutex.Unlock()
-	}()
+// Running returns if Service is running (i.e. NOT fully stopped, but may be *stopping*).
+func (svc *Service) Running() bool {
+	return svc.must_get().running()
+}
 
-	// Cancel ctx
-	close(ctx)
+// Done returns a channel that's closed when Service.Stop() is called. It is
+// the same channel provided to the currently running service function.
+func (svc *Service) Done() <-chan struct{} {
+	return svc.must_get().done
+}
 
-	return true
+func (svc *Service) start() (*svc_instance, bool) {
+	ptr := svc.must_get()
+	return ptr, ptr.start()
 }
 
-// While allows you to execute given function guaranteed within current
-// service state. Please note that this will hold the underlying service
-// state change mutex open while executing the function.
-func (svc *Service) While(fn func()) {
-	// Protect state change
-	svc.mutex.Lock()
-	defer svc.mutex.Unlock()
+func (svc *Service) on_done(ptr *svc_instance) {
+	// Ensure stopped.
+	ptr.stop_private()
 
-	// Run
-	fn()
+	// Free service.
+	svc.p.Store(nil)
 }
 
-// doStart will safely set Service state to started, returning a ptr to this context insance.
-func (svc *Service) doStart() (chan struct{}, bool) {
-	// Protect startup
-	svc.mutex.Lock()
+func (svc *Service) must_get() *svc_instance {
+	var newptr *svc_instance
 
-	if svc.ctx == nil {
-		// this will only have been allocated
-		// if svc.Done() was already called.
-		svc.ctx = make(chan struct{})
-	}
-
-	// Take our own ptr
-	ctx := svc.ctx
+	for {
+		// Try to load existing instance.
+		ptr := (*svc_instance)(svc.p.Load())
+		if ptr != nil {
+			return ptr
+		}
 
-	if svc.state != 0 {
-		// State was not stopped.
-		svc.mutex.Unlock()
-		return ctx, false
-	}
+		if newptr == nil {
+			// Allocate new instance.
+			newptr = new(svc_instance)
+			newptr.done = make(chan struct{})
+		}
 
-	// Set started.
-	svc.state = 1
+		// Attempt to acquire slot by setting our ptr.
+		if !svc.p.CAS(nil, unsafe.Pointer(newptr)) {
+			continue
+		}
 
-	// Start waiter.
-	svc.wait.Lock()
+		return newptr
+	}
+}
 
-	// Unlock and return
-	svc.mutex.Unlock()
-	return ctx, true
+type svc_instance struct {
+	wait  sync.WaitGroup
+	done  chan struct{}
+	state atomic.Uint32
 }
 
-// doStop will safely set Service state to stopping, returning a ptr to this cancelfunc instance.
-func (svc *Service) doStop() (chan struct{}, bool) {
-	// Protect stop
-	svc.mutex.Lock()
+const (
+	started_bit  = uint32(1) << 0
+	stopping_bit = uint32(1) << 1
+	finished_bit = uint32(1) << 2
+)
 
-	if svc.state != 1 /* not started */ {
-		svc.mutex.Unlock()
-		return nil, false
-	}
+func (i *svc_instance) start() (ok bool) {
+	// Acquire start by setting 'started' bit.
+	switch old := i.state.Or(started_bit); {
 
-	// state stopping
-	svc.state = 2
+	case old&finished_bit != 0:
+		// Already finished.
 
-	// Take our own ptr
-	// and unlock state
-	ctx := svc.ctx
-	svc.mutex.Unlock()
+	case old&started_bit == 0:
+		// Successfully started!
+		i.wait.Add(1)
+		ok = true
+	}
 
-	return ctx, true
+	return
 }
 
-// Running returns if Service is running (i.e. state NOT stopped / stopping).
-func (svc *Service) Running() bool {
-	svc.mutex.Lock()
-	state := svc.state
-	svc.mutex.Unlock()
-	return (state == 1)
-}
+// NOTE: MAY ONLY BE CALLED BY STARTING GOROUTINE.
+func (i *svc_instance) stop_private() {
+	// Attempt set both stopping and finished bits.
+	old := i.state.Or(stopping_bit | finished_bit)
 
-// Done returns a channel that's closed when Service.Stop() is called. It is
-// the same channel provided to the currently running service function.
-func (svc *Service) Done() <-chan struct{} {
-	var done <-chan struct{}
-
-	svc.mutex.Lock()
-	switch svc.state {
-	// stopped
-	case 0:
-		if svc.ctx == nil {
-			// here we create a new context so that the
-			// returned 'done' channel here will still
-			// be valid for when Service is next started.
-			svc.ctx = make(chan struct{})
-		}
-		done = svc.ctx
+	// Only if we weren't already
+	// stopping do we close channel.
+	if old&stopping_bit == 0 {
+		close(i.done)
+	}
 
-	// started
-	case 1:
-		done = svc.ctx
+	// Release
+	// waiters.
+	i.wait.Done()
+}
 
-	// stopping
-	case 2:
-		done = svc.ctx
+func (i *svc_instance) stop() (ok bool) {
+	// Attempt to set the 'stopping' bit.
+	switch old := i.state.Or(stopping_bit); {
+
+	case old&finished_bit != 0:
+		// Already finished.
+		return
+
+	case old&started_bit == 0:
+		// This was never started
+		// to begin with, just mark
+		// as fully finished here.
+		_ = i.state.Or(finished_bit)
+		return
+
+	case old&stopping_bit == 0:
+		// We succesfully stopped
+		// instance, close channel.
+		close(i.done)
+		ok = true
 	}
-	svc.mutex.Unlock()
 
-	return done
+	// Wait on stop.
+	i.wait.Wait()
+	return
+}
+
+// running returns whether service was started and
+// is not yet finished. that indicates that it may
+// have been started and not yet stopped, or that
+// it was started, stopped and not yet returned.
+func (i *svc_instance) running() bool {
+	val := i.state.Load()
+	return val&started_bit != 0 &&
+		val&finished_bit == 0
 }
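
The `Service` rewrite swaps the mutex/state-int pair for a bit-flagged `svc_instance` behind an atomic pointer; the external lifecycle (`Run`/`GoRun`/`Stop`/`Done`/`Running`) is unchanged, with `GoRunWait` newly added. A brief usage sketch against the API above; the worker body is illustrative only:

```go
package main

import (
	"context"
	"fmt"
	"time"

	"codeberg.org/gruf/go-runners"
)

func main() {
	var svc runners.Service

	// GoRun launches the function in a goroutine; its context
	// is cancelled when svc.Stop() is called.
	fmt.Println("started:", svc.GoRun(func(ctx context.Context) {
		<-ctx.Done() // illustrative worker: wait for stop
	})) // true

	// A second start attempt fails while running.
	fmt.Println("again:", svc.GoRun(func(context.Context) {})) // false

	time.Sleep(10 * time.Millisecond)

	// Stop cancels the context, blocking until fully stopped.
	fmt.Println("stopped:", svc.Stop())    // true
	fmt.Println("running:", svc.Running()) // false
}
```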
diff --git a/vendor/codeberg.org/gruf/go-sched/README.md b/vendor/codeberg.org/gruf/go-sched/README.md
index d32a961ae..0a9439577 100644
--- a/vendor/codeberg.org/gruf/go-sched/README.md
+++ b/vendor/codeberg.org/gruf/go-sched/README.md
@@ -2,4 +2,4 @@
 
 A simple job (both run-once and recurring) queueing library with down-to millisecond precision.
 
-Precision estimates based on test output (running on i7-11800h): 1ms precision with 80% tolerance.
\ No newline at end of file
+Precision estimates based on test output (running on AMD Ryzen 7 7840u): 2ms precision with 95% tolerance.
diff --git a/vendor/codeberg.org/gruf/go-sched/job.go b/vendor/codeberg.org/gruf/go-sched/job.go
index 2531769d6..f3bd869d2 100644
--- a/vendor/codeberg.org/gruf/go-sched/job.go
+++ b/vendor/codeberg.org/gruf/go-sched/job.go
@@ -14,10 +14,9 @@ import (
 // holding onto a next execution time safely in a concurrent environment.
 type Job struct {
 	id     uint64
-	next   unsafe.Pointer // *time.Time
+	next   atomic_time
 	timing Timing
 	call   func(time.Time)
-	panic  func(interface{})
 }
 
 // NewJob returns a new Job to run given function.
@@ -30,28 +29,31 @@ func NewJob(fn func(now time.Time)) *Job {
 	j := &Job{ // set defaults
 		timing: emptytiming, // i.e. fire immediately
 		call:   fn,
-		panic:  func(i interface{}) { panic(i) },
 	}
 
 	return j
 }
 
-// At sets this Job to execute at time, by passing (*sched.Once)(&at) to .With(). See .With() for details.
+// At sets this Job to execute at time, by passing
+// (*sched.Once)(&at) to .With(). See .With() for details.
 func (job *Job) At(at time.Time) *Job {
 	return job.With((*Once)(&at))
 }
 
-// Every sets this Job to execute every period, by passing sched.Period(period) to .With(). See .With() for details.
+// Every sets this Job to execute every period, by passing
+// sched.Period(period) to .With(). See .With() for details.
 func (job *Job) Every(period time.Duration) *Job {
 	return job.With(Periodic(period))
 }
 
-// EveryAt sets this Job to execute every period starting at time, by passing &PeriodicAt{once: Once(at), period: Periodic(period)} to .With(). See .With() for details.
+// EveryAt sets this Job to execute every period starting at time, by passing
+// &PeriodicAt{once: Once(at), period: Periodic(period)} to .With(). See .With() for details.
 func (job *Job) EveryAt(at time.Time, period time.Duration) *Job {
 	return job.With(&PeriodicAt{Once: Once(at), Period: Periodic(period)})
 }
 
-// With sets this Job's timing to given implementation, or if already set will wrap existing using sched.TimingWrap{}.
+// With sets this Job's timing to given implementation, or
+// if already set will wrap existing using sched.TimingWrap{}.
 func (job *Job) With(t Timing) *Job {
 	if t == nil {
 		// Ensure a timing
@@ -78,44 +80,16 @@ func (job *Job) With(t Timing) *Job {
 	return job
 }
 
-// OnPanic specifies how this job handles panics, default is an actual panic.
-func (job *Job) OnPanic(fn func(interface{})) *Job {
-	if fn == nil {
-		// Ensure a function
-		panic("nil func")
-	}
-
-	if job.id != 0 {
-		// Cannot update scheduled job
-		panic("job already scheduled")
-	}
-
-	job.panic = fn
-	return job
-}
-
 // Next returns the next time this Job is expected to run.
 func (job *Job) Next() time.Time {
-	return loadTime(&job.next)
+	return job.next.Load()
 }
 
 // Run will execute this Job and pass through given now time.
-func (job *Job) Run(now time.Time) {
-	defer func() {
-		switch r := recover(); {
-		case r == nil:
-			// no panic
-		case job != nil &&
-			job.panic != nil:
-			job.panic(r)
-		default:
-			panic(r)
-		}
-	}()
-	job.call(now)
-}
+func (job *Job) Run(now time.Time) { job.call(now) }
 
-// String provides a debuggable string representation of Job including ID, next time and Timing type.
+// String provides a debuggable string representation
+// of Job including ID, next time and Timing type.
 func (job *Job) String() string {
 	var buf strings.Builder
 	buf.WriteByte('{')
@@ -123,7 +97,7 @@ func (job *Job) String() string {
 	buf.WriteString(strconv.FormatUint(job.id, 10))
 	buf.WriteByte(' ')
 	buf.WriteString("next=")
-	buf.WriteString(loadTime(&job.next).Format(time.StampMicro))
+	buf.WriteString(job.next.Load().Format(time.StampMicro))
 	buf.WriteByte(' ')
 	buf.WriteString("timing=")
 	buf.WriteString(reflect.TypeOf(job.timing).String())
@@ -131,13 +105,15 @@ func (job *Job) String() string {
 	return buf.String()
 }
 
-func loadTime(p *unsafe.Pointer) time.Time {
-	if p := atomic.LoadPointer(p); p != nil {
+type atomic_time struct{ p unsafe.Pointer }
+
+func (t *atomic_time) Load() time.Time {
+	if p := atomic.LoadPointer(&t.p); p != nil {
 		return *(*time.Time)(p)
 	}
 	return zerotime
 }
 
-func storeTime(p *unsafe.Pointer, t time.Time) {
-	atomic.StorePointer(p, unsafe.Pointer(&t))
+func (t *atomic_time) Store(v time.Time) {
+	atomic.StorePointer(&t.p, unsafe.Pointer(&v))
 }
diff --git a/vendor/codeberg.org/gruf/go-sched/scheduler.go b/vendor/codeberg.org/gruf/go-sched/scheduler.go
index 79913a9b3..9ab5b8bc8 100644
--- a/vendor/codeberg.org/gruf/go-sched/scheduler.go
+++ b/vendor/codeberg.org/gruf/go-sched/scheduler.go
@@ -6,18 +6,22 @@ import (
 	"sync"
 	"sync/atomic"
 	"time"
+	"unsafe"
 
 	"codeberg.org/gruf/go-runners"
 )
 
-// precision is the maximum time we can offer scheduler run-time precision down to.
-const precision = time.Millisecond
+// Precision is the maximum time we can
+// offer scheduler run-time precision down to.
+const Precision = 2 * time.Millisecond
 
 var (
-	// neverticks is a timer channel that never ticks (it's starved).
+	// neverticks is a timer channel
+	// that never ticks (it's starved).
 	neverticks = make(chan time.Time)
 
-	// alwaysticks is a timer channel that always ticks (it's closed).
+	// alwaysticks is a timer channel
+	// that always ticks (it's closed).
 	alwaysticks = func() chan time.Time {
 		ch := make(chan time.Time)
 		close(ch)
@@ -28,48 +32,51 @@ var (
 // Scheduler provides a means of running jobs at specific times and
 // regular intervals, all while sharing a single underlying timer.
 type Scheduler struct {
-	jobs []*Job           // jobs is a list of tracked Jobs to be executed
-	jch  chan interface{} // jch accepts either Jobs or job IDs to notify new/removed jobs
-	svc  runners.Service  // svc manages the main scheduler routine
-	jid  atomic.Uint64    // jid is used to iteratively generate unique IDs for jobs
-	rgo  func(func())     // goroutine runner, allows using goroutine pool to launch jobs
+	svc  runners.Service // svc manages the main scheduler routine
+	jobs []*Job          // jobs is a list of tracked Jobs to be executed
+	jch  atomic_channel  // jch accepts either Jobs or job IDs to notify new/removed jobs
+	jid  atomic.Uint64   // jid is used to iteratively generate unique IDs for jobs
 }
 
-// Start will attempt to start the Scheduler. Immediately returns false if the Service is already running, and true after completed run.
-func (sch *Scheduler) Start(gorun func(func())) bool {
-	var block sync.Mutex
+// Start will attempt to start the Scheduler. Immediately returns false
+// if the Service is already running, and true after completed run.
+func (sch *Scheduler) Start() bool {
+	var wait sync.WaitGroup
 
-	// Use mutex to synchronize between started
+	// Use waiter to synchronize between started
 	// goroutine and ourselves, to ensure that
 	// we don't return before Scheduler init'd.
-	block.Lock()
-	defer block.Unlock()
+	wait.Add(1)
 
 	ok := sch.svc.GoRun(func(ctx context.Context) {
-		// Create Scheduler job channel
-		sch.jch = make(chan interface{})
-
-		// Set goroutine runner function
-		if sch.rgo = gorun; sch.rgo == nil {
-			sch.rgo = func(f func()) { go f() }
-		}
-
-		// Unlock start routine
-		block.Unlock()
-
-		// Enter main loop
-		sch.run(ctx)
+		// Prepare new channel.
+		ch := new(channel)
+		ch.ctx = ctx.Done()
+		ch.ch = make(chan interface{})
+		sch.jch.Store(ch)
+
+		// Release
+		// start fn
+		wait.Done()
+
+		// Main loop
+		sch.run(ch)
 	})
 
 	if ok {
-		// Wait on goroutine
-		block.Lock()
+		// Wait on
+		// goroutine
+		wait.Wait()
+	} else {
+		// Release
+		wait.Done()
 	}
 
 	return ok
 }
 
-// Stop will attempt to stop the Scheduler. Immediately returns false if not running, and true only after Scheduler is fully stopped.
+// Stop will attempt to stop the Scheduler. Immediately returns false
+// if not running, and true only after Scheduler is fully stopped.
 func (sch *Scheduler) Stop() bool {
 	return sch.svc.Stop()
 }
@@ -86,45 +93,38 @@ func (sch *Scheduler) Done() <-chan struct{} {
 
 // Schedule will add provided Job to the Scheduler, returning a cancel function.
 func (sch *Scheduler) Schedule(job *Job) (cancel func()) {
-	switch {
-	// Check a job was passed
-	case job == nil:
+	if job == nil {
 		panic("nil job")
-
-	// Check we are running
-	case !sch.Running():
-		panic("scheduler not running")
 	}
 
-	// Calculate next job ID
-	last := sch.jid.Load()
-	next := sch.jid.Add(1)
-	if next < last {
-		panic("job id overflow")
+	// Load job channel.
+	ch := sch.jch.Load()
+	if ch == nil {
+		panic("not running")
 	}
 
-	// Pass job to scheduler
-	job.id = next
-	sch.jch <- job
+	// Calculate next job ID.
+	job.id = sch.jid.Add(1)
 
-	// Take ptrs to current state chs
-	ctx := sch.svc.Done()
-	jch := sch.jch
-
-	// Return cancel function for job ID
-	return func() {
-		select {
-		// Sched stopped
-		case <-ctx:
-
-		// Cancel this job
-		case jch <- next:
-		}
+	// Pass job
+	// to channel.
+	if !ch.w(job) {
+		panic("not running")
 	}
+
+	// Return cancel function for job
+	return func() { ch.w(job.id) }
 }
 
 // run is the main scheduler run routine, which runs for as long as ctx is valid.
-func (sch *Scheduler) run(ctx context.Context) {
+func (sch *Scheduler) run(ch *channel) {
+	defer ch.close()
+	if ch == nil {
+		panic("nil channel")
+	} else if sch == nil {
+		panic("nil scheduler")
+	}
+
 	var (
 		// now stores the current time, and will only be
 		// set when the timer channel is set to be the
@@ -165,39 +165,43 @@ func (sch *Scheduler) run(ctx context.Context) {
 			// Get now time.
 			now = time.Now()
 
-			// Sort jobs by next occurring.
+			// Sort by next occurring.
 			sort.Sort(byNext(sch.jobs))
 
 			// Get next job time.
 			next := sch.jobs[0].Next()
 
-			// If this job is _just_ about to be ready, we don't bother
+			// If this job is *just* about to be ready, we don't bother
 			// sleeping. It's wasted cycles only sleeping for some obscenely
 			// tiny amount of time we can't guarantee precision for.
-			if until := next.Sub(now); until <= precision/1e3 {
+			if until := next.Sub(now); until <= Precision/1e3 {
+
 				// This job is behind,
 				// set to always tick.
 				tch = alwaysticks
 			} else {
+
 				// Reset timer to period.
 				timer.Reset(until)
-				tch = timer.C
 				timerset = true
+				tch = timer.C
 			}
 		} else {
+
 			// Unset timer
 			tch = neverticks
 		}
 
 		select {
 		// Scheduler stopped
-		case <-ctx.Done():
+		case <-ch.done():
 			stopdrain()
 			return
 
-		// Timer ticked, run scheduled
-		case t := <-tch:
-			if !timerset {
+		// Timer ticked,
+		// run scheduled.
+		case t, ok := <-tch:
+			if !ok {
 				// 'alwaysticks' returns zero
 				// times, BUT 'now' will have
 				// been set during above sort.
@@ -205,8 +209,9 @@ func (sch *Scheduler) run(ctx context.Context) {
 			}
 			sch.schedule(t)
 
-		// Received update, handle job/id
-		case v := <-sch.jch:
+		// Received update,
+		// handle job/id.
+		case v := <-ch.r():
 			sch.handle(v)
 			stopdrain()
 		}
@@ -220,21 +225,21 @@ func (sch *Scheduler) handle(v interface{}) {
 	switch v := v.(type) {
 	// New job added
 	case *Job:
-		// Get current time
+		// Get current time.
 		now := time.Now()
 
-		// Update the next call time
+		// Update next call time.
 		next := v.timing.Next(now)
-		storeTime(&v.next, next)
+		v.next.Store(next)
 
-		// Append this job to queued
+		// Append this job to queued/
 		sch.jobs = append(sch.jobs, v)
 
 	// Job removed
 	case uint64:
 		for i := 0; i < len(sch.jobs); i++ {
 			if sch.jobs[i].id == v {
-				// This is the job we're looking for! Drop this
+				// This is the job we're looking for! Drop this.
 				sch.jobs = append(sch.jobs[:i], sch.jobs[i+1:]...)
 				return
 			}
@@ -242,29 +247,28 @@ func (sch *Scheduler) handle(v interface{}) {
 	}
 }
 
-// schedule will iterate through the scheduler jobs and execute those necessary, updating their next call time.
+// schedule will iterate through the scheduler jobs and
+// execute those necessary, updating their next call time.
 func (sch *Scheduler) schedule(now time.Time) {
 	for i := 0; i < len(sch.jobs); {
-		// Scope our own var
+		// Scope our own var.
 		job := sch.jobs[i]
 
 		// We know these jobs are ordered by .Next(), so as soon
-		// as we reach one with .Next() after now, we can return
+		// as we reach one with .Next() after now, we can return.
 		if job.Next().After(now) {
 			return
 		}
 
-		// Pass to runner
-		sch.rgo(func() {
-			job.Run(now)
-		})
+		// Run the job.
+		go job.Run(now)
 
-		// Update the next call time
+		// Update the next call time.
 		next := job.timing.Next(now)
-		storeTime(&job.next, next)
+		job.next.Store(next)
 
 		if next.IsZero() {
-			// Zero time, this job is done and can be dropped
+			// Zero time, this job is done and can be dropped.
 			sch.jobs = append(sch.jobs[:i], sch.jobs[i+1:]...)
 			continue
 		}
@@ -274,7 +278,8 @@ func (sch *Scheduler) schedule(now time.Time) {
 	}
 }
 
-// byNext is an implementation of sort.Interface to sort Jobs by their .Next() time.
+// byNext is an implementation of sort.Interface
+// to sort Jobs by their .Next() time.
 type byNext []*Job
 
 func (by byNext) Len() int {
@@ -288,3 +293,51 @@ func (by byNext) Less(i int, j int) bool {
 func (by byNext) Swap(i int, j int) {
 	by[i], by[j] = by[j], by[i]
 }
+
+// atomic_channel wraps a *channel{} with atomic store / load.
+type atomic_channel struct{ p unsafe.Pointer }
+
+func (c *atomic_channel) Load() *channel {
+	if p := atomic.LoadPointer(&c.p); p != nil {
+		return (*channel)(p)
+	}
+	return nil
+}
+
+func (c *atomic_channel) Store(v *channel) {
+	atomic.StorePointer(&c.p, unsafe.Pointer(v))
+}
+
+// channel wraps both a context done
+// channel and a generic interface channel
+// to support safe writing to an underlying
+// channel that correctly fails after close.
+type channel struct {
+	ctx <-chan struct{}
+	ch  chan interface{}
+}
+
+// done returns internal context channel.
+func (ch *channel) done() <-chan struct{} {
+	return ch.ctx
+}
+
+// r returns internal channel for read.
+func (ch *channel) r() chan interface{} {
+	return ch.ch
+}
+
+// w writes 'v' to channel, or returns false if closed.
+func (ch *channel) w(v interface{}) bool {
+	select {
+	case <-ch.ctx:
+		return false
+	case ch.ch <- v:
+		return true
+	}
+}
+
+// close closes underlying channel.
+func (ch *channel) close() {
+	close(ch.ch)
+}
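
With the `channel` plumbing above, scheduling is unchanged from a caller's perspective: jobs carry a `Timing` and are removed via the returned cancel func. A short sketch using the exported helpers; times and periods are arbitrary examples:

```go
package main

import (
	"fmt"
	"time"

	"codeberg.org/gruf/go-sched"
)

func main() {
	var s sched.Scheduler
	_ = s.Start()

	// Run once at a point in time.
	_ = s.Schedule(sched.NewJob(func(now time.Time) {
		fmt.Println("once:", now)
	}).At(time.Now().Add(50 * time.Millisecond)))

	// Recur every period, starting from a given time.
	cancel := s.Schedule(sched.NewJob(func(now time.Time) {
		fmt.Println("tick:", now)
	}).EveryAt(time.Now().Add(100*time.Millisecond), 200*time.Millisecond))

	time.Sleep(600 * time.Millisecond)
	cancel()     // remove the recurring job
	_ = s.Stop() // stop the scheduler
}
```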
diff --git a/vendor/codeberg.org/gruf/go-sched/timing.go b/vendor/codeberg.org/gruf/go-sched/timing.go
index 33c230fa5..cb9b4925a 100644
--- a/vendor/codeberg.org/gruf/go-sched/timing.go
+++ b/vendor/codeberg.org/gruf/go-sched/timing.go
@@ -5,11 +5,13 @@ import (
 )
 
 var (
-	// zerotime is zero time.Time (unix epoch).
-	zerotime = time.Time{}
+	// zerotime is zero
+	// time.Time (unix epoch).
+	zerotime time.Time
 
-	// emptytiming is a global timingempty to check against.
-	emptytiming = timingempty{}
+	// emptytiming is a global
+	// timingempty to check against.
+	emptytiming timingempty
 )
 
 // Timing provides scheduling for a Job, determining the next time
@@ -20,14 +22,16 @@ type Timing interface {
 	Next(time.Time) time.Time
 }
 
-// timingempty is a 'zero' Timing implementation that always returns zero time.
+// timingempty is a 'zero' Timing implementation
+// that always returns zero time.
 type timingempty struct{}
 
 func (timingempty) Next(time.Time) time.Time {
 	return zerotime
 }
 
-// Once implements Timing to provide a run-once Job execution.
+// Once implements Timing to
+// provide a run-once Job execution.
 type Once time.Time
 
 func (o *Once) Next(time.Time) time.Time {
@@ -36,14 +40,16 @@ func (o *Once) Next(time.Time) time.Time {
 	return ret
 }
 
-// Periodic implements Timing to provide a recurring Job execution.
+// Periodic implements Timing to
+// provide a recurring Job execution.
 type Periodic time.Duration
 
 func (p Periodic) Next(now time.Time) time.Time {
 	return now.Add(time.Duration(p))
 }
 
-// PeriodicAt implements Timing to provide a recurring Job execution starting at 'Once' time.
+// PeriodicAt implements Timing to provide a
+// recurring Job execution starting at 'Once' time.
 type PeriodicAt struct {
 	Once   Once
 	Period Periodic
@@ -56,7 +62,8 @@ func (p *PeriodicAt) Next(now time.Time) time.Time {
 	return p.Period.Next(now)
 }
 
-// TimingWrap allows combining two different Timing implementations.
+// TimingWrap allows combining two
+// different Timing implementations.
 type TimingWrap struct {
 	Outer Timing
 	Inner Timing
diff --git a/vendor/github.com/tdewolff/parse/v2/binary.go b/vendor/github.com/tdewolff/parse/v2/binary.go
index cf4f91d4a..721864d12 100644
--- a/vendor/github.com/tdewolff/parse/v2/binary.go
+++ b/vendor/github.com/tdewolff/parse/v2/binary.go
@@ -6,6 +6,7 @@ import (
 	"fmt"
 	"io"
 	"os"
+	"sync"
 )
 
 const PageSize = 4096
@@ -45,17 +46,15 @@ func (r *binaryReaderFile) Len() int64 {
 }
 
 func (r *binaryReaderFile) Bytes(b []byte, n, off int64) ([]byte, error) {
-	if _, err := r.f.Seek(off, 0); err != nil {
-		return nil, err
-	} else if b == nil {
+	if b == nil {
 		b = make([]byte, n)
 	}
-
-	m, err := r.f.Read(b)
-	if err != nil {
-		return nil, err
+	if m, err := r.f.ReadAt(b, off); err != nil {
+		return b[:m], err
+	} else if off+int64(m) == r.size {
+		return b[:m], io.EOF
 	} else if int64(m) != n {
-		return nil, errors.New("file: could not read all bytes")
+		return b[:m], errors.New("file: could not read all bytes")
 	}
 	return b, nil
 }
@@ -79,16 +78,22 @@ func (r *binaryReaderBytes) Len() int64 {
 }
 
 func (r *binaryReaderBytes) Bytes(b []byte, n, off int64) ([]byte, error) {
-	if off < 0 || n < 0 || int64(len(r.data)) < off || int64(len(r.data))-off < n {
+	var err error
+	if off < 0 || n < 0 {
 		return nil, fmt.Errorf("bytes: invalid range %d--%d", off, off+n)
+	} else if int64(len(r.data)) <= off {
+		return nil, io.EOF
+	} else if int64(len(r.data))-off <= n {
+		n = int64(len(r.data)) - off
+		err = io.EOF
 	}
 	data := r.data[off : off+n : off+n]
 	if b == nil {
-		return data, nil
+		return data, err
 	}
 	copy(b, data)
-	return b, nil
+	return b[:len(data)], err
 }
 
 type binaryReaderReader struct {
 	r        io.Reader
 	size     int64
 	readerAt bool
 	seeker   bool
+	mu       sync.Mutex
 }
 
 func newBinaryReaderReader(r io.Reader, n int64) *binaryReaderReader {
 	_, readerAt := r.(io.ReaderAt)
 	_, seeker := r.(io.Seeker)
-	return &binaryReaderReader{r, n, readerAt, seeker}
+	return &binaryReaderReader{r, n, readerAt, seeker, sync.Mutex{}}
 }
 
 // Close closes the reader.
@@ -124,23 +130,30 @@ func (r *binaryReaderReader) Bytes(b []byte, n, off int64) ([]byte, error) {
 
 	// seeker seems faster than readerAt by 10%
 	if r.seeker {
+		r.mu.Lock()
 		if _, err := r.r.(io.Seeker).Seek(off, 0); err != nil {
+			r.mu.Unlock()
 			return nil, err
 		}
 
 		m, err := r.r.Read(b)
+		r.mu.Unlock()
 		if err != nil {
-			return nil, err
+			return b[:m], err
+		} else if off+int64(m) == r.size {
+			return b[:m], io.EOF
 		} else if int64(m) != n {
-			return nil, errors.New("file: could not read all bytes")
+			return b[:m], errors.New("reader: could not read all bytes")
 		}
 		return b, nil
 	} else if r.readerAt {
 		m, err := r.r.(io.ReaderAt).ReadAt(b, off)
 		if err != nil {
-			return nil, err
+			return b[:m], err
+		} else if off+int64(m) == r.size {
+			return b[:m], io.EOF
 		} else if int64(m) != n {
-			return nil, errors.New("file: could not read all bytes")
+			return b[:m], errors.New("reader: could not read all bytes")
 		}
 		return b, nil
 	}
@@ -197,12 +210,8 @@ func (r *BinaryReader) IBinaryReader() IBinaryReader {
 }
 
 func (r *BinaryReader) Clone() *BinaryReader {
-	f := r.f
-	if cloner, ok := f.(interface{ Clone() IBinaryReader }); ok {
-		f = cloner.Clone()
-	}
 	return &BinaryReader{
-		f:         f,
+		f:         r.f,
 		pos:       r.pos,
 		err:       r.err,
 		ByteOrder: r.ByteOrder,
@@ -262,9 +271,6 @@ func (r *BinaryReader) Seek(off int64, whence int) (int64, error) {
 // Read complies with io.Reader.
 func (r *BinaryReader) Read(b []byte) (int, error) {
 	data, err := r.f.Bytes(b, int64(len(b)), r.pos)
-	if err != nil && err != io.EOF {
-		return 0, err
-	}
 	r.pos += int64(len(data))
 	return len(data), err
 }
@@ -272,20 +278,14 @@
 // ReadAt complies with io.ReaderAt.
 func (r *BinaryReader) ReadAt(b []byte, off int64) (int, error) {
 	data, err := r.f.Bytes(b, int64(len(b)), off)
-	if err != nil && err != io.EOF {
-		return 0, err
-	}
 	return len(data), err
 }
 
 // ReadBytes reads n bytes.
 func (r *BinaryReader) ReadBytes(n int64) []byte {
 	data, err := r.f.Bytes(nil, n, r.pos)
-	if err != nil {
-		r.err = err
-		return nil
-	}
-	r.pos += n
+	r.pos += int64(len(data))
+	r.err = err
 	return data
 }
diff --git a/vendor/github.com/tdewolff/parse/v2/binary_unix.go b/vendor/github.com/tdewolff/parse/v2/binary_unix.go
index 4a8979fda..25ec83dbc 100644
--- a/vendor/github.com/tdewolff/parse/v2/binary_unix.go
+++ b/vendor/github.com/tdewolff/parse/v2/binary_unix.go
@@ -5,8 +5,8 @@ package parse
 
 import (
 	"errors"
 	"fmt"
+	"io"
 	"os"
-	"runtime"
 	"syscall"
 )
@@ -25,6 +25,8 @@ func newBinaryReaderMmap(filename string) (*binaryReaderMmap, error) {
 	info, err := f.Stat()
 	if err != nil {
 		return nil, err
+	} else if !info.Mode().IsRegular() {
+		return nil, fmt.Errorf("mmap: not a regular file: %v", filename)
 	}
 
 	size := info.Size()
@@ -48,7 +50,7 @@ func newBinaryReaderMmap(filename string) (*binaryReaderMmap, error) {
 		return nil, err
 	}
 	r := &binaryReaderMmap{data, size}
-	runtime.SetFinalizer(r, (*binaryReaderMmap).Close)
+	//runtime.SetFinalizer(r, (*binaryReaderMmap).Close)
 	return r, nil
 }
 
@@ -62,7 +64,7 @@ func (r *binaryReaderMmap) Close() error {
 	}
 	data := r.data
 	r.data = nil
-	runtime.SetFinalizer(r, nil)
+	//runtime.SetFinalizer(r, nil)
 	return syscall.Munmap(data)
 }
 
@@ -72,18 +74,24 @@ func (r *binaryReaderMmap) Len() int64 {
 }
 
 func (r *binaryReaderMmap) Bytes(b []byte, n, off int64) ([]byte, error) {
+	var err error
 	if r.data == nil {
 		return nil, errors.New("mmap: closed")
-	} else if off < 0 || n < 0 || int64(len(r.data)) < off || int64(len(r.data))-off < n {
+	} else if off < 0 || n < 0 {
 		return nil, fmt.Errorf("mmap: invalid range %d--%d", off, off+n)
+	} else if int64(len(r.data)) <= off {
+		return nil, io.EOF
+	} else if int64(len(r.data))-off <= n {
+		n = int64(len(r.data)) - off
+		err = io.EOF
 	}
 
 	data := r.data[off : off+n : off+n]
 	if b == nil {
-		return data, nil
+		return data, err
 	}
 	copy(b, data)
-	return b, nil
+	return b[:len(data)], err
 }
 
 func NewBinaryReaderMmap(filename string) (*BinaryReader, error) {
diff --git a/vendor/github.com/tdewolff/parse/v2/html/lex.go b/vendor/github.com/tdewolff/parse/v2/html/lex.go
index e44a77ce8..a1954ff59 100644
--- a/vendor/github.com/tdewolff/parse/v2/html/lex.go
+++ b/vendor/github.com/tdewolff/parse/v2/html/lex.go
@@ -399,10 +399,12 @@ func (l *Lexer) shiftStartTag() (TokenType, []byte) {
 func (l *Lexer) shiftAttribute() []byte {
 	nameStart := l.r.Pos()
 	var c byte
-	if 0 < len(l.tmplBegin) && l.at(l.tmplBegin...) {
-		l.r.Move(len(l.tmplBegin))
-		l.moveTemplate()
-		l.hasTmpl = true
+	if 0 < len(l.tmplBegin) {
+		for l.at(l.tmplBegin...) {
+			l.r.Move(len(l.tmplBegin))
+			l.moveTemplate()
+			l.hasTmpl = true
+		}
 	}
 	for { // attribute name state
 		if c = l.r.Peek(0); c == ' ' || c == '=' || c == '>' || c == '/' && l.r.Peek(1) == '>' || c == '\t' || c == '\n' || c == '\r' || c == '\f' || c == 0 && l.r.Err() != nil {
diff --git a/vendor/modules.txt b/vendor/modules.txt
index d768c157e..ccf3a0b8e 100644
--- a/vendor/modules.txt
+++ b/vendor/modules.txt
@@ -227,7 +227,7 @@ codeberg.org/gruf/go-bytesize
 # codeberg.org/gruf/go-byteutil v1.3.0
 ## explicit; go 1.20
 codeberg.org/gruf/go-byteutil
-# codeberg.org/gruf/go-cache/v3 v3.6.1
+# codeberg.org/gruf/go-cache/v3 v3.6.2
 ## explicit; go 1.20
 codeberg.org/gruf/go-cache/v3
 codeberg.org/gruf/go-cache/v3/simple
@@ -277,10 +277,10 @@ codeberg.org/gruf/go-mempool
 # codeberg.org/gruf/go-mutexes v1.5.8
 ## explicit; go 1.24.0
 codeberg.org/gruf/go-mutexes
-# codeberg.org/gruf/go-runners v1.6.3
+# codeberg.org/gruf/go-runners v1.7.0
 ## explicit; go 1.19
 codeberg.org/gruf/go-runners
-# codeberg.org/gruf/go-sched v1.2.4
+# codeberg.org/gruf/go-sched v1.3.0
 ## explicit; go 1.19
 codeberg.org/gruf/go-sched
 # codeberg.org/gruf/go-split v1.2.0
@@ -864,11 +864,11 @@ github.com/stretchr/testify/suite
 # github.com/subosito/gotenv v1.6.0
 ## explicit; go 1.18
 github.com/subosito/gotenv
-# github.com/tdewolff/minify/v2 v2.24.3
+# github.com/tdewolff/minify/v2 v2.24.4
 ## explicit; go 1.17
 github.com/tdewolff/minify/v2
 github.com/tdewolff/minify/v2/html
-# github.com/tdewolff/parse/v2 v2.8.3
+# github.com/tdewolff/parse/v2 v2.8.4
 ## explicit; go 1.11
 github.com/tdewolff/parse/v2
 github.com/tdewolff/parse/v2/buffer
