summaryrefslogtreecommitdiff
path: root/vendor/github.com/jackc/pgx/v5/pgxpool/pool.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/github.com/jackc/pgx/v5/pgxpool/pool.go')
-rw-r--r--vendor/github.com/jackc/pgx/v5/pgxpool/pool.go717
1 files changed, 0 insertions, 717 deletions
diff --git a/vendor/github.com/jackc/pgx/v5/pgxpool/pool.go b/vendor/github.com/jackc/pgx/v5/pgxpool/pool.go
deleted file mode 100644
index 270b7617a..000000000
--- a/vendor/github.com/jackc/pgx/v5/pgxpool/pool.go
+++ /dev/null
@@ -1,717 +0,0 @@
-package pgxpool
-
-import (
- "context"
- "fmt"
- "math/rand"
- "runtime"
- "strconv"
- "sync"
- "sync/atomic"
- "time"
-
- "github.com/jackc/pgx/v5"
- "github.com/jackc/pgx/v5/pgconn"
- "github.com/jackc/puddle/v2"
-)
-
-var defaultMaxConns = int32(4)
-var defaultMinConns = int32(0)
-var defaultMaxConnLifetime = time.Hour
-var defaultMaxConnIdleTime = time.Minute * 30
-var defaultHealthCheckPeriod = time.Minute
-
-type connResource struct {
- conn *pgx.Conn
- conns []Conn
- poolRows []poolRow
- poolRowss []poolRows
- maxAgeTime time.Time
-}
-
-func (cr *connResource) getConn(p *Pool, res *puddle.Resource[*connResource]) *Conn {
- if len(cr.conns) == 0 {
- cr.conns = make([]Conn, 128)
- }
-
- c := &cr.conns[len(cr.conns)-1]
- cr.conns = cr.conns[0 : len(cr.conns)-1]
-
- c.res = res
- c.p = p
-
- return c
-}
-
-func (cr *connResource) getPoolRow(c *Conn, r pgx.Row) *poolRow {
- if len(cr.poolRows) == 0 {
- cr.poolRows = make([]poolRow, 128)
- }
-
- pr := &cr.poolRows[len(cr.poolRows)-1]
- cr.poolRows = cr.poolRows[0 : len(cr.poolRows)-1]
-
- pr.c = c
- pr.r = r
-
- return pr
-}
-
-func (cr *connResource) getPoolRows(c *Conn, r pgx.Rows) *poolRows {
- if len(cr.poolRowss) == 0 {
- cr.poolRowss = make([]poolRows, 128)
- }
-
- pr := &cr.poolRowss[len(cr.poolRowss)-1]
- cr.poolRowss = cr.poolRowss[0 : len(cr.poolRowss)-1]
-
- pr.c = c
- pr.r = r
-
- return pr
-}
-
-// Pool allows for connection reuse.
-type Pool struct {
- // 64 bit fields accessed with atomics must be at beginning of struct to guarantee alignment for certain 32-bit
- // architectures. See BUGS section of https://pkg.go.dev/sync/atomic and https://github.com/jackc/pgx/issues/1288.
- newConnsCount int64
- lifetimeDestroyCount int64
- idleDestroyCount int64
-
- p *puddle.Pool[*connResource]
- config *Config
- beforeConnect func(context.Context, *pgx.ConnConfig) error
- afterConnect func(context.Context, *pgx.Conn) error
- beforeAcquire func(context.Context, *pgx.Conn) bool
- afterRelease func(*pgx.Conn) bool
- beforeClose func(*pgx.Conn)
- minConns int32
- maxConns int32
- maxConnLifetime time.Duration
- maxConnLifetimeJitter time.Duration
- maxConnIdleTime time.Duration
- healthCheckPeriod time.Duration
-
- healthCheckChan chan struct{}
-
- acquireTracer AcquireTracer
- releaseTracer ReleaseTracer
-
- closeOnce sync.Once
- closeChan chan struct{}
-}
-
-// Config is the configuration struct for creating a pool. It must be created by [ParseConfig] and then it can be
-// modified.
-type Config struct {
- ConnConfig *pgx.ConnConfig
-
- // BeforeConnect is called before a new connection is made. It is passed a copy of the underlying pgx.ConnConfig and
- // will not impact any existing open connections.
- BeforeConnect func(context.Context, *pgx.ConnConfig) error
-
- // AfterConnect is called after a connection is established, but before it is added to the pool.
- AfterConnect func(context.Context, *pgx.Conn) error
-
- // BeforeAcquire is called before a connection is acquired from the pool. It must return true to allow the
- // acquisition or false to indicate that the connection should be destroyed and a different connection should be
- // acquired.
- BeforeAcquire func(context.Context, *pgx.Conn) bool
-
- // AfterRelease is called after a connection is released, but before it is returned to the pool. It must return true to
- // return the connection to the pool or false to destroy the connection.
- AfterRelease func(*pgx.Conn) bool
-
- // BeforeClose is called right before a connection is closed and removed from the pool.
- BeforeClose func(*pgx.Conn)
-
- // MaxConnLifetime is the duration since creation after which a connection will be automatically closed.
- MaxConnLifetime time.Duration
-
- // MaxConnLifetimeJitter is the duration after MaxConnLifetime to randomly decide to close a connection.
- // This helps prevent all connections from being closed at the exact same time, starving the pool.
- MaxConnLifetimeJitter time.Duration
-
- // MaxConnIdleTime is the duration after which an idle connection will be automatically closed by the health check.
- MaxConnIdleTime time.Duration
-
- // MaxConns is the maximum size of the pool. The default is the greater of 4 or runtime.NumCPU().
- MaxConns int32
-
- // MinConns is the minimum size of the pool. After connection closes, the pool might dip below MinConns. A low
- // number of MinConns might mean the pool is empty after MaxConnLifetime until the health check has a chance
- // to create new connections.
- MinConns int32
-
- // HealthCheckPeriod is the duration between checks of the health of idle connections.
- HealthCheckPeriod time.Duration
-
- createdByParseConfig bool // Used to enforce created by ParseConfig rule.
-}
-
-// Copy returns a deep copy of the config that is safe to use and modify.
-// The only exception is the tls.Config:
-// according to the tls.Config docs it must not be modified after creation.
-func (c *Config) Copy() *Config {
- newConfig := new(Config)
- *newConfig = *c
- newConfig.ConnConfig = c.ConnConfig.Copy()
- return newConfig
-}
-
-// ConnString returns the connection string as parsed by pgxpool.ParseConfig into pgxpool.Config.
-func (c *Config) ConnString() string { return c.ConnConfig.ConnString() }
-
-// New creates a new Pool. See [ParseConfig] for information on connString format.
-func New(ctx context.Context, connString string) (*Pool, error) {
- config, err := ParseConfig(connString)
- if err != nil {
- return nil, err
- }
-
- return NewWithConfig(ctx, config)
-}
-
-// NewWithConfig creates a new Pool. config must have been created by [ParseConfig].
-func NewWithConfig(ctx context.Context, config *Config) (*Pool, error) {
- // Default values are set in ParseConfig. Enforce initial creation by ParseConfig rather than setting defaults from
- // zero values.
- if !config.createdByParseConfig {
- panic("config must be created by ParseConfig")
- }
-
- p := &Pool{
- config: config,
- beforeConnect: config.BeforeConnect,
- afterConnect: config.AfterConnect,
- beforeAcquire: config.BeforeAcquire,
- afterRelease: config.AfterRelease,
- beforeClose: config.BeforeClose,
- minConns: config.MinConns,
- maxConns: config.MaxConns,
- maxConnLifetime: config.MaxConnLifetime,
- maxConnLifetimeJitter: config.MaxConnLifetimeJitter,
- maxConnIdleTime: config.MaxConnIdleTime,
- healthCheckPeriod: config.HealthCheckPeriod,
- healthCheckChan: make(chan struct{}, 1),
- closeChan: make(chan struct{}),
- }
-
- if t, ok := config.ConnConfig.Tracer.(AcquireTracer); ok {
- p.acquireTracer = t
- }
-
- if t, ok := config.ConnConfig.Tracer.(ReleaseTracer); ok {
- p.releaseTracer = t
- }
-
- var err error
- p.p, err = puddle.NewPool(
- &puddle.Config[*connResource]{
- Constructor: func(ctx context.Context) (*connResource, error) {
- atomic.AddInt64(&p.newConnsCount, 1)
- connConfig := p.config.ConnConfig.Copy()
-
- // Connection will continue in background even if Acquire is canceled. Ensure that a connect won't hang forever.
- if connConfig.ConnectTimeout <= 0 {
- connConfig.ConnectTimeout = 2 * time.Minute
- }
-
- if p.beforeConnect != nil {
- if err := p.beforeConnect(ctx, connConfig); err != nil {
- return nil, err
- }
- }
-
- conn, err := pgx.ConnectConfig(ctx, connConfig)
- if err != nil {
- return nil, err
- }
-
- if p.afterConnect != nil {
- err = p.afterConnect(ctx, conn)
- if err != nil {
- conn.Close(ctx)
- return nil, err
- }
- }
-
- jitterSecs := rand.Float64() * config.MaxConnLifetimeJitter.Seconds()
- maxAgeTime := time.Now().Add(config.MaxConnLifetime).Add(time.Duration(jitterSecs) * time.Second)
-
- cr := &connResource{
- conn: conn,
- conns: make([]Conn, 64),
- poolRows: make([]poolRow, 64),
- poolRowss: make([]poolRows, 64),
- maxAgeTime: maxAgeTime,
- }
-
- return cr, nil
- },
- Destructor: func(value *connResource) {
- ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
- conn := value.conn
- if p.beforeClose != nil {
- p.beforeClose(conn)
- }
- conn.Close(ctx)
- select {
- case <-conn.PgConn().CleanupDone():
- case <-ctx.Done():
- }
- cancel()
- },
- MaxSize: config.MaxConns,
- },
- )
- if err != nil {
- return nil, err
- }
-
- go func() {
- p.createIdleResources(ctx, int(p.minConns))
- p.backgroundHealthCheck()
- }()
-
- return p, nil
-}
-
-// ParseConfig builds a Config from connString. It parses connString with the same behavior as [pgx.ParseConfig] with the
-// addition of the following variables:
-//
-// - pool_max_conns: integer greater than 0 (default 4)
-// - pool_min_conns: integer 0 or greater (default 0)
-// - pool_max_conn_lifetime: duration string (default 1 hour)
-// - pool_max_conn_idle_time: duration string (default 30 minutes)
-// - pool_health_check_period: duration string (default 1 minute)
-// - pool_max_conn_lifetime_jitter: duration string (default 0)
-//
-// See Config for definitions of these arguments.
-//
-// # Example Keyword/Value
-// user=jack password=secret host=pg.example.com port=5432 dbname=mydb sslmode=verify-ca pool_max_conns=10 pool_max_conn_lifetime=1h30m
-//
-// # Example URL
-// postgres://jack:secret@pg.example.com:5432/mydb?sslmode=verify-ca&pool_max_conns=10&pool_max_conn_lifetime=1h30m
-func ParseConfig(connString string) (*Config, error) {
- connConfig, err := pgx.ParseConfig(connString)
- if err != nil {
- return nil, err
- }
-
- config := &Config{
- ConnConfig: connConfig,
- createdByParseConfig: true,
- }
-
- if s, ok := config.ConnConfig.Config.RuntimeParams["pool_max_conns"]; ok {
- delete(connConfig.Config.RuntimeParams, "pool_max_conns")
- n, err := strconv.ParseInt(s, 10, 32)
- if err != nil {
- return nil, fmt.Errorf("cannot parse pool_max_conns: %w", err)
- }
- if n < 1 {
- return nil, fmt.Errorf("pool_max_conns too small: %d", n)
- }
- config.MaxConns = int32(n)
- } else {
- config.MaxConns = defaultMaxConns
- if numCPU := int32(runtime.NumCPU()); numCPU > config.MaxConns {
- config.MaxConns = numCPU
- }
- }
-
- if s, ok := config.ConnConfig.Config.RuntimeParams["pool_min_conns"]; ok {
- delete(connConfig.Config.RuntimeParams, "pool_min_conns")
- n, err := strconv.ParseInt(s, 10, 32)
- if err != nil {
- return nil, fmt.Errorf("cannot parse pool_min_conns: %w", err)
- }
- config.MinConns = int32(n)
- } else {
- config.MinConns = defaultMinConns
- }
-
- if s, ok := config.ConnConfig.Config.RuntimeParams["pool_max_conn_lifetime"]; ok {
- delete(connConfig.Config.RuntimeParams, "pool_max_conn_lifetime")
- d, err := time.ParseDuration(s)
- if err != nil {
- return nil, fmt.Errorf("invalid pool_max_conn_lifetime: %w", err)
- }
- config.MaxConnLifetime = d
- } else {
- config.MaxConnLifetime = defaultMaxConnLifetime
- }
-
- if s, ok := config.ConnConfig.Config.RuntimeParams["pool_max_conn_idle_time"]; ok {
- delete(connConfig.Config.RuntimeParams, "pool_max_conn_idle_time")
- d, err := time.ParseDuration(s)
- if err != nil {
- return nil, fmt.Errorf("invalid pool_max_conn_idle_time: %w", err)
- }
- config.MaxConnIdleTime = d
- } else {
- config.MaxConnIdleTime = defaultMaxConnIdleTime
- }
-
- if s, ok := config.ConnConfig.Config.RuntimeParams["pool_health_check_period"]; ok {
- delete(connConfig.Config.RuntimeParams, "pool_health_check_period")
- d, err := time.ParseDuration(s)
- if err != nil {
- return nil, fmt.Errorf("invalid pool_health_check_period: %w", err)
- }
- config.HealthCheckPeriod = d
- } else {
- config.HealthCheckPeriod = defaultHealthCheckPeriod
- }
-
- if s, ok := config.ConnConfig.Config.RuntimeParams["pool_max_conn_lifetime_jitter"]; ok {
- delete(connConfig.Config.RuntimeParams, "pool_max_conn_lifetime_jitter")
- d, err := time.ParseDuration(s)
- if err != nil {
- return nil, fmt.Errorf("invalid pool_max_conn_lifetime_jitter: %w", err)
- }
- config.MaxConnLifetimeJitter = d
- }
-
- return config, nil
-}
-
-// Close closes all connections in the pool and rejects future Acquire calls. Blocks until all connections are returned
-// to pool and closed.
-func (p *Pool) Close() {
- p.closeOnce.Do(func() {
- close(p.closeChan)
- p.p.Close()
- })
-}
-
-func (p *Pool) isExpired(res *puddle.Resource[*connResource]) bool {
- return time.Now().After(res.Value().maxAgeTime)
-}
-
-func (p *Pool) triggerHealthCheck() {
- go func() {
- // Destroy is asynchronous so we give it time to actually remove itself from
- // the pool otherwise we might try to check the pool size too soon
- time.Sleep(500 * time.Millisecond)
- select {
- case p.healthCheckChan <- struct{}{}:
- default:
- }
- }()
-}
-
-func (p *Pool) backgroundHealthCheck() {
- ticker := time.NewTicker(p.healthCheckPeriod)
- defer ticker.Stop()
- for {
- select {
- case <-p.closeChan:
- return
- case <-p.healthCheckChan:
- p.checkHealth()
- case <-ticker.C:
- p.checkHealth()
- }
- }
-}
-
-func (p *Pool) checkHealth() {
- for {
- // If checkMinConns failed we don't destroy any connections since we couldn't
- // even get to minConns
- if err := p.checkMinConns(); err != nil {
- // Should we log this error somewhere?
- break
- }
- if !p.checkConnsHealth() {
- // Since we didn't destroy any connections we can stop looping
- break
- }
- // Technically Destroy is asynchronous but 500ms should be enough for it to
- // remove it from the underlying pool
- select {
- case <-p.closeChan:
- return
- case <-time.After(500 * time.Millisecond):
- }
- }
-}
-
-// checkConnsHealth will check all idle connections, destroy a connection if
-// it's idle or too old, and returns true if any were destroyed
-func (p *Pool) checkConnsHealth() bool {
- var destroyed bool
- totalConns := p.Stat().TotalConns()
- resources := p.p.AcquireAllIdle()
- for _, res := range resources {
- // We're okay going under minConns if the lifetime is up
- if p.isExpired(res) && totalConns >= p.minConns {
- atomic.AddInt64(&p.lifetimeDestroyCount, 1)
- res.Destroy()
- destroyed = true
- // Since Destroy is async we manually decrement totalConns.
- totalConns--
- } else if res.IdleDuration() > p.maxConnIdleTime && totalConns > p.minConns {
- atomic.AddInt64(&p.idleDestroyCount, 1)
- res.Destroy()
- destroyed = true
- // Since Destroy is async we manually decrement totalConns.
- totalConns--
- } else {
- res.ReleaseUnused()
- }
- }
- return destroyed
-}
-
-func (p *Pool) checkMinConns() error {
- // TotalConns can include ones that are being destroyed but we should have
- // sleep(500ms) around all of the destroys to help prevent that from throwing
- // off this check
- toCreate := p.minConns - p.Stat().TotalConns()
- if toCreate > 0 {
- return p.createIdleResources(context.Background(), int(toCreate))
- }
- return nil
-}
-
-func (p *Pool) createIdleResources(parentCtx context.Context, targetResources int) error {
- ctx, cancel := context.WithCancel(parentCtx)
- defer cancel()
-
- errs := make(chan error, targetResources)
-
- for i := 0; i < targetResources; i++ {
- go func() {
- err := p.p.CreateResource(ctx)
- // Ignore ErrNotAvailable since it means that the pool has become full since we started creating resource.
- if err == puddle.ErrNotAvailable {
- err = nil
- }
- errs <- err
- }()
- }
-
- var firstError error
- for i := 0; i < targetResources; i++ {
- err := <-errs
- if err != nil && firstError == nil {
- cancel()
- firstError = err
- }
- }
-
- return firstError
-}
-
-// Acquire returns a connection (*Conn) from the Pool
-func (p *Pool) Acquire(ctx context.Context) (c *Conn, err error) {
- if p.acquireTracer != nil {
- ctx = p.acquireTracer.TraceAcquireStart(ctx, p, TraceAcquireStartData{})
- defer func() {
- var conn *pgx.Conn
- if c != nil {
- conn = c.Conn()
- }
- p.acquireTracer.TraceAcquireEnd(ctx, p, TraceAcquireEndData{Conn: conn, Err: err})
- }()
- }
-
- for {
- res, err := p.p.Acquire(ctx)
- if err != nil {
- return nil, err
- }
-
- cr := res.Value()
-
- if res.IdleDuration() > time.Second {
- err := cr.conn.Ping(ctx)
- if err != nil {
- res.Destroy()
- continue
- }
- }
-
- if p.beforeAcquire == nil || p.beforeAcquire(ctx, cr.conn) {
- return cr.getConn(p, res), nil
- }
-
- res.Destroy()
- }
-}
-
-// AcquireFunc acquires a *Conn and calls f with that *Conn. ctx will only affect the Acquire. It has no effect on the
-// call of f. The return value is either an error acquiring the *Conn or the return value of f. The *Conn is
-// automatically released after the call of f.
-func (p *Pool) AcquireFunc(ctx context.Context, f func(*Conn) error) error {
- conn, err := p.Acquire(ctx)
- if err != nil {
- return err
- }
- defer conn.Release()
-
- return f(conn)
-}
-
-// AcquireAllIdle atomically acquires all currently idle connections. Its intended use is for health check and
-// keep-alive functionality. It does not update pool statistics.
-func (p *Pool) AcquireAllIdle(ctx context.Context) []*Conn {
- resources := p.p.AcquireAllIdle()
- conns := make([]*Conn, 0, len(resources))
- for _, res := range resources {
- cr := res.Value()
- if p.beforeAcquire == nil || p.beforeAcquire(ctx, cr.conn) {
- conns = append(conns, cr.getConn(p, res))
- } else {
- res.Destroy()
- }
- }
-
- return conns
-}
-
-// Reset closes all connections, but leaves the pool open. It is intended for use when an error is detected that would
-// disrupt all connections (such as a network interruption or a server state change).
-//
-// It is safe to reset a pool while connections are checked out. Those connections will be closed when they are returned
-// to the pool.
-func (p *Pool) Reset() {
- p.p.Reset()
-}
-
-// Config returns a copy of config that was used to initialize this pool.
-func (p *Pool) Config() *Config { return p.config.Copy() }
-
-// Stat returns a pgxpool.Stat struct with a snapshot of Pool statistics.
-func (p *Pool) Stat() *Stat {
- return &Stat{
- s: p.p.Stat(),
- newConnsCount: atomic.LoadInt64(&p.newConnsCount),
- lifetimeDestroyCount: atomic.LoadInt64(&p.lifetimeDestroyCount),
- idleDestroyCount: atomic.LoadInt64(&p.idleDestroyCount),
- }
-}
-
-// Exec acquires a connection from the Pool and executes the given SQL.
-// SQL can be either a prepared statement name or an SQL string.
-// Arguments should be referenced positionally from the SQL string as $1, $2, etc.
-// The acquired connection is returned to the pool when the Exec function returns.
-func (p *Pool) Exec(ctx context.Context, sql string, arguments ...any) (pgconn.CommandTag, error) {
- c, err := p.Acquire(ctx)
- if err != nil {
- return pgconn.CommandTag{}, err
- }
- defer c.Release()
-
- return c.Exec(ctx, sql, arguments...)
-}
-
-// Query acquires a connection and executes a query that returns pgx.Rows.
-// Arguments should be referenced positionally from the SQL string as $1, $2, etc.
-// See pgx.Rows documentation to close the returned Rows and return the acquired connection to the Pool.
-//
-// If there is an error, the returned pgx.Rows will be returned in an error state.
-// If preferred, ignore the error returned from Query and handle errors using the returned pgx.Rows.
-//
-// For extra control over how the query is executed, the types QuerySimpleProtocol, QueryResultFormats, and
-// QueryResultFormatsByOID may be used as the first args to control exactly how the query is executed. This is rarely
-// needed. See the documentation for those types for details.
-func (p *Pool) Query(ctx context.Context, sql string, args ...any) (pgx.Rows, error) {
- c, err := p.Acquire(ctx)
- if err != nil {
- return errRows{err: err}, err
- }
-
- rows, err := c.Query(ctx, sql, args...)
- if err != nil {
- c.Release()
- return errRows{err: err}, err
- }
-
- return c.getPoolRows(rows), nil
-}
-
-// QueryRow acquires a connection and executes a query that is expected
-// to return at most one row (pgx.Row). Errors are deferred until pgx.Row's
-// Scan method is called. If the query selects no rows, pgx.Row's Scan will
-// return ErrNoRows. Otherwise, pgx.Row's Scan scans the first selected row
-// and discards the rest. The acquired connection is returned to the Pool when
-// pgx.Row's Scan method is called.
-//
-// Arguments should be referenced positionally from the SQL string as $1, $2, etc.
-//
-// For extra control over how the query is executed, the types QuerySimpleProtocol, QueryResultFormats, and
-// QueryResultFormatsByOID may be used as the first args to control exactly how the query is executed. This is rarely
-// needed. See the documentation for those types for details.
-func (p *Pool) QueryRow(ctx context.Context, sql string, args ...any) pgx.Row {
- c, err := p.Acquire(ctx)
- if err != nil {
- return errRow{err: err}
- }
-
- row := c.QueryRow(ctx, sql, args...)
- return c.getPoolRow(row)
-}
-
-func (p *Pool) SendBatch(ctx context.Context, b *pgx.Batch) pgx.BatchResults {
- c, err := p.Acquire(ctx)
- if err != nil {
- return errBatchResults{err: err}
- }
-
- br := c.SendBatch(ctx, b)
- return &poolBatchResults{br: br, c: c}
-}
-
-// Begin acquires a connection from the Pool and starts a transaction. Unlike database/sql, the context only affects the begin command. i.e. there is no
-// auto-rollback on context cancellation. Begin initiates a transaction block without explicitly setting a transaction mode for the block (see BeginTx with TxOptions if transaction mode is required).
-// *pgxpool.Tx is returned, which implements the pgx.Tx interface.
-// Commit or Rollback must be called on the returned transaction to finalize the transaction block.
-func (p *Pool) Begin(ctx context.Context) (pgx.Tx, error) {
- return p.BeginTx(ctx, pgx.TxOptions{})
-}
-
-// BeginTx acquires a connection from the Pool and starts a transaction with pgx.TxOptions determining the transaction mode.
-// Unlike database/sql, the context only affects the begin command. i.e. there is no auto-rollback on context cancellation.
-// *pgxpool.Tx is returned, which implements the pgx.Tx interface.
-// Commit or Rollback must be called on the returned transaction to finalize the transaction block.
-func (p *Pool) BeginTx(ctx context.Context, txOptions pgx.TxOptions) (pgx.Tx, error) {
- c, err := p.Acquire(ctx)
- if err != nil {
- return nil, err
- }
-
- t, err := c.BeginTx(ctx, txOptions)
- if err != nil {
- c.Release()
- return nil, err
- }
-
- return &Tx{t: t, c: c}, nil
-}
-
-func (p *Pool) CopyFrom(ctx context.Context, tableName pgx.Identifier, columnNames []string, rowSrc pgx.CopyFromSource) (int64, error) {
- c, err := p.Acquire(ctx)
- if err != nil {
- return 0, err
- }
- defer c.Release()
-
- return c.Conn().CopyFrom(ctx, tableName, columnNames, rowSrc)
-}
-
-// Ping acquires a connection from the Pool and executes an empty sql statement against it.
-// If the sql returns without error, the database Ping is considered successful, otherwise, the error is returned.
-func (p *Pool) Ping(ctx context.Context) error {
- c, err := p.Acquire(ctx)
- if err != nil {
- return err
- }
- defer c.Release()
- return c.Ping(ctx)
-}