summaryrefslogtreecommitdiff
path: root/vendor/github.com/jackc/pgx/v5/pgconn/pgconn.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/github.com/jackc/pgx/v5/pgconn/pgconn.go')
-rw-r--r--vendor/github.com/jackc/pgx/v5/pgconn/pgconn.go399
1 files changed, 249 insertions, 150 deletions
diff --git a/vendor/github.com/jackc/pgx/v5/pgconn/pgconn.go b/vendor/github.com/jackc/pgx/v5/pgconn/pgconn.go
index 0bf03f335..7efb522a4 100644
--- a/vendor/github.com/jackc/pgx/v5/pgconn/pgconn.go
+++ b/vendor/github.com/jackc/pgx/v5/pgconn/pgconn.go
@@ -18,8 +18,8 @@ import (
"github.com/jackc/pgx/v5/internal/iobufpool"
"github.com/jackc/pgx/v5/internal/pgio"
+ "github.com/jackc/pgx/v5/pgconn/ctxwatch"
"github.com/jackc/pgx/v5/pgconn/internal/bgreader"
- "github.com/jackc/pgx/v5/pgconn/internal/ctxwatch"
"github.com/jackc/pgx/v5/pgproto3"
)
@@ -82,6 +82,8 @@ type PgConn struct {
slowWriteTimer *time.Timer
bgReaderStarted chan struct{}
+ customData map[string]any
+
config *Config
status byte // One of connStatus* constants
@@ -103,8 +105,9 @@ type PgConn struct {
cleanupDone chan struct{}
}
-// Connect establishes a connection to a PostgreSQL server using the environment and connString (in URL or DSN format)
-// to provide configuration. See documentation for [ParseConfig] for details. ctx can be used to cancel a connect attempt.
+// Connect establishes a connection to a PostgreSQL server using the environment and connString (in URL or keyword/value
+// format) to provide configuration. See documentation for [ParseConfig] for details. ctx can be used to cancel a
+// connect attempt.
func Connect(ctx context.Context, connString string) (*PgConn, error) {
config, err := ParseConfig(connString)
if err != nil {
@@ -114,9 +117,9 @@ func Connect(ctx context.Context, connString string) (*PgConn, error) {
return ConnectConfig(ctx, config)
}
-// Connect establishes a connection to a PostgreSQL server using the environment and connString (in URL or DSN format)
-// and ParseConfigOptions to provide additional configuration. See documentation for [ParseConfig] for details. ctx can be
-// used to cancel a connect attempt.
+// Connect establishes a connection to a PostgreSQL server using the environment and connString (in URL or keyword/value
+// format) and ParseConfigOptions to provide additional configuration. See documentation for [ParseConfig] for details.
+// ctx can be used to cancel a connect attempt.
func ConnectWithOptions(ctx context.Context, connString string, parseConfigOptions ParseConfigOptions) (*PgConn, error) {
config, err := ParseConfigWithOptions(connString, parseConfigOptions)
if err != nil {
@@ -131,113 +134,77 @@ func ConnectWithOptions(ctx context.Context, connString string, parseConfigOptio
//
// If config.Fallbacks are present they will sequentially be tried in case of error establishing network connection. An
// authentication error will terminate the chain of attempts (like libpq:
-// https://www.postgresql.org/docs/11/libpq-connect.html#LIBPQ-MULTIPLE-HOSTS) and be returned as the error. Otherwise,
-// if all attempts fail the last error is returned.
-func ConnectConfig(octx context.Context, config *Config) (pgConn *PgConn, err error) {
+// https://www.postgresql.org/docs/11/libpq-connect.html#LIBPQ-MULTIPLE-HOSTS) and be returned as the error.
+func ConnectConfig(ctx context.Context, config *Config) (*PgConn, error) {
// Default values are set in ParseConfig. Enforce initial creation by ParseConfig rather than setting defaults from
// zero values.
if !config.createdByParseConfig {
panic("config must be created by ParseConfig")
}
- // Simplify usage by treating primary config and fallbacks the same.
- fallbackConfigs := []*FallbackConfig{
- {
- Host: config.Host,
- Port: config.Port,
- TLSConfig: config.TLSConfig,
- },
- }
- fallbackConfigs = append(fallbackConfigs, config.Fallbacks...)
- ctx := octx
- fallbackConfigs, err = expandWithIPs(ctx, config.LookupFunc, fallbackConfigs)
- if err != nil {
- return nil, &ConnectError{Config: config, msg: "hostname resolving error", err: err}
- }
+ var allErrors []error
- if len(fallbackConfigs) == 0 {
- return nil, &ConnectError{Config: config, msg: "hostname resolving error", err: errors.New("ip addr wasn't found")}
- }
-
- foundBestServer := false
- var fallbackConfig *FallbackConfig
- for i, fc := range fallbackConfigs {
- // ConnectTimeout restricts the whole connection process.
- if config.ConnectTimeout != 0 {
- // create new context first time or when previous host was different
- if i == 0 || (fallbackConfigs[i].Host != fallbackConfigs[i-1].Host) {
- var cancel context.CancelFunc
- ctx, cancel = context.WithTimeout(octx, config.ConnectTimeout)
- defer cancel()
- }
- } else {
- ctx = octx
- }
- pgConn, err = connect(ctx, config, fc, false)
- if err == nil {
- foundBestServer = true
- break
- } else if pgerr, ok := err.(*PgError); ok {
- err = &ConnectError{Config: config, msg: "server error", err: pgerr}
- const ERRCODE_INVALID_PASSWORD = "28P01" // wrong password
- const ERRCODE_INVALID_AUTHORIZATION_SPECIFICATION = "28000" // wrong password or bad pg_hba.conf settings
- const ERRCODE_INVALID_CATALOG_NAME = "3D000" // db does not exist
- const ERRCODE_INSUFFICIENT_PRIVILEGE = "42501" // missing connect privilege
- if pgerr.Code == ERRCODE_INVALID_PASSWORD ||
- pgerr.Code == ERRCODE_INVALID_AUTHORIZATION_SPECIFICATION && fc.TLSConfig != nil ||
- pgerr.Code == ERRCODE_INVALID_CATALOG_NAME ||
- pgerr.Code == ERRCODE_INSUFFICIENT_PRIVILEGE {
- break
- }
- } else if cerr, ok := err.(*ConnectError); ok {
- if _, ok := cerr.err.(*NotPreferredError); ok {
- fallbackConfig = fc
- }
- }
+ connectConfigs, errs := buildConnectOneConfigs(ctx, config)
+ if len(errs) > 0 {
+ allErrors = append(allErrors, errs...)
}
- if !foundBestServer && fallbackConfig != nil {
- pgConn, err = connect(ctx, config, fallbackConfig, true)
- if pgerr, ok := err.(*PgError); ok {
- err = &ConnectError{Config: config, msg: "server error", err: pgerr}
- }
+ if len(connectConfigs) == 0 {
+ return nil, &ConnectError{Config: config, err: fmt.Errorf("hostname resolving error: %w", errors.Join(allErrors...))}
}
- if err != nil {
- return nil, err // no need to wrap in connectError because it will already be wrapped in all cases except PgError
+ pgConn, errs := connectPreferred(ctx, config, connectConfigs)
+ if len(errs) > 0 {
+ allErrors = append(allErrors, errs...)
+ return nil, &ConnectError{Config: config, err: errors.Join(allErrors...)}
}
if config.AfterConnect != nil {
err := config.AfterConnect(ctx, pgConn)
if err != nil {
pgConn.conn.Close()
- return nil, &ConnectError{Config: config, msg: "AfterConnect error", err: err}
+ return nil, &ConnectError{Config: config, err: fmt.Errorf("AfterConnect error: %w", err)}
}
}
return pgConn, nil
}
-func expandWithIPs(ctx context.Context, lookupFn LookupFunc, fallbacks []*FallbackConfig) ([]*FallbackConfig, error) {
- var configs []*FallbackConfig
+// buildConnectOneConfigs resolves hostnames and builds a list of connectOneConfigs to try connecting to. It returns a
+// slice of successfully resolved connectOneConfigs and a slice of errors. It is possible for both slices to contain
+// values if some hosts were successfully resolved and others were not.
+func buildConnectOneConfigs(ctx context.Context, config *Config) ([]*connectOneConfig, []error) {
+ // Simplify usage by treating primary config and fallbacks the same.
+ fallbackConfigs := []*FallbackConfig{
+ {
+ Host: config.Host,
+ Port: config.Port,
+ TLSConfig: config.TLSConfig,
+ },
+ }
+ fallbackConfigs = append(fallbackConfigs, config.Fallbacks...)
+
+ var configs []*connectOneConfig
- var lookupErrors []error
+ var allErrors []error
- for _, fb := range fallbacks {
+ for _, fb := range fallbackConfigs {
// skip resolve for unix sockets
if isAbsolutePath(fb.Host) {
- configs = append(configs, &FallbackConfig{
- Host: fb.Host,
- Port: fb.Port,
- TLSConfig: fb.TLSConfig,
+ network, address := NetworkAddress(fb.Host, fb.Port)
+ configs = append(configs, &connectOneConfig{
+ network: network,
+ address: address,
+ originalHostname: fb.Host,
+ tlsConfig: fb.TLSConfig,
})
continue
}
- ips, err := lookupFn(ctx, fb.Host)
+ ips, err := config.LookupFunc(ctx, fb.Host)
if err != nil {
- lookupErrors = append(lookupErrors, err)
+ allErrors = append(allErrors, err)
continue
}
@@ -246,63 +213,126 @@ func expandWithIPs(ctx context.Context, lookupFn LookupFunc, fallbacks []*Fallba
if err == nil {
port, err := strconv.ParseUint(splitPort, 10, 16)
if err != nil {
- return nil, fmt.Errorf("error parsing port (%s) from lookup: %w", splitPort, err)
+ return nil, []error{fmt.Errorf("error parsing port (%s) from lookup: %w", splitPort, err)}
}
- configs = append(configs, &FallbackConfig{
- Host: splitIP,
- Port: uint16(port),
- TLSConfig: fb.TLSConfig,
+ network, address := NetworkAddress(splitIP, uint16(port))
+ configs = append(configs, &connectOneConfig{
+ network: network,
+ address: address,
+ originalHostname: fb.Host,
+ tlsConfig: fb.TLSConfig,
})
} else {
- configs = append(configs, &FallbackConfig{
- Host: ip,
- Port: fb.Port,
- TLSConfig: fb.TLSConfig,
+ network, address := NetworkAddress(ip, fb.Port)
+ configs = append(configs, &connectOneConfig{
+ network: network,
+ address: address,
+ originalHostname: fb.Host,
+ tlsConfig: fb.TLSConfig,
})
}
}
}
- // See https://github.com/jackc/pgx/issues/1464. When Go 1.20 can be used in pgx consider using errors.Join so all
- // errors are reported.
- if len(configs) == 0 && len(lookupErrors) > 0 {
- return nil, lookupErrors[0]
+ return configs, allErrors
+}
+
+// connectPreferred attempts to connect to the preferred host from connectOneConfigs. The connections are attempted in
+// order. If a connection is successful it is returned. If no connection is successful then all errors are returned. If
+// a connection attempt returns a [NotPreferredError], then that host will be used if no other hosts are successful.
+func connectPreferred(ctx context.Context, config *Config, connectOneConfigs []*connectOneConfig) (*PgConn, []error) {
+ octx := ctx
+ var allErrors []error
+
+ var fallbackConnectOneConfig *connectOneConfig
+ for i, c := range connectOneConfigs {
+ // ConnectTimeout restricts the whole connection process.
+ if config.ConnectTimeout != 0 {
+ // create new context first time or when previous host was different
+ if i == 0 || (connectOneConfigs[i].address != connectOneConfigs[i-1].address) {
+ var cancel context.CancelFunc
+ ctx, cancel = context.WithTimeout(octx, config.ConnectTimeout)
+ defer cancel()
+ }
+ } else {
+ ctx = octx
+ }
+
+ pgConn, err := connectOne(ctx, config, c, false)
+ if pgConn != nil {
+ return pgConn, nil
+ }
+
+ allErrors = append(allErrors, err)
+
+ var pgErr *PgError
+ if errors.As(err, &pgErr) {
+ const ERRCODE_INVALID_PASSWORD = "28P01" // wrong password
+ const ERRCODE_INVALID_AUTHORIZATION_SPECIFICATION = "28000" // wrong password or bad pg_hba.conf settings
+ const ERRCODE_INVALID_CATALOG_NAME = "3D000" // db does not exist
+ const ERRCODE_INSUFFICIENT_PRIVILEGE = "42501" // missing connect privilege
+ if pgErr.Code == ERRCODE_INVALID_PASSWORD ||
+ pgErr.Code == ERRCODE_INVALID_AUTHORIZATION_SPECIFICATION && c.tlsConfig != nil ||
+ pgErr.Code == ERRCODE_INVALID_CATALOG_NAME ||
+ pgErr.Code == ERRCODE_INSUFFICIENT_PRIVILEGE {
+ return nil, allErrors
+ }
+ }
+
+ var npErr *NotPreferredError
+ if errors.As(err, &npErr) {
+ fallbackConnectOneConfig = c
+ }
+ }
+
+ if fallbackConnectOneConfig != nil {
+ pgConn, err := connectOne(ctx, config, fallbackConnectOneConfig, true)
+ if err == nil {
+ return pgConn, nil
+ }
+ allErrors = append(allErrors, err)
}
- return configs, nil
+ return nil, allErrors
}
-func connect(ctx context.Context, config *Config, fallbackConfig *FallbackConfig,
+// connectOne makes one connection attempt to a single host.
+func connectOne(ctx context.Context, config *Config, connectConfig *connectOneConfig,
ignoreNotPreferredErr bool,
) (*PgConn, error) {
pgConn := new(PgConn)
pgConn.config = config
pgConn.cleanupDone = make(chan struct{})
+ pgConn.customData = make(map[string]any)
var err error
- network, address := NetworkAddress(fallbackConfig.Host, fallbackConfig.Port)
- netConn, err := config.DialFunc(ctx, network, address)
- if err != nil {
- return nil, &ConnectError{Config: config, msg: "dial error", err: normalizeTimeoutError(ctx, err)}
+
+ newPerDialConnectError := func(msg string, err error) *perDialConnectError {
+ err = normalizeTimeoutError(ctx, err)
+ e := &perDialConnectError{address: connectConfig.address, originalHostname: connectConfig.originalHostname, err: fmt.Errorf("%s: %w", msg, err)}
+ return e
}
- pgConn.conn = netConn
- pgConn.contextWatcher = newContextWatcher(netConn)
- pgConn.contextWatcher.Watch(ctx)
+ pgConn.conn, err = config.DialFunc(ctx, connectConfig.network, connectConfig.address)
+ if err != nil {
+ return nil, newPerDialConnectError("dial error", err)
+ }
- if fallbackConfig.TLSConfig != nil {
- nbTLSConn, err := startTLS(netConn, fallbackConfig.TLSConfig)
+ if connectConfig.tlsConfig != nil {
+ pgConn.contextWatcher = ctxwatch.NewContextWatcher(&DeadlineContextWatcherHandler{Conn: pgConn.conn})
+ pgConn.contextWatcher.Watch(ctx)
+ tlsConn, err := startTLS(pgConn.conn, connectConfig.tlsConfig)
pgConn.contextWatcher.Unwatch() // Always unwatch `netConn` after TLS.
if err != nil {
- netConn.Close()
- return nil, &ConnectError{Config: config, msg: "tls error", err: normalizeTimeoutError(ctx, err)}
+ pgConn.conn.Close()
+ return nil, newPerDialConnectError("tls error", err)
}
- pgConn.conn = nbTLSConn
- pgConn.contextWatcher = newContextWatcher(nbTLSConn)
- pgConn.contextWatcher.Watch(ctx)
+ pgConn.conn = tlsConn
}
+ pgConn.contextWatcher = ctxwatch.NewContextWatcher(config.BuildContextWatcherHandler(pgConn))
+ pgConn.contextWatcher.Watch(ctx)
defer pgConn.contextWatcher.Unwatch()
pgConn.parameterStatuses = make(map[string]string)
@@ -336,7 +366,7 @@ func connect(ctx context.Context, config *Config, fallbackConfig *FallbackConfig
pgConn.frontend.Send(&startupMsg)
if err := pgConn.flushWithPotentialWriteReadDeadlock(); err != nil {
pgConn.conn.Close()
- return nil, &ConnectError{Config: config, msg: "failed to write startup message", err: normalizeTimeoutError(ctx, err)}
+ return nil, newPerDialConnectError("failed to write startup message", err)
}
for {
@@ -344,9 +374,9 @@ func connect(ctx context.Context, config *Config, fallbackConfig *FallbackConfig
if err != nil {
pgConn.conn.Close()
if err, ok := err.(*PgError); ok {
- return nil, err
+ return nil, newPerDialConnectError("server error", err)
}
- return nil, &ConnectError{Config: config, msg: "failed to receive message", err: normalizeTimeoutError(ctx, err)}
+ return nil, newPerDialConnectError("failed to receive message", err)
}
switch msg := msg.(type) {
@@ -359,26 +389,26 @@ func connect(ctx context.Context, config *Config, fallbackConfig *FallbackConfig
err = pgConn.txPasswordMessage(pgConn.config.Password)
if err != nil {
pgConn.conn.Close()
- return nil, &ConnectError{Config: config, msg: "failed to write password message", err: err}
+ return nil, newPerDialConnectError("failed to write password message", err)
}
case *pgproto3.AuthenticationMD5Password:
digestedPassword := "md5" + hexMD5(hexMD5(pgConn.config.Password+pgConn.config.User)+string(msg.Salt[:]))
err = pgConn.txPasswordMessage(digestedPassword)
if err != nil {
pgConn.conn.Close()
- return nil, &ConnectError{Config: config, msg: "failed to write password message", err: err}
+ return nil, newPerDialConnectError("failed to write password message", err)
}
case *pgproto3.AuthenticationSASL:
err = pgConn.scramAuth(msg.AuthMechanisms)
if err != nil {
pgConn.conn.Close()
- return nil, &ConnectError{Config: config, msg: "failed SASL auth", err: err}
+ return nil, newPerDialConnectError("failed SASL auth", err)
}
case *pgproto3.AuthenticationGSS:
err = pgConn.gssAuth()
if err != nil {
pgConn.conn.Close()
- return nil, &ConnectError{Config: config, msg: "failed GSS auth", err: err}
+ return nil, newPerDialConnectError("failed GSS auth", err)
}
case *pgproto3.ReadyForQuery:
pgConn.status = connStatusIdle
@@ -396,7 +426,7 @@ func connect(ctx context.Context, config *Config, fallbackConfig *FallbackConfig
return pgConn, nil
}
pgConn.conn.Close()
- return nil, &ConnectError{Config: config, msg: "ValidateConnect failed", err: err}
+ return nil, newPerDialConnectError("ValidateConnect failed", err)
}
}
return pgConn, nil
@@ -404,21 +434,14 @@ func connect(ctx context.Context, config *Config, fallbackConfig *FallbackConfig
// handled by ReceiveMessage
case *pgproto3.ErrorResponse:
pgConn.conn.Close()
- return nil, ErrorResponseToPgError(msg)
+ return nil, newPerDialConnectError("server error", ErrorResponseToPgError(msg))
default:
pgConn.conn.Close()
- return nil, &ConnectError{Config: config, msg: "received unexpected message", err: err}
+ return nil, newPerDialConnectError("received unexpected message", err)
}
}
}
-func newContextWatcher(conn net.Conn) *ctxwatch.ContextWatcher {
- return ctxwatch.NewContextWatcher(
- func() { conn.SetDeadline(time.Date(1, 1, 1, 1, 1, 1, 1, time.UTC)) },
- func() { conn.SetDeadline(time.Time{}) },
- )
-}
-
func startTLS(conn net.Conn, tlsConfig *tls.Config) (net.Conn, error) {
err := binary.Write(conn, binary.BigEndian, []int32{8, 80877103})
if err != nil {
@@ -928,23 +951,24 @@ func (pgConn *PgConn) Deallocate(ctx context.Context, name string) error {
// ErrorResponseToPgError converts a wire protocol error message to a *PgError.
func ErrorResponseToPgError(msg *pgproto3.ErrorResponse) *PgError {
return &PgError{
- Severity: msg.Severity,
- Code: string(msg.Code),
- Message: string(msg.Message),
- Detail: string(msg.Detail),
- Hint: msg.Hint,
- Position: msg.Position,
- InternalPosition: msg.InternalPosition,
- InternalQuery: string(msg.InternalQuery),
- Where: string(msg.Where),
- SchemaName: string(msg.SchemaName),
- TableName: string(msg.TableName),
- ColumnName: string(msg.ColumnName),
- DataTypeName: string(msg.DataTypeName),
- ConstraintName: msg.ConstraintName,
- File: string(msg.File),
- Line: msg.Line,
- Routine: string(msg.Routine),
+ Severity: msg.Severity,
+ SeverityUnlocalized: msg.SeverityUnlocalized,
+ Code: string(msg.Code),
+ Message: string(msg.Message),
+ Detail: string(msg.Detail),
+ Hint: msg.Hint,
+ Position: msg.Position,
+ InternalPosition: msg.InternalPosition,
+ InternalQuery: string(msg.InternalQuery),
+ Where: string(msg.Where),
+ SchemaName: string(msg.SchemaName),
+ TableName: string(msg.TableName),
+ ColumnName: string(msg.ColumnName),
+ DataTypeName: string(msg.DataTypeName),
+ ConstraintName: msg.ConstraintName,
+ File: string(msg.File),
+ Line: msg.Line,
+ Routine: string(msg.Routine),
}
}
@@ -987,10 +1011,7 @@ func (pgConn *PgConn) CancelRequest(ctx context.Context) error {
defer cancelConn.Close()
if ctx != context.Background() {
- contextWatcher := ctxwatch.NewContextWatcher(
- func() { cancelConn.SetDeadline(time.Date(1, 1, 1, 1, 1, 1, 1, time.UTC)) },
- func() { cancelConn.SetDeadline(time.Time{}) },
- )
+ contextWatcher := ctxwatch.NewContextWatcher(&DeadlineContextWatcherHandler{Conn: cancelConn})
contextWatcher.Watch(ctx)
defer contextWatcher.Unwatch()
}
@@ -1523,8 +1544,10 @@ func (rr *ResultReader) Read() *Result {
values := rr.Values()
row := make([][]byte, len(values))
for i := range row {
- row[i] = make([]byte, len(values[i]))
- copy(row[i], values[i])
+ if values[i] != nil {
+ row[i] = make([]byte, len(values[i]))
+ copy(row[i], values[i])
+ }
}
br.Rows = append(br.Rows, row)
}
@@ -1879,6 +1902,11 @@ func (pgConn *PgConn) SyncConn(ctx context.Context) error {
return errors.New("SyncConn: conn never synchronized")
}
+// CustomData returns a map that can be used to associate custom data with the connection.
+func (pgConn *PgConn) CustomData() map[string]any {
+ return pgConn.customData
+}
+
// HijackedConn is the result of hijacking a connection.
//
// Due to the necessary exposure of internal implementation details, it is not covered by the semantic versioning
@@ -1891,6 +1919,7 @@ type HijackedConn struct {
TxStatus byte
Frontend *pgproto3.Frontend
Config *Config
+ CustomData map[string]any
}
// Hijack extracts the internal connection data. pgConn must be in an idle state. SyncConn should be called immediately
@@ -1913,6 +1942,7 @@ func (pgConn *PgConn) Hijack() (*HijackedConn, error) {
TxStatus: pgConn.txStatus,
Frontend: pgConn.frontend,
Config: pgConn.config,
+ CustomData: pgConn.customData,
}, nil
}
@@ -1932,13 +1962,14 @@ func Construct(hc *HijackedConn) (*PgConn, error) {
txStatus: hc.TxStatus,
frontend: hc.Frontend,
config: hc.Config,
+ customData: hc.CustomData,
status: connStatusIdle,
cleanupDone: make(chan struct{}),
}
- pgConn.contextWatcher = newContextWatcher(pgConn.conn)
+ pgConn.contextWatcher = ctxwatch.NewContextWatcher(hc.Config.BuildContextWatcherHandler(pgConn))
pgConn.bgReader = bgreader.New(pgConn.conn)
pgConn.slowWriteTimer = time.AfterFunc(time.Duration(math.MaxInt64),
func() {
@@ -2245,3 +2276,71 @@ func (p *Pipeline) Close() error {
return p.err
}
+
+// DeadlineContextWatcherHandler handles canceled contexts by setting a deadline on a net.Conn.
+type DeadlineContextWatcherHandler struct {
+ Conn net.Conn
+
+ // DeadlineDelay is the delay to set on the deadline set on net.Conn when the context is canceled.
+ DeadlineDelay time.Duration
+}
+
+func (h *DeadlineContextWatcherHandler) HandleCancel(ctx context.Context) {
+ h.Conn.SetDeadline(time.Now().Add(h.DeadlineDelay))
+}
+
+func (h *DeadlineContextWatcherHandler) HandleUnwatchAfterCancel() {
+ h.Conn.SetDeadline(time.Time{})
+}
+
+// CancelRequestContextWatcherHandler handles canceled contexts by sending a cancel request to the server. It also sets
+// a deadline on a net.Conn as a fallback.
+type CancelRequestContextWatcherHandler struct {
+ Conn *PgConn
+
+ // CancelRequestDelay is the delay before sending the cancel request to the server.
+ CancelRequestDelay time.Duration
+
+ // DeadlineDelay is the delay to set on the deadline set on net.Conn when the context is canceled.
+ DeadlineDelay time.Duration
+
+ cancelFinishedChan chan struct{}
+ handleUnwatchAfterCancelCalled func()
+}
+
+func (h *CancelRequestContextWatcherHandler) HandleCancel(context.Context) {
+ h.cancelFinishedChan = make(chan struct{})
+ var handleUnwatchedAfterCancelCalledCtx context.Context
+ handleUnwatchedAfterCancelCalledCtx, h.handleUnwatchAfterCancelCalled = context.WithCancel(context.Background())
+
+ deadline := time.Now().Add(h.DeadlineDelay)
+ h.Conn.conn.SetDeadline(deadline)
+
+ go func() {
+ defer close(h.cancelFinishedChan)
+
+ select {
+ case <-handleUnwatchedAfterCancelCalledCtx.Done():
+ return
+ case <-time.After(h.CancelRequestDelay):
+ }
+
+ cancelRequestCtx, cancel := context.WithDeadline(handleUnwatchedAfterCancelCalledCtx, deadline)
+ defer cancel()
+ h.Conn.CancelRequest(cancelRequestCtx)
+
+ // CancelRequest is inherently racy. Even though the cancel request has been received by the server at this point,
+ // it hasn't necessarily been delivered to the other connection. If we immediately return and the connection is
+ // immediately used then it is possible the CancelRequest will actually cancel our next query. The
+ // TestCancelRequestContextWatcherHandler Stress test can produce this error without the sleep below. The sleep time
+ // is arbitrary, but should be sufficient to prevent this error case.
+ time.Sleep(100 * time.Millisecond)
+ }()
+}
+
+func (h *CancelRequestContextWatcherHandler) HandleUnwatchAfterCancel() {
+ h.handleUnwatchAfterCancelCalled()
+ <-h.cancelFinishedChan
+
+ h.Conn.conn.SetDeadline(time.Time{})
+}