summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorLibravatar kim <grufwub@gmail.com>2025-10-17 17:36:24 +0200
committerLibravatar tobi <tobi.smethurst@protonmail.com>2025-11-17 14:11:11 +0100
commitf714b06fec5b93cf076d0f92eeb8aa7c32cfb531 (patch)
tree8e1a89dd7b0db0f17b695557d03eede9055134ae
parent[bugfix] recheck for just-processed-emoji within mutex lock before starting p... (diff)
downloadgotosocial-f714b06fec5b93cf076d0f92eeb8aa7c32cfb531.tar.xz
[chore] update dependencies (#4507)
- codeberg.org/gruf/go-runners: v1.6.3 -> v1.7.0 - codeberg.org/gruf/go-sched: v1.2.4 -> v1.3.0 - github.com/tdewolff/minify/v2: v2.24.3 -> v2.24.4 Reviewed-on: https://codeberg.org/superseriousbusiness/gotosocial/pulls/4507 Co-authored-by: kim <grufwub@gmail.com> Co-committed-by: kim <grufwub@gmail.com>
-rw-r--r--go.mod10
-rw-r--r--go.sum20
-rw-r--r--internal/scheduler/scheduler.go12
-rw-r--r--vendor/codeberg.org/gruf/go-cache/v3/ttl/schedule.go2
-rw-r--r--vendor/codeberg.org/gruf/go-runners/context.go9
-rw-r--r--vendor/codeberg.org/gruf/go-runners/pointer.go22
-rw-r--r--vendor/codeberg.org/gruf/go-runners/pool.go292
-rw-r--r--vendor/codeberg.org/gruf/go-runners/process.go116
-rw-r--r--vendor/codeberg.org/gruf/go-runners/service.go311
-rw-r--r--vendor/codeberg.org/gruf/go-sched/README.md2
-rw-r--r--vendor/codeberg.org/gruf/go-sched/job.go64
-rw-r--r--vendor/codeberg.org/gruf/go-sched/scheduler.go225
-rw-r--r--vendor/codeberg.org/gruf/go-sched/timing.go25
-rw-r--r--vendor/github.com/tdewolff/parse/v2/binary.go64
-rw-r--r--vendor/github.com/tdewolff/parse/v2/binary_unix.go20
-rw-r--r--vendor/github.com/tdewolff/parse/v2/html/lex.go10
-rw-r--r--vendor/modules.txt10
17 files changed, 495 insertions, 719 deletions
diff --git a/go.mod b/go.mod
index 9627e8fef..aa3d0af02 100644
--- a/go.mod
+++ b/go.mod
@@ -15,7 +15,7 @@ require (
code.superseriousbusiness.org/oauth2/v4 v4.5.4-0.20250812115401-3961e46a7384
codeberg.org/gruf/go-bytesize v1.0.4
codeberg.org/gruf/go-byteutil v1.3.0
- codeberg.org/gruf/go-cache/v3 v3.6.1
+ codeberg.org/gruf/go-cache/v3 v3.6.2
codeberg.org/gruf/go-caller v0.0.0-20250806133437-db8d0b1f71cf
codeberg.org/gruf/go-debug v1.3.0
codeberg.org/gruf/go-errors/v2 v2.3.2
@@ -27,8 +27,8 @@ require (
codeberg.org/gruf/go-list v0.0.0-20240425093752-494db03d641f
codeberg.org/gruf/go-mempool v0.0.0-20251003110531-b54adae66253
codeberg.org/gruf/go-mutexes v1.5.8
- codeberg.org/gruf/go-runners v1.6.3
- codeberg.org/gruf/go-sched v1.2.4
+ codeberg.org/gruf/go-runners v1.7.0
+ codeberg.org/gruf/go-sched v1.3.0
codeberg.org/gruf/go-split v1.2.0
codeberg.org/gruf/go-storage v0.3.1
codeberg.org/gruf/go-structr v0.9.13
@@ -62,7 +62,7 @@ require (
github.com/spf13/pflag v1.0.10
github.com/spf13/viper v1.21.0
github.com/stretchr/testify v1.11.1
- github.com/tdewolff/minify/v2 v2.24.3
+ github.com/tdewolff/minify/v2 v2.24.4
github.com/temoto/robotstxt v1.1.2
github.com/tetratelabs/wazero v1.9.0
github.com/tomnomnom/linkheader v0.0.0-20250811210735-e5fe3b51442e
@@ -201,7 +201,7 @@ require (
github.com/sourcegraph/conc v0.3.1-0.20240121214520-5f936abd7ae8 // indirect
github.com/spf13/afero v1.15.0 // indirect
github.com/subosito/gotenv v1.6.0 // indirect
- github.com/tdewolff/parse/v2 v2.8.3 // indirect
+ github.com/tdewolff/parse/v2 v2.8.4 // indirect
github.com/tinylib/msgp v1.3.0 // indirect
github.com/tmthrgd/go-hex v0.0.0-20190904060850-447a3041c3bc // indirect
github.com/toqueteos/webbrowser v1.2.0 // indirect
diff --git a/go.sum b/go.sum
index ba86f596f..cb22da7e2 100644
--- a/go.sum
+++ b/go.sum
@@ -14,8 +14,8 @@ codeberg.org/gruf/go-bytesize v1.0.4 h1:LEojK46lUoE748Om7yldx6kLe6jCCuiytz5IZ8vH
codeberg.org/gruf/go-bytesize v1.0.4/go.mod h1:n/GU8HzL9f3UNp/mUKyr1qVmTlj7+xacpp0OHfkvLPs=
codeberg.org/gruf/go-byteutil v1.3.0 h1:nRqJnCcRQ7xbfU6azw7zOzJrSMDIJHBqX6FL9vEMYmU=
codeberg.org/gruf/go-byteutil v1.3.0/go.mod h1:chgnZz1LUcfaObaIFglxF5MRYQkJGjQf4WwVz95ccCM=
-codeberg.org/gruf/go-cache/v3 v3.6.1 h1:sY1XhYeskjZAuYeMm5R0o4Qymru5taNbzmZPSn1oXLE=
-codeberg.org/gruf/go-cache/v3 v3.6.1/go.mod h1:JUNjc4E8gRccn3t+B99akxURFrU6NTDkvFVcwiZirnw=
+codeberg.org/gruf/go-cache/v3 v3.6.2 h1:fQn7Dkj5gQpSNjlnoFeotXHKwbAh1PTH4qrD5BMm5ZA=
+codeberg.org/gruf/go-cache/v3 v3.6.2/go.mod h1:yjmrOyda2K8B5sAfCpPPAAqI3oewf4mNQmvxygmP+g8=
codeberg.org/gruf/go-caller v0.0.0-20250806133437-db8d0b1f71cf h1:Rzu7WLpscj2w1N+ClIHlJoTYf9SuqZrZ7E4f9T7jGdw=
codeberg.org/gruf/go-caller v0.0.0-20250806133437-db8d0b1f71cf/go.mod h1:jEyYiqCzH1TaxfclSFYthE32oI0dsMnRS6EHqy6y0uo=
codeberg.org/gruf/go-debug v1.3.0 h1:PIRxQiWUFKtGOGZFdZ3Y0pqyfI0Xr87j224IYe2snZs=
@@ -46,10 +46,10 @@ codeberg.org/gruf/go-mempool v0.0.0-20251003110531-b54adae66253 h1:qPAY72xCWlySV
codeberg.org/gruf/go-mempool v0.0.0-20251003110531-b54adae66253/go.mod h1:761koiXmqfgzvu5mez2Rk7YlwWilpqJ/zv5hIA6NoNI=
codeberg.org/gruf/go-mutexes v1.5.8 h1:HRGnvT4COb3jX9xdeoSUUbjPgmk5kXPuDfld9ksUJKA=
codeberg.org/gruf/go-mutexes v1.5.8/go.mod h1:21sy/hWH8dDQBk7ocsxqo2GNpWiIir+e82RG3hjnN20=
-codeberg.org/gruf/go-runners v1.6.3 h1:To/AX7eTrWuXrTkA3RA01YTP5zha1VZ68LQ+0D4RY7E=
-codeberg.org/gruf/go-runners v1.6.3/go.mod h1:oXAaUmG2VxoKttpCqZGv5nQBeSvZSR2BzIk7h1yTRlU=
-codeberg.org/gruf/go-sched v1.2.4 h1:ddBB9o0D/2oU8NbQ0ldN5aWxogpXPRBATWi58+p++Hw=
-codeberg.org/gruf/go-sched v1.2.4/go.mod h1:wad6l+OcYGWMA2TzNLMmLObsrbBDxdJfEy5WvTgBjNk=
+codeberg.org/gruf/go-runners v1.7.0 h1:Z+8Qne4H9nAdZZbA4cij0PWhhJxtigUGA4Mp7griYes=
+codeberg.org/gruf/go-runners v1.7.0/go.mod h1:1xBodiyuPfosJga+NYTfeepQYUrlBGCAa4NuQTbtiBw=
+codeberg.org/gruf/go-sched v1.3.0 h1:3Y+Vb6p+rt05iUC6Oj3TDFc9GQZCDImDfTKSUKUI9WA=
+codeberg.org/gruf/go-sched v1.3.0/go.mod h1:qL9MdPdBFaNXzSfumpQ18TStBAwnRddCTD+wlrZUEgI=
codeberg.org/gruf/go-split v1.2.0 h1:PmzL23nVEVHm8VxjsJmv4m4wGQz2bGgQw52dgSSj65c=
codeberg.org/gruf/go-split v1.2.0/go.mod h1:0rejWJpqvOoFAd7nwm5tIXYKaAqjtFGOXmTqQV+VO38=
codeberg.org/gruf/go-storage v0.3.1 h1:g66UIM/xXnEk9ejT+W0T9s/PODBZhXa/8ajzeY/MELI=
@@ -426,10 +426,10 @@ github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu
github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
github.com/subosito/gotenv v1.6.0 h1:9NlTDc1FTs4qu0DDq7AEtTPNw6SVm7uBMsUCUjABIf8=
github.com/subosito/gotenv v1.6.0/go.mod h1:Dk4QP5c2W3ibzajGcXpNraDfq2IrhjMIvMSWPKKo0FU=
-github.com/tdewolff/minify/v2 v2.24.3 h1:BaKgWSFLKbKDiUskbeRgbe2n5d1Ci1x3cN/eXna8zOA=
-github.com/tdewolff/minify/v2 v2.24.3/go.mod h1:1JrCtoZXaDbqioQZfk3Jdmr0GPJKiU7c1Apmb+7tCeE=
-github.com/tdewolff/parse/v2 v2.8.3 h1:5VbvtJ83cfb289A1HzRA9sf02iT8YyUwN84ezjkdY1I=
-github.com/tdewolff/parse/v2 v2.8.3/go.mod h1:Hwlni2tiVNKyzR1o6nUs4FOF07URA+JLBLd6dlIXYqo=
+github.com/tdewolff/minify/v2 v2.24.4 h1:pQyr6eWDa+RXtAoZg+6wurh0jB9ojqw/qc5LlU7/z6c=
+github.com/tdewolff/minify/v2 v2.24.4/go.mod h1:iD9Qn7/brhKY9d0KLKMkZrqS8/bqxSxRKruBi7V6m+w=
+github.com/tdewolff/parse/v2 v2.8.4 h1:A6slgBLGGDPBMGA28KQZfHpaKffuNvhOe7zSag+x/rw=
+github.com/tdewolff/parse/v2 v2.8.4/go.mod h1:Hwlni2tiVNKyzR1o6nUs4FOF07URA+JLBLd6dlIXYqo=
github.com/tdewolff/test v1.0.11 h1:FdLbwQVHxqG16SlkGveC0JVyrJN62COWTRyUFzfbtBE=
github.com/tdewolff/test v1.0.11/go.mod h1:XPuWBzvdUzhCuxWO1ojpXsyzsA5bFoS3tO/Q3kFuTG8=
github.com/temoto/robotstxt v1.1.2 h1:W2pOjSJ6SWvldyEuiFXNxz3xZ8aiWX5LbfDiOFd7Fxg=
diff --git a/internal/scheduler/scheduler.go b/internal/scheduler/scheduler.go
index 8ef595dc4..b021cc137 100644
--- a/internal/scheduler/scheduler.go
+++ b/internal/scheduler/scheduler.go
@@ -37,7 +37,7 @@ type Scheduler struct {
// Start attempts to start the scheduler. Returns false if already running.
func (sch *Scheduler) Start() bool {
- if sch.sch.Start(nil) {
+ if sch.sch.Start() {
sch.ts = make(map[string]*task)
return true
}
@@ -89,7 +89,7 @@ func (sch *Scheduler) schedule(id string, fn func(context.Context, time.Time), t
panic("nil function")
}
- // Perform within lock.
+ // Acquire lock.
sch.mu.Lock()
defer sch.mu.Unlock()
@@ -99,16 +99,14 @@ func (sch *Scheduler) schedule(id string, fn func(context.Context, time.Time), t
return false
}
- // Extract current sched context.
- doneCh := sch.sch.Done()
- ctx := runners.CancelCtx(doneCh)
+ // Extract current scheduler context.
+ ctx := runners.CancelCtx(sch.sch.Done())
// Create a new job to hold task function with
// timing, passing in the current sched context.
job := sched.NewJob(func(now time.Time) {
fn(ctx, now)
- })
- job.With(t)
+ }).With(t)
// Queue job with the scheduler,
// and store a new encompassing task.
diff --git a/vendor/codeberg.org/gruf/go-cache/v3/ttl/schedule.go b/vendor/codeberg.org/gruf/go-cache/v3/ttl/schedule.go
index a12b33ab9..12faa86b2 100644
--- a/vendor/codeberg.org/gruf/go-cache/v3/ttl/schedule.go
+++ b/vendor/codeberg.org/gruf/go-cache/v3/ttl/schedule.go
@@ -14,7 +14,7 @@ var scheduler sched.Scheduler
func schedule(sweep func(time.Time), freq time.Duration) func() {
if !scheduler.Running() {
// ensure sched running
- _ = scheduler.Start(nil)
+ _ = scheduler.Start()
}
return scheduler.Schedule(sched.NewJob(sweep).Every(freq))
}
diff --git a/vendor/codeberg.org/gruf/go-runners/context.go b/vendor/codeberg.org/gruf/go-runners/context.go
index 12f7f1a10..e02dcab22 100644
--- a/vendor/codeberg.org/gruf/go-runners/context.go
+++ b/vendor/codeberg.org/gruf/go-runners/context.go
@@ -2,6 +2,7 @@ package runners
import (
"context"
+ "sync/atomic"
"time"
)
@@ -19,9 +20,13 @@ func Closed() context.Context {
// CtxWithCancel returns a new context.Context impl with cancel.
func CtxWithCancel() (context.Context, context.CancelFunc) {
+ var once atomic.Uint32
ctx := make(chan struct{})
- cncl := func() { close(ctx) }
- return CancelCtx(ctx), cncl
+ return CancelCtx(ctx), func() {
+ if once.CompareAndSwap(0, 1) {
+ close(ctx)
+ }
+ }
}
// CancelCtx is the simplest possible cancellable context.
diff --git a/vendor/codeberg.org/gruf/go-runners/pointer.go b/vendor/codeberg.org/gruf/go-runners/pointer.go
new file mode 100644
index 000000000..cc139309f
--- /dev/null
+++ b/vendor/codeberg.org/gruf/go-runners/pointer.go
@@ -0,0 +1,22 @@
+package runners
+
+import (
+ "sync/atomic"
+ "unsafe"
+)
+
+// atomic_pointer wraps an unsafe.Pointer with
+// receiver methods for their atomic counterparts.
+type atomic_pointer struct{ p unsafe.Pointer }
+
+func (p *atomic_pointer) Load() unsafe.Pointer {
+ return atomic.LoadPointer(&p.p)
+}
+
+func (p *atomic_pointer) Store(ptr unsafe.Pointer) {
+ atomic.StorePointer(&p.p, ptr)
+}
+
+func (p *atomic_pointer) CAS(old, new unsafe.Pointer) bool {
+ return atomic.CompareAndSwapPointer(&p.p, old, new)
+}
diff --git a/vendor/codeberg.org/gruf/go-runners/pool.go b/vendor/codeberg.org/gruf/go-runners/pool.go
deleted file mode 100644
index 644cde0b9..000000000
--- a/vendor/codeberg.org/gruf/go-runners/pool.go
+++ /dev/null
@@ -1,292 +0,0 @@
-package runners
-
-import (
- "context"
- "fmt"
- "os"
- "runtime"
- "sync"
-
- "codeberg.org/gruf/go-errors/v2"
-)
-
-// WorkerFunc represents a function processable by a worker in WorkerPool. Note
-// that implementations absolutely MUST check whether passed context is <-ctx.Done()
-// otherwise stopping the pool may block indefinitely.
-type WorkerFunc func(context.Context)
-
-// WorkerPool provides a means of enqueuing asynchronous work.
-type WorkerPool struct {
- fns chan WorkerFunc
- svc Service
-}
-
-// Start will start the main WorkerPool management loop in a new goroutine, along
-// with requested number of child worker goroutines. Returns false if already running.
-func (pool *WorkerPool) Start(workers int, queue int) bool {
- // Attempt to start the svc
- ctx, ok := pool.svc.doStart()
- if !ok {
- return false
- }
-
- if workers <= 0 {
- // Use $GOMAXPROCS as default.
- workers = runtime.GOMAXPROCS(0)
- }
-
- if queue < 0 {
- // Use reasonable queue default.
- queue = workers * 10
- }
-
- // Allocate pool queue of given size.
- //
- // This MUST be set BEFORE we return and NOT in
- // the launched goroutine, or there is a risk that
- // the pool may appear as closed for a short time
- // until the main goroutine has been entered.
- fns := make(chan WorkerFunc, queue)
- pool.fns = fns
-
- go func() {
- defer func() {
- // unlock single wait
- pool.svc.wait.Unlock()
-
- // ensure stopped
- pool.svc.Stop()
- }()
-
- var wait sync.WaitGroup
-
- // Start goroutine worker functions
- for i := 0; i < workers; i++ {
- wait.Add(1)
-
- go func() {
- defer wait.Done()
-
- // Run worker function (retry on panic)
- for !worker_run(CancelCtx(ctx), fns) {
- }
- }()
- }
-
- // Wait on ctx
- <-ctx
-
- // Drain function queue.
- //
- // All functions in the queue MUST be
- // run, so we pass them a closed context.
- //
- // This mainly allows us to block until
- // the function queue is empty, as worker
- // functions will also continue draining in
- // the background with the (now) closed ctx.
- for !drain_queue(fns) {
- // retry on panic
- }
-
- // Now the queue is empty, we can
- // safely close the channel signalling
- // all of the workers to return.
- close(fns)
- wait.Wait()
- }()
-
- return true
-}
-
-// Stop will stop the WorkerPool management loop, blocking until stopped.
-func (pool *WorkerPool) Stop() bool {
- return pool.svc.Stop()
-}
-
-// Running returns if WorkerPool management loop is running (i.e. NOT stopped / stopping).
-func (pool *WorkerPool) Running() bool {
- return pool.svc.Running()
-}
-
-// Done returns a channel that's closed when WorkerPool.Stop() is called. It is the same channel provided to the currently running worker functions.
-func (pool *WorkerPool) Done() <-chan struct{} {
- return pool.svc.Done()
-}
-
-// Enqueue will add provided WorkerFunc to the queue to be performed when there is a free worker.
-// This will block until function is queued or pool is stopped. In all cases, the WorkerFunc will be
-// executed, with the state of the pool being indicated by <-ctx.Done() of the passed ctx.
-// WorkerFuncs MUST respect the passed context.
-func (pool *WorkerPool) Enqueue(fn WorkerFunc) {
- // Check valid fn
- if fn == nil {
- return
- }
-
- select {
- // Pool ctx cancelled
- case <-pool.svc.Done():
- fn(closedctx)
-
- // Placed fn in queue
- case pool.fns <- fn:
- }
-}
-
-// EnqueueCtx is functionally identical to WorkerPool.Enqueue() but returns early in the
-// case that caller provided <-ctx.Done() is closed, WITHOUT running the WorkerFunc.
-func (pool *WorkerPool) EnqueueCtx(ctx context.Context, fn WorkerFunc) bool {
- // Check valid fn
- if fn == nil {
- return false
- }
-
- select {
- // Caller ctx cancelled
- case <-ctx.Done():
- return false
-
- // Pool ctx cancelled
- case <-pool.svc.Done():
- return false
-
- // Placed fn in queue
- case pool.fns <- fn:
- return true
- }
-}
-
-// MustEnqueueCtx functionally performs similarly to WorkerPool.EnqueueCtx(), but in the case
-// that the provided <-ctx.Done() is closed, it is passed asynchronously to WorkerPool.Enqueue().
-// Return boolean indicates whether function was executed in time before <-ctx.Done() is closed.
-func (pool *WorkerPool) MustEnqueueCtx(ctx context.Context, fn WorkerFunc) (ok bool) {
- // Check valid fn
- if fn == nil {
- return false
- }
-
- select {
- case <-ctx.Done():
- // We failed to add this entry to the worker queue before the
- // incoming context was cancelled. So to ensure processing
- // we simply queue it asynchronously and return early to caller.
- go pool.Enqueue(fn)
- return false
-
- case <-pool.svc.Done():
- // Pool ctx cancelled
- fn(closedctx)
- return false
-
- case pool.fns <- fn:
- // Placed fn in queue
- return true
- }
-}
-
-// EnqueueNow attempts Enqueue but returns false if not executed.
-func (pool *WorkerPool) EnqueueNow(fn WorkerFunc) bool {
- // Check valid fn
- if fn == nil {
- return false
- }
-
- select {
- // Pool ctx cancelled
- case <-pool.svc.Done():
- return false
-
- // Placed fn in queue
- case pool.fns <- fn:
- return true
-
- // Queue is full
- default:
- return false
- }
-}
-
-// Queue returns the number of currently queued WorkerFuncs.
-func (pool *WorkerPool) Queue() int {
- var l int
- pool.svc.While(func() {
- l = len(pool.fns)
- })
- return l
-}
-
-// worker_run is the main worker routine, accepting functions from 'fns' until it is closed.
-func worker_run(ctx context.Context, fns <-chan WorkerFunc) bool {
- defer func() {
- // Recover and drop any panic
- if r := recover(); r != nil {
-
- // Gather calling func frames.
- pcs := make([]uintptr, 10)
- n := runtime.Callers(3, pcs)
- i := runtime.CallersFrames(pcs[:n])
- c := gatherFrames(i, n)
-
- const msg = "worker_run: recovered panic: %v\n\n%s\n"
- fmt.Fprintf(os.Stderr, msg, r, c.String())
- }
- }()
-
- for {
- // Wait on next func
- fn, ok := <-fns
- if !ok {
- return true
- }
-
- // Run with ctx
- fn(ctx)
- }
-}
-
-// drain_queue will drain and run all functions in worker queue, passing in a closed context.
-func drain_queue(fns <-chan WorkerFunc) bool {
- defer func() {
- // Recover and drop any panic
- if r := recover(); r != nil {
-
- // Gather calling func frames.
- pcs := make([]uintptr, 10)
- n := runtime.Callers(3, pcs)
- i := runtime.CallersFrames(pcs[:n])
- c := gatherFrames(i, n)
-
- const msg = "worker_run: recovered panic: %v\n\n%s\n"
- fmt.Fprintf(os.Stderr, msg, r, c.String())
- }
- }()
-
- for {
- select {
- // Run with closed ctx
- case fn := <-fns:
- fn(closedctx)
-
- // Queue is empty
- default:
- return true
- }
- }
-}
-
-// gatherFrames collates runtime frames from a frame iterator.
-func gatherFrames(iter *runtime.Frames, n int) errors.Callers {
- if iter == nil {
- return nil
- }
- frames := make([]runtime.Frame, 0, n)
- for {
- f, ok := iter.Next()
- if !ok {
- break
- }
- frames = append(frames, f)
- }
- return frames
-}
diff --git a/vendor/codeberg.org/gruf/go-runners/process.go b/vendor/codeberg.org/gruf/go-runners/process.go
index ca39ac0d0..3feeff258 100644
--- a/vendor/codeberg.org/gruf/go-runners/process.go
+++ b/vendor/codeberg.org/gruf/go-runners/process.go
@@ -2,80 +2,72 @@ package runners
import (
"fmt"
+ "unsafe"
+
"sync"
)
-// Processable defines a runnable process with error return
-// that can be passed to a Processor instance for managed running.
-type Processable func() error
-
// Processor acts similarly to a sync.Once object, except that it is reusable. After
// the first call to Process(), any further calls before this first has returned will
// block until the first call has returned, and return the same error. This ensures
// that only a single instance of it is ever running at any one time.
-type Processor struct {
- mutex sync.Mutex
- wait *sync.WaitGroup
- err *error
-}
+type Processor struct{ p atomic_pointer }
// Process will process the given function if first-call, else blocking until
// the first function has returned, returning the same error result.
-func (p *Processor) Process(proc Processable) (err error) {
- // Acquire state lock.
- p.mutex.Lock()
-
- if p.wait != nil {
- // Already running.
- //
- // Get current ptrs.
- waitPtr := p.wait
- errPtr := p.err
-
- // Free state lock.
- p.mutex.Unlock()
-
- // Wait for finish.
- waitPtr.Wait()
- return *errPtr
- }
-
- // Alloc waiter for new process.
- var wait sync.WaitGroup
-
- // No need to alloc new error as
- // we use the alloc'd named error
- // return required for panic handling.
-
- // Reset ptrs.
- p.wait = &wait
- p.err = &err
-
- // Set started.
- wait.Add(1)
- p.mutex.Unlock()
-
- defer func() {
- if r := recover(); r != nil {
- if err != nil {
- rOld := r // wrap the panic so we don't lose existing returned error
- r = fmt.Errorf("panic occured after error %q: %v", err.Error(), rOld)
- }
-
- // Catch any panics and wrap as error.
- err = fmt.Errorf("caught panic: %v", r)
+func (p *Processor) Process(proc func() error) (err error) {
+ var i *proc_instance
+
+ for {
+ // Attempt to load existing instance.
+ ptr := (*proc_instance)(p.p.Load())
+ if ptr != nil {
+
+ // Wait on existing.
+ ptr.wait.Wait()
+ err = ptr.err
+ return
}
- // Mark done.
- wait.Done()
+ if i == nil {
+ // Allocate instance.
+ i = new(proc_instance)
+ i.wait.Add(1)
+ }
- // Set stopped.
- p.mutex.Lock()
- p.wait = nil
- p.mutex.Unlock()
- }()
+ // Try to acquire start slot by
+ // setting ptr to *our* instance.
+ if p.p.CAS(nil, unsafe.Pointer(i)) {
+ defer func() {
+ if r := recover(); r != nil {
+ if i.err != nil {
+ rOld := r // wrap the panic so we don't lose existing returned error
+ r = fmt.Errorf("panic occured after error %q: %v", i.err.Error(), rOld)
+ }
+
+ // Catch panics and wrap as error return.
+ i.err = fmt.Errorf("caught panic: %v", r)
+ }
+
+ // Set return.
+ err = i.err
+
+ // Release the
+ // goroutines.
+ i.wait.Done()
+
+ // Free processor.
+ p.p.Store(nil)
+ }()
+
+ // Run func.
+ i.err = proc()
+ return
+ }
+ }
+}
- // Run process.
- err = proc()
- return
+type proc_instance struct {
+ wait sync.WaitGroup
+ err error
}
diff --git a/vendor/codeberg.org/gruf/go-runners/service.go b/vendor/codeberg.org/gruf/go-runners/service.go
index 8a7c0051a..fe41807f9 100644
--- a/vendor/codeberg.org/gruf/go-runners/service.go
+++ b/vendor/codeberg.org/gruf/go-runners/service.go
@@ -3,215 +3,220 @@ package runners
import (
"context"
"sync"
+ "sync/atomic"
+ "unsafe"
)
-// Service provides a means of tracking a single long-running service, provided protected state
-// changes and preventing multiple instances running. Also providing service state information.
-type Service struct {
- state uint32 // 0=stopped, 1=running, 2=stopping
- mutex sync.Mutex // mutex protects overall state changes
- wait sync.Mutex // wait is used as a single-entity wait-group, only ever locked within 'mutex'
- ctx chan struct{} // ctx is the current context for running function (or nil if not running)
-}
+// Service provides a means of tracking a single long-running Service, provided protected state
+// changes and preventing multiple instances running. Also providing Service state information.
+type Service struct{ p atomic_pointer }
// Run will run the supplied function until completion, using given context to propagate cancel.
// Immediately returns false if the Service is already running, and true after completed run.
-func (svc *Service) Run(fn func(context.Context)) bool {
- // Attempt to start the svc
- ctx, ok := svc.doStart()
+func (svc *Service) Run(fn func(context.Context)) (ok bool) {
+ var ptr *svc_instance
+
+ // Attempt to start.
+ ptr, ok = svc.start()
if !ok {
- return false
+ return
}
- defer func() {
- // unlock single wait
- svc.wait.Unlock()
-
- // ensure stopped
- _ = svc.Stop()
- }()
-
- // Run with context.
- fn(CancelCtx(ctx))
-
- return true
+ // Run given function.
+ defer svc.on_done(ptr)
+ fn(CancelCtx(ptr.done))
+ return
}
// GoRun will run the supplied function until completion in a goroutine, using given context to
// propagate cancel. Immediately returns boolean indicating success, or that service is already running.
-func (svc *Service) GoRun(fn func(context.Context)) bool {
- // Attempt to start the svc
- ctx, ok := svc.doStart()
+func (svc *Service) GoRun(fn func(context.Context)) (ok bool) {
+ var ptr *svc_instance
+
+ // Attempt to start.
+ ptr, ok = svc.start()
if !ok {
- return false
+ return
}
go func() {
- defer func() {
- // unlock single wait
- svc.wait.Unlock()
-
- // ensure stopped
- _ = svc.Stop()
- }()
-
- // Run with context.
- fn(CancelCtx(ctx))
+ // Run given function.
+ defer svc.on_done(ptr)
+ fn(CancelCtx(ptr.done))
}()
- return true
+ return
}
// RunWait is functionally the same as .Run(), but blocks until the first instance of .Run() returns.
-func (svc *Service) RunWait(fn func(context.Context)) bool {
- // Attempt to start the svc
- ctx, ok := svc.doStart()
+func (svc *Service) RunWait(fn func(context.Context)) (ok bool) {
+ var ptr *svc_instance
+
+ // Attempt to start.
+ ptr, ok = svc.start()
if !ok {
- <-ctx // block
- return false
+ <-ptr.done
+ return
}
- defer func() {
- // unlock single wait
- svc.wait.Unlock()
+ // Run given function.
+ defer svc.on_done(ptr)
+ fn(CancelCtx(ptr.done))
+ return
+}
- // ensure stopped
- _ = svc.Stop()
- }()
+// GoRunWait is functionally the same as .RunWait(), but blocks until the first instance of RunWait() returns.
+func (svc *Service) GoRunWait(fn func(context.Context)) (ok bool) {
+ var ptr *svc_instance
- // Run with context.
- fn(CancelCtx(ctx))
+ // Attempt to start.
+ ptr, ok = svc.start()
+ if !ok {
+ <-ptr.done
+ return
+ }
+
+ go func() {
+ // Run given function.
+ defer svc.on_done(ptr)
+ fn(CancelCtx(ptr.done))
+ }()
- return true
+ return
}
// Stop will attempt to stop the service, cancelling the running function's context. Immediately
// returns false if not running, and true only after Service is fully stopped.
func (svc *Service) Stop() bool {
- // Attempt to stop the svc
- ctx, ok := svc.doStop()
- if !ok {
- return false
- }
-
- defer func() {
- // Get svc lock
- svc.mutex.Lock()
-
- // Wait until stopped
- svc.wait.Lock()
- svc.wait.Unlock()
+ return svc.must_get().stop()
+}
- // Reset the svc
- svc.ctx = nil
- svc.state = 0
- svc.mutex.Unlock()
- }()
+// Running returns if Service is running (i.e. NOT fully stopped, but may be *stopping*).
+func (svc *Service) Running() bool {
+ return svc.must_get().running()
+}
- // Cancel ctx
- close(ctx)
+// Done returns a channel that's closed when Service.Stop() is called. It is
+// the same channel provided to the currently running service function.
+func (svc *Service) Done() <-chan struct{} {
+ return svc.must_get().done
+}
- return true
+func (svc *Service) start() (*svc_instance, bool) {
+ ptr := svc.must_get()
+ return ptr, ptr.start()
}
-// While allows you to execute given function guaranteed within current
-// service state. Please note that this will hold the underlying service
-// state change mutex open while executing the function.
-func (svc *Service) While(fn func()) {
- // Protect state change
- svc.mutex.Lock()
- defer svc.mutex.Unlock()
+func (svc *Service) on_done(ptr *svc_instance) {
+ // Ensure stopped.
+ ptr.stop_private()
- // Run
- fn()
+ // Free service.
+ svc.p.Store(nil)
}
-// doStart will safely set Service state to started, returning a ptr to this context insance.
-func (svc *Service) doStart() (chan struct{}, bool) {
- // Protect startup
- svc.mutex.Lock()
+func (svc *Service) must_get() *svc_instance {
+ var newptr *svc_instance
- if svc.ctx == nil {
- // this will only have been allocated
- // if svc.Done() was already called.
- svc.ctx = make(chan struct{})
- }
-
- // Take our own ptr
- ctx := svc.ctx
+ for {
+ // Try to load existing instance.
+ ptr := (*svc_instance)(svc.p.Load())
+ if ptr != nil {
+ return ptr
+ }
- if svc.state != 0 {
- // State was not stopped.
- svc.mutex.Unlock()
- return ctx, false
- }
+ if newptr == nil {
+ // Allocate new instance.
+ newptr = new(svc_instance)
+ newptr.done = make(chan struct{})
+ }
- // Set started.
- svc.state = 1
+ // Attempt to acquire slot by setting our ptr.
+ if !svc.p.CAS(nil, unsafe.Pointer(newptr)) {
+ continue
+ }
- // Start waiter.
- svc.wait.Lock()
+ return newptr
+ }
+}
- // Unlock and return
- svc.mutex.Unlock()
- return ctx, true
+type svc_instance struct {
+ wait sync.WaitGroup
+ done chan struct{}
+ state atomic.Uint32
}
-// doStop will safely set Service state to stopping, returning a ptr to this cancelfunc instance.
-func (svc *Service) doStop() (chan struct{}, bool) {
- // Protect stop
- svc.mutex.Lock()
+const (
+ started_bit = uint32(1) << 0
+ stopping_bit = uint32(1) << 1
+ finished_bit = uint32(1) << 2
+)
- if svc.state != 1 /* not started */ {
- svc.mutex.Unlock()
- return nil, false
- }
+func (i *svc_instance) start() (ok bool) {
+ // Acquire start by setting 'started' bit.
+ switch old := i.state.Or(started_bit); {
- // state stopping
- svc.state = 2
+ case old&finished_bit != 0:
+ // Already finished.
- // Take our own ptr
- // and unlock state
- ctx := svc.ctx
- svc.mutex.Unlock()
+ case old&started_bit == 0:
+ // Successfully started!
+ i.wait.Add(1)
+ ok = true
+ }
- return ctx, true
+ return
}
-// Running returns if Service is running (i.e. state NOT stopped / stopping).
-func (svc *Service) Running() bool {
- svc.mutex.Lock()
- state := svc.state
- svc.mutex.Unlock()
- return (state == 1)
-}
+// NOTE: MAY ONLY BE CALLED BY STARTING GOROUTINE.
+func (i *svc_instance) stop_private() {
+ // Attempt set both stopping and finished bits.
+ old := i.state.Or(stopping_bit | finished_bit)
-// Done returns a channel that's closed when Service.Stop() is called. It is
-// the same channel provided to the currently running service function.
-func (svc *Service) Done() <-chan struct{} {
- var done <-chan struct{}
-
- svc.mutex.Lock()
- switch svc.state {
- // stopped
- case 0:
- if svc.ctx == nil {
- // here we create a new context so that the
- // returned 'done' channel here will still
- // be valid for when Service is next started.
- svc.ctx = make(chan struct{})
- }
- done = svc.ctx
+ // Only if we weren't already
+ // stopping do we close channel.
+ if old&stopping_bit == 0 {
+ close(i.done)
+ }
- // started
- case 1:
- done = svc.ctx
+ // Release
+ // waiters.
+ i.wait.Done()
+}
- // stopping
- case 2:
- done = svc.ctx
+func (i *svc_instance) stop() (ok bool) {
+ // Attempt to set the 'stopping' bit.
+ switch old := i.state.Or(stopping_bit); {
+
+ case old&finished_bit != 0:
+ // Already finished.
+ return
+
+ case old&started_bit == 0:
+ // This was never started
+ // to begin with, just mark
+ // as fully finished here.
+ _ = i.state.Or(finished_bit)
+ return
+
+ case old&stopping_bit == 0:
+ // We succesfully stopped
+ // instance, close channel.
+ close(i.done)
+ ok = true
}
- svc.mutex.Unlock()
- return done
+ // Wait on stop.
+ i.wait.Wait()
+ return
+}
+
+// running returns whether service was started and
+// is not yet finished. that indicates that it may
+// have been started and not yet stopped, or that
+// it was started, stopped and not yet returned.
+func (i *svc_instance) running() bool {
+ val := i.state.Load()
+ return val&started_bit != 0 &&
+ val&finished_bit == 0
}
diff --git a/vendor/codeberg.org/gruf/go-sched/README.md b/vendor/codeberg.org/gruf/go-sched/README.md
index d32a961ae..0a9439577 100644
--- a/vendor/codeberg.org/gruf/go-sched/README.md
+++ b/vendor/codeberg.org/gruf/go-sched/README.md
@@ -2,4 +2,4 @@
A simple job (both run-once and recurring) queueing library with down-to millisecond precision.
-Precision estimates based on test output (running on i7-11800h): 1ms precision with 80% tolerance. \ No newline at end of file
+Precision estimates based on test output (running on AMD Ryzen 7 7840u): 2ms precision with 95% tolerance.
diff --git a/vendor/codeberg.org/gruf/go-sched/job.go b/vendor/codeberg.org/gruf/go-sched/job.go
index 2531769d6..f3bd869d2 100644
--- a/vendor/codeberg.org/gruf/go-sched/job.go
+++ b/vendor/codeberg.org/gruf/go-sched/job.go
@@ -14,10 +14,9 @@ import (
// holding onto a next execution time safely in a concurrent environment.
type Job struct {
id uint64
- next unsafe.Pointer // *time.Time
+ next atomic_time
timing Timing
call func(time.Time)
- panic func(interface{})
}
// NewJob returns a new Job to run given function.
@@ -30,28 +29,31 @@ func NewJob(fn func(now time.Time)) *Job {
j := &Job{ // set defaults
timing: emptytiming, // i.e. fire immediately
call: fn,
- panic: func(i interface{}) { panic(i) },
}
return j
}
-// At sets this Job to execute at time, by passing (*sched.Once)(&at) to .With(). See .With() for details.
+// At sets this Job to execute at time, by passing
+// (*sched.Once)(&at) to .With(). See .With() for details.
func (job *Job) At(at time.Time) *Job {
return job.With((*Once)(&at))
}
-// Every sets this Job to execute every period, by passing sched.Period(period) to .With(). See .With() for details.
+// Every sets this Job to execute every period, by passing
+// sched.Period(period) to .With(). See .With() for details.
func (job *Job) Every(period time.Duration) *Job {
return job.With(Periodic(period))
}
-// EveryAt sets this Job to execute every period starting at time, by passing &PeriodicAt{once: Once(at), period: Periodic(period)} to .With(). See .With() for details.
+// EveryAt sets this Job to execute every period starting at time, by passing
+// &PeriodicAt{once: Once(at), period: Periodic(period)} to .With(). See .With() for details.
func (job *Job) EveryAt(at time.Time, period time.Duration) *Job {
return job.With(&PeriodicAt{Once: Once(at), Period: Periodic(period)})
}
-// With sets this Job's timing to given implementation, or if already set will wrap existing using sched.TimingWrap{}.
+// With sets this Job's timing to given implementation, or
+// if already set will wrap existing using sched.TimingWrap{}.
func (job *Job) With(t Timing) *Job {
if t == nil {
// Ensure a timing
@@ -78,44 +80,16 @@ func (job *Job) With(t Timing) *Job {
return job
}
-// OnPanic specifies how this job handles panics, default is an actual panic.
-func (job *Job) OnPanic(fn func(interface{})) *Job {
- if fn == nil {
- // Ensure a function
- panic("nil func")
- }
-
- if job.id != 0 {
- // Cannot update scheduled job
- panic("job already scheduled")
- }
-
- job.panic = fn
- return job
-}
-
// Next returns the next time this Job is expected to run.
func (job *Job) Next() time.Time {
- return loadTime(&job.next)
+ return job.next.Load()
}
// Run will execute this Job and pass through given now time.
-func (job *Job) Run(now time.Time) {
- defer func() {
- switch r := recover(); {
- case r == nil:
- // no panic
- case job != nil &&
- job.panic != nil:
- job.panic(r)
- default:
- panic(r)
- }
- }()
- job.call(now)
-}
+func (job *Job) Run(now time.Time) { job.call(now) }
-// String provides a debuggable string representation of Job including ID, next time and Timing type.
+// String provides a debuggable string representation
+// of Job including ID, next time and Timing type.
func (job *Job) String() string {
var buf strings.Builder
buf.WriteByte('{')
@@ -123,7 +97,7 @@ func (job *Job) String() string {
buf.WriteString(strconv.FormatUint(job.id, 10))
buf.WriteByte(' ')
buf.WriteString("next=")
- buf.WriteString(loadTime(&job.next).Format(time.StampMicro))
+ buf.WriteString(job.next.Load().Format(time.StampMicro))
buf.WriteByte(' ')
buf.WriteString("timing=")
buf.WriteString(reflect.TypeOf(job.timing).String())
@@ -131,13 +105,15 @@ func (job *Job) String() string {
return buf.String()
}
-func loadTime(p *unsafe.Pointer) time.Time {
- if p := atomic.LoadPointer(p); p != nil {
+type atomic_time struct{ p unsafe.Pointer }
+
+func (t *atomic_time) Load() time.Time {
+ if p := atomic.LoadPointer(&t.p); p != nil {
return *(*time.Time)(p)
}
return zerotime
}
-func storeTime(p *unsafe.Pointer, t time.Time) {
- atomic.StorePointer(p, unsafe.Pointer(&t))
+func (t *atomic_time) Store(v time.Time) {
+ atomic.StorePointer(&t.p, unsafe.Pointer(&v))
}
diff --git a/vendor/codeberg.org/gruf/go-sched/scheduler.go b/vendor/codeberg.org/gruf/go-sched/scheduler.go
index 79913a9b3..9ab5b8bc8 100644
--- a/vendor/codeberg.org/gruf/go-sched/scheduler.go
+++ b/vendor/codeberg.org/gruf/go-sched/scheduler.go
@@ -6,18 +6,22 @@ import (
"sync"
"sync/atomic"
"time"
+ "unsafe"
"codeberg.org/gruf/go-runners"
)
-// precision is the maximum time we can offer scheduler run-time precision down to.
-const precision = time.Millisecond
+// Precision is the maximum time we can
+// offer scheduler run-time precision down to.
+const Precision = 2 * time.Millisecond
var (
- // neverticks is a timer channel that never ticks (it's starved).
+ // neverticks is a timer channel
+ // that never ticks (it's starved).
neverticks = make(chan time.Time)
- // alwaysticks is a timer channel that always ticks (it's closed).
+ // alwaysticks is a timer channel
+ // that always ticks (it's closed).
alwaysticks = func() chan time.Time {
ch := make(chan time.Time)
close(ch)
@@ -28,48 +32,51 @@ var (
// Scheduler provides a means of running jobs at specific times and
// regular intervals, all while sharing a single underlying timer.
type Scheduler struct {
- jobs []*Job // jobs is a list of tracked Jobs to be executed
- jch chan interface{} // jch accepts either Jobs or job IDs to notify new/removed jobs
- svc runners.Service // svc manages the main scheduler routine
- jid atomic.Uint64 // jid is used to iteratively generate unique IDs for jobs
- rgo func(func()) // goroutine runner, allows using goroutine pool to launch jobs
+ svc runners.Service // svc manages the main scheduler routine
+ jobs []*Job // jobs is a list of tracked Jobs to be executed
+ jch atomic_channel // jch accepts either Jobs or job IDs to notify new/removed jobs
+ jid atomic.Uint64 // jid is used to iteratively generate unique IDs for jobs
}
-// Start will attempt to start the Scheduler. Immediately returns false if the Service is already running, and true after completed run.
-func (sch *Scheduler) Start(gorun func(func())) bool {
- var block sync.Mutex
+// Start will attempt to start the Scheduler. Immediately returns false
+// if the Service is already running, and true after completed run.
+func (sch *Scheduler) Start() bool {
+ var wait sync.WaitGroup
- // Use mutex to synchronize between started
+ // Use waiter to synchronize between started
// goroutine and ourselves, to ensure that
// we don't return before Scheduler init'd.
- block.Lock()
- defer block.Unlock()
+ wait.Add(1)
ok := sch.svc.GoRun(func(ctx context.Context) {
- // Create Scheduler job channel
- sch.jch = make(chan interface{})
-
- // Set goroutine runner function
- if sch.rgo = gorun; sch.rgo == nil {
- sch.rgo = func(f func()) { go f() }
- }
-
- // Unlock start routine
- block.Unlock()
-
- // Enter main loop
- sch.run(ctx)
+ // Prepare new channel.
+ ch := new(channel)
+ ch.ctx = ctx.Done()
+ ch.ch = make(chan interface{})
+ sch.jch.Store(ch)
+
+ // Release
+ // start fn
+ wait.Done()
+
+ // Main loop
+ sch.run(ch)
})
if ok {
- // Wait on goroutine
- block.Lock()
+ // Wait on
+ // goroutine
+ wait.Wait()
+ } else {
+ // Release
+ wait.Done()
}
return ok
}
-// Stop will attempt to stop the Scheduler. Immediately returns false if not running, and true only after Scheduler is fully stopped.
+// Stop will attempt to stop the Scheduler. Immediately returns false
+// if not running, and true only after Scheduler is fully stopped.
func (sch *Scheduler) Stop() bool {
return sch.svc.Stop()
}
@@ -86,45 +93,38 @@ func (sch *Scheduler) Done() <-chan struct{} {
// Schedule will add provided Job to the Scheduler, returning a cancel function.
func (sch *Scheduler) Schedule(job *Job) (cancel func()) {
- switch {
- // Check a job was passed
- case job == nil:
+ if job == nil {
panic("nil job")
-
- // Check we are running
- case !sch.Running():
- panic("scheduler not running")
}
- // Calculate next job ID
- last := sch.jid.Load()
- next := sch.jid.Add(1)
- if next < last {
- panic("job id overflow")
+ // Load job channel.
+ ch := sch.jch.Load()
+ if ch == nil {
+ panic("not running")
}
- // Pass job to scheduler
- job.id = next
- sch.jch <- job
+ // Calculate next job ID.
+ job.id = sch.jid.Add(1)
- // Take ptrs to current state chs
- ctx := sch.svc.Done()
- jch := sch.jch
-
- // Return cancel function for job ID
- return func() {
- select {
- // Sched stopped
- case <-ctx:
-
- // Cancel this job
- case jch <- next:
- }
+ // Pass job
+ // to channel.
+ if !ch.w(job) {
+ panic("not running")
}
+
+ // Return cancel function for job
+ return func() { ch.w(job.id) }
}
// run is the main scheduler run routine, which runs for as long as ctx is valid.
-func (sch *Scheduler) run(ctx context.Context) {
+func (sch *Scheduler) run(ch *channel) {
+ defer ch.close()
+ if ch == nil {
+ panic("nil channel")
+ } else if sch == nil {
+ panic("nil scheduler")
+ }
+
var (
// now stores the current time, and will only be
// set when the timer channel is set to be the
@@ -165,39 +165,43 @@ func (sch *Scheduler) run(ctx context.Context) {
// Get now time.
now = time.Now()
- // Sort jobs by next occurring.
+ // Sort by next occurring.
sort.Sort(byNext(sch.jobs))
// Get next job time.
next := sch.jobs[0].Next()
- // If this job is _just_ about to be ready, we don't bother
+ // If this job is *just* about to be ready, we don't bother
// sleeping. It's wasted cycles only sleeping for some obscenely
// tiny amount of time we can't guarantee precision for.
- if until := next.Sub(now); until <= precision/1e3 {
+ if until := next.Sub(now); until <= Precision/1e3 {
+
// This job is behind,
// set to always tick.
tch = alwaysticks
} else {
+
// Reset timer to period.
timer.Reset(until)
- tch = timer.C
timerset = true
+ tch = timer.C
}
} else {
+
// Unset timer
tch = neverticks
}
select {
// Scheduler stopped
- case <-ctx.Done():
+ case <-ch.done():
stopdrain()
return
- // Timer ticked, run scheduled
- case t := <-tch:
- if !timerset {
+ // Timer ticked,
+ // run scheduled.
+ case t, ok := <-tch:
+ if !ok {
// 'alwaysticks' returns zero
// times, BUT 'now' will have
// been set during above sort.
@@ -205,8 +209,9 @@ func (sch *Scheduler) run(ctx context.Context) {
}
sch.schedule(t)
- // Received update, handle job/id
- case v := <-sch.jch:
+ // Received update,
+ // handle job/id.
+ case v := <-ch.r():
sch.handle(v)
stopdrain()
}
@@ -220,21 +225,21 @@ func (sch *Scheduler) handle(v interface{}) {
switch v := v.(type) {
// New job added
case *Job:
- // Get current time
+ // Get current time.
now := time.Now()
- // Update the next call time
+ // Update next call time.
next := v.timing.Next(now)
- storeTime(&v.next, next)
+ v.next.Store(next)
- // Append this job to queued
+ // Append this job to queued/
sch.jobs = append(sch.jobs, v)
// Job removed
case uint64:
for i := 0; i < len(sch.jobs); i++ {
if sch.jobs[i].id == v {
- // This is the job we're looking for! Drop this
+ // This is the job we're looking for! Drop this.
sch.jobs = append(sch.jobs[:i], sch.jobs[i+1:]...)
return
}
@@ -242,29 +247,28 @@ func (sch *Scheduler) handle(v interface{}) {
}
}
-// schedule will iterate through the scheduler jobs and execute those necessary, updating their next call time.
+// schedule will iterate through the scheduler jobs and
+// execute those necessary, updating their next call time.
func (sch *Scheduler) schedule(now time.Time) {
for i := 0; i < len(sch.jobs); {
- // Scope our own var
+ // Scope our own var.
job := sch.jobs[i]
// We know these jobs are ordered by .Next(), so as soon
- // as we reach one with .Next() after now, we can return
+ // as we reach one with .Next() after now, we can return.
if job.Next().After(now) {
return
}
- // Pass to runner
- sch.rgo(func() {
- job.Run(now)
- })
+ // Run the job.
+ go job.Run(now)
- // Update the next call time
+ // Update the next call time.
next := job.timing.Next(now)
- storeTime(&job.next, next)
+ job.next.Store(next)
if next.IsZero() {
- // Zero time, this job is done and can be dropped
+ // Zero time, this job is done and can be dropped.
sch.jobs = append(sch.jobs[:i], sch.jobs[i+1:]...)
continue
}
@@ -274,7 +278,8 @@ func (sch *Scheduler) schedule(now time.Time) {
}
}
-// byNext is an implementation of sort.Interface to sort Jobs by their .Next() time.
+// byNext is an implementation of sort.Interface
+// to sort Jobs by their .Next() time.
type byNext []*Job
func (by byNext) Len() int {
@@ -288,3 +293,51 @@ func (by byNext) Less(i int, j int) bool {
func (by byNext) Swap(i int, j int) {
by[i], by[j] = by[j], by[i]
}
+
+// atomic_channel wraps a *channel{} with atomic store / load.
+type atomic_channel struct{ p unsafe.Pointer }
+
+func (c *atomic_channel) Load() *channel {
+ if p := atomic.LoadPointer(&c.p); p != nil {
+ return (*channel)(p)
+ }
+ return nil
+}
+
+func (c *atomic_channel) Store(v *channel) {
+ atomic.StorePointer(&c.p, unsafe.Pointer(v))
+}
+
+// channel wraps both a context done
+// channel and a generic interface channel
+// to support safe writing to an underlying
+// channel that correctly fails after close.
+type channel struct {
+ ctx <-chan struct{}
+ ch chan interface{}
+}
+
+// done returns internal context channel.
+func (ch *channel) done() <-chan struct{} {
+ return ch.ctx
+}
+
+// r returns internal channel for read.
+func (ch *channel) r() chan interface{} {
+ return ch.ch
+}
+
+// w writes 'v' to channel, or returns false if closed.
+func (ch *channel) w(v interface{}) bool {
+ select {
+ case <-ch.ctx:
+ return false
+ case ch.ch <- v:
+ return true
+ }
+}
+
+// close closes underlying channel.
+func (ch *channel) close() {
+ close(ch.ch)
+}
diff --git a/vendor/codeberg.org/gruf/go-sched/timing.go b/vendor/codeberg.org/gruf/go-sched/timing.go
index 33c230fa5..cb9b4925a 100644
--- a/vendor/codeberg.org/gruf/go-sched/timing.go
+++ b/vendor/codeberg.org/gruf/go-sched/timing.go
@@ -5,11 +5,13 @@ import (
)
var (
- // zerotime is zero time.Time (unix epoch).
- zerotime = time.Time{}
+ // zerotime is zero
+ // time.Time (unix epoch).
+ zerotime time.Time
- // emptytiming is a global timingempty to check against.
- emptytiming = timingempty{}
+ // emptytiming is a global
+ // timingempty to check against.
+ emptytiming timingempty
)
// Timing provides scheduling for a Job, determining the next time
@@ -20,14 +22,16 @@ type Timing interface {
Next(time.Time) time.Time
}
-// timingempty is a 'zero' Timing implementation that always returns zero time.
+// timingempty is a 'zero' Timing implementation
+// that always returns zero time.
type timingempty struct{}
func (timingempty) Next(time.Time) time.Time {
return zerotime
}
-// Once implements Timing to provide a run-once Job execution.
+// Once implements Timing to
+// provide a run-once Job execution.
type Once time.Time
func (o *Once) Next(time.Time) time.Time {
@@ -36,14 +40,16 @@ func (o *Once) Next(time.Time) time.Time {
return ret
}
-// Periodic implements Timing to provide a recurring Job execution.
+// Periodic implements Timing to
+// provide a recurring Job execution.
type Periodic time.Duration
func (p Periodic) Next(now time.Time) time.Time {
return now.Add(time.Duration(p))
}
-// PeriodicAt implements Timing to provide a recurring Job execution starting at 'Once' time.
+// PeriodicAt implements Timing to provide a
+// recurring Job execution starting at 'Once' time.
type PeriodicAt struct {
Once Once
Period Periodic
@@ -56,7 +62,8 @@ func (p *PeriodicAt) Next(now time.Time) time.Time {
return p.Period.Next(now)
}
-// TimingWrap allows combining two different Timing implementations.
+// TimingWrap allows combining two
+// different Timing implementations.
type TimingWrap struct {
Outer Timing
Inner Timing
diff --git a/vendor/github.com/tdewolff/parse/v2/binary.go b/vendor/github.com/tdewolff/parse/v2/binary.go
index cf4f91d4a..721864d12 100644
--- a/vendor/github.com/tdewolff/parse/v2/binary.go
+++ b/vendor/github.com/tdewolff/parse/v2/binary.go
@@ -6,6 +6,7 @@ import (
"fmt"
"io"
"os"
+ "sync"
)
const PageSize = 4096
@@ -45,17 +46,15 @@ func (r *binaryReaderFile) Len() int64 {
}
func (r *binaryReaderFile) Bytes(b []byte, n, off int64) ([]byte, error) {
- if _, err := r.f.Seek(off, 0); err != nil {
- return nil, err
- } else if b == nil {
+ if b == nil {
b = make([]byte, n)
}
-
- m, err := r.f.Read(b)
- if err != nil {
- return nil, err
+ if m, err := r.f.ReadAt(b, off); err != nil {
+ return b[:m], err
+ } else if off+int64(m) == r.size {
+ return b[:m], io.EOF
} else if int64(m) != n {
- return nil, errors.New("file: could not read all bytes")
+ return b[:m], errors.New("file: could not read all bytes")
}
return b, nil
}
@@ -79,16 +78,22 @@ func (r *binaryReaderBytes) Len() int64 {
}
func (r *binaryReaderBytes) Bytes(b []byte, n, off int64) ([]byte, error) {
- if off < 0 || n < 0 || int64(len(r.data)) < off || int64(len(r.data))-off < n {
+ var err error
+ if off < 0 || n < 0 {
return nil, fmt.Errorf("bytes: invalid range %d--%d", off, off+n)
+ } else if int64(len(r.data)) <= off {
+ return nil, io.EOF
+ } else if int64(len(r.data))-off <= n {
+ n = int64(len(r.data)) - off
+ err = io.EOF
}
data := r.data[off : off+n : off+n]
if b == nil {
- return data, nil
+ return data, err
}
copy(b, data)
- return b, nil
+ return b[:len(data)], err
}
type binaryReaderReader struct {
@@ -96,12 +101,13 @@ type binaryReaderReader struct {
size int64
readerAt bool
seeker bool
+ mu sync.Mutex
}
func newBinaryReaderReader(r io.Reader, n int64) *binaryReaderReader {
_, readerAt := r.(io.ReaderAt)
_, seeker := r.(io.Seeker)
- return &binaryReaderReader{r, n, readerAt, seeker}
+ return &binaryReaderReader{r, n, readerAt, seeker, sync.Mutex{}}
}
// Close closes the reader.
@@ -124,23 +130,30 @@ func (r *binaryReaderReader) Bytes(b []byte, n, off int64) ([]byte, error) {
// seeker seems faster than readerAt by 10%
if r.seeker {
+ r.mu.Lock()
if _, err := r.r.(io.Seeker).Seek(off, 0); err != nil {
+ r.mu.Unlock()
return nil, err
}
m, err := r.r.Read(b)
+ r.mu.Unlock()
if err != nil {
- return nil, err
+ return b[:m], err
+ } else if off+int64(m) == r.size {
+ return b[:m], io.EOF
} else if int64(m) != n {
- return nil, errors.New("file: could not read all bytes")
+ return b[:m], errors.New("reader: could not read all bytes")
}
return b, nil
} else if r.readerAt {
m, err := r.r.(io.ReaderAt).ReadAt(b, off)
if err != nil {
- return nil, err
+ return b[:m], err
+ } else if off+int64(m) == r.size {
+ return b[:m], io.EOF
} else if int64(m) != n {
- return nil, errors.New("file: could not read all bytes")
+ return b[:m], errors.New("reader: could not read all bytes")
}
return b, nil
}
@@ -197,12 +210,8 @@ func (r *BinaryReader) IBinaryReader() IBinaryReader {
}
func (r *BinaryReader) Clone() *BinaryReader {
- f := r.f
- if cloner, ok := f.(interface{ Clone() IBinaryReader }); ok {
- f = cloner.Clone()
- }
return &BinaryReader{
- f: f,
+ f: r.f,
pos: r.pos,
err: r.err,
ByteOrder: r.ByteOrder,
@@ -262,9 +271,6 @@ func (r *BinaryReader) Seek(off int64, whence int) (int64, error) {
// Read complies with io.Reader.
func (r *BinaryReader) Read(b []byte) (int, error) {
data, err := r.f.Bytes(b, int64(len(b)), r.pos)
- if err != nil && err != io.EOF {
- return 0, err
- }
r.pos += int64(len(data))
return len(data), err
}
@@ -272,20 +278,14 @@ func (r *BinaryReader) Read(b []byte) (int, error) {
// ReadAt complies with io.ReaderAt.
func (r *BinaryReader) ReadAt(b []byte, off int64) (int, error) {
data, err := r.f.Bytes(b, int64(len(b)), off)
- if err != nil && err != io.EOF {
- return 0, err
- }
return len(data), err
}
// ReadBytes reads n bytes.
func (r *BinaryReader) ReadBytes(n int64) []byte {
data, err := r.f.Bytes(nil, n, r.pos)
- if err != nil {
- r.err = err
- return nil
- }
- r.pos += n
+ r.pos += int64(len(data))
+ r.err = err
return data
}
diff --git a/vendor/github.com/tdewolff/parse/v2/binary_unix.go b/vendor/github.com/tdewolff/parse/v2/binary_unix.go
index 4a8979fda..25ec83dbc 100644
--- a/vendor/github.com/tdewolff/parse/v2/binary_unix.go
+++ b/vendor/github.com/tdewolff/parse/v2/binary_unix.go
@@ -5,8 +5,8 @@ package parse
import (
"errors"
"fmt"
+ "io"
"os"
- "runtime"
"syscall"
)
@@ -25,6 +25,8 @@ func newBinaryReaderMmap(filename string) (*binaryReaderMmap, error) {
info, err := f.Stat()
if err != nil {
return nil, err
+ } else if !info.Mode().IsRegular() {
+ return nil, fmt.Errorf("mmap: not a regular file: %v", filename)
}
size := info.Size()
@@ -48,7 +50,7 @@ func newBinaryReaderMmap(filename string) (*binaryReaderMmap, error) {
return nil, err
}
r := &binaryReaderMmap{data, size}
- runtime.SetFinalizer(r, (*binaryReaderMmap).Close)
+ //runtime.SetFinalizer(r, (*binaryReaderMmap).Close)
return r, nil
}
@@ -62,7 +64,7 @@ func (r *binaryReaderMmap) Close() error {
}
data := r.data
r.data = nil
- runtime.SetFinalizer(r, nil)
+ //runtime.SetFinalizer(r, nil)
return syscall.Munmap(data)
}
@@ -72,18 +74,24 @@ func (r *binaryReaderMmap) Len() int64 {
}
func (r *binaryReaderMmap) Bytes(b []byte, n, off int64) ([]byte, error) {
+ var err error
if r.data == nil {
return nil, errors.New("mmap: closed")
- } else if off < 0 || n < 0 || int64(len(r.data)) < off || int64(len(r.data))-off < n {
+ } else if off < 0 || n < 0 {
return nil, fmt.Errorf("mmap: invalid range %d--%d", off, off+n)
+ } else if int64(len(r.data)) <= off {
+ return nil, io.EOF
+ } else if int64(len(r.data))-off <= n {
+ n = int64(len(r.data)) - off
+ err = io.EOF
}
data := r.data[off : off+n : off+n]
if b == nil {
- return data, nil
+ return data, err
}
copy(b, data)
- return b, nil
+ return b[:len(data)], err
}
func NewBinaryReaderMmap(filename string) (*BinaryReader, error) {
diff --git a/vendor/github.com/tdewolff/parse/v2/html/lex.go b/vendor/github.com/tdewolff/parse/v2/html/lex.go
index e44a77ce8..a1954ff59 100644
--- a/vendor/github.com/tdewolff/parse/v2/html/lex.go
+++ b/vendor/github.com/tdewolff/parse/v2/html/lex.go
@@ -399,10 +399,12 @@ func (l *Lexer) shiftStartTag() (TokenType, []byte) {
func (l *Lexer) shiftAttribute() []byte {
nameStart := l.r.Pos()
var c byte
- if 0 < len(l.tmplBegin) && l.at(l.tmplBegin...) {
- l.r.Move(len(l.tmplBegin))
- l.moveTemplate()
- l.hasTmpl = true
+ if 0 < len(l.tmplBegin) {
+ for l.at(l.tmplBegin...) {
+ l.r.Move(len(l.tmplBegin))
+ l.moveTemplate()
+ l.hasTmpl = true
+ }
}
for { // attribute name state
if c = l.r.Peek(0); c == ' ' || c == '=' || c == '>' || c == '/' && l.r.Peek(1) == '>' || c == '\t' || c == '\n' || c == '\r' || c == '\f' || c == 0 && l.r.Err() != nil {
diff --git a/vendor/modules.txt b/vendor/modules.txt
index d768c157e..ccf3a0b8e 100644
--- a/vendor/modules.txt
+++ b/vendor/modules.txt
@@ -227,7 +227,7 @@ codeberg.org/gruf/go-bytesize
# codeberg.org/gruf/go-byteutil v1.3.0
## explicit; go 1.20
codeberg.org/gruf/go-byteutil
-# codeberg.org/gruf/go-cache/v3 v3.6.1
+# codeberg.org/gruf/go-cache/v3 v3.6.2
## explicit; go 1.20
codeberg.org/gruf/go-cache/v3
codeberg.org/gruf/go-cache/v3/simple
@@ -277,10 +277,10 @@ codeberg.org/gruf/go-mempool
# codeberg.org/gruf/go-mutexes v1.5.8
## explicit; go 1.24.0
codeberg.org/gruf/go-mutexes
-# codeberg.org/gruf/go-runners v1.6.3
+# codeberg.org/gruf/go-runners v1.7.0
## explicit; go 1.19
codeberg.org/gruf/go-runners
-# codeberg.org/gruf/go-sched v1.2.4
+# codeberg.org/gruf/go-sched v1.3.0
## explicit; go 1.19
codeberg.org/gruf/go-sched
# codeberg.org/gruf/go-split v1.2.0
@@ -864,11 +864,11 @@ github.com/stretchr/testify/suite
# github.com/subosito/gotenv v1.6.0
## explicit; go 1.18
github.com/subosito/gotenv
-# github.com/tdewolff/minify/v2 v2.24.3
+# github.com/tdewolff/minify/v2 v2.24.4
## explicit; go 1.17
github.com/tdewolff/minify/v2
github.com/tdewolff/minify/v2/html
-# github.com/tdewolff/parse/v2 v2.8.3
+# github.com/tdewolff/parse/v2 v2.8.4
## explicit; go 1.11
github.com/tdewolff/parse/v2
github.com/tdewolff/parse/v2/buffer