summaryrefslogtreecommitdiff
path: root/vendor/codeberg.org/gruf/go-mutexes/map.go
diff options
context:
space:
mode:
authorLibravatar kim <89579420+NyaaaWhatsUpDoc@users.noreply.github.com>2023-10-31 11:12:22 +0000
committerLibravatar GitHub <noreply@github.com>2023-10-31 11:12:22 +0000
commitce71a5a7902963538fc54583588850563f6746cc (patch)
tree3e869eba6d25d2db5fe81184ffee595e451b3147 /vendor/codeberg.org/gruf/go-mutexes/map.go
parent[bugfix] Relax `Mention` parsing, allowing either href or name (#2320) (diff)
downloadgotosocial-ce71a5a7902963538fc54583588850563f6746cc.tar.xz
[feature] add per-uri dereferencer locks (#2291)
Diffstat (limited to 'vendor/codeberg.org/gruf/go-mutexes/map.go')
-rw-r--r--vendor/codeberg.org/gruf/go-mutexes/map.go568
1 files changed, 184 insertions, 384 deletions
diff --git a/vendor/codeberg.org/gruf/go-mutexes/map.go b/vendor/codeberg.org/gruf/go-mutexes/map.go
index 73f8f1821..6fcd9c9b1 100644
--- a/vendor/codeberg.org/gruf/go-mutexes/map.go
+++ b/vendor/codeberg.org/gruf/go-mutexes/map.go
@@ -1,9 +1,8 @@
package mutexes
import (
- "runtime"
"sync"
- "sync/atomic"
+ "unsafe"
)
const (
@@ -12,452 +11,253 @@ const (
lockTypeWrite = uint8(1) << 1
lockTypeMap = uint8(1) << 2
- // possible mutexmap states.
- stateUnlockd = uint8(0)
- stateRLocked = uint8(1)
- stateLocked = uint8(2)
- stateInUse = uint8(3)
-
- // default values.
- defaultWake = 1024
+ // frequency of GC cycles
+ // per no. unlocks. i.e.
+ // every 'gcfreq' unlocks.
+ gcfreq = 1024
)
-// acquireState attempts to acquire required map state for lockType.
-func acquireState(state uint8, lt uint8) (uint8, bool) {
- switch state {
- // Unlocked state
- // (all allowed)
- case stateUnlockd:
-
- // Keys locked, no state lock.
- // (don't allow map locks)
- case stateInUse:
- if lt&lockTypeMap != 0 {
- return 0, false
- }
-
- // Read locked
- // (only allow read locks)
- case stateRLocked:
- if lt&lockTypeRead == 0 {
- return 0, false
- }
-
- // Write locked
- // (none allowed)
- case stateLocked:
- return 0, false
-
- // shouldn't reach here
- default:
- panic("unexpected state")
- }
-
- switch {
- // If unlocked and not a map
- // lock request, set in use
- case lt&lockTypeMap == 0:
- if state == stateUnlockd {
- state = stateInUse
- }
-
- // Set read lock state
- case lt&lockTypeRead != 0:
- state = stateRLocked
-
- // Set write lock state
- case lt&lockTypeWrite != 0:
- state = stateLocked
-
- default:
- panic("unexpected lock type")
- }
-
- return state, true
-}
-
-// MutexMap is a structure that allows read / write locking key, performing
-// as you'd expect a map[string]*sync.RWMutex to perform. The differences
-// being that the entire map can itself be read / write locked, it uses memory
-// pooling for the mutex (not quite) structures, and it is self-evicting. The
-// core configurations of maximum no. open locks and wake modulus* are user
-// definable.
+// MutexMap is a structure that allows read / write locking
+// per key, performing as you'd expect a map[string]*RWMutex
+// to perform, without you needing to worry about deadlocks
+// between competing read / write locks and the map's own mutex.
+// It uses memory pooling for the internal "mutex" (ish) types
+// and performs self-eviction of keys.
//
-// * The wake modulus is the number that the current number of open locks is
-// modulused against to determine how often to notify sleeping goroutines.
-// These are goroutines that are attempting to lock a key / whole map and are
-// awaiting a permissible state (.e.g no key write locks allowed when the
-// map is read locked).
+// Under the hood this is achieved using a single mutex for the
+// map, state tracking for individual keys, and some simple waitgroup
+// type structures to park / block goroutines waiting for keys.
type MutexMap struct {
- queue *sync.WaitGroup
- qucnt int32
-
- mumap map[string]*rwmutex
- mpool pool
- evict []*rwmutex
-
- count int32
- maxmu int32
- wake int32
-
- mapmu sync.Mutex
- state uint8
+ mapmu sync.Mutex
+ mumap map[string]*rwmutexish
+ mupool rwmutexPool
+ count uint32
}
-// NewMap returns a new MutexMap instance with provided max no. open mutexes.
-func NewMap(max, wake int32) MutexMap {
- // Determine wake mod.
- if wake < 1 {
- wake = defaultWake
+// checkInit ensures MutexMap is initialized (UNSAFE).
+func (mm *MutexMap) checkInit() {
+ if mm.mumap == nil {
+ mm.mumap = make(map[string]*rwmutexish)
}
+}
- // Determine max no. mutexes
- if max < 1 {
- procs := runtime.GOMAXPROCS(0)
- max = wake * int32(procs)
- }
+// Lock acquires a write lock on key in map, returning unlock function.
+func (mm *MutexMap) Lock(key string) func() {
+ return mm.lock(key, lockTypeWrite)
+}
- return MutexMap{
- queue: &sync.WaitGroup{},
- mumap: make(map[string]*rwmutex, max),
- maxmu: max,
- wake: wake,
- }
+// RLock acquires a read lock on key in map, returning runlock function.
+func (mm *MutexMap) RLock(key string) func() {
+ return mm.lock(key, lockTypeRead)
}
-// SET sets the MutexMap max open locks and wake modulus, returns current values.
-// For values less than zero defaults are set, and zero is non-op.
-func (mm *MutexMap) SET(max, wake int32) (int32, int32) {
+func (mm *MutexMap) lock(key string, lt uint8) func() {
+ // Perform first map lock
+ // and check initialization
+ // OUTSIDE the main loop.
mm.mapmu.Lock()
+ mm.checkInit()
- switch {
- // Set default wake
- case wake < 0:
- mm.wake = defaultWake
-
- // Set supplied wake
- case wake > 0:
- mm.wake = wake
- }
-
- switch {
- // Set default max
- case max < 0:
- procs := runtime.GOMAXPROCS(0)
- mm.maxmu = wake * int32(procs)
-
- // Set supplied max
- case max > 0:
- mm.maxmu = max
- }
-
- // Fetch values
- max = mm.maxmu
- wake = mm.wake
-
- mm.mapmu.Unlock()
- return max, wake
-}
-
-// spinLock will wait (using a mutex to sleep thread) until conditional returns true.
-func (mm *MutexMap) spinLock(cond func() bool) {
for {
- // Acquire map lock
- mm.mapmu.Lock()
+ // Check map for mu.
+ mu := mm.mumap[key]
- if cond() {
- return
+ if mu == nil {
+ // Allocate new mutex.
+ mu = mm.mupool.Acquire()
+ mm.mumap[key] = mu
}
- // Current queue ptr
- queue := mm.queue
-
- // Queue ourselves
- queue.Add(1)
- mm.qucnt++
+ if !mu.Lock(lt) {
+ // Wait on mutex unlock, after
+ // immediately relocking map mu.
+ mu.WaitRelock(&mm.mapmu)
+ continue
+ }
- // Unlock map
+ // Done with map.
mm.mapmu.Unlock()
- // Wait on notify
- mm.queue.Wait()
+ // Return mutex unlock function.
+ return func() { mm.unlock(key, mu) }
}
}
-// lock will acquire a lock of given type on the 'mutex' at key.
-func (mm *MutexMap) lock(key string, lt uint8) func() {
- var ok bool
- var mu *rwmutex
-
- // Spin lock until returns true
- mm.spinLock(func() bool {
- // Check not overloaded
- if !(mm.count < mm.maxmu) {
- return false
- }
-
- // Attempt to acquire usable map state
- state, ok := acquireState(mm.state, lt)
- if !ok {
- return false
- }
-
- // Update state
- mm.state = state
-
- // Ensure mutex at key
- // is in lockable state
- mu, ok = mm.mumap[key]
- return !ok || mu.CanLock(lt)
- })
-
- // Incr count
- mm.count++
-
- if !ok {
- // No mutex found for key
-
- // Alloc mu from pool
- mu = mm.mpool.Acquire()
- mm.mumap[key] = mu
+func (mm *MutexMap) unlock(key string, mu *rwmutexish) {
+ // Get map lock.
+ mm.mapmu.Lock()
- // Set our key
- mu.key = key
+ // Unlock mutex.
+ if mu.Unlock() {
- // Queue for eviction
- mm.evict = append(mm.evict, mu)
+ // Mutex fully unlocked
+ // with zero waiters. Self
+ // evict and release it.
+ delete(mm.mumap, key)
+ mm.mupool.Release(mu)
}
- // Lock mutex
- mu.Lock(lt)
-
- // Unlock map
- mm.mapmu.Unlock()
-
- return func() {
- mm.mapmu.Lock()
- mu.Unlock()
- mm.cleanup()
+ if mm.count++; mm.count%gcfreq == 0 {
+ // Every 'gcfreq' unlocks perform
+ // a garbage collection to keep
+ // us squeaky clean :]
+ mm.mupool.GC()
}
-}
-// lockMap will lock the whole map under given lock type.
-func (mm *MutexMap) lockMap(lt uint8) {
- // Spin lock until returns true
- mm.spinLock(func() bool {
- // Attempt to acquire usable map state
- state, ok := acquireState(mm.state, lt)
- if !ok {
- return false
- }
-
- // Update state
- mm.state = state
-
- return true
- })
-
- // Incr count
- mm.count++
-
- // State acquired, unlock
+ // Done with map.
mm.mapmu.Unlock()
}
-// cleanup is performed as the final stage of unlocking a locked key / map state, finally unlocks map.
-func (mm *MutexMap) cleanup() {
- // Decr count
- mm.count--
-
- // Calculate current wake modulus
- wakemod := mm.count % mm.wake
-
- if mm.count != 0 && wakemod != 0 {
- // Fast path => no cleanup.
- // Unlock, return early
- mm.mapmu.Unlock()
- return
- }
-
- go func() {
- if wakemod == 0 {
- // Release queued goroutines
- mm.queue.Add(-int(mm.qucnt))
-
- // Allocate new queue and reset
- mm.queue = &sync.WaitGroup{}
- mm.qucnt = 0
- }
-
- if mm.count == 0 {
- // Perform evictions
- for _, mu := range mm.evict {
- key := mu.key
- mu.key = ""
- delete(mm.mumap, key)
- mm.mpool.Release(mu)
- }
-
- // Reset map state
- mm.evict = mm.evict[:0]
- mm.state = stateUnlockd
- mm.mpool.GC()
- }
-
- // Unlock map
- mm.mapmu.Unlock()
- }()
+// rwmutexPool is a very simply memory rwmutexPool.
+type rwmutexPool struct {
+ current []*rwmutexish
+ victim []*rwmutexish
}
-// RLockMap acquires a read lock over the entire map, returning a lock state for acquiring key read locks.
-// Please note that the 'unlock()' function will block until all keys locked from this state are unlocked.
-func (mm *MutexMap) RLockMap() *LockState {
- mm.lockMap(lockTypeRead | lockTypeMap)
- return &LockState{
- mmap: mm,
- ltyp: lockTypeRead,
+// Acquire will returns a rwmutexState from rwmutexPool (or alloc new).
+func (p *rwmutexPool) Acquire() *rwmutexish {
+ // First try the current queue
+ if l := len(p.current) - 1; l >= 0 {
+ mu := p.current[l]
+ p.current = p.current[:l]
+ return mu
}
-}
-// LockMap acquires a write lock over the entire map, returning a lock state for acquiring key read/write locks.
-// Please note that the 'unlock()' function will block until all keys locked from this state are unlocked.
-func (mm *MutexMap) LockMap() *LockState {
- mm.lockMap(lockTypeWrite | lockTypeMap)
- return &LockState{
- mmap: mm,
- ltyp: lockTypeWrite,
+ // Next try the victim queue.
+ if l := len(p.victim) - 1; l >= 0 {
+ mu := p.victim[l]
+ p.victim = p.victim[:l]
+ return mu
}
-}
-
-// RLock acquires a mutex read lock for supplied key, returning an RUnlock function.
-func (mm *MutexMap) RLock(key string) (runlock func()) {
- return mm.lock(key, lockTypeRead)
-}
-// Lock acquires a mutex write lock for supplied key, returning an Unlock function.
-func (mm *MutexMap) Lock(key string) (unlock func()) {
- return mm.lock(key, lockTypeWrite)
+ // Lastly, alloc new.
+ mu := new(rwmutexish)
+ return mu
}
-// LockState represents a window to a locked MutexMap.
-type LockState struct {
- wait sync.WaitGroup
- mmap *MutexMap
- done uint32
- ltyp uint8
+// Release places a sync.rwmutexState back in the rwmutexPool.
+func (p *rwmutexPool) Release(mu *rwmutexish) {
+ p.current = append(p.current, mu)
}
-// Lock: see MutexMap.Lock() definition. Will panic if map only read locked.
-func (st *LockState) Lock(key string) (unlock func()) {
- return st.lock(key, lockTypeWrite)
+// GC will clear out unused entries from the rwmutexPool.
+func (p *rwmutexPool) GC() {
+ current := p.current
+ p.current = nil
+ p.victim = current
}
-// RLock: see MutexMap.RLock() definition.
-func (st *LockState) RLock(key string) (runlock func()) {
- return st.lock(key, lockTypeRead)
+// rwmutexish is a RW mutex (ish), i.e. the representation
+// of one only to be accessed within
+type rwmutexish struct {
+ tr trigger
+ ln int32 // no. locks
+ wn int32 // no. waiters
+ lt uint8 // lock type
}
-// lock: see MutexMap.lock() definition.
-func (st *LockState) lock(key string, lt uint8) func() {
- st.wait.Add(1) // track lock
-
- if atomic.LoadUint32(&st.done) == 1 {
- panic("called (r)lock on unlocked state")
- } else if lt&lockTypeWrite != 0 &&
- st.ltyp&lockTypeWrite == 0 {
- panic("called lock on rlocked map")
- }
-
- var ok bool
- var mu *rwmutex
-
- // Spin lock until returns true
- st.mmap.spinLock(func() bool {
- // Check not overloaded
- if !(st.mmap.count < st.mmap.maxmu) {
+// Lock will lock the mutex for given lock type, in the
+// sense that it will update the internal state tracker
+// accordingly. Return value is true on successful lock.
+func (mu *rwmutexish) Lock(lt uint8) bool {
+ switch mu.lt {
+ case lockTypeRead:
+ // already read locked,
+ // only permit more reads.
+ if lt != lockTypeRead {
return false
}
- // Ensure mutex at key
- // is in lockable state
- mu, ok = st.mmap.mumap[key]
- return !ok || mu.CanLock(lt)
- })
+ case lockTypeWrite:
+ // already write locked,
+ // no other locks allowed.
+ return false
- // Incr count
- st.mmap.count++
+ default:
+ // Fully unlocked.
+ mu.lt = lt
+ }
- if !ok {
- // No mutex found for key
+ // Update
+ // count.
+ mu.ln++
- // Alloc mu from pool
- mu = st.mmap.mpool.Acquire()
- st.mmap.mumap[key] = mu
+ return true
+}
- // Set our key
- mu.key = key
+// Unlock will unlock the mutex, in the sense that
+// it will update the internal state tracker accordingly.
+// On any unlock it will awaken sleeping waiting threads.
+// Returned boolean is if unlocked=true AND waiters=0.
+func (mu *rwmutexish) Unlock() bool {
+ var ok bool
- // Queue for eviction
- st.mmap.evict = append(st.mmap.evict, mu)
+ switch mu.ln--; {
+ case mu.ln > 0 && mu.lt == lockTypeWrite:
+ panic("BUG: multiple writer locks")
+ case mu.ln < 0:
+ panic("BUG: negative lock count")
+ case mu.ln == 0:
+ // Fully unlocked.
+ mu.lt = 0
+
+ // Only return true
+ // with no waiters.
+ ok = (mu.wn == 0)
}
- // Lock mutex
- mu.Lock(lt)
-
- // Unlock map
- st.mmap.mapmu.Unlock()
-
- return func() {
- st.mmap.mapmu.Lock()
- mu.Unlock()
- st.mmap.cleanup()
- st.wait.Add(-1)
- }
+ // Awake all waiting
+ // goroutines for mu.
+ mu.tr.Trigger()
+ return ok
}
-// UnlockMap will close this state and release the currently locked map.
-func (st *LockState) UnlockMap() {
- if !atomic.CompareAndSwapUint32(&st.done, 0, 1) {
- panic("called unlockmap on expired state")
- }
- st.wait.Wait()
- st.mmap.mapmu.Lock()
- st.mmap.cleanup()
+// WaitRelock expects a mutex to be passed in already in
+// the lock state. It incr the rwmutexish waiter count before
+// unlocking the outer mutex and blocking on internal trigger.
+// On awake it will relock outer mutex and decr wait count.
+func (mu *rwmutexish) WaitRelock(outer *sync.Mutex) {
+ mu.wn++
+ outer.Unlock()
+ mu.tr.Wait()
+ outer.Lock()
+ mu.wn--
}
-// rwmutex is a very simple *representation* of a read-write
-// mutex, though not one in implementation. it works by
-// tracking the lock state for a given map key, which is
-// protected by the map's mutex.
-type rwmutex struct {
- rcnt int32 // read lock count
- lock uint8 // lock type
- key string // map key
-}
+// trigger uses the internals of sync.Cond to provide
+// a waitgroup type structure (including goroutine parks)
+// without such a heavy reliance on a delta value.
+type trigger struct{ notifyList }
-func (mu *rwmutex) CanLock(lt uint8) bool {
- return mu.lock == 0 ||
- (mu.lock&lockTypeRead != 0 && lt&lockTypeRead != 0)
+func (t *trigger) Trigger() {
+ runtime_notifyListNotifyAll(&t.notifyList)
}
-func (mu *rwmutex) Lock(lt uint8) {
- // Set lock type
- mu.lock = lt
+func (t *trigger) Wait() {
+ v := runtime_notifyListAdd(&t.notifyList)
+ runtime_notifyListWait(&t.notifyList, v)
+}
- if lt&lockTypeRead != 0 {
- // RLock, increment
- mu.rcnt++
- }
+// Approximation of notifyList in runtime/sema.go.
+type notifyList struct {
+ wait uint32
+ notify uint32
+ lock uintptr // key field of the mutex
+ head unsafe.Pointer
+ tail unsafe.Pointer
}
-func (mu *rwmutex) Unlock() {
- if mu.rcnt > 0 {
- // RUnlock
- mu.rcnt--
- }
+// See runtime/sema.go for documentation.
+//
+//go:linkname runtime_notifyListAdd sync.runtime_notifyListAdd
+func runtime_notifyListAdd(l *notifyList) uint32
- if mu.rcnt == 0 {
- // Total unlock
- mu.lock = 0
- }
-}
+// See runtime/sema.go for documentation.
+//
+//go:linkname runtime_notifyListWait sync.runtime_notifyListWait
+func runtime_notifyListWait(l *notifyList, t uint32)
+
+// See runtime/sema.go for documentation.
+//
+//go:linkname runtime_notifyListNotifyAll sync.runtime_notifyListNotifyAll
+func runtime_notifyListNotifyAll(l *notifyList)