Diffstat (limited to 'vendor/codeberg.org')
-rw-r--r--   vendor/codeberg.org/gruf/go-mutexes/map.go                                                              57
-rw-r--r--   vendor/codeberg.org/gruf/go-mutexes/map_pool.go (renamed from vendor/codeberg.org/gruf/go-mutexes/pool.go)  23
-rw-r--r--   vendor/codeberg.org/gruf/go-store/v2/util/io.go                                                          14
3 files changed, 40 insertions(+), 54 deletions(-)
diff --git a/vendor/codeberg.org/gruf/go-mutexes/map.go b/vendor/codeberg.org/gruf/go-mutexes/map.go
index d0387d3e7..a3c171c7a 100644
--- a/vendor/codeberg.org/gruf/go-mutexes/map.go
+++ b/vendor/codeberg.org/gruf/go-mutexes/map.go
@@ -89,8 +89,8 @@ func acquireState(state uint8, lt uint8) (uint8, bool) {
// awaiting a permissible state (e.g. no key write locks allowed when the
// map is read locked).
type MutexMap struct {
- qpool pool
- queue []*sync.Mutex
+ queue *sync.WaitGroup
+ qucnt int32
mumap map[string]*rwmutex
mpool pool
@@ -118,17 +118,8 @@ func NewMap(max, wake int32) MutexMap {
}
return MutexMap{
- qpool: pool{
- alloc: func() interface{} {
- return &sync.Mutex{}
- },
- },
+ queue: &sync.WaitGroup{},
mumap: make(map[string]*rwmutex, max),
- mpool: pool{
- alloc: func() interface{} {
- return &rwmutex{}
- },
- },
maxmu: max,
wake: wake,
}
@@ -170,36 +161,26 @@ func (mm *MutexMap) SET(max, wake int32) (int32, int32) {
// spinLock will wait (using a mutex to sleep the thread) until the conditional returns true.
func (mm *MutexMap) spinLock(cond func() bool) {
- var mu *sync.Mutex
-
for {
// Acquire map lock
mm.mapmu.Lock()
if cond() {
- // Release mu if needed
- if mu != nil {
- mm.qpool.Release(mu)
- }
return
}
- // Alloc mu if needed
- if mu == nil {
- v := mm.qpool.Acquire()
- mu = v.(*sync.Mutex)
- }
+ // Current queue ptr
+ queue := mm.queue
// Queue ourselves
- mm.queue = append(mm.queue, mu)
- mu.Lock()
+ queue.Add(1)
+ mm.qucnt++
// Unlock map
mm.mapmu.Unlock()
// Wait on notify
- mu.Lock()
- mu.Unlock()
+ queue.Wait()
}
}
@@ -236,9 +217,8 @@ func (mm *MutexMap) lock(key string, lt uint8) func() {
if !ok {
// No mutex found for key
- // Alloc from pool
- v := mm.mpool.Acquire()
- mu = v.(*rwmutex)
+ // Alloc mu from pool
+ mu = mm.mpool.Acquire()
mm.mumap[key] = mu
// Set our key
@@ -301,13 +281,12 @@ func (mm *MutexMap) cleanup() {
go func() {
if wakemod == 0 {
- // Notify queued routines
- for _, mu := range mm.queue {
- mu.Unlock()
- }
+ // Release queued goroutines
+ mm.queue.Add(-int(mm.qucnt))
- // Reset queue
- mm.queue = mm.queue[:0]
+ // Allocate new queue and reset
+ mm.queue = &sync.WaitGroup{}
+ mm.qucnt = 0
}
if mm.count == 0 {
@@ -323,7 +302,6 @@ func (mm *MutexMap) cleanup() {
mm.evict = mm.evict[:0]
mm.state = stateUnlockd
mm.mpool.GC()
- mm.qpool.GC()
}
// Unlock map
@@ -412,9 +390,8 @@ func (st *LockState) lock(key string, lt uint8) func() {
if !ok {
// No mutex found for key
- // Alloc from pool
- v := st.mmap.mpool.Acquire()
- mu = v.(*rwmutex)
+ // Alloc mu from pool
+ mu = st.mmap.mpool.Acquire()
st.mmap.mumap[key] = mu
// Set our key
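
For context, the core change in map.go replaces the per-waiter mutex queue with a single shared *sync.WaitGroup: each waiter registers on the current WaitGroup, and cleanup() later wakes the whole batch with one negative Add before swapping in a fresh WaitGroup. Below is a minimal, self-contained sketch of that queue/wake pattern; the waitQueue type and its names are illustrative only (not part of the vendored package), and unlike spinLock it does not re-check a condition in a loop.

package main

import (
	"fmt"
	"sync"
	"time"
)

// waitQueue sketches the batching pattern used by MutexMap above: waiters
// park on the current *sync.WaitGroup, and the releaser wakes the whole
// batch at once, then swaps in a fresh WaitGroup for future waiters.
type waitQueue struct {
	mu    sync.Mutex
	queue *sync.WaitGroup
	count int32
}

func newWaitQueue() *waitQueue {
	return &waitQueue{queue: &sync.WaitGroup{}}
}

// wait parks the calling goroutine until the next broadcast.
func (q *waitQueue) wait() {
	q.mu.Lock()
	queue := q.queue // capture the current batch before unlocking
	queue.Add(1)
	q.count++
	q.mu.Unlock()
	queue.Wait() // blocks until broadcast releases this batch
}

// broadcast wakes every goroutine queued so far and starts a new batch.
func (q *waitQueue) broadcast() {
	q.mu.Lock()
	q.queue.Add(-int(q.count)) // release the whole batch in one call
	q.queue = &sync.WaitGroup{}
	q.count = 0
	q.mu.Unlock()
}

func main() {
	q := newWaitQueue()

	var done sync.WaitGroup
	for i := 0; i < 3; i++ {
		done.Add(1)
		go func(i int) {
			defer done.Done()
			q.wait()
			fmt.Println("woken:", i)
		}(i)
	}

	// Wait until all three goroutines have queued themselves.
	for {
		q.mu.Lock()
		n := q.count
		q.mu.Unlock()
		if n == 3 {
			break
		}
		time.Sleep(time.Millisecond)
	}

	q.broadcast() // wakes all three at once
	done.Wait()
}

Swapping the WaitGroup pointer is what keeps each broadcast to a clean batch: goroutines that queued just before the swap are still counted (qucnt in the vendored code), so the single negative Add releases exactly that set.
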
diff --git a/vendor/codeberg.org/gruf/go-mutexes/pool.go b/vendor/codeberg.org/gruf/go-mutexes/map_pool.go
index 135e2c117..450e0bc06 100644
--- a/vendor/codeberg.org/gruf/go-mutexes/pool.go
+++ b/vendor/codeberg.org/gruf/go-mutexes/map_pool.go
@@ -2,34 +2,33 @@ package mutexes
// pool is a very simple memory pool.
type pool struct {
- current []interface{}
- victim []interface{}
- alloc func() interface{}
+ current []*rwmutex
+ victim []*rwmutex
}
-// Acquire will returns a sync.RWMutex from pool (or alloc new).
-func (p *pool) Acquire() interface{} {
+// Acquire returns a rwmutex from the pool (or allocs a new one).
+func (p *pool) Acquire() *rwmutex {
// First try the current queue
if l := len(p.current) - 1; l >= 0 {
- v := p.current[l]
+ mu := p.current[l]
p.current = p.current[:l]
- return v
+ return mu
}
// Next try the victim queue.
if l := len(p.victim) - 1; l >= 0 {
- v := p.victim[l]
+ mu := p.victim[l]
p.victim = p.victim[:l]
- return v
+ return mu
}
// Lastly, alloc new.
- return p.alloc()
+ return &rwmutex{}
}
// Release places a rwmutex back in the pool.
-func (p *pool) Release(v interface{}) {
- p.current = append(p.current, v)
+func (p *pool) Release(mu *rwmutex) {
+ p.current = append(p.current, mu)
}
// GC will clear out unused entries from the pool.
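
For reference, the renamed map_pool.go drops the interface{} boxing and the alloc func: the pool now holds *rwmutex values directly across two generations (current and victim). The hypothetical test below sketches the aging behaviour this implies. It is only a sketch: the GC() body is not shown in this hunk, so it assumes the usual victim-cache scheme where GC() demotes current to victim and drops the previous victim.

package mutexes

import "testing"

// TestPoolAging is a hypothetical sketch, not part of the vendored code.
// It assumes GC() demotes p.current to p.victim and discards the previous
// victim slice, so an entry is reused after one idle cycle but dropped
// after two.
func TestPoolAging(t *testing.T) {
	var p pool

	mu := p.Acquire() // empty pool: allocates a fresh *rwmutex
	p.Release(mu)     // parked in p.current

	p.GC() // demoted to p.victim, still reusable

	if got := p.Acquire(); got != mu {
		t.Fatal("expected the pooled rwmutex to be reused after one GC cycle")
	}
	p.Release(mu)

	p.GC() // current -> victim
	p.GC() // previous victim dropped after a second idle cycle

	if got := p.Acquire(); got == mu {
		t.Fatal("expected a fresh allocation after two idle GC cycles")
	}
}
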
diff --git a/vendor/codeberg.org/gruf/go-store/v2/util/io.go b/vendor/codeberg.org/gruf/go-store/v2/util/io.go
index 334ae4dd0..3d62e8be6 100644
--- a/vendor/codeberg.org/gruf/go-store/v2/util/io.go
+++ b/vendor/codeberg.org/gruf/go-store/v2/util/io.go
@@ -48,6 +48,7 @@ func NopWriteCloser(w io.Writer) io.WriteCloser {
}
// ReadCloserWithCallback adds a customizable callback to be called upon Close() of a supplied io.ReadCloser.
+// Note that the callback will never be called more than once; after it has executed, the func reference is removed.
func ReadCloserWithCallback(rc io.ReadCloser, cb func()) io.ReadCloser {
return &callbackReadCloser{
ReadCloser: rc,
@@ -56,6 +57,7 @@ func ReadCloserWithCallback(rc io.ReadCloser, cb func()) io.ReadCloser {
}
// WriteCloserWithCallback adds a customizable callback to be called upon Close() of a supplied io.WriteCloser.
+// Note that the callback will never be called more than once; after it has executed, the func reference is removed.
func WriteCloserWithCallback(wc io.WriteCloser, cb func()) io.WriteCloser {
return &callbackWriteCloser{
WriteCloser: wc,
@@ -80,7 +82,11 @@ type callbackReadCloser struct {
}
func (c *callbackReadCloser) Close() error {
- defer c.callback()
+ if c.callback != nil {
+ cb := c.callback
+ c.callback = nil
+ defer cb()
+ }
return c.ReadCloser.Close()
}
@@ -91,6 +97,10 @@ type callbackWriteCloser struct {
}
func (c *callbackWriteCloser) Close() error {
- defer c.callback()
+ if c.callback != nil {
+ cb := c.callback
+ c.callback = nil
+ defer cb()
+ }
return c.WriteCloser.Close()
}
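
Finally, the io.go change makes the Close callbacks one-shot: the func reference is consumed and nilled on first Close, so a double Close neither fires the callback twice nor keeps the closure alive. A small standalone demonstration of the same pattern (the onceCallbackReadCloser name is illustrative, not the vendored callbackReadCloser):

package main

import (
	"fmt"
	"io"
	"strings"
)

// onceCallbackReadCloser re-creates the pattern above for demonstration:
// the callback slot is cleared on first Close, so the callback runs at most
// once and the closure can be garbage collected afterwards.
type onceCallbackReadCloser struct {
	io.ReadCloser
	callback func()
}

func (c *onceCallbackReadCloser) Close() error {
	if c.callback != nil {
		cb := c.callback
		c.callback = nil // drop the reference so the closure can be collected
		defer cb()       // runs after the wrapped Close, even on error
	}
	return c.ReadCloser.Close()
}

func main() {
	rc := &onceCallbackReadCloser{
		ReadCloser: io.NopCloser(strings.NewReader("hello")),
		callback:   func() { fmt.Println("released") },
	}
	_ = rc.Close() // prints "released"
	_ = rc.Close() // safe: callback already consumed, nothing printed
}

Deferring the captured cb keeps the original ordering: the callback still runs after the wrapped Close, including when that Close returns an error.
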