summaryrefslogtreecommitdiff
path: root/vendor/github.com/jackc/pgx/v5/pgconn/pgconn.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/github.com/jackc/pgx/v5/pgconn/pgconn.go')
-rw-r--r--vendor/github.com/jackc/pgx/v5/pgconn/pgconn.go59
1 files changed, 42 insertions, 17 deletions
diff --git a/vendor/github.com/jackc/pgx/v5/pgconn/pgconn.go b/vendor/github.com/jackc/pgx/v5/pgconn/pgconn.go
index 8f602e409..4ed90def8 100644
--- a/vendor/github.com/jackc/pgx/v5/pgconn/pgconn.go
+++ b/vendor/github.com/jackc/pgx/v5/pgconn/pgconn.go
@@ -74,6 +74,7 @@ type PgConn struct {
frontend *pgproto3.Frontend
bgReader *bgreader.BGReader
slowWriteTimer *time.Timer
+ bgReaderStarted chan struct{}
config *Config
@@ -301,8 +302,14 @@ func connect(ctx context.Context, config *Config, fallbackConfig *FallbackConfig
pgConn.parameterStatuses = make(map[string]string)
pgConn.status = connStatusConnecting
pgConn.bgReader = bgreader.New(pgConn.conn)
- pgConn.slowWriteTimer = time.AfterFunc(time.Duration(math.MaxInt64), pgConn.bgReader.Start)
+ pgConn.slowWriteTimer = time.AfterFunc(time.Duration(math.MaxInt64),
+ func() {
+ pgConn.bgReader.Start()
+ pgConn.bgReaderStarted <- struct{}{}
+ },
+ )
pgConn.slowWriteTimer.Stop()
+ pgConn.bgReaderStarted = make(chan struct{})
pgConn.frontend = config.BuildFrontend(pgConn.bgReader, pgConn.conn)
startupMsg := pgproto3.StartupMessage{
@@ -593,7 +600,7 @@ func (pgConn *PgConn) Frontend() *pgproto3.Frontend {
return pgConn.frontend
}
-// Close closes a connection. It is safe to call Close on a already closed connection. Close attempts a clean close by
+// Close closes a connection. It is safe to call Close on an already closed connection. Close attempts a clean close by
// sending the exit message to PostgreSQL. However, this could block so ctx is available to limit the time to wait. The
// underlying net.Conn.Close() will always be called regardless of any other errors.
func (pgConn *PgConn) Close(ctx context.Context) error {
@@ -935,16 +942,21 @@ func (pgConn *PgConn) CancelRequest(ctx context.Context) error {
buf := make([]byte, 16)
binary.BigEndian.PutUint32(buf[0:4], 16)
binary.BigEndian.PutUint32(buf[4:8], 80877102)
- binary.BigEndian.PutUint32(buf[8:12], uint32(pgConn.pid))
- binary.BigEndian.PutUint32(buf[12:16], uint32(pgConn.secretKey))
- // Postgres will process the request and close the connection
- // so when don't need to read the reply
- // https://www.postgresql.org/docs/current/protocol-flow.html#id-1.10.6.7.10
- _, err = cancelConn.Write(buf)
- return err
+ binary.BigEndian.PutUint32(buf[8:12], pgConn.pid)
+ binary.BigEndian.PutUint32(buf[12:16], pgConn.secretKey)
+
+ if _, err := cancelConn.Write(buf); err != nil {
+ return fmt.Errorf("write to connection for cancellation: %w", err)
+ }
+
+ // Wait for the cancel request to be acknowledged by the server.
+ // It copies the behavior of the libpq: https://github.com/postgres/postgres/blob/REL_16_0/src/interfaces/libpq/fe-connect.c#L4946-L4960
+ _, _ = cancelConn.Read(buf)
+
+ return nil
}
-// WaitForNotification waits for a LISTON/NOTIFY message to be received. It returns an error if a notification was not
+// WaitForNotification waits for a LISTEN/NOTIFY message to be received. It returns an error if a notification was not
// received.
func (pgConn *PgConn) WaitForNotification(ctx context.Context) error {
if err := pgConn.lock(); err != nil {
@@ -1732,10 +1744,16 @@ func (pgConn *PgConn) enterPotentialWriteReadDeadlock() {
// exitPotentialWriteReadDeadlock must be called after a call to enterPotentialWriteReadDeadlock.
func (pgConn *PgConn) exitPotentialWriteReadDeadlock() {
- // The state of the timer is not relevant upon exiting the potential slow write. It may both
- // fire (due to a slow write), or not fire (due to a fast write).
- _ = pgConn.slowWriteTimer.Stop()
- pgConn.bgReader.Stop()
+ if !pgConn.slowWriteTimer.Stop() {
+ // The timer starts its function in a separate goroutine. It is necessary to ensure the background reader has
+ // started before calling Stop. Otherwise, the background reader may not be stopped. That on its own is not a
+ // serious problem. But what is a serious problem is that the background reader may start at an inopportune time in
+ // a subsequent query. For example, if a subsequent query was canceled then a deadline may be set on the net.Conn to
+ // interrupt an in-progress read. After the read is interrupted, but before the deadline is cleared, the background
+ // reader could start and read a deadline error. Then the next query would receive the an unexpected deadline error.
+ <-pgConn.bgReaderStarted
+ pgConn.bgReader.Stop()
+ }
}
func (pgConn *PgConn) flushWithPotentialWriteReadDeadlock() error {
@@ -1764,7 +1782,7 @@ func (pgConn *PgConn) SyncConn(ctx context.Context) error {
}
}
- // This should never happen. Only way I can imagine this occuring is if the server is constantly sending data such as
+ // This should never happen. Only way I can imagine this occurring is if the server is constantly sending data such as
// LISTEN/NOTIFY or log notifications such that we never can get an empty buffer.
return errors.New("SyncConn: conn never synchronized")
}
@@ -1830,8 +1848,14 @@ func Construct(hc *HijackedConn) (*PgConn, error) {
pgConn.contextWatcher = newContextWatcher(pgConn.conn)
pgConn.bgReader = bgreader.New(pgConn.conn)
- pgConn.slowWriteTimer = time.AfterFunc(time.Duration(math.MaxInt64), pgConn.bgReader.Start)
+ pgConn.slowWriteTimer = time.AfterFunc(time.Duration(math.MaxInt64),
+ func() {
+ pgConn.bgReader.Start()
+ pgConn.bgReaderStarted <- struct{}{}
+ },
+ )
pgConn.slowWriteTimer.Stop()
+ pgConn.bgReaderStarted = make(chan struct{})
pgConn.frontend = hc.Config.BuildFrontend(pgConn.bgReader, pgConn.conn)
return pgConn, nil
@@ -1996,7 +2020,8 @@ func (p *Pipeline) GetResults() (results any, err error) {
for {
msg, err := p.conn.receiveMessage()
if err != nil {
- return nil, err
+ p.conn.asyncClose()
+ return nil, normalizeTimeoutError(p.ctx, err)
}
switch msg := msg.(type) {