summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--go.mod2
-rw-r--r--go.sum4
-rw-r--r--vendor/github.com/jackc/pgx/v5/CHANGELOG.md14
-rw-r--r--vendor/github.com/jackc/pgx/v5/README.md14
-rw-r--r--vendor/github.com/jackc/pgx/v5/Rakefile2
-rw-r--r--vendor/github.com/jackc/pgx/v5/conn.go21
-rw-r--r--vendor/github.com/jackc/pgx/v5/derived_types.go26
-rw-r--r--vendor/github.com/jackc/pgx/v5/internal/sanitize/benchmmark.sh60
-rw-r--r--vendor/github.com/jackc/pgx/v5/internal/sanitize/sanitize.go183
-rw-r--r--vendor/github.com/jackc/pgx/v5/pgconn/pgconn.go240
-rw-r--r--vendor/github.com/jackc/pgx/v5/pgproto3/password_message.go2
-rw-r--r--vendor/github.com/jackc/pgx/v5/pgtype/doc.go19
-rw-r--r--vendor/github.com/jackc/pgx/v5/pgtype/int.go3
-rw-r--r--vendor/github.com/jackc/pgx/v5/pgtype/json.go98
-rw-r--r--vendor/github.com/jackc/pgx/v5/pgtype/pgtype.go21
-rw-r--r--vendor/github.com/jackc/pgx/v5/pgtype/pgtype_default.go20
-rw-r--r--vendor/github.com/jackc/pgx/v5/pgtype/timestamp.go25
-rw-r--r--vendor/github.com/jackc/pgx/v5/pgxpool/pool.go28
-rw-r--r--vendor/github.com/jackc/pgx/v5/pgxpool/stat.go7
-rw-r--r--vendor/github.com/jackc/pgx/v5/rows.go13
-rw-r--r--vendor/github.com/jackc/pgx/v5/tx.go5
-rw-r--r--vendor/modules.txt2
22 files changed, 604 insertions, 205 deletions
diff --git a/go.mod b/go.mod
index 699d99d56..2d687dd35 100644
--- a/go.mod
+++ b/go.mod
@@ -47,7 +47,7 @@ require (
github.com/google/uuid v1.6.0
github.com/gorilla/feeds v1.2.0
github.com/gorilla/websocket v1.5.3
- github.com/jackc/pgx/v5 v5.7.2
+ github.com/jackc/pgx/v5 v5.7.3
github.com/k3a/html2text v1.2.1
github.com/microcosm-cc/bluemonday v1.0.27
github.com/miekg/dns v1.1.63
diff --git a/go.sum b/go.sum
index 3ca4813b2..ab230d9d9 100644
--- a/go.sum
+++ b/go.sum
@@ -258,8 +258,8 @@ github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsI
github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg=
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo=
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM=
-github.com/jackc/pgx/v5 v5.7.2 h1:mLoDLV6sonKlvjIEsV56SkWNCnuNv531l94GaIzO+XI=
-github.com/jackc/pgx/v5 v5.7.2/go.mod h1:ncY89UGWxg82EykZUwSpUKEfccBGGYq1xjrOpsbsfGQ=
+github.com/jackc/pgx/v5 v5.7.3 h1:PO1wNKj/bTAwxSJnO1Z4Ai8j4magtqg2SLNjEDzcXQo=
+github.com/jackc/pgx/v5 v5.7.3/go.mod h1:ncY89UGWxg82EykZUwSpUKEfccBGGYq1xjrOpsbsfGQ=
github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo=
github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4=
github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
diff --git a/vendor/github.com/jackc/pgx/v5/CHANGELOG.md b/vendor/github.com/jackc/pgx/v5/CHANGELOG.md
index 6470088b2..75578db0a 100644
--- a/vendor/github.com/jackc/pgx/v5/CHANGELOG.md
+++ b/vendor/github.com/jackc/pgx/v5/CHANGELOG.md
@@ -1,3 +1,16 @@
+# 5.7.3 (March 21, 2025)
+
+* Expose EmptyAcquireWaitTime in pgxpool.Stat (vamshiaruru32)
+* Improve SQL sanitizer performance (ninedraft)
+* Fix Scan confusion with json(b), sql.Scanner, and automatic dereferencing (moukoublen, felix-roehrich)
+* Fix Values() for xml type always returning nil instead of []byte
+* Add ability to send Flush message in pipeline mode (zenkovev)
+* Fix pgtype.Timestamp's JSON behavior to match PostgreSQL (pconstantinou)
+* Better error messages when scanning structs (logicbomb)
+* Fix handling of error on batch write (bonnefoa)
+* Match libpq's connection fallback behavior more closely (felix-roehrich)
+* Add MinIdleConns to pgxpool (djahandarie)
+
# 5.7.2 (December 21, 2024)
* Fix prepared statement already exists on batch prepare failure
@@ -9,6 +22,7 @@
* Implement pgtype.UUID.String() (Konstantin Grachev)
* Switch from ExecParams to Exec in ValidateConnectTargetSessionAttrs functions (Alexander Rumyantsev)
* Update golang.org/x/crypto
+* Fix json(b) columns prefer sql.Scanner interface like database/sql (Ludovico Russo)
# 5.7.1 (September 10, 2024)
diff --git a/vendor/github.com/jackc/pgx/v5/README.md b/vendor/github.com/jackc/pgx/v5/README.md
index bbeb1336c..9da49d866 100644
--- a/vendor/github.com/jackc/pgx/v5/README.md
+++ b/vendor/github.com/jackc/pgx/v5/README.md
@@ -92,7 +92,7 @@ See the presentation at Golang Estonia, [PGX Top to Bottom](https://www.youtube.
## Supported Go and PostgreSQL Versions
-pgx supports the same versions of Go and PostgreSQL that are supported by their respective teams. For [Go](https://golang.org/doc/devel/release.html#policy) that is the two most recent major releases and for [PostgreSQL](https://www.postgresql.org/support/versioning/) the major releases in the last 5 years. This means pgx supports Go 1.21 and higher and PostgreSQL 12 and higher. pgx also is tested against the latest version of [CockroachDB](https://www.cockroachlabs.com/product/).
+pgx supports the same versions of Go and PostgreSQL that are supported by their respective teams. For [Go](https://golang.org/doc/devel/release.html#policy) that is the two most recent major releases and for [PostgreSQL](https://www.postgresql.org/support/versioning/) the major releases in the last 5 years. This means pgx supports Go 1.22 and higher and PostgreSQL 13 and higher. pgx also is tested against the latest version of [CockroachDB](https://www.cockroachlabs.com/product/).
## Version Policy
@@ -172,3 +172,15 @@ Supports, structs, maps, slices and custom mapping functions.
### [github.com/z0ne-dev/mgx](https://github.com/z0ne-dev/mgx)
Code first migration library for native pgx (no database/sql abstraction).
+
+### [github.com/amirsalarsafaei/sqlc-pgx-monitoring](https://github.com/amirsalarsafaei/sqlc-pgx-monitoring)
+
+A database monitoring/metrics library for pgx and sqlc. Trace, log and monitor your sqlc query performance using OpenTelemetry.
+
+### [https://github.com/nikolayk812/pgx-outbox](https://github.com/nikolayk812/pgx-outbox)
+
+Simple Golang implementation for transactional outbox pattern for PostgreSQL using jackc/pgx driver.
+
+### [https://github.com/Arlandaren/pgxWrappy](https://github.com/Arlandaren/pgxWrappy)
+
+Simplifies working with the pgx library, providing convenient scanning of nested structures.
diff --git a/vendor/github.com/jackc/pgx/v5/Rakefile b/vendor/github.com/jackc/pgx/v5/Rakefile
index d957573e9..3e3aa5030 100644
--- a/vendor/github.com/jackc/pgx/v5/Rakefile
+++ b/vendor/github.com/jackc/pgx/v5/Rakefile
@@ -2,7 +2,7 @@ require "erb"
rule '.go' => '.go.erb' do |task|
erb = ERB.new(File.read(task.source))
- File.write(task.name, "// Do not edit. Generated from #{task.source}\n" + erb.result(binding))
+ File.write(task.name, "// Code generated from #{task.source}. DO NOT EDIT.\n\n" + erb.result(binding))
sh "goimports", "-w", task.name
end
diff --git a/vendor/github.com/jackc/pgx/v5/conn.go b/vendor/github.com/jackc/pgx/v5/conn.go
index ed6a3a09e..93e2e7182 100644
--- a/vendor/github.com/jackc/pgx/v5/conn.go
+++ b/vendor/github.com/jackc/pgx/v5/conn.go
@@ -420,7 +420,7 @@ func (c *Conn) IsClosed() bool {
return c.pgConn.IsClosed()
}
-func (c *Conn) die(err error) {
+func (c *Conn) die() {
if c.IsClosed() {
return
}
@@ -588,14 +588,6 @@ func (c *Conn) execPrepared(ctx context.Context, sd *pgconn.StatementDescription
return result.CommandTag, result.Err
}
-type unknownArgumentTypeQueryExecModeExecError struct {
- arg any
-}
-
-func (e *unknownArgumentTypeQueryExecModeExecError) Error() string {
- return fmt.Sprintf("cannot use unregistered type %T as query argument in QueryExecModeExec", e.arg)
-}
-
func (c *Conn) execSQLParams(ctx context.Context, sql string, args []any) (pgconn.CommandTag, error) {
err := c.eqb.Build(c.typeMap, nil, args)
if err != nil {
@@ -661,11 +653,12 @@ const (
// should implement pgtype.Int64Valuer.
QueryExecModeExec
- // Use the simple protocol. Assume the PostgreSQL query parameter types based on the Go type of the arguments. Queries
- // are executed in a single round trip. Type mappings can be registered with pgtype.Map.RegisterDefaultPgType. Queries
- // will be rejected that have arguments that are unregistered or ambiguous. e.g. A map[string]string may have the
- // PostgreSQL type json or hstore. Modes that know the PostgreSQL type can use a map[string]string directly as an
- // argument. This mode cannot.
+ // Use the simple protocol. Assume the PostgreSQL query parameter types based on the Go type of the arguments. This is
+ // especially significant for []byte values. []byte values are encoded as PostgreSQL bytea. string must be used
+ // instead for text type values including json and jsonb. Type mappings can be registered with
+ // pgtype.Map.RegisterDefaultPgType. Queries will be rejected that have arguments that are unregistered or ambiguous.
+ // e.g. A map[string]string may have the PostgreSQL type json or hstore. Modes that know the PostgreSQL type can use a
+ // map[string]string directly as an argument. This mode cannot. Queries are executed in a single round trip.
//
// QueryExecModeSimpleProtocol should have the user application visible behavior as QueryExecModeExec. This includes
// the warning regarding differences in text format and binary format encoding with user defined types. There may be
diff --git a/vendor/github.com/jackc/pgx/v5/derived_types.go b/vendor/github.com/jackc/pgx/v5/derived_types.go
index 22ab069cf..72c0a2423 100644
--- a/vendor/github.com/jackc/pgx/v5/derived_types.go
+++ b/vendor/github.com/jackc/pgx/v5/derived_types.go
@@ -161,7 +161,7 @@ type derivedTypeInfo struct {
// The result of this call can be passed into RegisterTypes to complete the process.
func (c *Conn) LoadTypes(ctx context.Context, typeNames []string) ([]*pgtype.Type, error) {
m := c.TypeMap()
- if typeNames == nil || len(typeNames) == 0 {
+ if len(typeNames) == 0 {
return nil, fmt.Errorf("No type names were supplied.")
}
@@ -169,13 +169,7 @@ func (c *Conn) LoadTypes(ctx context.Context, typeNames []string) ([]*pgtype.Typ
// the SQL not support recent structures such as multirange
serverVersion, _ := serverVersion(c)
sql := buildLoadDerivedTypesSQL(serverVersion, typeNames)
- var rows Rows
- var err error
- if typeNames == nil {
- rows, err = c.Query(ctx, sql, QueryExecModeSimpleProtocol)
- } else {
- rows, err = c.Query(ctx, sql, QueryExecModeSimpleProtocol, typeNames)
- }
+ rows, err := c.Query(ctx, sql, QueryExecModeSimpleProtocol, typeNames)
if err != nil {
return nil, fmt.Errorf("While generating load types query: %w", err)
}
@@ -232,15 +226,15 @@ func (c *Conn) LoadTypes(ctx context.Context, typeNames []string) ([]*pgtype.Typ
default:
return nil, fmt.Errorf("Unknown typtype %q was found while registering %q", ti.Typtype, ti.TypeName)
}
- if type_ != nil {
- m.RegisterType(type_)
- if ti.NspName != "" {
- nspType := &pgtype.Type{Name: ti.NspName + "." + type_.Name, OID: type_.OID, Codec: type_.Codec}
- m.RegisterType(nspType)
- result = append(result, nspType)
- }
- result = append(result, type_)
+
+ // the type_ is imposible to be null
+ m.RegisterType(type_)
+ if ti.NspName != "" {
+ nspType := &pgtype.Type{Name: ti.NspName + "." + type_.Name, OID: type_.OID, Codec: type_.Codec}
+ m.RegisterType(nspType)
+ result = append(result, nspType)
}
+ result = append(result, type_)
}
return result, nil
}
diff --git a/vendor/github.com/jackc/pgx/v5/internal/sanitize/benchmmark.sh b/vendor/github.com/jackc/pgx/v5/internal/sanitize/benchmmark.sh
new file mode 100644
index 000000000..ec0f7b03a
--- /dev/null
+++ b/vendor/github.com/jackc/pgx/v5/internal/sanitize/benchmmark.sh
@@ -0,0 +1,60 @@
+#!/usr/bin/env bash
+
+current_branch=$(git rev-parse --abbrev-ref HEAD)
+if [ "$current_branch" == "HEAD" ]; then
+ current_branch=$(git rev-parse HEAD)
+fi
+
+restore_branch() {
+ echo "Restoring original branch/commit: $current_branch"
+ git checkout "$current_branch"
+}
+trap restore_branch EXIT
+
+# Check if there are uncommitted changes
+if ! git diff --quiet || ! git diff --cached --quiet; then
+ echo "There are uncommitted changes. Please commit or stash them before running this script."
+ exit 1
+fi
+
+# Ensure that at least one commit argument is passed
+if [ "$#" -lt 1 ]; then
+ echo "Usage: $0 <commit1> <commit2> ... <commitN>"
+ exit 1
+fi
+
+commits=("$@")
+benchmarks_dir=benchmarks
+
+if ! mkdir -p "${benchmarks_dir}"; then
+ echo "Unable to create dir for benchmarks data"
+ exit 1
+fi
+
+# Benchmark results
+bench_files=()
+
+# Run benchmark for each listed commit
+for i in "${!commits[@]}"; do
+ commit="${commits[i]}"
+ git checkout "$commit" || {
+ echo "Failed to checkout $commit"
+ exit 1
+ }
+
+ # Sanitized commmit message
+ commit_message=$(git log -1 --pretty=format:"%s" | tr -c '[:alnum:]-_' '_')
+
+ # Benchmark data will go there
+ bench_file="${benchmarks_dir}/${i}_${commit_message}.bench"
+
+ if ! go test -bench=. -count=10 >"$bench_file"; then
+ echo "Benchmarking failed for commit $commit"
+ exit 1
+ fi
+
+ bench_files+=("$bench_file")
+done
+
+# go install golang.org/x/perf/cmd/benchstat[@latest]
+benchstat "${bench_files[@]}"
diff --git a/vendor/github.com/jackc/pgx/v5/internal/sanitize/sanitize.go b/vendor/github.com/jackc/pgx/v5/internal/sanitize/sanitize.go
index df58c4484..b516817cb 100644
--- a/vendor/github.com/jackc/pgx/v5/internal/sanitize/sanitize.go
+++ b/vendor/github.com/jackc/pgx/v5/internal/sanitize/sanitize.go
@@ -4,8 +4,10 @@ import (
"bytes"
"encoding/hex"
"fmt"
+ "slices"
"strconv"
"strings"
+ "sync"
"time"
"unicode/utf8"
)
@@ -24,18 +26,33 @@ type Query struct {
// https://github.com/jackc/pgx/issues/1380
const replacementcharacterwidth = 3
+const maxBufSize = 16384 // 16 Ki
+
+var bufPool = &pool[*bytes.Buffer]{
+ new: func() *bytes.Buffer {
+ return &bytes.Buffer{}
+ },
+ reset: func(b *bytes.Buffer) bool {
+ n := b.Len()
+ b.Reset()
+ return n < maxBufSize
+ },
+}
+
+var null = []byte("null")
+
func (q *Query) Sanitize(args ...any) (string, error) {
argUse := make([]bool, len(args))
- buf := &bytes.Buffer{}
+ buf := bufPool.get()
+ defer bufPool.put(buf)
for _, part := range q.Parts {
- var str string
switch part := part.(type) {
case string:
- str = part
+ buf.WriteString(part)
case int:
argIdx := part - 1
-
+ var p []byte
if argIdx < 0 {
return "", fmt.Errorf("first sql argument must be > 0")
}
@@ -43,34 +60,41 @@ func (q *Query) Sanitize(args ...any) (string, error) {
if argIdx >= len(args) {
return "", fmt.Errorf("insufficient arguments")
}
+
+ // Prevent SQL injection via Line Comment Creation
+ // https://github.com/jackc/pgx/security/advisories/GHSA-m7wr-2xf7-cm9p
+ buf.WriteByte(' ')
+
arg := args[argIdx]
switch arg := arg.(type) {
case nil:
- str = "null"
+ p = null
case int64:
- str = strconv.FormatInt(arg, 10)
+ p = strconv.AppendInt(buf.AvailableBuffer(), arg, 10)
case float64:
- str = strconv.FormatFloat(arg, 'f', -1, 64)
+ p = strconv.AppendFloat(buf.AvailableBuffer(), arg, 'f', -1, 64)
case bool:
- str = strconv.FormatBool(arg)
+ p = strconv.AppendBool(buf.AvailableBuffer(), arg)
case []byte:
- str = QuoteBytes(arg)
+ p = QuoteBytes(buf.AvailableBuffer(), arg)
case string:
- str = QuoteString(arg)
+ p = QuoteString(buf.AvailableBuffer(), arg)
case time.Time:
- str = arg.Truncate(time.Microsecond).Format("'2006-01-02 15:04:05.999999999Z07:00:00'")
+ p = arg.Truncate(time.Microsecond).
+ AppendFormat(buf.AvailableBuffer(), "'2006-01-02 15:04:05.999999999Z07:00:00'")
default:
return "", fmt.Errorf("invalid arg type: %T", arg)
}
argUse[argIdx] = true
+ buf.Write(p)
+
// Prevent SQL injection via Line Comment Creation
// https://github.com/jackc/pgx/security/advisories/GHSA-m7wr-2xf7-cm9p
- str = " " + str + " "
+ buf.WriteByte(' ')
default:
return "", fmt.Errorf("invalid Part type: %T", part)
}
- buf.WriteString(str)
}
for i, used := range argUse {
@@ -82,26 +106,99 @@ func (q *Query) Sanitize(args ...any) (string, error) {
}
func NewQuery(sql string) (*Query, error) {
- l := &sqlLexer{
- src: sql,
- stateFn: rawState,
+ query := &Query{}
+ query.init(sql)
+
+ return query, nil
+}
+
+var sqlLexerPool = &pool[*sqlLexer]{
+ new: func() *sqlLexer {
+ return &sqlLexer{}
+ },
+ reset: func(sl *sqlLexer) bool {
+ *sl = sqlLexer{}
+ return true
+ },
+}
+
+func (q *Query) init(sql string) {
+ parts := q.Parts[:0]
+ if parts == nil {
+ // dirty, but fast heuristic to preallocate for ~90% usecases
+ n := strings.Count(sql, "$") + strings.Count(sql, "--") + 1
+ parts = make([]Part, 0, n)
}
+ l := sqlLexerPool.get()
+ defer sqlLexerPool.put(l)
+
+ l.src = sql
+ l.stateFn = rawState
+ l.parts = parts
+
for l.stateFn != nil {
l.stateFn = l.stateFn(l)
}
- query := &Query{Parts: l.parts}
-
- return query, nil
+ q.Parts = l.parts
}
-func QuoteString(str string) string {
- return "'" + strings.ReplaceAll(str, "'", "''") + "'"
+func QuoteString(dst []byte, str string) []byte {
+ const quote = '\''
+
+ // Preallocate space for the worst case scenario
+ dst = slices.Grow(dst, len(str)*2+2)
+
+ // Add opening quote
+ dst = append(dst, quote)
+
+ // Iterate through the string without allocating
+ for i := 0; i < len(str); i++ {
+ if str[i] == quote {
+ dst = append(dst, quote, quote)
+ } else {
+ dst = append(dst, str[i])
+ }
+ }
+
+ // Add closing quote
+ dst = append(dst, quote)
+
+ return dst
}
-func QuoteBytes(buf []byte) string {
- return `'\x` + hex.EncodeToString(buf) + "'"
+func QuoteBytes(dst, buf []byte) []byte {
+ if len(buf) == 0 {
+ return append(dst, `'\x'`...)
+ }
+
+ // Calculate required length
+ requiredLen := 3 + hex.EncodedLen(len(buf)) + 1
+
+ // Ensure dst has enough capacity
+ if cap(dst)-len(dst) < requiredLen {
+ newDst := make([]byte, len(dst), len(dst)+requiredLen)
+ copy(newDst, dst)
+ dst = newDst
+ }
+
+ // Record original length and extend slice
+ origLen := len(dst)
+ dst = dst[:origLen+requiredLen]
+
+ // Add prefix
+ dst[origLen] = '\''
+ dst[origLen+1] = '\\'
+ dst[origLen+2] = 'x'
+
+ // Encode bytes directly into dst
+ hex.Encode(dst[origLen+3:len(dst)-1], buf)
+
+ // Add suffix
+ dst[len(dst)-1] = '\''
+
+ return dst
}
type sqlLexer struct {
@@ -319,13 +416,45 @@ func multilineCommentState(l *sqlLexer) stateFn {
}
}
+var queryPool = &pool[*Query]{
+ new: func() *Query {
+ return &Query{}
+ },
+ reset: func(q *Query) bool {
+ n := len(q.Parts)
+ q.Parts = q.Parts[:0]
+ return n < 64 // drop too large queries
+ },
+}
+
// SanitizeSQL replaces placeholder values with args. It quotes and escapes args
// as necessary. This function is only safe when standard_conforming_strings is
// on.
func SanitizeSQL(sql string, args ...any) (string, error) {
- query, err := NewQuery(sql)
- if err != nil {
- return "", err
- }
+ query := queryPool.get()
+ query.init(sql)
+ defer queryPool.put(query)
+
return query.Sanitize(args...)
}
+
+type pool[E any] struct {
+ p sync.Pool
+ new func() E
+ reset func(E) bool
+}
+
+func (pool *pool[E]) get() E {
+ v, ok := pool.p.Get().(E)
+ if !ok {
+ v = pool.new()
+ }
+
+ return v
+}
+
+func (p *pool[E]) put(v E) {
+ if p.reset(v) {
+ p.p.Put(v)
+ }
+}
diff --git a/vendor/github.com/jackc/pgx/v5/pgconn/pgconn.go b/vendor/github.com/jackc/pgx/v5/pgconn/pgconn.go
index 7efb522a4..14966aa49 100644
--- a/vendor/github.com/jackc/pgx/v5/pgconn/pgconn.go
+++ b/vendor/github.com/jackc/pgx/v5/pgconn/pgconn.go
@@ -1,6 +1,7 @@
package pgconn
import (
+ "container/list"
"context"
"crypto/md5"
"crypto/tls"
@@ -267,12 +268,15 @@ func connectPreferred(ctx context.Context, config *Config, connectOneConfigs []*
var pgErr *PgError
if errors.As(err, &pgErr) {
- const ERRCODE_INVALID_PASSWORD = "28P01" // wrong password
- const ERRCODE_INVALID_AUTHORIZATION_SPECIFICATION = "28000" // wrong password or bad pg_hba.conf settings
- const ERRCODE_INVALID_CATALOG_NAME = "3D000" // db does not exist
- const ERRCODE_INSUFFICIENT_PRIVILEGE = "42501" // missing connect privilege
+ // pgx will try next host even if libpq does not in certain cases (see #2246)
+ // consider change for the next major version
+
+ const ERRCODE_INVALID_PASSWORD = "28P01"
+ const ERRCODE_INVALID_CATALOG_NAME = "3D000" // db does not exist
+ const ERRCODE_INSUFFICIENT_PRIVILEGE = "42501" // missing connect privilege
+
+ // auth failed due to invalid password, db does not exist or user has no permission
if pgErr.Code == ERRCODE_INVALID_PASSWORD ||
- pgErr.Code == ERRCODE_INVALID_AUTHORIZATION_SPECIFICATION && c.tlsConfig != nil ||
pgErr.Code == ERRCODE_INVALID_CATALOG_NAME ||
pgErr.Code == ERRCODE_INSUFFICIENT_PRIVILEGE {
return nil, allErrors
@@ -1408,9 +1412,8 @@ func (pgConn *PgConn) CopyFrom(ctx context.Context, r io.Reader, sql string) (Co
// MultiResultReader is a reader for a command that could return multiple results such as Exec or ExecBatch.
type MultiResultReader struct {
- pgConn *PgConn
- ctx context.Context
- pipeline *Pipeline
+ pgConn *PgConn
+ ctx context.Context
rr *ResultReader
@@ -1443,12 +1446,8 @@ func (mrr *MultiResultReader) receiveMessage() (pgproto3.BackendMessage, error)
switch msg := msg.(type) {
case *pgproto3.ReadyForQuery:
mrr.closed = true
- if mrr.pipeline != nil {
- mrr.pipeline.expectedReadyForQueryCount--
- } else {
- mrr.pgConn.contextWatcher.Unwatch()
- mrr.pgConn.unlock()
- }
+ mrr.pgConn.contextWatcher.Unwatch()
+ mrr.pgConn.unlock()
case *pgproto3.ErrorResponse:
mrr.err = ErrorResponseToPgError(msg)
}
@@ -1672,7 +1671,11 @@ func (rr *ResultReader) receiveMessage() (msg pgproto3.BackendMessage, err error
case *pgproto3.EmptyQueryResponse:
rr.concludeCommand(CommandTag{}, nil)
case *pgproto3.ErrorResponse:
- rr.concludeCommand(CommandTag{}, ErrorResponseToPgError(msg))
+ pgErr := ErrorResponseToPgError(msg)
+ if rr.pipeline != nil {
+ rr.pipeline.state.HandleError(pgErr)
+ }
+ rr.concludeCommand(CommandTag{}, pgErr)
}
return msg, nil
@@ -1773,9 +1776,10 @@ func (pgConn *PgConn) ExecBatch(ctx context.Context, batch *Batch) *MultiResultR
batch.buf, batch.err = (&pgproto3.Sync{}).Encode(batch.buf)
if batch.err != nil {
+ pgConn.contextWatcher.Unwatch()
+ multiResult.err = normalizeTimeoutError(multiResult.ctx, batch.err)
multiResult.closed = true
- multiResult.err = batch.err
- pgConn.unlock()
+ pgConn.asyncClose()
return multiResult
}
@@ -1783,9 +1787,10 @@ func (pgConn *PgConn) ExecBatch(ctx context.Context, batch *Batch) *MultiResultR
defer pgConn.exitPotentialWriteReadDeadlock()
_, err := pgConn.conn.Write(batch.buf)
if err != nil {
+ pgConn.contextWatcher.Unwatch()
+ multiResult.err = normalizeTimeoutError(multiResult.ctx, err)
multiResult.closed = true
- multiResult.err = err
- pgConn.unlock()
+ pgConn.asyncClose()
return multiResult
}
@@ -1999,9 +2004,7 @@ type Pipeline struct {
conn *PgConn
ctx context.Context
- expectedReadyForQueryCount int
- pendingSync bool
-
+ state pipelineState
err error
closed bool
}
@@ -2012,6 +2015,122 @@ type PipelineSync struct{}
// CloseComplete is returned by GetResults when a CloseComplete message is received.
type CloseComplete struct{}
+type pipelineRequestType int
+
+const (
+ pipelineNil pipelineRequestType = iota
+ pipelinePrepare
+ pipelineQueryParams
+ pipelineQueryPrepared
+ pipelineDeallocate
+ pipelineSyncRequest
+ pipelineFlushRequest
+)
+
+type pipelineRequestEvent struct {
+ RequestType pipelineRequestType
+ WasSentToServer bool
+ BeforeFlushOrSync bool
+}
+
+type pipelineState struct {
+ requestEventQueue list.List
+ lastRequestType pipelineRequestType
+ pgErr *PgError
+ expectedReadyForQueryCount int
+}
+
+func (s *pipelineState) Init() {
+ s.requestEventQueue.Init()
+ s.lastRequestType = pipelineNil
+}
+
+func (s *pipelineState) RegisterSendingToServer() {
+ for elem := s.requestEventQueue.Back(); elem != nil; elem = elem.Prev() {
+ val := elem.Value.(pipelineRequestEvent)
+ if val.WasSentToServer {
+ return
+ }
+ val.WasSentToServer = true
+ elem.Value = val
+ }
+}
+
+func (s *pipelineState) registerFlushingBufferOnServer() {
+ for elem := s.requestEventQueue.Back(); elem != nil; elem = elem.Prev() {
+ val := elem.Value.(pipelineRequestEvent)
+ if val.BeforeFlushOrSync {
+ return
+ }
+ val.BeforeFlushOrSync = true
+ elem.Value = val
+ }
+}
+
+func (s *pipelineState) PushBackRequestType(req pipelineRequestType) {
+ if req == pipelineNil {
+ return
+ }
+
+ if req != pipelineFlushRequest {
+ s.requestEventQueue.PushBack(pipelineRequestEvent{RequestType: req})
+ }
+ if req == pipelineFlushRequest || req == pipelineSyncRequest {
+ s.registerFlushingBufferOnServer()
+ }
+ s.lastRequestType = req
+
+ if req == pipelineSyncRequest {
+ s.expectedReadyForQueryCount++
+ }
+}
+
+func (s *pipelineState) ExtractFrontRequestType() pipelineRequestType {
+ for {
+ elem := s.requestEventQueue.Front()
+ if elem == nil {
+ return pipelineNil
+ }
+ val := elem.Value.(pipelineRequestEvent)
+ if !(val.WasSentToServer && val.BeforeFlushOrSync) {
+ return pipelineNil
+ }
+
+ s.requestEventQueue.Remove(elem)
+ if val.RequestType == pipelineSyncRequest {
+ s.pgErr = nil
+ }
+ if s.pgErr == nil {
+ return val.RequestType
+ }
+ }
+}
+
+func (s *pipelineState) HandleError(err *PgError) {
+ s.pgErr = err
+}
+
+func (s *pipelineState) HandleReadyForQuery() {
+ s.expectedReadyForQueryCount--
+}
+
+func (s *pipelineState) PendingSync() bool {
+ var notPendingSync bool
+
+ if elem := s.requestEventQueue.Back(); elem != nil {
+ val := elem.Value.(pipelineRequestEvent)
+ notPendingSync = (val.RequestType == pipelineSyncRequest) && val.WasSentToServer
+ } else {
+ notPendingSync = (s.lastRequestType == pipelineSyncRequest) || (s.lastRequestType == pipelineNil)
+ }
+
+ return !notPendingSync
+}
+
+func (s *pipelineState) ExpectedReadyForQuery() int {
+ return s.expectedReadyForQueryCount
+}
+
// StartPipeline switches the connection to pipeline mode and returns a *Pipeline. In pipeline mode requests can be sent
// to the server without waiting for a response. Close must be called on the returned *Pipeline to return the connection
// to normal mode. While in pipeline mode, no methods that communicate with the server may be called except
@@ -2020,16 +2139,21 @@ type CloseComplete struct{}
// Prefer ExecBatch when only sending one group of queries at once.
func (pgConn *PgConn) StartPipeline(ctx context.Context) *Pipeline {
if err := pgConn.lock(); err != nil {
- return &Pipeline{
+ pipeline := &Pipeline{
closed: true,
err: err,
}
+ pipeline.state.Init()
+
+ return pipeline
}
pgConn.pipeline = Pipeline{
conn: pgConn,
ctx: ctx,
}
+ pgConn.pipeline.state.Init()
+
pipeline := &pgConn.pipeline
if ctx != context.Background() {
@@ -2052,10 +2176,10 @@ func (p *Pipeline) SendPrepare(name, sql string, paramOIDs []uint32) {
if p.closed {
return
}
- p.pendingSync = true
p.conn.frontend.SendParse(&pgproto3.Parse{Name: name, Query: sql, ParameterOIDs: paramOIDs})
p.conn.frontend.SendDescribe(&pgproto3.Describe{ObjectType: 'S', Name: name})
+ p.state.PushBackRequestType(pipelinePrepare)
}
// SendDeallocate deallocates a prepared statement.
@@ -2063,9 +2187,9 @@ func (p *Pipeline) SendDeallocate(name string) {
if p.closed {
return
}
- p.pendingSync = true
p.conn.frontend.SendClose(&pgproto3.Close{ObjectType: 'S', Name: name})
+ p.state.PushBackRequestType(pipelineDeallocate)
}
// SendQueryParams is the pipeline version of *PgConn.QueryParams.
@@ -2073,12 +2197,12 @@ func (p *Pipeline) SendQueryParams(sql string, paramValues [][]byte, paramOIDs [
if p.closed {
return
}
- p.pendingSync = true
p.conn.frontend.SendParse(&pgproto3.Parse{Query: sql, ParameterOIDs: paramOIDs})
p.conn.frontend.SendBind(&pgproto3.Bind{ParameterFormatCodes: paramFormats, Parameters: paramValues, ResultFormatCodes: resultFormats})
p.conn.frontend.SendDescribe(&pgproto3.Describe{ObjectType: 'P'})
p.conn.frontend.SendExecute(&pgproto3.Execute{})
+ p.state.PushBackRequestType(pipelineQueryParams)
}
// SendQueryPrepared is the pipeline version of *PgConn.QueryPrepared.
@@ -2086,11 +2210,42 @@ func (p *Pipeline) SendQueryPrepared(stmtName string, paramValues [][]byte, para
if p.closed {
return
}
- p.pendingSync = true
p.conn.frontend.SendBind(&pgproto3.Bind{PreparedStatement: stmtName, ParameterFormatCodes: paramFormats, Parameters: paramValues, ResultFormatCodes: resultFormats})
p.conn.frontend.SendDescribe(&pgproto3.Describe{ObjectType: 'P'})
p.conn.frontend.SendExecute(&pgproto3.Execute{})
+ p.state.PushBackRequestType(pipelineQueryPrepared)
+}
+
+// SendFlushRequest sends a request for the server to flush its output buffer.
+//
+// The server flushes its output buffer automatically as a result of Sync being called,
+// or on any request when not in pipeline mode; this function is useful to cause the server
+// to flush its output buffer in pipeline mode without establishing a synchronization point.
+// Note that the request is not itself flushed to the server automatically; use Flush if
+// necessary. This copies the behavior of libpq PQsendFlushRequest.
+func (p *Pipeline) SendFlushRequest() {
+ if p.closed {
+ return
+ }
+
+ p.conn.frontend.Send(&pgproto3.Flush{})
+ p.state.PushBackRequestType(pipelineFlushRequest)
+}
+
+// SendPipelineSync marks a synchronization point in a pipeline by sending a sync message
+// without flushing the send buffer. This serves as the delimiter of an implicit
+// transaction and an error recovery point.
+//
+// Note that the request is not itself flushed to the server automatically; use Flush if
+// necessary. This copies the behavior of libpq PQsendPipelineSync.
+func (p *Pipeline) SendPipelineSync() {
+ if p.closed {
+ return
+ }
+
+ p.conn.frontend.SendSync(&pgproto3.Sync{})
+ p.state.PushBackRequestType(pipelineSyncRequest)
}
// Flush flushes the queued requests without establishing a synchronization point.
@@ -2115,28 +2270,14 @@ func (p *Pipeline) Flush() error {
return err
}
+ p.state.RegisterSendingToServer()
return nil
}
// Sync establishes a synchronization point and flushes the queued requests.
func (p *Pipeline) Sync() error {
- if p.closed {
- if p.err != nil {
- return p.err
- }
- return errors.New("pipeline closed")
- }
-
- p.conn.frontend.SendSync(&pgproto3.Sync{})
- err := p.Flush()
- if err != nil {
- return err
- }
-
- p.pendingSync = false
- p.expectedReadyForQueryCount++
-
- return nil
+ p.SendPipelineSync()
+ return p.Flush()
}
// GetResults gets the next results. If results are present, results may be a *ResultReader, *StatementDescription, or
@@ -2150,7 +2291,7 @@ func (p *Pipeline) GetResults() (results any, err error) {
return nil, errors.New("pipeline closed")
}
- if p.expectedReadyForQueryCount == 0 {
+ if p.state.ExtractFrontRequestType() == pipelineNil {
return nil, nil
}
@@ -2195,13 +2336,13 @@ func (p *Pipeline) getResults() (results any, err error) {
case *pgproto3.CloseComplete:
return &CloseComplete{}, nil
case *pgproto3.ReadyForQuery:
- p.expectedReadyForQueryCount--
+ p.state.HandleReadyForQuery()
return &PipelineSync{}, nil
case *pgproto3.ErrorResponse:
pgErr := ErrorResponseToPgError(msg)
+ p.state.HandleError(pgErr)
return nil, pgErr
}
-
}
}
@@ -2231,6 +2372,7 @@ func (p *Pipeline) getResultsPrepare() (*StatementDescription, error) {
// These should never happen here. But don't take chances that could lead to a deadlock.
case *pgproto3.ErrorResponse:
pgErr := ErrorResponseToPgError(msg)
+ p.state.HandleError(pgErr)
return nil, pgErr
case *pgproto3.CommandComplete:
p.conn.asyncClose()
@@ -2250,7 +2392,7 @@ func (p *Pipeline) Close() error {
p.closed = true
- if p.pendingSync {
+ if p.state.PendingSync() {
p.conn.asyncClose()
p.err = errors.New("pipeline has unsynced requests")
p.conn.contextWatcher.Unwatch()
@@ -2259,7 +2401,7 @@ func (p *Pipeline) Close() error {
return p.err
}
- for p.expectedReadyForQueryCount > 0 {
+ for p.state.ExpectedReadyForQuery() > 0 {
_, err := p.getResults()
if err != nil {
p.err = err
diff --git a/vendor/github.com/jackc/pgx/v5/pgproto3/password_message.go b/vendor/github.com/jackc/pgx/v5/pgproto3/password_message.go
index d820d3275..67b78515d 100644
--- a/vendor/github.com/jackc/pgx/v5/pgproto3/password_message.go
+++ b/vendor/github.com/jackc/pgx/v5/pgproto3/password_message.go
@@ -12,7 +12,7 @@ type PasswordMessage struct {
// Frontend identifies this message as sendable by a PostgreSQL frontend.
func (*PasswordMessage) Frontend() {}
-// Frontend identifies this message as an authentication response.
+// InitialResponse identifies this message as an authentication response.
func (*PasswordMessage) InitialResponse() {}
// Decode decodes src into dst. src must contain the complete message with the exception of the initial 1 byte message
diff --git a/vendor/github.com/jackc/pgx/v5/pgtype/doc.go b/vendor/github.com/jackc/pgx/v5/pgtype/doc.go
index 7687ea8fe..83dfc5de5 100644
--- a/vendor/github.com/jackc/pgx/v5/pgtype/doc.go
+++ b/vendor/github.com/jackc/pgx/v5/pgtype/doc.go
@@ -53,8 +53,8 @@ similar fashion to database/sql. The second is to use a pointer to a pointer.
return err
}
-When using nullable pgtype types as parameters for queries, one has to remember
-to explicitly set their Valid field to true, otherwise the parameter's value will be NULL.
+When using nullable pgtype types as parameters for queries, one has to remember to explicitly set their Valid field to
+true, otherwise the parameter's value will be NULL.
JSON Support
@@ -159,11 +159,16 @@ example_child_records_test.go for an example.
Overview of Scanning Implementation
-The first step is to use the OID to lookup the correct Codec. If the OID is unavailable, Map will try to find the OID
-from previous calls of Map.RegisterDefaultPgType. The Map will call the Codec's PlanScan method to get a plan for
-scanning into the Go value. A Codec will support scanning into one or more Go types. Oftentime these Go types are
-interfaces rather than explicit types. For example, PointCodec can use any Go type that implements the PointScanner and
-PointValuer interfaces.
+The first step is to use the OID to lookup the correct Codec. The Map will call the Codec's PlanScan method to get a
+plan for scanning into the Go value. A Codec will support scanning into one or more Go types. Oftentime these Go types
+are interfaces rather than explicit types. For example, PointCodec can use any Go type that implements the PointScanner
+and PointValuer interfaces.
+
+If a Go value is not supported directly by a Codec then Map will try see if it is a sql.Scanner. If is then that
+interface will be used to scan the value. Most sql.Scanners require the input to be in the text format (e.g. UUIDs and
+numeric). However, pgx will typically have received the value in the binary format. In this case the binary value will be
+parsed, reencoded as text, and then passed to the sql.Scanner. This may incur additional overhead for query results with
+a large number of affected values.
If a Go value is not supported directly by a Codec then Map will try wrapping it with additional logic and try again.
For example, Int8Codec does not support scanning into a renamed type (e.g. type myInt64 int64). But Map will detect that
diff --git a/vendor/github.com/jackc/pgx/v5/pgtype/int.go b/vendor/github.com/jackc/pgx/v5/pgtype/int.go
index 90a20a26a..7a2f8cb24 100644
--- a/vendor/github.com/jackc/pgx/v5/pgtype/int.go
+++ b/vendor/github.com/jackc/pgx/v5/pgtype/int.go
@@ -1,4 +1,5 @@
-// Do not edit. Generated from pgtype/int.go.erb
+// Code generated from pgtype/int.go.erb. DO NOT EDIT.
+
package pgtype
import (
diff --git a/vendor/github.com/jackc/pgx/v5/pgtype/json.go b/vendor/github.com/jackc/pgx/v5/pgtype/json.go
index 48b9f9771..6f7ebb51f 100644
--- a/vendor/github.com/jackc/pgx/v5/pgtype/json.go
+++ b/vendor/github.com/jackc/pgx/v5/pgtype/json.go
@@ -71,6 +71,27 @@ func (c *JSONCodec) PlanEncode(m *Map, oid uint32, format int16, value any) Enco
}
}
+// JSON needs its on scan plan for pointers to handle 'null'::json(b).
+// Consider making pointerPointerScanPlan more flexible in the future.
+type jsonPointerScanPlan struct {
+ next ScanPlan
+}
+
+func (p jsonPointerScanPlan) Scan(src []byte, dst any) error {
+ el := reflect.ValueOf(dst).Elem()
+ if src == nil || string(src) == "null" {
+ el.SetZero()
+ return nil
+ }
+
+ el.Set(reflect.New(el.Type().Elem()))
+ if p.next != nil {
+ return p.next.Scan(src, el.Interface())
+ }
+
+ return nil
+}
+
type encodePlanJSONCodecEitherFormatString struct{}
func (encodePlanJSONCodecEitherFormatString) Encode(value any, buf []byte) (newBuf []byte, err error) {
@@ -117,60 +138,38 @@ func (e *encodePlanJSONCodecEitherFormatMarshal) Encode(value any, buf []byte) (
return buf, nil
}
-func (c *JSONCodec) PlanScan(m *Map, oid uint32, format int16, target any) ScanPlan {
+func (c *JSONCodec) PlanScan(m *Map, oid uint32, formatCode int16, target any) ScanPlan {
+ return c.planScan(m, oid, formatCode, target, 0)
+}
+
+// JSON cannot fallback to pointerPointerScanPlan because of 'null'::json(b),
+// so we need to duplicate the logic here.
+func (c *JSONCodec) planScan(m *Map, oid uint32, formatCode int16, target any, depth int) ScanPlan {
+ if depth > 8 {
+ return &scanPlanFail{m: m, oid: oid, formatCode: formatCode}
+ }
+
switch target.(type) {
case *string:
- return scanPlanAnyToString{}
-
- case **string:
- // This is to fix **string scanning. It seems wrong to special case **string, but it's not clear what a better
- // solution would be.
- //
- // https://github.com/jackc/pgx/issues/1470 -- **string
- // https://github.com/jackc/pgx/issues/1691 -- ** anything else
-
- if wrapperPlan, nextDst, ok := TryPointerPointerScanPlan(target); ok {
- if nextPlan := m.planScan(oid, format, nextDst, 0); nextPlan != nil {
- if _, failed := nextPlan.(*scanPlanFail); !failed {
- wrapperPlan.SetNext(nextPlan)
- return wrapperPlan
- }
- }
- }
-
+ return &scanPlanAnyToString{}
case *[]byte:
- return scanPlanJSONToByteSlice{}
+ return &scanPlanJSONToByteSlice{}
case BytesScanner:
- return scanPlanBinaryBytesToBytesScanner{}
-
- }
-
- // Cannot rely on sql.Scanner being handled later because scanPlanJSONToJSONUnmarshal will take precedence.
- //
- // https://github.com/jackc/pgx/issues/1418
- if isSQLScanner(target) {
- return &scanPlanSQLScanner{formatCode: format}
+ return &scanPlanBinaryBytesToBytesScanner{}
+ case sql.Scanner:
+ return &scanPlanSQLScanner{formatCode: formatCode}
}
- return &scanPlanJSONToJSONUnmarshal{
- unmarshal: c.Unmarshal,
+ rv := reflect.ValueOf(target)
+ if rv.Kind() == reflect.Pointer && rv.Elem().Kind() == reflect.Pointer {
+ var plan jsonPointerScanPlan
+ plan.next = c.planScan(m, oid, formatCode, rv.Elem().Interface(), depth+1)
+ return plan
+ } else {
+ return &scanPlanJSONToJSONUnmarshal{unmarshal: c.Unmarshal}
}
}
-// we need to check if the target is a pointer to a sql.Scanner (or any of the pointer ref tree implements a sql.Scanner).
-//
-// https://github.com/jackc/pgx/issues/2146
-func isSQLScanner(v any) bool {
- val := reflect.ValueOf(v)
- for val.Kind() == reflect.Ptr {
- if _, ok := val.Interface().(sql.Scanner); ok {
- return true
- }
- val = val.Elem()
- }
- return false
-}
-
type scanPlanAnyToString struct{}
func (scanPlanAnyToString) Scan(src []byte, dst any) error {
@@ -198,7 +197,7 @@ type scanPlanJSONToJSONUnmarshal struct {
}
func (s *scanPlanJSONToJSONUnmarshal) Scan(src []byte, dst any) error {
- if src == nil {
+ if src == nil || string(src) == "null" {
dstValue := reflect.ValueOf(dst)
if dstValue.Kind() == reflect.Ptr {
el := dstValue.Elem()
@@ -212,7 +211,12 @@ func (s *scanPlanJSONToJSONUnmarshal) Scan(src []byte, dst any) error {
return fmt.Errorf("cannot scan NULL into %T", dst)
}
- elem := reflect.ValueOf(dst).Elem()
+ v := reflect.ValueOf(dst)
+ if v.Kind() != reflect.Pointer || v.IsNil() {
+ return fmt.Errorf("cannot scan into non-pointer or nil destinations %T", dst)
+ }
+
+ elem := v.Elem()
elem.Set(reflect.Zero(elem.Type()))
return s.unmarshal(src, dst)
diff --git a/vendor/github.com/jackc/pgx/v5/pgtype/pgtype.go b/vendor/github.com/jackc/pgx/v5/pgtype/pgtype.go
index f9d43edd7..a1083161c 100644
--- a/vendor/github.com/jackc/pgx/v5/pgtype/pgtype.go
+++ b/vendor/github.com/jackc/pgx/v5/pgtype/pgtype.go
@@ -396,11 +396,7 @@ type scanPlanSQLScanner struct {
}
func (plan *scanPlanSQLScanner) Scan(src []byte, dst any) error {
- scanner := getSQLScanner(dst)
-
- if scanner == nil {
- return fmt.Errorf("cannot scan into %T", dst)
- }
+ scanner := dst.(sql.Scanner)
if src == nil {
// This is necessary because interface value []byte:nil does not equal nil:nil for the binary format path and the
@@ -413,21 +409,6 @@ func (plan *scanPlanSQLScanner) Scan(src []byte, dst any) error {
}
}
-// we don't know if the target is a sql.Scanner or a pointer on a sql.Scanner, so we need to check recursively
-func getSQLScanner(target any) sql.Scanner {
- val := reflect.ValueOf(target)
- for val.Kind() == reflect.Ptr {
- if _, ok := val.Interface().(sql.Scanner); ok {
- if val.IsNil() {
- val.Set(reflect.New(val.Type().Elem()))
- }
- return val.Interface().(sql.Scanner)
- }
- val = val.Elem()
- }
- return nil
-}
-
type scanPlanString struct{}
func (scanPlanString) Scan(src []byte, dst any) error {
diff --git a/vendor/github.com/jackc/pgx/v5/pgtype/pgtype_default.go b/vendor/github.com/jackc/pgx/v5/pgtype/pgtype_default.go
index 9496cb974..8cb512fa5 100644
--- a/vendor/github.com/jackc/pgx/v5/pgtype/pgtype_default.go
+++ b/vendor/github.com/jackc/pgx/v5/pgtype/pgtype_default.go
@@ -91,7 +91,25 @@ func initDefaultMap() {
defaultMap.RegisterType(&Type{Name: "varchar", OID: VarcharOID, Codec: TextCodec{}})
defaultMap.RegisterType(&Type{Name: "xid", OID: XIDOID, Codec: Uint32Codec{}})
defaultMap.RegisterType(&Type{Name: "xid8", OID: XID8OID, Codec: Uint64Codec{}})
- defaultMap.RegisterType(&Type{Name: "xml", OID: XMLOID, Codec: &XMLCodec{Marshal: xml.Marshal, Unmarshal: xml.Unmarshal}})
+ defaultMap.RegisterType(&Type{Name: "xml", OID: XMLOID, Codec: &XMLCodec{
+ Marshal: xml.Marshal,
+ // xml.Unmarshal does not support unmarshalling into *any. However, XMLCodec.DecodeValue calls Unmarshal with a
+ // *any. Wrap xml.Marshal with a function that copies the data into a new byte slice in this case. Not implementing
+ // directly in XMLCodec.DecodeValue to allow for the unlikely possibility that someone uses an alternative XML
+ // unmarshaler that does support unmarshalling into *any.
+ //
+ // https://github.com/jackc/pgx/issues/2227
+ // https://github.com/jackc/pgx/pull/2228
+ Unmarshal: func(data []byte, v any) error {
+ if v, ok := v.(*any); ok {
+ dstBuf := make([]byte, len(data))
+ copy(dstBuf, data)
+ *v = dstBuf
+ return nil
+ }
+ return xml.Unmarshal(data, v)
+ },
+ }})
// Range types
defaultMap.RegisterType(&Type{Name: "daterange", OID: DaterangeOID, Codec: &RangeCodec{ElementType: defaultMap.oidToType[DateOID]}})
diff --git a/vendor/github.com/jackc/pgx/v5/pgtype/timestamp.go b/vendor/github.com/jackc/pgx/v5/pgtype/timestamp.go
index ff2739d6b..c31f2ac53 100644
--- a/vendor/github.com/jackc/pgx/v5/pgtype/timestamp.go
+++ b/vendor/github.com/jackc/pgx/v5/pgtype/timestamp.go
@@ -12,6 +12,7 @@ import (
)
const pgTimestampFormat = "2006-01-02 15:04:05.999999999"
+const jsonISO8601 = "2006-01-02T15:04:05.999999999"
type TimestampScanner interface {
ScanTimestamp(v Timestamp) error
@@ -76,7 +77,7 @@ func (ts Timestamp) MarshalJSON() ([]byte, error) {
switch ts.InfinityModifier {
case Finite:
- s = ts.Time.Format(time.RFC3339Nano)
+ s = ts.Time.Format(jsonISO8601)
case Infinity:
s = "infinity"
case NegativeInfinity:
@@ -104,15 +105,23 @@ func (ts *Timestamp) UnmarshalJSON(b []byte) error {
case "-infinity":
*ts = Timestamp{Valid: true, InfinityModifier: -Infinity}
default:
- // PostgreSQL uses ISO 8601 wihout timezone for to_json function and casting from a string to timestampt
- tim, err := time.Parse(time.RFC3339Nano, *s+"Z")
- if err != nil {
- return err
+ // Parse time with or without timezonr
+ tss := *s
+ // PostgreSQL uses ISO 8601 without timezone for to_json function and casting from a string to timestampt
+ tim, err := time.Parse(time.RFC3339Nano, tss)
+ if err == nil {
+ *ts = Timestamp{Time: tim, Valid: true}
+ return nil
}
-
- *ts = Timestamp{Time: tim, Valid: true}
+ tim, err = time.ParseInLocation(jsonISO8601, tss, time.UTC)
+ if err == nil {
+ *ts = Timestamp{Time: tim, Valid: true}
+ return nil
+ }
+ ts.Valid = false
+ return fmt.Errorf("cannot unmarshal %s to timestamp with layout %s or %s (%w)",
+ *s, time.RFC3339Nano, jsonISO8601, err)
}
-
return nil
}
diff --git a/vendor/github.com/jackc/pgx/v5/pgxpool/pool.go b/vendor/github.com/jackc/pgx/v5/pgxpool/pool.go
index 270b7617a..e22ed289a 100644
--- a/vendor/github.com/jackc/pgx/v5/pgxpool/pool.go
+++ b/vendor/github.com/jackc/pgx/v5/pgxpool/pool.go
@@ -17,6 +17,7 @@ import (
var defaultMaxConns = int32(4)
var defaultMinConns = int32(0)
+var defaultMinIdleConns = int32(0)
var defaultMaxConnLifetime = time.Hour
var defaultMaxConnIdleTime = time.Minute * 30
var defaultHealthCheckPeriod = time.Minute
@@ -87,6 +88,7 @@ type Pool struct {
afterRelease func(*pgx.Conn) bool
beforeClose func(*pgx.Conn)
minConns int32
+ minIdleConns int32
maxConns int32
maxConnLifetime time.Duration
maxConnLifetimeJitter time.Duration
@@ -144,6 +146,13 @@ type Config struct {
// to create new connections.
MinConns int32
+ // MinIdleConns is the minimum number of idle connections in the pool. You can increase this to ensure that
+ // there are always idle connections available. This can help reduce tail latencies during request processing,
+ // as you can avoid the latency of establishing a new connection while handling requests. It is superior
+ // to MinConns for this purpose.
+ // Similar to MinConns, the pool might temporarily dip below MinIdleConns after connection closes.
+ MinIdleConns int32
+
// HealthCheckPeriod is the duration between checks of the health of idle connections.
HealthCheckPeriod time.Duration
@@ -189,6 +198,7 @@ func NewWithConfig(ctx context.Context, config *Config) (*Pool, error) {
afterRelease: config.AfterRelease,
beforeClose: config.BeforeClose,
minConns: config.MinConns,
+ minIdleConns: config.MinIdleConns,
maxConns: config.MaxConns,
maxConnLifetime: config.MaxConnLifetime,
maxConnLifetimeJitter: config.MaxConnLifetimeJitter,
@@ -271,7 +281,8 @@ func NewWithConfig(ctx context.Context, config *Config) (*Pool, error) {
}
go func() {
- p.createIdleResources(ctx, int(p.minConns))
+ targetIdleResources := max(int(p.minConns), int(p.minIdleConns))
+ p.createIdleResources(ctx, targetIdleResources)
p.backgroundHealthCheck()
}()
@@ -334,6 +345,17 @@ func ParseConfig(connString string) (*Config, error) {
config.MinConns = defaultMinConns
}
+ if s, ok := config.ConnConfig.Config.RuntimeParams["pool_min_idle_conns"]; ok {
+ delete(connConfig.Config.RuntimeParams, "pool_min_idle_conns")
+ n, err := strconv.ParseInt(s, 10, 32)
+ if err != nil {
+ return nil, fmt.Errorf("cannot parse pool_min_idle_conns: %w", err)
+ }
+ config.MinIdleConns = int32(n)
+ } else {
+ config.MinIdleConns = defaultMinIdleConns
+ }
+
if s, ok := config.ConnConfig.Config.RuntimeParams["pool_max_conn_lifetime"]; ok {
delete(connConfig.Config.RuntimeParams, "pool_max_conn_lifetime")
d, err := time.ParseDuration(s)
@@ -472,7 +494,9 @@ func (p *Pool) checkMinConns() error {
// TotalConns can include ones that are being destroyed but we should have
// sleep(500ms) around all of the destroys to help prevent that from throwing
// off this check
- toCreate := p.minConns - p.Stat().TotalConns()
+
+ // Create the number of connections needed to get to both minConns and minIdleConns
+ toCreate := max(p.minConns-p.Stat().TotalConns(), p.minIdleConns-p.Stat().IdleConns())
if toCreate > 0 {
return p.createIdleResources(context.Background(), int(toCreate))
}
diff --git a/vendor/github.com/jackc/pgx/v5/pgxpool/stat.go b/vendor/github.com/jackc/pgx/v5/pgxpool/stat.go
index cfa0c4c56..e02b6ac39 100644
--- a/vendor/github.com/jackc/pgx/v5/pgxpool/stat.go
+++ b/vendor/github.com/jackc/pgx/v5/pgxpool/stat.go
@@ -82,3 +82,10 @@ func (s *Stat) MaxLifetimeDestroyCount() int64 {
func (s *Stat) MaxIdleDestroyCount() int64 {
return s.idleDestroyCount
}
+
+// EmptyAcquireWaitTime returns the cumulative time waited for successful acquires
+// from the pool for a resource to be released or constructed because the pool was
+// empty.
+func (s *Stat) EmptyAcquireWaitTime() time.Duration {
+ return s.s.EmptyAcquireWaitTime()
+}
diff --git a/vendor/github.com/jackc/pgx/v5/rows.go b/vendor/github.com/jackc/pgx/v5/rows.go
index f23625d4c..f6f26f479 100644
--- a/vendor/github.com/jackc/pgx/v5/rows.go
+++ b/vendor/github.com/jackc/pgx/v5/rows.go
@@ -272,7 +272,7 @@ func (rows *baseRows) Scan(dest ...any) error {
err := rows.scanPlans[i].Scan(values[i], dst)
if err != nil {
- err = ScanArgError{ColumnIndex: i, Err: err}
+ err = ScanArgError{ColumnIndex: i, FieldName: fieldDescriptions[i].Name, Err: err}
rows.fatal(err)
return err
}
@@ -334,11 +334,16 @@ func (rows *baseRows) Conn() *Conn {
type ScanArgError struct {
ColumnIndex int
+ FieldName string
Err error
}
func (e ScanArgError) Error() string {
- return fmt.Sprintf("can't scan into dest[%d]: %v", e.ColumnIndex, e.Err)
+ if e.FieldName == "?column?" { // Don't include the fieldname if it's unknown
+ return fmt.Sprintf("can't scan into dest[%d]: %v", e.ColumnIndex, e.Err)
+ }
+
+ return fmt.Sprintf("can't scan into dest[%d] (col: %s): %v", e.ColumnIndex, e.FieldName, e.Err)
}
func (e ScanArgError) Unwrap() error {
@@ -366,7 +371,7 @@ func ScanRow(typeMap *pgtype.Map, fieldDescriptions []pgconn.FieldDescription, v
err := typeMap.Scan(fieldDescriptions[i].DataTypeOID, fieldDescriptions[i].Format, values[i], d)
if err != nil {
- return ScanArgError{ColumnIndex: i, Err: err}
+ return ScanArgError{ColumnIndex: i, FieldName: fieldDescriptions[i].Name, Err: err}
}
}
@@ -468,6 +473,8 @@ func CollectOneRow[T any](rows Rows, fn RowToFunc[T]) (T, error) {
return value, err
}
+ // The defer rows.Close() won't have executed yet. If the query returned more than one row, rows would still be open.
+ // rows.Close() must be called before rows.Err() so we explicitly call it here.
rows.Close()
return value, rows.Err()
}
diff --git a/vendor/github.com/jackc/pgx/v5/tx.go b/vendor/github.com/jackc/pgx/v5/tx.go
index 168d7ba6c..571e5e00f 100644
--- a/vendor/github.com/jackc/pgx/v5/tx.go
+++ b/vendor/github.com/jackc/pgx/v5/tx.go
@@ -3,7 +3,6 @@ package pgx
import (
"context"
"errors"
- "fmt"
"strconv"
"strings"
@@ -103,7 +102,7 @@ func (c *Conn) BeginTx(ctx context.Context, txOptions TxOptions) (Tx, error) {
if err != nil {
// begin should never fail unless there is an underlying connection issue or
// a context timeout. In either case, the connection is possibly broken.
- c.die(errors.New("failed to begin transaction"))
+ c.die()
return nil, err
}
@@ -216,7 +215,7 @@ func (tx *dbTx) Rollback(ctx context.Context) error {
tx.closed = true
if err != nil {
// A rollback failure leaves the connection in an undefined state
- tx.conn.die(fmt.Errorf("rollback failed: %w", err))
+ tx.conn.die()
return err
}
diff --git a/vendor/modules.txt b/vendor/modules.txt
index 2b1469c62..5aeaa2824 100644
--- a/vendor/modules.txt
+++ b/vendor/modules.txt
@@ -587,7 +587,7 @@ github.com/jackc/pgpassfile
# github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761
## explicit; go 1.14
github.com/jackc/pgservicefile
-# github.com/jackc/pgx/v5 v5.7.2
+# github.com/jackc/pgx/v5 v5.7.3
## explicit; go 1.21
github.com/jackc/pgx/v5
github.com/jackc/pgx/v5/internal/iobufpool