diff options
Diffstat (limited to 'vendor/github.com/jackc/puddle/v2')
-rw-r--r-- | vendor/github.com/jackc/puddle/v2/CHANGELOG.md | 79 | ||||
-rw-r--r-- | vendor/github.com/jackc/puddle/v2/LICENSE | 22 | ||||
-rw-r--r-- | vendor/github.com/jackc/puddle/v2/README.md | 80 | ||||
-rw-r--r-- | vendor/github.com/jackc/puddle/v2/context.go | 24 | ||||
-rw-r--r-- | vendor/github.com/jackc/puddle/v2/doc.go | 11 | ||||
-rw-r--r-- | vendor/github.com/jackc/puddle/v2/internal/genstack/gen_stack.go | 85 | ||||
-rw-r--r-- | vendor/github.com/jackc/puddle/v2/internal/genstack/stack.go | 39 | ||||
-rw-r--r-- | vendor/github.com/jackc/puddle/v2/log.go | 32 | ||||
-rw-r--r-- | vendor/github.com/jackc/puddle/v2/nanotime.go | 16 | ||||
-rw-r--r-- | vendor/github.com/jackc/puddle/v2/pool.go | 710 | ||||
-rw-r--r-- | vendor/github.com/jackc/puddle/v2/resource_list.go | 28 |
11 files changed, 0 insertions, 1126 deletions
diff --git a/vendor/github.com/jackc/puddle/v2/CHANGELOG.md b/vendor/github.com/jackc/puddle/v2/CHANGELOG.md deleted file mode 100644 index d0d202c74..000000000 --- a/vendor/github.com/jackc/puddle/v2/CHANGELOG.md +++ /dev/null @@ -1,79 +0,0 @@ -# 2.2.2 (September 10, 2024) - -* Add empty acquire time to stats (Maxim Ivanov) -* Stop importing nanotime from runtime via linkname (maypok86) - -# 2.2.1 (July 15, 2023) - -* Fix: CreateResource cannot overflow pool. This changes documented behavior of CreateResource. Previously, - CreateResource could create a resource even if the pool was full. This could cause the pool to overflow. While this - was documented, it was documenting incorrect behavior. CreateResource now returns an error if the pool is full. - -# 2.2.0 (February 11, 2023) - -* Use Go 1.19 atomics and drop go.uber.org/atomic dependency - -# 2.1.2 (November 12, 2022) - -* Restore support to Go 1.18 via go.uber.org/atomic - -# 2.1.1 (November 11, 2022) - -* Fix create resource concurrently with Stat call race - -# 2.1.0 (October 28, 2022) - -* Concurrency control is now implemented with a semaphore. This simplifies some internal logic, resolves a few error conditions (including a deadlock), and improves performance. (Jan Dubsky) -* Go 1.19 is now required for the improved atomic support. - -# 2.0.1 (October 28, 2022) - -* Fix race condition when Close is called concurrently with multiple constructors - -# 2.0.0 (September 17, 2022) - -* Use generics instead of interface{} (Столяров Владимир Алексеевич) -* Add Reset -* Do not cancel resource construction when Acquire is canceled -* NewPool takes Config - -# 1.3.0 (August 27, 2022) - -* Acquire creates resources in background to allow creation to continue after Acquire is canceled (James Hartig) - -# 1.2.1 (December 2, 2021) - -* TryAcquire now does not block when background constructing resource - -# 1.2.0 (November 20, 2021) - -* Add TryAcquire (A. Jensen) -* Fix: remove memory leak / unintentionally pinned memory when shrinking slices (Alexander Staubo) -* Fix: Do not leave pool locked after panic from nil context - -# 1.1.4 (September 11, 2021) - -* Fix: Deadlock in CreateResource if pool was closed during resource acquisition (Dmitriy Matrenichev) - -# 1.1.3 (December 3, 2020) - -* Fix: Failed resource creation could cause concurrent Acquire to hang. (Evgeny Vanslov) - -# 1.1.2 (September 26, 2020) - -* Fix: Resource.Destroy no longer removes itself from the pool before its destructor has completed. -* Fix: Prevent crash when pool is closed while resource is being created. - -# 1.1.1 (April 2, 2020) - -* Pool.Close can be safely called multiple times -* AcquireAllIDle immediately returns nil if pool is closed -* CreateResource checks if pool is closed before taking any action -* Fix potential race condition when CreateResource and Close are called concurrently. CreateResource now checks if pool is closed before adding newly created resource to pool. - -# 1.1.0 (February 5, 2020) - -* Use runtime.nanotime for faster tracking of acquire time and last usage time. -* Track resource idle time to enable client health check logic. (Patrick Ellul) -* Add CreateResource to construct a new resource without acquiring it. (Patrick Ellul) -* Fix deadlock race when acquire is cancelled. (Michael Tharp) diff --git a/vendor/github.com/jackc/puddle/v2/LICENSE b/vendor/github.com/jackc/puddle/v2/LICENSE deleted file mode 100644 index bcc286c54..000000000 --- a/vendor/github.com/jackc/puddle/v2/LICENSE +++ /dev/null @@ -1,22 +0,0 @@ -Copyright (c) 2018 Jack Christensen - -MIT License - -Permission is hereby granted, free of charge, to any person obtaining -a copy of this software and associated documentation files (the -"Software"), to deal in the Software without restriction, including -without limitation the rights to use, copy, modify, merge, publish, -distribute, sublicense, and/or sell copies of the Software, and to -permit persons to whom the Software is furnished to do so, subject to -the following conditions: - -The above copyright notice and this permission notice shall be -included in all copies or substantial portions of the Software. - -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, -EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF -MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND -NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE -LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION -OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION -WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. diff --git a/vendor/github.com/jackc/puddle/v2/README.md b/vendor/github.com/jackc/puddle/v2/README.md deleted file mode 100644 index fa82a9d46..000000000 --- a/vendor/github.com/jackc/puddle/v2/README.md +++ /dev/null @@ -1,80 +0,0 @@ -[](https://pkg.go.dev/github.com/jackc/puddle/v2) - - -# Puddle - -Puddle is a tiny generic resource pool library for Go that uses the standard -context library to signal cancellation of acquires. It is designed to contain -the minimum functionality required for a resource pool. It can be used directly -or it can be used as the base for a domain specific resource pool. For example, -a database connection pool may use puddle internally and implement health checks -and keep-alive behavior without needing to implement any concurrent code of its -own. - -## Features - -* Acquire cancellation via context standard library -* Statistics API for monitoring pool pressure -* No dependencies outside of standard library and golang.org/x/sync -* High performance -* 100% test coverage of reachable code - -## Example Usage - -```go -package main - -import ( - "context" - "log" - "net" - - "github.com/jackc/puddle/v2" -) - -func main() { - constructor := func(context.Context) (net.Conn, error) { - return net.Dial("tcp", "127.0.0.1:8080") - } - destructor := func(value net.Conn) { - value.Close() - } - maxPoolSize := int32(10) - - pool, err := puddle.NewPool(&puddle.Config[net.Conn]{Constructor: constructor, Destructor: destructor, MaxSize: maxPoolSize}) - if err != nil { - log.Fatal(err) - } - - // Acquire resource from the pool. - res, err := pool.Acquire(context.Background()) - if err != nil { - log.Fatal(err) - } - - // Use resource. - _, err = res.Value().Write([]byte{1}) - if err != nil { - log.Fatal(err) - } - - // Release when done. - res.Release() -} -``` - -## Status - -Puddle is stable and feature complete. - -* Bug reports and fixes are welcome. -* New features will usually not be accepted if they can be feasibly implemented in a wrapper. -* Performance optimizations will usually not be accepted unless the performance issue rises to the level of a bug. - -## Supported Go Versions - -puddle supports the same versions of Go that are supported by the Go project. For [Go](https://golang.org/doc/devel/release.html#policy) that is the two most recent major releases. This means puddle supports Go 1.19 and higher. - -## License - -MIT diff --git a/vendor/github.com/jackc/puddle/v2/context.go b/vendor/github.com/jackc/puddle/v2/context.go deleted file mode 100644 index e19d2a609..000000000 --- a/vendor/github.com/jackc/puddle/v2/context.go +++ /dev/null @@ -1,24 +0,0 @@ -package puddle - -import ( - "context" - "time" -) - -// valueCancelCtx combines two contexts into one. One context is used for values and the other is used for cancellation. -type valueCancelCtx struct { - valueCtx context.Context - cancelCtx context.Context -} - -func (ctx *valueCancelCtx) Deadline() (time.Time, bool) { return ctx.cancelCtx.Deadline() } -func (ctx *valueCancelCtx) Done() <-chan struct{} { return ctx.cancelCtx.Done() } -func (ctx *valueCancelCtx) Err() error { return ctx.cancelCtx.Err() } -func (ctx *valueCancelCtx) Value(key any) any { return ctx.valueCtx.Value(key) } - -func newValueCancelCtx(valueCtx, cancelContext context.Context) context.Context { - return &valueCancelCtx{ - valueCtx: valueCtx, - cancelCtx: cancelContext, - } -} diff --git a/vendor/github.com/jackc/puddle/v2/doc.go b/vendor/github.com/jackc/puddle/v2/doc.go deleted file mode 100644 index 818e4a698..000000000 --- a/vendor/github.com/jackc/puddle/v2/doc.go +++ /dev/null @@ -1,11 +0,0 @@ -// Package puddle is a generic resource pool with type-parametrized api. -/* - -Puddle is a tiny generic resource pool library for Go that uses the standard -context library to signal cancellation of acquires. It is designed to contain -the minimum functionality a resource pool needs that cannot be implemented -without concurrency concerns. For example, a database connection pool may use -puddle internally and implement health checks and keep-alive behavior without -needing to implement any concurrent code of its own. -*/ -package puddle diff --git a/vendor/github.com/jackc/puddle/v2/internal/genstack/gen_stack.go b/vendor/github.com/jackc/puddle/v2/internal/genstack/gen_stack.go deleted file mode 100644 index 7e4660c8c..000000000 --- a/vendor/github.com/jackc/puddle/v2/internal/genstack/gen_stack.go +++ /dev/null @@ -1,85 +0,0 @@ -package genstack - -// GenStack implements a generational stack. -// -// GenStack works as common stack except for the fact that all elements in the -// older generation are guaranteed to be popped before any element in the newer -// generation. New elements are always pushed to the current (newest) -// generation. -// -// We could also say that GenStack behaves as a stack in case of a single -// generation, but it behaves as a queue of individual generation stacks. -type GenStack[T any] struct { - // We can represent arbitrary number of generations using 2 stacks. The - // new stack stores all new pushes and the old stack serves all reads. - // Old stack can represent multiple generations. If old == new, then all - // elements pushed in previous (not current) generations have already - // been popped. - - old *stack[T] - new *stack[T] -} - -// NewGenStack creates a new empty GenStack. -func NewGenStack[T any]() *GenStack[T] { - s := &stack[T]{} - return &GenStack[T]{ - old: s, - new: s, - } -} - -func (s *GenStack[T]) Pop() (T, bool) { - // Pushes always append to the new stack, so if the old once becomes - // empty, it will remail empty forever. - if s.old.len() == 0 && s.old != s.new { - s.old = s.new - } - - if s.old.len() == 0 { - var zero T - return zero, false - } - - return s.old.pop(), true -} - -// Push pushes a new element at the top of the stack. -func (s *GenStack[T]) Push(v T) { s.new.push(v) } - -// NextGen starts a new stack generation. -func (s *GenStack[T]) NextGen() { - if s.old == s.new { - s.new = &stack[T]{} - return - } - - // We need to pop from the old stack to the top of the new stack. Let's - // have an example: - // - // Old: <bottom> 4 3 2 1 - // New: <bottom> 8 7 6 5 - // PopOrder: 1 2 3 4 5 6 7 8 - // - // - // To preserve pop order, we have to take all elements from the old - // stack and push them to the top of new stack: - // - // New: 8 7 6 5 4 3 2 1 - // - s.new.push(s.old.takeAll()...) - - // We have the old stack allocated and empty, so why not to reuse it as - // new new stack. - s.old, s.new = s.new, s.old -} - -// Len returns number of elements in the stack. -func (s *GenStack[T]) Len() int { - l := s.old.len() - if s.old != s.new { - l += s.new.len() - } - - return l -} diff --git a/vendor/github.com/jackc/puddle/v2/internal/genstack/stack.go b/vendor/github.com/jackc/puddle/v2/internal/genstack/stack.go deleted file mode 100644 index dbced0c72..000000000 --- a/vendor/github.com/jackc/puddle/v2/internal/genstack/stack.go +++ /dev/null @@ -1,39 +0,0 @@ -package genstack - -// stack is a wrapper around an array implementing a stack. -// -// We cannot use slice to represent the stack because append might change the -// pointer value of the slice. That would be an issue in GenStack -// implementation. -type stack[T any] struct { - arr []T -} - -// push pushes a new element at the top of a stack. -func (s *stack[T]) push(vs ...T) { s.arr = append(s.arr, vs...) } - -// pop pops the stack top-most element. -// -// If stack length is zero, this method panics. -func (s *stack[T]) pop() T { - idx := s.len() - 1 - val := s.arr[idx] - - // Avoid memory leak - var zero T - s.arr[idx] = zero - - s.arr = s.arr[:idx] - return val -} - -// takeAll returns all elements in the stack in order as they are stored - i.e. -// the top-most stack element is the last one. -func (s *stack[T]) takeAll() []T { - arr := s.arr - s.arr = nil - return arr -} - -// len returns number of elements in the stack. -func (s *stack[T]) len() int { return len(s.arr) } diff --git a/vendor/github.com/jackc/puddle/v2/log.go b/vendor/github.com/jackc/puddle/v2/log.go deleted file mode 100644 index b21b94630..000000000 --- a/vendor/github.com/jackc/puddle/v2/log.go +++ /dev/null @@ -1,32 +0,0 @@ -package puddle - -import "unsafe" - -type ints interface { - int | int8 | int16 | int32 | int64 | uint | uint8 | uint16 | uint32 | uint64 -} - -// log2Int returns log2 of an integer. This function panics if val < 0. For val -// == 0, returns 0. -func log2Int[T ints](val T) uint8 { - if val <= 0 { - panic("log2 of non-positive number does not exist") - } - - return log2IntRange(val, 0, uint8(8*unsafe.Sizeof(val))) -} - -func log2IntRange[T ints](val T, begin, end uint8) uint8 { - length := end - begin - if length == 1 { - return begin - } - - delim := begin + length/2 - mask := T(1) << delim - if mask > val { - return log2IntRange(val, begin, delim) - } else { - return log2IntRange(val, delim, end) - } -} diff --git a/vendor/github.com/jackc/puddle/v2/nanotime.go b/vendor/github.com/jackc/puddle/v2/nanotime.go deleted file mode 100644 index 8a5351a0d..000000000 --- a/vendor/github.com/jackc/puddle/v2/nanotime.go +++ /dev/null @@ -1,16 +0,0 @@ -package puddle - -import "time" - -// nanotime returns the time in nanoseconds since process start. -// -// This approach, described at -// https://github.com/golang/go/issues/61765#issuecomment-1672090302, -// is fast, monotonic, and portable, and avoids the previous -// dependence on runtime.nanotime using the (unsafe) linkname hack. -// In particular, time.Since does less work than time.Now. -func nanotime() int64 { - return time.Since(globalStart).Nanoseconds() -} - -var globalStart = time.Now() diff --git a/vendor/github.com/jackc/puddle/v2/pool.go b/vendor/github.com/jackc/puddle/v2/pool.go deleted file mode 100644 index c411d2f6e..000000000 --- a/vendor/github.com/jackc/puddle/v2/pool.go +++ /dev/null @@ -1,710 +0,0 @@ -package puddle - -import ( - "context" - "errors" - "sync" - "sync/atomic" - "time" - - "github.com/jackc/puddle/v2/internal/genstack" - "golang.org/x/sync/semaphore" -) - -const ( - resourceStatusConstructing = 0 - resourceStatusIdle = iota - resourceStatusAcquired = iota - resourceStatusHijacked = iota -) - -// ErrClosedPool occurs on an attempt to acquire a connection from a closed pool -// or a pool that is closed while the acquire is waiting. -var ErrClosedPool = errors.New("closed pool") - -// ErrNotAvailable occurs on an attempt to acquire a resource from a pool -// that is at maximum capacity and has no available resources. -var ErrNotAvailable = errors.New("resource not available") - -// Constructor is a function called by the pool to construct a resource. -type Constructor[T any] func(ctx context.Context) (res T, err error) - -// Destructor is a function called by the pool to destroy a resource. -type Destructor[T any] func(res T) - -// Resource is the resource handle returned by acquiring from the pool. -type Resource[T any] struct { - value T - pool *Pool[T] - creationTime time.Time - lastUsedNano int64 - poolResetCount int - status byte -} - -// Value returns the resource value. -func (res *Resource[T]) Value() T { - if !(res.status == resourceStatusAcquired || res.status == resourceStatusHijacked) { - panic("tried to access resource that is not acquired or hijacked") - } - return res.value -} - -// Release returns the resource to the pool. res must not be subsequently used. -func (res *Resource[T]) Release() { - if res.status != resourceStatusAcquired { - panic("tried to release resource that is not acquired") - } - res.pool.releaseAcquiredResource(res, nanotime()) -} - -// ReleaseUnused returns the resource to the pool without updating when it was last used used. i.e. LastUsedNanotime -// will not change. res must not be subsequently used. -func (res *Resource[T]) ReleaseUnused() { - if res.status != resourceStatusAcquired { - panic("tried to release resource that is not acquired") - } - res.pool.releaseAcquiredResource(res, res.lastUsedNano) -} - -// Destroy returns the resource to the pool for destruction. res must not be -// subsequently used. -func (res *Resource[T]) Destroy() { - if res.status != resourceStatusAcquired { - panic("tried to destroy resource that is not acquired") - } - go res.pool.destroyAcquiredResource(res) -} - -// Hijack assumes ownership of the resource from the pool. Caller is responsible -// for cleanup of resource value. -func (res *Resource[T]) Hijack() { - if res.status != resourceStatusAcquired { - panic("tried to hijack resource that is not acquired") - } - res.pool.hijackAcquiredResource(res) -} - -// CreationTime returns when the resource was created by the pool. -func (res *Resource[T]) CreationTime() time.Time { - if !(res.status == resourceStatusAcquired || res.status == resourceStatusHijacked) { - panic("tried to access resource that is not acquired or hijacked") - } - return res.creationTime -} - -// LastUsedNanotime returns when Release was last called on the resource measured in nanoseconds from an arbitrary time -// (a monotonic time). Returns creation time if Release has never been called. This is only useful to compare with -// other calls to LastUsedNanotime. In almost all cases, IdleDuration should be used instead. -func (res *Resource[T]) LastUsedNanotime() int64 { - if !(res.status == resourceStatusAcquired || res.status == resourceStatusHijacked) { - panic("tried to access resource that is not acquired or hijacked") - } - - return res.lastUsedNano -} - -// IdleDuration returns the duration since Release was last called on the resource. This is equivalent to subtracting -// LastUsedNanotime to the current nanotime. -func (res *Resource[T]) IdleDuration() time.Duration { - if !(res.status == resourceStatusAcquired || res.status == resourceStatusHijacked) { - panic("tried to access resource that is not acquired or hijacked") - } - - return time.Duration(nanotime() - res.lastUsedNano) -} - -// Pool is a concurrency-safe resource pool. -type Pool[T any] struct { - // mux is the pool internal lock. Any modification of shared state of - // the pool (but Acquires of acquireSem) must be performed only by - // holder of the lock. Long running operations are not allowed when mux - // is held. - mux sync.Mutex - // acquireSem provides an allowance to acquire a resource. - // - // Releases are allowed only when caller holds mux. Acquires have to - // happen before mux is locked (doesn't apply to semaphore.TryAcquire in - // AcquireAllIdle). - acquireSem *semaphore.Weighted - destructWG sync.WaitGroup - - allResources resList[T] - idleResources *genstack.GenStack[*Resource[T]] - - constructor Constructor[T] - destructor Destructor[T] - maxSize int32 - - acquireCount int64 - acquireDuration time.Duration - emptyAcquireCount int64 - emptyAcquireWaitTime time.Duration - canceledAcquireCount atomic.Int64 - - resetCount int - - baseAcquireCtx context.Context - cancelBaseAcquireCtx context.CancelFunc - closed bool -} - -type Config[T any] struct { - Constructor Constructor[T] - Destructor Destructor[T] - MaxSize int32 -} - -// NewPool creates a new pool. Returns an error iff MaxSize is less than 1. -func NewPool[T any](config *Config[T]) (*Pool[T], error) { - if config.MaxSize < 1 { - return nil, errors.New("MaxSize must be >= 1") - } - - baseAcquireCtx, cancelBaseAcquireCtx := context.WithCancel(context.Background()) - - return &Pool[T]{ - acquireSem: semaphore.NewWeighted(int64(config.MaxSize)), - idleResources: genstack.NewGenStack[*Resource[T]](), - maxSize: config.MaxSize, - constructor: config.Constructor, - destructor: config.Destructor, - baseAcquireCtx: baseAcquireCtx, - cancelBaseAcquireCtx: cancelBaseAcquireCtx, - }, nil -} - -// Close destroys all resources in the pool and rejects future Acquire calls. -// Blocks until all resources are returned to pool and destroyed. -func (p *Pool[T]) Close() { - defer p.destructWG.Wait() - - p.mux.Lock() - defer p.mux.Unlock() - - if p.closed { - return - } - p.closed = true - p.cancelBaseAcquireCtx() - - for res, ok := p.idleResources.Pop(); ok; res, ok = p.idleResources.Pop() { - p.allResources.remove(res) - go p.destructResourceValue(res.value) - } -} - -// Stat is a snapshot of Pool statistics. -type Stat struct { - constructingResources int32 - acquiredResources int32 - idleResources int32 - maxResources int32 - acquireCount int64 - acquireDuration time.Duration - emptyAcquireCount int64 - emptyAcquireWaitTime time.Duration - canceledAcquireCount int64 -} - -// TotalResources returns the total number of resources currently in the pool. -// The value is the sum of ConstructingResources, AcquiredResources, and -// IdleResources. -func (s *Stat) TotalResources() int32 { - return s.constructingResources + s.acquiredResources + s.idleResources -} - -// ConstructingResources returns the number of resources with construction in progress in -// the pool. -func (s *Stat) ConstructingResources() int32 { - return s.constructingResources -} - -// AcquiredResources returns the number of currently acquired resources in the pool. -func (s *Stat) AcquiredResources() int32 { - return s.acquiredResources -} - -// IdleResources returns the number of currently idle resources in the pool. -func (s *Stat) IdleResources() int32 { - return s.idleResources -} - -// MaxResources returns the maximum size of the pool. -func (s *Stat) MaxResources() int32 { - return s.maxResources -} - -// AcquireCount returns the cumulative count of successful acquires from the pool. -func (s *Stat) AcquireCount() int64 { - return s.acquireCount -} - -// AcquireDuration returns the total duration of all successful acquires from -// the pool. -func (s *Stat) AcquireDuration() time.Duration { - return s.acquireDuration -} - -// EmptyAcquireCount returns the cumulative count of successful acquires from the pool -// that waited for a resource to be released or constructed because the pool was -// empty. -func (s *Stat) EmptyAcquireCount() int64 { - return s.emptyAcquireCount -} - -// EmptyAcquireWaitTime returns the cumulative time waited for successful acquires -// from the pool for a resource to be released or constructed because the pool was -// empty. -func (s *Stat) EmptyAcquireWaitTime() time.Duration { - return s.emptyAcquireWaitTime -} - -// CanceledAcquireCount returns the cumulative count of acquires from the pool -// that were canceled by a context. -func (s *Stat) CanceledAcquireCount() int64 { - return s.canceledAcquireCount -} - -// Stat returns the current pool statistics. -func (p *Pool[T]) Stat() *Stat { - p.mux.Lock() - defer p.mux.Unlock() - - s := &Stat{ - maxResources: p.maxSize, - acquireCount: p.acquireCount, - emptyAcquireCount: p.emptyAcquireCount, - emptyAcquireWaitTime: p.emptyAcquireWaitTime, - canceledAcquireCount: p.canceledAcquireCount.Load(), - acquireDuration: p.acquireDuration, - } - - for _, res := range p.allResources { - switch res.status { - case resourceStatusConstructing: - s.constructingResources += 1 - case resourceStatusIdle: - s.idleResources += 1 - case resourceStatusAcquired: - s.acquiredResources += 1 - } - } - - return s -} - -// tryAcquireIdleResource checks if there is any idle resource. If there is -// some, this method removes it from idle list and returns it. If the idle pool -// is empty, this method returns nil and doesn't modify the idleResources slice. -// -// WARNING: Caller of this method must hold the pool mutex! -func (p *Pool[T]) tryAcquireIdleResource() *Resource[T] { - res, ok := p.idleResources.Pop() - if !ok { - return nil - } - - res.status = resourceStatusAcquired - return res -} - -// createNewResource creates a new resource and inserts it into list of pool -// resources. -// -// WARNING: Caller of this method must hold the pool mutex! -func (p *Pool[T]) createNewResource() *Resource[T] { - res := &Resource[T]{ - pool: p, - creationTime: time.Now(), - lastUsedNano: nanotime(), - poolResetCount: p.resetCount, - status: resourceStatusConstructing, - } - - p.allResources.append(res) - p.destructWG.Add(1) - - return res -} - -// Acquire gets a resource from the pool. If no resources are available and the pool is not at maximum capacity it will -// create a new resource. If the pool is at maximum capacity it will block until a resource is available. ctx can be -// used to cancel the Acquire. -// -// If Acquire creates a new resource the resource constructor function will receive a context that delegates Value() to -// ctx. Canceling ctx will cause Acquire to return immediately but it will not cancel the resource creation. This avoids -// the problem of it being impossible to create resources when the time to create a resource is greater than any one -// caller of Acquire is willing to wait. -func (p *Pool[T]) Acquire(ctx context.Context) (_ *Resource[T], err error) { - select { - case <-ctx.Done(): - p.canceledAcquireCount.Add(1) - return nil, ctx.Err() - default: - } - - return p.acquire(ctx) -} - -// acquire is a continuation of Acquire function that doesn't check context -// validity. -// -// This function exists solely only for benchmarking purposes. -func (p *Pool[T]) acquire(ctx context.Context) (*Resource[T], error) { - startNano := nanotime() - - var waitedForLock bool - if !p.acquireSem.TryAcquire(1) { - waitedForLock = true - err := p.acquireSem.Acquire(ctx, 1) - if err != nil { - p.canceledAcquireCount.Add(1) - return nil, err - } - } - - p.mux.Lock() - if p.closed { - p.acquireSem.Release(1) - p.mux.Unlock() - return nil, ErrClosedPool - } - - // If a resource is available in the pool. - if res := p.tryAcquireIdleResource(); res != nil { - waitTime := time.Duration(nanotime() - startNano) - if waitedForLock { - p.emptyAcquireCount += 1 - p.emptyAcquireWaitTime += waitTime - } - p.acquireCount += 1 - p.acquireDuration += waitTime - p.mux.Unlock() - return res, nil - } - - if len(p.allResources) >= int(p.maxSize) { - // Unreachable code. - panic("bug: semaphore allowed more acquires than pool allows") - } - - // The resource is not idle, but there is enough space to create one. - res := p.createNewResource() - p.mux.Unlock() - - res, err := p.initResourceValue(ctx, res) - if err != nil { - return nil, err - } - - p.mux.Lock() - defer p.mux.Unlock() - - p.emptyAcquireCount += 1 - p.acquireCount += 1 - waitTime := time.Duration(nanotime() - startNano) - p.acquireDuration += waitTime - p.emptyAcquireWaitTime += waitTime - - return res, nil -} - -func (p *Pool[T]) initResourceValue(ctx context.Context, res *Resource[T]) (*Resource[T], error) { - // Create the resource in a goroutine to immediately return from Acquire - // if ctx is canceled without also canceling the constructor. - // - // See: - // - https://github.com/jackc/pgx/issues/1287 - // - https://github.com/jackc/pgx/issues/1259 - constructErrChan := make(chan error) - go func() { - constructorCtx := newValueCancelCtx(ctx, p.baseAcquireCtx) - value, err := p.constructor(constructorCtx) - if err != nil { - p.mux.Lock() - p.allResources.remove(res) - p.destructWG.Done() - - // The resource won't be acquired because its - // construction failed. We have to allow someone else to - // take that resouce. - p.acquireSem.Release(1) - p.mux.Unlock() - - select { - case constructErrChan <- err: - case <-ctx.Done(): - // The caller is cancelled, so no-one awaits the - // error. This branch avoid goroutine leak. - } - return - } - - // The resource is already in p.allResources where it might be read. So we need to acquire the lock to update its - // status. - p.mux.Lock() - res.value = value - res.status = resourceStatusAcquired - p.mux.Unlock() - - // This select works because the channel is unbuffered. - select { - case constructErrChan <- nil: - case <-ctx.Done(): - p.releaseAcquiredResource(res, res.lastUsedNano) - } - }() - - select { - case <-ctx.Done(): - p.canceledAcquireCount.Add(1) - return nil, ctx.Err() - case err := <-constructErrChan: - if err != nil { - return nil, err - } - return res, nil - } -} - -// TryAcquire gets a resource from the pool if one is immediately available. If not, it returns ErrNotAvailable. If no -// resources are available but the pool has room to grow, a resource will be created in the background. ctx is only -// used to cancel the background creation. -func (p *Pool[T]) TryAcquire(ctx context.Context) (*Resource[T], error) { - if !p.acquireSem.TryAcquire(1) { - return nil, ErrNotAvailable - } - - p.mux.Lock() - defer p.mux.Unlock() - - if p.closed { - p.acquireSem.Release(1) - return nil, ErrClosedPool - } - - // If a resource is available now - if res := p.tryAcquireIdleResource(); res != nil { - p.acquireCount += 1 - return res, nil - } - - if len(p.allResources) >= int(p.maxSize) { - // Unreachable code. - panic("bug: semaphore allowed more acquires than pool allows") - } - - res := p.createNewResource() - go func() { - value, err := p.constructor(ctx) - - p.mux.Lock() - defer p.mux.Unlock() - // We have to create the resource and only then release the - // semaphore - For the time being there is no resource that - // someone could acquire. - defer p.acquireSem.Release(1) - - if err != nil { - p.allResources.remove(res) - p.destructWG.Done() - return - } - - res.value = value - res.status = resourceStatusIdle - p.idleResources.Push(res) - }() - - return nil, ErrNotAvailable -} - -// acquireSemAll tries to acquire num free tokens from sem. This function is -// guaranteed to acquire at least the lowest number of tokens that has been -// available in the semaphore during runtime of this function. -// -// For the time being, semaphore doesn't allow to acquire all tokens atomically -// (see https://github.com/golang/sync/pull/19). We simulate this by trying all -// powers of 2 that are less or equal to num. -// -// For example, let's immagine we have 19 free tokens in the semaphore which in -// total has 24 tokens (i.e. the maxSize of the pool is 24 resources). Then if -// num is 24, the log2Uint(24) is 4 and we try to acquire 16, 8, 4, 2 and 1 -// tokens. Out of those, the acquire of 16, 2 and 1 tokens will succeed. -// -// Naturally, Acquires and Releases of the semaphore might take place -// concurrently. For this reason, it's not guaranteed that absolutely all free -// tokens in the semaphore will be acquired. But it's guaranteed that at least -// the minimal number of tokens that has been present over the whole process -// will be acquired. This is sufficient for the use-case we have in this -// package. -// -// TODO: Replace this with acquireSem.TryAcquireAll() if it gets to -// upstream. https://github.com/golang/sync/pull/19 -func acquireSemAll(sem *semaphore.Weighted, num int) int { - if sem.TryAcquire(int64(num)) { - return num - } - - var acquired int - for i := int(log2Int(num)); i >= 0; i-- { - val := 1 << i - if sem.TryAcquire(int64(val)) { - acquired += val - } - } - - return acquired -} - -// AcquireAllIdle acquires all currently idle resources. Its intended use is for -// health check and keep-alive functionality. It does not update pool -// statistics. -func (p *Pool[T]) AcquireAllIdle() []*Resource[T] { - p.mux.Lock() - defer p.mux.Unlock() - - if p.closed { - return nil - } - - numIdle := p.idleResources.Len() - if numIdle == 0 { - return nil - } - - // In acquireSemAll we use only TryAcquire and not Acquire. Because - // TryAcquire cannot block, the fact that we hold mutex locked and try - // to acquire semaphore cannot result in dead-lock. - // - // Because the mutex is locked, no parallel Release can run. This - // implies that the number of tokens can only decrease because some - // Acquire/TryAcquire call can consume the semaphore token. Consequently - // acquired is always less or equal to numIdle. Moreover if acquired < - // numIdle, then there are some parallel Acquire/TryAcquire calls that - // will take the remaining idle connections. - acquired := acquireSemAll(p.acquireSem, numIdle) - - idle := make([]*Resource[T], acquired) - for i := range idle { - res, _ := p.idleResources.Pop() - res.status = resourceStatusAcquired - idle[i] = res - } - - // We have to bump the generation to ensure that Acquire/TryAcquire - // calls running in parallel (those which caused acquired < numIdle) - // will consume old connections and not freshly released connections - // instead. - p.idleResources.NextGen() - - return idle -} - -// CreateResource constructs a new resource without acquiring it. It goes straight in the IdlePool. If the pool is full -// it returns an error. It can be useful to maintain warm resources under little load. -func (p *Pool[T]) CreateResource(ctx context.Context) error { - if !p.acquireSem.TryAcquire(1) { - return ErrNotAvailable - } - - p.mux.Lock() - if p.closed { - p.acquireSem.Release(1) - p.mux.Unlock() - return ErrClosedPool - } - - if len(p.allResources) >= int(p.maxSize) { - p.acquireSem.Release(1) - p.mux.Unlock() - return ErrNotAvailable - } - - res := p.createNewResource() - p.mux.Unlock() - - value, err := p.constructor(ctx) - p.mux.Lock() - defer p.mux.Unlock() - defer p.acquireSem.Release(1) - if err != nil { - p.allResources.remove(res) - p.destructWG.Done() - return err - } - - res.value = value - res.status = resourceStatusIdle - - // If closed while constructing resource then destroy it and return an error - if p.closed { - go p.destructResourceValue(res.value) - return ErrClosedPool - } - - p.idleResources.Push(res) - - return nil -} - -// Reset destroys all resources, but leaves the pool open. It is intended for use when an error is detected that would -// disrupt all resources (such as a network interruption or a server state change). -// -// It is safe to reset a pool while resources are checked out. Those resources will be destroyed when they are returned -// to the pool. -func (p *Pool[T]) Reset() { - p.mux.Lock() - defer p.mux.Unlock() - - p.resetCount++ - - for res, ok := p.idleResources.Pop(); ok; res, ok = p.idleResources.Pop() { - p.allResources.remove(res) - go p.destructResourceValue(res.value) - } -} - -// releaseAcquiredResource returns res to the the pool. -func (p *Pool[T]) releaseAcquiredResource(res *Resource[T], lastUsedNano int64) { - p.mux.Lock() - defer p.mux.Unlock() - defer p.acquireSem.Release(1) - - if p.closed || res.poolResetCount != p.resetCount { - p.allResources.remove(res) - go p.destructResourceValue(res.value) - } else { - res.lastUsedNano = lastUsedNano - res.status = resourceStatusIdle - p.idleResources.Push(res) - } -} - -// Remove removes res from the pool and closes it. If res is not part of the -// pool Remove will panic. -func (p *Pool[T]) destroyAcquiredResource(res *Resource[T]) { - p.destructResourceValue(res.value) - - p.mux.Lock() - defer p.mux.Unlock() - defer p.acquireSem.Release(1) - - p.allResources.remove(res) -} - -func (p *Pool[T]) hijackAcquiredResource(res *Resource[T]) { - p.mux.Lock() - defer p.mux.Unlock() - defer p.acquireSem.Release(1) - - p.allResources.remove(res) - res.status = resourceStatusHijacked - p.destructWG.Done() // not responsible for destructing hijacked resources -} - -func (p *Pool[T]) destructResourceValue(value T) { - p.destructor(value) - p.destructWG.Done() -} diff --git a/vendor/github.com/jackc/puddle/v2/resource_list.go b/vendor/github.com/jackc/puddle/v2/resource_list.go deleted file mode 100644 index b2430959b..000000000 --- a/vendor/github.com/jackc/puddle/v2/resource_list.go +++ /dev/null @@ -1,28 +0,0 @@ -package puddle - -type resList[T any] []*Resource[T] - -func (l *resList[T]) append(val *Resource[T]) { *l = append(*l, val) } - -func (l *resList[T]) popBack() *Resource[T] { - idx := len(*l) - 1 - val := (*l)[idx] - (*l)[idx] = nil // Avoid memory leak - *l = (*l)[:idx] - - return val -} - -func (l *resList[T]) remove(val *Resource[T]) { - for i, elem := range *l { - if elem == val { - lastIdx := len(*l) - 1 - (*l)[i] = (*l)[lastIdx] - (*l)[lastIdx] = nil // Avoid memory leak - (*l) = (*l)[:lastIdx] - return - } - } - - panic("BUG: removeResource could not find res in slice") -} |