Diffstat (limited to 'vendor/codeberg.org/gruf/go-mutexes/map.go')
-rw-r--r-- | vendor/codeberg.org/gruf/go-mutexes/map.go | 496
1 file changed, 320 insertions, 176 deletions
diff --git a/vendor/codeberg.org/gruf/go-mutexes/map.go b/vendor/codeberg.org/gruf/go-mutexes/map.go
index cb31a9543..c0f740eec 100644
--- a/vendor/codeberg.org/gruf/go-mutexes/map.go
+++ b/vendor/codeberg.org/gruf/go-mutexes/map.go
@@ -6,260 +6,347 @@ import (
 	"sync/atomic"
 )
 
-// locktype defines maskable mutexmap lock types.
-type locktype uint8
-
 const (
 	// possible lock types.
-	lockTypeRead  = locktype(1) << 0
-	lockTypeWrite = locktype(1) << 1
-	lockTypeMap   = locktype(1) << 2
+	lockTypeRead  = uint8(1) << 0
+	lockTypeWrite = uint8(1) << 1
+	lockTypeMap   = uint8(1) << 2
 
 	// possible mutexmap states.
 	stateUnlockd = uint8(0)
 	stateRLocked = uint8(1)
 	stateLocked  = uint8(2)
 	stateInUse   = uint8(3)
+
+	// default values.
+	defaultWake = 1024
 )
 
-// permitLockType returns if provided locktype is permitted to go ahead in current state.
-func permitLockType(state uint8, lt locktype) bool {
+// acquireState attempts to acquire required map state for lockType.
+func acquireState(state uint8, lt uint8) (uint8, bool) {
 	switch state {
 	// Unlocked state
 	// (all allowed)
 	case stateUnlockd:
-		return true
 
 	// Keys locked, no state lock.
 	// (don't allow map locks)
 	case stateInUse:
-		return lt&lockTypeMap == 0
+		if lt&lockTypeMap != 0 {
+			return 0, false
+		}
 
 	// Read locked
 	// (only allow read locks)
 	case stateRLocked:
-		return lt&lockTypeRead != 0
+		if lt&lockTypeRead == 0 {
+			return 0, false
+		}
 
 	// Write locked
 	// (none allowed)
 	case stateLocked:
-		return false
+		return 0, false
 
 	// shouldn't reach here
 	default:
 		panic("unexpected state")
 	}
+
+	switch {
+	// If unlocked and not a map
+	// lock request, set in use
+	case lt&lockTypeMap == 0:
+		if state == stateUnlockd {
+			state = stateInUse
+		}
+
+	// Set read lock state
+	case lt&lockTypeRead != 0:
+		state = stateRLocked
+
+	// Set write lock state
+	case lt&lockTypeWrite != 0:
+		state = stateLocked
+
+	default:
+		panic("unexpected lock type")
+	}
+
+	return state, true
 }
 
-// MutexMap is a structure that allows having a map of self-evicting mutexes
-// by key. You do not need to worry about managing the contents of the map,
-// only requesting RLock/Lock for keys, and ensuring to call the returned
-// unlock functions.
+// MutexMap is a structure that allows read / write locking key, performing
+// as you'd expect a map[string]*sync.RWMutex to perform. The differences
+// being that the entire map can itself be read / write locked, it uses memory
+// pooling for the mutex (not quite) structures, and it is self-evicting. The
+// core configurations of maximum no. open locks and wake modulus* are user
+// definable.
+//
+// * The wake modulus is the number that the current number of open locks is
+// modulused against to determine how often to notify sleeping goroutines.
+// These are goroutines that are attempting to lock a key / whole map and are
+// awaiting a permissible state (e.g. no key write locks allowed when the
+// map is read locked).
 type MutexMap struct {
-	mus   map[string]RWMutex
-	mapMu sync.Mutex
-	pool  sync.Pool
-	queue []func()
-	evict []func()
+	qpool pool
+	queue []*sync.Mutex
+
+	mumap map[string]*rwmutex
+	mpool pool
+	evict []*rwmutex
+
 	count int32
 	maxmu int32
+	wake  int32
+
+	mapmu sync.Mutex
 	state uint8
 }
 
 // NewMap returns a new MutexMap instance with provided max no. open mutexes.
-func NewMap(max int32) MutexMap {
+func NewMap(max, wake int32) MutexMap {
+	// Determine wake mod.
+	if wake < 1 {
+		wake = defaultWake
+	}
+
+	// Determine max no. mutexes
 	if max < 1 {
-		// Default = 128 * GOMAXPROCS
 		procs := runtime.GOMAXPROCS(0)
-		max = int32(procs * 128)
+		max = wake * int32(procs)
 	}
+
 	return MutexMap{
-		mus: make(map[string]RWMutex),
-		pool: sync.Pool{
-			New: func() interface{} {
-				return NewRW()
+		qpool: pool{
+			alloc: func() interface{} {
+				return &sync.Mutex{}
+			},
+		},
+		mumap: make(map[string]*rwmutex, max),
+		mpool: pool{
+			alloc: func() interface{} {
+				return &rwmutex{}
 			},
 		},
 		maxmu: max,
+		wake:  wake,
 	}
 }
 
-// acquire will either acquire a mutex from pool or alloc.
-func (mm *MutexMap) acquire() RWMutex {
-	return mm.pool.Get().(RWMutex)
-}
+// SET sets the MutexMap max open locks and wake modulus, returns current values.
+// For values less than zero defaults are set, and zero is a no-op.
+func (mm *MutexMap) SET(max, wake int32) (int32, int32) {
+	mm.mapmu.Lock()
+
+	switch {
+	// Set default wake
+	case wake < 0:
+		mm.wake = defaultWake
+
+	// Set supplied wake
+	case wake > 0:
+		mm.wake = wake
+	}
+
+	switch {
+	// Set default max
+	case max < 0:
+		procs := runtime.GOMAXPROCS(0)
+		mm.maxmu = wake * int32(procs)
+
+	// Set supplied max
+	case max > 0:
+		mm.maxmu = max
+	}
 
-// release will release provided mutex to pool.
-func (mm *MutexMap) release(mu RWMutex) {
-	mm.pool.Put(mu)
+	// Fetch values
+	max = mm.maxmu
+	wake = mm.wake
+
+	mm.mapmu.Unlock()
+	return max, wake
 }
 
-// spinLock will wait (using a mutex to sleep thread) until 'cond()' returns true,
-// returning with map lock. Note that 'cond' is performed within a map lock.
+// spinLock will wait (using a mutex to sleep thread) until conditional returns true.
 func (mm *MutexMap) spinLock(cond func() bool) {
-	mu := mm.acquire()
-	defer mm.release(mu)
+	var mu *sync.Mutex
 
 	for {
-		// Get map lock
-		mm.mapMu.Lock()
+		// Acquire map lock
+		mm.mapmu.Lock()
 
-		// Check if return
 		if cond() {
+			// Release mu if needed
+			if mu != nil {
+				mm.qpool.Release(mu)
+			}
 			return
 		}
 
+		// Alloc mu if needed
+		if mu == nil {
+			v := mm.qpool.Acquire()
+			mu = v.(*sync.Mutex)
+		}
+
 		// Queue ourselves
-		unlock := mu.Lock()
-		mm.queue = append(mm.queue, unlock)
-		mm.mapMu.Unlock()
+		mm.queue = append(mm.queue, mu)
+		mu.Lock()
+
+		// Unlock map
+		mm.mapmu.Unlock()
 
 		// Wait on notify
-		mu.Lock()()
+		mu.Lock()
+		mu.Unlock()
 	}
 }
 
-// lockMutex will acquire a lock on the mutex at provided key, handling earlier allocated mutex if provided. Unlocks map on return.
-func (mm *MutexMap) lockMutex(key string, lt locktype) func() {
-	var unlock func()
+// lock will acquire a lock of given type on the 'mutex' at key.
+func (mm *MutexMap) lock(key string, lt uint8) func() {
+	var ok bool
+	var mu *rwmutex
+
+	// Spin lock until returns true
+	mm.spinLock(func() bool {
+		// Check not overloaded
+		if !(mm.count < mm.maxmu) {
+			return false
+		}
+
+		// Attempt to acquire usable map state
+		state, ok := acquireState(mm.state, lt)
+		if !ok {
+			return false
+		}
+
+		// Update state
+		mm.state = state
+
+		// Ensure mutex at key
+		// is in lockable state
+		mu, ok = mm.mumap[key]
+		return !ok || mu.CanLock(lt)
+	})
 
-	// Incr counter
+	// Incr count
 	mm.count++
 
-	// Check for existing mutex at key
-	mu, ok := mm.mus[key]
 	if !ok {
+		// No mutex found for key
+
 		// Alloc from pool
-		mu = mm.acquire()
-		mm.mus[key] = mu
-
-		// Queue mutex for eviction
-		mm.evict = append(mm.evict, func() {
-			delete(mm.mus, key)
-			mm.pool.Put(mu)
-		})
-	}
+		v := mm.mpool.Acquire()
+		mu = v.(*rwmutex)
+		mm.mumap[key] = mu
 
-	// If no state, set in use.
-	// State will already have been
-	// set if this is from LockState{}
-	if mm.state == stateUnlockd {
-		mm.state = stateInUse
-	}
+		// Set our key
+		mu.key = key
 
-	switch {
-	// Read lock
-	case lt&lockTypeRead != 0:
-		unlock = mu.RLock()
+		// Queue for eviction
+		mm.evict = append(mm.evict, mu)
+	}
 
-	// Write lock
-	case lt&lockTypeWrite != 0:
-		unlock = mu.Lock()
+	// Lock mutex
+	mu.Lock(lt)
 
-	// shouldn't reach here
-	default:
-		panic("unexpected lock type")
-	}
+	// Unlock map
+	mm.mapmu.Unlock()
 
-	// Unlock map + return
-	mm.mapMu.Unlock()
 	return func() {
-		mm.mapMu.Lock()
-		unlock()
-		go mm.onUnlock()
+		mm.mapmu.Lock()
+		mu.Unlock()
+		go mm.cleanup()
 	}
 }
 
-// onUnlock is performed as the final (async) stage of releasing an acquired key / map mutex.
-func (mm *MutexMap) onUnlock() {
-	// Decr counter
+// lockMap will lock the whole map under given lock type.
+func (mm *MutexMap) lockMap(lt uint8) {
+	// Spin lock until returns true
+	mm.spinLock(func() bool {
+		// Attempt to acquire usable map state
+		state, ok := acquireState(mm.state, lt)
+		if !ok {
+			return false
+		}
+
+		// Update state
+		mm.state = state
+
+		return true
+	})
+
+	// Incr count
+	mm.count++
+
+	// State acquired, unlock
+	mm.mapmu.Unlock()
+}
+
+// cleanup is performed as the final stage of unlocking a locked key / map state, finally unlocks map.
+func (mm *MutexMap) cleanup() {
+	// Decr count
 	mm.count--
 
-	if mm.count < 1 {
-		// Perform all queued evictions
-		for i := 0; i < len(mm.evict); i++ {
-			mm.evict[i]()
+	if mm.count%mm.wake == 0 {
+		// Notify queued routines
+		for _, mu := range mm.queue {
+			mu.Unlock()
 		}
 
-		// Notify all waiting goroutines
-		for i := 0; i < len(mm.queue); i++ {
-			mm.queue[i]()
+		// Reset queue
+		mm.queue = mm.queue[:0]
+	}
+
+	if mm.count < 1 {
+		// Perform evictions
+		for _, mu := range mm.evict {
+			key := mu.key
+			mu.key = ""
+			delete(mm.mumap, key)
+			mm.mpool.Release(mu)
 		}
 
-		// Reset the map state
-		mm.evict = nil
-		mm.queue = nil
+		// Reset map state
+		mm.evict = mm.evict[:0]
 		mm.state = stateUnlockd
+		mm.mpool.GC()
+		mm.qpool.GC()
 	}
 
-	// Finally, unlock
-	mm.mapMu.Unlock()
+	// Unlock map
+	mm.mapmu.Unlock()
 }
 
 // RLockMap acquires a read lock over the entire map, returning a lock state for acquiring key read locks.
 // Please note that the 'unlock()' function will block until all keys locked from this state are unlocked.
 func (mm *MutexMap) RLockMap() *LockState {
-	return mm.getMapLock(lockTypeRead)
+	mm.lockMap(lockTypeRead | lockTypeMap)
+	return &LockState{
+		mmap: mm,
+		ltyp: lockTypeRead,
+	}
 }
 
 // LockMap acquires a write lock over the entire map, returning a lock state for acquiring key read/write locks.
 // Please note that the 'unlock()' function will block until all keys locked from this state are unlocked.
 func (mm *MutexMap) LockMap() *LockState {
-	return mm.getMapLock(lockTypeWrite)
+	mm.lockMap(lockTypeWrite | lockTypeMap)
+	return &LockState{
+		mmap: mm,
+		ltyp: lockTypeWrite,
+	}
 }
 
 // RLock acquires a mutex read lock for supplied key, returning an RUnlock function.
 func (mm *MutexMap) RLock(key string) (runlock func()) {
-	return mm.getLock(key, lockTypeRead)
+	return mm.lock(key, lockTypeRead)
 }
 
 // Lock acquires a mutex write lock for supplied key, returning an Unlock function.
 func (mm *MutexMap) Lock(key string) (unlock func()) {
-	return mm.getLock(key, lockTypeWrite)
-}
-
-// getLock will fetch lock of provided type, for given key, returning unlock function.
-func (mm *MutexMap) getLock(key string, lt locktype) func() {
-	// Spin until achieve lock
-	mm.spinLock(func() bool {
-		return permitLockType(mm.state, lt) &&
-			mm.count < mm.maxmu // not overloaded
-	})
-
-	// Perform actual mutex lock
-	return mm.lockMutex(key, lt)
-}
-
-// getMapLock will acquire a map lock of provided type, returning a LockState session.
-func (mm *MutexMap) getMapLock(lt locktype) *LockState {
-	// Spin until achieve lock
-	mm.spinLock(func() bool {
-		return permitLockType(mm.state, lt|lockTypeMap) &&
-			mm.count < mm.maxmu // not overloaded
-	})
-
-	// Incr counter
-	mm.count++
-
-	switch {
-	// Set read lock state
-	case lt&lockTypeRead != 0:
-		mm.state = stateRLocked
-
-	// Set write lock state
-	case lt&lockTypeWrite != 0:
-		mm.state = stateLocked
-
-	default:
-		panic("unexpected lock type")
-	}
-
-	// Unlock + return
-	mm.mapMu.Unlock()
-	return &LockState{
-		mmap: mm,
-		ltyp: lt,
-	}
+	return mm.lock(key, lockTypeWrite)
 }
 
 // LockState represents a window to a locked MutexMap.
@@ -267,56 +354,113 @@
 type LockState struct {
 	wait sync.WaitGroup
 	mmap *MutexMap
 	done uint32
-	ltyp locktype
+	ltyp uint8
 }
 
 // Lock: see MutexMap.Lock() definition. Will panic if map only read locked.
 func (st *LockState) Lock(key string) (unlock func()) {
-	return st.getLock(key, lockTypeWrite)
+	return st.lock(key, lockTypeWrite)
 }
 
 // RLock: see MutexMap.RLock() definition.
 func (st *LockState) RLock(key string) (runlock func()) {
-	return st.getLock(key, lockTypeRead)
+	return st.lock(key, lockTypeRead)
 }
 
-// UnlockMap will close this state and release the currently locked map.
-func (st *LockState) UnlockMap() {
-	// Set state to finished (or panic if already done)
-	if !atomic.CompareAndSwapUint32(&st.done, 0, 1) {
-		panic("called UnlockMap() on expired state")
-	}
-
-	// Wait until done
-	st.wait.Wait()
-
-	// Async reset map
-	st.mmap.mapMu.Lock()
-	go st.mmap.onUnlock()
-}
-
-// getLock: see MutexMap.getLock() definition.
-func (st *LockState) getLock(key string, lt locktype) func() {
+// lock: see MutexMap.lock() definition.
+func (st *LockState) lock(key string, lt uint8) func() {
 	st.wait.Add(1) // track lock
 
-	// Check if closed, or if write lock is allowed
 	if atomic.LoadUint32(&st.done) == 1 {
-		panic("map lock closed")
+		panic("called (r)lock on unlocked state")
 	} else if lt&lockTypeWrite != 0 && st.ltyp&lockTypeWrite == 0 {
-		panic("called .Lock() on rlocked map")
+		panic("called lock on rlocked map")
 	}
 
-	// Spin until achieve map lock
+	var ok bool
+	var mu *rwmutex
+
+	// Spin lock until returns true
 	st.mmap.spinLock(func() bool {
-		return st.mmap.count < st.mmap.maxmu
-	}) // i.e. not overloaded
+		// Check not overloaded
+		if !(st.mmap.count < st.mmap.maxmu) {
+			return false
+		}
+
+		// Ensure mutex at key
+		// is in lockable state
+		mu, ok = st.mmap.mumap[key]
+		return !ok || mu.CanLock(lt)
+	})
+
+	// Incr count
+	st.mmap.count++
+
+	if !ok {
+		// No mutex found for key
+
+		// Alloc from pool
+		v := st.mmap.mpool.Acquire()
+		mu = v.(*rwmutex)
+		st.mmap.mumap[key] = mu
+
+		// Set our key
+		mu.key = key
+
+		// Queue for eviction
+		st.mmap.evict = append(st.mmap.evict, mu)
+	}
 
-	// Perform actual mutex lock
-	unlock := st.mmap.lockMutex(key, lt)
+	// Lock mutex
+	mu.Lock(lt)
+
+	// Unlock map
+	st.mmap.mapmu.Unlock()
 
 	return func() {
-		unlock()
-		st.wait.Done()
+		st.mmap.mapmu.Lock()
+		mu.Unlock()
+		go st.mmap.cleanup()
+		st.wait.Add(-1)
+	}
+}
+
+// UnlockMap will close this state and release the currently locked map.
+func (st *LockState) UnlockMap() {
+	if !atomic.CompareAndSwapUint32(&st.done, 0, 1) {
+		panic("called unlockmap on expired state")
+	}
+	st.wait.Wait()
+	st.mmap.mapmu.Lock()
+	go st.mmap.cleanup()
+}
+
+// rwmutex is a very simple *representation* of a read-write
+// mutex, though not one in implementation. it works by
+// tracking the lock state for a given map key, which is
+// protected by the map's mutex.
+type rwmutex struct {
+	rcnt uint32
+	lock uint8
+	key  string
+}
+
+func (mu *rwmutex) CanLock(lt uint8) bool {
+	return mu.lock == 0 ||
+		(mu.lock&lockTypeRead != 0 && lt&lockTypeRead != 0)
+}
+
+func (mu *rwmutex) Lock(lt uint8) {
+	mu.lock = lt
+	if lt&lockTypeRead != 0 {
+		mu.rcnt++
+	}
+}
+
+func (mu *rwmutex) Unlock() {
+	mu.rcnt--
+	if mu.rcnt == 0 {
+		mu.lock = 0
+	}
 }
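
Usage note: a minimal sketch of how the reworked API in this patch is driven. The import path and package name are assumed from the vendor directory, and the key names and surrounding main function are made up for illustration, not taken from the patch.

package main

import (
	"fmt"

	mutexes "codeberg.org/gruf/go-mutexes"
)

func main() {
	// Zero values fall back to the package defaults for both the
	// max no. open locks and the wake modulus (see NewMap above).
	mm := mutexes.NewMap(0, 0)

	// Per-key write lock, released via the returned function.
	unlock := mm.Lock("user:1")
	fmt.Println("key write-locked")
	unlock()

	// Read-lock the whole map: only key read locks may be taken
	// through the returned LockState until UnlockMap() is called.
	state := mm.RLockMap()
	runlock := state.RLock("user:1")
	runlock()
	state.UnlockMap()

	// Retune limits at runtime: negative resets to the default,
	// zero keeps the current value; current values are returned.
	max, wake := mm.SET(8192, 0)
	fmt.Println("max open locks:", max, "wake modulus:", wake)
}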
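The queue/wake mechanism in spinLock() and cleanup() above parks waiting goroutines on a plain sync.Mutex: the waiter locks it once to mark it held, queues it, then blocks on a second Lock() until the cleanup pass unlocks every queued mutex (each time the open-lock count hits a multiple of the wake modulus). Below is a stripped-down, self-contained illustration of that parking idea; it is not code from the library, and the sleep merely stands in for whatever the notifier is doing.

package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	var mu sync.Mutex

	// "Queue ourselves": take the mutex so the next Lock() call blocks.
	mu.Lock()

	// The notifier (cleanup() in the patch) later unlocks every queued
	// mutex, which is what wakes the parked goroutine.
	go func() {
		time.Sleep(10 * time.Millisecond)
		mu.Unlock()
	}()

	// "Wait on notify": blocks until the notifier unlocks it.
	mu.Lock()
	mu.Unlock()

	fmt.Println("woken by notifier")
}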