diff options
Diffstat (limited to 'vendor/codeberg.org')
-rw-r--r-- | vendor/codeberg.org/gruf/go-mutexes/map.go | 57 | ||||
-rw-r--r-- | vendor/codeberg.org/gruf/go-mutexes/map_pool.go (renamed from vendor/codeberg.org/gruf/go-mutexes/pool.go) | 23 | ||||
-rw-r--r-- | vendor/codeberg.org/gruf/go-store/v2/util/io.go | 14 |
3 files changed, 40 insertions, 54 deletions
diff --git a/vendor/codeberg.org/gruf/go-mutexes/map.go b/vendor/codeberg.org/gruf/go-mutexes/map.go index d0387d3e7..a3c171c7a 100644 --- a/vendor/codeberg.org/gruf/go-mutexes/map.go +++ b/vendor/codeberg.org/gruf/go-mutexes/map.go @@ -89,8 +89,8 @@ func acquireState(state uint8, lt uint8) (uint8, bool) { // awaiting a permissible state (.e.g no key write locks allowed when the // map is read locked). type MutexMap struct { - qpool pool - queue []*sync.Mutex + queue *sync.WaitGroup + qucnt int32 mumap map[string]*rwmutex mpool pool @@ -118,17 +118,8 @@ func NewMap(max, wake int32) MutexMap { } return MutexMap{ - qpool: pool{ - alloc: func() interface{} { - return &sync.Mutex{} - }, - }, + queue: &sync.WaitGroup{}, mumap: make(map[string]*rwmutex, max), - mpool: pool{ - alloc: func() interface{} { - return &rwmutex{} - }, - }, maxmu: max, wake: wake, } @@ -170,36 +161,26 @@ func (mm *MutexMap) SET(max, wake int32) (int32, int32) { // spinLock will wait (using a mutex to sleep thread) until conditional returns true. func (mm *MutexMap) spinLock(cond func() bool) { - var mu *sync.Mutex - for { // Acquire map lock mm.mapmu.Lock() if cond() { - // Release mu if needed - if mu != nil { - mm.qpool.Release(mu) - } return } - // Alloc mu if needed - if mu == nil { - v := mm.qpool.Acquire() - mu = v.(*sync.Mutex) - } + // Current queue ptr + queue := mm.queue // Queue ourselves - mm.queue = append(mm.queue, mu) - mu.Lock() + queue.Add(1) + mm.qucnt++ // Unlock map mm.mapmu.Unlock() // Wait on notify - mu.Lock() - mu.Unlock() + mm.queue.Wait() } } @@ -236,9 +217,8 @@ func (mm *MutexMap) lock(key string, lt uint8) func() { if !ok { // No mutex found for key - // Alloc from pool - v := mm.mpool.Acquire() - mu = v.(*rwmutex) + // Alloc mu from pool + mu = mm.mpool.Acquire() mm.mumap[key] = mu // Set our key @@ -301,13 +281,12 @@ func (mm *MutexMap) cleanup() { go func() { if wakemod == 0 { - // Notify queued routines - for _, mu := range mm.queue { - mu.Unlock() - } + // Release queued goroutines + mm.queue.Add(-int(mm.qucnt)) - // Reset queue - mm.queue = mm.queue[:0] + // Allocate new queue and reset + mm.queue = &sync.WaitGroup{} + mm.qucnt = 0 } if mm.count == 0 { @@ -323,7 +302,6 @@ func (mm *MutexMap) cleanup() { mm.evict = mm.evict[:0] mm.state = stateUnlockd mm.mpool.GC() - mm.qpool.GC() } // Unlock map @@ -412,9 +390,8 @@ func (st *LockState) lock(key string, lt uint8) func() { if !ok { // No mutex found for key - // Alloc from pool - v := st.mmap.mpool.Acquire() - mu = v.(*rwmutex) + // Alloc mu from pool + mu = st.mmap.mpool.Acquire() st.mmap.mumap[key] = mu // Set our key diff --git a/vendor/codeberg.org/gruf/go-mutexes/pool.go b/vendor/codeberg.org/gruf/go-mutexes/map_pool.go index 135e2c117..450e0bc06 100644 --- a/vendor/codeberg.org/gruf/go-mutexes/pool.go +++ b/vendor/codeberg.org/gruf/go-mutexes/map_pool.go @@ -2,34 +2,33 @@ package mutexes // pool is a very simply memory pool. type pool struct { - current []interface{} - victim []interface{} - alloc func() interface{} + current []*rwmutex + victim []*rwmutex } -// Acquire will returns a sync.RWMutex from pool (or alloc new). -func (p *pool) Acquire() interface{} { +// Acquire will returns a rwmutex from pool (or alloc new). +func (p *pool) Acquire() *rwmutex { // First try the current queue if l := len(p.current) - 1; l >= 0 { - v := p.current[l] + mu := p.current[l] p.current = p.current[:l] - return v + return mu } // Next try the victim queue. if l := len(p.victim) - 1; l >= 0 { - v := p.victim[l] + mu := p.victim[l] p.victim = p.victim[:l] - return v + return mu } // Lastly, alloc new. - return p.alloc() + return &rwmutex{} } // Release places a sync.RWMutex back in the pool. -func (p *pool) Release(v interface{}) { - p.current = append(p.current, v) +func (p *pool) Release(mu *rwmutex) { + p.current = append(p.current, mu) } // GC will clear out unused entries from the pool. diff --git a/vendor/codeberg.org/gruf/go-store/v2/util/io.go b/vendor/codeberg.org/gruf/go-store/v2/util/io.go index 334ae4dd0..3d62e8be6 100644 --- a/vendor/codeberg.org/gruf/go-store/v2/util/io.go +++ b/vendor/codeberg.org/gruf/go-store/v2/util/io.go @@ -48,6 +48,7 @@ func NopWriteCloser(w io.Writer) io.WriteCloser { } // ReadCloserWithCallback adds a customizable callback to be called upon Close() of a supplied io.ReadCloser. +// Note that the callback will never be called more than once, after execution this will remove the func reference. func ReadCloserWithCallback(rc io.ReadCloser, cb func()) io.ReadCloser { return &callbackReadCloser{ ReadCloser: rc, @@ -56,6 +57,7 @@ func ReadCloserWithCallback(rc io.ReadCloser, cb func()) io.ReadCloser { } // WriteCloserWithCallback adds a customizable callback to be called upon Close() of a supplied io.WriteCloser. +// Note that the callback will never be called more than once, after execution this will remove the func reference. func WriteCloserWithCallback(wc io.WriteCloser, cb func()) io.WriteCloser { return &callbackWriteCloser{ WriteCloser: wc, @@ -80,7 +82,11 @@ type callbackReadCloser struct { } func (c *callbackReadCloser) Close() error { - defer c.callback() + if c.callback != nil { + cb := c.callback + c.callback = nil + defer cb() + } return c.ReadCloser.Close() } @@ -91,6 +97,10 @@ type callbackWriteCloser struct { } func (c *callbackWriteCloser) Close() error { - defer c.callback() + if c.callback != nil { + cb := c.callback + c.callback = nil + defer cb() + } return c.WriteCloser.Close() } |