diff options
author | 2023-10-31 11:12:22 +0000 | |
---|---|---|
committer | 2023-10-31 11:12:22 +0000 | |
commit | ce71a5a7902963538fc54583588850563f6746cc (patch) | |
tree | 3e869eba6d25d2db5fe81184ffee595e451b3147 /vendor/codeberg.org/gruf/go-mutexes/map.go | |
parent | [bugfix] Relax `Mention` parsing, allowing either href or name (#2320) (diff) | |
download | gotosocial-ce71a5a7902963538fc54583588850563f6746cc.tar.xz |
[feature] add per-uri dereferencer locks (#2291)
Diffstat (limited to 'vendor/codeberg.org/gruf/go-mutexes/map.go')
-rw-r--r-- | vendor/codeberg.org/gruf/go-mutexes/map.go | 568 |
1 files changed, 184 insertions, 384 deletions
diff --git a/vendor/codeberg.org/gruf/go-mutexes/map.go b/vendor/codeberg.org/gruf/go-mutexes/map.go index 73f8f1821..6fcd9c9b1 100644 --- a/vendor/codeberg.org/gruf/go-mutexes/map.go +++ b/vendor/codeberg.org/gruf/go-mutexes/map.go @@ -1,9 +1,8 @@ package mutexes import ( - "runtime" "sync" - "sync/atomic" + "unsafe" ) const ( @@ -12,452 +11,253 @@ const ( lockTypeWrite = uint8(1) << 1 lockTypeMap = uint8(1) << 2 - // possible mutexmap states. - stateUnlockd = uint8(0) - stateRLocked = uint8(1) - stateLocked = uint8(2) - stateInUse = uint8(3) - - // default values. - defaultWake = 1024 + // frequency of GC cycles + // per no. unlocks. i.e. + // every 'gcfreq' unlocks. + gcfreq = 1024 ) -// acquireState attempts to acquire required map state for lockType. -func acquireState(state uint8, lt uint8) (uint8, bool) { - switch state { - // Unlocked state - // (all allowed) - case stateUnlockd: - - // Keys locked, no state lock. - // (don't allow map locks) - case stateInUse: - if lt&lockTypeMap != 0 { - return 0, false - } - - // Read locked - // (only allow read locks) - case stateRLocked: - if lt&lockTypeRead == 0 { - return 0, false - } - - // Write locked - // (none allowed) - case stateLocked: - return 0, false - - // shouldn't reach here - default: - panic("unexpected state") - } - - switch { - // If unlocked and not a map - // lock request, set in use - case lt&lockTypeMap == 0: - if state == stateUnlockd { - state = stateInUse - } - - // Set read lock state - case lt&lockTypeRead != 0: - state = stateRLocked - - // Set write lock state - case lt&lockTypeWrite != 0: - state = stateLocked - - default: - panic("unexpected lock type") - } - - return state, true -} - -// MutexMap is a structure that allows read / write locking key, performing -// as you'd expect a map[string]*sync.RWMutex to perform. The differences -// being that the entire map can itself be read / write locked, it uses memory -// pooling for the mutex (not quite) structures, and it is self-evicting. The -// core configurations of maximum no. open locks and wake modulus* are user -// definable. +// MutexMap is a structure that allows read / write locking +// per key, performing as you'd expect a map[string]*RWMutex +// to perform, without you needing to worry about deadlocks +// between competing read / write locks and the map's own mutex. +// It uses memory pooling for the internal "mutex" (ish) types +// and performs self-eviction of keys. // -// * The wake modulus is the number that the current number of open locks is -// modulused against to determine how often to notify sleeping goroutines. -// These are goroutines that are attempting to lock a key / whole map and are -// awaiting a permissible state (.e.g no key write locks allowed when the -// map is read locked). +// Under the hood this is achieved using a single mutex for the +// map, state tracking for individual keys, and some simple waitgroup +// type structures to park / block goroutines waiting for keys. type MutexMap struct { - queue *sync.WaitGroup - qucnt int32 - - mumap map[string]*rwmutex - mpool pool - evict []*rwmutex - - count int32 - maxmu int32 - wake int32 - - mapmu sync.Mutex - state uint8 + mapmu sync.Mutex + mumap map[string]*rwmutexish + mupool rwmutexPool + count uint32 } -// NewMap returns a new MutexMap instance with provided max no. open mutexes. -func NewMap(max, wake int32) MutexMap { - // Determine wake mod. - if wake < 1 { - wake = defaultWake +// checkInit ensures MutexMap is initialized (UNSAFE). +func (mm *MutexMap) checkInit() { + if mm.mumap == nil { + mm.mumap = make(map[string]*rwmutexish) } +} - // Determine max no. mutexes - if max < 1 { - procs := runtime.GOMAXPROCS(0) - max = wake * int32(procs) - } +// Lock acquires a write lock on key in map, returning unlock function. +func (mm *MutexMap) Lock(key string) func() { + return mm.lock(key, lockTypeWrite) +} - return MutexMap{ - queue: &sync.WaitGroup{}, - mumap: make(map[string]*rwmutex, max), - maxmu: max, - wake: wake, - } +// RLock acquires a read lock on key in map, returning runlock function. +func (mm *MutexMap) RLock(key string) func() { + return mm.lock(key, lockTypeRead) } -// SET sets the MutexMap max open locks and wake modulus, returns current values. -// For values less than zero defaults are set, and zero is non-op. -func (mm *MutexMap) SET(max, wake int32) (int32, int32) { +func (mm *MutexMap) lock(key string, lt uint8) func() { + // Perform first map lock + // and check initialization + // OUTSIDE the main loop. mm.mapmu.Lock() + mm.checkInit() - switch { - // Set default wake - case wake < 0: - mm.wake = defaultWake - - // Set supplied wake - case wake > 0: - mm.wake = wake - } - - switch { - // Set default max - case max < 0: - procs := runtime.GOMAXPROCS(0) - mm.maxmu = wake * int32(procs) - - // Set supplied max - case max > 0: - mm.maxmu = max - } - - // Fetch values - max = mm.maxmu - wake = mm.wake - - mm.mapmu.Unlock() - return max, wake -} - -// spinLock will wait (using a mutex to sleep thread) until conditional returns true. -func (mm *MutexMap) spinLock(cond func() bool) { for { - // Acquire map lock - mm.mapmu.Lock() + // Check map for mu. + mu := mm.mumap[key] - if cond() { - return + if mu == nil { + // Allocate new mutex. + mu = mm.mupool.Acquire() + mm.mumap[key] = mu } - // Current queue ptr - queue := mm.queue - - // Queue ourselves - queue.Add(1) - mm.qucnt++ + if !mu.Lock(lt) { + // Wait on mutex unlock, after + // immediately relocking map mu. + mu.WaitRelock(&mm.mapmu) + continue + } - // Unlock map + // Done with map. mm.mapmu.Unlock() - // Wait on notify - mm.queue.Wait() + // Return mutex unlock function. + return func() { mm.unlock(key, mu) } } } -// lock will acquire a lock of given type on the 'mutex' at key. -func (mm *MutexMap) lock(key string, lt uint8) func() { - var ok bool - var mu *rwmutex - - // Spin lock until returns true - mm.spinLock(func() bool { - // Check not overloaded - if !(mm.count < mm.maxmu) { - return false - } - - // Attempt to acquire usable map state - state, ok := acquireState(mm.state, lt) - if !ok { - return false - } - - // Update state - mm.state = state - - // Ensure mutex at key - // is in lockable state - mu, ok = mm.mumap[key] - return !ok || mu.CanLock(lt) - }) - - // Incr count - mm.count++ - - if !ok { - // No mutex found for key - - // Alloc mu from pool - mu = mm.mpool.Acquire() - mm.mumap[key] = mu +func (mm *MutexMap) unlock(key string, mu *rwmutexish) { + // Get map lock. + mm.mapmu.Lock() - // Set our key - mu.key = key + // Unlock mutex. + if mu.Unlock() { - // Queue for eviction - mm.evict = append(mm.evict, mu) + // Mutex fully unlocked + // with zero waiters. Self + // evict and release it. + delete(mm.mumap, key) + mm.mupool.Release(mu) } - // Lock mutex - mu.Lock(lt) - - // Unlock map - mm.mapmu.Unlock() - - return func() { - mm.mapmu.Lock() - mu.Unlock() - mm.cleanup() + if mm.count++; mm.count%gcfreq == 0 { + // Every 'gcfreq' unlocks perform + // a garbage collection to keep + // us squeaky clean :] + mm.mupool.GC() } -} -// lockMap will lock the whole map under given lock type. -func (mm *MutexMap) lockMap(lt uint8) { - // Spin lock until returns true - mm.spinLock(func() bool { - // Attempt to acquire usable map state - state, ok := acquireState(mm.state, lt) - if !ok { - return false - } - - // Update state - mm.state = state - - return true - }) - - // Incr count - mm.count++ - - // State acquired, unlock + // Done with map. mm.mapmu.Unlock() } -// cleanup is performed as the final stage of unlocking a locked key / map state, finally unlocks map. -func (mm *MutexMap) cleanup() { - // Decr count - mm.count-- - - // Calculate current wake modulus - wakemod := mm.count % mm.wake - - if mm.count != 0 && wakemod != 0 { - // Fast path => no cleanup. - // Unlock, return early - mm.mapmu.Unlock() - return - } - - go func() { - if wakemod == 0 { - // Release queued goroutines - mm.queue.Add(-int(mm.qucnt)) - - // Allocate new queue and reset - mm.queue = &sync.WaitGroup{} - mm.qucnt = 0 - } - - if mm.count == 0 { - // Perform evictions - for _, mu := range mm.evict { - key := mu.key - mu.key = "" - delete(mm.mumap, key) - mm.mpool.Release(mu) - } - - // Reset map state - mm.evict = mm.evict[:0] - mm.state = stateUnlockd - mm.mpool.GC() - } - - // Unlock map - mm.mapmu.Unlock() - }() +// rwmutexPool is a very simply memory rwmutexPool. +type rwmutexPool struct { + current []*rwmutexish + victim []*rwmutexish } -// RLockMap acquires a read lock over the entire map, returning a lock state for acquiring key read locks. -// Please note that the 'unlock()' function will block until all keys locked from this state are unlocked. -func (mm *MutexMap) RLockMap() *LockState { - mm.lockMap(lockTypeRead | lockTypeMap) - return &LockState{ - mmap: mm, - ltyp: lockTypeRead, +// Acquire will returns a rwmutexState from rwmutexPool (or alloc new). +func (p *rwmutexPool) Acquire() *rwmutexish { + // First try the current queue + if l := len(p.current) - 1; l >= 0 { + mu := p.current[l] + p.current = p.current[:l] + return mu } -} -// LockMap acquires a write lock over the entire map, returning a lock state for acquiring key read/write locks. -// Please note that the 'unlock()' function will block until all keys locked from this state are unlocked. -func (mm *MutexMap) LockMap() *LockState { - mm.lockMap(lockTypeWrite | lockTypeMap) - return &LockState{ - mmap: mm, - ltyp: lockTypeWrite, + // Next try the victim queue. + if l := len(p.victim) - 1; l >= 0 { + mu := p.victim[l] + p.victim = p.victim[:l] + return mu } -} - -// RLock acquires a mutex read lock for supplied key, returning an RUnlock function. -func (mm *MutexMap) RLock(key string) (runlock func()) { - return mm.lock(key, lockTypeRead) -} -// Lock acquires a mutex write lock for supplied key, returning an Unlock function. -func (mm *MutexMap) Lock(key string) (unlock func()) { - return mm.lock(key, lockTypeWrite) + // Lastly, alloc new. + mu := new(rwmutexish) + return mu } -// LockState represents a window to a locked MutexMap. -type LockState struct { - wait sync.WaitGroup - mmap *MutexMap - done uint32 - ltyp uint8 +// Release places a sync.rwmutexState back in the rwmutexPool. +func (p *rwmutexPool) Release(mu *rwmutexish) { + p.current = append(p.current, mu) } -// Lock: see MutexMap.Lock() definition. Will panic if map only read locked. -func (st *LockState) Lock(key string) (unlock func()) { - return st.lock(key, lockTypeWrite) +// GC will clear out unused entries from the rwmutexPool. +func (p *rwmutexPool) GC() { + current := p.current + p.current = nil + p.victim = current } -// RLock: see MutexMap.RLock() definition. -func (st *LockState) RLock(key string) (runlock func()) { - return st.lock(key, lockTypeRead) +// rwmutexish is a RW mutex (ish), i.e. the representation +// of one only to be accessed within +type rwmutexish struct { + tr trigger + ln int32 // no. locks + wn int32 // no. waiters + lt uint8 // lock type } -// lock: see MutexMap.lock() definition. -func (st *LockState) lock(key string, lt uint8) func() { - st.wait.Add(1) // track lock - - if atomic.LoadUint32(&st.done) == 1 { - panic("called (r)lock on unlocked state") - } else if lt&lockTypeWrite != 0 && - st.ltyp&lockTypeWrite == 0 { - panic("called lock on rlocked map") - } - - var ok bool - var mu *rwmutex - - // Spin lock until returns true - st.mmap.spinLock(func() bool { - // Check not overloaded - if !(st.mmap.count < st.mmap.maxmu) { +// Lock will lock the mutex for given lock type, in the +// sense that it will update the internal state tracker +// accordingly. Return value is true on successful lock. +func (mu *rwmutexish) Lock(lt uint8) bool { + switch mu.lt { + case lockTypeRead: + // already read locked, + // only permit more reads. + if lt != lockTypeRead { return false } - // Ensure mutex at key - // is in lockable state - mu, ok = st.mmap.mumap[key] - return !ok || mu.CanLock(lt) - }) + case lockTypeWrite: + // already write locked, + // no other locks allowed. + return false - // Incr count - st.mmap.count++ + default: + // Fully unlocked. + mu.lt = lt + } - if !ok { - // No mutex found for key + // Update + // count. + mu.ln++ - // Alloc mu from pool - mu = st.mmap.mpool.Acquire() - st.mmap.mumap[key] = mu + return true +} - // Set our key - mu.key = key +// Unlock will unlock the mutex, in the sense that +// it will update the internal state tracker accordingly. +// On any unlock it will awaken sleeping waiting threads. +// Returned boolean is if unlocked=true AND waiters=0. +func (mu *rwmutexish) Unlock() bool { + var ok bool - // Queue for eviction - st.mmap.evict = append(st.mmap.evict, mu) + switch mu.ln--; { + case mu.ln > 0 && mu.lt == lockTypeWrite: + panic("BUG: multiple writer locks") + case mu.ln < 0: + panic("BUG: negative lock count") + case mu.ln == 0: + // Fully unlocked. + mu.lt = 0 + + // Only return true + // with no waiters. + ok = (mu.wn == 0) } - // Lock mutex - mu.Lock(lt) - - // Unlock map - st.mmap.mapmu.Unlock() - - return func() { - st.mmap.mapmu.Lock() - mu.Unlock() - st.mmap.cleanup() - st.wait.Add(-1) - } + // Awake all waiting + // goroutines for mu. + mu.tr.Trigger() + return ok } -// UnlockMap will close this state and release the currently locked map. -func (st *LockState) UnlockMap() { - if !atomic.CompareAndSwapUint32(&st.done, 0, 1) { - panic("called unlockmap on expired state") - } - st.wait.Wait() - st.mmap.mapmu.Lock() - st.mmap.cleanup() +// WaitRelock expects a mutex to be passed in already in +// the lock state. It incr the rwmutexish waiter count before +// unlocking the outer mutex and blocking on internal trigger. +// On awake it will relock outer mutex and decr wait count. +func (mu *rwmutexish) WaitRelock(outer *sync.Mutex) { + mu.wn++ + outer.Unlock() + mu.tr.Wait() + outer.Lock() + mu.wn-- } -// rwmutex is a very simple *representation* of a read-write -// mutex, though not one in implementation. it works by -// tracking the lock state for a given map key, which is -// protected by the map's mutex. -type rwmutex struct { - rcnt int32 // read lock count - lock uint8 // lock type - key string // map key -} +// trigger uses the internals of sync.Cond to provide +// a waitgroup type structure (including goroutine parks) +// without such a heavy reliance on a delta value. +type trigger struct{ notifyList } -func (mu *rwmutex) CanLock(lt uint8) bool { - return mu.lock == 0 || - (mu.lock&lockTypeRead != 0 && lt&lockTypeRead != 0) +func (t *trigger) Trigger() { + runtime_notifyListNotifyAll(&t.notifyList) } -func (mu *rwmutex) Lock(lt uint8) { - // Set lock type - mu.lock = lt +func (t *trigger) Wait() { + v := runtime_notifyListAdd(&t.notifyList) + runtime_notifyListWait(&t.notifyList, v) +} - if lt&lockTypeRead != 0 { - // RLock, increment - mu.rcnt++ - } +// Approximation of notifyList in runtime/sema.go. +type notifyList struct { + wait uint32 + notify uint32 + lock uintptr // key field of the mutex + head unsafe.Pointer + tail unsafe.Pointer } -func (mu *rwmutex) Unlock() { - if mu.rcnt > 0 { - // RUnlock - mu.rcnt-- - } +// See runtime/sema.go for documentation. +// +//go:linkname runtime_notifyListAdd sync.runtime_notifyListAdd +func runtime_notifyListAdd(l *notifyList) uint32 - if mu.rcnt == 0 { - // Total unlock - mu.lock = 0 - } -} +// See runtime/sema.go for documentation. +// +//go:linkname runtime_notifyListWait sync.runtime_notifyListWait +func runtime_notifyListWait(l *notifyList, t uint32) + +// See runtime/sema.go for documentation. +// +//go:linkname runtime_notifyListNotifyAll sync.runtime_notifyListNotifyAll +func runtime_notifyListNotifyAll(l *notifyList) |