diff options
Diffstat (limited to 'vendor/codeberg.org/gruf/go-pools')
| -rw-r--r-- | vendor/codeberg.org/gruf/go-pools/bufio.go | 42 | ||||
| -rw-r--r-- | vendor/codeberg.org/gruf/go-pools/bytes.go | 14 | ||||
| -rw-r--r-- | vendor/codeberg.org/gruf/go-pools/henc.go | 46 | ||||
| -rw-r--r-- | vendor/codeberg.org/gruf/go-pools/pool.go | 387 | 
4 files changed, 468 insertions, 21 deletions
| diff --git a/vendor/codeberg.org/gruf/go-pools/bufio.go b/vendor/codeberg.org/gruf/go-pools/bufio.go index 8c2ef9730..e22fd6a1c 100644 --- a/vendor/codeberg.org/gruf/go-pools/bufio.go +++ b/vendor/codeberg.org/gruf/go-pools/bufio.go @@ -6,7 +6,7 @@ import (  	"sync"  ) -// BufioReaderPool is a pooled allocator for bufio.Reader objects +// BufioReaderPool is a pooled allocator for bufio.Reader objects.  type BufioReaderPool interface {  	// Get fetches a bufio.Reader from pool and resets to supplied reader  	Get(io.Reader) *bufio.Reader @@ -15,32 +15,39 @@ type BufioReaderPool interface {  	Put(*bufio.Reader)  } -// NewBufioReaderPool returns a newly instantiated bufio.Reader pool +// NewBufioReaderPool returns a newly instantiated bufio.Reader pool.  func NewBufioReaderPool(size int) BufioReaderPool {  	return &bufioReaderPool{ -		Pool: sync.Pool{ +		pool: sync.Pool{  			New: func() interface{} {  				return bufio.NewReaderSize(nil, size)  			},  		}, +		size: size,  	}  } -// bufioReaderPool is our implementation of BufioReaderPool -type bufioReaderPool struct{ sync.Pool } +// bufioReaderPool is our implementation of BufioReaderPool. +type bufioReaderPool struct { +	pool sync.Pool +	size int +}  func (p *bufioReaderPool) Get(r io.Reader) *bufio.Reader { -	br := p.Pool.Get().(*bufio.Reader) +	br := p.pool.Get().(*bufio.Reader)  	br.Reset(r)  	return br  }  func (p *bufioReaderPool) Put(br *bufio.Reader) { +	if br.Size() < p.size { +		return +	}  	br.Reset(nil) -	p.Pool.Put(br) +	p.pool.Put(br)  } -// BufioWriterPool is a pooled allocator for bufio.Writer objects +// BufioWriterPool is a pooled allocator for bufio.Writer objects.  type BufioWriterPool interface {  	// Get fetches a bufio.Writer from pool and resets to supplied writer  	Get(io.Writer) *bufio.Writer @@ -49,27 +56,34 @@ type BufioWriterPool interface {  	Put(*bufio.Writer)  } -// NewBufioWriterPool returns a newly instantiated bufio.Writer pool +// NewBufioWriterPool returns a newly instantiated bufio.Writer pool.  func NewBufioWriterPool(size int) BufioWriterPool {  	return &bufioWriterPool{ -		Pool: sync.Pool{ +		pool: sync.Pool{  			New: func() interface{} {  				return bufio.NewWriterSize(nil, size)  			},  		}, +		size: size,  	}  } -// bufioWriterPool is our implementation of BufioWriterPool -type bufioWriterPool struct{ sync.Pool } +// bufioWriterPool is our implementation of BufioWriterPool. +type bufioWriterPool struct { +	pool sync.Pool +	size int +}  func (p *bufioWriterPool) Get(w io.Writer) *bufio.Writer { -	bw := p.Pool.Get().(*bufio.Writer) +	bw := p.pool.Get().(*bufio.Writer)  	bw.Reset(w)  	return bw  }  func (p *bufioWriterPool) Put(bw *bufio.Writer) { +	if bw.Size() < p.size { +		return +	}  	bw.Reset(nil) -	p.Pool.Put(bw) +	p.pool.Put(bw)  } diff --git a/vendor/codeberg.org/gruf/go-pools/bytes.go b/vendor/codeberg.org/gruf/go-pools/bytes.go index 76fe18616..1aee77064 100644 --- a/vendor/codeberg.org/gruf/go-pools/bytes.go +++ b/vendor/codeberg.org/gruf/go-pools/bytes.go @@ -3,16 +3,16 @@ package pools  import (  	"sync" -	"codeberg.org/gruf/go-bytes" +	"codeberg.org/gruf/go-byteutil"  )  // BufferPool is a pooled allocator for bytes.Buffer objects  type BufferPool interface {  	// Get fetches a bytes.Buffer from pool -	Get() *bytes.Buffer +	Get() *byteutil.Buffer  	// Put places supplied bytes.Buffer in pool -	Put(*bytes.Buffer) +	Put(*byteutil.Buffer)  }  // NewBufferPool returns a newly instantiated bytes.Buffer pool @@ -20,7 +20,7 @@ func NewBufferPool(size int) BufferPool {  	return &bufferPool{  		pool: sync.Pool{  			New: func() interface{} { -				return &bytes.Buffer{B: make([]byte, 0, size)} +				return &byteutil.Buffer{B: make([]byte, 0, size)}  			},  		},  		size: size, @@ -33,11 +33,11 @@ type bufferPool struct {  	size int  } -func (p *bufferPool) Get() *bytes.Buffer { -	return p.pool.Get().(*bytes.Buffer) +func (p *bufferPool) Get() *byteutil.Buffer { +	return p.pool.Get().(*byteutil.Buffer)  } -func (p *bufferPool) Put(buf *bytes.Buffer) { +func (p *bufferPool) Put(buf *byteutil.Buffer) {  	if buf.Cap() < p.size {  		return  	} diff --git a/vendor/codeberg.org/gruf/go-pools/henc.go b/vendor/codeberg.org/gruf/go-pools/henc.go new file mode 100644 index 000000000..cad905af4 --- /dev/null +++ b/vendor/codeberg.org/gruf/go-pools/henc.go @@ -0,0 +1,46 @@ +package pools + +import ( +	"hash" +	"sync" + +	"codeberg.org/gruf/go-hashenc" +) + +// HashEncoderPool is a pooled allocator for hashenc.HashEncoder objects. +type HashEncoderPool interface { +	// Get fetches a hashenc.HashEncoder from pool +	Get() hashenc.HashEncoder + +	// Put places supplied hashenc.HashEncoder back in pool +	Put(hashenc.HashEncoder) +} + +// NewHashEncoderPool returns a newly instantiated hashenc.HashEncoder pool. +func NewHashEncoderPool(hash func() hash.Hash, enc func() hashenc.Encoder) HashEncoderPool { +	return &hencPool{ +		pool: sync.Pool{ +			New: func() interface{} { +				return hashenc.New(hash(), enc()) +			}, +		}, +		size: hashenc.New(hash(), enc()).Size(), +	} +} + +// hencPool is our implementation of HashEncoderPool. +type hencPool struct { +	pool sync.Pool +	size int +} + +func (p *hencPool) Get() hashenc.HashEncoder { +	return p.pool.Get().(hashenc.HashEncoder) +} + +func (p *hencPool) Put(henc hashenc.HashEncoder) { +	if henc.Size() < p.size { +		return +	} +	p.pool.Put(henc) +} diff --git a/vendor/codeberg.org/gruf/go-pools/pool.go b/vendor/codeberg.org/gruf/go-pools/pool.go new file mode 100644 index 000000000..1e3db74b2 --- /dev/null +++ b/vendor/codeberg.org/gruf/go-pools/pool.go @@ -0,0 +1,387 @@ +package pools + +import ( +	"runtime" +	"sync" +	"sync/atomic" +	"unsafe" +) + +type Pool struct { +	// New is used to instantiate new items +	New func() interface{} + +	// Evict is called on evicted items during pool .Clean() +	Evict func(interface{}) + +	local    unsafe.Pointer // ptr to []_ppool +	localSz  int64          // count of all elems in local +	victim   unsafe.Pointer // ptr to []_ppool +	victimSz int64          // count of all elems in victim +	mutex    sync.Mutex     // mutex protects new cleanups, and new allocations of local +} + +// Get attempts to fetch an item from the pool, failing that allocates with supplied .New() function +func (p *Pool) Get() interface{} { +	// Get local pool for proc +	// (also pins proc) +	pool, pid := p.pin() + +	if v := pool.getPrivate(); v != nil { +		// local _ppool private elem acquired +		runtime_procUnpin() +		atomic.AddInt64(&p.localSz, -1) +		return v +	} + +	if v := pool.get(); v != nil { +		// local _ppool queue elem acquired +		runtime_procUnpin() +		atomic.AddInt64(&p.localSz, -1) +		return v +	} + +	// Unpin before attempting slow +	runtime_procUnpin() +	if v := p.getSlow(pid); v != nil { +		// note size decrementing +		// is handled within p.getSlow() +		// as we don't know if it came +		// from the local or victim pools +		return v +	} + +	// Alloc new +	return p.New() +} + +// Put places supplied item in the proc local pool +func (p *Pool) Put(v interface{}) { +	// Don't store nil +	if v == nil { +		return +	} + +	// Get proc local pool +	// (also pins proc) +	pool, _ := p.pin() + +	// first try private, then queue +	if !pool.setPrivate(v) { +		pool.put(v) +	} +	runtime_procUnpin() + +	// Increment local pool size +	atomic.AddInt64(&p.localSz, 1) +} + +// Clean will drop the current victim pools, move the current local pools to its +// place and reset the local pools ptr in order to be regenerated +func (p *Pool) Clean() { +	p.mutex.Lock() + +	// victim becomes local, local becomes nil +	localPtr := atomic.SwapPointer(&p.local, nil) +	victimPtr := atomic.SwapPointer(&p.victim, localPtr) +	localSz := atomic.SwapInt64(&p.localSz, 0) +	atomic.StoreInt64(&p.victimSz, localSz) + +	var victim []ppool +	if victimPtr != nil { +		victim = *(*[]ppool)(victimPtr) +	} + +	// drain each of the vict _ppool items +	for i := 0; i < len(victim); i++ { +		ppool := &victim[i] +		ppool.evict(p.Evict) +	} + +	p.mutex.Unlock() +} + +// LocalSize returns the total number of elements in all the proc-local pools +func (p *Pool) LocalSize() int64 { +	return atomic.LoadInt64(&p.localSz) +} + +// VictimSize returns the total number of elements in all the victim (old proc-local) pools +func (p *Pool) VictimSize() int64 { +	return atomic.LoadInt64(&p.victimSz) +} + +// getSlow is the slow path for fetching an element, attempting to steal from other proc's +// local pools, and failing that, from the aging-out victim pools. pid is still passed so +// not all procs start iterating from the same index +func (p *Pool) getSlow(pid int) interface{} { +	// get local pools +	local := p.localPools() + +	// Try to steal from other proc locals +	for i := 0; i < len(local); i++ { +		pool := &local[(pid+i+1)%len(local)] +		if v := pool.get(); v != nil { +			atomic.AddInt64(&p.localSz, -1) +			return v +		} +	} + +	// get victim pools +	victim := p.victimPools() + +	// Attempt to steal from victim pools +	for i := 0; i < len(victim); i++ { +		pool := &victim[(pid+i+1)%len(victim)] +		if v := pool.get(); v != nil { +			atomic.AddInt64(&p.victimSz, -1) +			return v +		} +	} + +	// Set victim pools to nil (none found) +	atomic.StorePointer(&p.victim, nil) + +	return nil +} + +// localPools safely loads slice of local _ppools +func (p *Pool) localPools() []ppool { +	local := atomic.LoadPointer(&p.local) +	if local == nil { +		return nil +	} +	return *(*[]ppool)(local) +} + +// victimPools safely loads slice of victim _ppools +func (p *Pool) victimPools() []ppool { +	victim := atomic.LoadPointer(&p.victim) +	if victim == nil { +		return nil +	} +	return *(*[]ppool)(victim) +} + +// pin will get fetch pin proc to PID, fetch proc-local _ppool and current PID we're pinned to +func (p *Pool) pin() (*ppool, int) { +	for { +		// get local pools +		local := p.localPools() + +		if len(local) > 0 { +			// local already initialized + +			// pin to current proc +			pid := runtime_procPin() + +			// check for pid local pool +			if pid < len(local) { +				return &local[pid], pid +			} + +			// unpin from proc +			runtime_procUnpin() +		} else { +			// local not yet initialized + +			// Check functions are set +			if p.New == nil { +				panic("new func must not be nil") +			} +			if p.Evict == nil { +				panic("evict func must not be nil") +			} +		} + +		// allocate local +		p.allocLocal() +	} +} + +// allocLocal allocates a new local pool slice, with the old length passed to check +// if pool was previously nil, or whether a change in GOMAXPROCS occurred +func (p *Pool) allocLocal() { +	// get pool lock +	p.mutex.Lock() + +	// Calculate new size to use +	size := runtime.GOMAXPROCS(0) + +	local := p.localPools() +	if len(local) != size { +		// GOMAXPROCS changed, reallocate +		pools := make([]ppool, size) +		atomic.StorePointer(&p.local, unsafe.Pointer(&pools)) + +		// Evict old local elements +		for i := 0; i < len(local); i++ { +			pool := &local[i] +			pool.evict(p.Evict) +		} +	} + +	// Unlock pool +	p.mutex.Unlock() +} + +// _ppool is a proc local pool +type _ppool struct { +	// root is the root element of the _ppool queue, +	// and protects concurrent access to the queue +	root unsafe.Pointer + +	// private is a proc private member accessible +	// only to the pid this _ppool is assigned to, +	// except during evict (hence the unsafe pointer) +	private unsafe.Pointer +} + +// ppool wraps _ppool with pad. +type ppool struct { +	_ppool + +	// Prevents false sharing on widespread platforms with +	// 128 mod (cache line size) = 0 . +	pad [128 - unsafe.Sizeof(_ppool{})%128]byte +} + +// getPrivate gets the proc private member +func (pp *_ppool) getPrivate() interface{} { +	ptr := atomic.SwapPointer(&pp.private, nil) +	if ptr == nil { +		return nil +	} +	return *(*interface{})(ptr) +} + +// setPrivate sets the proc private member (only if unset) +func (pp *_ppool) setPrivate(v interface{}) bool { +	return atomic.CompareAndSwapPointer(&pp.private, nil, unsafe.Pointer(&v)) +} + +// get fetches an element from the queue +func (pp *_ppool) get() interface{} { +	for { +		// Attempt to load root elem +		root := atomic.LoadPointer(&pp.root) +		if root == nil { +			return nil +		} + +		// Attempt to consume root elem +		if root == inUsePtr || +			!atomic.CompareAndSwapPointer(&pp.root, root, inUsePtr) { +			continue +		} + +		// Root becomes next in chain +		e := (*elem)(root) +		v := e.value + +		// Place new root back in the chain +		atomic.StorePointer(&pp.root, unsafe.Pointer(e.next)) +		putElem(e) + +		return v +	} +} + +// put places an element in the queue +func (pp *_ppool) put(v interface{}) { +	// Prepare next elem +	e := getElem() +	e.value = v + +	for { +		// Attempt to load root elem +		root := atomic.LoadPointer(&pp.root) +		if root == inUsePtr { +			continue +		} + +		// Set the next elem value (might be nil) +		e.next = (*elem)(root) + +		// Attempt to store this new value at root +		if atomic.CompareAndSwapPointer(&pp.root, root, unsafe.Pointer(e)) { +			break +		} +	} +} + +// hook evicts all entries from pool, calling hook on each +func (pp *_ppool) evict(hook func(interface{})) { +	if v := pp.getPrivate(); v != nil { +		hook(v) +	} +	for { +		v := pp.get() +		if v == nil { +			break +		} +		hook(v) +	} +} + +// inUsePtr is a ptr used to indicate _ppool is in use +var inUsePtr = unsafe.Pointer(&elem{ +	next:  nil, +	value: "in_use", +}) + +// elem defines an element in the _ppool queue +type elem struct { +	next  *elem +	value interface{} +} + +// elemPool is a simple pool of unused elements +var elemPool = struct { +	root unsafe.Pointer +}{} + +// getElem fetches a new elem from pool, or creates new +func getElem() *elem { +	// Attempt to load root elem +	root := atomic.LoadPointer(&elemPool.root) +	if root == nil { +		return &elem{} +	} + +	// Attempt to consume root elem +	if root == inUsePtr || +		!atomic.CompareAndSwapPointer(&elemPool.root, root, inUsePtr) { +		return &elem{} +	} + +	// Root becomes next in chain +	e := (*elem)(root) +	atomic.StorePointer(&elemPool.root, unsafe.Pointer(e.next)) +	e.next = nil + +	return e +} + +// putElem will place element in the pool +func putElem(e *elem) { +	e.value = nil + +	// Attempt to load root elem +	root := atomic.LoadPointer(&elemPool.root) +	if root == inUsePtr { +		return // drop +	} + +	// Set the next elem value (might be nil) +	e.next = (*elem)(root) + +	// Attempt to store this new value at root +	atomic.CompareAndSwapPointer(&elemPool.root, root, unsafe.Pointer(e)) +} + +//go:linkname runtime_procPin sync.runtime_procPin +func runtime_procPin() int + +//go:linkname runtime_procUnpin sync.runtime_procUnpin +func runtime_procUnpin() | 
