Diffstat (limited to 'vendor/codeberg.org')
-rw-r--r--  vendor/codeberg.org/gruf/go-mutexes/map.go            | 335
-rw-r--r--  vendor/codeberg.org/gruf/go-mutexes/mutex_timeout.go  |  67
-rw-r--r--  vendor/codeberg.org/gruf/go-nowish/LICENSE            |   9
-rw-r--r--  vendor/codeberg.org/gruf/go-nowish/README.md          |   3
-rw-r--r--  vendor/codeberg.org/gruf/go-nowish/clock.go           | 132
-rw-r--r--  vendor/codeberg.org/gruf/go-nowish/timeout.go         | 233
-rw-r--r--  vendor/codeberg.org/gruf/go-nowish/util.go            |  10
-rw-r--r--  vendor/codeberg.org/gruf/go-store/kv/iterator.go      |  10
-rw-r--r--  vendor/codeberg.org/gruf/go-store/kv/state.go         |  82
-rw-r--r--  vendor/codeberg.org/gruf/go-store/kv/store.go         |  40
-rw-r--r--  vendor/codeberg.org/gruf/go-store/storage/block.go    |  86
-rw-r--r--  vendor/codeberg.org/gruf/go-store/storage/disk.go     |  67
-rw-r--r--  vendor/codeberg.org/gruf/go-store/storage/errors.go   |  14
-rw-r--r--  vendor/codeberg.org/gruf/go-store/storage/fs.go       |   9
-rw-r--r--  vendor/codeberg.org/gruf/go-store/storage/lock.go     |  75
-rw-r--r--  vendor/codeberg.org/gruf/go-store/storage/memory.go   |  78
16 files changed, 637 insertions(+), 613 deletions(-)
diff --git a/vendor/codeberg.org/gruf/go-mutexes/map.go b/vendor/codeberg.org/gruf/go-mutexes/map.go
index ea917ee5e..cb31a9543 100644
--- a/vendor/codeberg.org/gruf/go-mutexes/map.go
+++ b/vendor/codeberg.org/gruf/go-mutexes/map.go
@@ -1,105 +1,322 @@
package mutexes
import (
+ "runtime"
"sync"
+ "sync/atomic"
)
+// locktype defines maskable mutexmap lock types.
+type locktype uint8
+
+const (
+ // possible lock types.
+ lockTypeRead = locktype(1) << 0
+ lockTypeWrite = locktype(1) << 1
+ lockTypeMap = locktype(1) << 2
+
+ // possible mutexmap states.
+ stateUnlockd = uint8(0)
+ stateRLocked = uint8(1)
+ stateLocked = uint8(2)
+ stateInUse = uint8(3)
+)
+
+// permitLockType returns if provided locktype is permitted to go ahead in current state.
+func permitLockType(state uint8, lt locktype) bool {
+ switch state {
+ // Unlocked state
+ // (all allowed)
+ case stateUnlockd:
+ return true
+
+ // Keys locked, no state lock.
+ // (don't allow map locks)
+ case stateInUse:
+ return lt&lockTypeMap == 0
+
+ // Read locked
+ // (only allow read locks)
+ case stateRLocked:
+ return lt&lockTypeRead != 0
+
+ // Write locked
+ // (none allowed)
+ case stateLocked:
+ return false
+
+ // shouldn't reach here
+ default:
+ panic("unexpected state")
+ }
+}
+
// MutexMap is a structure that allows having a map of self-evicting mutexes
// by key. You do not need to worry about managing the contents of the map,
// only requesting RLock/Lock for keys, and ensuring to call the returned
// unlock functions.
type MutexMap struct {
- // NOTE:
- // Individual keyed mutexes should ONLY ever
- // be locked within the protection of the outer
- // mapMu lock. If you lock these outside the
- // protection of this, there is a chance for
- // deadlocks
-
mus map[string]RWMutex
mapMu sync.Mutex
pool sync.Pool
+ queue []func()
+ evict []func()
+ count int32
+ maxmu int32
+ state uint8
}
-// NewMap returns a new MutexMap instance based on supplied
-// RWMutex allocator function, nil implies use default
-func NewMap(newFn func() RWMutex) MutexMap {
- if newFn == nil {
- newFn = NewRW
+// NewMap returns a new MutexMap instance with provided max no. open mutexes.
+func NewMap(max int32) MutexMap {
+ if max < 1 {
+ // Default = 128 * GOMAXPROCS
+ procs := runtime.GOMAXPROCS(0)
+ max = int32(procs * 128)
}
return MutexMap{
- mus: make(map[string]RWMutex),
- mapMu: sync.Mutex{},
+ mus: make(map[string]RWMutex),
pool: sync.Pool{
New: func() interface{} {
- return newFn()
+ return NewRW()
},
},
+ maxmu: max,
+ }
+}
+
+// acquire will either acquire a mutex from pool or alloc.
+func (mm *MutexMap) acquire() RWMutex {
+ return mm.pool.Get().(RWMutex)
+}
+
+// release will release provided mutex to pool.
+func (mm *MutexMap) release(mu RWMutex) {
+ mm.pool.Put(mu)
+}
+
+// spinLock will wait (using a mutex to sleep thread) until 'cond()' returns true,
+// returning with map lock. Note that 'cond' is performed within a map lock.
+func (mm *MutexMap) spinLock(cond func() bool) {
+ mu := mm.acquire()
+ defer mm.release(mu)
+
+ for {
+ // Get map lock
+ mm.mapMu.Lock()
+
+ // Check if return
+ if cond() {
+ return
+ }
+
+ // Queue ourselves
+ unlock := mu.Lock()
+ mm.queue = append(mm.queue, unlock)
+ mm.mapMu.Unlock()
+
+ // Wait on notify
+ mu.Lock()()
+ }
+}
+
+// lockMutex will acquire a lock of provided type on the mutex at given key, allocating a new mutex from the pool if none exists. Unlocks map on return.
+func (mm *MutexMap) lockMutex(key string, lt locktype) func() {
+ var unlock func()
+
+ // Incr counter
+ mm.count++
+
+ // Check for existing mutex at key
+ mu, ok := mm.mus[key]
+ if !ok {
+ // Alloc from pool
+ mu = mm.acquire()
+ mm.mus[key] = mu
+
+ // Queue mutex for eviction
+ mm.evict = append(mm.evict, func() {
+ delete(mm.mus, key)
+ mm.pool.Put(mu)
+ })
+ }
+
+ // If no state, set in use.
+ // State will already have been
+ // set if this is from LockState{}
+ if mm.state == stateUnlockd {
+ mm.state = stateInUse
+ }
+
+ switch {
+ // Read lock
+ case lt&lockTypeRead != 0:
+ unlock = mu.RLock()
+
+ // Write lock
+ case lt&lockTypeWrite != 0:
+ unlock = mu.Lock()
+
+ // shouldn't reach here
+ default:
+ panic("unexpected lock type")
+ }
+
+ // Unlock map + return
+ mm.mapMu.Unlock()
+ return func() {
+ mm.mapMu.Lock()
+ unlock()
+ go mm.onUnlock()
}
}
-func (mm *MutexMap) evict(key string, mu RWMutex) {
- // Acquire map lock
- mm.mapMu.Lock()
+// onUnlock is performed as the final (async) stage of releasing an acquired key / map mutex.
+func (mm *MutexMap) onUnlock() {
+ // Decr counter
+ mm.count--
+
+ if mm.count < 1 {
+ // Perform all queued evictions
+ for i := 0; i < len(mm.evict); i++ {
+ mm.evict[i]()
+ }
- // Toggle mutex lock to
- // ensure it is unused
- unlock := mu.Lock()
- unlock()
+ // Notify all waiting goroutines
+ for i := 0; i < len(mm.queue); i++ {
+ mm.queue[i]()
+ }
- // Delete mutex key
- delete(mm.mus, key)
+ // Reset the map state
+ mm.evict = nil
+ mm.queue = nil
+ mm.state = stateUnlockd
+ }
+
+ // Finally, unlock
mm.mapMu.Unlock()
+}
- // Release to pool
- mm.pool.Put(mu)
+// RLockMap acquires a read lock over the entire map, returning a lock state for acquiring key read locks.
+// Please note that the 'unlock()' function will block until all keys locked from this state are unlocked.
+func (mm *MutexMap) RLockMap() *LockState {
+ return mm.getMapLock(lockTypeRead)
}
-// RLock acquires a mutex read lock for supplied key, returning an RUnlock function
-func (mm *MutexMap) RLock(key string) func() {
- return mm.getLock(key, func(mu RWMutex) func() {
- return mu.RLock()
+// LockMap acquires a write lock over the entire map, returning a lock state for acquiring key read/write locks.
+// Please note that the 'unlock()' function will block until all keys locked from this state are unlocked.
+func (mm *MutexMap) LockMap() *LockState {
+ return mm.getMapLock(lockTypeWrite)
+}
+
+// RLock acquires a mutex read lock for supplied key, returning an RUnlock function.
+func (mm *MutexMap) RLock(key string) (runlock func()) {
+ return mm.getLock(key, lockTypeRead)
+}
+
+// Lock acquires a mutex write lock for supplied key, returning an Unlock function.
+func (mm *MutexMap) Lock(key string) (unlock func()) {
+ return mm.getLock(key, lockTypeWrite)
+}
+
+// getLock will fetch lock of provided type, for given key, returning unlock function.
+func (mm *MutexMap) getLock(key string, lt locktype) func() {
+ // Spin until achieve lock
+ mm.spinLock(func() bool {
+ return permitLockType(mm.state, lt) &&
+ mm.count < mm.maxmu // not overloaded
})
+
+ // Perform actual mutex lock
+ return mm.lockMutex(key, lt)
}
-// Lock acquires a mutex lock for supplied key, returning an Unlock function
-func (mm *MutexMap) Lock(key string) func() {
- return mm.getLock(key, func(mu RWMutex) func() {
- return mu.Lock()
+// getMapLock will acquire a map lock of provided type, returning a LockState session.
+func (mm *MutexMap) getMapLock(lt locktype) *LockState {
+ // Spin until achieve lock
+ mm.spinLock(func() bool {
+ return permitLockType(mm.state, lt|lockTypeMap) &&
+ mm.count < mm.maxmu // not overloaded
})
+
+ // Incr counter
+ mm.count++
+
+ switch {
+ // Set read lock state
+ case lt&lockTypeRead != 0:
+ mm.state = stateRLocked
+
+ // Set write lock state
+ case lt&lockTypeWrite != 0:
+ mm.state = stateLocked
+
+ default:
+ panic("unexpected lock type")
+ }
+
+ // Unlock + return
+ mm.mapMu.Unlock()
+ return &LockState{
+ mmap: mm,
+ ltyp: lt,
+ }
}
-func (mm *MutexMap) getLock(key string, doLock func(RWMutex) func()) func() {
- // Get map lock
- mm.mapMu.Lock()
+// LockState represents a window to a locked MutexMap.
+type LockState struct {
+ wait sync.WaitGroup
+ mmap *MutexMap
+ done uint32
+ ltyp locktype
+}
- // Look for mutex
- mu, ok := mm.mus[key]
- if ok {
- // Lock and return
- // its unlocker func
- unlock := doLock(mu)
- mm.mapMu.Unlock()
- return unlock
+// Lock: see MutexMap.Lock() definition. Will panic if map only read locked.
+func (st *LockState) Lock(key string) (unlock func()) {
+ return st.getLock(key, lockTypeWrite)
+}
+
+// RLock: see MutexMap.RLock() definition.
+func (st *LockState) RLock(key string) (runlock func()) {
+ return st.getLock(key, lockTypeRead)
+}
+
+// UnlockMap will close this state and release the currently locked map.
+func (st *LockState) UnlockMap() {
+ // Set state to finished (or panic if already done)
+ if !atomic.CompareAndSwapUint32(&st.done, 0, 1) {
+ panic("called UnlockMap() on expired state")
}
- // Note: even though the mutex data structure is
- // small, benchmarking does actually show that pooled
- // alloc of mutexes here is faster
+ // Wait until done
+ st.wait.Wait()
- // Acquire mu + add
- mu = mm.pool.Get().(RWMutex)
- mm.mus[key] = mu
+ // Async reset map
+ st.mmap.mapMu.Lock()
+ go st.mmap.onUnlock()
+}
- // Lock mutex + unlock map
- unlockFn := doLock(mu)
- mm.mapMu.Unlock()
+// getLock: see MutexMap.getLock() definition.
+func (st *LockState) getLock(key string, lt locktype) func() {
+ st.wait.Add(1) // track lock
- return func() {
- // Unlock mutex
- unlockFn()
+ // Check if closed, or if write lock is allowed
+ if atomic.LoadUint32(&st.done) == 1 {
+ panic("map lock closed")
+ } else if lt&lockTypeWrite != 0 &&
+ st.ltyp&lockTypeWrite == 0 {
+ panic("called .Lock() on rlocked map")
+ }
+
+ // Spin until achieve map lock
+ st.mmap.spinLock(func() bool {
+ return st.mmap.count < st.mmap.maxmu
+ }) // i.e. not overloaded
- // Release function
- go mm.evict(key, mu)
+ // Perform actual mutex lock
+ unlock := st.mmap.lockMutex(key, lt)
+
+ return func() {
+ unlock()
+ st.wait.Done()
}
}
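The reworked MutexMap moves from a caller-supplied allocator to a bounded, self-evicting map with whole-map lock states. A minimal usage sketch, based only on the signatures added in this diff (key names are illustrative):

package main

import "codeberg.org/gruf/go-mutexes"

func main() {
	// Any max < 1 selects the default limit of 128 * GOMAXPROCS open mutexes.
	mm := mutexes.NewMap(-1)

	// Per-key write lock; the returned function releases it.
	unlock := mm.Lock("user:1")
	// ... mutate the resource keyed by "user:1" ...
	unlock()

	// Whole-map read lock: further per-key read locks are taken through
	// the returned LockState, and UnlockMap() blocks until all of them
	// have been released.
	state := mm.RLockMap()
	runlock := state.RLock("user:1")
	// ... read the resource ...
	runlock()
	state.UnlockMap()
}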
diff --git a/vendor/codeberg.org/gruf/go-mutexes/mutex_timeout.go b/vendor/codeberg.org/gruf/go-mutexes/mutex_timeout.go
index 5da69ef25..2e7b8f802 100644
--- a/vendor/codeberg.org/gruf/go-mutexes/mutex_timeout.go
+++ b/vendor/codeberg.org/gruf/go-mutexes/mutex_timeout.go
@@ -3,8 +3,6 @@ package mutexes
import (
"sync"
"time"
-
- "codeberg.org/gruf/go-nowish"
)
// TimeoutMutex defines a Mutex with timeouts on locks
@@ -73,14 +71,6 @@ func (mu *timeoutRWMutex) RLockFunc(fn func()) func() {
return mutexTimeout(mu.rd, mu.mu.RLock(), fn)
}
-// timeoutPool provides nowish.Timeout objects for timeout mutexes
-var timeoutPool = sync.Pool{
- New: func() interface{} {
- t := nowish.NewTimeout()
- return &t
- },
-}
-
// mutexTimeout performs a timed unlock, calling supplied fn if timeout is reached
func mutexTimeout(d time.Duration, unlock func(), fn func()) func() {
if d < 1 {
@@ -88,18 +78,65 @@ func mutexTimeout(d time.Duration, unlock func(), fn func()) func() {
return unlock
}
- // Acquire timeout obj
- t := timeoutPool.Get().(*nowish.Timeout)
+ // Acquire timer from pool
+ t := timerPool.Get().(*timer)
- // Start the timeout with hook
- t.Start(d, fn)
+ // Start the timer
+ go t.Start(d, fn)
// Return func cancelling timeout,
// replacing Timeout in pool and
// finally unlocking mutex
return func() {
+ defer timerPool.Put(t)
t.Cancel()
- timeoutPool.Put(t)
unlock()
}
}
+
+// timerPool is the global &timer{} pool.
+var timerPool = sync.Pool{
+ New: func() interface{} {
+ return newtimer()
+ },
+}
+
+// timer represents a reusable cancellable timer.
+type timer struct {
+ t *time.Timer
+ c chan struct{}
+}
+
+// newtimer returns a new timer instance.
+func newtimer() *timer {
+ t := time.NewTimer(time.Minute)
+ t.Stop()
+ return &timer{t: t, c: make(chan struct{})}
+}
+
+// Start will start the timer with duration 'd', performing 'fn' on timeout.
+func (t *timer) Start(d time.Duration, fn func()) {
+ t.t.Reset(d)
+ select {
+ // Timed out
+ case <-t.t.C:
+ fn()
+
+ // Cancelled
+ case <-t.c:
+ }
+}
+
+// Cancel will attempt to cancel the running timer.
+func (t *timer) Cancel() {
+ select {
+ // cancel successful
+ case t.c <- struct{}{}:
+ if !t.t.Stop() {
+ <-t.t.C
+ } // stop timer
+
+ // already stopped
+ default:
+ }
+}
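The replacement for nowish.Timeout is built on a pooled, stopped time.Timer that is reset per use. A small standalone sketch of the same Reset/Stop-and-drain idiom using only the standard library (durations are arbitrary):

package main

import (
	"fmt"
	"time"
)

func main() {
	// Start from a stopped timer, as the pooled timer here does.
	t := time.NewTimer(time.Minute)
	t.Stop()

	// Arm it for one use.
	t.Reset(20 * time.Millisecond)

	select {
	case <-t.C:
		fmt.Println("timed out") // the mutex's timeout hook would run here
	case <-time.After(5 * time.Millisecond):
		// Cancelled before the deadline: stop the timer and, if it already
		// fired, drain the channel so the timer is safe to reuse.
		if !t.Stop() {
			<-t.C
		}
		fmt.Println("cancelled")
	}
}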
diff --git a/vendor/codeberg.org/gruf/go-nowish/LICENSE b/vendor/codeberg.org/gruf/go-nowish/LICENSE
deleted file mode 100644
index b7c4417ac..000000000
--- a/vendor/codeberg.org/gruf/go-nowish/LICENSE
+++ /dev/null
@@ -1,9 +0,0 @@
-MIT License
-
-Copyright (c) 2021 gruf
-
-Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
-
-The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
-
-THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
diff --git a/vendor/codeberg.org/gruf/go-nowish/README.md b/vendor/codeberg.org/gruf/go-nowish/README.md
deleted file mode 100644
index 4070e5013..000000000
--- a/vendor/codeberg.org/gruf/go-nowish/README.md
+++ /dev/null
@@ -1,3 +0,0 @@
-a simple Go library with useful time utiities:
-- Clock: a high performance clock giving a good "ish" representation of "now" (hence the name!)
-- Timeout: a reusable structure for enforcing timeouts with a cancel
diff --git a/vendor/codeberg.org/gruf/go-nowish/clock.go b/vendor/codeberg.org/gruf/go-nowish/clock.go
deleted file mode 100644
index 781e59f18..000000000
--- a/vendor/codeberg.org/gruf/go-nowish/clock.go
+++ /dev/null
@@ -1,132 +0,0 @@
-package nowish
-
-import (
- "sync"
- "sync/atomic"
- "time"
- "unsafe"
-)
-
-// Start returns a new Clock instance initialized and
-// started with the provided precision, along with the
-// stop function for it's underlying timer
-func Start(precision time.Duration) (*Clock, func()) {
- c := Clock{}
- return &c, c.Start(precision)
-}
-
-type Clock struct {
- // format stores the time formatting style string
- format string
-
- // valid indicates whether the current value stored in .Format is valid
- valid uint32
-
- // mutex protects writes to .Format, not because it would be unsafe, but
- // because we want to minimize unnnecessary allocations
- mutex sync.Mutex
-
- // nowfmt is an unsafe pointer to the last-updated time format string
- nowfmt unsafe.Pointer
-
- // now is an unsafe pointer to the last-updated time.Time object
- now unsafe.Pointer
-}
-
-// Start starts the clock with the provided precision, the returned
-// function is the stop function for the underlying timer. For >= 2ms,
-// actual precision is usually within AT LEAST 10% of requested precision,
-// less than this and the actual precision very quickly deteriorates.
-func (c *Clock) Start(precision time.Duration) func() {
- // Create ticker from duration
- tick := time.NewTicker(precision / 10)
-
- // Set initial time
- t := time.Now()
- atomic.StorePointer(&c.now, unsafe.Pointer(&t))
-
- // Set initial format
- s := ""
- atomic.StorePointer(&c.nowfmt, unsafe.Pointer(&s))
-
- // If formatting string unset, set default
- c.mutex.Lock()
- if c.format == "" {
- c.format = time.RFC822
- }
- c.mutex.Unlock()
-
- // Start main routine
- go c.run(tick)
-
- // Return stop fn
- return tick.Stop
-}
-
-// run is the internal clock ticking loop.
-func (c *Clock) run(tick *time.Ticker) {
- for {
- // Wait on tick
- _, ok := <-tick.C
-
- // Channel closed
- if !ok {
- break
- }
-
- // Update time
- t := time.Now()
- atomic.StorePointer(&c.now, unsafe.Pointer(&t))
-
- // Invalidate format string
- atomic.StoreUint32(&c.valid, 0)
- }
-}
-
-// Now returns a good (ish) estimate of the current 'now' time.
-func (c *Clock) Now() time.Time {
- return *(*time.Time)(atomic.LoadPointer(&c.now))
-}
-
-// NowFormat returns the formatted "now" time, cached until next tick and "now" updates.
-func (c *Clock) NowFormat() string {
- // If format still valid, return this
- if atomic.LoadUint32(&c.valid) == 1 {
- return *(*string)(atomic.LoadPointer(&c.nowfmt))
- }
-
- // Get mutex lock
- c.mutex.Lock()
-
- // Double check still invalid
- if atomic.LoadUint32(&c.valid) == 1 {
- c.mutex.Unlock()
- return *(*string)(atomic.LoadPointer(&c.nowfmt))
- }
-
- // Calculate time format
- nowfmt := c.Now().Format(c.format)
-
- // Update the stored value and set valid!
- atomic.StorePointer(&c.nowfmt, unsafe.Pointer(&nowfmt))
- atomic.StoreUint32(&c.valid, 1)
-
- // Unlock and return
- c.mutex.Unlock()
- return nowfmt
-}
-
-// SetFormat sets the time format string used by .NowFormat().
-func (c *Clock) SetFormat(format string) {
- // Get mutex lock
- c.mutex.Lock()
-
- // Update time format
- c.format = format
-
- // Invalidate current format string
- atomic.StoreUint32(&c.valid, 0)
-
- // Unlock
- c.mutex.Unlock()
-}
diff --git a/vendor/codeberg.org/gruf/go-nowish/timeout.go b/vendor/codeberg.org/gruf/go-nowish/timeout.go
deleted file mode 100644
index 7fe3e1d1d..000000000
--- a/vendor/codeberg.org/gruf/go-nowish/timeout.go
+++ /dev/null
@@ -1,233 +0,0 @@
-package nowish
-
-import (
- "sync"
- "sync/atomic"
- "time"
-)
-
-// Timeout provides a reusable structure for enforcing timeouts with a cancel.
-type Timeout struct {
- timer *time.Timer // timer is the underlying timeout-timer
- cncl syncer // cncl is the cancel synchronization channel
- next int64 // next is the next timeout duration to run on
- state uint32 // state stores the current timeout state
- mu sync.Mutex // mu protects state, and helps synchronize return of .Start()
-}
-
-// NewTimeout returns a new Timeout instance.
-func NewTimeout() Timeout {
- timer := time.NewTimer(time.Minute)
- timer.Stop() // don't keep it running
- return Timeout{
- timer: timer,
- cncl: make(syncer),
- }
-}
-
-// startTimeout is the main timeout routine, handling starting the
-// timeout runner at first and upon any time extensions, and handling
-// any received cancels by stopping the running timer.
-func (t *Timeout) startTimeout(hook func()) {
- var cancelled bool
-
- // Receive first timeout duration
- d := atomic.SwapInt64(&t.next, 0)
-
- // Indicate finished starting, this
- // was left locked by t.start().
- t.mu.Unlock()
-
- for {
- // Run supplied timeout
- cancelled = t.runTimeout(d)
- if cancelled {
- break
- }
-
- // Check for extension or set timed out
- d = atomic.SwapInt64(&t.next, 0)
- if d < 1 {
- if t.timedOut() {
- // timeout reached
- hook()
- break
- } else {
- // already cancelled
- t.cncl.wait()
- cancelled = true
- break
- }
- }
-
- if !t.extend() {
- // already cancelled
- t.cncl.wait()
- cancelled = true
- break
- }
- }
-
- if cancelled {
- // Release the .Cancel()
- defer t.cncl.notify()
- }
-
- // Mark as done
- t.reset()
-}
-
-// runTimeout will until supplied timeout or cancel called.
-func (t *Timeout) runTimeout(d int64) (cancelled bool) {
- // Start the timer for 'd'
- t.timer.Reset(time.Duration(d))
-
- select {
- // Timeout reached
- case <-t.timer.C:
- if !t.timingOut() {
- // a sneaky cancel!
- t.cncl.wait()
- cancelled = true
- }
-
- // Cancel called
- case <-t.cncl.wait():
- cancelled = true
- if !t.timer.Stop() {
- <-t.timer.C
- }
- }
-
- return cancelled
-}
-
-// Start starts the timer with supplied timeout. If timeout is reached before
-// cancel then supplied timeout hook will be called. Panic will be called if
-// Timeout is already running when calling this function.
-func (t *Timeout) Start(d time.Duration, hook func()) {
- if !t.start() {
- t.mu.Unlock() // need to unlock
- panic("timeout already started")
- }
-
- // Start the timeout
- atomic.StoreInt64(&t.next, int64(d))
- go t.startTimeout(hook)
-
- // Wait until start
- t.mu.Lock()
- t.mu.Unlock()
-}
-
-// Extend will attempt to extend the timeout runner's time, returns false if not running.
-func (t *Timeout) Extend(d time.Duration) bool {
- var ok bool
- if ok = t.running(); ok {
- atomic.AddInt64(&t.next, int64(d))
- }
- return ok
-}
-
-// Cancel cancels the currently running timer. If a cancel is achieved, then
-// this function will return after the timeout goroutine is finished.
-func (t *Timeout) Cancel() {
- if !t.cancel() {
- return
- }
- t.cncl.notify()
- <-t.cncl.wait()
-}
-
-// possible timeout states.
-const (
- stopped = 0
- started = 1
- timingOut = 2
- cancelled = 3
- timedOut = 4
-)
-
-// cas will perform a compare and swap where the compare is a provided function.
-func (t *Timeout) cas(check func(uint32) bool, swap uint32) bool {
- var cas bool
-
- t.mu.Lock()
- if cas = check(t.state); cas {
- t.state = swap
- }
- t.mu.Unlock()
-
- return cas
-}
-
-// start attempts to mark the timeout state as 'started', note DOES NOT unlock Timeout.mu.
-func (t *Timeout) start() bool {
- var ok bool
-
- t.mu.Lock()
- if ok = (t.state == stopped); ok {
- t.state = started
- }
-
- // don't unlock
- return ok
-}
-
-// timingOut attempts to mark the timeout state as 'timing out'.
-func (t *Timeout) timingOut() bool {
- return t.cas(func(u uint32) bool {
- return (u == started)
- }, timingOut)
-}
-
-// timedOut attempts mark the 'timing out' state as 'timed out'.
-func (t *Timeout) timedOut() bool {
- return t.cas(func(u uint32) bool {
- return (u == timingOut)
- }, timedOut)
-}
-
-// extend attempts to extend a 'timing out' state by moving it back to 'started'.
-func (t *Timeout) extend() bool {
- return t.cas(func(u uint32) bool {
- return (u == started) ||
- (u == timingOut)
- }, started)
-}
-
-// running returns whether the state is anything other than 'stopped'.
-func (t *Timeout) running() bool {
- t.mu.Lock()
- running := (t.state != stopped)
- t.mu.Unlock()
- return running
-}
-
-// cancel attempts to mark the timeout state as 'cancelled'.
-func (t *Timeout) cancel() bool {
- return t.cas(func(u uint32) bool {
- return (u == started) ||
- (u == timingOut)
- }, cancelled)
-}
-
-// reset marks the timeout state as 'stopped'.
-func (t *Timeout) reset() {
- t.mu.Lock()
- t.state = stopped
- t.mu.Unlock()
-}
-
-// syncer provides helpful receiver methods for a synchronization channel.
-type syncer (chan struct{})
-
-// notify blocks on sending an empty value down channel.
-func (s syncer) notify() {
- s <- struct{}{}
-}
-
-// wait returns the underlying channel for blocking until '.notify()'.
-func (s syncer) wait() <-chan struct{} {
- return s
-}
diff --git a/vendor/codeberg.org/gruf/go-nowish/util.go b/vendor/codeberg.org/gruf/go-nowish/util.go
deleted file mode 100644
index 31fe9050e..000000000
--- a/vendor/codeberg.org/gruf/go-nowish/util.go
+++ /dev/null
@@ -1,10 +0,0 @@
-package nowish
-
-//nolint
-type noCopy struct{}
-
-//nolint
-func (*noCopy) Lock() {}
-
-//nolint
-func (*noCopy) Unlock() {}
diff --git a/vendor/codeberg.org/gruf/go-store/kv/iterator.go b/vendor/codeberg.org/gruf/go-store/kv/iterator.go
index ddaaf60cf..da743ead1 100644
--- a/vendor/codeberg.org/gruf/go-store/kv/iterator.go
+++ b/vendor/codeberg.org/gruf/go-store/kv/iterator.go
@@ -2,6 +2,7 @@ package kv
import (
"codeberg.org/gruf/go-errors"
+ "codeberg.org/gruf/go-mutexes"
"codeberg.org/gruf/go-store/storage"
)
@@ -17,10 +18,10 @@ var ErrIteratorClosed = errors.New("store/kv: iterator closed")
// have multiple iterators running concurrently
type KVIterator struct {
store *KVStore // store is the linked KVStore
+ state *mutexes.LockState
entries []storage.StorageEntry
index int
key string
- onClose func()
}
// Next attempts to set the next key-value pair, the
@@ -43,13 +44,10 @@ func (i *KVIterator) Key() string {
// Release releases the KVIterator and KVStore's read lock
func (i *KVIterator) Release() {
- // Reset key, path, entries
+ i.state.UnlockMap()
i.store = nil
i.key = ""
i.entries = nil
-
- // Perform requested callback
- i.onClose()
}
// Value returns the next value from the KVStore
@@ -60,5 +58,5 @@ func (i *KVIterator) Value() ([]byte, error) {
}
// Attempt to fetch from store
- return i.store.get(i.store.mutexMap.RLock, i.key)
+ return i.store.get(i.state.RLock, i.key)
}
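With the iterator now holding a mutexes.LockState instead of a plain callback, Release() unlocks the whole-map read lock taken by KVStore.Iterator(). A hedged usage sketch, assuming Next() reports whether another entry was loaded (as the truncated comment above suggests); the match function is illustrative:

package main

import (
	"fmt"
	"log"
	"strings"

	"codeberg.org/gruf/go-store/kv"
	"codeberg.org/gruf/go-store/storage"
)

func main() {
	store, err := kv.OpenStorage(storage.OpenMemory(100, true))
	if err != nil {
		log.Fatal(err)
	}

	iter, err := store.Iterator(func(key string) bool {
		return strings.HasPrefix(key, "user:")
	})
	if err != nil {
		log.Fatal(err)
	}
	defer iter.Release() // releases the store's map read lock

	for iter.Next() {
		val, err := iter.Value()
		if err != nil {
			log.Fatal(err)
		}
		fmt.Println(iter.Key(), len(val))
	}
}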
diff --git a/vendor/codeberg.org/gruf/go-store/kv/state.go b/vendor/codeberg.org/gruf/go-store/kv/state.go
index 330928bce..0b226e107 100644
--- a/vendor/codeberg.org/gruf/go-store/kv/state.go
+++ b/vendor/codeberg.org/gruf/go-store/kv/state.go
@@ -2,9 +2,9 @@ package kv
import (
"io"
- "sync"
"codeberg.org/gruf/go-errors"
+ "codeberg.org/gruf/go-mutexes"
)
var ErrStateClosed = errors.New("store/kv: state closed")
@@ -16,61 +16,42 @@ var ErrStateClosed = errors.New("store/kv: state closed")
// then the state has zero guarantees
type StateRO struct {
store *KVStore
- mutex sync.RWMutex
+ state *mutexes.LockState
}
func (st *StateRO) Get(key string) ([]byte, error) {
- // Get state read lock
- st.mutex.RLock()
- defer st.mutex.RUnlock()
-
// Check not closed
if st.store == nil {
return nil, ErrStateClosed
}
// Pass request to store
- return st.store.get(st.store.mutexMap.RLock, key)
+ return st.store.get(st.state.RLock, key)
}
func (st *StateRO) GetStream(key string) (io.ReadCloser, error) {
- // Get state read lock
- st.mutex.RLock()
- defer st.mutex.RUnlock()
-
// Check not closed
if st.store == nil {
return nil, ErrStateClosed
}
// Pass request to store
- return st.store.getStream(st.store.mutexMap.RLock, key)
+ return st.store.getStream(st.state.RLock, key)
}
func (st *StateRO) Has(key string) (bool, error) {
- // Get state read lock
- st.mutex.RLock()
- defer st.mutex.RUnlock()
-
// Check not closed
if st.store == nil {
return false, ErrStateClosed
}
// Pass request to store
- return st.store.has(st.store.mutexMap.RLock, key)
+ return st.store.has(st.state.RLock, key)
}
func (st *StateRO) Release() {
- // Get state write lock
- st.mutex.Lock()
- defer st.mutex.Unlock()
-
- // Release the store
- if st.store != nil {
- st.store.mutex.RUnlock()
- st.store = nil
- }
+ st.state.UnlockMap()
+ st.store = nil
}
// StateRW provides a read-write window to the store. While this
@@ -80,101 +61,70 @@ func (st *StateRO) Release() {
// then the state has zero guarantees
type StateRW struct {
store *KVStore
- mutex sync.RWMutex
+ state *mutexes.LockState
}
func (st *StateRW) Get(key string) ([]byte, error) {
- // Get state read lock
- st.mutex.RLock()
- defer st.mutex.RUnlock()
-
// Check not closed
if st.store == nil {
return nil, ErrStateClosed
}
// Pass request to store
- return st.store.get(st.store.mutexMap.RLock, key)
+ return st.store.get(st.state.RLock, key)
}
func (st *StateRW) GetStream(key string) (io.ReadCloser, error) {
- // Get state read lock
- st.mutex.RLock()
- defer st.mutex.RUnlock()
-
// Check not closed
if st.store == nil {
return nil, ErrStateClosed
}
// Pass request to store
- return st.store.getStream(st.store.mutexMap.RLock, key)
+ return st.store.getStream(st.state.RLock, key)
}
func (st *StateRW) Put(key string, value []byte) error {
- // Get state read lock
- st.mutex.RLock()
- defer st.mutex.RUnlock()
-
// Check not closed
if st.store == nil {
return ErrStateClosed
}
// Pass request to store
- return st.store.put(st.store.mutexMap.Lock, key, value)
+ return st.store.put(st.state.Lock, key, value)
}
func (st *StateRW) PutStream(key string, r io.Reader) error {
- // Get state read lock
- st.mutex.RLock()
- defer st.mutex.RUnlock()
-
// Check not closed
if st.store == nil {
return ErrStateClosed
}
// Pass request to store
- return st.store.putStream(st.store.mutexMap.Lock, key, r)
+ return st.store.putStream(st.state.Lock, key, r)
}
func (st *StateRW) Has(key string) (bool, error) {
- // Get state read lock
- st.mutex.RLock()
- defer st.mutex.RUnlock()
-
// Check not closed
if st.store == nil {
return false, ErrStateClosed
}
// Pass request to store
- return st.store.has(st.store.mutexMap.RLock, key)
+ return st.store.has(st.state.RLock, key)
}
func (st *StateRW) Delete(key string) error {
- // Get state read lock
- st.mutex.RLock()
- defer st.mutex.RUnlock()
-
// Check not closed
if st.store == nil {
return ErrStateClosed
}
// Pass request to store
- return st.store.delete(st.store.mutexMap.Lock, key)
+ return st.store.delete(st.state.Lock, key)
}
func (st *StateRW) Release() {
- // Get state write lock
- st.mutex.Lock()
- defer st.mutex.Unlock()
-
- // Release the store
- if st.store != nil {
- st.store.mutex.Unlock()
- st.store = nil
- }
+ st.state.UnlockMap()
+ st.store = nil
}
diff --git a/vendor/codeberg.org/gruf/go-store/kv/store.go b/vendor/codeberg.org/gruf/go-store/kv/store.go
index 4c3a31140..a8741afe0 100644
--- a/vendor/codeberg.org/gruf/go-store/kv/store.go
+++ b/vendor/codeberg.org/gruf/go-store/kv/store.go
@@ -2,7 +2,6 @@ package kv
import (
"io"
- "sync"
"codeberg.org/gruf/go-mutexes"
"codeberg.org/gruf/go-store/storage"
@@ -11,9 +10,8 @@ import (
// KVStore is a very simple, yet performant key-value store
type KVStore struct {
- mutexMap mutexes.MutexMap // mutexMap is a map of keys to mutexes to protect file access
- mutex sync.RWMutex // mutex is the total store mutex
- storage storage.Storage // storage is the underlying storage
+ mutex mutexes.MutexMap // mutex is a map of keys to mutexes to protect file access
+ storage storage.Storage // storage is the underlying storage
}
func OpenFile(path string, cfg *storage.DiskConfig) (*KVStore, error) {
@@ -47,26 +45,19 @@ func OpenStorage(storage storage.Storage) (*KVStore, error) {
// Return new KVStore
return &KVStore{
- mutexMap: mutexes.NewMap(mutexes.NewRW),
- mutex: sync.RWMutex{},
- storage: storage,
+ mutex: mutexes.NewMap(-1),
+ storage: storage,
}, nil
}
// RLock acquires a read-lock on supplied key, returning unlock function.
func (st *KVStore) RLock(key string) (runlock func()) {
- st.mutex.RLock()
- runlock = st.mutexMap.RLock(key)
- st.mutex.RUnlock()
- return runlock
+ return st.mutex.RLock(key)
}
// Lock acquires a write-lock on supplied key, returning unlock function.
func (st *KVStore) Lock(key string) (unlock func()) {
- st.mutex.Lock()
- unlock = st.mutexMap.Lock(key)
- st.mutex.Unlock()
- return unlock
+ return st.mutex.Lock(key)
}
// Get fetches the bytes for supplied key in the store
@@ -167,7 +158,7 @@ func (st *KVStore) Iterator(matchFn func(string) bool) (*KVIterator, error) {
}
// Get store read lock
- st.mutex.RLock()
+ state := st.mutex.RLockMap()
// Setup the walk keys function
entries := []storage.StorageEntry{}
@@ -184,24 +175,24 @@ func (st *KVStore) Iterator(matchFn func(string) bool) (*KVIterator, error) {
// Walk keys in the storage
err := st.storage.WalkKeys(storage.WalkKeysOptions{WalkFn: walkFn})
if err != nil {
- st.mutex.RUnlock()
+ state.UnlockMap()
return nil, err
}
// Return new iterator
return &KVIterator{
store: st,
+ state: state,
entries: entries,
index: -1,
key: "",
- onClose: st.mutex.RUnlock,
}, nil
}
// Read provides a read-only window to the store, holding it in a read-locked state until release
func (st *KVStore) Read() *StateRO {
- st.mutex.RLock()
- return &StateRO{store: st}
+ state := st.mutex.RLockMap()
+ return &StateRO{store: st, state: state}
}
// ReadFn provides a read-only window to the store, holding it in a read-locked state until fn return.
@@ -216,8 +207,8 @@ func (st *KVStore) ReadFn(fn func(*StateRO)) {
// Update provides a read-write window to the store, holding it in a write-locked state until release
func (st *KVStore) Update() *StateRW {
- st.mutex.Lock()
- return &StateRW{store: st}
+ state := st.mutex.LockMap()
+ return &StateRW{store: st, state: state}
}
// UpdateFn provides a read-write window to the store, holding it in a write-locked state until fn return.
@@ -229,3 +220,8 @@ func (st *KVStore) UpdateFn(fn func(*StateRW)) {
// Pass to fn
fn(state)
}
+
+// Close will close the underlying storage; the mutex map locking (e.g. RLock(), Lock()) will still work.
+func (st *KVStore) Close() error {
+ return st.storage.Close()
+}
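Read(), Update() and Iterator() now hand out mutexes.LockState windows instead of holding the old store-wide sync.RWMutex. A minimal sketch of the window API, backed here by the in-memory storage so it stays self-contained:

package main

import (
	"fmt"
	"log"

	"codeberg.org/gruf/go-store/kv"
	"codeberg.org/gruf/go-store/storage"
)

func main() {
	store, err := kv.OpenStorage(storage.OpenMemory(100, true))
	if err != nil {
		log.Fatal(err)
	}
	defer store.Close()

	// Write-locked window over the whole store.
	rw := store.Update()
	if err := rw.Put("greeting", []byte("hello")); err != nil {
		log.Println(err)
	}
	rw.Release() // releases the map write lock

	// Read-locked window over the whole store.
	ro := store.Read()
	if b, err := ro.Get("greeting"); err == nil {
		fmt.Println(string(b))
	}
	ro.Release()
}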
diff --git a/vendor/codeberg.org/gruf/go-store/storage/block.go b/vendor/codeberg.org/gruf/go-store/storage/block.go
index bc35b07ac..5075c7d17 100644
--- a/vendor/codeberg.org/gruf/go-store/storage/block.go
+++ b/vendor/codeberg.org/gruf/go-store/storage/block.go
@@ -1,6 +1,7 @@
package storage
import (
+ "crypto/sha256"
"io"
"io/fs"
"os"
@@ -13,7 +14,6 @@ import (
"codeberg.org/gruf/go-hashenc"
"codeberg.org/gruf/go-pools"
"codeberg.org/gruf/go-store/util"
- "github.com/zeebo/blake3"
)
var (
@@ -77,7 +77,7 @@ func getBlockConfig(cfg *BlockConfig) BlockConfig {
// BlockStorage is a Storage implementation that stores input data as chunks on
// a filesystem. Each value is chunked into blocks of configured size and these
-// blocks are stored with name equal to their base64-encoded BLAKE3 hash-sum. A
+// blocks are stored with name equal to their base64-encoded SHA256 hash-sum. A
// "node" file is finally created containing an array of hashes contained within
// this value
type BlockStorage struct {
@@ -87,7 +87,7 @@ type BlockStorage struct {
config BlockConfig // cfg is the supplied configuration for this store
hashPool sync.Pool // hashPool is this store's hashEncoder pool
bufpool pools.BufferPool // bufpool is this store's bytes.Buffer pool
- lock *LockableFile // lock is the opened lockfile for this storage instance
+ lock *Lock // lock is the opened lockfile for this storage instance
// NOTE:
// BlockStorage does not need to lock each of the underlying block files
@@ -140,11 +140,9 @@ func OpenBlock(path string, cfg *BlockConfig) (*BlockStorage, error) {
}
// Open and acquire storage lock for path
- lock, err := OpenLock(pb.Join(path, LockFile))
+ lock, err := OpenLock(pb.Join(path, lockFile))
if err != nil {
return nil, err
- } else if err := lock.Lock(); err != nil {
- return nil, err
}
// Figure out the largest size for bufpool slices
@@ -174,14 +172,23 @@ func OpenBlock(path string, cfg *BlockConfig) (*BlockStorage, error) {
// Clean implements storage.Clean()
func (st *BlockStorage) Clean() error {
- nodes := map[string]*node{}
+ // Track open
+ st.lock.Add()
+ defer st.lock.Done()
+
+ // Check if open
+ if st.lock.Closed() {
+ return ErrClosed
+ }
// Acquire path builder
pb := util.GetPathBuilder()
defer util.PutPathBuilder(pb)
- // Walk nodes dir for entries
+ nodes := map[string]*node{}
onceErr := errors.OnceError{}
+
+ // Walk nodes dir for entries
err := util.WalkDir(pb, st.nodePath, func(npath string, fsentry fs.DirEntry) {
// Only deal with regular files
if !fsentry.Type().IsRegular() {
@@ -303,6 +310,7 @@ func (st *BlockStorage) ReadBytes(key string) ([]byte, error) {
if err != nil {
return nil, err
}
+ defer rc.Close()
// Read all bytes and return
return io.ReadAll(rc)
@@ -316,9 +324,19 @@ func (st *BlockStorage) ReadStream(key string) (io.ReadCloser, error) {
return nil, err
}
+ // Track open
+ st.lock.Add()
+
+ // Check if open
+ if st.lock.Closed() {
+ st.lock.Done()
+ return nil, ErrClosed
+ }
+
// Attempt to open RO file
file, err := open(npath, defaultFileROFlags)
if err != nil {
+ st.lock.Done()
return nil, err
}
defer file.Close()
@@ -338,14 +356,16 @@ func (st *BlockStorage) ReadStream(key string) (io.ReadCloser, error) {
nil,
)
if err != nil {
+ st.lock.Done()
return nil, err
}
- // Return new block reader
- return util.NopReadCloser(&blockReader{
+ // Prepare block reader and return
+ rc := util.NopReadCloser(&blockReader{
storage: st,
node: &node,
- }), nil
+ }) // we wrap the blockreader to decr lockfile waitgroup
+ return util.ReadCloserWithCallback(rc, st.lock.Done), nil
}
func (st *BlockStorage) readBlock(key string) ([]byte, error) {
@@ -383,6 +403,15 @@ func (st *BlockStorage) WriteStream(key string, r io.Reader) error {
return err
}
+ // Track open
+ st.lock.Add()
+ defer st.lock.Done()
+
+ // Check if open
+ if st.lock.Closed() {
+ return ErrClosed
+ }
+
// Check if this exists
ok, err := stat(key)
if err != nil {
@@ -567,6 +596,15 @@ func (st *BlockStorage) Stat(key string) (bool, error) {
return false, err
}
+ // Track open
+ st.lock.Add()
+ defer st.lock.Done()
+
+ // Check if open
+ if st.lock.Closed() {
+ return false, ErrClosed
+ }
+
// Check for file on disk
return stat(kpath)
}
@@ -579,18 +617,35 @@ func (st *BlockStorage) Remove(key string) error {
return err
}
+ // Track open
+ st.lock.Add()
+ defer st.lock.Done()
+
+ // Check if open
+ if st.lock.Closed() {
+ return ErrClosed
+ }
+
// Attempt to remove file
return os.Remove(kpath)
}
// Close implements Storage.Close()
func (st *BlockStorage) Close() error {
- defer st.lock.Close()
- return st.lock.Unlock()
+ return st.lock.Close()
}
// WalkKeys implements Storage.WalkKeys()
func (st *BlockStorage) WalkKeys(opts WalkKeysOptions) error {
+ // Track open
+ st.lock.Add()
+ defer st.lock.Done()
+
+ // Check if open
+ if st.lock.Closed() {
+ return ErrClosed
+ }
+
// Acquire path builder
pb := util.GetPathBuilder()
defer util.PutPathBuilder(pb)
@@ -800,7 +855,7 @@ var (
// encodedHashLen is the once-calculated encoded hash-sum length
encodedHashLen = base64Encoding.EncodedLen(
- blake3.New().Size(),
+ sha256.New().Size(),
)
)
@@ -812,9 +867,8 @@ type hashEncoder struct {
// newHashEncoder returns a new hashEncoder instance
func newHashEncoder() *hashEncoder {
- hash := blake3.New()
return &hashEncoder{
- henc: hashenc.New(hash, base64Encoding),
+ henc: hashenc.New(sha256.New(), base64Encoding),
ebuf: make([]byte, encodedHashLen),
}
}
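Block files are now named after the base64-encoded SHA256 sum of their contents rather than BLAKE3. A standalone sketch of that derivation using only the standard library; the exact base64 alphabet behind base64Encoding is an assumption here (URL-safe, unpadded):

package main

import (
	"crypto/sha256"
	"encoding/base64"
	"fmt"
)

func main() {
	block := []byte("one chunk of a stored value")

	// Hash the block and encode the sum to produce a filesystem-safe name.
	sum := sha256.Sum256(block)
	name := base64.RawURLEncoding.EncodeToString(sum[:])

	fmt.Println(name) // stands in for the on-disk block filename
}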
diff --git a/vendor/codeberg.org/gruf/go-store/storage/disk.go b/vendor/codeberg.org/gruf/go-store/storage/disk.go
index 9b5430437..2ee00ddee 100644
--- a/vendor/codeberg.org/gruf/go-store/storage/disk.go
+++ b/vendor/codeberg.org/gruf/go-store/storage/disk.go
@@ -71,7 +71,7 @@ type DiskStorage struct {
path string // path is the root path of this store
bufp pools.BufferPool // bufp is the buffer pool for this DiskStorage
config DiskConfig // cfg is the supplied configuration for this store
- lock *LockableFile // lock is the opened lockfile for this storage instance
+ lock *Lock // lock is the opened lockfile for this storage instance
}
// OpenFile opens a DiskStorage instance for given folder path and configuration
@@ -118,11 +118,9 @@ func OpenFile(path string, cfg *DiskConfig) (*DiskStorage, error) {
}
// Open and acquire storage lock for path
- lock, err := OpenLock(pb.Join(path, LockFile))
+ lock, err := OpenLock(pb.Join(path, lockFile))
if err != nil {
return nil, err
- } else if err := lock.Lock(); err != nil {
- return nil, err
}
// Return new DiskStorage
@@ -136,6 +134,11 @@ func OpenFile(path string, cfg *DiskConfig) (*DiskStorage, error) {
// Clean implements Storage.Clean()
func (st *DiskStorage) Clean() error {
+ st.lock.Add()
+ defer st.lock.Done()
+ if st.lock.Closed() {
+ return ErrClosed
+ }
return util.CleanDirs(st.path)
}
@@ -160,9 +163,18 @@ func (st *DiskStorage) ReadStream(key string) (io.ReadCloser, error) {
return nil, err
}
+ // Track open
+ st.lock.Add()
+
+ // Check if open
+ if st.lock.Closed() {
+ return nil, ErrClosed
+ }
+
// Attempt to open file (replace ENOENT with our own)
file, err := open(kpath, defaultFileROFlags)
if err != nil {
+ st.lock.Done()
return nil, errSwapNotFound(err)
}
@@ -170,12 +182,14 @@ func (st *DiskStorage) ReadStream(key string) (io.ReadCloser, error) {
cFile, err := st.config.Compression.Reader(file)
if err != nil {
file.Close() // close this here, ignore error
+ st.lock.Done()
return nil, err
}
// Wrap compressor to ensure file close
return util.ReadCloserWithCallback(cFile, func() {
file.Close()
+ st.lock.Done()
}), nil
}
@@ -192,6 +206,15 @@ func (st *DiskStorage) WriteStream(key string, r io.Reader) error {
return err
}
+ // Track open
+ st.lock.Add()
+ defer st.lock.Done()
+
+ // Check if open
+ if st.lock.Closed() {
+ return ErrClosed
+ }
+
// Ensure dirs leading up to file exist
err = os.MkdirAll(path.Dir(kpath), defaultDirPerms)
if err != nil {
@@ -242,6 +265,15 @@ func (st *DiskStorage) Stat(key string) (bool, error) {
return false, err
}
+ // Track open
+ st.lock.Add()
+ defer st.lock.Done()
+
+ // Check if open
+ if st.lock.Closed() {
+ return false, ErrClosed
+ }
+
// Check for file on disk
return stat(kpath)
}
@@ -254,18 +286,35 @@ func (st *DiskStorage) Remove(key string) error {
return err
}
+ // Track open
+ st.lock.Add()
+ defer st.lock.Done()
+
+ // Check if open
+ if st.lock.Closed() {
+ return ErrClosed
+ }
+
// Attempt to remove file
return os.Remove(kpath)
}
// Close implements Storage.Close()
func (st *DiskStorage) Close() error {
- defer st.lock.Close()
- return st.lock.Unlock()
+ return st.lock.Close()
}
// WalkKeys implements Storage.WalkKeys()
func (st *DiskStorage) WalkKeys(opts WalkKeysOptions) error {
+ // Track open
+ st.lock.Add()
+ defer st.lock.Done()
+
+ // Check if open
+ if st.lock.Closed() {
+ return ErrClosed
+ }
+
// Acquire path builder
pb := util.GetPathBuilder()
defer util.PutPathBuilder(pb)
@@ -286,13 +335,13 @@ func (st *DiskStorage) WalkKeys(opts WalkKeysOptions) error {
// filepath checks and returns a formatted filepath for given key
func (st *DiskStorage) filepath(key string) (string, error) {
+ // Calculate transformed key path
+ key = st.config.Transform.KeyToPath(key)
+
// Acquire path builder
pb := util.GetPathBuilder()
defer util.PutPathBuilder(pb)
- // Calculate transformed key path
- key = st.config.Transform.KeyToPath(key)
-
// Generated joined root path
pb.AppendString(st.path)
pb.AppendString(key)
diff --git a/vendor/codeberg.org/gruf/go-store/storage/errors.go b/vendor/codeberg.org/gruf/go-store/storage/errors.go
index 016593596..ad2b742e6 100644
--- a/vendor/codeberg.org/gruf/go-store/storage/errors.go
+++ b/vendor/codeberg.org/gruf/go-store/storage/errors.go
@@ -19,6 +19,9 @@ func (e errorString) Extend(s string, a ...interface{}) errorString {
}
var (
+ // ErrClosed is returned on operations on a closed storage
+ ErrClosed = errorString("store/storage: closed")
+
// ErrNotFound is the error returned when a key cannot be found in storage
ErrNotFound = errorString("store/storage: key not found")
@@ -39,6 +42,9 @@ var (
// errCorruptNodes is returned when nodes with missing blocks are found during a BlockStorage clean
errCorruptNodes = errorString("store/storage: corrupted nodes")
+
+ // ErrAlreadyLocked is returned on fail opening a storage lockfile
+ ErrAlreadyLocked = errorString("store/storage: storage lock already open")
)
// errSwapNoop performs no error swaps
@@ -61,3 +67,11 @@ func errSwapExist(err error) error {
}
return err
}
+
+// errSwapUnavailable swaps syscall.EAGAIN for ErrAlreadyLocked
+func errSwapUnavailable(err error) error {
+ if err == syscall.EAGAIN {
+ return ErrAlreadyLocked
+ }
+ return err
+}
diff --git a/vendor/codeberg.org/gruf/go-store/storage/fs.go b/vendor/codeberg.org/gruf/go-store/storage/fs.go
index ff4c857c3..b1c3560d2 100644
--- a/vendor/codeberg.org/gruf/go-store/storage/fs.go
+++ b/vendor/codeberg.org/gruf/go-store/storage/fs.go
@@ -8,11 +8,14 @@ import (
)
const (
- defaultDirPerms = 0755
- defaultFilePerms = 0644
+ // default file permission bits
+ defaultDirPerms = 0755
+ defaultFilePerms = 0644
+
+ // default file open flags
defaultFileROFlags = syscall.O_RDONLY
defaultFileRWFlags = syscall.O_CREAT | syscall.O_RDWR
- defaultFileLockFlags = syscall.O_RDONLY | syscall.O_EXCL | syscall.O_CREAT
+ defaultFileLockFlags = syscall.O_RDONLY | syscall.O_CREAT
)
// NOTE:
diff --git a/vendor/codeberg.org/gruf/go-store/storage/lock.go b/vendor/codeberg.org/gruf/go-store/storage/lock.go
index a757830cc..fae4351bf 100644
--- a/vendor/codeberg.org/gruf/go-store/storage/lock.go
+++ b/vendor/codeberg.org/gruf/go-store/storage/lock.go
@@ -1,38 +1,81 @@
package storage
import (
- "os"
+ "sync"
+ "sync/atomic"
"syscall"
"codeberg.org/gruf/go-store/util"
)
-// LockFile is our standard lockfile name.
-const LockFile = "store.lock"
+// lockFile is our standard lockfile name.
+var lockFile = "store.lock"
-type LockableFile struct {
- *os.File
+// IsLockKey returns whether storage key is our lockfile.
+func IsLockKey(key string) bool {
+ return key == lockFile
+}
+
+// Lock represents a filesystem lock to ensure only one storage instance open per path.
+type Lock struct {
+ fd int
+ wg sync.WaitGroup
+ st uint32
}
// OpenLock opens a lockfile at path.
-func OpenLock(path string) (*LockableFile, error) {
- file, err := open(path, defaultFileLockFlags)
+func OpenLock(path string) (*Lock, error) {
+ var fd int
+
+ // Open the file descriptor at path
+ err := util.RetryOnEINTR(func() (err error) {
+ fd, err = syscall.Open(path, defaultFileLockFlags, defaultFilePerms)
+ return
+ })
if err != nil {
return nil, err
}
- return &LockableFile{file}, nil
+
+ // Get a flock on the file descriptor
+ err = util.RetryOnEINTR(func() error {
+ return syscall.Flock(fd, syscall.LOCK_EX|syscall.LOCK_NB)
+ })
+ if err != nil {
+ return nil, errSwapUnavailable(err)
+ }
+
+ return &Lock{fd: fd}, nil
}
-func (f *LockableFile) Lock() error {
- return f.flock(syscall.LOCK_EX | syscall.LOCK_NB)
+// Add will add '1' to the underlying sync.WaitGroup.
+func (f *Lock) Add() {
+ f.wg.Add(1)
}
-func (f *LockableFile) Unlock() error {
- return f.flock(syscall.LOCK_UN | syscall.LOCK_NB)
+// Done will decrement '1' from the underlying sync.WaitGroup.
+func (f *Lock) Done() {
+ f.wg.Done()
}
-func (f *LockableFile) flock(how int) error {
- return util.RetryOnEINTR(func() error {
- return syscall.Flock(int(f.Fd()), how)
- })
+// Close will attempt to close the lockfile and file descriptor.
+func (f *Lock) Close() error {
+ var err error
+ if atomic.CompareAndSwapUint32(&f.st, 0, 1) {
+ // Wait until done
+ f.wg.Wait()
+
+ // Ensure gets closed
+ defer syscall.Close(f.fd)
+
+ // Call funlock on the file descriptor
+ err = util.RetryOnEINTR(func() error {
+ return syscall.Flock(f.fd, syscall.LOCK_UN|syscall.LOCK_NB)
+ })
+ }
+ return err
+}
+
+// Closed will return whether this lockfile has been closed (and unlocked).
+func (f *Lock) Closed() bool {
+ return (atomic.LoadUint32(&f.st) == 1)
}
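The old LockableFile wrapper becomes a Lock that pairs the non-blocking flock with a WaitGroup and a closed flag, so Close() can wait for in-flight operations. A lifecycle sketch based on the methods above (the path is illustrative):

package main

import (
	"log"

	"codeberg.org/gruf/go-store/storage"
)

func main() {
	lock, err := storage.OpenLock("/tmp/store.lock")
	if err != nil {
		// A second instance opening the same path gets ErrAlreadyLocked
		// back from the non-blocking flock.
		log.Fatal(err)
	}

	// Each storage operation brackets itself with Add()/Done() and
	// refuses to run once the lock has been closed.
	lock.Add()
	if !lock.Closed() {
		// ... perform the storage operation ...
	}
	lock.Done()

	// Close waits for in-flight operations, releases the flock and
	// closes the file descriptor.
	if err := lock.Close(); err != nil {
		log.Println(err)
	}
}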
diff --git a/vendor/codeberg.org/gruf/go-store/storage/memory.go b/vendor/codeberg.org/gruf/go-store/storage/memory.go
index 7daa4a483..2dab562d6 100644
--- a/vendor/codeberg.org/gruf/go-store/storage/memory.go
+++ b/vendor/codeberg.org/gruf/go-store/storage/memory.go
@@ -14,6 +14,7 @@ type MemoryStorage struct {
ow bool // overwrites
fs map[string][]byte
mu sync.Mutex
+ st uint32
}
// OpenMemory opens a new MemoryStorage instance with internal map of 'size'.
@@ -27,13 +28,26 @@ func OpenMemory(size int, overwrites bool) *MemoryStorage {
// Clean implements Storage.Clean().
func (st *MemoryStorage) Clean() error {
+ st.mu.Lock()
+ defer st.mu.Unlock()
+ if st.st == 1 {
+ return ErrClosed
+ }
return nil
}
// ReadBytes implements Storage.ReadBytes().
func (st *MemoryStorage) ReadBytes(key string) ([]byte, error) {
- // Safely check store
+ // Lock storage
st.mu.Lock()
+
+ // Check store open
+ if st.st == 1 {
+ st.mu.Unlock()
+ return nil, ErrClosed
+ }
+
+ // Check for key
b, ok := st.fs[key]
st.mu.Unlock()
@@ -48,8 +62,16 @@ func (st *MemoryStorage) ReadBytes(key string) ([]byte, error) {
// ReadStream implements Storage.ReadStream().
func (st *MemoryStorage) ReadStream(key string) (io.ReadCloser, error) {
- // Safely check store
+ // Lock storage
st.mu.Lock()
+
+ // Check store open
+ if st.st == 1 {
+ st.mu.Unlock()
+ return nil, ErrClosed
+ }
+
+ // Check for key
b, ok := st.fs[key]
st.mu.Unlock()
@@ -66,19 +88,24 @@ func (st *MemoryStorage) ReadStream(key string) (io.ReadCloser, error) {
// WriteBytes implements Storage.WriteBytes().
func (st *MemoryStorage) WriteBytes(key string, b []byte) error {
- // Safely check store
+ // Lock storage
st.mu.Lock()
+ defer st.mu.Unlock()
+
+ // Check store open
+ if st.st == 1 {
+ return ErrClosed
+ }
+
_, ok := st.fs[key]
// Check for already exist
if ok && !st.ow {
- st.mu.Unlock()
return ErrAlreadyExists
}
// Write + unlock
st.fs[key] = bytes.Copy(b)
- st.mu.Unlock()
return nil
}
@@ -96,43 +123,66 @@ func (st *MemoryStorage) WriteStream(key string, r io.Reader) error {
// Stat implements Storage.Stat().
func (st *MemoryStorage) Stat(key string) (bool, error) {
+ // Lock storage
st.mu.Lock()
+ defer st.mu.Unlock()
+
+ // Check store open
+ if st.st == 1 {
+ return false, ErrClosed
+ }
+
+ // Check for key
_, ok := st.fs[key]
- st.mu.Unlock()
return ok, nil
}
// Remove implements Storage.Remove().
func (st *MemoryStorage) Remove(key string) error {
- // Safely check store
+ // Lock storage
st.mu.Lock()
- _, ok := st.fs[key]
+ defer st.mu.Unlock()
- // Check in store
+ // Check store open
+ if st.st == 1 {
+ return ErrClosed
+ }
+
+ // Check for key
+ _, ok := st.fs[key]
if !ok {
- st.mu.Unlock()
return ErrNotFound
}
- // Delete + unlock
+ // Remove from store
delete(st.fs, key)
- st.mu.Unlock()
+
return nil
}
// Close implements Storage.Close().
func (st *MemoryStorage) Close() error {
+ st.mu.Lock()
+ st.st = 1
+ st.mu.Unlock()
return nil
}
// WalkKeys implements Storage.WalkKeys().
func (st *MemoryStorage) WalkKeys(opts WalkKeysOptions) error {
- // Safely walk storage keys
+ // Lock storage
st.mu.Lock()
+ defer st.mu.Unlock()
+
+ // Check store open
+ if st.st == 1 {
+ return ErrClosed
+ }
+
+ // Walk store keys
for key := range st.fs {
opts.WalkFn(entry(key))
}
- st.mu.Unlock()
return nil
}
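MemoryStorage now guards every operation with the same closed check the file-backed stores use, returning ErrClosed after Close(). A short sketch of that behaviour:

package main

import (
	"fmt"
	"log"

	"codeberg.org/gruf/go-store/storage"
)

func main() {
	st := storage.OpenMemory(100, true) // map size hint, overwrites allowed

	if err := st.WriteBytes("key", []byte("value")); err != nil {
		log.Fatal(err)
	}

	b, err := st.ReadBytes("key")
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(string(b))

	// After Close(), every operation reports ErrClosed.
	st.Close()
	if _, err := st.ReadBytes("key"); err == storage.ErrClosed {
		fmt.Println("storage closed")
	}
}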