summaryrefslogtreecommitdiff
path: root/vendor/codeberg.org/gruf/go-mutexes/map.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/codeberg.org/gruf/go-mutexes/map.go')
-rw-r--r--vendor/codeberg.org/gruf/go-mutexes/map.go496
1 files changed, 320 insertions, 176 deletions
diff --git a/vendor/codeberg.org/gruf/go-mutexes/map.go b/vendor/codeberg.org/gruf/go-mutexes/map.go
index cb31a9543..c0f740eec 100644
--- a/vendor/codeberg.org/gruf/go-mutexes/map.go
+++ b/vendor/codeberg.org/gruf/go-mutexes/map.go
@@ -6,260 +6,347 @@ import (
"sync/atomic"
)
-// locktype defines maskable mutexmap lock types.
-type locktype uint8
-
const (
// possible lock types.
- lockTypeRead = locktype(1) << 0
- lockTypeWrite = locktype(1) << 1
- lockTypeMap = locktype(1) << 2
+ lockTypeRead = uint8(1) << 0
+ lockTypeWrite = uint8(1) << 1
+ lockTypeMap = uint8(1) << 2
// possible mutexmap states.
stateUnlockd = uint8(0)
stateRLocked = uint8(1)
stateLocked = uint8(2)
stateInUse = uint8(3)
+
+ // default values.
+ defaultWake = 1024
)
-// permitLockType returns if provided locktype is permitted to go ahead in current state.
-func permitLockType(state uint8, lt locktype) bool {
+// acquireState attempts to acquire required map state for lockType.
+func acquireState(state uint8, lt uint8) (uint8, bool) {
switch state {
// Unlocked state
// (all allowed)
case stateUnlockd:
- return true
// Keys locked, no state lock.
// (don't allow map locks)
case stateInUse:
- return lt&lockTypeMap == 0
+ if lt&lockTypeMap != 0 {
+ return 0, false
+ }
// Read locked
// (only allow read locks)
case stateRLocked:
- return lt&lockTypeRead != 0
+ if lt&lockTypeRead == 0 {
+ return 0, false
+ }
// Write locked
// (none allowed)
case stateLocked:
- return false
+ return 0, false
// shouldn't reach here
default:
panic("unexpected state")
}
+
+ switch {
+ // If unlocked and not a map
+ // lock request, set in use
+ case lt&lockTypeMap == 0:
+ if state == stateUnlockd {
+ state = stateInUse
+ }
+
+ // Set read lock state
+ case lt&lockTypeRead != 0:
+ state = stateRLocked
+
+ // Set write lock state
+ case lt&lockTypeWrite != 0:
+ state = stateLocked
+
+ default:
+ panic("unexpected lock type")
+ }
+
+ return state, true
}
-// MutexMap is a structure that allows having a map of self-evicting mutexes
-// by key. You do not need to worry about managing the contents of the map,
-// only requesting RLock/Lock for keys, and ensuring to call the returned
-// unlock functions.
+// MutexMap is a structure that allows read / write locking key, performing
+// as you'd expect a map[string]*sync.RWMutex to perform. The differences
+// being that the entire map can itself be read / write locked, it uses memory
+// pooling for the mutex (not quite) structures, and it is self-evicting. The
+// core configurations of maximum no. open locks and wake modulus* are user
+// definable.
+//
+// * The wake modulus is the number that the current number of open locks is
+// modulused against to determine how often to notify sleeping goroutines.
+// These are goroutines that are attempting to lock a key / whole map and are
+// awaiting a permissible state (.e.g no key write locks allowed when the
+// map is read locked).
type MutexMap struct {
- mus map[string]RWMutex
- mapMu sync.Mutex
- pool sync.Pool
- queue []func()
- evict []func()
+ qpool pool
+ queue []*sync.Mutex
+
+ mumap map[string]*rwmutex
+ mpool pool
+ evict []*rwmutex
+
count int32
maxmu int32
+ wake int32
+
+ mapmu sync.Mutex
state uint8
}
// NewMap returns a new MutexMap instance with provided max no. open mutexes.
-func NewMap(max int32) MutexMap {
+func NewMap(max, wake int32) MutexMap {
+ // Determine wake mod.
+ if wake < 1 {
+ wake = defaultWake
+ }
+
+ // Determine max no. mutexes
if max < 1 {
- // Default = 128 * GOMAXPROCS
procs := runtime.GOMAXPROCS(0)
- max = int32(procs * 128)
+ max = wake * int32(procs)
}
+
return MutexMap{
- mus: make(map[string]RWMutex),
- pool: sync.Pool{
- New: func() interface{} {
- return NewRW()
+ qpool: pool{
+ alloc: func() interface{} {
+ return &sync.Mutex{}
+ },
+ },
+ mumap: make(map[string]*rwmutex, max),
+ mpool: pool{
+ alloc: func() interface{} {
+ return &rwmutex{}
},
},
maxmu: max,
+ wake: wake,
}
}
-// acquire will either acquire a mutex from pool or alloc.
-func (mm *MutexMap) acquire() RWMutex {
- return mm.pool.Get().(RWMutex)
-}
+// MAX sets the MutexMap max open locks and wake modulus, returns current values.
+// For values less than zero defaults are set, and zero is non-op.
+func (mm *MutexMap) SET(max, wake int32) (int32, int32) {
+ mm.mapmu.Lock()
+
+ switch {
+ // Set default wake
+ case wake < 0:
+ mm.wake = defaultWake
+
+ // Set supplied wake
+ case wake > 0:
+ mm.wake = wake
+ }
+
+ switch {
+ // Set default max
+ case max < 0:
+ procs := runtime.GOMAXPROCS(0)
+ mm.maxmu = wake * int32(procs)
+
+ // Set supplied max
+ case max > 0:
+ mm.maxmu = max
+ }
-// release will release provided mutex to pool.
-func (mm *MutexMap) release(mu RWMutex) {
- mm.pool.Put(mu)
+ // Fetch values
+ max = mm.maxmu
+ wake = mm.wake
+
+ mm.mapmu.Unlock()
+ return max, wake
}
-// spinLock will wait (using a mutex to sleep thread) until 'cond()' returns true,
-// returning with map lock. Note that 'cond' is performed within a map lock.
+// spinLock will wait (using a mutex to sleep thread) until conditional returns true.
func (mm *MutexMap) spinLock(cond func() bool) {
- mu := mm.acquire()
- defer mm.release(mu)
+ var mu *sync.Mutex
for {
- // Get map lock
- mm.mapMu.Lock()
+ // Acquire map lock
+ mm.mapmu.Lock()
- // Check if return
if cond() {
+ // Release mu if needed
+ if mu != nil {
+ mm.qpool.Release(mu)
+ }
return
}
+ // Alloc mu if needed
+ if mu == nil {
+ v := mm.qpool.Acquire()
+ mu = v.(*sync.Mutex)
+ }
+
// Queue ourselves
- unlock := mu.Lock()
- mm.queue = append(mm.queue, unlock)
- mm.mapMu.Unlock()
+ mm.queue = append(mm.queue, mu)
+ mu.Lock()
+
+ // Unlock map
+ mm.mapmu.Unlock()
// Wait on notify
- mu.Lock()()
+ mu.Lock()
+ mu.Unlock()
}
}
-// lockMutex will acquire a lock on the mutex at provided key, handling earlier allocated mutex if provided. Unlocks map on return.
-func (mm *MutexMap) lockMutex(key string, lt locktype) func() {
- var unlock func()
+// lock will acquire a lock of given type on the 'mutex' at key.
+func (mm *MutexMap) lock(key string, lt uint8) func() {
+ var ok bool
+ var mu *rwmutex
+
+ // Spin lock until returns true
+ mm.spinLock(func() bool {
+ // Check not overloaded
+ if !(mm.count < mm.maxmu) {
+ return false
+ }
+
+ // Attempt to acquire usable map state
+ state, ok := acquireState(mm.state, lt)
+ if !ok {
+ return false
+ }
+
+ // Update state
+ mm.state = state
+
+ // Ensure mutex at key
+ // is in lockable state
+ mu, ok = mm.mumap[key]
+ return !ok || mu.CanLock(lt)
+ })
- // Incr counter
+ // Incr count
mm.count++
- // Check for existing mutex at key
- mu, ok := mm.mus[key]
if !ok {
+ // No mutex found for key
+
// Alloc from pool
- mu = mm.acquire()
- mm.mus[key] = mu
-
- // Queue mutex for eviction
- mm.evict = append(mm.evict, func() {
- delete(mm.mus, key)
- mm.pool.Put(mu)
- })
- }
+ v := mm.mpool.Acquire()
+ mu = v.(*rwmutex)
+ mm.mumap[key] = mu
- // If no state, set in use.
- // State will already have been
- // set if this is from LockState{}
- if mm.state == stateUnlockd {
- mm.state = stateInUse
- }
+ // Set our key
+ mu.key = key
- switch {
- // Read lock
- case lt&lockTypeRead != 0:
- unlock = mu.RLock()
+ // Queue for eviction
+ mm.evict = append(mm.evict, mu)
+ }
- // Write lock
- case lt&lockTypeWrite != 0:
- unlock = mu.Lock()
+ // Lock mutex
+ mu.Lock(lt)
- // shouldn't reach here
- default:
- panic("unexpected lock type")
- }
+ // Unlock map
+ mm.mapmu.Unlock()
- // Unlock map + return
- mm.mapMu.Unlock()
return func() {
- mm.mapMu.Lock()
- unlock()
- go mm.onUnlock()
+ mm.mapmu.Lock()
+ mu.Unlock()
+ go mm.cleanup()
}
}
-// onUnlock is performed as the final (async) stage of releasing an acquired key / map mutex.
-func (mm *MutexMap) onUnlock() {
- // Decr counter
+// lockMap will lock the whole map under given lock type.
+func (mm *MutexMap) lockMap(lt uint8) {
+ // Spin lock until returns true
+ mm.spinLock(func() bool {
+ // Attempt to acquire usable map state
+ state, ok := acquireState(mm.state, lt)
+ if !ok {
+ return false
+ }
+
+ // Update state
+ mm.state = state
+
+ return true
+ })
+
+ // Incr count
+ mm.count++
+
+ // State acquired, unlock
+ mm.mapmu.Unlock()
+}
+
+// cleanup is performed as the final stage of unlocking a locked key / map state, finally unlocks map.
+func (mm *MutexMap) cleanup() {
+ // Decr count
mm.count--
- if mm.count < 1 {
- // Perform all queued evictions
- for i := 0; i < len(mm.evict); i++ {
- mm.evict[i]()
+ if mm.count%mm.wake == 0 {
+ // Notify queued routines
+ for _, mu := range mm.queue {
+ mu.Unlock()
}
- // Notify all waiting goroutines
- for i := 0; i < len(mm.queue); i++ {
- mm.queue[i]()
+ // Reset queue
+ mm.queue = mm.queue[:0]
+ }
+
+ if mm.count < 1 {
+ // Perform evictions
+ for _, mu := range mm.evict {
+ key := mu.key
+ mu.key = ""
+ delete(mm.mumap, key)
+ mm.mpool.Release(mu)
}
- // Reset the map state
- mm.evict = nil
- mm.queue = nil
+ // Reset map state
+ mm.evict = mm.evict[:0]
mm.state = stateUnlockd
+ mm.mpool.GC()
+ mm.qpool.GC()
}
- // Finally, unlock
- mm.mapMu.Unlock()
+ // Unlock map
+ mm.mapmu.Unlock()
}
// RLockMap acquires a read lock over the entire map, returning a lock state for acquiring key read locks.
// Please note that the 'unlock()' function will block until all keys locked from this state are unlocked.
func (mm *MutexMap) RLockMap() *LockState {
- return mm.getMapLock(lockTypeRead)
+ mm.lockMap(lockTypeRead | lockTypeMap)
+ return &LockState{
+ mmap: mm,
+ ltyp: lockTypeRead,
+ }
}
// LockMap acquires a write lock over the entire map, returning a lock state for acquiring key read/write locks.
// Please note that the 'unlock()' function will block until all keys locked from this state are unlocked.
func (mm *MutexMap) LockMap() *LockState {
- return mm.getMapLock(lockTypeWrite)
+ mm.lockMap(lockTypeWrite | lockTypeMap)
+ return &LockState{
+ mmap: mm,
+ ltyp: lockTypeWrite,
+ }
}
// RLock acquires a mutex read lock for supplied key, returning an RUnlock function.
func (mm *MutexMap) RLock(key string) (runlock func()) {
- return mm.getLock(key, lockTypeRead)
+ return mm.lock(key, lockTypeRead)
}
// Lock acquires a mutex write lock for supplied key, returning an Unlock function.
func (mm *MutexMap) Lock(key string) (unlock func()) {
- return mm.getLock(key, lockTypeWrite)
-}
-
-// getLock will fetch lock of provided type, for given key, returning unlock function.
-func (mm *MutexMap) getLock(key string, lt locktype) func() {
- // Spin until achieve lock
- mm.spinLock(func() bool {
- return permitLockType(mm.state, lt) &&
- mm.count < mm.maxmu // not overloaded
- })
-
- // Perform actual mutex lock
- return mm.lockMutex(key, lt)
-}
-
-// getMapLock will acquire a map lock of provided type, returning a LockState session.
-func (mm *MutexMap) getMapLock(lt locktype) *LockState {
- // Spin until achieve lock
- mm.spinLock(func() bool {
- return permitLockType(mm.state, lt|lockTypeMap) &&
- mm.count < mm.maxmu // not overloaded
- })
-
- // Incr counter
- mm.count++
-
- switch {
- // Set read lock state
- case lt&lockTypeRead != 0:
- mm.state = stateRLocked
-
- // Set write lock state
- case lt&lockTypeWrite != 0:
- mm.state = stateLocked
-
- default:
- panic("unexpected lock type")
- }
-
- // Unlock + return
- mm.mapMu.Unlock()
- return &LockState{
- mmap: mm,
- ltyp: lt,
- }
+ return mm.lock(key, lockTypeWrite)
}
// LockState represents a window to a locked MutexMap.
@@ -267,56 +354,113 @@ type LockState struct {
wait sync.WaitGroup
mmap *MutexMap
done uint32
- ltyp locktype
+ ltyp uint8
}
// Lock: see MutexMap.Lock() definition. Will panic if map only read locked.
func (st *LockState) Lock(key string) (unlock func()) {
- return st.getLock(key, lockTypeWrite)
+ return st.lock(key, lockTypeWrite)
}
// RLock: see MutexMap.RLock() definition.
func (st *LockState) RLock(key string) (runlock func()) {
- return st.getLock(key, lockTypeRead)
+ return st.lock(key, lockTypeRead)
}
-// UnlockMap will close this state and release the currently locked map.
-func (st *LockState) UnlockMap() {
- // Set state to finished (or panic if already done)
- if !atomic.CompareAndSwapUint32(&st.done, 0, 1) {
- panic("called UnlockMap() on expired state")
- }
-
- // Wait until done
- st.wait.Wait()
-
- // Async reset map
- st.mmap.mapMu.Lock()
- go st.mmap.onUnlock()
-}
-
-// getLock: see MutexMap.getLock() definition.
-func (st *LockState) getLock(key string, lt locktype) func() {
+// lock: see MutexMap.lock() definition.
+func (st *LockState) lock(key string, lt uint8) func() {
st.wait.Add(1) // track lock
- // Check if closed, or if write lock is allowed
if atomic.LoadUint32(&st.done) == 1 {
- panic("map lock closed")
+ panic("called (r)lock on unlocked state")
} else if lt&lockTypeWrite != 0 &&
st.ltyp&lockTypeWrite == 0 {
- panic("called .Lock() on rlocked map")
+ panic("called lock on rlocked map")
}
- // Spin until achieve map lock
+ var ok bool
+ var mu *rwmutex
+
+ // Spin lock until returns true
st.mmap.spinLock(func() bool {
- return st.mmap.count < st.mmap.maxmu
- }) // i.e. not overloaded
+ // Check not overloaded
+ if !(st.mmap.count < st.mmap.maxmu) {
+ return false
+ }
+
+ // Ensure mutex at key
+ // is in lockable state
+ mu, ok = st.mmap.mumap[key]
+ return !ok || mu.CanLock(lt)
+ })
+
+ // Incr count
+ st.mmap.count++
+
+ if !ok {
+ // No mutex found for key
+
+ // Alloc from pool
+ v := st.mmap.mpool.Acquire()
+ mu = v.(*rwmutex)
+ st.mmap.mumap[key] = mu
+
+ // Set our key
+ mu.key = key
+
+ // Queue for eviction
+ st.mmap.evict = append(st.mmap.evict, mu)
+ }
- // Perform actual mutex lock
- unlock := st.mmap.lockMutex(key, lt)
+ // Lock mutex
+ mu.Lock(lt)
+
+ // Unlock map
+ st.mmap.mapmu.Unlock()
return func() {
- unlock()
- st.wait.Done()
+ st.mmap.mapmu.Lock()
+ mu.Unlock()
+ go st.mmap.cleanup()
+ st.wait.Add(-1)
+ }
+}
+
+// UnlockMap will close this state and release the currently locked map.
+func (st *LockState) UnlockMap() {
+ if !atomic.CompareAndSwapUint32(&st.done, 0, 1) {
+ panic("called unlockmap on expired state")
+ }
+ st.wait.Wait()
+ st.mmap.mapmu.Lock()
+ go st.mmap.cleanup()
+}
+
+// rwmutex is a very simple *representation* of a read-write
+// mutex, though not one in implementation. it works by
+// tracking the lock state for a given map key, which is
+// protected by the map's mutex.
+type rwmutex struct {
+ rcnt uint32
+ lock uint8
+ key string
+}
+
+func (mu *rwmutex) CanLock(lt uint8) bool {
+ return mu.lock == 0 ||
+ (mu.lock&lockTypeRead != 0 && lt&lockTypeRead != 0)
+}
+
+func (mu *rwmutex) Lock(lt uint8) {
+ mu.lock = lt
+ if lt&lockTypeRead != 0 {
+ mu.rcnt++
+ }
+}
+
+func (mu *rwmutex) Unlock() {
+ mu.rcnt--
+ if mu.rcnt == 0 {
+ mu.lock = 0
}
}