diff options
| author | 2022-03-08 11:56:53 +0000 | |
|---|---|---|
| committer | 2022-03-08 12:56:53 +0100 | |
| commit | b8879ac68a30e8bccd1c96cc4630da791d8996c4 (patch) | |
| tree | 77adeeaf2456610b771d9df8dc38207014215aea /vendor/codeberg.org/gruf | |
| parent | [performance] Database optimizations (#419) (diff) | |
| download | gotosocial-b8879ac68a30e8bccd1c96cc4630da791d8996c4.tar.xz | |
[dependencies] update go-store, go-mutexes (#422)
* update go-store, go-mutexes
Signed-off-by: kim <grufwub@gmail.com>
* update vendored code
Signed-off-by: kim <grufwub@gmail.com>
Diffstat (limited to 'vendor/codeberg.org/gruf')
| -rw-r--r-- | vendor/codeberg.org/gruf/go-fastcopy/README.md | 3 | ||||
| -rw-r--r-- | vendor/codeberg.org/gruf/go-fastcopy/copy.go | 134 | ||||
| -rw-r--r-- | vendor/codeberg.org/gruf/go-mutexes/debug.go | 39 | ||||
| -rw-r--r-- | vendor/codeberg.org/gruf/go-mutexes/map.go | 496 | ||||
| -rw-r--r-- | vendor/codeberg.org/gruf/go-mutexes/mutex.go | 16 | ||||
| -rw-r--r-- | vendor/codeberg.org/gruf/go-mutexes/mutex_safe.go | 25 | ||||
| -rw-r--r-- | vendor/codeberg.org/gruf/go-mutexes/mutex_timeout.go | 11 | ||||
| -rw-r--r-- | vendor/codeberg.org/gruf/go-mutexes/pool.go | 40 | ||||
| -rw-r--r-- | vendor/codeberg.org/gruf/go-store/kv/store.go | 2 | ||||
| -rw-r--r-- | vendor/codeberg.org/gruf/go-store/storage/disk.go | 31 | 
10 files changed, 581 insertions, 216 deletions
diff --git a/vendor/codeberg.org/gruf/go-fastcopy/README.md b/vendor/codeberg.org/gruf/go-fastcopy/README.md new file mode 100644 index 000000000..0c1ff68f7 --- /dev/null +++ b/vendor/codeberg.org/gruf/go-fastcopy/README.md @@ -0,0 +1,3 @@ +# go-fastcopy + +An `io.Copy()` implementation that uses a memory pool for the copy buffer.
\ No newline at end of file diff --git a/vendor/codeberg.org/gruf/go-fastcopy/copy.go b/vendor/codeberg.org/gruf/go-fastcopy/copy.go new file mode 100644 index 000000000..4716b140f --- /dev/null +++ b/vendor/codeberg.org/gruf/go-fastcopy/copy.go @@ -0,0 +1,134 @@ +package fastcopy + +import ( +	"io" +	"sync" +	_ "unsafe" // link to io.errInvalidWrite. +) + +var ( +	// global pool instance. +	pool = CopyPool{size: 4096} + +	//go:linkname errInvalidWrite io.errInvalidWrite +	errInvalidWrite error +) + +// CopyPool provides a memory pool of byte +// buffers for io copies from readers to writers. +type CopyPool struct { +	size int +	pool sync.Pool +} + +// See CopyPool.Buffer(). +func Buffer(sz int) int { +	return pool.Buffer(sz) +} + +// See CopyPool.CopyN(). +func CopyN(dst io.Writer, src io.Reader, n int64) (int64, error) { +	return pool.CopyN(dst, src, n) +} + +// See CopyPool.Copy(). +func Copy(dst io.Writer, src io.Reader) (int64, error) { +	return pool.Copy(dst, src) +} + +// Buffer sets the pool buffer size to allocate. Returns current size. +// Note this is NOT atomically safe, please call BEFORE other calls to CopyPool. +func (cp *CopyPool) Buffer(sz int) int { +	if sz > 0 { +		// update size +		cp.size = sz +	} else if cp.size < 1 { +		// default size +		return 4096 +	} +	return cp.size +} + +// CopyN performs the same logic as io.CopyN(), with the difference +// being that the byte buffer is acquired from a memory pool. +func (cp *CopyPool) CopyN(dst io.Writer, src io.Reader, n int64) (int64, error) { +	written, err := cp.Copy(dst, io.LimitReader(src, n)) +	if written == n { +		return n, nil +	} +	if written < n && err == nil { +		// src stopped early; must have been EOF. +		err = io.EOF +	} +	return written, err +} + +// Copy performs the same logic as io.Copy(), with the difference +// being that the byte buffer is acquired from a memory pool. +func (cp *CopyPool) Copy(dst io.Writer, src io.Reader) (int64, error) { +	// Prefer using io.WriterTo to do the copy (avoids alloc + copy) +	if wt, ok := src.(io.WriterTo); ok { +		return wt.WriteTo(dst) +	} + +	// Prefer using io.ReaderFrom to do the copy. +	if rt, ok := dst.(io.ReaderFrom); ok { +		return rt.ReadFrom(src) +	} + +	var buf []byte + +	if b, ok := cp.pool.Get().([]byte); ok { +		// Acquired buf from pool +		buf = b +	} else { +		// Allocate new buffer of size +		buf = make([]byte, cp.Buffer(0)) +	} + +	// Defer release to pool +	defer cp.pool.Put(buf) + +	var n int64 +	for { +		// Perform next read into buf +		nr, err := src.Read(buf) +		if nr > 0 { +			// We error check AFTER checking +			// no. read bytes so incomplete +			// read still gets written up to nr. + +			// Perform next write from buf +			nw, ew := dst.Write(buf[0:nr]) + +			// Check for valid write +			if nw < 0 || nr < nw { +				if ew == nil { +					ew = errInvalidWrite +				} +				return n, ew +			} + +			// Incr total count +			n += int64(nw) + +			// Check write error +			if ew != nil { +				return n, ew +			} + +			// Check unequal read/writes +			if nr != nw { +				return n, io.ErrShortWrite +			} +		} + +		// Return on err +		if err != nil { +			if err == io.EOF { +				err = nil // expected +			} +			return n, err +		} +	} +} diff --git a/vendor/codeberg.org/gruf/go-mutexes/debug.go b/vendor/codeberg.org/gruf/go-mutexes/debug.go new file mode 100644 index 000000000..1b7be60c7 --- /dev/null +++ b/vendor/codeberg.org/gruf/go-mutexes/debug.go @@ -0,0 +1,39 @@ +package mutexes + +// func init() { +// 	log.SetFlags(log.Flags() | log.Lshortfile) +// } + +// type debugMutex sync.Mutex + +// func (mu *debugMutex) Lock() { +// 	log.Output(2, "Lock()") +// 	(*sync.Mutex)(mu).Lock() +// } + +// func (mu *debugMutex) Unlock() { +// 	log.Output(2, "Unlock()") +// 	(*sync.Mutex)(mu).Unlock() +// } + +// type debugRWMutex sync.RWMutex + +// func (mu *debugRWMutex) Lock() { +// 	log.Output(2, "Lock()") +// 	(*sync.RWMutex)(mu).Lock() +// } + +// func (mu *debugRWMutex) Unlock() { +// 	log.Output(2, "Unlock()") +// 	(*sync.RWMutex)(mu).Unlock() +// } + +// func (mu *debugRWMutex) RLock() { +// 	log.Output(2, "RLock()") +// 	(*sync.RWMutex)(mu).RLock() +// } + +// func (mu *debugRWMutex) RUnlock() { +// 	log.Output(2, "RUnlock()") +// 	(*sync.RWMutex)(mu).RUnlock() +// } diff --git a/vendor/codeberg.org/gruf/go-mutexes/map.go b/vendor/codeberg.org/gruf/go-mutexes/map.go index cb31a9543..c0f740eec 100644 --- a/vendor/codeberg.org/gruf/go-mutexes/map.go +++ b/vendor/codeberg.org/gruf/go-mutexes/map.go @@ -6,260 +6,347 @@ import (  	"sync/atomic"  ) -// locktype defines maskable mutexmap lock types. -type locktype uint8 -  const (  	// possible lock types. -	lockTypeRead  = locktype(1) << 0 -	lockTypeWrite = locktype(1) << 1 -	lockTypeMap   = locktype(1) << 2 +	lockTypeRead  = uint8(1) << 0 +	lockTypeWrite = uint8(1) << 1 +	lockTypeMap   = uint8(1) << 2  	// possible mutexmap states.  	stateUnlockd = uint8(0)  	stateRLocked = uint8(1)  	stateLocked  = uint8(2)  	stateInUse   = uint8(3) + +	// default values. +	defaultWake = 1024  ) -// permitLockType returns if provided locktype is permitted to go ahead in current state. -func permitLockType(state uint8, lt locktype) bool { +// acquireState attempts to acquire required map state for lockType. +func acquireState(state uint8, lt uint8) (uint8, bool) {  	switch state {  	// Unlocked state  	// (all allowed)  	case stateUnlockd: -		return true  	// Keys locked, no state lock.  	// (don't allow map locks)  	case stateInUse: -		return lt&lockTypeMap == 0 +		if lt&lockTypeMap != 0 { +			return 0, false +		}  	// Read locked  	// (only allow read locks)  	case stateRLocked: -		return lt&lockTypeRead != 0 +		if lt&lockTypeRead == 0 { +			return 0, false +		}  	// Write locked  	// (none allowed)  	case stateLocked: -		return false +		return 0, false  	// shouldn't reach here  	default:  		panic("unexpected state")  	} + +	switch { +	// If unlocked and not a map +	// lock request, set in use +	case lt&lockTypeMap == 0: +		if state == stateUnlockd { +			state = stateInUse +		} + +	// Set read lock state +	case lt&lockTypeRead != 0: +		state = stateRLocked + +	// Set write lock state +	case lt&lockTypeWrite != 0: +		state = stateLocked + +	default: +		panic("unexpected lock type") +	} + +	return state, true  } -// MutexMap is a structure that allows having a map of self-evicting mutexes -// by key. You do not need to worry about managing the contents of the map, -// only requesting RLock/Lock for keys, and ensuring to call the returned -// unlock functions. +// MutexMap is a structure that allows read / write locking key, performing +// as you'd expect a map[string]*sync.RWMutex to perform. The differences +// being that the entire map can itself be read / write locked, it uses memory +// pooling for the mutex (not quite) structures, and it is self-evicting. The +// core configurations of maximum no. open locks and wake modulus* are user +// definable. +// +// * The wake modulus is the number that the current number of open locks is +// modulused against to determine how often to notify sleeping goroutines. +// These are goroutines that are attempting to lock a key / whole map and are +// awaiting a permissible state (.e.g no key write locks allowed when the +// map is read locked).  type MutexMap struct { -	mus   map[string]RWMutex -	mapMu sync.Mutex -	pool  sync.Pool -	queue []func() -	evict []func() +	qpool pool +	queue []*sync.Mutex + +	mumap map[string]*rwmutex +	mpool pool +	evict []*rwmutex +  	count int32  	maxmu int32 +	wake  int32 + +	mapmu sync.Mutex  	state uint8  }  // NewMap returns a new MutexMap instance with provided max no. open mutexes. -func NewMap(max int32) MutexMap { +func NewMap(max, wake int32) MutexMap { +	// Determine wake mod. +	if wake < 1 { +		wake = defaultWake +	} + +	// Determine max no. mutexes  	if max < 1 { -		// Default = 128 * GOMAXPROCS  		procs := runtime.GOMAXPROCS(0) -		max = int32(procs * 128) +		max = wake * int32(procs)  	} +  	return MutexMap{ -		mus: make(map[string]RWMutex), -		pool: sync.Pool{ -			New: func() interface{} { -				return NewRW() +		qpool: pool{ +			alloc: func() interface{} { +				return &sync.Mutex{} +			}, +		}, +		mumap: make(map[string]*rwmutex, max), +		mpool: pool{ +			alloc: func() interface{} { +				return &rwmutex{}  			},  		},  		maxmu: max, +		wake:  wake,  	}  } -// acquire will either acquire a mutex from pool or alloc. -func (mm *MutexMap) acquire() RWMutex { -	return mm.pool.Get().(RWMutex) -} +// MAX sets the MutexMap max open locks and wake modulus, returns current values. +// For values less than zero defaults are set, and zero is non-op. +func (mm *MutexMap) SET(max, wake int32) (int32, int32) { +	mm.mapmu.Lock() + +	switch { +	// Set default wake +	case wake < 0: +		mm.wake = defaultWake + +	// Set supplied wake +	case wake > 0: +		mm.wake = wake +	} + +	switch { +	// Set default max +	case max < 0: +		procs := runtime.GOMAXPROCS(0) +		mm.maxmu = wake * int32(procs) + +	// Set supplied max +	case max > 0: +		mm.maxmu = max +	} -// release will release provided mutex to pool. -func (mm *MutexMap) release(mu RWMutex) { -	mm.pool.Put(mu) +	// Fetch values +	max = mm.maxmu +	wake = mm.wake + +	mm.mapmu.Unlock() +	return max, wake  } -// spinLock will wait (using a mutex to sleep thread) until 'cond()' returns true, -// returning with map lock. Note that 'cond' is performed within a map lock. +// spinLock will wait (using a mutex to sleep thread) until conditional returns true.  func (mm *MutexMap) spinLock(cond func() bool) { -	mu := mm.acquire() -	defer mm.release(mu) +	var mu *sync.Mutex  	for { -		// Get map lock -		mm.mapMu.Lock() +		// Acquire map lock +		mm.mapmu.Lock() -		// Check if return  		if cond() { +			// Release mu if needed +			if mu != nil { +				mm.qpool.Release(mu) +			}  			return  		} +		// Alloc mu if needed +		if mu == nil { +			v := mm.qpool.Acquire() +			mu = v.(*sync.Mutex) +		} +  		// Queue ourselves -		unlock := mu.Lock() -		mm.queue = append(mm.queue, unlock) -		mm.mapMu.Unlock() +		mm.queue = append(mm.queue, mu) +		mu.Lock() + +		// Unlock map +		mm.mapmu.Unlock()  		// Wait on notify -		mu.Lock()() +		mu.Lock() +		mu.Unlock()  	}  } -// lockMutex will acquire a lock on the mutex at provided key, handling earlier allocated mutex if provided. Unlocks map on return. -func (mm *MutexMap) lockMutex(key string, lt locktype) func() { -	var unlock func() +// lock will acquire a lock of given type on the 'mutex' at key. +func (mm *MutexMap) lock(key string, lt uint8) func() { +	var ok bool +	var mu *rwmutex + +	// Spin lock until returns true +	mm.spinLock(func() bool { +		// Check not overloaded +		if !(mm.count < mm.maxmu) { +			return false +		} + +		// Attempt to acquire usable map state +		state, ok := acquireState(mm.state, lt) +		if !ok { +			return false +		} + +		// Update state +		mm.state = state + +		// Ensure mutex at key +		// is in lockable state +		mu, ok = mm.mumap[key] +		return !ok || mu.CanLock(lt) +	}) -	// Incr counter +	// Incr count  	mm.count++ -	// Check for existing mutex at key -	mu, ok := mm.mus[key]  	if !ok { +		// No mutex found for key +  		// Alloc from pool -		mu = mm.acquire() -		mm.mus[key] = mu - -		// Queue mutex for eviction -		mm.evict = append(mm.evict, func() { -			delete(mm.mus, key) -			mm.pool.Put(mu) -		}) -	} +		v := mm.mpool.Acquire() +		mu = v.(*rwmutex) +		mm.mumap[key] = mu -	// If no state, set in use. -	// State will already have been -	// set if this is from LockState{} -	if mm.state == stateUnlockd { -		mm.state = stateInUse -	} +		// Set our key +		mu.key = key -	switch { -	// Read lock -	case lt&lockTypeRead != 0: -		unlock = mu.RLock() +		// Queue for eviction +		mm.evict = append(mm.evict, mu) +	} -	// Write lock -	case lt&lockTypeWrite != 0: -		unlock = mu.Lock() +	// Lock mutex +	mu.Lock(lt) -	// shouldn't reach here -	default: -		panic("unexpected lock type") -	} +	// Unlock map +	mm.mapmu.Unlock() -	// Unlock map + return -	mm.mapMu.Unlock()  	return func() { -		mm.mapMu.Lock() -		unlock() -		go mm.onUnlock() +		mm.mapmu.Lock() +		mu.Unlock() +		go mm.cleanup()  	}  } -// onUnlock is performed as the final (async) stage of releasing an acquired key / map mutex. -func (mm *MutexMap) onUnlock() { -	// Decr counter +// lockMap will lock the whole map under given lock type. +func (mm *MutexMap) lockMap(lt uint8) { +	// Spin lock until returns true +	mm.spinLock(func() bool { +		// Attempt to acquire usable map state +		state, ok := acquireState(mm.state, lt) +		if !ok { +			return false +		} + +		// Update state +		mm.state = state + +		return true +	}) + +	// Incr count +	mm.count++ + +	// State acquired, unlock +	mm.mapmu.Unlock() +} + +// cleanup is performed as the final stage of unlocking a locked key / map state, finally unlocks map. +func (mm *MutexMap) cleanup() { +	// Decr count  	mm.count-- -	if mm.count < 1 { -		// Perform all queued evictions -		for i := 0; i < len(mm.evict); i++ { -			mm.evict[i]() +	if mm.count%mm.wake == 0 { +		// Notify queued routines +		for _, mu := range mm.queue { +			mu.Unlock()  		} -		// Notify all waiting goroutines -		for i := 0; i < len(mm.queue); i++ { -			mm.queue[i]() +		// Reset queue +		mm.queue = mm.queue[:0] +	} + +	if mm.count < 1 { +		// Perform evictions +		for _, mu := range mm.evict { +			key := mu.key +			mu.key = "" +			delete(mm.mumap, key) +			mm.mpool.Release(mu)  		} -		// Reset the map state -		mm.evict = nil -		mm.queue = nil +		// Reset map state +		mm.evict = mm.evict[:0]  		mm.state = stateUnlockd +		mm.mpool.GC() +		mm.qpool.GC()  	} -	// Finally, unlock -	mm.mapMu.Unlock() +	// Unlock map +	mm.mapmu.Unlock()  }  // RLockMap acquires a read lock over the entire map, returning a lock state for acquiring key read locks.  // Please note that the 'unlock()' function will block until all keys locked from this state are unlocked.  func (mm *MutexMap) RLockMap() *LockState { -	return mm.getMapLock(lockTypeRead) +	mm.lockMap(lockTypeRead | lockTypeMap) +	return &LockState{ +		mmap: mm, +		ltyp: lockTypeRead, +	}  }  // LockMap acquires a write lock over the entire map, returning a lock state for acquiring key read/write locks.  // Please note that the 'unlock()' function will block until all keys locked from this state are unlocked.  func (mm *MutexMap) LockMap() *LockState { -	return mm.getMapLock(lockTypeWrite) +	mm.lockMap(lockTypeWrite | lockTypeMap) +	return &LockState{ +		mmap: mm, +		ltyp: lockTypeWrite, +	}  }  // RLock acquires a mutex read lock for supplied key, returning an RUnlock function.  func (mm *MutexMap) RLock(key string) (runlock func()) { -	return mm.getLock(key, lockTypeRead) +	return mm.lock(key, lockTypeRead)  }  // Lock acquires a mutex write lock for supplied key, returning an Unlock function.  func (mm *MutexMap) Lock(key string) (unlock func()) { -	return mm.getLock(key, lockTypeWrite) -} - -// getLock will fetch lock of provided type, for given key, returning unlock function. -func (mm *MutexMap) getLock(key string, lt locktype) func() { -	// Spin until achieve lock -	mm.spinLock(func() bool { -		return permitLockType(mm.state, lt) && -			mm.count < mm.maxmu // not overloaded -	}) - -	// Perform actual mutex lock -	return mm.lockMutex(key, lt) -} - -// getMapLock will acquire a map lock of provided type, returning a LockState session. -func (mm *MutexMap) getMapLock(lt locktype) *LockState { -	// Spin until achieve lock -	mm.spinLock(func() bool { -		return permitLockType(mm.state, lt|lockTypeMap) && -			mm.count < mm.maxmu // not overloaded -	}) - -	// Incr counter -	mm.count++ - -	switch { -	// Set read lock state -	case lt&lockTypeRead != 0: -		mm.state = stateRLocked - -	// Set write lock state -	case lt&lockTypeWrite != 0: -		mm.state = stateLocked - -	default: -		panic("unexpected lock type") -	} - -	// Unlock + return -	mm.mapMu.Unlock() -	return &LockState{ -		mmap: mm, -		ltyp: lt, -	} +	return mm.lock(key, lockTypeWrite)  }  // LockState represents a window to a locked MutexMap. @@ -267,56 +354,113 @@ type LockState struct {  	wait sync.WaitGroup  	mmap *MutexMap  	done uint32 -	ltyp locktype +	ltyp uint8  }  // Lock: see MutexMap.Lock() definition. Will panic if map only read locked.  func (st *LockState) Lock(key string) (unlock func()) { -	return st.getLock(key, lockTypeWrite) +	return st.lock(key, lockTypeWrite)  }  // RLock: see MutexMap.RLock() definition.  func (st *LockState) RLock(key string) (runlock func()) { -	return st.getLock(key, lockTypeRead) +	return st.lock(key, lockTypeRead)  } -// UnlockMap will close this state and release the currently locked map. -func (st *LockState) UnlockMap() { -	// Set state to finished (or panic if already done) -	if !atomic.CompareAndSwapUint32(&st.done, 0, 1) { -		panic("called UnlockMap() on expired state") -	} - -	// Wait until done -	st.wait.Wait() - -	// Async reset map -	st.mmap.mapMu.Lock() -	go st.mmap.onUnlock() -} - -// getLock: see MutexMap.getLock() definition. -func (st *LockState) getLock(key string, lt locktype) func() { +// lock: see MutexMap.lock() definition. +func (st *LockState) lock(key string, lt uint8) func() {  	st.wait.Add(1) // track lock -	// Check if closed, or if write lock is allowed  	if atomic.LoadUint32(&st.done) == 1 { -		panic("map lock closed") +		panic("called (r)lock on unlocked state")  	} else if lt&lockTypeWrite != 0 &&  		st.ltyp&lockTypeWrite == 0 { -		panic("called .Lock() on rlocked map") +		panic("called lock on rlocked map")  	} -	// Spin until achieve map lock +	var ok bool +	var mu *rwmutex + +	// Spin lock until returns true  	st.mmap.spinLock(func() bool { -		return st.mmap.count < st.mmap.maxmu -	}) // i.e. not overloaded +		// Check not overloaded +		if !(st.mmap.count < st.mmap.maxmu) { +			return false +		} + +		// Ensure mutex at key +		// is in lockable state +		mu, ok = st.mmap.mumap[key] +		return !ok || mu.CanLock(lt) +	}) + +	// Incr count +	st.mmap.count++ + +	if !ok { +		// No mutex found for key + +		// Alloc from pool +		v := st.mmap.mpool.Acquire() +		mu = v.(*rwmutex) +		st.mmap.mumap[key] = mu + +		// Set our key +		mu.key = key + +		// Queue for eviction +		st.mmap.evict = append(st.mmap.evict, mu) +	} -	// Perform actual mutex lock -	unlock := st.mmap.lockMutex(key, lt) +	// Lock mutex +	mu.Lock(lt) + +	// Unlock map +	st.mmap.mapmu.Unlock()  	return func() { -		unlock() -		st.wait.Done() +		st.mmap.mapmu.Lock() +		mu.Unlock() +		go st.mmap.cleanup() +		st.wait.Add(-1) +	} +} + +// UnlockMap will close this state and release the currently locked map. +func (st *LockState) UnlockMap() { +	if !atomic.CompareAndSwapUint32(&st.done, 0, 1) { +		panic("called unlockmap on expired state") +	} +	st.wait.Wait() +	st.mmap.mapmu.Lock() +	go st.mmap.cleanup() +} + +// rwmutex is a very simple *representation* of a read-write +// mutex, though not one in implementation. it works by +// tracking the lock state for a given map key, which is +// protected by the map's mutex. +type rwmutex struct { +	rcnt uint32 +	lock uint8 +	key  string +} + +func (mu *rwmutex) CanLock(lt uint8) bool { +	return mu.lock == 0 || +		(mu.lock&lockTypeRead != 0 && lt&lockTypeRead != 0) +} + +func (mu *rwmutex) Lock(lt uint8) { +	mu.lock = lt +	if lt&lockTypeRead != 0 { +		mu.rcnt++ +	} +} + +func (mu *rwmutex) Unlock() { +	mu.rcnt-- +	if mu.rcnt == 0 { +		mu.lock = 0  	}  } diff --git a/vendor/codeberg.org/gruf/go-mutexes/mutex.go b/vendor/codeberg.org/gruf/go-mutexes/mutex.go index c4f3f8876..3841c9423 100644 --- a/vendor/codeberg.org/gruf/go-mutexes/mutex.go +++ b/vendor/codeberg.org/gruf/go-mutexes/mutex.go @@ -41,24 +41,24 @@ func WithFuncRW(mu RWMutex, onLock, onRLock, onUnlock, onRUnlock func()) RWMutex  }  // baseMutex simply wraps a sync.Mutex to implement our Mutex interface -type baseMutex struct{ mu sync.Mutex } +type baseMutex sync.Mutex  func (mu *baseMutex) Lock() func() { -	mu.mu.Lock() -	return mu.mu.Unlock +	(*sync.Mutex)(mu).Lock() +	return (*sync.Mutex)(mu).Unlock  }  // baseRWMutex simply wraps a sync.RWMutex to implement our RWMutex interface -type baseRWMutex struct{ mu sync.RWMutex } +type baseRWMutex sync.RWMutex  func (mu *baseRWMutex) Lock() func() { -	mu.mu.Lock() -	return mu.mu.Unlock +	(*sync.RWMutex)(mu).Lock() +	return (*sync.RWMutex)(mu).Unlock  }  func (mu *baseRWMutex) RLock() func() { -	mu.mu.RLock() -	return mu.mu.RUnlock +	(*sync.RWMutex)(mu).RLock() +	return (*sync.RWMutex)(mu).RUnlock  }  // fnMutex wraps a Mutex to add hooks for Lock and Unlock diff --git a/vendor/codeberg.org/gruf/go-mutexes/mutex_safe.go b/vendor/codeberg.org/gruf/go-mutexes/mutex_safe.go index 7a9747521..5a0383dce 100644 --- a/vendor/codeberg.org/gruf/go-mutexes/mutex_safe.go +++ b/vendor/codeberg.org/gruf/go-mutexes/mutex_safe.go @@ -1,6 +1,8 @@  package mutexes -import "sync" +import ( +	"sync/atomic" +)  // WithSafety wrapps the supplied Mutex to protect unlock fns  // from being called multiple times @@ -19,8 +21,7 @@ type safeMutex struct{ mu Mutex }  func (mu *safeMutex) Lock() func() {  	unlock := mu.mu.Lock() -	once := sync.Once{} -	return func() { once.Do(unlock) } +	return once(unlock)  }  // safeRWMutex simply wraps a RWMutex to add multi-unlock safety @@ -28,12 +29,22 @@ type safeRWMutex struct{ mu RWMutex }  func (mu *safeRWMutex) Lock() func() {  	unlock := mu.mu.Lock() -	once := sync.Once{} -	return func() { once.Do(unlock) } +	return once(unlock)  }  func (mu *safeRWMutex) RLock() func() {  	unlock := mu.mu.RLock() -	once := sync.Once{} -	return func() { once.Do(unlock) } +	return once(unlock) +} + +// once will perform 'do' only once, this is safe for unlocks +// as 2 functions calling 'unlock()' don't need absolute guarantees +// that by the time it is completed the unlock was finished. +func once(do func()) func() { +	var done uint32 +	return func() { +		if atomic.CompareAndSwapUint32(&done, 0, 1) { +			do() +		} +	}  } diff --git a/vendor/codeberg.org/gruf/go-mutexes/mutex_timeout.go b/vendor/codeberg.org/gruf/go-mutexes/mutex_timeout.go index 2e7b8f802..03bf0e389 100644 --- a/vendor/codeberg.org/gruf/go-mutexes/mutex_timeout.go +++ b/vendor/codeberg.org/gruf/go-mutexes/mutex_timeout.go @@ -97,7 +97,9 @@ func mutexTimeout(d time.Duration, unlock func(), fn func()) func() {  // timerPool is the global &timer{} pool.  var timerPool = sync.Pool{  	New: func() interface{} { -		return newtimer() +		t := time.NewTimer(time.Minute) +		t.Stop() +		return &timer{t: t, c: make(chan struct{})}  	},  } @@ -107,13 +109,6 @@ type timer struct {  	c chan struct{}  } -// newtimer returns a new timer instance. -func newtimer() *timer { -	t := time.NewTimer(time.Minute) -	t.Stop() -	return &timer{t: t, c: make(chan struct{})} -} -  // Start will start the timer with duration 'd', performing 'fn' on timeout.  func (t *timer) Start(d time.Duration, fn func()) {  	t.t.Reset(d) diff --git a/vendor/codeberg.org/gruf/go-mutexes/pool.go b/vendor/codeberg.org/gruf/go-mutexes/pool.go new file mode 100644 index 000000000..135e2c117 --- /dev/null +++ b/vendor/codeberg.org/gruf/go-mutexes/pool.go @@ -0,0 +1,40 @@ +package mutexes + +// pool is a very simply memory pool. +type pool struct { +	current []interface{} +	victim  []interface{} +	alloc   func() interface{} +} + +// Acquire will returns a sync.RWMutex from pool (or alloc new). +func (p *pool) Acquire() interface{} { +	// First try the current queue +	if l := len(p.current) - 1; l >= 0 { +		v := p.current[l] +		p.current = p.current[:l] +		return v +	} + +	// Next try the victim queue. +	if l := len(p.victim) - 1; l >= 0 { +		v := p.victim[l] +		p.victim = p.victim[:l] +		return v +	} + +	// Lastly, alloc new. +	return p.alloc() +} + +// Release places a sync.RWMutex back in the pool. +func (p *pool) Release(v interface{}) { +	p.current = append(p.current, v) +} + +// GC will clear out unused entries from the pool. +func (p *pool) GC() { +	current := p.current +	p.current = nil +	p.victim = current +} diff --git a/vendor/codeberg.org/gruf/go-store/kv/store.go b/vendor/codeberg.org/gruf/go-store/kv/store.go index a8741afe0..fd9935f25 100644 --- a/vendor/codeberg.org/gruf/go-store/kv/store.go +++ b/vendor/codeberg.org/gruf/go-store/kv/store.go @@ -45,7 +45,7 @@ func OpenStorage(storage storage.Storage) (*KVStore, error) {  	// Return new KVStore  	return &KVStore{ -		mutex:   mutexes.NewMap(-1), +		mutex:   mutexes.NewMap(-1, -1),  		storage: storage,  	}, nil  } diff --git a/vendor/codeberg.org/gruf/go-store/storage/disk.go b/vendor/codeberg.org/gruf/go-store/storage/disk.go index 287042886..b3c480b3d 100644 --- a/vendor/codeberg.org/gruf/go-store/storage/disk.go +++ b/vendor/codeberg.org/gruf/go-store/storage/disk.go @@ -10,7 +10,7 @@ import (  	"syscall"  	"codeberg.org/gruf/go-bytes" -	"codeberg.org/gruf/go-pools" +	"codeberg.org/gruf/go-fastcopy"  	"codeberg.org/gruf/go-store/util"  ) @@ -81,10 +81,10 @@ func getDiskConfig(cfg *DiskConfig) DiskConfig {  // DiskStorage is a Storage implementation that stores directly to a filesystem  type DiskStorage struct { -	path   string           // path is the root path of this store -	bufp   pools.BufferPool // bufp is the buffer pool for this DiskStorage -	config DiskConfig       // cfg is the supplied configuration for this store -	lock   *Lock            // lock is the opened lockfile for this storage instance +	path   string            // path is the root path of this store +	cppool fastcopy.CopyPool // cppool is the prepared io copier with buffer pool +	config DiskConfig        // cfg is the supplied configuration for this store +	lock   *Lock             // lock is the opened lockfile for this storage instance  }  // OpenFile opens a DiskStorage instance for given folder path and configuration @@ -147,13 +147,17 @@ func OpenFile(path string, cfg *DiskConfig) (*DiskStorage, error) {  		return nil, err  	} -	// Return new DiskStorage -	return &DiskStorage{ +	// Prepare DiskStorage +	st := &DiskStorage{  		path:   storePath, -		bufp:   pools.NewBufferPool(config.WriteBufSize),  		config: config,  		lock:   lock, -	}, nil +	} + +	// Set copypool buffer size +	st.cppool.Buffer(config.WriteBufSize) + +	return st, nil  }  // Clean implements Storage.Clean() @@ -271,13 +275,8 @@ func (st *DiskStorage) WriteStream(key string, r io.Reader) error {  	}  	defer cFile.Close() -	// Acquire write buffer -	buf := st.bufp.Get() -	defer st.bufp.Put(buf) -	buf.Grow(st.config.WriteBufSize) - -	// Copy reader to file -	_, err = io.CopyBuffer(cFile, r, buf.B) +	// Copy provided reader to file +	_, err = st.cppool.Copy(cFile, r)  	return err  }  | 
