Diffstat (limited to 'vendor/codeberg.org/gruf/go-pools')
-rw-r--r--  vendor/codeberg.org/gruf/go-pools/bufio.go    42
-rw-r--r--  vendor/codeberg.org/gruf/go-pools/bytes.go    14
-rw-r--r--  vendor/codeberg.org/gruf/go-pools/henc.go     46
-rw-r--r--  vendor/codeberg.org/gruf/go-pools/pool.go    387
4 files changed, 468 insertions, 21 deletions
diff --git a/vendor/codeberg.org/gruf/go-pools/bufio.go b/vendor/codeberg.org/gruf/go-pools/bufio.go
index 8c2ef9730..e22fd6a1c 100644
--- a/vendor/codeberg.org/gruf/go-pools/bufio.go
+++ b/vendor/codeberg.org/gruf/go-pools/bufio.go
@@ -6,7 +6,7 @@ import (
"sync"
)
-// BufioReaderPool is a pooled allocator for bufio.Reader objects
+// BufioReaderPool is a pooled allocator for bufio.Reader objects.
type BufioReaderPool interface {
// Get fetches a bufio.Reader from pool and resets to supplied reader
Get(io.Reader) *bufio.Reader
@@ -15,32 +15,39 @@ type BufioReaderPool interface {
Put(*bufio.Reader)
}
-// NewBufioReaderPool returns a newly instantiated bufio.Reader pool
+// NewBufioReaderPool returns a newly instantiated bufio.Reader pool.
func NewBufioReaderPool(size int) BufioReaderPool {
return &bufioReaderPool{
- Pool: sync.Pool{
+ pool: sync.Pool{
New: func() interface{} {
return bufio.NewReaderSize(nil, size)
},
},
+ size: size,
}
}
-// bufioReaderPool is our implementation of BufioReaderPool
-type bufioReaderPool struct{ sync.Pool }
+// bufioReaderPool is our implementation of BufioReaderPool.
+type bufioReaderPool struct {
+ pool sync.Pool
+ size int
+}
func (p *bufioReaderPool) Get(r io.Reader) *bufio.Reader {
- br := p.Pool.Get().(*bufio.Reader)
+ br := p.pool.Get().(*bufio.Reader)
br.Reset(r)
return br
}
func (p *bufioReaderPool) Put(br *bufio.Reader) {
+ if br.Size() < p.size {
+ return
+ }
br.Reset(nil)
- p.Pool.Put(br)
+ p.pool.Put(br)
}
-// BufioWriterPool is a pooled allocator for bufio.Writer objects
+// BufioWriterPool is a pooled allocator for bufio.Writer objects.
type BufioWriterPool interface {
// Get fetches a bufio.Writer from pool and resets to supplied writer
Get(io.Writer) *bufio.Writer
@@ -49,27 +56,34 @@ type BufioWriterPool interface {
Put(*bufio.Writer)
}
-// NewBufioWriterPool returns a newly instantiated bufio.Writer pool
+// NewBufioWriterPool returns a newly instantiated bufio.Writer pool.
func NewBufioWriterPool(size int) BufioWriterPool {
return &bufioWriterPool{
- Pool: sync.Pool{
+ pool: sync.Pool{
New: func() interface{} {
return bufio.NewWriterSize(nil, size)
},
},
+ size: size,
}
}
-// bufioWriterPool is our implementation of BufioWriterPool
-type bufioWriterPool struct{ sync.Pool }
+// bufioWriterPool is our implementation of BufioWriterPool.
+type bufioWriterPool struct {
+ pool sync.Pool
+ size int
+}
func (p *bufioWriterPool) Get(w io.Writer) *bufio.Writer {
- bw := p.Pool.Get().(*bufio.Writer)
+ bw := p.pool.Get().(*bufio.Writer)
bw.Reset(w)
return bw
}
func (p *bufioWriterPool) Put(bw *bufio.Writer) {
+ if bw.Size() < p.size {
+ return
+ }
bw.Reset(nil)
- p.Pool.Put(bw)
+ p.pool.Put(bw)
}
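
Usage sketch for the reworked bufio pools (illustrative, not part of the diff): Put now silently drops any reader or writer whose buffer size has fallen below the size the pool was created with, so buffers handed out by Get are always at least that large.

package main

import (
	"os"
	"strings"

	pools "codeberg.org/gruf/go-pools"
)

func main() {
	// Pool bufio.Readers with a 4KiB minimum buffer size.
	readers := pools.NewBufioReaderPool(4096)

	br := readers.Get(strings.NewReader("hello pooled reader\n"))
	line, _ := br.ReadString('\n')
	readers.Put(br) // retained only because br.Size() >= 4096

	// bufio.Writers are pooled the same way.
	writers := pools.NewBufioWriterPool(4096)
	bw := writers.Get(os.Stdout)
	bw.WriteString(line)
	bw.Flush()
	writers.Put(bw)
}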
diff --git a/vendor/codeberg.org/gruf/go-pools/bytes.go b/vendor/codeberg.org/gruf/go-pools/bytes.go
index 76fe18616..1aee77064 100644
--- a/vendor/codeberg.org/gruf/go-pools/bytes.go
+++ b/vendor/codeberg.org/gruf/go-pools/bytes.go
@@ -3,16 +3,16 @@ package pools
import (
"sync"
- "codeberg.org/gruf/go-bytes"
+ "codeberg.org/gruf/go-byteutil"
)
// BufferPool is a pooled allocator for bytes.Buffer objects
type BufferPool interface {
// Get fetches a bytes.Buffer from pool
- Get() *bytes.Buffer
+ Get() *byteutil.Buffer
// Put places supplied bytes.Buffer in pool
- Put(*bytes.Buffer)
+ Put(*byteutil.Buffer)
}
// NewBufferPool returns a newly instantiated bytes.Buffer pool
@@ -20,7 +20,7 @@ func NewBufferPool(size int) BufferPool {
return &bufferPool{
pool: sync.Pool{
New: func() interface{} {
- return &bytes.Buffer{B: make([]byte, 0, size)}
+ return &byteutil.Buffer{B: make([]byte, 0, size)}
},
},
size: size,
@@ -33,11 +33,11 @@ type bufferPool struct {
size int
}
-func (p *bufferPool) Get() *bytes.Buffer {
- return p.pool.Get().(*bytes.Buffer)
+func (p *bufferPool) Get() *byteutil.Buffer {
+ return p.pool.Get().(*byteutil.Buffer)
}
-func (p *bufferPool) Put(buf *bytes.Buffer) {
+func (p *bufferPool) Put(buf *byteutil.Buffer) {
if buf.Cap() < p.size {
return
}
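
A matching sketch for the BufferPool after the move from go-bytes to go-byteutil (again illustrative only); it relies only on the B field and Cap method that appear in the diff.

package main

import (
	"fmt"

	pools "codeberg.org/gruf/go-pools"
)

func main() {
	// Buffers are pooled with a minimum capacity of 512 bytes.
	bufs := pools.NewBufferPool(512)

	buf := bufs.Get()
	buf.B = append(buf.B[:0], "pooled scratch space"...) // B is the raw byte slice
	fmt.Println(string(buf.B), "cap:", buf.Cap())

	// Put drops any buffer whose capacity has shrunk below the pool size.
	bufs.Put(buf)
}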
diff --git a/vendor/codeberg.org/gruf/go-pools/henc.go b/vendor/codeberg.org/gruf/go-pools/henc.go
new file mode 100644
index 000000000..cad905af4
--- /dev/null
+++ b/vendor/codeberg.org/gruf/go-pools/henc.go
@@ -0,0 +1,46 @@
+package pools
+
+import (
+ "hash"
+ "sync"
+
+ "codeberg.org/gruf/go-hashenc"
+)
+
+// HashEncoderPool is a pooled allocator for hashenc.HashEncoder objects.
+type HashEncoderPool interface {
+ // Get fetches a hashenc.HashEncoder from pool
+ Get() hashenc.HashEncoder
+
+ // Put places supplied hashenc.HashEncoder back in pool
+ Put(hashenc.HashEncoder)
+}
+
+// NewHashEncoderPool returns a newly instantiated hashenc.HashEncoder pool.
+func NewHashEncoderPool(hash func() hash.Hash, enc func() hashenc.Encoder) HashEncoderPool {
+ return &hencPool{
+ pool: sync.Pool{
+ New: func() interface{} {
+ return hashenc.New(hash(), enc())
+ },
+ },
+ size: hashenc.New(hash(), enc()).Size(),
+ }
+}
+
+// hencPool is our implementation of HashEncoderPool.
+type hencPool struct {
+ pool sync.Pool
+ size int
+}
+
+func (p *hencPool) Get() hashenc.HashEncoder {
+ return p.pool.Get().(hashenc.HashEncoder)
+}
+
+func (p *hencPool) Put(henc hashenc.HashEncoder) {
+ if henc.Size() < p.size {
+ return
+ }
+ p.pool.Put(henc)
+}
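
The new HashEncoderPool keys eviction off HashEncoder.Size() in the same way. A hedged sketch of borrowing from such a pool follows; constructing the pool needs a hash.Hash factory (e.g. sha256.New) and a hashenc.Encoder factory from go-hashenc, neither of which is shown in this diff, so both are left to the caller.

package main

import (
	hashenc "codeberg.org/gruf/go-hashenc"
	pools "codeberg.org/gruf/go-pools"
)

// withHashEncoder borrows a HashEncoder from the supplied pool, hands it to use,
// then returns it. How the pool was built (which hash and encoder factories) is
// deliberately left to the caller here.
func withHashEncoder(p pools.HashEncoderPool, use func(hashenc.HashEncoder)) {
	he := p.Get()   // reused, or freshly built by the pool's New func
	defer p.Put(he) // kept only while he.Size() is at least the pool's reference size
	use(he)
}

func main() {}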
diff --git a/vendor/codeberg.org/gruf/go-pools/pool.go b/vendor/codeberg.org/gruf/go-pools/pool.go
new file mode 100644
index 000000000..1e3db74b2
--- /dev/null
+++ b/vendor/codeberg.org/gruf/go-pools/pool.go
@@ -0,0 +1,387 @@
+package pools
+
+import (
+ "runtime"
+ "sync"
+ "sync/atomic"
+ "unsafe"
+)
+
+type Pool struct {
+ // New is used to instantiate new items
+ New func() interface{}
+
+ // Evict is called on evicted items during pool .Clean()
+ Evict func(interface{})
+
+ local unsafe.Pointer // ptr to []_ppool
+ localSz int64 // count of all elems in local
+ victim unsafe.Pointer // ptr to []_ppool
+ victimSz int64 // count of all elems in victim
+ mutex sync.Mutex // mutex protects new cleanups, and new allocations of local
+}
+
+// Get attempts to fetch an item from the pool, and failing that, allocates one with the supplied .New() function
+func (p *Pool) Get() interface{} {
+ // Get local pool for proc
+ // (also pins proc)
+ pool, pid := p.pin()
+
+ if v := pool.getPrivate(); v != nil {
+ // local _ppool private elem acquired
+ runtime_procUnpin()
+ atomic.AddInt64(&p.localSz, -1)
+ return v
+ }
+
+ if v := pool.get(); v != nil {
+ // local _ppool queue elem acquired
+ runtime_procUnpin()
+ atomic.AddInt64(&p.localSz, -1)
+ return v
+ }
+
+ // Unpin before attempting slow
+ runtime_procUnpin()
+ if v := p.getSlow(pid); v != nil {
+ // note size decrementing
+ // is handled within p.getSlow()
+ // as we don't know if it came
+ // from the local or victim pools
+ return v
+ }
+
+ // Alloc new
+ return p.New()
+}
+
+// Put places supplied item in the proc local pool
+func (p *Pool) Put(v interface{}) {
+ // Don't store nil
+ if v == nil {
+ return
+ }
+
+ // Get proc local pool
+ // (also pins proc)
+ pool, _ := p.pin()
+
+ // first try private, then queue
+ if !pool.setPrivate(v) {
+ pool.put(v)
+ }
+ runtime_procUnpin()
+
+ // Increment local pool size
+ atomic.AddInt64(&p.localSz, 1)
+}
+
+// Clean will drop the current victim pools, move the current local pools into their
+// place, and reset the local pools ptr so that it is regenerated on next use
+func (p *Pool) Clean() {
+ p.mutex.Lock()
+
+ // victim becomes local, local becomes nil
+ localPtr := atomic.SwapPointer(&p.local, nil)
+ victimPtr := atomic.SwapPointer(&p.victim, localPtr)
+ localSz := atomic.SwapInt64(&p.localSz, 0)
+ atomic.StoreInt64(&p.victimSz, localSz)
+
+ var victim []ppool
+ if victimPtr != nil {
+ victim = *(*[]ppool)(victimPtr)
+ }
+
+ // drain each of the vict _ppool items
+ for i := 0; i < len(victim); i++ {
+ ppool := &victim[i]
+ ppool.evict(p.Evict)
+ }
+
+ p.mutex.Unlock()
+}
+
+// LocalSize returns the total number of elements in all the proc-local pools
+func (p *Pool) LocalSize() int64 {
+ return atomic.LoadInt64(&p.localSz)
+}
+
+// VictimSize returns the total number of elements in all the victim (old proc-local) pools
+func (p *Pool) VictimSize() int64 {
+ return atomic.LoadInt64(&p.victimSz)
+}
+
+// getSlow is the slow path for fetching an element, attempting to steal from other procs'
+// local pools, and failing that, from the aging-out victim pools. pid is still passed so
+// that not all procs start iterating from the same index
+func (p *Pool) getSlow(pid int) interface{} {
+ // get local pools
+ local := p.localPools()
+
+ // Try to steal from other proc locals
+ for i := 0; i < len(local); i++ {
+ pool := &local[(pid+i+1)%len(local)]
+ if v := pool.get(); v != nil {
+ atomic.AddInt64(&p.localSz, -1)
+ return v
+ }
+ }
+
+ // get victim pools
+ victim := p.victimPools()
+
+ // Attempt to steal from victim pools
+ for i := 0; i < len(victim); i++ {
+ pool := &victim[(pid+i+1)%len(victim)]
+ if v := pool.get(); v != nil {
+ atomic.AddInt64(&p.victimSz, -1)
+ return v
+ }
+ }
+
+ // Set victim pools to nil (none found)
+ atomic.StorePointer(&p.victim, nil)
+
+ return nil
+}
+
+// localPools safely loads slice of local _ppools
+func (p *Pool) localPools() []ppool {
+ local := atomic.LoadPointer(&p.local)
+ if local == nil {
+ return nil
+ }
+ return *(*[]ppool)(local)
+}
+
+// victimPools safely loads slice of victim _ppools
+func (p *Pool) victimPools() []ppool {
+ victim := atomic.LoadPointer(&p.victim)
+ if victim == nil {
+ return nil
+ }
+ return *(*[]ppool)(victim)
+}
+
+// pin will pin the calling goroutine to its proc, returning that proc's local _ppool and the PID we're pinned to
+func (p *Pool) pin() (*ppool, int) {
+ for {
+ // get local pools
+ local := p.localPools()
+
+ if len(local) > 0 {
+ // local already initialized
+
+ // pin to current proc
+ pid := runtime_procPin()
+
+ // check for pid local pool
+ if pid < len(local) {
+ return &local[pid], pid
+ }
+
+ // unpin from proc
+ runtime_procUnpin()
+ } else {
+ // local not yet initialized
+
+ // Check functions are set
+ if p.New == nil {
+ panic("new func must not be nil")
+ }
+ if p.Evict == nil {
+ panic("evict func must not be nil")
+ }
+ }
+
+ // allocate local
+ p.allocLocal()
+ }
+}
+
+// allocLocal allocates a new local pool slice when the pool was previously nil or a
+// change in GOMAXPROCS has occurred, evicting any elements held in the old local pools
+func (p *Pool) allocLocal() {
+ // get pool lock
+ p.mutex.Lock()
+
+ // Calculate new size to use
+ size := runtime.GOMAXPROCS(0)
+
+ local := p.localPools()
+ if len(local) != size {
+ // GOMAXPROCS changed, reallocate
+ pools := make([]ppool, size)
+ atomic.StorePointer(&p.local, unsafe.Pointer(&pools))
+
+ // Evict old local elements
+ for i := 0; i < len(local); i++ {
+ pool := &local[i]
+ pool.evict(p.Evict)
+ }
+ }
+
+ // Unlock pool
+ p.mutex.Unlock()
+}
+
+// _ppool is a proc local pool
+type _ppool struct {
+ // root is the root element of the _ppool queue,
+ // and protects concurrent access to the queue
+ root unsafe.Pointer
+
+ // private is a proc private member accessible
+ // only to the pid this _ppool is assigned to,
+ // except during evict (hence the unsafe pointer)
+ private unsafe.Pointer
+}
+
+// ppool wraps _ppool with pad.
+type ppool struct {
+ _ppool
+
+ // Prevents false sharing on widespread platforms with
+ // 128 mod (cache line size) = 0 .
+ pad [128 - unsafe.Sizeof(_ppool{})%128]byte
+}
+
+// getPrivate gets the proc private member
+func (pp *_ppool) getPrivate() interface{} {
+ ptr := atomic.SwapPointer(&pp.private, nil)
+ if ptr == nil {
+ return nil
+ }
+ return *(*interface{})(ptr)
+}
+
+// setPrivate sets the proc private member (only if unset)
+func (pp *_ppool) setPrivate(v interface{}) bool {
+ return atomic.CompareAndSwapPointer(&pp.private, nil, unsafe.Pointer(&v))
+}
+
+// get fetches an element from the queue
+func (pp *_ppool) get() interface{} {
+ for {
+ // Attempt to load root elem
+ root := atomic.LoadPointer(&pp.root)
+ if root == nil {
+ return nil
+ }
+
+ // Attempt to consume root elem
+ if root == inUsePtr ||
+ !atomic.CompareAndSwapPointer(&pp.root, root, inUsePtr) {
+ continue
+ }
+
+ // Root becomes next in chain
+ e := (*elem)(root)
+ v := e.value
+
+ // Place new root back in the chain
+ atomic.StorePointer(&pp.root, unsafe.Pointer(e.next))
+ putElem(e)
+
+ return v
+ }
+}
+
+// put places an element in the queue
+func (pp *_ppool) put(v interface{}) {
+ // Prepare next elem
+ e := getElem()
+ e.value = v
+
+ for {
+ // Attempt to load root elem
+ root := atomic.LoadPointer(&pp.root)
+ if root == inUsePtr {
+ continue
+ }
+
+ // Set the next elem value (might be nil)
+ e.next = (*elem)(root)
+
+ // Attempt to store this new value at root
+ if atomic.CompareAndSwapPointer(&pp.root, root, unsafe.Pointer(e)) {
+ break
+ }
+ }
+}
+
+// evict evicts all entries from the pool, calling hook on each
+func (pp *_ppool) evict(hook func(interface{})) {
+ if v := pp.getPrivate(); v != nil {
+ hook(v)
+ }
+ for {
+ v := pp.get()
+ if v == nil {
+ break
+ }
+ hook(v)
+ }
+}
+
+// inUsePtr is a ptr used to indicate _ppool is in use
+var inUsePtr = unsafe.Pointer(&elem{
+ next: nil,
+ value: "in_use",
+})
+
+// elem defines an element in the _ppool queue
+type elem struct {
+ next *elem
+ value interface{}
+}
+
+// elemPool is a simple pool of unused elements
+var elemPool = struct {
+ root unsafe.Pointer
+}{}
+
+// getElem fetches a new elem from pool, or creates new
+func getElem() *elem {
+ // Attempt to load root elem
+ root := atomic.LoadPointer(&elemPool.root)
+ if root == nil {
+ return &elem{}
+ }
+
+ // Attempt to consume root elem
+ if root == inUsePtr ||
+ !atomic.CompareAndSwapPointer(&elemPool.root, root, inUsePtr) {
+ return &elem{}
+ }
+
+ // Root becomes next in chain
+ e := (*elem)(root)
+ atomic.StorePointer(&elemPool.root, unsafe.Pointer(e.next))
+ e.next = nil
+
+ return e
+}
+
+// putElem will place element in the pool
+func putElem(e *elem) {
+ e.value = nil
+
+ // Attempt to load root elem
+ root := atomic.LoadPointer(&elemPool.root)
+ if root == inUsePtr {
+ return // drop
+ }
+
+ // Set the next elem value (might be nil)
+ e.next = (*elem)(root)
+
+ // Attempt to store this new value at root
+ atomic.CompareAndSwapPointer(&elemPool.root, root, unsafe.Pointer(e))
+}
+
+//go:linkname runtime_procPin sync.runtime_procPin
+func runtime_procPin() int
+
+//go:linkname runtime_procUnpin sync.runtime_procUnpin
+func runtime_procUnpin()
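
Finally, a minimal sketch of the new generic, proc-local Pool (illustrative, not part of the diff; bytes.Buffer is just an arbitrary pooled value). Both New and Evict must be set before first use, since pin() panics if either is nil.

package main

import (
	"bytes"
	"fmt"

	pools "codeberg.org/gruf/go-pools"
)

func main() {
	p := pools.Pool{
		New: func() interface{} {
			return bytes.NewBuffer(make([]byte, 0, 1024))
		},
		Evict: func(v interface{}) {
			// Called for each item dropped during Clean(); nothing to release here.
			_ = v
		},
	}

	buf := p.Get().(*bytes.Buffer) // from a proc-local pool, or freshly allocated
	buf.Reset()
	buf.WriteString("hello")
	p.Put(buf)
	fmt.Println("local:", p.LocalSize(), "victim:", p.VictimSize())

	// Clean ages the local pools into the victim generation and evicts whatever
	// was already sitting in the previous victim generation.
	p.Clean()
	fmt.Println("local:", p.LocalSize(), "victim:", p.VictimSize())
}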