summaryrefslogtreecommitdiff
path: root/vendor/github.com/jackc/pgx/v5/pgconn
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/github.com/jackc/pgx/v5/pgconn')
-rw-r--r--vendor/github.com/jackc/pgx/v5/pgconn/auth_scram.go4
-rw-r--r--vendor/github.com/jackc/pgx/v5/pgconn/config.go10
-rw-r--r--vendor/github.com/jackc/pgx/v5/pgconn/pgconn.go59
3 files changed, 49 insertions, 24 deletions
diff --git a/vendor/github.com/jackc/pgx/v5/pgconn/auth_scram.go b/vendor/github.com/jackc/pgx/v5/pgconn/auth_scram.go
index 8c4b2de3c..064983615 100644
--- a/vendor/github.com/jackc/pgx/v5/pgconn/auth_scram.go
+++ b/vendor/github.com/jackc/pgx/v5/pgconn/auth_scram.go
@@ -47,7 +47,7 @@ func (c *PgConn) scramAuth(serverAuthMechanisms []string) error {
return err
}
- // Receive server-first-message payload in a AuthenticationSASLContinue.
+ // Receive server-first-message payload in an AuthenticationSASLContinue.
saslContinue, err := c.rxSASLContinue()
if err != nil {
return err
@@ -67,7 +67,7 @@ func (c *PgConn) scramAuth(serverAuthMechanisms []string) error {
return err
}
- // Receive server-final-message payload in a AuthenticationSASLFinal.
+ // Receive server-final-message payload in an AuthenticationSASLFinal.
saslFinal, err := c.rxSASLFinal()
if err != nil {
return err
diff --git a/vendor/github.com/jackc/pgx/v5/pgconn/config.go b/vendor/github.com/jackc/pgx/v5/pgconn/config.go
index 1c2c647d9..db0170e02 100644
--- a/vendor/github.com/jackc/pgx/v5/pgconn/config.go
+++ b/vendor/github.com/jackc/pgx/v5/pgconn/config.go
@@ -809,7 +809,7 @@ func makeConnectTimeoutDialFunc(timeout time.Duration) DialFunc {
return d.DialContext
}
-// ValidateConnectTargetSessionAttrsReadWrite is an ValidateConnectFunc that implements libpq compatible
+// ValidateConnectTargetSessionAttrsReadWrite is a ValidateConnectFunc that implements libpq compatible
// target_session_attrs=read-write.
func ValidateConnectTargetSessionAttrsReadWrite(ctx context.Context, pgConn *PgConn) error {
result := pgConn.ExecParams(ctx, "show transaction_read_only", nil, nil, nil, nil).Read()
@@ -824,7 +824,7 @@ func ValidateConnectTargetSessionAttrsReadWrite(ctx context.Context, pgConn *PgC
return nil
}
-// ValidateConnectTargetSessionAttrsReadOnly is an ValidateConnectFunc that implements libpq compatible
+// ValidateConnectTargetSessionAttrsReadOnly is a ValidateConnectFunc that implements libpq compatible
// target_session_attrs=read-only.
func ValidateConnectTargetSessionAttrsReadOnly(ctx context.Context, pgConn *PgConn) error {
result := pgConn.ExecParams(ctx, "show transaction_read_only", nil, nil, nil, nil).Read()
@@ -839,7 +839,7 @@ func ValidateConnectTargetSessionAttrsReadOnly(ctx context.Context, pgConn *PgCo
return nil
}
-// ValidateConnectTargetSessionAttrsStandby is an ValidateConnectFunc that implements libpq compatible
+// ValidateConnectTargetSessionAttrsStandby is a ValidateConnectFunc that implements libpq compatible
// target_session_attrs=standby.
func ValidateConnectTargetSessionAttrsStandby(ctx context.Context, pgConn *PgConn) error {
result := pgConn.ExecParams(ctx, "select pg_is_in_recovery()", nil, nil, nil, nil).Read()
@@ -854,7 +854,7 @@ func ValidateConnectTargetSessionAttrsStandby(ctx context.Context, pgConn *PgCon
return nil
}
-// ValidateConnectTargetSessionAttrsPrimary is an ValidateConnectFunc that implements libpq compatible
+// ValidateConnectTargetSessionAttrsPrimary is a ValidateConnectFunc that implements libpq compatible
// target_session_attrs=primary.
func ValidateConnectTargetSessionAttrsPrimary(ctx context.Context, pgConn *PgConn) error {
result := pgConn.ExecParams(ctx, "select pg_is_in_recovery()", nil, nil, nil, nil).Read()
@@ -869,7 +869,7 @@ func ValidateConnectTargetSessionAttrsPrimary(ctx context.Context, pgConn *PgCon
return nil
}
-// ValidateConnectTargetSessionAttrsPreferStandby is an ValidateConnectFunc that implements libpq compatible
+// ValidateConnectTargetSessionAttrsPreferStandby is a ValidateConnectFunc that implements libpq compatible
// target_session_attrs=prefer-standby.
func ValidateConnectTargetSessionAttrsPreferStandby(ctx context.Context, pgConn *PgConn) error {
result := pgConn.ExecParams(ctx, "select pg_is_in_recovery()", nil, nil, nil, nil).Read()
diff --git a/vendor/github.com/jackc/pgx/v5/pgconn/pgconn.go b/vendor/github.com/jackc/pgx/v5/pgconn/pgconn.go
index 8f602e409..4ed90def8 100644
--- a/vendor/github.com/jackc/pgx/v5/pgconn/pgconn.go
+++ b/vendor/github.com/jackc/pgx/v5/pgconn/pgconn.go
@@ -74,6 +74,7 @@ type PgConn struct {
frontend *pgproto3.Frontend
bgReader *bgreader.BGReader
slowWriteTimer *time.Timer
+ bgReaderStarted chan struct{}
config *Config
@@ -301,8 +302,14 @@ func connect(ctx context.Context, config *Config, fallbackConfig *FallbackConfig
pgConn.parameterStatuses = make(map[string]string)
pgConn.status = connStatusConnecting
pgConn.bgReader = bgreader.New(pgConn.conn)
- pgConn.slowWriteTimer = time.AfterFunc(time.Duration(math.MaxInt64), pgConn.bgReader.Start)
+ pgConn.slowWriteTimer = time.AfterFunc(time.Duration(math.MaxInt64),
+ func() {
+ pgConn.bgReader.Start()
+ pgConn.bgReaderStarted <- struct{}{}
+ },
+ )
pgConn.slowWriteTimer.Stop()
+ pgConn.bgReaderStarted = make(chan struct{})
pgConn.frontend = config.BuildFrontend(pgConn.bgReader, pgConn.conn)
startupMsg := pgproto3.StartupMessage{
@@ -593,7 +600,7 @@ func (pgConn *PgConn) Frontend() *pgproto3.Frontend {
return pgConn.frontend
}
-// Close closes a connection. It is safe to call Close on a already closed connection. Close attempts a clean close by
+// Close closes a connection. It is safe to call Close on an already closed connection. Close attempts a clean close by
// sending the exit message to PostgreSQL. However, this could block so ctx is available to limit the time to wait. The
// underlying net.Conn.Close() will always be called regardless of any other errors.
func (pgConn *PgConn) Close(ctx context.Context) error {
@@ -935,16 +942,21 @@ func (pgConn *PgConn) CancelRequest(ctx context.Context) error {
buf := make([]byte, 16)
binary.BigEndian.PutUint32(buf[0:4], 16)
binary.BigEndian.PutUint32(buf[4:8], 80877102)
- binary.BigEndian.PutUint32(buf[8:12], uint32(pgConn.pid))
- binary.BigEndian.PutUint32(buf[12:16], uint32(pgConn.secretKey))
- // Postgres will process the request and close the connection
- // so when don't need to read the reply
- // https://www.postgresql.org/docs/current/protocol-flow.html#id-1.10.6.7.10
- _, err = cancelConn.Write(buf)
- return err
+ binary.BigEndian.PutUint32(buf[8:12], pgConn.pid)
+ binary.BigEndian.PutUint32(buf[12:16], pgConn.secretKey)
+
+ if _, err := cancelConn.Write(buf); err != nil {
+ return fmt.Errorf("write to connection for cancellation: %w", err)
+ }
+
+ // Wait for the cancel request to be acknowledged by the server.
+ // It copies the behavior of the libpq: https://github.com/postgres/postgres/blob/REL_16_0/src/interfaces/libpq/fe-connect.c#L4946-L4960
+ _, _ = cancelConn.Read(buf)
+
+ return nil
}
-// WaitForNotification waits for a LISTON/NOTIFY message to be received. It returns an error if a notification was not
+// WaitForNotification waits for a LISTEN/NOTIFY message to be received. It returns an error if a notification was not
// received.
func (pgConn *PgConn) WaitForNotification(ctx context.Context) error {
if err := pgConn.lock(); err != nil {
@@ -1732,10 +1744,16 @@ func (pgConn *PgConn) enterPotentialWriteReadDeadlock() {
// exitPotentialWriteReadDeadlock must be called after a call to enterPotentialWriteReadDeadlock.
func (pgConn *PgConn) exitPotentialWriteReadDeadlock() {
- // The state of the timer is not relevant upon exiting the potential slow write. It may both
- // fire (due to a slow write), or not fire (due to a fast write).
- _ = pgConn.slowWriteTimer.Stop()
- pgConn.bgReader.Stop()
+ if !pgConn.slowWriteTimer.Stop() {
+ // The timer starts its function in a separate goroutine. It is necessary to ensure the background reader has
+ // started before calling Stop. Otherwise, the background reader may not be stopped. That on its own is not a
+ // serious problem. But what is a serious problem is that the background reader may start at an inopportune time in
+ // a subsequent query. For example, if a subsequent query was canceled then a deadline may be set on the net.Conn to
+ // interrupt an in-progress read. After the read is interrupted, but before the deadline is cleared, the background
+ // reader could start and read a deadline error. Then the next query would receive the an unexpected deadline error.
+ <-pgConn.bgReaderStarted
+ pgConn.bgReader.Stop()
+ }
}
func (pgConn *PgConn) flushWithPotentialWriteReadDeadlock() error {
@@ -1764,7 +1782,7 @@ func (pgConn *PgConn) SyncConn(ctx context.Context) error {
}
}
- // This should never happen. Only way I can imagine this occuring is if the server is constantly sending data such as
+ // This should never happen. Only way I can imagine this occurring is if the server is constantly sending data such as
// LISTEN/NOTIFY or log notifications such that we never can get an empty buffer.
return errors.New("SyncConn: conn never synchronized")
}
@@ -1830,8 +1848,14 @@ func Construct(hc *HijackedConn) (*PgConn, error) {
pgConn.contextWatcher = newContextWatcher(pgConn.conn)
pgConn.bgReader = bgreader.New(pgConn.conn)
- pgConn.slowWriteTimer = time.AfterFunc(time.Duration(math.MaxInt64), pgConn.bgReader.Start)
+ pgConn.slowWriteTimer = time.AfterFunc(time.Duration(math.MaxInt64),
+ func() {
+ pgConn.bgReader.Start()
+ pgConn.bgReaderStarted <- struct{}{}
+ },
+ )
pgConn.slowWriteTimer.Stop()
+ pgConn.bgReaderStarted = make(chan struct{})
pgConn.frontend = hc.Config.BuildFrontend(pgConn.bgReader, pgConn.conn)
return pgConn, nil
@@ -1996,7 +2020,8 @@ func (p *Pipeline) GetResults() (results any, err error) {
for {
msg, err := p.conn.receiveMessage()
if err != nil {
- return nil, err
+ p.conn.asyncClose()
+ return nil, normalizeTimeoutError(p.ctx, err)
}
switch msg := msg.(type) {