diff options
Diffstat (limited to 'vendor/codeberg.org/gruf/go-mutexes')
-rw-r--r-- | vendor/codeberg.org/gruf/go-mutexes/map.go | 112 |
1 files changed, 53 insertions, 59 deletions
diff --git a/vendor/codeberg.org/gruf/go-mutexes/map.go b/vendor/codeberg.org/gruf/go-mutexes/map.go index e61ef3537..0da1fc3cc 100644 --- a/vendor/codeberg.org/gruf/go-mutexes/map.go +++ b/vendor/codeberg.org/gruf/go-mutexes/map.go @@ -3,17 +3,16 @@ package mutexes import ( "sync" "sync/atomic" + "unsafe" + + "codeberg.org/gruf/go-mempool" + "github.com/dolthub/swiss" ) const ( // possible lock types. lockTypeRead = uint8(1) << 0 lockTypeWrite = uint8(1) << 1 - - // frequency of GC cycles - // per no. unlocks. i.e. - // every 'gcfreq' unlocks. - gcfreq = 1024 ) // MutexMap is a structure that allows read / write locking @@ -28,15 +27,15 @@ const ( // like structures for sleeping / awaking awaiting goroutines. type MutexMap struct { mapmu sync.Mutex - mumap map[string]*rwmutex - mupool rwmutexPool - count uint32 + mumap *swiss.Map[string, *rwmutex] + mupool mempool.UnsafePool } // checkInit ensures MutexMap is initialized (UNSAFE). func (mm *MutexMap) checkInit() { if mm.mumap == nil { - mm.mumap = make(map[string]*rwmutex) + mm.mumap = swiss.NewMap[string, *rwmutex](0) + mm.mupool.DirtyFactor = 256 } } @@ -58,13 +57,13 @@ func (mm *MutexMap) lock(key string, lt uint8) func() { mm.checkInit() for { - // Check map for mu. - mu := mm.mumap[key] + // Check map for mutex. + mu, _ := mm.mumap.Get(key) if mu == nil { - // Allocate new mutex. - mu = mm.mupool.Acquire() - mm.mumap[key] = mu + // Allocate mutex. + mu = mm.acquire() + mm.mumap.Put(key, mu) } if !mu.Lock(lt) { @@ -87,63 +86,58 @@ func (mm *MutexMap) unlock(key string, mu *rwmutex) { mm.mapmu.Lock() // Unlock mutex. - if mu.Unlock() { + if !mu.Unlock() { - // Mutex fully unlocked - // with zero waiters. Self - // evict and release it. - delete(mm.mumap, key) - mm.mupool.Release(mu) + // Fast path. Mutex still + // used so no map change. + mm.mapmu.Unlock() + return } - if mm.count++; mm.count%gcfreq == 0 { - // Every 'gcfreq' unlocks perform - // a garbage collection to keep - // us squeaky clean :] - mm.mupool.GC() + // Mutex fully unlocked + // with zero waiters. Self + // evict and release it. + mm.mumap.Delete(key) + mm.release(mu) + + // Maximum load factor before + // 'swiss' allocates new hmap: + // maxLoad = 7 / 8 + // + // So we apply the inverse/2, once + // $maxLoad/2 % of hmap is empty we + // compact the map to drop buckets. + len := mm.mumap.Count() + cap := mm.mumap.Capacity() + if cap-len > (cap*7)/(8*2) { + + // Create a new map only as big as required. + mumap := swiss.NewMap[string, *rwmutex](uint32(len)) + mm.mumap.Iter(func(k string, v *rwmutex) (stop bool) { + mumap.Put(k, v) + return false + }) + + // Set new map. + mm.mumap = mumap } // Done with map. mm.mapmu.Unlock() } -// rwmutexPool is a very simply memory rwmutexPool. -type rwmutexPool struct { - current []*rwmutex - victim []*rwmutex -} - -// Acquire will returns a rwmutexState from rwmutexPool (or alloc new). -func (p *rwmutexPool) Acquire() *rwmutex { - // First try the current queue - if l := len(p.current) - 1; l >= 0 { - mu := p.current[l] - p.current = p.current[:l] - return mu - } - - // Next try the victim queue. - if l := len(p.victim) - 1; l >= 0 { - mu := p.victim[l] - p.victim = p.victim[:l] - return mu +// acquire will acquire mutex from memory pool, or alloc new. +func (mm *MutexMap) acquire() *rwmutex { + if ptr := mm.mupool.Get(); ptr != nil { + return (*rwmutex)(ptr) } - - // Lastly, alloc new. - mu := new(rwmutex) - return mu -} - -// Release places a sync.rwmutexState back in the rwmutexPool. -func (p *rwmutexPool) Release(mu *rwmutex) { - p.current = append(p.current, mu) + return new(rwmutex) } -// GC will clear out unused entries from the rwmutexPool. -func (p *rwmutexPool) GC() { - current := p.current - p.current = nil - p.victim = current +// release will release given mutex to memory pool. +func (mm *MutexMap) release(mu *rwmutex) { + ptr := unsafe.Pointer(mu) + mm.mupool.Put(ptr) } // rwmutex represents a RW mutex when used correctly within |