diff options
Diffstat (limited to 'vendor/codeberg.org/gruf/go-mutexes')
| -rw-r--r-- | vendor/codeberg.org/gruf/go-mutexes/map.go | 335 | ||||
| -rw-r--r-- | vendor/codeberg.org/gruf/go-mutexes/mutex_timeout.go | 67 | 
2 files changed, 328 insertions, 74 deletions
| diff --git a/vendor/codeberg.org/gruf/go-mutexes/map.go b/vendor/codeberg.org/gruf/go-mutexes/map.go index ea917ee5e..cb31a9543 100644 --- a/vendor/codeberg.org/gruf/go-mutexes/map.go +++ b/vendor/codeberg.org/gruf/go-mutexes/map.go @@ -1,105 +1,322 @@  package mutexes  import ( +	"runtime"  	"sync" +	"sync/atomic"  ) +// locktype defines maskable mutexmap lock types. +type locktype uint8 + +const ( +	// possible lock types. +	lockTypeRead  = locktype(1) << 0 +	lockTypeWrite = locktype(1) << 1 +	lockTypeMap   = locktype(1) << 2 + +	// possible mutexmap states. +	stateUnlockd = uint8(0) +	stateRLocked = uint8(1) +	stateLocked  = uint8(2) +	stateInUse   = uint8(3) +) + +// permitLockType returns if provided locktype is permitted to go ahead in current state. +func permitLockType(state uint8, lt locktype) bool { +	switch state { +	// Unlocked state +	// (all allowed) +	case stateUnlockd: +		return true + +	// Keys locked, no state lock. +	// (don't allow map locks) +	case stateInUse: +		return lt&lockTypeMap == 0 + +	// Read locked +	// (only allow read locks) +	case stateRLocked: +		return lt&lockTypeRead != 0 + +	// Write locked +	// (none allowed) +	case stateLocked: +		return false + +	// shouldn't reach here +	default: +		panic("unexpected state") +	} +} +  // MutexMap is a structure that allows having a map of self-evicting mutexes  // by key. You do not need to worry about managing the contents of the map,  // only requesting RLock/Lock for keys, and ensuring to call the returned  // unlock functions.  type MutexMap struct { -	// NOTE: -	// Individual keyed mutexes should ONLY ever -	// be locked within the protection of the outer -	// mapMu lock. If you lock these outside the -	// protection of this, there is a chance for -	// deadlocks -  	mus   map[string]RWMutex  	mapMu sync.Mutex  	pool  sync.Pool +	queue []func() +	evict []func() +	count int32 +	maxmu int32 +	state uint8  } -// NewMap returns a new MutexMap instance based on supplied -// RWMutex allocator function, nil implies use default -func NewMap(newFn func() RWMutex) MutexMap { -	if newFn == nil { -		newFn = NewRW +// NewMap returns a new MutexMap instance with provided max no. open mutexes. +func NewMap(max int32) MutexMap { +	if max < 1 { +		// Default = 128 * GOMAXPROCS +		procs := runtime.GOMAXPROCS(0) +		max = int32(procs * 128)  	}  	return MutexMap{ -		mus:   make(map[string]RWMutex), -		mapMu: sync.Mutex{}, +		mus: make(map[string]RWMutex),  		pool: sync.Pool{  			New: func() interface{} { -				return newFn() +				return NewRW()  			},  		}, +		maxmu: max, +	} +} + +// acquire will either acquire a mutex from pool or alloc. +func (mm *MutexMap) acquire() RWMutex { +	return mm.pool.Get().(RWMutex) +} + +// release will release provided mutex to pool. +func (mm *MutexMap) release(mu RWMutex) { +	mm.pool.Put(mu) +} + +// spinLock will wait (using a mutex to sleep thread) until 'cond()' returns true, +// returning with map lock. Note that 'cond' is performed within a map lock. +func (mm *MutexMap) spinLock(cond func() bool) { +	mu := mm.acquire() +	defer mm.release(mu) + +	for { +		// Get map lock +		mm.mapMu.Lock() + +		// Check if return +		if cond() { +			return +		} + +		// Queue ourselves +		unlock := mu.Lock() +		mm.queue = append(mm.queue, unlock) +		mm.mapMu.Unlock() + +		// Wait on notify +		mu.Lock()() +	} +} + +// lockMutex will acquire a lock on the mutex at provided key, handling earlier allocated mutex if provided. Unlocks map on return. +func (mm *MutexMap) lockMutex(key string, lt locktype) func() { +	var unlock func() + +	// Incr counter +	mm.count++ + +	// Check for existing mutex at key +	mu, ok := mm.mus[key] +	if !ok { +		// Alloc from pool +		mu = mm.acquire() +		mm.mus[key] = mu + +		// Queue mutex for eviction +		mm.evict = append(mm.evict, func() { +			delete(mm.mus, key) +			mm.pool.Put(mu) +		}) +	} + +	// If no state, set in use. +	// State will already have been +	// set if this is from LockState{} +	if mm.state == stateUnlockd { +		mm.state = stateInUse +	} + +	switch { +	// Read lock +	case lt&lockTypeRead != 0: +		unlock = mu.RLock() + +	// Write lock +	case lt&lockTypeWrite != 0: +		unlock = mu.Lock() + +	// shouldn't reach here +	default: +		panic("unexpected lock type") +	} + +	// Unlock map + return +	mm.mapMu.Unlock() +	return func() { +		mm.mapMu.Lock() +		unlock() +		go mm.onUnlock()  	}  } -func (mm *MutexMap) evict(key string, mu RWMutex) { -	// Acquire map lock -	mm.mapMu.Lock() +// onUnlock is performed as the final (async) stage of releasing an acquired key / map mutex. +func (mm *MutexMap) onUnlock() { +	// Decr counter +	mm.count-- + +	if mm.count < 1 { +		// Perform all queued evictions +		for i := 0; i < len(mm.evict); i++ { +			mm.evict[i]() +		} -	// Toggle mutex lock to -	// ensure it is unused -	unlock := mu.Lock() -	unlock() +		// Notify all waiting goroutines +		for i := 0; i < len(mm.queue); i++ { +			mm.queue[i]() +		} -	// Delete mutex key -	delete(mm.mus, key) +		// Reset the map state +		mm.evict = nil +		mm.queue = nil +		mm.state = stateUnlockd +	} + +	// Finally, unlock  	mm.mapMu.Unlock() +} -	// Release to pool -	mm.pool.Put(mu) +// RLockMap acquires a read lock over the entire map, returning a lock state for acquiring key read locks. +// Please note that the 'unlock()' function will block until all keys locked from this state are unlocked. +func (mm *MutexMap) RLockMap() *LockState { +	return mm.getMapLock(lockTypeRead)  } -// RLock acquires a mutex read lock for supplied key, returning an RUnlock function -func (mm *MutexMap) RLock(key string) func() { -	return mm.getLock(key, func(mu RWMutex) func() { -		return mu.RLock() +// LockMap acquires a write lock over the entire map, returning a lock state for acquiring key read/write locks. +// Please note that the 'unlock()' function will block until all keys locked from this state are unlocked. +func (mm *MutexMap) LockMap() *LockState { +	return mm.getMapLock(lockTypeWrite) +} + +// RLock acquires a mutex read lock for supplied key, returning an RUnlock function. +func (mm *MutexMap) RLock(key string) (runlock func()) { +	return mm.getLock(key, lockTypeRead) +} + +// Lock acquires a mutex write lock for supplied key, returning an Unlock function. +func (mm *MutexMap) Lock(key string) (unlock func()) { +	return mm.getLock(key, lockTypeWrite) +} + +// getLock will fetch lock of provided type, for given key, returning unlock function. +func (mm *MutexMap) getLock(key string, lt locktype) func() { +	// Spin until achieve lock +	mm.spinLock(func() bool { +		return permitLockType(mm.state, lt) && +			mm.count < mm.maxmu // not overloaded  	}) + +	// Perform actual mutex lock +	return mm.lockMutex(key, lt)  } -// Lock acquires a mutex lock for supplied key, returning an Unlock function -func (mm *MutexMap) Lock(key string) func() { -	return mm.getLock(key, func(mu RWMutex) func() { -		return mu.Lock() +// getMapLock will acquire a map lock of provided type, returning a LockState session. +func (mm *MutexMap) getMapLock(lt locktype) *LockState { +	// Spin until achieve lock +	mm.spinLock(func() bool { +		return permitLockType(mm.state, lt|lockTypeMap) && +			mm.count < mm.maxmu // not overloaded  	}) + +	// Incr counter +	mm.count++ + +	switch { +	// Set read lock state +	case lt&lockTypeRead != 0: +		mm.state = stateRLocked + +	// Set write lock state +	case lt&lockTypeWrite != 0: +		mm.state = stateLocked + +	default: +		panic("unexpected lock type") +	} + +	// Unlock + return +	mm.mapMu.Unlock() +	return &LockState{ +		mmap: mm, +		ltyp: lt, +	}  } -func (mm *MutexMap) getLock(key string, doLock func(RWMutex) func()) func() { -	// Get map lock -	mm.mapMu.Lock() +// LockState represents a window to a locked MutexMap. +type LockState struct { +	wait sync.WaitGroup +	mmap *MutexMap +	done uint32 +	ltyp locktype +} -	// Look for mutex -	mu, ok := mm.mus[key] -	if ok { -		// Lock and return -		// its unlocker func -		unlock := doLock(mu) -		mm.mapMu.Unlock() -		return unlock +// Lock: see MutexMap.Lock() definition. Will panic if map only read locked. +func (st *LockState) Lock(key string) (unlock func()) { +	return st.getLock(key, lockTypeWrite) +} + +// RLock: see MutexMap.RLock() definition. +func (st *LockState) RLock(key string) (runlock func()) { +	return st.getLock(key, lockTypeRead) +} + +// UnlockMap will close this state and release the currently locked map. +func (st *LockState) UnlockMap() { +	// Set state to finished (or panic if already done) +	if !atomic.CompareAndSwapUint32(&st.done, 0, 1) { +		panic("called UnlockMap() on expired state")  	} -	// Note: even though the mutex data structure is -	// small, benchmarking does actually show that pooled -	// alloc of mutexes here is faster +	// Wait until done +	st.wait.Wait() -	// Acquire mu + add -	mu = mm.pool.Get().(RWMutex) -	mm.mus[key] = mu +	// Async reset map +	st.mmap.mapMu.Lock() +	go st.mmap.onUnlock() +} -	// Lock mutex + unlock map -	unlockFn := doLock(mu) -	mm.mapMu.Unlock() +// getLock: see MutexMap.getLock() definition. +func (st *LockState) getLock(key string, lt locktype) func() { +	st.wait.Add(1) // track lock -	return func() { -		// Unlock mutex -		unlockFn() +	// Check if closed, or if write lock is allowed +	if atomic.LoadUint32(&st.done) == 1 { +		panic("map lock closed") +	} else if lt&lockTypeWrite != 0 && +		st.ltyp&lockTypeWrite == 0 { +		panic("called .Lock() on rlocked map") +	} + +	// Spin until achieve map lock +	st.mmap.spinLock(func() bool { +		return st.mmap.count < st.mmap.maxmu +	}) // i.e. not overloaded -		// Release function -		go mm.evict(key, mu) +	// Perform actual mutex lock +	unlock := st.mmap.lockMutex(key, lt) + +	return func() { +		unlock() +		st.wait.Done()  	}  } diff --git a/vendor/codeberg.org/gruf/go-mutexes/mutex_timeout.go b/vendor/codeberg.org/gruf/go-mutexes/mutex_timeout.go index 5da69ef25..2e7b8f802 100644 --- a/vendor/codeberg.org/gruf/go-mutexes/mutex_timeout.go +++ b/vendor/codeberg.org/gruf/go-mutexes/mutex_timeout.go @@ -3,8 +3,6 @@ package mutexes  import (  	"sync"  	"time" - -	"codeberg.org/gruf/go-nowish"  )  // TimeoutMutex defines a Mutex with timeouts on locks @@ -73,14 +71,6 @@ func (mu *timeoutRWMutex) RLockFunc(fn func()) func() {  	return mutexTimeout(mu.rd, mu.mu.RLock(), fn)  } -// timeoutPool provides nowish.Timeout objects for timeout mutexes -var timeoutPool = sync.Pool{ -	New: func() interface{} { -		t := nowish.NewTimeout() -		return &t -	}, -} -  // mutexTimeout performs a timed unlock, calling supplied fn if timeout is reached  func mutexTimeout(d time.Duration, unlock func(), fn func()) func() {  	if d < 1 { @@ -88,18 +78,65 @@ func mutexTimeout(d time.Duration, unlock func(), fn func()) func() {  		return unlock  	} -	// Acquire timeout obj -	t := timeoutPool.Get().(*nowish.Timeout) +	// Acquire timer from pool +	t := timerPool.Get().(*timer) -	// Start the timeout with hook -	t.Start(d, fn) +	// Start the timer +	go t.Start(d, fn)  	// Return func cancelling timeout,  	// replacing Timeout in pool and  	// finally unlocking mutex  	return func() { +		defer timerPool.Put(t)  		t.Cancel() -		timeoutPool.Put(t)  		unlock()  	}  } + +// timerPool is the global &timer{} pool. +var timerPool = sync.Pool{ +	New: func() interface{} { +		return newtimer() +	}, +} + +// timer represents a reusable cancellable timer. +type timer struct { +	t *time.Timer +	c chan struct{} +} + +// newtimer returns a new timer instance. +func newtimer() *timer { +	t := time.NewTimer(time.Minute) +	t.Stop() +	return &timer{t: t, c: make(chan struct{})} +} + +// Start will start the timer with duration 'd', performing 'fn' on timeout. +func (t *timer) Start(d time.Duration, fn func()) { +	t.t.Reset(d) +	select { +	// Timed out +	case <-t.t.C: +		fn() + +	// Cancelled +	case <-t.c: +	} +} + +// Cancel will attempt to cancel the running timer. +func (t *timer) Cancel() { +	select { +	// cancel successful +	case t.c <- struct{}{}: +		if !t.t.Stop() { +			<-t.t.C +		} // stop timer + +	// already stopped +	default: +	} +} | 
