From 635ad2a42f10a5b24f08021782b71b4cf8326e19 Mon Sep 17 00:00:00 2001 From: kim <89579420+NyaaaWhatsUpDoc@users.noreply.github.com> Date: Mon, 20 Dec 2021 09:35:32 +0000 Subject: Update codeberg.org/gruf libraries and fix go-store issue (#347) * update codeberg.org/gruf/ libraries Signed-off-by: kim * another update Signed-off-by: kim --- vendor/codeberg.org/gruf/go-nowish/clock.go | 132 ++++++++++++++ vendor/codeberg.org/gruf/go-nowish/time.go | 141 --------------- vendor/codeberg.org/gruf/go-nowish/timeout.go | 245 +++++++++++++++++++------- 3 files changed, 312 insertions(+), 206 deletions(-) create mode 100644 vendor/codeberg.org/gruf/go-nowish/clock.go delete mode 100644 vendor/codeberg.org/gruf/go-nowish/time.go (limited to 'vendor/codeberg.org/gruf/go-nowish') diff --git a/vendor/codeberg.org/gruf/go-nowish/clock.go b/vendor/codeberg.org/gruf/go-nowish/clock.go new file mode 100644 index 000000000..781e59f18 --- /dev/null +++ b/vendor/codeberg.org/gruf/go-nowish/clock.go @@ -0,0 +1,132 @@ +package nowish + +import ( + "sync" + "sync/atomic" + "time" + "unsafe" +) + +// Start returns a new Clock instance initialized and +// started with the provided precision, along with the +// stop function for it's underlying timer +func Start(precision time.Duration) (*Clock, func()) { + c := Clock{} + return &c, c.Start(precision) +} + +type Clock struct { + // format stores the time formatting style string + format string + + // valid indicates whether the current value stored in .Format is valid + valid uint32 + + // mutex protects writes to .Format, not because it would be unsafe, but + // because we want to minimize unnnecessary allocations + mutex sync.Mutex + + // nowfmt is an unsafe pointer to the last-updated time format string + nowfmt unsafe.Pointer + + // now is an unsafe pointer to the last-updated time.Time object + now unsafe.Pointer +} + +// Start starts the clock with the provided precision, the returned +// function is the stop function for the underlying timer. For >= 2ms, +// actual precision is usually within AT LEAST 10% of requested precision, +// less than this and the actual precision very quickly deteriorates. +func (c *Clock) Start(precision time.Duration) func() { + // Create ticker from duration + tick := time.NewTicker(precision / 10) + + // Set initial time + t := time.Now() + atomic.StorePointer(&c.now, unsafe.Pointer(&t)) + + // Set initial format + s := "" + atomic.StorePointer(&c.nowfmt, unsafe.Pointer(&s)) + + // If formatting string unset, set default + c.mutex.Lock() + if c.format == "" { + c.format = time.RFC822 + } + c.mutex.Unlock() + + // Start main routine + go c.run(tick) + + // Return stop fn + return tick.Stop +} + +// run is the internal clock ticking loop. +func (c *Clock) run(tick *time.Ticker) { + for { + // Wait on tick + _, ok := <-tick.C + + // Channel closed + if !ok { + break + } + + // Update time + t := time.Now() + atomic.StorePointer(&c.now, unsafe.Pointer(&t)) + + // Invalidate format string + atomic.StoreUint32(&c.valid, 0) + } +} + +// Now returns a good (ish) estimate of the current 'now' time. +func (c *Clock) Now() time.Time { + return *(*time.Time)(atomic.LoadPointer(&c.now)) +} + +// NowFormat returns the formatted "now" time, cached until next tick and "now" updates. +func (c *Clock) NowFormat() string { + // If format still valid, return this + if atomic.LoadUint32(&c.valid) == 1 { + return *(*string)(atomic.LoadPointer(&c.nowfmt)) + } + + // Get mutex lock + c.mutex.Lock() + + // Double check still invalid + if atomic.LoadUint32(&c.valid) == 1 { + c.mutex.Unlock() + return *(*string)(atomic.LoadPointer(&c.nowfmt)) + } + + // Calculate time format + nowfmt := c.Now().Format(c.format) + + // Update the stored value and set valid! + atomic.StorePointer(&c.nowfmt, unsafe.Pointer(&nowfmt)) + atomic.StoreUint32(&c.valid, 1) + + // Unlock and return + c.mutex.Unlock() + return nowfmt +} + +// SetFormat sets the time format string used by .NowFormat(). +func (c *Clock) SetFormat(format string) { + // Get mutex lock + c.mutex.Lock() + + // Update time format + c.format = format + + // Invalidate current format string + atomic.StoreUint32(&c.valid, 0) + + // Unlock + c.mutex.Unlock() +} diff --git a/vendor/codeberg.org/gruf/go-nowish/time.go b/vendor/codeberg.org/gruf/go-nowish/time.go deleted file mode 100644 index 674ff5669..000000000 --- a/vendor/codeberg.org/gruf/go-nowish/time.go +++ /dev/null @@ -1,141 +0,0 @@ -package nowish - -import ( - "sync" - "sync/atomic" - "time" - "unsafe" -) - -// Start returns a new Clock instance initialized and -// started with the provided precision, along with the -// stop function for it's underlying timer -func Start(precision time.Duration) (*Clock, func()) { - c := Clock{} - return &c, c.Start(precision) -} - -type Clock struct { - noCopy noCopy //nolint noCopy because a copy will fuck with atomics - - // format stores the time formatting style string - format string - - // valid indicates whether the current value stored in .Format is valid - valid uint32 - - // mutex protects writes to .Format, not because it would be unsafe, but - // because we want to minimize unnnecessary allocations - mutex sync.Mutex - - // nowfmt is an unsafe pointer to the last-updated time format string - nowfmt unsafe.Pointer - - // now is an unsafe pointer to the last-updated time.Time object - now unsafe.Pointer -} - -// Start starts the clock with the provided precision, the -// returned function is the stop function for the underlying timer -func (c *Clock) Start(precision time.Duration) func() { - // Create ticker from duration - tick := time.NewTicker(precision) - - // Set initial time - t := time.Now() - atomic.StorePointer(&c.now, unsafe.Pointer(&t)) - - // Set initial format - s := "" - atomic.StorePointer(&c.nowfmt, unsafe.Pointer(&s)) - - // If formatting string unset, set default - c.mutex.Lock() - if c.format == "" { - c.format = time.RFC822 - } - c.mutex.Unlock() - - // Start main routine - go c.run(tick) - - // Return stop fn - return tick.Stop -} - -// run is the internal clock ticking loop -func (c *Clock) run(tick *time.Ticker) { - for { - // Wait on tick - _, ok := <-tick.C - - // Channel closed - if !ok { - break - } - - // Update time - t := time.Now() - atomic.StorePointer(&c.now, unsafe.Pointer(&t)) - - // Invalidate format string - atomic.StoreUint32(&c.valid, 0) - } -} - -// Now returns a good (ish) estimate of the current 'now' time -func (c *Clock) Now() time.Time { - return *(*time.Time)(atomic.LoadPointer(&c.now)) -} - -// NowFormat returns the formatted "now" time, cached until next tick and "now" updates -func (c *Clock) NowFormat() string { - // If format still valid, return this - if atomic.LoadUint32(&c.valid) == 1 { - return *(*string)(atomic.LoadPointer(&c.nowfmt)) - } - - // Get mutex lock - c.mutex.Lock() - - // Double check still invalid - if atomic.LoadUint32(&c.valid) == 1 { - c.mutex.Unlock() - return *(*string)(atomic.LoadPointer(&c.nowfmt)) - } - - // Calculate time format - b := c.Now().AppendFormat( - make([]byte, 0, len(c.format)), - c.format, - ) - - // Update the stored value and set valid! - atomic.StorePointer(&c.nowfmt, unsafe.Pointer(&b)) - atomic.StoreUint32(&c.valid, 1) - - // Unlock and return - c.mutex.Unlock() - - // Note: - // it's safe to do this conversion here - // because this byte slice will never change. - // and we have the direct pointer to it, we're - // not requesting it atomicly via c.Format - return *(*string)(unsafe.Pointer(&b)) -} - -// SetFormat sets the time format string used by .NowFormat() -func (c *Clock) SetFormat(format string) { - // Get mutex lock - c.mutex.Lock() - - // Update time format - c.format = format - - // Invalidate current format string - atomic.StoreUint32(&c.valid, 0) - - // Unlock - c.mutex.Unlock() -} diff --git a/vendor/codeberg.org/gruf/go-nowish/timeout.go b/vendor/codeberg.org/gruf/go-nowish/timeout.go index 1097f5d3d..7fe3e1d1d 100644 --- a/vendor/codeberg.org/gruf/go-nowish/timeout.go +++ b/vendor/codeberg.org/gruf/go-nowish/timeout.go @@ -6,113 +6,228 @@ import ( "time" ) -// Timeout provides a reusable structure for enforcing timeouts with a cancel +// Timeout provides a reusable structure for enforcing timeouts with a cancel. type Timeout struct { - noCopy noCopy //nolint noCopy because a copy will mess with atomics - - tk *time.Timer // tk is the underlying timeout-timer - ch syncer // ch is the cancel synchronization channel - wg sync.WaitGroup // wg is the waitgroup to hold .Start() until timeout goroutine started - st timeoutState // st stores the current timeout state (and protects concurrent use) + timer *time.Timer // timer is the underlying timeout-timer + cncl syncer // cncl is the cancel synchronization channel + next int64 // next is the next timeout duration to run on + state uint32 // state stores the current timeout state + mu sync.Mutex // mu protects state, and helps synchronize return of .Start() } -// NewTimeout returns a new Timeout instance +// NewTimeout returns a new Timeout instance. func NewTimeout() Timeout { - tk := time.NewTimer(time.Minute) - tk.Stop() // don't keep it running + timer := time.NewTimer(time.Minute) + timer.Stop() // don't keep it running return Timeout{ - tk: tk, - ch: make(syncer), + timer: timer, + cncl: make(syncer), } } -func (t *Timeout) runTimeout(hook func()) { - t.wg.Add(1) - go func() { - cancelled := false +// startTimeout is the main timeout routine, handling starting the +// timeout runner at first and upon any time extensions, and handling +// any received cancels by stopping the running timer. +func (t *Timeout) startTimeout(hook func()) { + var cancelled bool + + // Receive first timeout duration + d := atomic.SwapInt64(&t.next, 0) - // Signal started - t.wg.Done() + // Indicate finished starting, this + // was left locked by t.start(). + t.mu.Unlock() - select { - // Timeout reached - case <-t.tk.C: - if !t.st.stop() /* a sneaky cancel! */ { - t.ch.recv() + for { + // Run supplied timeout + cancelled = t.runTimeout(d) + if cancelled { + break + } + + // Check for extension or set timed out + d = atomic.SwapInt64(&t.next, 0) + if d < 1 { + if t.timedOut() { + // timeout reached + hook() + break + } else { + // already cancelled + t.cncl.wait() cancelled = true - defer t.ch.send() + break } + } - // Cancel called - case <-t.ch: + if !t.extend() { + // already cancelled + t.cncl.wait() cancelled = true - defer t.ch.send() + break } + } - // Ensure timer stopped - if cancelled && !t.tk.Stop() { - <-t.tk.C - } + if cancelled { + // Release the .Cancel() + defer t.cncl.notify() + } + + // Mark as done + t.reset() +} - // Defer reset state - defer t.st.reset() +// runTimeout will until supplied timeout or cancel called. +func (t *Timeout) runTimeout(d int64) (cancelled bool) { + // Start the timer for 'd' + t.timer.Reset(time.Duration(d)) + + select { + // Timeout reached + case <-t.timer.C: + if !t.timingOut() { + // a sneaky cancel! + t.cncl.wait() + cancelled = true + } - // If timed out call hook - if !cancelled { - hook() + // Cancel called + case <-t.cncl.wait(): + cancelled = true + if !t.timer.Stop() { + <-t.timer.C } - }() - t.wg.Wait() + } + + return cancelled } // Start starts the timer with supplied timeout. If timeout is reached before -// cancel then supplied timeout hook will be called. Error may be called if -// Timeout is already running when this function is called +// cancel then supplied timeout hook will be called. Panic will be called if +// Timeout is already running when calling this function. func (t *Timeout) Start(d time.Duration, hook func()) { - if !t.st.start() { - panic("nowish: timeout already started") + if !t.start() { + t.mu.Unlock() // need to unlock + panic("timeout already started") + } + + // Start the timeout + atomic.StoreInt64(&t.next, int64(d)) + go t.startTimeout(hook) + + // Wait until start + t.mu.Lock() + t.mu.Unlock() +} + +// Extend will attempt to extend the timeout runner's time, returns false if not running. +func (t *Timeout) Extend(d time.Duration) bool { + var ok bool + if ok = t.running(); ok { + atomic.AddInt64(&t.next, int64(d)) } - t.runTimeout(hook) - t.tk.Reset(d) + return ok } // Cancel cancels the currently running timer. If a cancel is achieved, then -// this function will return after the timeout goroutine is finished +// this function will return after the timeout goroutine is finished. func (t *Timeout) Cancel() { - if !t.st.stop() { + if !t.cancel() { return } - t.ch.send() - t.ch.recv() + t.cncl.notify() + <-t.cncl.wait() +} + +// possible timeout states. +const ( + stopped = 0 + started = 1 + timingOut = 2 + cancelled = 3 + timedOut = 4 +) + +// cas will perform a compare and swap where the compare is a provided function. +func (t *Timeout) cas(check func(uint32) bool, swap uint32) bool { + var cas bool + + t.mu.Lock() + if cas = check(t.state); cas { + t.state = swap + } + t.mu.Unlock() + + return cas +} + +// start attempts to mark the timeout state as 'started', note DOES NOT unlock Timeout.mu. +func (t *Timeout) start() bool { + var ok bool + + t.mu.Lock() + if ok = (t.state == stopped); ok { + t.state = started + } + + // don't unlock + return ok } -// timeoutState provides a thread-safe timeout state mechanism -type timeoutState uint32 +// timingOut attempts to mark the timeout state as 'timing out'. +func (t *Timeout) timingOut() bool { + return t.cas(func(u uint32) bool { + return (u == started) + }, timingOut) +} + +// timedOut attempts mark the 'timing out' state as 'timed out'. +func (t *Timeout) timedOut() bool { + return t.cas(func(u uint32) bool { + return (u == timingOut) + }, timedOut) +} + +// extend attempts to extend a 'timing out' state by moving it back to 'started'. +func (t *Timeout) extend() bool { + return t.cas(func(u uint32) bool { + return (u == started) || + (u == timingOut) + }, started) +} -// start attempts to start the state, must be already reset, returns success -func (t *timeoutState) start() bool { - return atomic.CompareAndSwapUint32((*uint32)(t), 0, 1) +// running returns whether the state is anything other than 'stopped'. +func (t *Timeout) running() bool { + t.mu.Lock() + running := (t.state != stopped) + t.mu.Unlock() + return running } -// stop attempts to stop the state, must already be started, returns success -func (t *timeoutState) stop() bool { - return atomic.CompareAndSwapUint32((*uint32)(t), 1, 2) +// cancel attempts to mark the timeout state as 'cancelled'. +func (t *Timeout) cancel() bool { + return t.cas(func(u uint32) bool { + return (u == started) || + (u == timingOut) + }, cancelled) } -// reset is fairly self explanatory -func (t *timeoutState) reset() { - atomic.StoreUint32((*uint32)(t), 0) +// reset marks the timeout state as 'stopped'. +func (t *Timeout) reset() { + t.mu.Lock() + t.state = stopped + t.mu.Unlock() } -// syncer provides helpful receiver methods for a synchronization channel +// syncer provides helpful receiver methods for a synchronization channel. type syncer (chan struct{}) -// send blocks on sending an empty value down channel -func (s syncer) send() { +// notify blocks on sending an empty value down channel. +func (s syncer) notify() { s <- struct{}{} } -// recv blocks on receiving (and dropping) empty value from channel -func (s syncer) recv() { - <-s +// wait returns the underlying channel for blocking until '.notify()'. +func (s syncer) wait() <-chan struct{} { + return s } -- cgit v1.2.3