summaryrefslogtreecommitdiff
path: root/vendor/codeberg.org/gruf/go-mutexes/map.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/codeberg.org/gruf/go-mutexes/map.go')
-rw-r--r--vendor/codeberg.org/gruf/go-mutexes/map.go112
1 files changed, 53 insertions, 59 deletions
diff --git a/vendor/codeberg.org/gruf/go-mutexes/map.go b/vendor/codeberg.org/gruf/go-mutexes/map.go
index e61ef3537..0da1fc3cc 100644
--- a/vendor/codeberg.org/gruf/go-mutexes/map.go
+++ b/vendor/codeberg.org/gruf/go-mutexes/map.go
@@ -3,17 +3,16 @@ package mutexes
import (
"sync"
"sync/atomic"
+ "unsafe"
+
+ "codeberg.org/gruf/go-mempool"
+ "github.com/dolthub/swiss"
)
const (
// possible lock types.
lockTypeRead = uint8(1) << 0
lockTypeWrite = uint8(1) << 1
-
- // frequency of GC cycles
- // per no. unlocks. i.e.
- // every 'gcfreq' unlocks.
- gcfreq = 1024
)
// MutexMap is a structure that allows read / write locking
@@ -28,15 +27,15 @@ const (
// like structures for sleeping / awaking awaiting goroutines.
type MutexMap struct {
mapmu sync.Mutex
- mumap map[string]*rwmutex
- mupool rwmutexPool
- count uint32
+ mumap *swiss.Map[string, *rwmutex]
+ mupool mempool.UnsafePool
}
// checkInit ensures MutexMap is initialized (UNSAFE).
func (mm *MutexMap) checkInit() {
if mm.mumap == nil {
- mm.mumap = make(map[string]*rwmutex)
+ mm.mumap = swiss.NewMap[string, *rwmutex](0)
+ mm.mupool.DirtyFactor = 256
}
}
@@ -58,13 +57,13 @@ func (mm *MutexMap) lock(key string, lt uint8) func() {
mm.checkInit()
for {
- // Check map for mu.
- mu := mm.mumap[key]
+ // Check map for mutex.
+ mu, _ := mm.mumap.Get(key)
if mu == nil {
- // Allocate new mutex.
- mu = mm.mupool.Acquire()
- mm.mumap[key] = mu
+ // Allocate mutex.
+ mu = mm.acquire()
+ mm.mumap.Put(key, mu)
}
if !mu.Lock(lt) {
@@ -87,63 +86,58 @@ func (mm *MutexMap) unlock(key string, mu *rwmutex) {
mm.mapmu.Lock()
// Unlock mutex.
- if mu.Unlock() {
+ if !mu.Unlock() {
- // Mutex fully unlocked
- // with zero waiters. Self
- // evict and release it.
- delete(mm.mumap, key)
- mm.mupool.Release(mu)
+ // Fast path. Mutex still
+ // used so no map change.
+ mm.mapmu.Unlock()
+ return
}
- if mm.count++; mm.count%gcfreq == 0 {
- // Every 'gcfreq' unlocks perform
- // a garbage collection to keep
- // us squeaky clean :]
- mm.mupool.GC()
+ // Mutex fully unlocked
+ // with zero waiters. Self
+ // evict and release it.
+ mm.mumap.Delete(key)
+ mm.release(mu)
+
+ // Maximum load factor before
+ // 'swiss' allocates new hmap:
+ // maxLoad = 7 / 8
+ //
+ // So we apply the inverse/2, once
+ // $maxLoad/2 % of hmap is empty we
+ // compact the map to drop buckets.
+ len := mm.mumap.Count()
+ cap := mm.mumap.Capacity()
+ if cap-len > (cap*7)/(8*2) {
+
+ // Create a new map only as big as required.
+ mumap := swiss.NewMap[string, *rwmutex](uint32(len))
+ mm.mumap.Iter(func(k string, v *rwmutex) (stop bool) {
+ mumap.Put(k, v)
+ return false
+ })
+
+ // Set new map.
+ mm.mumap = mumap
}
// Done with map.
mm.mapmu.Unlock()
}
-// rwmutexPool is a very simply memory rwmutexPool.
-type rwmutexPool struct {
- current []*rwmutex
- victim []*rwmutex
-}
-
-// Acquire will returns a rwmutexState from rwmutexPool (or alloc new).
-func (p *rwmutexPool) Acquire() *rwmutex {
- // First try the current queue
- if l := len(p.current) - 1; l >= 0 {
- mu := p.current[l]
- p.current = p.current[:l]
- return mu
- }
-
- // Next try the victim queue.
- if l := len(p.victim) - 1; l >= 0 {
- mu := p.victim[l]
- p.victim = p.victim[:l]
- return mu
+// acquire will acquire mutex from memory pool, or alloc new.
+func (mm *MutexMap) acquire() *rwmutex {
+ if ptr := mm.mupool.Get(); ptr != nil {
+ return (*rwmutex)(ptr)
}
-
- // Lastly, alloc new.
- mu := new(rwmutex)
- return mu
-}
-
-// Release places a sync.rwmutexState back in the rwmutexPool.
-func (p *rwmutexPool) Release(mu *rwmutex) {
- p.current = append(p.current, mu)
+ return new(rwmutex)
}
-// GC will clear out unused entries from the rwmutexPool.
-func (p *rwmutexPool) GC() {
- current := p.current
- p.current = nil
- p.victim = current
+// release will release given mutex to memory pool.
+func (mm *MutexMap) release(mu *rwmutex) {
+ ptr := unsafe.Pointer(mu)
+ mm.mupool.Put(ptr)
}
// rwmutex represents a RW mutex when used correctly within