diff options
Diffstat (limited to 'vendor/codeberg.org/gruf/go-pools')
-rw-r--r-- | vendor/codeberg.org/gruf/go-pools/bufio.go | 42 | ||||
-rw-r--r-- | vendor/codeberg.org/gruf/go-pools/bytes.go | 14 | ||||
-rw-r--r-- | vendor/codeberg.org/gruf/go-pools/henc.go | 46 | ||||
-rw-r--r-- | vendor/codeberg.org/gruf/go-pools/pool.go | 387 |
4 files changed, 468 insertions, 21 deletions
diff --git a/vendor/codeberg.org/gruf/go-pools/bufio.go b/vendor/codeberg.org/gruf/go-pools/bufio.go index 8c2ef9730..e22fd6a1c 100644 --- a/vendor/codeberg.org/gruf/go-pools/bufio.go +++ b/vendor/codeberg.org/gruf/go-pools/bufio.go @@ -6,7 +6,7 @@ import ( "sync" ) -// BufioReaderPool is a pooled allocator for bufio.Reader objects +// BufioReaderPool is a pooled allocator for bufio.Reader objects. type BufioReaderPool interface { // Get fetches a bufio.Reader from pool and resets to supplied reader Get(io.Reader) *bufio.Reader @@ -15,32 +15,39 @@ type BufioReaderPool interface { Put(*bufio.Reader) } -// NewBufioReaderPool returns a newly instantiated bufio.Reader pool +// NewBufioReaderPool returns a newly instantiated bufio.Reader pool. func NewBufioReaderPool(size int) BufioReaderPool { return &bufioReaderPool{ - Pool: sync.Pool{ + pool: sync.Pool{ New: func() interface{} { return bufio.NewReaderSize(nil, size) }, }, + size: size, } } -// bufioReaderPool is our implementation of BufioReaderPool -type bufioReaderPool struct{ sync.Pool } +// bufioReaderPool is our implementation of BufioReaderPool. +type bufioReaderPool struct { + pool sync.Pool + size int +} func (p *bufioReaderPool) Get(r io.Reader) *bufio.Reader { - br := p.Pool.Get().(*bufio.Reader) + br := p.pool.Get().(*bufio.Reader) br.Reset(r) return br } func (p *bufioReaderPool) Put(br *bufio.Reader) { + if br.Size() < p.size { + return + } br.Reset(nil) - p.Pool.Put(br) + p.pool.Put(br) } -// BufioWriterPool is a pooled allocator for bufio.Writer objects +// BufioWriterPool is a pooled allocator for bufio.Writer objects. type BufioWriterPool interface { // Get fetches a bufio.Writer from pool and resets to supplied writer Get(io.Writer) *bufio.Writer @@ -49,27 +56,34 @@ type BufioWriterPool interface { Put(*bufio.Writer) } -// NewBufioWriterPool returns a newly instantiated bufio.Writer pool +// NewBufioWriterPool returns a newly instantiated bufio.Writer pool. func NewBufioWriterPool(size int) BufioWriterPool { return &bufioWriterPool{ - Pool: sync.Pool{ + pool: sync.Pool{ New: func() interface{} { return bufio.NewWriterSize(nil, size) }, }, + size: size, } } -// bufioWriterPool is our implementation of BufioWriterPool -type bufioWriterPool struct{ sync.Pool } +// bufioWriterPool is our implementation of BufioWriterPool. +type bufioWriterPool struct { + pool sync.Pool + size int +} func (p *bufioWriterPool) Get(w io.Writer) *bufio.Writer { - bw := p.Pool.Get().(*bufio.Writer) + bw := p.pool.Get().(*bufio.Writer) bw.Reset(w) return bw } func (p *bufioWriterPool) Put(bw *bufio.Writer) { + if bw.Size() < p.size { + return + } bw.Reset(nil) - p.Pool.Put(bw) + p.pool.Put(bw) } diff --git a/vendor/codeberg.org/gruf/go-pools/bytes.go b/vendor/codeberg.org/gruf/go-pools/bytes.go index 76fe18616..1aee77064 100644 --- a/vendor/codeberg.org/gruf/go-pools/bytes.go +++ b/vendor/codeberg.org/gruf/go-pools/bytes.go @@ -3,16 +3,16 @@ package pools import ( "sync" - "codeberg.org/gruf/go-bytes" + "codeberg.org/gruf/go-byteutil" ) // BufferPool is a pooled allocator for bytes.Buffer objects type BufferPool interface { // Get fetches a bytes.Buffer from pool - Get() *bytes.Buffer + Get() *byteutil.Buffer // Put places supplied bytes.Buffer in pool - Put(*bytes.Buffer) + Put(*byteutil.Buffer) } // NewBufferPool returns a newly instantiated bytes.Buffer pool @@ -20,7 +20,7 @@ func NewBufferPool(size int) BufferPool { return &bufferPool{ pool: sync.Pool{ New: func() interface{} { - return &bytes.Buffer{B: make([]byte, 0, size)} + return &byteutil.Buffer{B: make([]byte, 0, size)} }, }, size: size, @@ -33,11 +33,11 @@ type bufferPool struct { size int } -func (p *bufferPool) Get() *bytes.Buffer { - return p.pool.Get().(*bytes.Buffer) +func (p *bufferPool) Get() *byteutil.Buffer { + return p.pool.Get().(*byteutil.Buffer) } -func (p *bufferPool) Put(buf *bytes.Buffer) { +func (p *bufferPool) Put(buf *byteutil.Buffer) { if buf.Cap() < p.size { return } diff --git a/vendor/codeberg.org/gruf/go-pools/henc.go b/vendor/codeberg.org/gruf/go-pools/henc.go new file mode 100644 index 000000000..cad905af4 --- /dev/null +++ b/vendor/codeberg.org/gruf/go-pools/henc.go @@ -0,0 +1,46 @@ +package pools + +import ( + "hash" + "sync" + + "codeberg.org/gruf/go-hashenc" +) + +// HashEncoderPool is a pooled allocator for hashenc.HashEncoder objects. +type HashEncoderPool interface { + // Get fetches a hashenc.HashEncoder from pool + Get() hashenc.HashEncoder + + // Put places supplied hashenc.HashEncoder back in pool + Put(hashenc.HashEncoder) +} + +// NewHashEncoderPool returns a newly instantiated hashenc.HashEncoder pool. +func NewHashEncoderPool(hash func() hash.Hash, enc func() hashenc.Encoder) HashEncoderPool { + return &hencPool{ + pool: sync.Pool{ + New: func() interface{} { + return hashenc.New(hash(), enc()) + }, + }, + size: hashenc.New(hash(), enc()).Size(), + } +} + +// hencPool is our implementation of HashEncoderPool. +type hencPool struct { + pool sync.Pool + size int +} + +func (p *hencPool) Get() hashenc.HashEncoder { + return p.pool.Get().(hashenc.HashEncoder) +} + +func (p *hencPool) Put(henc hashenc.HashEncoder) { + if henc.Size() < p.size { + return + } + p.pool.Put(henc) +} diff --git a/vendor/codeberg.org/gruf/go-pools/pool.go b/vendor/codeberg.org/gruf/go-pools/pool.go new file mode 100644 index 000000000..1e3db74b2 --- /dev/null +++ b/vendor/codeberg.org/gruf/go-pools/pool.go @@ -0,0 +1,387 @@ +package pools + +import ( + "runtime" + "sync" + "sync/atomic" + "unsafe" +) + +type Pool struct { + // New is used to instantiate new items + New func() interface{} + + // Evict is called on evicted items during pool .Clean() + Evict func(interface{}) + + local unsafe.Pointer // ptr to []_ppool + localSz int64 // count of all elems in local + victim unsafe.Pointer // ptr to []_ppool + victimSz int64 // count of all elems in victim + mutex sync.Mutex // mutex protects new cleanups, and new allocations of local +} + +// Get attempts to fetch an item from the pool, failing that allocates with supplied .New() function +func (p *Pool) Get() interface{} { + // Get local pool for proc + // (also pins proc) + pool, pid := p.pin() + + if v := pool.getPrivate(); v != nil { + // local _ppool private elem acquired + runtime_procUnpin() + atomic.AddInt64(&p.localSz, -1) + return v + } + + if v := pool.get(); v != nil { + // local _ppool queue elem acquired + runtime_procUnpin() + atomic.AddInt64(&p.localSz, -1) + return v + } + + // Unpin before attempting slow + runtime_procUnpin() + if v := p.getSlow(pid); v != nil { + // note size decrementing + // is handled within p.getSlow() + // as we don't know if it came + // from the local or victim pools + return v + } + + // Alloc new + return p.New() +} + +// Put places supplied item in the proc local pool +func (p *Pool) Put(v interface{}) { + // Don't store nil + if v == nil { + return + } + + // Get proc local pool + // (also pins proc) + pool, _ := p.pin() + + // first try private, then queue + if !pool.setPrivate(v) { + pool.put(v) + } + runtime_procUnpin() + + // Increment local pool size + atomic.AddInt64(&p.localSz, 1) +} + +// Clean will drop the current victim pools, move the current local pools to its +// place and reset the local pools ptr in order to be regenerated +func (p *Pool) Clean() { + p.mutex.Lock() + + // victim becomes local, local becomes nil + localPtr := atomic.SwapPointer(&p.local, nil) + victimPtr := atomic.SwapPointer(&p.victim, localPtr) + localSz := atomic.SwapInt64(&p.localSz, 0) + atomic.StoreInt64(&p.victimSz, localSz) + + var victim []ppool + if victimPtr != nil { + victim = *(*[]ppool)(victimPtr) + } + + // drain each of the vict _ppool items + for i := 0; i < len(victim); i++ { + ppool := &victim[i] + ppool.evict(p.Evict) + } + + p.mutex.Unlock() +} + +// LocalSize returns the total number of elements in all the proc-local pools +func (p *Pool) LocalSize() int64 { + return atomic.LoadInt64(&p.localSz) +} + +// VictimSize returns the total number of elements in all the victim (old proc-local) pools +func (p *Pool) VictimSize() int64 { + return atomic.LoadInt64(&p.victimSz) +} + +// getSlow is the slow path for fetching an element, attempting to steal from other proc's +// local pools, and failing that, from the aging-out victim pools. pid is still passed so +// not all procs start iterating from the same index +func (p *Pool) getSlow(pid int) interface{} { + // get local pools + local := p.localPools() + + // Try to steal from other proc locals + for i := 0; i < len(local); i++ { + pool := &local[(pid+i+1)%len(local)] + if v := pool.get(); v != nil { + atomic.AddInt64(&p.localSz, -1) + return v + } + } + + // get victim pools + victim := p.victimPools() + + // Attempt to steal from victim pools + for i := 0; i < len(victim); i++ { + pool := &victim[(pid+i+1)%len(victim)] + if v := pool.get(); v != nil { + atomic.AddInt64(&p.victimSz, -1) + return v + } + } + + // Set victim pools to nil (none found) + atomic.StorePointer(&p.victim, nil) + + return nil +} + +// localPools safely loads slice of local _ppools +func (p *Pool) localPools() []ppool { + local := atomic.LoadPointer(&p.local) + if local == nil { + return nil + } + return *(*[]ppool)(local) +} + +// victimPools safely loads slice of victim _ppools +func (p *Pool) victimPools() []ppool { + victim := atomic.LoadPointer(&p.victim) + if victim == nil { + return nil + } + return *(*[]ppool)(victim) +} + +// pin will get fetch pin proc to PID, fetch proc-local _ppool and current PID we're pinned to +func (p *Pool) pin() (*ppool, int) { + for { + // get local pools + local := p.localPools() + + if len(local) > 0 { + // local already initialized + + // pin to current proc + pid := runtime_procPin() + + // check for pid local pool + if pid < len(local) { + return &local[pid], pid + } + + // unpin from proc + runtime_procUnpin() + } else { + // local not yet initialized + + // Check functions are set + if p.New == nil { + panic("new func must not be nil") + } + if p.Evict == nil { + panic("evict func must not be nil") + } + } + + // allocate local + p.allocLocal() + } +} + +// allocLocal allocates a new local pool slice, with the old length passed to check +// if pool was previously nil, or whether a change in GOMAXPROCS occurred +func (p *Pool) allocLocal() { + // get pool lock + p.mutex.Lock() + + // Calculate new size to use + size := runtime.GOMAXPROCS(0) + + local := p.localPools() + if len(local) != size { + // GOMAXPROCS changed, reallocate + pools := make([]ppool, size) + atomic.StorePointer(&p.local, unsafe.Pointer(&pools)) + + // Evict old local elements + for i := 0; i < len(local); i++ { + pool := &local[i] + pool.evict(p.Evict) + } + } + + // Unlock pool + p.mutex.Unlock() +} + +// _ppool is a proc local pool +type _ppool struct { + // root is the root element of the _ppool queue, + // and protects concurrent access to the queue + root unsafe.Pointer + + // private is a proc private member accessible + // only to the pid this _ppool is assigned to, + // except during evict (hence the unsafe pointer) + private unsafe.Pointer +} + +// ppool wraps _ppool with pad. +type ppool struct { + _ppool + + // Prevents false sharing on widespread platforms with + // 128 mod (cache line size) = 0 . + pad [128 - unsafe.Sizeof(_ppool{})%128]byte +} + +// getPrivate gets the proc private member +func (pp *_ppool) getPrivate() interface{} { + ptr := atomic.SwapPointer(&pp.private, nil) + if ptr == nil { + return nil + } + return *(*interface{})(ptr) +} + +// setPrivate sets the proc private member (only if unset) +func (pp *_ppool) setPrivate(v interface{}) bool { + return atomic.CompareAndSwapPointer(&pp.private, nil, unsafe.Pointer(&v)) +} + +// get fetches an element from the queue +func (pp *_ppool) get() interface{} { + for { + // Attempt to load root elem + root := atomic.LoadPointer(&pp.root) + if root == nil { + return nil + } + + // Attempt to consume root elem + if root == inUsePtr || + !atomic.CompareAndSwapPointer(&pp.root, root, inUsePtr) { + continue + } + + // Root becomes next in chain + e := (*elem)(root) + v := e.value + + // Place new root back in the chain + atomic.StorePointer(&pp.root, unsafe.Pointer(e.next)) + putElem(e) + + return v + } +} + +// put places an element in the queue +func (pp *_ppool) put(v interface{}) { + // Prepare next elem + e := getElem() + e.value = v + + for { + // Attempt to load root elem + root := atomic.LoadPointer(&pp.root) + if root == inUsePtr { + continue + } + + // Set the next elem value (might be nil) + e.next = (*elem)(root) + + // Attempt to store this new value at root + if atomic.CompareAndSwapPointer(&pp.root, root, unsafe.Pointer(e)) { + break + } + } +} + +// hook evicts all entries from pool, calling hook on each +func (pp *_ppool) evict(hook func(interface{})) { + if v := pp.getPrivate(); v != nil { + hook(v) + } + for { + v := pp.get() + if v == nil { + break + } + hook(v) + } +} + +// inUsePtr is a ptr used to indicate _ppool is in use +var inUsePtr = unsafe.Pointer(&elem{ + next: nil, + value: "in_use", +}) + +// elem defines an element in the _ppool queue +type elem struct { + next *elem + value interface{} +} + +// elemPool is a simple pool of unused elements +var elemPool = struct { + root unsafe.Pointer +}{} + +// getElem fetches a new elem from pool, or creates new +func getElem() *elem { + // Attempt to load root elem + root := atomic.LoadPointer(&elemPool.root) + if root == nil { + return &elem{} + } + + // Attempt to consume root elem + if root == inUsePtr || + !atomic.CompareAndSwapPointer(&elemPool.root, root, inUsePtr) { + return &elem{} + } + + // Root becomes next in chain + e := (*elem)(root) + atomic.StorePointer(&elemPool.root, unsafe.Pointer(e.next)) + e.next = nil + + return e +} + +// putElem will place element in the pool +func putElem(e *elem) { + e.value = nil + + // Attempt to load root elem + root := atomic.LoadPointer(&elemPool.root) + if root == inUsePtr { + return // drop + } + + // Set the next elem value (might be nil) + e.next = (*elem)(root) + + // Attempt to store this new value at root + atomic.CompareAndSwapPointer(&elemPool.root, root, unsafe.Pointer(e)) +} + +//go:linkname runtime_procPin sync.runtime_procPin +func runtime_procPin() int + +//go:linkname runtime_procUnpin sync.runtime_procUnpin +func runtime_procUnpin() |