summaryrefslogtreecommitdiff
path: root/vendor/github.com/jackc/pgx/v5/stdlib/sql.go
diff options
context:
space:
mode:
authorLibravatar dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>2023-11-06 14:44:53 +0000
committerLibravatar GitHub <noreply@github.com>2023-11-06 14:44:53 +0000
commit9b76afc851090268316fd8890366957632b49a44 (patch)
tree4f1539dd8f831bb22bad9ae21d8ab1d17278b323 /vendor/github.com/jackc/pgx/v5/stdlib/sql.go
parent[chore]: Bump github.com/tdewolff/minify/v2 from 2.20.0 to 2.20.6 (#2337) (diff)
downloadgotosocial-9b76afc851090268316fd8890366957632b49a44.tar.xz
[chore]: Bump github.com/jackc/pgx/v5 from 5.4.3 to 5.5.0 (#2336)
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Diffstat (limited to 'vendor/github.com/jackc/pgx/v5/stdlib/sql.go')
-rw-r--r--vendor/github.com/jackc/pgx/v5/stdlib/sql.go137
1 files changed, 112 insertions, 25 deletions
diff --git a/vendor/github.com/jackc/pgx/v5/stdlib/sql.go b/vendor/github.com/jackc/pgx/v5/stdlib/sql.go
index 97ecc9b2b..c5be1a3f2 100644
--- a/vendor/github.com/jackc/pgx/v5/stdlib/sql.go
+++ b/vendor/github.com/jackc/pgx/v5/stdlib/sql.go
@@ -14,6 +14,18 @@
// return err
// }
//
+// Or from a *pgxpool.Pool.
+//
+// pool, err := pgxpool.New(context.Background(), os.Getenv("DATABASE_URL"))
+// if err != nil {
+// return err
+// }
+//
+// db, err := stdlib.OpenDBFromPool(pool)
+// if err != nil {
+// return err
+// }
+//
// Or a pgx.ConnConfig can be used to set configuration not accessible via connection string. In this case the
// pgx.ConnConfig must first be registered with the driver. This registration returns a connection string which is used
// with sql.Open.
@@ -74,6 +86,7 @@ import (
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgconn"
"github.com/jackc/pgx/v5/pgtype"
+ "github.com/jackc/pgx/v5/pgxpool"
)
// Only intrinsic types should be binary format with database/sql.
@@ -125,14 +138,14 @@ func contains(list []string, y string) bool {
type OptionOpenDB func(*connector)
// OptionBeforeConnect provides a callback for before connect. It is passed a shallow copy of the ConnConfig that will
-// be used to connect, so only its immediate members should be modified.
+// be used to connect, so only its immediate members should be modified. Used only if db is opened with *pgx.ConnConfig.
func OptionBeforeConnect(bc func(context.Context, *pgx.ConnConfig) error) OptionOpenDB {
return func(dc *connector) {
dc.BeforeConnect = bc
}
}
-// OptionAfterConnect provides a callback for after connect.
+// OptionAfterConnect provides a callback for after connect. Used only if db is opened with *pgx.ConnConfig.
func OptionAfterConnect(ac func(context.Context, *pgx.Conn) error) OptionOpenDB {
return func(dc *connector) {
dc.AfterConnect = ac
@@ -191,13 +204,42 @@ func GetConnector(config pgx.ConnConfig, opts ...OptionOpenDB) driver.Connector
return c
}
+// GetPoolConnector creates a new driver.Connector from the given *pgxpool.Pool. By using this be sure to set the
+// maximum idle connections of the *sql.DB created with this connector to zero since they must be managed from the
+// *pgxpool.Pool. This is required to avoid acquiring all the connections from the pgxpool and starving any direct
+// users of the pgxpool.
+func GetPoolConnector(pool *pgxpool.Pool, opts ...OptionOpenDB) driver.Connector {
+ c := connector{
+ pool: pool,
+ ResetSession: func(context.Context, *pgx.Conn) error { return nil }, // noop reset session by default
+ driver: pgxDriver,
+ }
+
+ for _, opt := range opts {
+ opt(&c)
+ }
+
+ return c
+}
+
func OpenDB(config pgx.ConnConfig, opts ...OptionOpenDB) *sql.DB {
c := GetConnector(config, opts...)
return sql.OpenDB(c)
}
+// OpenDBFromPool creates a new *sql.DB from the given *pgxpool.Pool. Note that this method automatically sets the
+// maximum number of idle connections in *sql.DB to zero, since they must be managed from the *pgxpool.Pool. This is
+// required to avoid acquiring all the connections from the pgxpool and starving any direct users of the pgxpool.
+func OpenDBFromPool(pool *pgxpool.Pool, opts ...OptionOpenDB) *sql.DB {
+ c := GetPoolConnector(pool, opts...)
+ db := sql.OpenDB(c)
+ db.SetMaxIdleConns(0)
+ return db
+}
+
type connector struct {
pgx.ConnConfig
+ pool *pgxpool.Pool
BeforeConnect func(context.Context, *pgx.ConnConfig) error // function to call before creation of every new connection
AfterConnect func(context.Context, *pgx.Conn) error // function to call after creation of every new connection
ResetSession func(context.Context, *pgx.Conn) error // function is called before a connection is reused
@@ -207,25 +249,53 @@ type connector struct {
// Connect implement driver.Connector interface
func (c connector) Connect(ctx context.Context) (driver.Conn, error) {
var (
- err error
- conn *pgx.Conn
+ connConfig pgx.ConnConfig
+ conn *pgx.Conn
+ close func(context.Context) error
+ err error
)
- // Create a shallow copy of the config, so that BeforeConnect can safely modify it
- connConfig := c.ConnConfig
- if err = c.BeforeConnect(ctx, &connConfig); err != nil {
- return nil, err
- }
+ if c.pool == nil {
+ // Create a shallow copy of the config, so that BeforeConnect can safely modify it
+ connConfig = c.ConnConfig
- if conn, err = pgx.ConnectConfig(ctx, &connConfig); err != nil {
- return nil, err
- }
+ if err = c.BeforeConnect(ctx, &connConfig); err != nil {
+ return nil, err
+ }
- if err = c.AfterConnect(ctx, conn); err != nil {
- return nil, err
+ if conn, err = pgx.ConnectConfig(ctx, &connConfig); err != nil {
+ return nil, err
+ }
+
+ if err = c.AfterConnect(ctx, conn); err != nil {
+ return nil, err
+ }
+
+ close = conn.Close
+ } else {
+ var pconn *pgxpool.Conn
+
+ pconn, err = c.pool.Acquire(ctx)
+ if err != nil {
+ return nil, err
+ }
+
+ conn = pconn.Conn()
+
+ close = func(_ context.Context) error {
+ pconn.Release()
+ return nil
+ }
}
- return &Conn{conn: conn, driver: c.driver, connConfig: connConfig, resetSessionFunc: c.ResetSession}, nil
+ return &Conn{
+ conn: conn,
+ close: close,
+ driver: c.driver,
+ connConfig: connConfig,
+ resetSessionFunc: c.ResetSession,
+ psRefCounts: make(map[*pgconn.StatementDescription]int),
+ }, nil
}
// Driver implement driver.Connector interface
@@ -302,9 +372,11 @@ func (dc *driverConnector) Connect(ctx context.Context) (driver.Conn, error) {
c := &Conn{
conn: conn,
+ close: conn.Close,
driver: dc.driver,
connConfig: *connConfig,
resetSessionFunc: func(context.Context, *pgx.Conn) error { return nil },
+ psRefCounts: make(map[*pgconn.StatementDescription]int),
}
return c, nil
@@ -326,11 +398,19 @@ func UnregisterConnConfig(connStr string) {
type Conn struct {
conn *pgx.Conn
- psCount int64 // Counter used for creating unique prepared statement names
+ close func(context.Context) error
driver *Driver
connConfig pgx.ConnConfig
resetSessionFunc func(context.Context, *pgx.Conn) error // Function is called before a connection is reused
lastResetSessionTime time.Time
+
+ // psRefCounts contains reference counts for prepared statements. Prepare uses the underlying pgx logic to generate
+ // deterministic statement names from the statement text. If this query has already been prepared then the existing
+ // *pgconn.StatementDescription will be returned. However, this means that if Close is called on the returned Stmt
+ // then the underlying prepared statement will be closed even when the underlying prepared statement is still in use
+ // by another database/sql Stmt. To prevent this psRefCounts keeps track of how many database/sql statements are using
+ // the same underlying statement and only closes the underlying statement when the reference count reaches 0.
+ psRefCounts map[*pgconn.StatementDescription]int
}
// Conn returns the underlying *pgx.Conn
@@ -347,13 +427,11 @@ func (c *Conn) PrepareContext(ctx context.Context, query string) (driver.Stmt, e
return nil, driver.ErrBadConn
}
- name := fmt.Sprintf("pgx_%d", c.psCount)
- c.psCount++
-
- sd, err := c.conn.Prepare(ctx, name, query)
+ sd, err := c.conn.Prepare(ctx, query, query)
if err != nil {
return nil, err
}
+ c.psRefCounts[sd]++
return &Stmt{sd: sd, conn: c}, nil
}
@@ -361,7 +439,7 @@ func (c *Conn) PrepareContext(ctx context.Context, query string) (driver.Stmt, e
func (c *Conn) Close() error {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
- return c.conn.Close(ctx)
+ return c.close(ctx)
}
func (c *Conn) Begin() (driver.Tx, error) {
@@ -470,7 +548,7 @@ func (c *Conn) ResetSession(ctx context.Context) error {
now := time.Now()
if now.Sub(c.lastResetSessionTime) > time.Second {
- if err := c.conn.PgConn().CheckConn(); err != nil {
+ if err := c.conn.PgConn().Ping(ctx); err != nil {
return driver.ErrBadConn
}
}
@@ -487,7 +565,16 @@ type Stmt struct {
func (s *Stmt) Close() error {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
- return s.conn.conn.Deallocate(ctx, s.sd.Name)
+
+ refCount := s.conn.psRefCounts[s.sd]
+ if refCount == 1 {
+ delete(s.conn.psRefCounts, s.sd)
+ } else {
+ s.conn.psRefCounts[s.sd]--
+ return nil
+ }
+
+ return s.conn.conn.Deallocate(ctx, s.sd.SQL)
}
func (s *Stmt) NumInput() int {
@@ -499,7 +586,7 @@ func (s *Stmt) Exec(argsV []driver.Value) (driver.Result, error) {
}
func (s *Stmt) ExecContext(ctx context.Context, argsV []driver.NamedValue) (driver.Result, error) {
- return s.conn.ExecContext(ctx, s.sd.Name, argsV)
+ return s.conn.ExecContext(ctx, s.sd.SQL, argsV)
}
func (s *Stmt) Query(argsV []driver.Value) (driver.Rows, error) {
@@ -507,7 +594,7 @@ func (s *Stmt) Query(argsV []driver.Value) (driver.Rows, error) {
}
func (s *Stmt) QueryContext(ctx context.Context, argsV []driver.NamedValue) (driver.Rows, error) {
- return s.conn.QueryContext(ctx, s.sd.Name, argsV)
+ return s.conn.QueryContext(ctx, s.sd.SQL, argsV)
}
type rowValueFunc func(src []byte) (driver.Value, error)