diff options
Diffstat (limited to 'vendor/codeberg.org/gruf/go-runners/service.go')
| -rw-r--r-- | vendor/codeberg.org/gruf/go-runners/service.go | 311 |
1 files changed, 158 insertions, 153 deletions
diff --git a/vendor/codeberg.org/gruf/go-runners/service.go b/vendor/codeberg.org/gruf/go-runners/service.go index 8a7c0051a..fe41807f9 100644 --- a/vendor/codeberg.org/gruf/go-runners/service.go +++ b/vendor/codeberg.org/gruf/go-runners/service.go @@ -3,215 +3,220 @@ package runners import ( "context" "sync" + "sync/atomic" + "unsafe" ) -// Service provides a means of tracking a single long-running service, provided protected state -// changes and preventing multiple instances running. Also providing service state information. -type Service struct { - state uint32 // 0=stopped, 1=running, 2=stopping - mutex sync.Mutex // mutex protects overall state changes - wait sync.Mutex // wait is used as a single-entity wait-group, only ever locked within 'mutex' - ctx chan struct{} // ctx is the current context for running function (or nil if not running) -} +// Service provides a means of tracking a single long-running Service, provided protected state +// changes and preventing multiple instances running. Also providing Service state information. +type Service struct{ p atomic_pointer } // Run will run the supplied function until completion, using given context to propagate cancel. // Immediately returns false if the Service is already running, and true after completed run. -func (svc *Service) Run(fn func(context.Context)) bool { - // Attempt to start the svc - ctx, ok := svc.doStart() +func (svc *Service) Run(fn func(context.Context)) (ok bool) { + var ptr *svc_instance + + // Attempt to start. + ptr, ok = svc.start() if !ok { - return false + return } - defer func() { - // unlock single wait - svc.wait.Unlock() - - // ensure stopped - _ = svc.Stop() - }() - - // Run with context. - fn(CancelCtx(ctx)) - - return true + // Run given function. + defer svc.on_done(ptr) + fn(CancelCtx(ptr.done)) + return } // GoRun will run the supplied function until completion in a goroutine, using given context to // propagate cancel. Immediately returns boolean indicating success, or that service is already running. -func (svc *Service) GoRun(fn func(context.Context)) bool { - // Attempt to start the svc - ctx, ok := svc.doStart() +func (svc *Service) GoRun(fn func(context.Context)) (ok bool) { + var ptr *svc_instance + + // Attempt to start. + ptr, ok = svc.start() if !ok { - return false + return } go func() { - defer func() { - // unlock single wait - svc.wait.Unlock() - - // ensure stopped - _ = svc.Stop() - }() - - // Run with context. - fn(CancelCtx(ctx)) + // Run given function. + defer svc.on_done(ptr) + fn(CancelCtx(ptr.done)) }() - return true + return } // RunWait is functionally the same as .Run(), but blocks until the first instance of .Run() returns. -func (svc *Service) RunWait(fn func(context.Context)) bool { - // Attempt to start the svc - ctx, ok := svc.doStart() +func (svc *Service) RunWait(fn func(context.Context)) (ok bool) { + var ptr *svc_instance + + // Attempt to start. + ptr, ok = svc.start() if !ok { - <-ctx // block - return false + <-ptr.done + return } - defer func() { - // unlock single wait - svc.wait.Unlock() + // Run given function. + defer svc.on_done(ptr) + fn(CancelCtx(ptr.done)) + return +} - // ensure stopped - _ = svc.Stop() - }() +// GoRunWait is functionally the same as .RunWait(), but blocks until the first instance of RunWait() returns. +func (svc *Service) GoRunWait(fn func(context.Context)) (ok bool) { + var ptr *svc_instance - // Run with context. - fn(CancelCtx(ctx)) + // Attempt to start. + ptr, ok = svc.start() + if !ok { + <-ptr.done + return + } + + go func() { + // Run given function. + defer svc.on_done(ptr) + fn(CancelCtx(ptr.done)) + }() - return true + return } // Stop will attempt to stop the service, cancelling the running function's context. Immediately // returns false if not running, and true only after Service is fully stopped. func (svc *Service) Stop() bool { - // Attempt to stop the svc - ctx, ok := svc.doStop() - if !ok { - return false - } - - defer func() { - // Get svc lock - svc.mutex.Lock() - - // Wait until stopped - svc.wait.Lock() - svc.wait.Unlock() + return svc.must_get().stop() +} - // Reset the svc - svc.ctx = nil - svc.state = 0 - svc.mutex.Unlock() - }() +// Running returns if Service is running (i.e. NOT fully stopped, but may be *stopping*). +func (svc *Service) Running() bool { + return svc.must_get().running() +} - // Cancel ctx - close(ctx) +// Done returns a channel that's closed when Service.Stop() is called. It is +// the same channel provided to the currently running service function. +func (svc *Service) Done() <-chan struct{} { + return svc.must_get().done +} - return true +func (svc *Service) start() (*svc_instance, bool) { + ptr := svc.must_get() + return ptr, ptr.start() } -// While allows you to execute given function guaranteed within current -// service state. Please note that this will hold the underlying service -// state change mutex open while executing the function. -func (svc *Service) While(fn func()) { - // Protect state change - svc.mutex.Lock() - defer svc.mutex.Unlock() +func (svc *Service) on_done(ptr *svc_instance) { + // Ensure stopped. + ptr.stop_private() - // Run - fn() + // Free service. + svc.p.Store(nil) } -// doStart will safely set Service state to started, returning a ptr to this context insance. -func (svc *Service) doStart() (chan struct{}, bool) { - // Protect startup - svc.mutex.Lock() +func (svc *Service) must_get() *svc_instance { + var newptr *svc_instance - if svc.ctx == nil { - // this will only have been allocated - // if svc.Done() was already called. - svc.ctx = make(chan struct{}) - } - - // Take our own ptr - ctx := svc.ctx + for { + // Try to load existing instance. + ptr := (*svc_instance)(svc.p.Load()) + if ptr != nil { + return ptr + } - if svc.state != 0 { - // State was not stopped. - svc.mutex.Unlock() - return ctx, false - } + if newptr == nil { + // Allocate new instance. + newptr = new(svc_instance) + newptr.done = make(chan struct{}) + } - // Set started. - svc.state = 1 + // Attempt to acquire slot by setting our ptr. + if !svc.p.CAS(nil, unsafe.Pointer(newptr)) { + continue + } - // Start waiter. - svc.wait.Lock() + return newptr + } +} - // Unlock and return - svc.mutex.Unlock() - return ctx, true +type svc_instance struct { + wait sync.WaitGroup + done chan struct{} + state atomic.Uint32 } -// doStop will safely set Service state to stopping, returning a ptr to this cancelfunc instance. -func (svc *Service) doStop() (chan struct{}, bool) { - // Protect stop - svc.mutex.Lock() +const ( + started_bit = uint32(1) << 0 + stopping_bit = uint32(1) << 1 + finished_bit = uint32(1) << 2 +) - if svc.state != 1 /* not started */ { - svc.mutex.Unlock() - return nil, false - } +func (i *svc_instance) start() (ok bool) { + // Acquire start by setting 'started' bit. + switch old := i.state.Or(started_bit); { - // state stopping - svc.state = 2 + case old&finished_bit != 0: + // Already finished. - // Take our own ptr - // and unlock state - ctx := svc.ctx - svc.mutex.Unlock() + case old&started_bit == 0: + // Successfully started! + i.wait.Add(1) + ok = true + } - return ctx, true + return } -// Running returns if Service is running (i.e. state NOT stopped / stopping). -func (svc *Service) Running() bool { - svc.mutex.Lock() - state := svc.state - svc.mutex.Unlock() - return (state == 1) -} +// NOTE: MAY ONLY BE CALLED BY STARTING GOROUTINE. +func (i *svc_instance) stop_private() { + // Attempt set both stopping and finished bits. + old := i.state.Or(stopping_bit | finished_bit) -// Done returns a channel that's closed when Service.Stop() is called. It is -// the same channel provided to the currently running service function. -func (svc *Service) Done() <-chan struct{} { - var done <-chan struct{} - - svc.mutex.Lock() - switch svc.state { - // stopped - case 0: - if svc.ctx == nil { - // here we create a new context so that the - // returned 'done' channel here will still - // be valid for when Service is next started. - svc.ctx = make(chan struct{}) - } - done = svc.ctx + // Only if we weren't already + // stopping do we close channel. + if old&stopping_bit == 0 { + close(i.done) + } - // started - case 1: - done = svc.ctx + // Release + // waiters. + i.wait.Done() +} - // stopping - case 2: - done = svc.ctx +func (i *svc_instance) stop() (ok bool) { + // Attempt to set the 'stopping' bit. + switch old := i.state.Or(stopping_bit); { + + case old&finished_bit != 0: + // Already finished. + return + + case old&started_bit == 0: + // This was never started + // to begin with, just mark + // as fully finished here. + _ = i.state.Or(finished_bit) + return + + case old&stopping_bit == 0: + // We succesfully stopped + // instance, close channel. + close(i.done) + ok = true } - svc.mutex.Unlock() - return done + // Wait on stop. + i.wait.Wait() + return +} + +// running returns whether service was started and +// is not yet finished. that indicates that it may +// have been started and not yet stopped, or that +// it was started, stopped and not yet returned. +func (i *svc_instance) running() bool { + val := i.state.Load() + return val&started_bit != 0 && + val&finished_bit == 0 } |
