diff options
author | 2023-02-13 18:40:48 +0000 | |
---|---|---|
committer | 2023-02-13 18:40:48 +0000 | |
commit | acc95923da555b2bf17a5638e62e533218c5840a (patch) | |
tree | 7df5d0636137efa5b49298a8f0ced81d35767a5b /vendor/codeberg.org/gruf/go-runners | |
parent | [docs] move federating with gotosocial documentation into single file (#1494) (diff) | |
download | gotosocial-acc95923da555b2bf17a5638e62e533218c5840a.tar.xz |
[performance] processing media and scheduled jobs improvements (#1482)
* replace media workers with just runners.WorkerPool, move to state structure, use go-sched for global task scheduling
* improved code comment
* fix worker tryUntil function, update go-runners/go-sched
* make preprocess functions package public, use these where possible to stop doubled up processing
* remove separate emoji worker pool
* limit calls to time.Now() during media preprocessing
* use Processor{} to manage singular runtime of processing media
* ensure workers get started when media manager is used
* improved error setting in processing media, fix media test
* port changes from processingmedia to processing emoji
* finish code commenting
* finish code commenting and comment-out client API + federator worker pools until concurrency worker pools replaced
* linterrrrrrrrrrrrrrrr
---------
Signed-off-by: kim <grufwub@gmail.com>
Diffstat (limited to 'vendor/codeberg.org/gruf/go-runners')
-rw-r--r-- | vendor/codeberg.org/gruf/go-runners/pool.go | 28 | ||||
-rw-r--r-- | vendor/codeberg.org/gruf/go-runners/process.go | 75 | ||||
-rw-r--r-- | vendor/codeberg.org/gruf/go-runners/run.go | 124 | ||||
-rw-r--r-- | vendor/codeberg.org/gruf/go-runners/service.go | 52 |
4 files changed, 141 insertions, 138 deletions
diff --git a/vendor/codeberg.org/gruf/go-runners/pool.go b/vendor/codeberg.org/gruf/go-runners/pool.go index 16222b2e1..3d9105986 100644 --- a/vendor/codeberg.org/gruf/go-runners/pool.go +++ b/vendor/codeberg.org/gruf/go-runners/pool.go @@ -157,6 +157,34 @@ func (pool *WorkerPool) EnqueueCtx(ctx context.Context, fn WorkerFunc) bool { } } +// MustEnqueueCtx functionally performs similarly to WorkerPool.EnqueueCtx(), but in the case +// that the provided <-ctx.Done() is closed, it is passed asynchronously to WorkerPool.Enqueue(). +// Return boolean indicates whether function was executed in time before <-ctx.Done() is closed. +func (pool *WorkerPool) MustEnqueueCtx(ctx context.Context, fn WorkerFunc) (ok bool) { + // Check valid fn + if fn == nil { + return false + } + + select { + case <-ctx.Done(): + // We failed to add this entry to the worker queue before the + // incoming context was cancelled. So to ensure processing + // we simply queue it asynchronously and return early to caller. + go pool.Enqueue(fn) + return false + + case <-pool.svc.Done(): + // Pool ctx cancelled + fn(closedctx) + return false + + case pool.fns <- fn: + // Placed fn in queue + return true + } +} + // EnqueueNow attempts Enqueue but returns false if not executed. func (pool *WorkerPool) EnqueueNow(fn WorkerFunc) bool { // Check valid fn diff --git a/vendor/codeberg.org/gruf/go-runners/process.go b/vendor/codeberg.org/gruf/go-runners/process.go new file mode 100644 index 000000000..908e6edca --- /dev/null +++ b/vendor/codeberg.org/gruf/go-runners/process.go @@ -0,0 +1,75 @@ +package runners + +import ( + "fmt" + "sync" +) + +// Processable defines a runnable process with error return +// that can be passed to a Processor instance for managed running. +type Processable func() error + +// Processor acts similarly to a sync.Once object, except that it is reusable. After +// the first call to Process(), any further calls before this first has returned will +// block until the first call has returned, and return the same error. This ensures +// that only a single instance of it is ever running at any one time. +type Processor struct { + mutex sync.Mutex + state uint32 + wait sync.WaitGroup + err *error +} + +// Process will process the given function if first-call, else blocking until +// the first function has returned, returning the same error result. +func (p *Processor) Process(proc Processable) (err error) { + // Acquire state lock. + p.mutex.Lock() + + if p.state != 0 { + // Already running. + // + // Get current err ptr. + errPtr := p.err + + // Wait until finish. + p.mutex.Unlock() + p.wait.Wait() + return *errPtr + } + + // Reset error ptr. + p.err = new(error) + + // Set started. + p.wait.Add(1) + p.state = 1 + p.mutex.Unlock() + + defer func() { + if r := recover(); r != nil { + if err != nil { + rOld := r // wrap the panic so we don't lose existing returned error + r = fmt.Errorf("panic occured after error %q: %v", err.Error(), rOld) + } + + // Catch any panics and wrap as error. + err = fmt.Errorf("caught panic: %v", r) + } + + // Store error. + *p.err = err + + // Mark done. + p.wait.Done() + + // Set stopped. + p.mutex.Lock() + p.state = 0 + p.mutex.Unlock() + }() + + // Run process. + err = proc() + return +} diff --git a/vendor/codeberg.org/gruf/go-runners/run.go b/vendor/codeberg.org/gruf/go-runners/run.go deleted file mode 100644 index 67d19b40c..000000000 --- a/vendor/codeberg.org/gruf/go-runners/run.go +++ /dev/null @@ -1,124 +0,0 @@ -package runners - -import ( - "context" - "errors" - "fmt" - "time" - - "codeberg.org/gruf/go-atomics" -) - -// FuncRunner provides a means of managing long-running functions e.g. main logic loops. -type FuncRunner struct { - // HandOff is the time after which a blocking function will be considered handed off - HandOff time.Duration - - // ErrorHandler is the function that errors are passed to when encountered by the - // provided function. This can be used both for logging, and for error filtering - ErrorHandler func(err error) error - - svc Service // underlying service to manage start/stop - err atomics.Error -} - -// Go will attempt to run 'fn' asynchronously. The provided context is used to propagate requested -// cancel if FuncRunner.Stop() is called. Any returned error will be passed to FuncRunner.ErrorHandler -// for filtering/logging/etc. Any blocking functions will be waited on for FuncRunner.HandOff amount of -// time before considering the function as handed off. Returned bool is success state, i.e. returns true -// if function is successfully handed off or returns within hand off time with nil error. -func (r *FuncRunner) Go(fn func(ctx context.Context) error) bool { - var has bool - - done := make(chan struct{}) - - go func() { - var cancelled bool - - has = r.svc.Run(func(ctx context.Context) { - // reset error - r.err.Store(nil) - - // Run supplied func and set errror if returned - if err := Run(func() error { return fn(ctx) }); err != nil { - r.err.Store(err) - } - - // signal done - close(done) - - // Check if cancelled - select { - case <-ctx.Done(): - cancelled = true - default: - cancelled = false - } - }) - - switch has { - // returned after starting - case true: - // Load set error - err := r.err.Load() - - // filter out errors due FuncRunner.Stop() being called - if cancelled && errors.Is(err, context.Canceled) { - // filter out errors from FuncRunner.Stop() being called - r.err.Store(nil) - } else if err != nil && r.ErrorHandler != nil { - // pass any non-nil error to set handler - r.err.Store(r.ErrorHandler(err)) - } - - // already running - case false: - close(done) - } - }() - - // get valid handoff to use - handoff := r.HandOff - if handoff < 1 { - handoff = time.Second * 5 - } - - select { - // handed off (long-run successful) - case <-time.After(handoff): - return true - - // 'fn' returned, check error - case <-done: - return has - } -} - -// Stop will cancel the context supplied to the running function. -func (r *FuncRunner) Stop() bool { - return r.svc.Stop() -} - -// Err returns the last-set error value. -func (r *FuncRunner) Err() error { - return r.err.Load() -} - -// Run will execute the supplied 'fn' catching any panics. Returns either function-returned error or formatted panic. -func Run(fn func() error) (err error) { - defer func() { - if r := recover(); r != nil { - if e, ok := r.(error); ok { - // wrap and preserve existing error - err = fmt.Errorf("caught panic: %w", e) - } else { - // simply create new error fromt iface - err = fmt.Errorf("caught panic: %v", r) - } - } - }() - - // run supplied func - err = fn() - return -} diff --git a/vendor/codeberg.org/gruf/go-runners/service.go b/vendor/codeberg.org/gruf/go-runners/service.go index c019a10f6..8a7c0051a 100644 --- a/vendor/codeberg.org/gruf/go-runners/service.go +++ b/vendor/codeberg.org/gruf/go-runners/service.go @@ -9,7 +9,7 @@ import ( // changes and preventing multiple instances running. Also providing service state information. type Service struct { state uint32 // 0=stopped, 1=running, 2=stopping - mutex sync.Mutex // mutext protects overall state changes + mutex sync.Mutex // mutex protects overall state changes wait sync.Mutex // wait is used as a single-entity wait-group, only ever locked within 'mutex' ctx chan struct{} // ctx is the current context for running function (or nil if not running) } @@ -62,6 +62,29 @@ func (svc *Service) GoRun(fn func(context.Context)) bool { return true } +// RunWait is functionally the same as .Run(), but blocks until the first instance of .Run() returns. +func (svc *Service) RunWait(fn func(context.Context)) bool { + // Attempt to start the svc + ctx, ok := svc.doStart() + if !ok { + <-ctx // block + return false + } + + defer func() { + // unlock single wait + svc.wait.Unlock() + + // ensure stopped + _ = svc.Stop() + }() + + // Run with context. + fn(CancelCtx(ctx)) + + return true +} + // Stop will attempt to stop the service, cancelling the running function's context. Immediately // returns false if not running, and true only after Service is fully stopped. func (svc *Service) Stop() bool { @@ -108,28 +131,29 @@ func (svc *Service) doStart() (chan struct{}, bool) { // Protect startup svc.mutex.Lock() - if svc.state != 0 /* not stopped */ { - svc.mutex.Unlock() - return nil, false - } - - // state started - svc.state = 1 - if svc.ctx == nil { // this will only have been allocated // if svc.Done() was already called. svc.ctx = make(chan struct{}) } - // Start the waiter - svc.wait.Lock() - // Take our own ptr - // and unlock state ctx := svc.ctx - svc.mutex.Unlock() + if svc.state != 0 { + // State was not stopped. + svc.mutex.Unlock() + return ctx, false + } + + // Set started. + svc.state = 1 + + // Start waiter. + svc.wait.Lock() + + // Unlock and return + svc.mutex.Unlock() return ctx, true } |