diff options
Diffstat (limited to 'vendor/codeberg.org/gruf/go-nowish/timeout.go')
-rw-r--r-- | vendor/codeberg.org/gruf/go-nowish/timeout.go | 245 |
1 files changed, 180 insertions, 65 deletions
diff --git a/vendor/codeberg.org/gruf/go-nowish/timeout.go b/vendor/codeberg.org/gruf/go-nowish/timeout.go index 1097f5d3d..7fe3e1d1d 100644 --- a/vendor/codeberg.org/gruf/go-nowish/timeout.go +++ b/vendor/codeberg.org/gruf/go-nowish/timeout.go @@ -6,113 +6,228 @@ import ( "time" ) -// Timeout provides a reusable structure for enforcing timeouts with a cancel +// Timeout provides a reusable structure for enforcing timeouts with a cancel. type Timeout struct { - noCopy noCopy //nolint noCopy because a copy will mess with atomics - - tk *time.Timer // tk is the underlying timeout-timer - ch syncer // ch is the cancel synchronization channel - wg sync.WaitGroup // wg is the waitgroup to hold .Start() until timeout goroutine started - st timeoutState // st stores the current timeout state (and protects concurrent use) + timer *time.Timer // timer is the underlying timeout-timer + cncl syncer // cncl is the cancel synchronization channel + next int64 // next is the next timeout duration to run on + state uint32 // state stores the current timeout state + mu sync.Mutex // mu protects state, and helps synchronize return of .Start() } -// NewTimeout returns a new Timeout instance +// NewTimeout returns a new Timeout instance. func NewTimeout() Timeout { - tk := time.NewTimer(time.Minute) - tk.Stop() // don't keep it running + timer := time.NewTimer(time.Minute) + timer.Stop() // don't keep it running return Timeout{ - tk: tk, - ch: make(syncer), + timer: timer, + cncl: make(syncer), } } -func (t *Timeout) runTimeout(hook func()) { - t.wg.Add(1) - go func() { - cancelled := false +// startTimeout is the main timeout routine, handling starting the +// timeout runner at first and upon any time extensions, and handling +// any received cancels by stopping the running timer. +func (t *Timeout) startTimeout(hook func()) { + var cancelled bool + + // Receive first timeout duration + d := atomic.SwapInt64(&t.next, 0) - // Signal started - t.wg.Done() + // Indicate finished starting, this + // was left locked by t.start(). + t.mu.Unlock() - select { - // Timeout reached - case <-t.tk.C: - if !t.st.stop() /* a sneaky cancel! */ { - t.ch.recv() + for { + // Run supplied timeout + cancelled = t.runTimeout(d) + if cancelled { + break + } + + // Check for extension or set timed out + d = atomic.SwapInt64(&t.next, 0) + if d < 1 { + if t.timedOut() { + // timeout reached + hook() + break + } else { + // already cancelled + t.cncl.wait() cancelled = true - defer t.ch.send() + break } + } - // Cancel called - case <-t.ch: + if !t.extend() { + // already cancelled + t.cncl.wait() cancelled = true - defer t.ch.send() + break } + } - // Ensure timer stopped - if cancelled && !t.tk.Stop() { - <-t.tk.C - } + if cancelled { + // Release the .Cancel() + defer t.cncl.notify() + } + + // Mark as done + t.reset() +} - // Defer reset state - defer t.st.reset() +// runTimeout will until supplied timeout or cancel called. +func (t *Timeout) runTimeout(d int64) (cancelled bool) { + // Start the timer for 'd' + t.timer.Reset(time.Duration(d)) + + select { + // Timeout reached + case <-t.timer.C: + if !t.timingOut() { + // a sneaky cancel! + t.cncl.wait() + cancelled = true + } - // If timed out call hook - if !cancelled { - hook() + // Cancel called + case <-t.cncl.wait(): + cancelled = true + if !t.timer.Stop() { + <-t.timer.C } - }() - t.wg.Wait() + } + + return cancelled } // Start starts the timer with supplied timeout. If timeout is reached before -// cancel then supplied timeout hook will be called. Error may be called if -// Timeout is already running when this function is called +// cancel then supplied timeout hook will be called. Panic will be called if +// Timeout is already running when calling this function. func (t *Timeout) Start(d time.Duration, hook func()) { - if !t.st.start() { - panic("nowish: timeout already started") + if !t.start() { + t.mu.Unlock() // need to unlock + panic("timeout already started") + } + + // Start the timeout + atomic.StoreInt64(&t.next, int64(d)) + go t.startTimeout(hook) + + // Wait until start + t.mu.Lock() + t.mu.Unlock() +} + +// Extend will attempt to extend the timeout runner's time, returns false if not running. +func (t *Timeout) Extend(d time.Duration) bool { + var ok bool + if ok = t.running(); ok { + atomic.AddInt64(&t.next, int64(d)) } - t.runTimeout(hook) - t.tk.Reset(d) + return ok } // Cancel cancels the currently running timer. If a cancel is achieved, then -// this function will return after the timeout goroutine is finished +// this function will return after the timeout goroutine is finished. func (t *Timeout) Cancel() { - if !t.st.stop() { + if !t.cancel() { return } - t.ch.send() - t.ch.recv() + t.cncl.notify() + <-t.cncl.wait() +} + +// possible timeout states. +const ( + stopped = 0 + started = 1 + timingOut = 2 + cancelled = 3 + timedOut = 4 +) + +// cas will perform a compare and swap where the compare is a provided function. +func (t *Timeout) cas(check func(uint32) bool, swap uint32) bool { + var cas bool + + t.mu.Lock() + if cas = check(t.state); cas { + t.state = swap + } + t.mu.Unlock() + + return cas +} + +// start attempts to mark the timeout state as 'started', note DOES NOT unlock Timeout.mu. +func (t *Timeout) start() bool { + var ok bool + + t.mu.Lock() + if ok = (t.state == stopped); ok { + t.state = started + } + + // don't unlock + return ok } -// timeoutState provides a thread-safe timeout state mechanism -type timeoutState uint32 +// timingOut attempts to mark the timeout state as 'timing out'. +func (t *Timeout) timingOut() bool { + return t.cas(func(u uint32) bool { + return (u == started) + }, timingOut) +} + +// timedOut attempts mark the 'timing out' state as 'timed out'. +func (t *Timeout) timedOut() bool { + return t.cas(func(u uint32) bool { + return (u == timingOut) + }, timedOut) +} + +// extend attempts to extend a 'timing out' state by moving it back to 'started'. +func (t *Timeout) extend() bool { + return t.cas(func(u uint32) bool { + return (u == started) || + (u == timingOut) + }, started) +} -// start attempts to start the state, must be already reset, returns success -func (t *timeoutState) start() bool { - return atomic.CompareAndSwapUint32((*uint32)(t), 0, 1) +// running returns whether the state is anything other than 'stopped'. +func (t *Timeout) running() bool { + t.mu.Lock() + running := (t.state != stopped) + t.mu.Unlock() + return running } -// stop attempts to stop the state, must already be started, returns success -func (t *timeoutState) stop() bool { - return atomic.CompareAndSwapUint32((*uint32)(t), 1, 2) +// cancel attempts to mark the timeout state as 'cancelled'. +func (t *Timeout) cancel() bool { + return t.cas(func(u uint32) bool { + return (u == started) || + (u == timingOut) + }, cancelled) } -// reset is fairly self explanatory -func (t *timeoutState) reset() { - atomic.StoreUint32((*uint32)(t), 0) +// reset marks the timeout state as 'stopped'. +func (t *Timeout) reset() { + t.mu.Lock() + t.state = stopped + t.mu.Unlock() } -// syncer provides helpful receiver methods for a synchronization channel +// syncer provides helpful receiver methods for a synchronization channel. type syncer (chan struct{}) -// send blocks on sending an empty value down channel -func (s syncer) send() { +// notify blocks on sending an empty value down channel. +func (s syncer) notify() { s <- struct{}{} } -// recv blocks on receiving (and dropping) empty value from channel -func (s syncer) recv() { - <-s +// wait returns the underlying channel for blocking until '.notify()'. +func (s syncer) wait() <-chan struct{} { + return s } |