diff options
author | 2023-10-31 11:12:22 +0000 | |
---|---|---|
committer | 2023-10-31 11:12:22 +0000 | |
commit | ce71a5a7902963538fc54583588850563f6746cc (patch) | |
tree | 3e869eba6d25d2db5fe81184ffee595e451b3147 /vendor/codeberg.org/gruf/go-mutexes | |
parent | [bugfix] Relax `Mention` parsing, allowing either href or name (#2320) (diff) | |
download | gotosocial-ce71a5a7902963538fc54583588850563f6746cc.tar.xz |
[feature] add per-uri dereferencer locks (#2291)
Diffstat (limited to 'vendor/codeberg.org/gruf/go-mutexes')
-rw-r--r-- | vendor/codeberg.org/gruf/go-mutexes/LICENSE | 2 | ||||
-rw-r--r-- | vendor/codeberg.org/gruf/go-mutexes/map.go | 568 | ||||
-rw-r--r-- | vendor/codeberg.org/gruf/go-mutexes/map_pool.go | 39 |
3 files changed, 185 insertions, 424 deletions
diff --git a/vendor/codeberg.org/gruf/go-mutexes/LICENSE b/vendor/codeberg.org/gruf/go-mutexes/LICENSE index b7c4417ac..d6f08d0ab 100644 --- a/vendor/codeberg.org/gruf/go-mutexes/LICENSE +++ b/vendor/codeberg.org/gruf/go-mutexes/LICENSE @@ -1,6 +1,6 @@ MIT License -Copyright (c) 2021 gruf +Copyright (c) gruf Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: diff --git a/vendor/codeberg.org/gruf/go-mutexes/map.go b/vendor/codeberg.org/gruf/go-mutexes/map.go index 73f8f1821..6fcd9c9b1 100644 --- a/vendor/codeberg.org/gruf/go-mutexes/map.go +++ b/vendor/codeberg.org/gruf/go-mutexes/map.go @@ -1,9 +1,8 @@ package mutexes import ( - "runtime" "sync" - "sync/atomic" + "unsafe" ) const ( @@ -12,452 +11,253 @@ const ( lockTypeWrite = uint8(1) << 1 lockTypeMap = uint8(1) << 2 - // possible mutexmap states. - stateUnlockd = uint8(0) - stateRLocked = uint8(1) - stateLocked = uint8(2) - stateInUse = uint8(3) - - // default values. - defaultWake = 1024 + // frequency of GC cycles + // per no. unlocks. i.e. + // every 'gcfreq' unlocks. + gcfreq = 1024 ) -// acquireState attempts to acquire required map state for lockType. -func acquireState(state uint8, lt uint8) (uint8, bool) { - switch state { - // Unlocked state - // (all allowed) - case stateUnlockd: - - // Keys locked, no state lock. - // (don't allow map locks) - case stateInUse: - if lt&lockTypeMap != 0 { - return 0, false - } - - // Read locked - // (only allow read locks) - case stateRLocked: - if lt&lockTypeRead == 0 { - return 0, false - } - - // Write locked - // (none allowed) - case stateLocked: - return 0, false - - // shouldn't reach here - default: - panic("unexpected state") - } - - switch { - // If unlocked and not a map - // lock request, set in use - case lt&lockTypeMap == 0: - if state == stateUnlockd { - state = stateInUse - } - - // Set read lock state - case lt&lockTypeRead != 0: - state = stateRLocked - - // Set write lock state - case lt&lockTypeWrite != 0: - state = stateLocked - - default: - panic("unexpected lock type") - } - - return state, true -} - -// MutexMap is a structure that allows read / write locking key, performing -// as you'd expect a map[string]*sync.RWMutex to perform. The differences -// being that the entire map can itself be read / write locked, it uses memory -// pooling for the mutex (not quite) structures, and it is self-evicting. The -// core configurations of maximum no. open locks and wake modulus* are user -// definable. +// MutexMap is a structure that allows read / write locking +// per key, performing as you'd expect a map[string]*RWMutex +// to perform, without you needing to worry about deadlocks +// between competing read / write locks and the map's own mutex. +// It uses memory pooling for the internal "mutex" (ish) types +// and performs self-eviction of keys. // -// * The wake modulus is the number that the current number of open locks is -// modulused against to determine how often to notify sleeping goroutines. -// These are goroutines that are attempting to lock a key / whole map and are -// awaiting a permissible state (.e.g no key write locks allowed when the -// map is read locked). +// Under the hood this is achieved using a single mutex for the +// map, state tracking for individual keys, and some simple waitgroup +// type structures to park / block goroutines waiting for keys. type MutexMap struct { - queue *sync.WaitGroup - qucnt int32 - - mumap map[string]*rwmutex - mpool pool - evict []*rwmutex - - count int32 - maxmu int32 - wake int32 - - mapmu sync.Mutex - state uint8 + mapmu sync.Mutex + mumap map[string]*rwmutexish + mupool rwmutexPool + count uint32 } -// NewMap returns a new MutexMap instance with provided max no. open mutexes. -func NewMap(max, wake int32) MutexMap { - // Determine wake mod. - if wake < 1 { - wake = defaultWake +// checkInit ensures MutexMap is initialized (UNSAFE). +func (mm *MutexMap) checkInit() { + if mm.mumap == nil { + mm.mumap = make(map[string]*rwmutexish) } +} - // Determine max no. mutexes - if max < 1 { - procs := runtime.GOMAXPROCS(0) - max = wake * int32(procs) - } +// Lock acquires a write lock on key in map, returning unlock function. +func (mm *MutexMap) Lock(key string) func() { + return mm.lock(key, lockTypeWrite) +} - return MutexMap{ - queue: &sync.WaitGroup{}, - mumap: make(map[string]*rwmutex, max), - maxmu: max, - wake: wake, - } +// RLock acquires a read lock on key in map, returning runlock function. +func (mm *MutexMap) RLock(key string) func() { + return mm.lock(key, lockTypeRead) } -// SET sets the MutexMap max open locks and wake modulus, returns current values. -// For values less than zero defaults are set, and zero is non-op. -func (mm *MutexMap) SET(max, wake int32) (int32, int32) { +func (mm *MutexMap) lock(key string, lt uint8) func() { + // Perform first map lock + // and check initialization + // OUTSIDE the main loop. mm.mapmu.Lock() + mm.checkInit() - switch { - // Set default wake - case wake < 0: - mm.wake = defaultWake - - // Set supplied wake - case wake > 0: - mm.wake = wake - } - - switch { - // Set default max - case max < 0: - procs := runtime.GOMAXPROCS(0) - mm.maxmu = wake * int32(procs) - - // Set supplied max - case max > 0: - mm.maxmu = max - } - - // Fetch values - max = mm.maxmu - wake = mm.wake - - mm.mapmu.Unlock() - return max, wake -} - -// spinLock will wait (using a mutex to sleep thread) until conditional returns true. -func (mm *MutexMap) spinLock(cond func() bool) { for { - // Acquire map lock - mm.mapmu.Lock() + // Check map for mu. + mu := mm.mumap[key] - if cond() { - return + if mu == nil { + // Allocate new mutex. + mu = mm.mupool.Acquire() + mm.mumap[key] = mu } - // Current queue ptr - queue := mm.queue - - // Queue ourselves - queue.Add(1) - mm.qucnt++ + if !mu.Lock(lt) { + // Wait on mutex unlock, after + // immediately relocking map mu. + mu.WaitRelock(&mm.mapmu) + continue + } - // Unlock map + // Done with map. mm.mapmu.Unlock() - // Wait on notify - mm.queue.Wait() + // Return mutex unlock function. + return func() { mm.unlock(key, mu) } } } -// lock will acquire a lock of given type on the 'mutex' at key. -func (mm *MutexMap) lock(key string, lt uint8) func() { - var ok bool - var mu *rwmutex - - // Spin lock until returns true - mm.spinLock(func() bool { - // Check not overloaded - if !(mm.count < mm.maxmu) { - return false - } - - // Attempt to acquire usable map state - state, ok := acquireState(mm.state, lt) - if !ok { - return false - } - - // Update state - mm.state = state - - // Ensure mutex at key - // is in lockable state - mu, ok = mm.mumap[key] - return !ok || mu.CanLock(lt) - }) - - // Incr count - mm.count++ - - if !ok { - // No mutex found for key - - // Alloc mu from pool - mu = mm.mpool.Acquire() - mm.mumap[key] = mu +func (mm *MutexMap) unlock(key string, mu *rwmutexish) { + // Get map lock. + mm.mapmu.Lock() - // Set our key - mu.key = key + // Unlock mutex. + if mu.Unlock() { - // Queue for eviction - mm.evict = append(mm.evict, mu) + // Mutex fully unlocked + // with zero waiters. Self + // evict and release it. + delete(mm.mumap, key) + mm.mupool.Release(mu) } - // Lock mutex - mu.Lock(lt) - - // Unlock map - mm.mapmu.Unlock() - - return func() { - mm.mapmu.Lock() - mu.Unlock() - mm.cleanup() + if mm.count++; mm.count%gcfreq == 0 { + // Every 'gcfreq' unlocks perform + // a garbage collection to keep + // us squeaky clean :] + mm.mupool.GC() } -} -// lockMap will lock the whole map under given lock type. -func (mm *MutexMap) lockMap(lt uint8) { - // Spin lock until returns true - mm.spinLock(func() bool { - // Attempt to acquire usable map state - state, ok := acquireState(mm.state, lt) - if !ok { - return false - } - - // Update state - mm.state = state - - return true - }) - - // Incr count - mm.count++ - - // State acquired, unlock + // Done with map. mm.mapmu.Unlock() } -// cleanup is performed as the final stage of unlocking a locked key / map state, finally unlocks map. -func (mm *MutexMap) cleanup() { - // Decr count - mm.count-- - - // Calculate current wake modulus - wakemod := mm.count % mm.wake - - if mm.count != 0 && wakemod != 0 { - // Fast path => no cleanup. - // Unlock, return early - mm.mapmu.Unlock() - return - } - - go func() { - if wakemod == 0 { - // Release queued goroutines - mm.queue.Add(-int(mm.qucnt)) - - // Allocate new queue and reset - mm.queue = &sync.WaitGroup{} - mm.qucnt = 0 - } - - if mm.count == 0 { - // Perform evictions - for _, mu := range mm.evict { - key := mu.key - mu.key = "" - delete(mm.mumap, key) - mm.mpool.Release(mu) - } - - // Reset map state - mm.evict = mm.evict[:0] - mm.state = stateUnlockd - mm.mpool.GC() - } - - // Unlock map - mm.mapmu.Unlock() - }() +// rwmutexPool is a very simply memory rwmutexPool. +type rwmutexPool struct { + current []*rwmutexish + victim []*rwmutexish } -// RLockMap acquires a read lock over the entire map, returning a lock state for acquiring key read locks. -// Please note that the 'unlock()' function will block until all keys locked from this state are unlocked. -func (mm *MutexMap) RLockMap() *LockState { - mm.lockMap(lockTypeRead | lockTypeMap) - return &LockState{ - mmap: mm, - ltyp: lockTypeRead, +// Acquire will returns a rwmutexState from rwmutexPool (or alloc new). +func (p *rwmutexPool) Acquire() *rwmutexish { + // First try the current queue + if l := len(p.current) - 1; l >= 0 { + mu := p.current[l] + p.current = p.current[:l] + return mu } -} -// LockMap acquires a write lock over the entire map, returning a lock state for acquiring key read/write locks. -// Please note that the 'unlock()' function will block until all keys locked from this state are unlocked. -func (mm *MutexMap) LockMap() *LockState { - mm.lockMap(lockTypeWrite | lockTypeMap) - return &LockState{ - mmap: mm, - ltyp: lockTypeWrite, + // Next try the victim queue. + if l := len(p.victim) - 1; l >= 0 { + mu := p.victim[l] + p.victim = p.victim[:l] + return mu } -} - -// RLock acquires a mutex read lock for supplied key, returning an RUnlock function. -func (mm *MutexMap) RLock(key string) (runlock func()) { - return mm.lock(key, lockTypeRead) -} -// Lock acquires a mutex write lock for supplied key, returning an Unlock function. -func (mm *MutexMap) Lock(key string) (unlock func()) { - return mm.lock(key, lockTypeWrite) + // Lastly, alloc new. + mu := new(rwmutexish) + return mu } -// LockState represents a window to a locked MutexMap. -type LockState struct { - wait sync.WaitGroup - mmap *MutexMap - done uint32 - ltyp uint8 +// Release places a sync.rwmutexState back in the rwmutexPool. +func (p *rwmutexPool) Release(mu *rwmutexish) { + p.current = append(p.current, mu) } -// Lock: see MutexMap.Lock() definition. Will panic if map only read locked. -func (st *LockState) Lock(key string) (unlock func()) { - return st.lock(key, lockTypeWrite) +// GC will clear out unused entries from the rwmutexPool. +func (p *rwmutexPool) GC() { + current := p.current + p.current = nil + p.victim = current } -// RLock: see MutexMap.RLock() definition. -func (st *LockState) RLock(key string) (runlock func()) { - return st.lock(key, lockTypeRead) +// rwmutexish is a RW mutex (ish), i.e. the representation +// of one only to be accessed within +type rwmutexish struct { + tr trigger + ln int32 // no. locks + wn int32 // no. waiters + lt uint8 // lock type } -// lock: see MutexMap.lock() definition. -func (st *LockState) lock(key string, lt uint8) func() { - st.wait.Add(1) // track lock - - if atomic.LoadUint32(&st.done) == 1 { - panic("called (r)lock on unlocked state") - } else if lt&lockTypeWrite != 0 && - st.ltyp&lockTypeWrite == 0 { - panic("called lock on rlocked map") - } - - var ok bool - var mu *rwmutex - - // Spin lock until returns true - st.mmap.spinLock(func() bool { - // Check not overloaded - if !(st.mmap.count < st.mmap.maxmu) { +// Lock will lock the mutex for given lock type, in the +// sense that it will update the internal state tracker +// accordingly. Return value is true on successful lock. +func (mu *rwmutexish) Lock(lt uint8) bool { + switch mu.lt { + case lockTypeRead: + // already read locked, + // only permit more reads. + if lt != lockTypeRead { return false } - // Ensure mutex at key - // is in lockable state - mu, ok = st.mmap.mumap[key] - return !ok || mu.CanLock(lt) - }) + case lockTypeWrite: + // already write locked, + // no other locks allowed. + return false - // Incr count - st.mmap.count++ + default: + // Fully unlocked. + mu.lt = lt + } - if !ok { - // No mutex found for key + // Update + // count. + mu.ln++ - // Alloc mu from pool - mu = st.mmap.mpool.Acquire() - st.mmap.mumap[key] = mu + return true +} - // Set our key - mu.key = key +// Unlock will unlock the mutex, in the sense that +// it will update the internal state tracker accordingly. +// On any unlock it will awaken sleeping waiting threads. +// Returned boolean is if unlocked=true AND waiters=0. +func (mu *rwmutexish) Unlock() bool { + var ok bool - // Queue for eviction - st.mmap.evict = append(st.mmap.evict, mu) + switch mu.ln--; { + case mu.ln > 0 && mu.lt == lockTypeWrite: + panic("BUG: multiple writer locks") + case mu.ln < 0: + panic("BUG: negative lock count") + case mu.ln == 0: + // Fully unlocked. + mu.lt = 0 + + // Only return true + // with no waiters. + ok = (mu.wn == 0) } - // Lock mutex - mu.Lock(lt) - - // Unlock map - st.mmap.mapmu.Unlock() - - return func() { - st.mmap.mapmu.Lock() - mu.Unlock() - st.mmap.cleanup() - st.wait.Add(-1) - } + // Awake all waiting + // goroutines for mu. + mu.tr.Trigger() + return ok } -// UnlockMap will close this state and release the currently locked map. -func (st *LockState) UnlockMap() { - if !atomic.CompareAndSwapUint32(&st.done, 0, 1) { - panic("called unlockmap on expired state") - } - st.wait.Wait() - st.mmap.mapmu.Lock() - st.mmap.cleanup() +// WaitRelock expects a mutex to be passed in already in +// the lock state. It incr the rwmutexish waiter count before +// unlocking the outer mutex and blocking on internal trigger. +// On awake it will relock outer mutex and decr wait count. +func (mu *rwmutexish) WaitRelock(outer *sync.Mutex) { + mu.wn++ + outer.Unlock() + mu.tr.Wait() + outer.Lock() + mu.wn-- } -// rwmutex is a very simple *representation* of a read-write -// mutex, though not one in implementation. it works by -// tracking the lock state for a given map key, which is -// protected by the map's mutex. -type rwmutex struct { - rcnt int32 // read lock count - lock uint8 // lock type - key string // map key -} +// trigger uses the internals of sync.Cond to provide +// a waitgroup type structure (including goroutine parks) +// without such a heavy reliance on a delta value. +type trigger struct{ notifyList } -func (mu *rwmutex) CanLock(lt uint8) bool { - return mu.lock == 0 || - (mu.lock&lockTypeRead != 0 && lt&lockTypeRead != 0) +func (t *trigger) Trigger() { + runtime_notifyListNotifyAll(&t.notifyList) } -func (mu *rwmutex) Lock(lt uint8) { - // Set lock type - mu.lock = lt +func (t *trigger) Wait() { + v := runtime_notifyListAdd(&t.notifyList) + runtime_notifyListWait(&t.notifyList, v) +} - if lt&lockTypeRead != 0 { - // RLock, increment - mu.rcnt++ - } +// Approximation of notifyList in runtime/sema.go. +type notifyList struct { + wait uint32 + notify uint32 + lock uintptr // key field of the mutex + head unsafe.Pointer + tail unsafe.Pointer } -func (mu *rwmutex) Unlock() { - if mu.rcnt > 0 { - // RUnlock - mu.rcnt-- - } +// See runtime/sema.go for documentation. +// +//go:linkname runtime_notifyListAdd sync.runtime_notifyListAdd +func runtime_notifyListAdd(l *notifyList) uint32 - if mu.rcnt == 0 { - // Total unlock - mu.lock = 0 - } -} +// See runtime/sema.go for documentation. +// +//go:linkname runtime_notifyListWait sync.runtime_notifyListWait +func runtime_notifyListWait(l *notifyList, t uint32) + +// See runtime/sema.go for documentation. +// +//go:linkname runtime_notifyListNotifyAll sync.runtime_notifyListNotifyAll +func runtime_notifyListNotifyAll(l *notifyList) diff --git a/vendor/codeberg.org/gruf/go-mutexes/map_pool.go b/vendor/codeberg.org/gruf/go-mutexes/map_pool.go deleted file mode 100644 index 450e0bc06..000000000 --- a/vendor/codeberg.org/gruf/go-mutexes/map_pool.go +++ /dev/null @@ -1,39 +0,0 @@ -package mutexes - -// pool is a very simply memory pool. -type pool struct { - current []*rwmutex - victim []*rwmutex -} - -// Acquire will returns a rwmutex from pool (or alloc new). -func (p *pool) Acquire() *rwmutex { - // First try the current queue - if l := len(p.current) - 1; l >= 0 { - mu := p.current[l] - p.current = p.current[:l] - return mu - } - - // Next try the victim queue. - if l := len(p.victim) - 1; l >= 0 { - mu := p.victim[l] - p.victim = p.victim[:l] - return mu - } - - // Lastly, alloc new. - return &rwmutex{} -} - -// Release places a sync.RWMutex back in the pool. -func (p *pool) Release(mu *rwmutex) { - p.current = append(p.current, mu) -} - -// GC will clear out unused entries from the pool. -func (p *pool) GC() { - current := p.current - p.current = nil - p.victim = current -} |