diff options
author | 2025-03-09 17:47:56 +0100 | |
---|---|---|
committer | 2025-03-10 01:59:49 +0100 | |
commit | 3ac1ee16f377d31a0fb80c8dae28b6239ac4229e (patch) | |
tree | f61faa581feaaeaba2542b9f2b8234a590684413 /vendor/github.com/jackc/pgx/v5/pgconn | |
parent | [chore] update URLs to forked source (diff) | |
download | gotosocial-3ac1ee16f377d31a0fb80c8dae28b6239ac4229e.tar.xz |
[chore] remove vendor
Diffstat (limited to 'vendor/github.com/jackc/pgx/v5/pgconn')
-rw-r--r-- | vendor/github.com/jackc/pgx/v5/pgconn/README.md | 29 | ||||
-rw-r--r-- | vendor/github.com/jackc/pgx/v5/pgconn/auth_scram.go | 272 | ||||
-rw-r--r-- | vendor/github.com/jackc/pgx/v5/pgconn/config.go | 934 | ||||
-rw-r--r-- | vendor/github.com/jackc/pgx/v5/pgconn/ctxwatch/context_watcher.go | 80 | ||||
-rw-r--r-- | vendor/github.com/jackc/pgx/v5/pgconn/defaults.go | 63 | ||||
-rw-r--r-- | vendor/github.com/jackc/pgx/v5/pgconn/defaults_windows.go | 57 | ||||
-rw-r--r-- | vendor/github.com/jackc/pgx/v5/pgconn/doc.go | 38 | ||||
-rw-r--r-- | vendor/github.com/jackc/pgx/v5/pgconn/errors.go | 248 | ||||
-rw-r--r-- | vendor/github.com/jackc/pgx/v5/pgconn/internal/bgreader/bgreader.go | 139 | ||||
-rw-r--r-- | vendor/github.com/jackc/pgx/v5/pgconn/krb5.go | 100 | ||||
-rw-r--r-- | vendor/github.com/jackc/pgx/v5/pgconn/pgconn.go | 2346 |
11 files changed, 0 insertions, 4306 deletions
diff --git a/vendor/github.com/jackc/pgx/v5/pgconn/README.md b/vendor/github.com/jackc/pgx/v5/pgconn/README.md deleted file mode 100644 index 1fe15c268..000000000 --- a/vendor/github.com/jackc/pgx/v5/pgconn/README.md +++ /dev/null @@ -1,29 +0,0 @@ -# pgconn - -Package pgconn is a low-level PostgreSQL database driver. It operates at nearly the same level as the C library libpq. -It is primarily intended to serve as the foundation for higher level libraries such as https://github.com/jackc/pgx. -Applications should handle normal queries with a higher level library and only use pgconn directly when required for -low-level access to PostgreSQL functionality. - -## Example Usage - -```go -pgConn, err := pgconn.Connect(context.Background(), os.Getenv("DATABASE_URL")) -if err != nil { - log.Fatalln("pgconn failed to connect:", err) -} -defer pgConn.Close(context.Background()) - -result := pgConn.ExecParams(context.Background(), "SELECT email FROM users WHERE id=$1", [][]byte{[]byte("123")}, nil, nil, nil) -for result.NextRow() { - fmt.Println("User 123 has email:", string(result.Values()[0])) -} -_, err = result.Close() -if err != nil { - log.Fatalln("failed reading result:", err) -} -``` - -## Testing - -See CONTRIBUTING.md for setup instructions. diff --git a/vendor/github.com/jackc/pgx/v5/pgconn/auth_scram.go b/vendor/github.com/jackc/pgx/v5/pgconn/auth_scram.go deleted file mode 100644 index 064983615..000000000 --- a/vendor/github.com/jackc/pgx/v5/pgconn/auth_scram.go +++ /dev/null @@ -1,272 +0,0 @@ -// SCRAM-SHA-256 authentication -// -// Resources: -// https://tools.ietf.org/html/rfc5802 -// https://tools.ietf.org/html/rfc8265 -// https://www.postgresql.org/docs/current/sasl-authentication.html -// -// Inspiration drawn from other implementations: -// https://github.com/lib/pq/pull/608 -// https://github.com/lib/pq/pull/788 -// https://github.com/lib/pq/pull/833 - -package pgconn - -import ( - "bytes" - "crypto/hmac" - "crypto/rand" - "crypto/sha256" - "encoding/base64" - "errors" - "fmt" - "strconv" - - "github.com/jackc/pgx/v5/pgproto3" - "golang.org/x/crypto/pbkdf2" - "golang.org/x/text/secure/precis" -) - -const clientNonceLen = 18 - -// Perform SCRAM authentication. -func (c *PgConn) scramAuth(serverAuthMechanisms []string) error { - sc, err := newScramClient(serverAuthMechanisms, c.config.Password) - if err != nil { - return err - } - - // Send client-first-message in a SASLInitialResponse - saslInitialResponse := &pgproto3.SASLInitialResponse{ - AuthMechanism: "SCRAM-SHA-256", - Data: sc.clientFirstMessage(), - } - c.frontend.Send(saslInitialResponse) - err = c.flushWithPotentialWriteReadDeadlock() - if err != nil { - return err - } - - // Receive server-first-message payload in an AuthenticationSASLContinue. - saslContinue, err := c.rxSASLContinue() - if err != nil { - return err - } - err = sc.recvServerFirstMessage(saslContinue.Data) - if err != nil { - return err - } - - // Send client-final-message in a SASLResponse - saslResponse := &pgproto3.SASLResponse{ - Data: []byte(sc.clientFinalMessage()), - } - c.frontend.Send(saslResponse) - err = c.flushWithPotentialWriteReadDeadlock() - if err != nil { - return err - } - - // Receive server-final-message payload in an AuthenticationSASLFinal. - saslFinal, err := c.rxSASLFinal() - if err != nil { - return err - } - return sc.recvServerFinalMessage(saslFinal.Data) -} - -func (c *PgConn) rxSASLContinue() (*pgproto3.AuthenticationSASLContinue, error) { - msg, err := c.receiveMessage() - if err != nil { - return nil, err - } - switch m := msg.(type) { - case *pgproto3.AuthenticationSASLContinue: - return m, nil - case *pgproto3.ErrorResponse: - return nil, ErrorResponseToPgError(m) - } - - return nil, fmt.Errorf("expected AuthenticationSASLContinue message but received unexpected message %T", msg) -} - -func (c *PgConn) rxSASLFinal() (*pgproto3.AuthenticationSASLFinal, error) { - msg, err := c.receiveMessage() - if err != nil { - return nil, err - } - switch m := msg.(type) { - case *pgproto3.AuthenticationSASLFinal: - return m, nil - case *pgproto3.ErrorResponse: - return nil, ErrorResponseToPgError(m) - } - - return nil, fmt.Errorf("expected AuthenticationSASLFinal message but received unexpected message %T", msg) -} - -type scramClient struct { - serverAuthMechanisms []string - password []byte - clientNonce []byte - - clientFirstMessageBare []byte - - serverFirstMessage []byte - clientAndServerNonce []byte - salt []byte - iterations int - - saltedPassword []byte - authMessage []byte -} - -func newScramClient(serverAuthMechanisms []string, password string) (*scramClient, error) { - sc := &scramClient{ - serverAuthMechanisms: serverAuthMechanisms, - } - - // Ensure server supports SCRAM-SHA-256 - hasScramSHA256 := false - for _, mech := range sc.serverAuthMechanisms { - if mech == "SCRAM-SHA-256" { - hasScramSHA256 = true - break - } - } - if !hasScramSHA256 { - return nil, errors.New("server does not support SCRAM-SHA-256") - } - - // precis.OpaqueString is equivalent to SASLprep for password. - var err error - sc.password, err = precis.OpaqueString.Bytes([]byte(password)) - if err != nil { - // PostgreSQL allows passwords invalid according to SCRAM / SASLprep. - sc.password = []byte(password) - } - - buf := make([]byte, clientNonceLen) - _, err = rand.Read(buf) - if err != nil { - return nil, err - } - sc.clientNonce = make([]byte, base64.RawStdEncoding.EncodedLen(len(buf))) - base64.RawStdEncoding.Encode(sc.clientNonce, buf) - - return sc, nil -} - -func (sc *scramClient) clientFirstMessage() []byte { - sc.clientFirstMessageBare = []byte(fmt.Sprintf("n=,r=%s", sc.clientNonce)) - return []byte(fmt.Sprintf("n,,%s", sc.clientFirstMessageBare)) -} - -func (sc *scramClient) recvServerFirstMessage(serverFirstMessage []byte) error { - sc.serverFirstMessage = serverFirstMessage - buf := serverFirstMessage - if !bytes.HasPrefix(buf, []byte("r=")) { - return errors.New("invalid SCRAM server-first-message received from server: did not include r=") - } - buf = buf[2:] - - idx := bytes.IndexByte(buf, ',') - if idx == -1 { - return errors.New("invalid SCRAM server-first-message received from server: did not include s=") - } - sc.clientAndServerNonce = buf[:idx] - buf = buf[idx+1:] - - if !bytes.HasPrefix(buf, []byte("s=")) { - return errors.New("invalid SCRAM server-first-message received from server: did not include s=") - } - buf = buf[2:] - - idx = bytes.IndexByte(buf, ',') - if idx == -1 { - return errors.New("invalid SCRAM server-first-message received from server: did not include i=") - } - saltStr := buf[:idx] - buf = buf[idx+1:] - - if !bytes.HasPrefix(buf, []byte("i=")) { - return errors.New("invalid SCRAM server-first-message received from server: did not include i=") - } - buf = buf[2:] - iterationsStr := buf - - var err error - sc.salt, err = base64.StdEncoding.DecodeString(string(saltStr)) - if err != nil { - return fmt.Errorf("invalid SCRAM salt received from server: %w", err) - } - - sc.iterations, err = strconv.Atoi(string(iterationsStr)) - if err != nil || sc.iterations <= 0 { - return fmt.Errorf("invalid SCRAM iteration count received from server: %w", err) - } - - if !bytes.HasPrefix(sc.clientAndServerNonce, sc.clientNonce) { - return errors.New("invalid SCRAM nonce: did not start with client nonce") - } - - if len(sc.clientAndServerNonce) <= len(sc.clientNonce) { - return errors.New("invalid SCRAM nonce: did not include server nonce") - } - - return nil -} - -func (sc *scramClient) clientFinalMessage() string { - clientFinalMessageWithoutProof := []byte(fmt.Sprintf("c=biws,r=%s", sc.clientAndServerNonce)) - - sc.saltedPassword = pbkdf2.Key([]byte(sc.password), sc.salt, sc.iterations, 32, sha256.New) - sc.authMessage = bytes.Join([][]byte{sc.clientFirstMessageBare, sc.serverFirstMessage, clientFinalMessageWithoutProof}, []byte(",")) - - clientProof := computeClientProof(sc.saltedPassword, sc.authMessage) - - return fmt.Sprintf("%s,p=%s", clientFinalMessageWithoutProof, clientProof) -} - -func (sc *scramClient) recvServerFinalMessage(serverFinalMessage []byte) error { - if !bytes.HasPrefix(serverFinalMessage, []byte("v=")) { - return errors.New("invalid SCRAM server-final-message received from server") - } - - serverSignature := serverFinalMessage[2:] - - if !hmac.Equal(serverSignature, computeServerSignature(sc.saltedPassword, sc.authMessage)) { - return errors.New("invalid SCRAM ServerSignature received from server") - } - - return nil -} - -func computeHMAC(key, msg []byte) []byte { - mac := hmac.New(sha256.New, key) - mac.Write(msg) - return mac.Sum(nil) -} - -func computeClientProof(saltedPassword, authMessage []byte) []byte { - clientKey := computeHMAC(saltedPassword, []byte("Client Key")) - storedKey := sha256.Sum256(clientKey) - clientSignature := computeHMAC(storedKey[:], authMessage) - - clientProof := make([]byte, len(clientSignature)) - for i := 0; i < len(clientSignature); i++ { - clientProof[i] = clientKey[i] ^ clientSignature[i] - } - - buf := make([]byte, base64.StdEncoding.EncodedLen(len(clientProof))) - base64.StdEncoding.Encode(buf, clientProof) - return buf -} - -func computeServerSignature(saltedPassword []byte, authMessage []byte) []byte { - serverKey := computeHMAC(saltedPassword, []byte("Server Key")) - serverSignature := computeHMAC(serverKey, authMessage) - buf := make([]byte, base64.StdEncoding.EncodedLen(len(serverSignature))) - base64.StdEncoding.Encode(buf, serverSignature) - return buf -} diff --git a/vendor/github.com/jackc/pgx/v5/pgconn/config.go b/vendor/github.com/jackc/pgx/v5/pgconn/config.go deleted file mode 100644 index 46b39f14e..000000000 --- a/vendor/github.com/jackc/pgx/v5/pgconn/config.go +++ /dev/null @@ -1,934 +0,0 @@ -package pgconn - -import ( - "context" - "crypto/tls" - "crypto/x509" - "encoding/pem" - "errors" - "fmt" - "io" - "math" - "net" - "net/url" - "os" - "path/filepath" - "strconv" - "strings" - "time" - - "github.com/jackc/pgpassfile" - "github.com/jackc/pgservicefile" - "github.com/jackc/pgx/v5/pgconn/ctxwatch" - "github.com/jackc/pgx/v5/pgproto3" -) - -type AfterConnectFunc func(ctx context.Context, pgconn *PgConn) error -type ValidateConnectFunc func(ctx context.Context, pgconn *PgConn) error -type GetSSLPasswordFunc func(ctx context.Context) string - -// Config is the settings used to establish a connection to a PostgreSQL server. It must be created by [ParseConfig]. A -// manually initialized Config will cause ConnectConfig to panic. -type Config struct { - Host string // host (e.g. localhost) or absolute path to unix domain socket directory (e.g. /private/tmp) - Port uint16 - Database string - User string - Password string - TLSConfig *tls.Config // nil disables TLS - ConnectTimeout time.Duration - DialFunc DialFunc // e.g. net.Dialer.DialContext - LookupFunc LookupFunc // e.g. net.Resolver.LookupHost - BuildFrontend BuildFrontendFunc - - // BuildContextWatcherHandler is called to create a ContextWatcherHandler for a connection. The handler is called - // when a context passed to a PgConn method is canceled. - BuildContextWatcherHandler func(*PgConn) ctxwatch.Handler - - RuntimeParams map[string]string // Run-time parameters to set on connection as session default values (e.g. search_path or application_name) - - KerberosSrvName string - KerberosSpn string - Fallbacks []*FallbackConfig - - // ValidateConnect is called during a connection attempt after a successful authentication with the PostgreSQL server. - // It can be used to validate that the server is acceptable. If this returns an error the connection is closed and the next - // fallback config is tried. This allows implementing high availability behavior such as libpq does with target_session_attrs. - ValidateConnect ValidateConnectFunc - - // AfterConnect is called after ValidateConnect. It can be used to set up the connection (e.g. Set session variables - // or prepare statements). If this returns an error the connection attempt fails. - AfterConnect AfterConnectFunc - - // OnNotice is a callback function called when a notice response is received. - OnNotice NoticeHandler - - // OnNotification is a callback function called when a notification from the LISTEN/NOTIFY system is received. - OnNotification NotificationHandler - - // OnPgError is a callback function called when a Postgres error is received by the server. The default handler will close - // the connection on any FATAL errors. If you override this handler you should call the previously set handler or ensure - // that you close on FATAL errors by returning false. - OnPgError PgErrorHandler - - createdByParseConfig bool // Used to enforce created by ParseConfig rule. -} - -// ParseConfigOptions contains options that control how a config is built such as GetSSLPassword. -type ParseConfigOptions struct { - // GetSSLPassword gets the password to decrypt a SSL client certificate. This is analogous to the libpq function - // PQsetSSLKeyPassHook_OpenSSL. - GetSSLPassword GetSSLPasswordFunc -} - -// Copy returns a deep copy of the config that is safe to use and modify. -// The only exception is the TLSConfig field: -// according to the tls.Config docs it must not be modified after creation. -func (c *Config) Copy() *Config { - newConf := new(Config) - *newConf = *c - if newConf.TLSConfig != nil { - newConf.TLSConfig = c.TLSConfig.Clone() - } - if newConf.RuntimeParams != nil { - newConf.RuntimeParams = make(map[string]string, len(c.RuntimeParams)) - for k, v := range c.RuntimeParams { - newConf.RuntimeParams[k] = v - } - } - if newConf.Fallbacks != nil { - newConf.Fallbacks = make([]*FallbackConfig, len(c.Fallbacks)) - for i, fallback := range c.Fallbacks { - newFallback := new(FallbackConfig) - *newFallback = *fallback - if newFallback.TLSConfig != nil { - newFallback.TLSConfig = fallback.TLSConfig.Clone() - } - newConf.Fallbacks[i] = newFallback - } - } - return newConf -} - -// FallbackConfig is additional settings to attempt a connection with when the primary Config fails to establish a -// network connection. It is used for TLS fallback such as sslmode=prefer and high availability (HA) connections. -type FallbackConfig struct { - Host string // host (e.g. localhost) or path to unix domain socket directory (e.g. /private/tmp) - Port uint16 - TLSConfig *tls.Config // nil disables TLS -} - -// connectOneConfig is the configuration for a single attempt to connect to a single host. -type connectOneConfig struct { - network string - address string - originalHostname string // original hostname before resolving - tlsConfig *tls.Config // nil disables TLS -} - -// isAbsolutePath checks if the provided value is an absolute path either -// beginning with a forward slash (as on Linux-based systems) or with a capital -// letter A-Z followed by a colon and a backslash, e.g., "C:\", (as on Windows). -func isAbsolutePath(path string) bool { - isWindowsPath := func(p string) bool { - if len(p) < 3 { - return false - } - drive := p[0] - colon := p[1] - backslash := p[2] - if drive >= 'A' && drive <= 'Z' && colon == ':' && backslash == '\\' { - return true - } - return false - } - return strings.HasPrefix(path, "/") || isWindowsPath(path) -} - -// NetworkAddress converts a PostgreSQL host and port into network and address suitable for use with -// net.Dial. -func NetworkAddress(host string, port uint16) (network, address string) { - if isAbsolutePath(host) { - network = "unix" - address = filepath.Join(host, ".s.PGSQL.") + strconv.FormatInt(int64(port), 10) - } else { - network = "tcp" - address = net.JoinHostPort(host, strconv.Itoa(int(port))) - } - return network, address -} - -// ParseConfig builds a *Config from connString with similar behavior to the PostgreSQL standard C library libpq. It -// uses the same defaults as libpq (e.g. port=5432) and understands most PG* environment variables. ParseConfig closely -// matches the parsing behavior of libpq. connString may either be in URL format or keyword = value format. See -// https://www.postgresql.org/docs/current/libpq-connect.html#LIBPQ-CONNSTRING for details. connString also may be empty -// to only read from the environment. If a password is not supplied it will attempt to read the .pgpass file. -// -// # Example Keyword/Value -// user=jack password=secret host=pg.example.com port=5432 dbname=mydb sslmode=verify-ca -// -// # Example URL -// postgres://jack:secret@pg.example.com:5432/mydb?sslmode=verify-ca -// -// The returned *Config may be modified. However, it is strongly recommended that any configuration that can be done -// through the connection string be done there. In particular the fields Host, Port, TLSConfig, and Fallbacks can be -// interdependent (e.g. TLSConfig needs knowledge of the host to validate the server certificate). These fields should -// not be modified individually. They should all be modified or all left unchanged. -// -// ParseConfig supports specifying multiple hosts in similar manner to libpq. Host and port may include comma separated -// values that will be tried in order. This can be used as part of a high availability system. See -// https://www.postgresql.org/docs/11/libpq-connect.html#LIBPQ-MULTIPLE-HOSTS for more information. -// -// # Example URL -// postgres://jack:secret@foo.example.com:5432,bar.example.com:5432/mydb -// -// ParseConfig currently recognizes the following environment variable and their parameter key word equivalents passed -// via database URL or keyword/value: -// -// PGHOST -// PGPORT -// PGDATABASE -// PGUSER -// PGPASSWORD -// PGPASSFILE -// PGSERVICE -// PGSERVICEFILE -// PGSSLMODE -// PGSSLCERT -// PGSSLKEY -// PGSSLROOTCERT -// PGSSLPASSWORD -// PGAPPNAME -// PGCONNECT_TIMEOUT -// PGTARGETSESSIONATTRS -// -// See http://www.postgresql.org/docs/11/static/libpq-envars.html for details on the meaning of environment variables. -// -// See https://www.postgresql.org/docs/11/libpq-connect.html#LIBPQ-PARAMKEYWORDS for parameter key word names. They are -// usually but not always the environment variable name downcased and without the "PG" prefix. -// -// Important Security Notes: -// -// ParseConfig tries to match libpq behavior with regard to PGSSLMODE. This includes defaulting to "prefer" behavior if -// not set. -// -// See http://www.postgresql.org/docs/11/static/libpq-ssl.html#LIBPQ-SSL-PROTECTION for details on what level of -// security each sslmode provides. -// -// The sslmode "prefer" (the default), sslmode "allow", and multiple hosts are implemented via the Fallbacks field of -// the Config struct. If TLSConfig is manually changed it will not affect the fallbacks. For example, in the case of -// sslmode "prefer" this means it will first try the main Config settings which use TLS, then it will try the fallback -// which does not use TLS. This can lead to an unexpected unencrypted connection if the main TLS config is manually -// changed later but the unencrypted fallback is present. Ensure there are no stale fallbacks when manually setting -// TLSConfig. -// -// Other known differences with libpq: -// -// When multiple hosts are specified, libpq allows them to have different passwords set via the .pgpass file. pgconn -// does not. -// -// In addition, ParseConfig accepts the following options: -// -// - servicefile. -// libpq only reads servicefile from the PGSERVICEFILE environment variable. ParseConfig accepts servicefile as a -// part of the connection string. -func ParseConfig(connString string) (*Config, error) { - var parseConfigOptions ParseConfigOptions - return ParseConfigWithOptions(connString, parseConfigOptions) -} - -// ParseConfigWithOptions builds a *Config from connString and options with similar behavior to the PostgreSQL standard -// C library libpq. options contains settings that cannot be specified in a connString such as providing a function to -// get the SSL password. -func ParseConfigWithOptions(connString string, options ParseConfigOptions) (*Config, error) { - defaultSettings := defaultSettings() - envSettings := parseEnvSettings() - - connStringSettings := make(map[string]string) - if connString != "" { - var err error - // connString may be a database URL or in PostgreSQL keyword/value format - if strings.HasPrefix(connString, "postgres://") || strings.HasPrefix(connString, "postgresql://") { - connStringSettings, err = parseURLSettings(connString) - if err != nil { - return nil, &ParseConfigError{ConnString: connString, msg: "failed to parse as URL", err: err} - } - } else { - connStringSettings, err = parseKeywordValueSettings(connString) - if err != nil { - return nil, &ParseConfigError{ConnString: connString, msg: "failed to parse as keyword/value", err: err} - } - } - } - - settings := mergeSettings(defaultSettings, envSettings, connStringSettings) - if service, present := settings["service"]; present { - serviceSettings, err := parseServiceSettings(settings["servicefile"], service) - if err != nil { - return nil, &ParseConfigError{ConnString: connString, msg: "failed to read service", err: err} - } - - settings = mergeSettings(defaultSettings, envSettings, serviceSettings, connStringSettings) - } - - config := &Config{ - createdByParseConfig: true, - Database: settings["database"], - User: settings["user"], - Password: settings["password"], - RuntimeParams: make(map[string]string), - BuildFrontend: func(r io.Reader, w io.Writer) *pgproto3.Frontend { - return pgproto3.NewFrontend(r, w) - }, - BuildContextWatcherHandler: func(pgConn *PgConn) ctxwatch.Handler { - return &DeadlineContextWatcherHandler{Conn: pgConn.conn} - }, - OnPgError: func(_ *PgConn, pgErr *PgError) bool { - // we want to automatically close any fatal errors - if strings.EqualFold(pgErr.Severity, "FATAL") { - return false - } - return true - }, - } - - if connectTimeoutSetting, present := settings["connect_timeout"]; present { - connectTimeout, err := parseConnectTimeoutSetting(connectTimeoutSetting) - if err != nil { - return nil, &ParseConfigError{ConnString: connString, msg: "invalid connect_timeout", err: err} - } - config.ConnectTimeout = connectTimeout - config.DialFunc = makeConnectTimeoutDialFunc(connectTimeout) - } else { - defaultDialer := makeDefaultDialer() - config.DialFunc = defaultDialer.DialContext - } - - config.LookupFunc = makeDefaultResolver().LookupHost - - notRuntimeParams := map[string]struct{}{ - "host": {}, - "port": {}, - "database": {}, - "user": {}, - "password": {}, - "passfile": {}, - "connect_timeout": {}, - "sslmode": {}, - "sslkey": {}, - "sslcert": {}, - "sslrootcert": {}, - "sslpassword": {}, - "sslsni": {}, - "krbspn": {}, - "krbsrvname": {}, - "target_session_attrs": {}, - "service": {}, - "servicefile": {}, - } - - // Adding kerberos configuration - if _, present := settings["krbsrvname"]; present { - config.KerberosSrvName = settings["krbsrvname"] - } - if _, present := settings["krbspn"]; present { - config.KerberosSpn = settings["krbspn"] - } - - for k, v := range settings { - if _, present := notRuntimeParams[k]; present { - continue - } - config.RuntimeParams[k] = v - } - - fallbacks := []*FallbackConfig{} - - hosts := strings.Split(settings["host"], ",") - ports := strings.Split(settings["port"], ",") - - for i, host := range hosts { - var portStr string - if i < len(ports) { - portStr = ports[i] - } else { - portStr = ports[0] - } - - port, err := parsePort(portStr) - if err != nil { - return nil, &ParseConfigError{ConnString: connString, msg: "invalid port", err: err} - } - - var tlsConfigs []*tls.Config - - // Ignore TLS settings if Unix domain socket like libpq - if network, _ := NetworkAddress(host, port); network == "unix" { - tlsConfigs = append(tlsConfigs, nil) - } else { - var err error - tlsConfigs, err = configTLS(settings, host, options) - if err != nil { - return nil, &ParseConfigError{ConnString: connString, msg: "failed to configure TLS", err: err} - } - } - - for _, tlsConfig := range tlsConfigs { - fallbacks = append(fallbacks, &FallbackConfig{ - Host: host, - Port: port, - TLSConfig: tlsConfig, - }) - } - } - - config.Host = fallbacks[0].Host - config.Port = fallbacks[0].Port - config.TLSConfig = fallbacks[0].TLSConfig - config.Fallbacks = fallbacks[1:] - - passfile, err := pgpassfile.ReadPassfile(settings["passfile"]) - if err == nil { - if config.Password == "" { - host := config.Host - if network, _ := NetworkAddress(config.Host, config.Port); network == "unix" { - host = "localhost" - } - - config.Password = passfile.FindPassword(host, strconv.Itoa(int(config.Port)), config.Database, config.User) - } - } - - switch tsa := settings["target_session_attrs"]; tsa { - case "read-write": - config.ValidateConnect = ValidateConnectTargetSessionAttrsReadWrite - case "read-only": - config.ValidateConnect = ValidateConnectTargetSessionAttrsReadOnly - case "primary": - config.ValidateConnect = ValidateConnectTargetSessionAttrsPrimary - case "standby": - config.ValidateConnect = ValidateConnectTargetSessionAttrsStandby - case "prefer-standby": - config.ValidateConnect = ValidateConnectTargetSessionAttrsPreferStandby - case "any": - // do nothing - default: - return nil, &ParseConfigError{ConnString: connString, msg: fmt.Sprintf("unknown target_session_attrs value: %v", tsa)} - } - - return config, nil -} - -func mergeSettings(settingSets ...map[string]string) map[string]string { - settings := make(map[string]string) - - for _, s2 := range settingSets { - for k, v := range s2 { - settings[k] = v - } - } - - return settings -} - -func parseEnvSettings() map[string]string { - settings := make(map[string]string) - - nameMap := map[string]string{ - "PGHOST": "host", - "PGPORT": "port", - "PGDATABASE": "database", - "PGUSER": "user", - "PGPASSWORD": "password", - "PGPASSFILE": "passfile", - "PGAPPNAME": "application_name", - "PGCONNECT_TIMEOUT": "connect_timeout", - "PGSSLMODE": "sslmode", - "PGSSLKEY": "sslkey", - "PGSSLCERT": "sslcert", - "PGSSLSNI": "sslsni", - "PGSSLROOTCERT": "sslrootcert", - "PGSSLPASSWORD": "sslpassword", - "PGTARGETSESSIONATTRS": "target_session_attrs", - "PGSERVICE": "service", - "PGSERVICEFILE": "servicefile", - } - - for envname, realname := range nameMap { - value := os.Getenv(envname) - if value != "" { - settings[realname] = value - } - } - - return settings -} - -func parseURLSettings(connString string) (map[string]string, error) { - settings := make(map[string]string) - - parsedURL, err := url.Parse(connString) - if err != nil { - if urlErr := new(url.Error); errors.As(err, &urlErr) { - return nil, urlErr.Err - } - return nil, err - } - - if parsedURL.User != nil { - settings["user"] = parsedURL.User.Username() - if password, present := parsedURL.User.Password(); present { - settings["password"] = password - } - } - - // Handle multiple host:port's in url.Host by splitting them into host,host,host and port,port,port. - var hosts []string - var ports []string - for _, host := range strings.Split(parsedURL.Host, ",") { - if host == "" { - continue - } - if isIPOnly(host) { - hosts = append(hosts, strings.Trim(host, "[]")) - continue - } - h, p, err := net.SplitHostPort(host) - if err != nil { - return nil, fmt.Errorf("failed to split host:port in '%s', err: %w", host, err) - } - if h != "" { - hosts = append(hosts, h) - } - if p != "" { - ports = append(ports, p) - } - } - if len(hosts) > 0 { - settings["host"] = strings.Join(hosts, ",") - } - if len(ports) > 0 { - settings["port"] = strings.Join(ports, ",") - } - - database := strings.TrimLeft(parsedURL.Path, "/") - if database != "" { - settings["database"] = database - } - - nameMap := map[string]string{ - "dbname": "database", - } - - for k, v := range parsedURL.Query() { - if k2, present := nameMap[k]; present { - k = k2 - } - - settings[k] = v[0] - } - - return settings, nil -} - -func isIPOnly(host string) bool { - return net.ParseIP(strings.Trim(host, "[]")) != nil || !strings.Contains(host, ":") -} - -var asciiSpace = [256]uint8{'\t': 1, '\n': 1, '\v': 1, '\f': 1, '\r': 1, ' ': 1} - -func parseKeywordValueSettings(s string) (map[string]string, error) { - settings := make(map[string]string) - - nameMap := map[string]string{ - "dbname": "database", - } - - for len(s) > 0 { - var key, val string - eqIdx := strings.IndexRune(s, '=') - if eqIdx < 0 { - return nil, errors.New("invalid keyword/value") - } - - key = strings.Trim(s[:eqIdx], " \t\n\r\v\f") - s = strings.TrimLeft(s[eqIdx+1:], " \t\n\r\v\f") - if len(s) == 0 { - } else if s[0] != '\'' { - end := 0 - for ; end < len(s); end++ { - if asciiSpace[s[end]] == 1 { - break - } - if s[end] == '\\' { - end++ - if end == len(s) { - return nil, errors.New("invalid backslash") - } - } - } - val = strings.Replace(strings.Replace(s[:end], "\\\\", "\\", -1), "\\'", "'", -1) - if end == len(s) { - s = "" - } else { - s = s[end+1:] - } - } else { // quoted string - s = s[1:] - end := 0 - for ; end < len(s); end++ { - if s[end] == '\'' { - break - } - if s[end] == '\\' { - end++ - } - } - if end == len(s) { - return nil, errors.New("unterminated quoted string in connection info string") - } - val = strings.Replace(strings.Replace(s[:end], "\\\\", "\\", -1), "\\'", "'", -1) - if end == len(s) { - s = "" - } else { - s = s[end+1:] - } - } - - if k, ok := nameMap[key]; ok { - key = k - } - - if key == "" { - return nil, errors.New("invalid keyword/value") - } - - settings[key] = val - } - - return settings, nil -} - -func parseServiceSettings(servicefilePath, serviceName string) (map[string]string, error) { - servicefile, err := pgservicefile.ReadServicefile(servicefilePath) - if err != nil { - return nil, fmt.Errorf("failed to read service file: %v", servicefilePath) - } - - service, err := servicefile.GetService(serviceName) - if err != nil { - return nil, fmt.Errorf("unable to find service: %v", serviceName) - } - - nameMap := map[string]string{ - "dbname": "database", - } - - settings := make(map[string]string, len(service.Settings)) - for k, v := range service.Settings { - if k2, present := nameMap[k]; present { - k = k2 - } - settings[k] = v - } - - return settings, nil -} - -// configTLS uses libpq's TLS parameters to construct []*tls.Config. It is -// necessary to allow returning multiple TLS configs as sslmode "allow" and -// "prefer" allow fallback. -func configTLS(settings map[string]string, thisHost string, parseConfigOptions ParseConfigOptions) ([]*tls.Config, error) { - host := thisHost - sslmode := settings["sslmode"] - sslrootcert := settings["sslrootcert"] - sslcert := settings["sslcert"] - sslkey := settings["sslkey"] - sslpassword := settings["sslpassword"] - sslsni := settings["sslsni"] - - // Match libpq default behavior - if sslmode == "" { - sslmode = "prefer" - } - if sslsni == "" { - sslsni = "1" - } - - tlsConfig := &tls.Config{} - - if sslrootcert != "" { - var caCertPool *x509.CertPool - - if sslrootcert == "system" { - var err error - - caCertPool, err = x509.SystemCertPool() - if err != nil { - return nil, fmt.Errorf("unable to load system certificate pool: %w", err) - } - - sslmode = "verify-full" - } else { - caCertPool = x509.NewCertPool() - - caPath := sslrootcert - caCert, err := os.ReadFile(caPath) - if err != nil { - return nil, fmt.Errorf("unable to read CA file: %w", err) - } - - if !caCertPool.AppendCertsFromPEM(caCert) { - return nil, errors.New("unable to add CA to cert pool") - } - } - - tlsConfig.RootCAs = caCertPool - tlsConfig.ClientCAs = caCertPool - } - - switch sslmode { - case "disable": - return []*tls.Config{nil}, nil - case "allow", "prefer": - tlsConfig.InsecureSkipVerify = true - case "require": - // According to PostgreSQL documentation, if a root CA file exists, - // the behavior of sslmode=require should be the same as that of verify-ca - // - // See https://www.postgresql.org/docs/12/libpq-ssl.html - if sslrootcert != "" { - goto nextCase - } - tlsConfig.InsecureSkipVerify = true - break - nextCase: - fallthrough - case "verify-ca": - // Don't perform the default certificate verification because it - // will verify the hostname. Instead, verify the server's - // certificate chain ourselves in VerifyPeerCertificate and - // ignore the server name. This emulates libpq's verify-ca - // behavior. - // - // See https://github.com/golang/go/issues/21971#issuecomment-332693931 - // and https://pkg.go.dev/crypto/tls?tab=doc#example-Config-VerifyPeerCertificate - // for more info. - tlsConfig.InsecureSkipVerify = true - tlsConfig.VerifyPeerCertificate = func(certificates [][]byte, _ [][]*x509.Certificate) error { - certs := make([]*x509.Certificate, len(certificates)) - for i, asn1Data := range certificates { - cert, err := x509.ParseCertificate(asn1Data) - if err != nil { - return errors.New("failed to parse certificate from server: " + err.Error()) - } - certs[i] = cert - } - - // Leave DNSName empty to skip hostname verification. - opts := x509.VerifyOptions{ - Roots: tlsConfig.RootCAs, - Intermediates: x509.NewCertPool(), - } - // Skip the first cert because it's the leaf. All others - // are intermediates. - for _, cert := range certs[1:] { - opts.Intermediates.AddCert(cert) - } - _, err := certs[0].Verify(opts) - return err - } - case "verify-full": - tlsConfig.ServerName = host - default: - return nil, errors.New("sslmode is invalid") - } - - if (sslcert != "" && sslkey == "") || (sslcert == "" && sslkey != "") { - return nil, errors.New(`both "sslcert" and "sslkey" are required`) - } - - if sslcert != "" && sslkey != "" { - buf, err := os.ReadFile(sslkey) - if err != nil { - return nil, fmt.Errorf("unable to read sslkey: %w", err) - } - block, _ := pem.Decode(buf) - if block == nil { - return nil, errors.New("failed to decode sslkey") - } - var pemKey []byte - var decryptedKey []byte - var decryptedError error - // If PEM is encrypted, attempt to decrypt using pass phrase - if x509.IsEncryptedPEMBlock(block) { - // Attempt decryption with pass phrase - // NOTE: only supports RSA (PKCS#1) - if sslpassword != "" { - decryptedKey, decryptedError = x509.DecryptPEMBlock(block, []byte(sslpassword)) - } - //if sslpassword not provided or has decryption error when use it - //try to find sslpassword with callback function - if sslpassword == "" || decryptedError != nil { - if parseConfigOptions.GetSSLPassword != nil { - sslpassword = parseConfigOptions.GetSSLPassword(context.Background()) - } - if sslpassword == "" { - return nil, fmt.Errorf("unable to find sslpassword") - } - } - decryptedKey, decryptedError = x509.DecryptPEMBlock(block, []byte(sslpassword)) - // Should we also provide warning for PKCS#1 needed? - if decryptedError != nil { - return nil, fmt.Errorf("unable to decrypt key: %w", err) - } - - pemBytes := pem.Block{ - Type: "RSA PRIVATE KEY", - Bytes: decryptedKey, - } - pemKey = pem.EncodeToMemory(&pemBytes) - } else { - pemKey = pem.EncodeToMemory(block) - } - certfile, err := os.ReadFile(sslcert) - if err != nil { - return nil, fmt.Errorf("unable to read cert: %w", err) - } - cert, err := tls.X509KeyPair(certfile, pemKey) - if err != nil { - return nil, fmt.Errorf("unable to load cert: %w", err) - } - tlsConfig.Certificates = []tls.Certificate{cert} - } - - // Set Server Name Indication (SNI), if enabled by connection parameters. - // Per RFC 6066, do not set it if the host is a literal IP address (IPv4 - // or IPv6). - if sslsni == "1" && net.ParseIP(host) == nil { - tlsConfig.ServerName = host - } - - switch sslmode { - case "allow": - return []*tls.Config{nil, tlsConfig}, nil - case "prefer": - return []*tls.Config{tlsConfig, nil}, nil - case "require", "verify-ca", "verify-full": - return []*tls.Config{tlsConfig}, nil - default: - panic("BUG: bad sslmode should already have been caught") - } -} - -func parsePort(s string) (uint16, error) { - port, err := strconv.ParseUint(s, 10, 16) - if err != nil { - return 0, err - } - if port < 1 || port > math.MaxUint16 { - return 0, errors.New("outside range") - } - return uint16(port), nil -} - -func makeDefaultDialer() *net.Dialer { - // rely on GOLANG KeepAlive settings - return &net.Dialer{} -} - -func makeDefaultResolver() *net.Resolver { - return net.DefaultResolver -} - -func parseConnectTimeoutSetting(s string) (time.Duration, error) { - timeout, err := strconv.ParseInt(s, 10, 64) - if err != nil { - return 0, err - } - if timeout < 0 { - return 0, errors.New("negative timeout") - } - return time.Duration(timeout) * time.Second, nil -} - -func makeConnectTimeoutDialFunc(timeout time.Duration) DialFunc { - d := makeDefaultDialer() - d.Timeout = timeout - return d.DialContext -} - -// ValidateConnectTargetSessionAttrsReadWrite is a ValidateConnectFunc that implements libpq compatible -// target_session_attrs=read-write. -func ValidateConnectTargetSessionAttrsReadWrite(ctx context.Context, pgConn *PgConn) error { - result, err := pgConn.Exec(ctx, "show transaction_read_only").ReadAll() - if err != nil { - return err - } - - if string(result[0].Rows[0][0]) == "on" { - return errors.New("read only connection") - } - - return nil -} - -// ValidateConnectTargetSessionAttrsReadOnly is a ValidateConnectFunc that implements libpq compatible -// target_session_attrs=read-only. -func ValidateConnectTargetSessionAttrsReadOnly(ctx context.Context, pgConn *PgConn) error { - result, err := pgConn.Exec(ctx, "show transaction_read_only").ReadAll() - if err != nil { - return err - } - - if string(result[0].Rows[0][0]) != "on" { - return errors.New("connection is not read only") - } - - return nil -} - -// ValidateConnectTargetSessionAttrsStandby is a ValidateConnectFunc that implements libpq compatible -// target_session_attrs=standby. -func ValidateConnectTargetSessionAttrsStandby(ctx context.Context, pgConn *PgConn) error { - result, err := pgConn.Exec(ctx, "select pg_is_in_recovery()").ReadAll() - if err != nil { - return err - } - - if string(result[0].Rows[0][0]) != "t" { - return errors.New("server is not in hot standby mode") - } - - return nil -} - -// ValidateConnectTargetSessionAttrsPrimary is a ValidateConnectFunc that implements libpq compatible -// target_session_attrs=primary. -func ValidateConnectTargetSessionAttrsPrimary(ctx context.Context, pgConn *PgConn) error { - result, err := pgConn.Exec(ctx, "select pg_is_in_recovery()").ReadAll() - if err != nil { - return err - } - - if string(result[0].Rows[0][0]) == "t" { - return errors.New("server is in standby mode") - } - - return nil -} - -// ValidateConnectTargetSessionAttrsPreferStandby is a ValidateConnectFunc that implements libpq compatible -// target_session_attrs=prefer-standby. -func ValidateConnectTargetSessionAttrsPreferStandby(ctx context.Context, pgConn *PgConn) error { - result, err := pgConn.Exec(ctx, "select pg_is_in_recovery()").ReadAll() - if err != nil { - return err - } - - if string(result[0].Rows[0][0]) != "t" { - return &NotPreferredError{err: errors.New("server is not in hot standby mode")} - } - - return nil -} diff --git a/vendor/github.com/jackc/pgx/v5/pgconn/ctxwatch/context_watcher.go b/vendor/github.com/jackc/pgx/v5/pgconn/ctxwatch/context_watcher.go deleted file mode 100644 index db8884eb8..000000000 --- a/vendor/github.com/jackc/pgx/v5/pgconn/ctxwatch/context_watcher.go +++ /dev/null @@ -1,80 +0,0 @@ -package ctxwatch - -import ( - "context" - "sync" -) - -// ContextWatcher watches a context and performs an action when the context is canceled. It can watch one context at a -// time. -type ContextWatcher struct { - handler Handler - unwatchChan chan struct{} - - lock sync.Mutex - watchInProgress bool - onCancelWasCalled bool -} - -// NewContextWatcher returns a ContextWatcher. onCancel will be called when a watched context is canceled. -// OnUnwatchAfterCancel will be called when Unwatch is called and the watched context had already been canceled and -// onCancel called. -func NewContextWatcher(handler Handler) *ContextWatcher { - cw := &ContextWatcher{ - handler: handler, - unwatchChan: make(chan struct{}), - } - - return cw -} - -// Watch starts watching ctx. If ctx is canceled then the onCancel function passed to NewContextWatcher will be called. -func (cw *ContextWatcher) Watch(ctx context.Context) { - cw.lock.Lock() - defer cw.lock.Unlock() - - if cw.watchInProgress { - panic("Watch already in progress") - } - - cw.onCancelWasCalled = false - - if ctx.Done() != nil { - cw.watchInProgress = true - go func() { - select { - case <-ctx.Done(): - cw.handler.HandleCancel(ctx) - cw.onCancelWasCalled = true - <-cw.unwatchChan - case <-cw.unwatchChan: - } - }() - } else { - cw.watchInProgress = false - } -} - -// Unwatch stops watching the previously watched context. If the onCancel function passed to NewContextWatcher was -// called then onUnwatchAfterCancel will also be called. -func (cw *ContextWatcher) Unwatch() { - cw.lock.Lock() - defer cw.lock.Unlock() - - if cw.watchInProgress { - cw.unwatchChan <- struct{}{} - if cw.onCancelWasCalled { - cw.handler.HandleUnwatchAfterCancel() - } - cw.watchInProgress = false - } -} - -type Handler interface { - // HandleCancel is called when the context that a ContextWatcher is currently watching is canceled. canceledCtx is the - // context that was canceled. - HandleCancel(canceledCtx context.Context) - - // HandleUnwatchAfterCancel is called when a ContextWatcher that called HandleCancel on this Handler is unwatched. - HandleUnwatchAfterCancel() -} diff --git a/vendor/github.com/jackc/pgx/v5/pgconn/defaults.go b/vendor/github.com/jackc/pgx/v5/pgconn/defaults.go deleted file mode 100644 index 1dd514ff4..000000000 --- a/vendor/github.com/jackc/pgx/v5/pgconn/defaults.go +++ /dev/null @@ -1,63 +0,0 @@ -//go:build !windows -// +build !windows - -package pgconn - -import ( - "os" - "os/user" - "path/filepath" -) - -func defaultSettings() map[string]string { - settings := make(map[string]string) - - settings["host"] = defaultHost() - settings["port"] = "5432" - - // Default to the OS user name. Purposely ignoring err getting user name from - // OS. The client application will simply have to specify the user in that - // case (which they typically will be doing anyway). - user, err := user.Current() - if err == nil { - settings["user"] = user.Username - settings["passfile"] = filepath.Join(user.HomeDir, ".pgpass") - settings["servicefile"] = filepath.Join(user.HomeDir, ".pg_service.conf") - sslcert := filepath.Join(user.HomeDir, ".postgresql", "postgresql.crt") - sslkey := filepath.Join(user.HomeDir, ".postgresql", "postgresql.key") - if _, err := os.Stat(sslcert); err == nil { - if _, err := os.Stat(sslkey); err == nil { - // Both the cert and key must be present to use them, or do not use either - settings["sslcert"] = sslcert - settings["sslkey"] = sslkey - } - } - sslrootcert := filepath.Join(user.HomeDir, ".postgresql", "root.crt") - if _, err := os.Stat(sslrootcert); err == nil { - settings["sslrootcert"] = sslrootcert - } - } - - settings["target_session_attrs"] = "any" - - return settings -} - -// defaultHost attempts to mimic libpq's default host. libpq uses the default unix socket location on *nix and localhost -// on Windows. The default socket location is compiled into libpq. Since pgx does not have access to that default it -// checks the existence of common locations. -func defaultHost() string { - candidatePaths := []string{ - "/var/run/postgresql", // Debian - "/private/tmp", // OSX - homebrew - "/tmp", // standard PostgreSQL - } - - for _, path := range candidatePaths { - if _, err := os.Stat(path); err == nil { - return path - } - } - - return "localhost" -} diff --git a/vendor/github.com/jackc/pgx/v5/pgconn/defaults_windows.go b/vendor/github.com/jackc/pgx/v5/pgconn/defaults_windows.go deleted file mode 100644 index 33b4a1ff8..000000000 --- a/vendor/github.com/jackc/pgx/v5/pgconn/defaults_windows.go +++ /dev/null @@ -1,57 +0,0 @@ -package pgconn - -import ( - "os" - "os/user" - "path/filepath" - "strings" -) - -func defaultSettings() map[string]string { - settings := make(map[string]string) - - settings["host"] = defaultHost() - settings["port"] = "5432" - - // Default to the OS user name. Purposely ignoring err getting user name from - // OS. The client application will simply have to specify the user in that - // case (which they typically will be doing anyway). - user, err := user.Current() - appData := os.Getenv("APPDATA") - if err == nil { - // Windows gives us the username here as `DOMAIN\user` or `LOCALPCNAME\user`, - // but the libpq default is just the `user` portion, so we strip off the first part. - username := user.Username - if strings.Contains(username, "\\") { - username = username[strings.LastIndex(username, "\\")+1:] - } - - settings["user"] = username - settings["passfile"] = filepath.Join(appData, "postgresql", "pgpass.conf") - settings["servicefile"] = filepath.Join(user.HomeDir, ".pg_service.conf") - sslcert := filepath.Join(appData, "postgresql", "postgresql.crt") - sslkey := filepath.Join(appData, "postgresql", "postgresql.key") - if _, err := os.Stat(sslcert); err == nil { - if _, err := os.Stat(sslkey); err == nil { - // Both the cert and key must be present to use them, or do not use either - settings["sslcert"] = sslcert - settings["sslkey"] = sslkey - } - } - sslrootcert := filepath.Join(appData, "postgresql", "root.crt") - if _, err := os.Stat(sslrootcert); err == nil { - settings["sslrootcert"] = sslrootcert - } - } - - settings["target_session_attrs"] = "any" - - return settings -} - -// defaultHost attempts to mimic libpq's default host. libpq uses the default unix socket location on *nix and localhost -// on Windows. The default socket location is compiled into libpq. Since pgx does not have access to that default it -// checks the existence of common locations. -func defaultHost() string { - return "localhost" -} diff --git a/vendor/github.com/jackc/pgx/v5/pgconn/doc.go b/vendor/github.com/jackc/pgx/v5/pgconn/doc.go deleted file mode 100644 index 701375019..000000000 --- a/vendor/github.com/jackc/pgx/v5/pgconn/doc.go +++ /dev/null @@ -1,38 +0,0 @@ -// Package pgconn is a low-level PostgreSQL database driver. -/* -pgconn provides lower level access to a PostgreSQL connection than a database/sql or pgx connection. It operates at -nearly the same level is the C library libpq. - -Establishing a Connection - -Use Connect to establish a connection. It accepts a connection string in URL or keyword/value format and will read the -environment for libpq style environment variables. - -Executing a Query - -ExecParams and ExecPrepared execute a single query. They return readers that iterate over each row. The Read method -reads all rows into memory. - -Executing Multiple Queries in a Single Round Trip - -Exec and ExecBatch can execute multiple queries in a single round trip. They return readers that iterate over each query -result. The ReadAll method reads all query results into memory. - -Pipeline Mode - -Pipeline mode allows sending queries without having read the results of previously sent queries. It allows control of -exactly how many and when network round trips occur. - -Context Support - -All potentially blocking operations take a context.Context. The default behavior when a context is canceled is for the -method to immediately return. In most circumstances, this will also close the underlying connection. This behavior can -be customized by using BuildContextWatcherHandler on the Config to create a ctxwatch.Handler with different behavior. -This can be especially useful when queries that are frequently canceled and the overhead of creating new connections is -a problem. DeadlineContextWatcherHandler and CancelRequestContextWatcherHandler can be used to introduce a delay before -interrupting the query in such a way as to close the connection. - -The CancelRequest method may be used to request the PostgreSQL server cancel an in-progress query without forcing the -client to abort. -*/ -package pgconn diff --git a/vendor/github.com/jackc/pgx/v5/pgconn/errors.go b/vendor/github.com/jackc/pgx/v5/pgconn/errors.go deleted file mode 100644 index ec4a6d47c..000000000 --- a/vendor/github.com/jackc/pgx/v5/pgconn/errors.go +++ /dev/null @@ -1,248 +0,0 @@ -package pgconn - -import ( - "context" - "errors" - "fmt" - "net" - "net/url" - "regexp" - "strings" -) - -// SafeToRetry checks if the err is guaranteed to have occurred before sending any data to the server. -func SafeToRetry(err error) bool { - var retryableErr interface{ SafeToRetry() bool } - if errors.As(err, &retryableErr) { - return retryableErr.SafeToRetry() - } - return false -} - -// Timeout checks if err was caused by a timeout. To be specific, it is true if err was caused within pgconn by a -// context.DeadlineExceeded or an implementer of net.Error where Timeout() is true. -func Timeout(err error) bool { - var timeoutErr *errTimeout - return errors.As(err, &timeoutErr) -} - -// PgError represents an error reported by the PostgreSQL server. See -// http://www.postgresql.org/docs/11/static/protocol-error-fields.html for -// detailed field description. -type PgError struct { - Severity string - SeverityUnlocalized string - Code string - Message string - Detail string - Hint string - Position int32 - InternalPosition int32 - InternalQuery string - Where string - SchemaName string - TableName string - ColumnName string - DataTypeName string - ConstraintName string - File string - Line int32 - Routine string -} - -func (pe *PgError) Error() string { - return pe.Severity + ": " + pe.Message + " (SQLSTATE " + pe.Code + ")" -} - -// SQLState returns the SQLState of the error. -func (pe *PgError) SQLState() string { - return pe.Code -} - -// ConnectError is the error returned when a connection attempt fails. -type ConnectError struct { - Config *Config // The configuration that was used in the connection attempt. - err error -} - -func (e *ConnectError) Error() string { - prefix := fmt.Sprintf("failed to connect to `user=%s database=%s`:", e.Config.User, e.Config.Database) - details := e.err.Error() - if strings.Contains(details, "\n") { - return prefix + "\n\t" + strings.ReplaceAll(details, "\n", "\n\t") - } else { - return prefix + " " + details - } -} - -func (e *ConnectError) Unwrap() error { - return e.err -} - -type perDialConnectError struct { - address string - originalHostname string - err error -} - -func (e *perDialConnectError) Error() string { - return fmt.Sprintf("%s (%s): %s", e.address, e.originalHostname, e.err.Error()) -} - -func (e *perDialConnectError) Unwrap() error { - return e.err -} - -type connLockError struct { - status string -} - -func (e *connLockError) SafeToRetry() bool { - return true // a lock failure by definition happens before the connection is used. -} - -func (e *connLockError) Error() string { - return e.status -} - -// ParseConfigError is the error returned when a connection string cannot be parsed. -type ParseConfigError struct { - ConnString string // The connection string that could not be parsed. - msg string - err error -} - -func (e *ParseConfigError) Error() string { - // Now that ParseConfigError is public and ConnString is available to the developer, perhaps it would be better only - // return a static string. That would ensure that the error message cannot leak a password. The ConnString field would - // allow access to the original string if desired and Unwrap would allow access to the underlying error. - connString := redactPW(e.ConnString) - if e.err == nil { - return fmt.Sprintf("cannot parse `%s`: %s", connString, e.msg) - } - return fmt.Sprintf("cannot parse `%s`: %s (%s)", connString, e.msg, e.err.Error()) -} - -func (e *ParseConfigError) Unwrap() error { - return e.err -} - -func normalizeTimeoutError(ctx context.Context, err error) error { - var netErr net.Error - if errors.As(err, &netErr) && netErr.Timeout() { - if ctx.Err() == context.Canceled { - // Since the timeout was caused by a context cancellation, the actual error is context.Canceled not the timeout error. - return context.Canceled - } else if ctx.Err() == context.DeadlineExceeded { - return &errTimeout{err: ctx.Err()} - } else { - return &errTimeout{err: netErr} - } - } - return err -} - -type pgconnError struct { - msg string - err error - safeToRetry bool -} - -func (e *pgconnError) Error() string { - if e.msg == "" { - return e.err.Error() - } - if e.err == nil { - return e.msg - } - return fmt.Sprintf("%s: %s", e.msg, e.err.Error()) -} - -func (e *pgconnError) SafeToRetry() bool { - return e.safeToRetry -} - -func (e *pgconnError) Unwrap() error { - return e.err -} - -// errTimeout occurs when an error was caused by a timeout. Specifically, it wraps an error which is -// context.Canceled, context.DeadlineExceeded, or an implementer of net.Error where Timeout() is true. -type errTimeout struct { - err error -} - -func (e *errTimeout) Error() string { - return fmt.Sprintf("timeout: %s", e.err.Error()) -} - -func (e *errTimeout) SafeToRetry() bool { - return SafeToRetry(e.err) -} - -func (e *errTimeout) Unwrap() error { - return e.err -} - -type contextAlreadyDoneError struct { - err error -} - -func (e *contextAlreadyDoneError) Error() string { - return fmt.Sprintf("context already done: %s", e.err.Error()) -} - -func (e *contextAlreadyDoneError) SafeToRetry() bool { - return true -} - -func (e *contextAlreadyDoneError) Unwrap() error { - return e.err -} - -// newContextAlreadyDoneError double-wraps a context error in `contextAlreadyDoneError` and `errTimeout`. -func newContextAlreadyDoneError(ctx context.Context) (err error) { - return &errTimeout{&contextAlreadyDoneError{err: ctx.Err()}} -} - -func redactPW(connString string) string { - if strings.HasPrefix(connString, "postgres://") || strings.HasPrefix(connString, "postgresql://") { - if u, err := url.Parse(connString); err == nil { - return redactURL(u) - } - } - quotedKV := regexp.MustCompile(`password='[^']*'`) - connString = quotedKV.ReplaceAllLiteralString(connString, "password=xxxxx") - plainKV := regexp.MustCompile(`password=[^ ]*`) - connString = plainKV.ReplaceAllLiteralString(connString, "password=xxxxx") - brokenURL := regexp.MustCompile(`:[^:@]+?@`) - connString = brokenURL.ReplaceAllLiteralString(connString, ":xxxxxx@") - return connString -} - -func redactURL(u *url.URL) string { - if u == nil { - return "" - } - if _, pwSet := u.User.Password(); pwSet { - u.User = url.UserPassword(u.User.Username(), "xxxxx") - } - return u.String() -} - -type NotPreferredError struct { - err error - safeToRetry bool -} - -func (e *NotPreferredError) Error() string { - return fmt.Sprintf("standby server not found: %s", e.err.Error()) -} - -func (e *NotPreferredError) SafeToRetry() bool { - return e.safeToRetry -} - -func (e *NotPreferredError) Unwrap() error { - return e.err -} diff --git a/vendor/github.com/jackc/pgx/v5/pgconn/internal/bgreader/bgreader.go b/vendor/github.com/jackc/pgx/v5/pgconn/internal/bgreader/bgreader.go deleted file mode 100644 index e65c2c2bf..000000000 --- a/vendor/github.com/jackc/pgx/v5/pgconn/internal/bgreader/bgreader.go +++ /dev/null @@ -1,139 +0,0 @@ -// Package bgreader provides a io.Reader that can optionally buffer reads in the background. -package bgreader - -import ( - "io" - "sync" - - "github.com/jackc/pgx/v5/internal/iobufpool" -) - -const ( - StatusStopped = iota - StatusRunning - StatusStopping -) - -// BGReader is an io.Reader that can optionally buffer reads in the background. It is safe for concurrent use. -type BGReader struct { - r io.Reader - - cond *sync.Cond - status int32 - readResults []readResult -} - -type readResult struct { - buf *[]byte - err error -} - -// Start starts the backgrounder reader. If the background reader is already running this is a no-op. The background -// reader will stop automatically when the underlying reader returns an error. -func (r *BGReader) Start() { - r.cond.L.Lock() - defer r.cond.L.Unlock() - - switch r.status { - case StatusStopped: - r.status = StatusRunning - go r.bgRead() - case StatusRunning: - // no-op - case StatusStopping: - r.status = StatusRunning - } -} - -// Stop tells the background reader to stop after the in progress Read returns. It is safe to call Stop when the -// background reader is not running. -func (r *BGReader) Stop() { - r.cond.L.Lock() - defer r.cond.L.Unlock() - - switch r.status { - case StatusStopped: - // no-op - case StatusRunning: - r.status = StatusStopping - case StatusStopping: - // no-op - } -} - -// Status returns the current status of the background reader. -func (r *BGReader) Status() int32 { - r.cond.L.Lock() - defer r.cond.L.Unlock() - return r.status -} - -func (r *BGReader) bgRead() { - keepReading := true - for keepReading { - buf := iobufpool.Get(8192) - n, err := r.r.Read(*buf) - *buf = (*buf)[:n] - - r.cond.L.Lock() - r.readResults = append(r.readResults, readResult{buf: buf, err: err}) - if r.status == StatusStopping || err != nil { - r.status = StatusStopped - keepReading = false - } - r.cond.L.Unlock() - r.cond.Broadcast() - } -} - -// Read implements the io.Reader interface. -func (r *BGReader) Read(p []byte) (int, error) { - r.cond.L.Lock() - defer r.cond.L.Unlock() - - if len(r.readResults) > 0 { - return r.readFromReadResults(p) - } - - // There are no unread background read results and the background reader is stopped. - if r.status == StatusStopped { - return r.r.Read(p) - } - - // Wait for results from the background reader - for len(r.readResults) == 0 { - r.cond.Wait() - } - return r.readFromReadResults(p) -} - -// readBackgroundResults reads a result previously read by the background reader. r.cond.L must be held. -func (r *BGReader) readFromReadResults(p []byte) (int, error) { - buf := r.readResults[0].buf - var err error - - n := copy(p, *buf) - if n == len(*buf) { - err = r.readResults[0].err - iobufpool.Put(buf) - if len(r.readResults) == 1 { - r.readResults = nil - } else { - r.readResults = r.readResults[1:] - } - } else { - *buf = (*buf)[n:] - r.readResults[0].buf = buf - } - - return n, err -} - -func New(r io.Reader) *BGReader { - return &BGReader{ - r: r, - cond: &sync.Cond{ - L: &sync.Mutex{}, - }, - } -} diff --git a/vendor/github.com/jackc/pgx/v5/pgconn/krb5.go b/vendor/github.com/jackc/pgx/v5/pgconn/krb5.go deleted file mode 100644 index 3c1af3477..000000000 --- a/vendor/github.com/jackc/pgx/v5/pgconn/krb5.go +++ /dev/null @@ -1,100 +0,0 @@ -package pgconn - -import ( - "errors" - "fmt" - - "github.com/jackc/pgx/v5/pgproto3" -) - -// NewGSSFunc creates a GSS authentication provider, for use with -// RegisterGSSProvider. -type NewGSSFunc func() (GSS, error) - -var newGSS NewGSSFunc - -// RegisterGSSProvider registers a GSS authentication provider. For example, if -// you need to use Kerberos to authenticate with your server, add this to your -// main package: -// -// import "github.com/otan/gopgkrb5" -// -// func init() { -// pgconn.RegisterGSSProvider(func() (pgconn.GSS, error) { return gopgkrb5.NewGSS() }) -// } -func RegisterGSSProvider(newGSSArg NewGSSFunc) { - newGSS = newGSSArg -} - -// GSS provides GSSAPI authentication (e.g., Kerberos). -type GSS interface { - GetInitToken(host string, service string) ([]byte, error) - GetInitTokenFromSPN(spn string) ([]byte, error) - Continue(inToken []byte) (done bool, outToken []byte, err error) -} - -func (c *PgConn) gssAuth() error { - if newGSS == nil { - return errors.New("kerberos error: no GSSAPI provider registered, see https://github.com/otan/gopgkrb5") - } - cli, err := newGSS() - if err != nil { - return err - } - - var nextData []byte - if c.config.KerberosSpn != "" { - // Use the supplied SPN if provided. - nextData, err = cli.GetInitTokenFromSPN(c.config.KerberosSpn) - } else { - // Allow the kerberos service name to be overridden - service := "postgres" - if c.config.KerberosSrvName != "" { - service = c.config.KerberosSrvName - } - nextData, err = cli.GetInitToken(c.config.Host, service) - } - if err != nil { - return err - } - - for { - gssResponse := &pgproto3.GSSResponse{ - Data: nextData, - } - c.frontend.Send(gssResponse) - err = c.flushWithPotentialWriteReadDeadlock() - if err != nil { - return err - } - resp, err := c.rxGSSContinue() - if err != nil { - return err - } - var done bool - done, nextData, err = cli.Continue(resp.Data) - if err != nil { - return err - } - if done { - break - } - } - return nil -} - -func (c *PgConn) rxGSSContinue() (*pgproto3.AuthenticationGSSContinue, error) { - msg, err := c.receiveMessage() - if err != nil { - return nil, err - } - - switch m := msg.(type) { - case *pgproto3.AuthenticationGSSContinue: - return m, nil - case *pgproto3.ErrorResponse: - return nil, ErrorResponseToPgError(m) - } - - return nil, fmt.Errorf("expected AuthenticationGSSContinue message but received unexpected message %T", msg) -} diff --git a/vendor/github.com/jackc/pgx/v5/pgconn/pgconn.go b/vendor/github.com/jackc/pgx/v5/pgconn/pgconn.go deleted file mode 100644 index 7efb522a4..000000000 --- a/vendor/github.com/jackc/pgx/v5/pgconn/pgconn.go +++ /dev/null @@ -1,2346 +0,0 @@ -package pgconn - -import ( - "context" - "crypto/md5" - "crypto/tls" - "encoding/binary" - "encoding/hex" - "errors" - "fmt" - "io" - "math" - "net" - "strconv" - "strings" - "sync" - "time" - - "github.com/jackc/pgx/v5/internal/iobufpool" - "github.com/jackc/pgx/v5/internal/pgio" - "github.com/jackc/pgx/v5/pgconn/ctxwatch" - "github.com/jackc/pgx/v5/pgconn/internal/bgreader" - "github.com/jackc/pgx/v5/pgproto3" -) - -const ( - connStatusUninitialized = iota - connStatusConnecting - connStatusClosed - connStatusIdle - connStatusBusy -) - -// Notice represents a notice response message reported by the PostgreSQL server. Be aware that this is distinct from -// LISTEN/NOTIFY notification. -type Notice PgError - -// Notification is a message received from the PostgreSQL LISTEN/NOTIFY system -type Notification struct { - PID uint32 // backend pid that sent the notification - Channel string // channel from which notification was received - Payload string -} - -// DialFunc is a function that can be used to connect to a PostgreSQL server. -type DialFunc func(ctx context.Context, network, addr string) (net.Conn, error) - -// LookupFunc is a function that can be used to lookup IPs addrs from host. Optionally an ip:port combination can be -// returned in order to override the connection string's port. -type LookupFunc func(ctx context.Context, host string) (addrs []string, err error) - -// BuildFrontendFunc is a function that can be used to create Frontend implementation for connection. -type BuildFrontendFunc func(r io.Reader, w io.Writer) *pgproto3.Frontend - -// PgErrorHandler is a function that handles errors returned from Postgres. This function must return true to keep -// the connection open. Returning false will cause the connection to be closed immediately. You should return -// false on any FATAL-severity errors. This will not receive network errors. The *PgConn is provided so the handler is -// aware of the origin of the error, but it must not invoke any query method. -type PgErrorHandler func(*PgConn, *PgError) bool - -// NoticeHandler is a function that can handle notices received from the PostgreSQL server. Notices can be received at -// any time, usually during handling of a query response. The *PgConn is provided so the handler is aware of the origin -// of the notice, but it must not invoke any query method. Be aware that this is distinct from LISTEN/NOTIFY -// notification. -type NoticeHandler func(*PgConn, *Notice) - -// NotificationHandler is a function that can handle notifications received from the PostgreSQL server. Notifications -// can be received at any time, usually during handling of a query response. The *PgConn is provided so the handler is -// aware of the origin of the notice, but it must not invoke any query method. Be aware that this is distinct from a -// notice event. -type NotificationHandler func(*PgConn, *Notification) - -// PgConn is a low-level PostgreSQL connection handle. It is not safe for concurrent usage. -type PgConn struct { - conn net.Conn - pid uint32 // backend pid - secretKey uint32 // key to use to send a cancel query message to the server - parameterStatuses map[string]string // parameters that have been reported by the server - txStatus byte - frontend *pgproto3.Frontend - bgReader *bgreader.BGReader - slowWriteTimer *time.Timer - bgReaderStarted chan struct{} - - customData map[string]any - - config *Config - - status byte // One of connStatus* constants - - bufferingReceive bool - bufferingReceiveMux sync.Mutex - bufferingReceiveMsg pgproto3.BackendMessage - bufferingReceiveErr error - - peekedMsg pgproto3.BackendMessage - - // Reusable / preallocated resources - resultReader ResultReader - multiResultReader MultiResultReader - pipeline Pipeline - contextWatcher *ctxwatch.ContextWatcher - fieldDescriptions [16]FieldDescription - - cleanupDone chan struct{} -} - -// Connect establishes a connection to a PostgreSQL server using the environment and connString (in URL or keyword/value -// format) to provide configuration. See documentation for [ParseConfig] for details. ctx can be used to cancel a -// connect attempt. -func Connect(ctx context.Context, connString string) (*PgConn, error) { - config, err := ParseConfig(connString) - if err != nil { - return nil, err - } - - return ConnectConfig(ctx, config) -} - -// Connect establishes a connection to a PostgreSQL server using the environment and connString (in URL or keyword/value -// format) and ParseConfigOptions to provide additional configuration. See documentation for [ParseConfig] for details. -// ctx can be used to cancel a connect attempt. -func ConnectWithOptions(ctx context.Context, connString string, parseConfigOptions ParseConfigOptions) (*PgConn, error) { - config, err := ParseConfigWithOptions(connString, parseConfigOptions) - if err != nil { - return nil, err - } - - return ConnectConfig(ctx, config) -} - -// Connect establishes a connection to a PostgreSQL server using config. config must have been constructed with -// [ParseConfig]. ctx can be used to cancel a connect attempt. -// -// If config.Fallbacks are present they will sequentially be tried in case of error establishing network connection. An -// authentication error will terminate the chain of attempts (like libpq: -// https://www.postgresql.org/docs/11/libpq-connect.html#LIBPQ-MULTIPLE-HOSTS) and be returned as the error. -func ConnectConfig(ctx context.Context, config *Config) (*PgConn, error) { - // Default values are set in ParseConfig. Enforce initial creation by ParseConfig rather than setting defaults from - // zero values. - if !config.createdByParseConfig { - panic("config must be created by ParseConfig") - } - - var allErrors []error - - connectConfigs, errs := buildConnectOneConfigs(ctx, config) - if len(errs) > 0 { - allErrors = append(allErrors, errs...) - } - - if len(connectConfigs) == 0 { - return nil, &ConnectError{Config: config, err: fmt.Errorf("hostname resolving error: %w", errors.Join(allErrors...))} - } - - pgConn, errs := connectPreferred(ctx, config, connectConfigs) - if len(errs) > 0 { - allErrors = append(allErrors, errs...) - return nil, &ConnectError{Config: config, err: errors.Join(allErrors...)} - } - - if config.AfterConnect != nil { - err := config.AfterConnect(ctx, pgConn) - if err != nil { - pgConn.conn.Close() - return nil, &ConnectError{Config: config, err: fmt.Errorf("AfterConnect error: %w", err)} - } - } - - return pgConn, nil -} - -// buildConnectOneConfigs resolves hostnames and builds a list of connectOneConfigs to try connecting to. It returns a -// slice of successfully resolved connectOneConfigs and a slice of errors. It is possible for both slices to contain -// values if some hosts were successfully resolved and others were not. -func buildConnectOneConfigs(ctx context.Context, config *Config) ([]*connectOneConfig, []error) { - // Simplify usage by treating primary config and fallbacks the same. - fallbackConfigs := []*FallbackConfig{ - { - Host: config.Host, - Port: config.Port, - TLSConfig: config.TLSConfig, - }, - } - fallbackConfigs = append(fallbackConfigs, config.Fallbacks...) - - var configs []*connectOneConfig - - var allErrors []error - - for _, fb := range fallbackConfigs { - // skip resolve for unix sockets - if isAbsolutePath(fb.Host) { - network, address := NetworkAddress(fb.Host, fb.Port) - configs = append(configs, &connectOneConfig{ - network: network, - address: address, - originalHostname: fb.Host, - tlsConfig: fb.TLSConfig, - }) - - continue - } - - ips, err := config.LookupFunc(ctx, fb.Host) - if err != nil { - allErrors = append(allErrors, err) - continue - } - - for _, ip := range ips { - splitIP, splitPort, err := net.SplitHostPort(ip) - if err == nil { - port, err := strconv.ParseUint(splitPort, 10, 16) - if err != nil { - return nil, []error{fmt.Errorf("error parsing port (%s) from lookup: %w", splitPort, err)} - } - network, address := NetworkAddress(splitIP, uint16(port)) - configs = append(configs, &connectOneConfig{ - network: network, - address: address, - originalHostname: fb.Host, - tlsConfig: fb.TLSConfig, - }) - } else { - network, address := NetworkAddress(ip, fb.Port) - configs = append(configs, &connectOneConfig{ - network: network, - address: address, - originalHostname: fb.Host, - tlsConfig: fb.TLSConfig, - }) - } - } - } - - return configs, allErrors -} - -// connectPreferred attempts to connect to the preferred host from connectOneConfigs. The connections are attempted in -// order. If a connection is successful it is returned. If no connection is successful then all errors are returned. If -// a connection attempt returns a [NotPreferredError], then that host will be used if no other hosts are successful. -func connectPreferred(ctx context.Context, config *Config, connectOneConfigs []*connectOneConfig) (*PgConn, []error) { - octx := ctx - var allErrors []error - - var fallbackConnectOneConfig *connectOneConfig - for i, c := range connectOneConfigs { - // ConnectTimeout restricts the whole connection process. - if config.ConnectTimeout != 0 { - // create new context first time or when previous host was different - if i == 0 || (connectOneConfigs[i].address != connectOneConfigs[i-1].address) { - var cancel context.CancelFunc - ctx, cancel = context.WithTimeout(octx, config.ConnectTimeout) - defer cancel() - } - } else { - ctx = octx - } - - pgConn, err := connectOne(ctx, config, c, false) - if pgConn != nil { - return pgConn, nil - } - - allErrors = append(allErrors, err) - - var pgErr *PgError - if errors.As(err, &pgErr) { - const ERRCODE_INVALID_PASSWORD = "28P01" // wrong password - const ERRCODE_INVALID_AUTHORIZATION_SPECIFICATION = "28000" // wrong password or bad pg_hba.conf settings - const ERRCODE_INVALID_CATALOG_NAME = "3D000" // db does not exist - const ERRCODE_INSUFFICIENT_PRIVILEGE = "42501" // missing connect privilege - if pgErr.Code == ERRCODE_INVALID_PASSWORD || - pgErr.Code == ERRCODE_INVALID_AUTHORIZATION_SPECIFICATION && c.tlsConfig != nil || - pgErr.Code == ERRCODE_INVALID_CATALOG_NAME || - pgErr.Code == ERRCODE_INSUFFICIENT_PRIVILEGE { - return nil, allErrors - } - } - - var npErr *NotPreferredError - if errors.As(err, &npErr) { - fallbackConnectOneConfig = c - } - } - - if fallbackConnectOneConfig != nil { - pgConn, err := connectOne(ctx, config, fallbackConnectOneConfig, true) - if err == nil { - return pgConn, nil - } - allErrors = append(allErrors, err) - } - - return nil, allErrors -} - -// connectOne makes one connection attempt to a single host. -func connectOne(ctx context.Context, config *Config, connectConfig *connectOneConfig, - ignoreNotPreferredErr bool, -) (*PgConn, error) { - pgConn := new(PgConn) - pgConn.config = config - pgConn.cleanupDone = make(chan struct{}) - pgConn.customData = make(map[string]any) - - var err error - - newPerDialConnectError := func(msg string, err error) *perDialConnectError { - err = normalizeTimeoutError(ctx, err) - e := &perDialConnectError{address: connectConfig.address, originalHostname: connectConfig.originalHostname, err: fmt.Errorf("%s: %w", msg, err)} - return e - } - - pgConn.conn, err = config.DialFunc(ctx, connectConfig.network, connectConfig.address) - if err != nil { - return nil, newPerDialConnectError("dial error", err) - } - - if connectConfig.tlsConfig != nil { - pgConn.contextWatcher = ctxwatch.NewContextWatcher(&DeadlineContextWatcherHandler{Conn: pgConn.conn}) - pgConn.contextWatcher.Watch(ctx) - tlsConn, err := startTLS(pgConn.conn, connectConfig.tlsConfig) - pgConn.contextWatcher.Unwatch() // Always unwatch `netConn` after TLS. - if err != nil { - pgConn.conn.Close() - return nil, newPerDialConnectError("tls error", err) - } - - pgConn.conn = tlsConn - } - - pgConn.contextWatcher = ctxwatch.NewContextWatcher(config.BuildContextWatcherHandler(pgConn)) - pgConn.contextWatcher.Watch(ctx) - defer pgConn.contextWatcher.Unwatch() - - pgConn.parameterStatuses = make(map[string]string) - pgConn.status = connStatusConnecting - pgConn.bgReader = bgreader.New(pgConn.conn) - pgConn.slowWriteTimer = time.AfterFunc(time.Duration(math.MaxInt64), - func() { - pgConn.bgReader.Start() - pgConn.bgReaderStarted <- struct{}{} - }, - ) - pgConn.slowWriteTimer.Stop() - pgConn.bgReaderStarted = make(chan struct{}) - pgConn.frontend = config.BuildFrontend(pgConn.bgReader, pgConn.conn) - - startupMsg := pgproto3.StartupMessage{ - ProtocolVersion: pgproto3.ProtocolVersionNumber, - Parameters: make(map[string]string), - } - - // Copy default run-time params - for k, v := range config.RuntimeParams { - startupMsg.Parameters[k] = v - } - - startupMsg.Parameters["user"] = config.User - if config.Database != "" { - startupMsg.Parameters["database"] = config.Database - } - - pgConn.frontend.Send(&startupMsg) - if err := pgConn.flushWithPotentialWriteReadDeadlock(); err != nil { - pgConn.conn.Close() - return nil, newPerDialConnectError("failed to write startup message", err) - } - - for { - msg, err := pgConn.receiveMessage() - if err != nil { - pgConn.conn.Close() - if err, ok := err.(*PgError); ok { - return nil, newPerDialConnectError("server error", err) - } - return nil, newPerDialConnectError("failed to receive message", err) - } - - switch msg := msg.(type) { - case *pgproto3.BackendKeyData: - pgConn.pid = msg.ProcessID - pgConn.secretKey = msg.SecretKey - - case *pgproto3.AuthenticationOk: - case *pgproto3.AuthenticationCleartextPassword: - err = pgConn.txPasswordMessage(pgConn.config.Password) - if err != nil { - pgConn.conn.Close() - return nil, newPerDialConnectError("failed to write password message", err) - } - case *pgproto3.AuthenticationMD5Password: - digestedPassword := "md5" + hexMD5(hexMD5(pgConn.config.Password+pgConn.config.User)+string(msg.Salt[:])) - err = pgConn.txPasswordMessage(digestedPassword) - if err != nil { - pgConn.conn.Close() - return nil, newPerDialConnectError("failed to write password message", err) - } - case *pgproto3.AuthenticationSASL: - err = pgConn.scramAuth(msg.AuthMechanisms) - if err != nil { - pgConn.conn.Close() - return nil, newPerDialConnectError("failed SASL auth", err) - } - case *pgproto3.AuthenticationGSS: - err = pgConn.gssAuth() - if err != nil { - pgConn.conn.Close() - return nil, newPerDialConnectError("failed GSS auth", err) - } - case *pgproto3.ReadyForQuery: - pgConn.status = connStatusIdle - if config.ValidateConnect != nil { - // ValidateConnect may execute commands that cause the context to be watched again. Unwatch first to avoid - // the watch already in progress panic. This is that last thing done by this method so there is no need to - // restart the watch after ValidateConnect returns. - // - // See https://github.com/jackc/pgconn/issues/40. - pgConn.contextWatcher.Unwatch() - - err := config.ValidateConnect(ctx, pgConn) - if err != nil { - if _, ok := err.(*NotPreferredError); ignoreNotPreferredErr && ok { - return pgConn, nil - } - pgConn.conn.Close() - return nil, newPerDialConnectError("ValidateConnect failed", err) - } - } - return pgConn, nil - case *pgproto3.ParameterStatus, *pgproto3.NoticeResponse: - // handled by ReceiveMessage - case *pgproto3.ErrorResponse: - pgConn.conn.Close() - return nil, newPerDialConnectError("server error", ErrorResponseToPgError(msg)) - default: - pgConn.conn.Close() - return nil, newPerDialConnectError("received unexpected message", err) - } - } -} - -func startTLS(conn net.Conn, tlsConfig *tls.Config) (net.Conn, error) { - err := binary.Write(conn, binary.BigEndian, []int32{8, 80877103}) - if err != nil { - return nil, err - } - - response := make([]byte, 1) - if _, err = io.ReadFull(conn, response); err != nil { - return nil, err - } - - if response[0] != 'S' { - return nil, errors.New("server refused TLS connection") - } - - return tls.Client(conn, tlsConfig), nil -} - -func (pgConn *PgConn) txPasswordMessage(password string) (err error) { - pgConn.frontend.Send(&pgproto3.PasswordMessage{Password: password}) - return pgConn.flushWithPotentialWriteReadDeadlock() -} - -func hexMD5(s string) string { - hash := md5.New() - io.WriteString(hash, s) - return hex.EncodeToString(hash.Sum(nil)) -} - -func (pgConn *PgConn) signalMessage() chan struct{} { - if pgConn.bufferingReceive { - panic("BUG: signalMessage when already in progress") - } - - pgConn.bufferingReceive = true - pgConn.bufferingReceiveMux.Lock() - - ch := make(chan struct{}) - go func() { - pgConn.bufferingReceiveMsg, pgConn.bufferingReceiveErr = pgConn.frontend.Receive() - pgConn.bufferingReceiveMux.Unlock() - close(ch) - }() - - return ch -} - -// ReceiveMessage receives one wire protocol message from the PostgreSQL server. It must only be used when the -// connection is not busy. e.g. It is an error to call ReceiveMessage while reading the result of a query. The messages -// are still handled by the core pgconn message handling system so receiving a NotificationResponse will still trigger -// the OnNotification callback. -// -// This is a very low level method that requires deep understanding of the PostgreSQL wire protocol to use correctly. -// See https://www.postgresql.org/docs/current/protocol.html. -func (pgConn *PgConn) ReceiveMessage(ctx context.Context) (pgproto3.BackendMessage, error) { - if err := pgConn.lock(); err != nil { - return nil, err - } - defer pgConn.unlock() - - if ctx != context.Background() { - select { - case <-ctx.Done(): - return nil, newContextAlreadyDoneError(ctx) - default: - } - pgConn.contextWatcher.Watch(ctx) - defer pgConn.contextWatcher.Unwatch() - } - - msg, err := pgConn.receiveMessage() - if err != nil { - err = &pgconnError{ - msg: "receive message failed", - err: normalizeTimeoutError(ctx, err), - safeToRetry: true, - } - } - return msg, err -} - -// peekMessage peeks at the next message without setting up context cancellation. -func (pgConn *PgConn) peekMessage() (pgproto3.BackendMessage, error) { - if pgConn.peekedMsg != nil { - return pgConn.peekedMsg, nil - } - - var msg pgproto3.BackendMessage - var err error - if pgConn.bufferingReceive { - pgConn.bufferingReceiveMux.Lock() - msg = pgConn.bufferingReceiveMsg - err = pgConn.bufferingReceiveErr - pgConn.bufferingReceiveMux.Unlock() - pgConn.bufferingReceive = false - - // If a timeout error happened in the background try the read again. - var netErr net.Error - if errors.As(err, &netErr) && netErr.Timeout() { - msg, err = pgConn.frontend.Receive() - } - } else { - msg, err = pgConn.frontend.Receive() - } - - if err != nil { - // Close on anything other than timeout error - everything else is fatal - var netErr net.Error - isNetErr := errors.As(err, &netErr) - if !(isNetErr && netErr.Timeout()) { - pgConn.asyncClose() - } - - return nil, err - } - - pgConn.peekedMsg = msg - return msg, nil -} - -// receiveMessage receives a message without setting up context cancellation -func (pgConn *PgConn) receiveMessage() (pgproto3.BackendMessage, error) { - msg, err := pgConn.peekMessage() - if err != nil { - return nil, err - } - pgConn.peekedMsg = nil - - switch msg := msg.(type) { - case *pgproto3.ReadyForQuery: - pgConn.txStatus = msg.TxStatus - case *pgproto3.ParameterStatus: - pgConn.parameterStatuses[msg.Name] = msg.Value - case *pgproto3.ErrorResponse: - err := ErrorResponseToPgError(msg) - if pgConn.config.OnPgError != nil && !pgConn.config.OnPgError(pgConn, err) { - pgConn.status = connStatusClosed - pgConn.conn.Close() // Ignore error as the connection is already broken and there is already an error to return. - close(pgConn.cleanupDone) - return nil, err - } - case *pgproto3.NoticeResponse: - if pgConn.config.OnNotice != nil { - pgConn.config.OnNotice(pgConn, noticeResponseToNotice(msg)) - } - case *pgproto3.NotificationResponse: - if pgConn.config.OnNotification != nil { - pgConn.config.OnNotification(pgConn, &Notification{PID: msg.PID, Channel: msg.Channel, Payload: msg.Payload}) - } - } - - return msg, nil -} - -// Conn returns the underlying net.Conn. This rarely necessary. If the connection will be directly used for reading or -// writing then SyncConn should usually be called before Conn. -func (pgConn *PgConn) Conn() net.Conn { - return pgConn.conn -} - -// PID returns the backend PID. -func (pgConn *PgConn) PID() uint32 { - return pgConn.pid -} - -// TxStatus returns the current TxStatus as reported by the server in the ReadyForQuery message. -// -// Possible return values: -// -// 'I' - idle / not in transaction -// 'T' - in a transaction -// 'E' - in a failed transaction -// -// See https://www.postgresql.org/docs/current/protocol-message-formats.html. -func (pgConn *PgConn) TxStatus() byte { - return pgConn.txStatus -} - -// SecretKey returns the backend secret key used to send a cancel query message to the server. -func (pgConn *PgConn) SecretKey() uint32 { - return pgConn.secretKey -} - -// Frontend returns the underlying *pgproto3.Frontend. This rarely necessary. -func (pgConn *PgConn) Frontend() *pgproto3.Frontend { - return pgConn.frontend -} - -// Close closes a connection. It is safe to call Close on an already closed connection. Close attempts a clean close by -// sending the exit message to PostgreSQL. However, this could block so ctx is available to limit the time to wait. The -// underlying net.Conn.Close() will always be called regardless of any other errors. -func (pgConn *PgConn) Close(ctx context.Context) error { - if pgConn.status == connStatusClosed { - return nil - } - pgConn.status = connStatusClosed - - defer close(pgConn.cleanupDone) - defer pgConn.conn.Close() - - if ctx != context.Background() { - // Close may be called while a cancellable query is in progress. This will most often be triggered by panic when - // a defer closes the connection (possibly indirectly via a transaction or a connection pool). Unwatch to end any - // previous watch. It is safe to Unwatch regardless of whether a watch is already is progress. - // - // See https://github.com/jackc/pgconn/issues/29 - pgConn.contextWatcher.Unwatch() - - pgConn.contextWatcher.Watch(ctx) - defer pgConn.contextWatcher.Unwatch() - } - - // Ignore any errors sending Terminate message and waiting for server to close connection. - // This mimics the behavior of libpq PQfinish. It calls closePGconn which calls sendTerminateConn which purposefully - // ignores errors. - // - // See https://github.com/jackc/pgx/issues/637 - pgConn.frontend.Send(&pgproto3.Terminate{}) - pgConn.flushWithPotentialWriteReadDeadlock() - - return pgConn.conn.Close() -} - -// asyncClose marks the connection as closed and asynchronously sends a cancel query message and closes the underlying -// connection. -func (pgConn *PgConn) asyncClose() { - if pgConn.status == connStatusClosed { - return - } - pgConn.status = connStatusClosed - - go func() { - defer close(pgConn.cleanupDone) - defer pgConn.conn.Close() - - deadline := time.Now().Add(time.Second * 15) - - ctx, cancel := context.WithDeadline(context.Background(), deadline) - defer cancel() - - pgConn.CancelRequest(ctx) - - pgConn.conn.SetDeadline(deadline) - - pgConn.frontend.Send(&pgproto3.Terminate{}) - pgConn.flushWithPotentialWriteReadDeadlock() - }() -} - -// CleanupDone returns a channel that will be closed after all underlying resources have been cleaned up. A closed -// connection is no longer usable, but underlying resources, in particular the net.Conn, may not have finished closing -// yet. This is because certain errors such as a context cancellation require that the interrupted function call return -// immediately, but the error may also cause the connection to be closed. In these cases the underlying resources are -// closed asynchronously. -// -// This is only likely to be useful to connection pools. It gives them a way avoid establishing a new connection while -// an old connection is still being cleaned up and thereby exceeding the maximum pool size. -func (pgConn *PgConn) CleanupDone() chan (struct{}) { - return pgConn.cleanupDone -} - -// IsClosed reports if the connection has been closed. -// -// CleanupDone() can be used to determine if all cleanup has been completed. -func (pgConn *PgConn) IsClosed() bool { - return pgConn.status < connStatusIdle -} - -// IsBusy reports if the connection is busy. -func (pgConn *PgConn) IsBusy() bool { - return pgConn.status == connStatusBusy -} - -// lock locks the connection. -func (pgConn *PgConn) lock() error { - switch pgConn.status { - case connStatusBusy: - return &connLockError{status: "conn busy"} // This only should be possible in case of an application bug. - case connStatusClosed: - return &connLockError{status: "conn closed"} - case connStatusUninitialized: - return &connLockError{status: "conn uninitialized"} - } - pgConn.status = connStatusBusy - return nil -} - -func (pgConn *PgConn) unlock() { - switch pgConn.status { - case connStatusBusy: - pgConn.status = connStatusIdle - case connStatusClosed: - default: - panic("BUG: cannot unlock unlocked connection") // This should only be possible if there is a bug in this package. - } -} - -// ParameterStatus returns the value of a parameter reported by the server (e.g. -// server_version). Returns an empty string for unknown parameters. -func (pgConn *PgConn) ParameterStatus(key string) string { - return pgConn.parameterStatuses[key] -} - -// CommandTag is the status text returned by PostgreSQL for a query. -type CommandTag struct { - s string -} - -// NewCommandTag makes a CommandTag from s. -func NewCommandTag(s string) CommandTag { - return CommandTag{s: s} -} - -// RowsAffected returns the number of rows affected. If the CommandTag was not -// for a row affecting command (e.g. "CREATE TABLE") then it returns 0. -func (ct CommandTag) RowsAffected() int64 { - // Find last non-digit - idx := -1 - for i := len(ct.s) - 1; i >= 0; i-- { - if ct.s[i] >= '0' && ct.s[i] <= '9' { - idx = i - } else { - break - } - } - - if idx == -1 { - return 0 - } - - var n int64 - for _, b := range ct.s[idx:] { - n = n*10 + int64(b-'0') - } - - return n -} - -func (ct CommandTag) String() string { - return ct.s -} - -// Insert is true if the command tag starts with "INSERT". -func (ct CommandTag) Insert() bool { - return strings.HasPrefix(ct.s, "INSERT") -} - -// Update is true if the command tag starts with "UPDATE". -func (ct CommandTag) Update() bool { - return strings.HasPrefix(ct.s, "UPDATE") -} - -// Delete is true if the command tag starts with "DELETE". -func (ct CommandTag) Delete() bool { - return strings.HasPrefix(ct.s, "DELETE") -} - -// Select is true if the command tag starts with "SELECT". -func (ct CommandTag) Select() bool { - return strings.HasPrefix(ct.s, "SELECT") -} - -type FieldDescription struct { - Name string - TableOID uint32 - TableAttributeNumber uint16 - DataTypeOID uint32 - DataTypeSize int16 - TypeModifier int32 - Format int16 -} - -func (pgConn *PgConn) convertRowDescription(dst []FieldDescription, rd *pgproto3.RowDescription) []FieldDescription { - if cap(dst) >= len(rd.Fields) { - dst = dst[:len(rd.Fields):len(rd.Fields)] - } else { - dst = make([]FieldDescription, len(rd.Fields)) - } - - for i := range rd.Fields { - dst[i].Name = string(rd.Fields[i].Name) - dst[i].TableOID = rd.Fields[i].TableOID - dst[i].TableAttributeNumber = rd.Fields[i].TableAttributeNumber - dst[i].DataTypeOID = rd.Fields[i].DataTypeOID - dst[i].DataTypeSize = rd.Fields[i].DataTypeSize - dst[i].TypeModifier = rd.Fields[i].TypeModifier - dst[i].Format = rd.Fields[i].Format - } - - return dst -} - -type StatementDescription struct { - Name string - SQL string - ParamOIDs []uint32 - Fields []FieldDescription -} - -// Prepare creates a prepared statement. If the name is empty, the anonymous prepared statement will be used. This -// allows Prepare to also to describe statements without creating a server-side prepared statement. -// -// Prepare does not send a PREPARE statement to the server. It uses the PostgreSQL Parse and Describe protocol messages -// directly. -func (pgConn *PgConn) Prepare(ctx context.Context, name, sql string, paramOIDs []uint32) (*StatementDescription, error) { - if err := pgConn.lock(); err != nil { - return nil, err - } - defer pgConn.unlock() - - if ctx != context.Background() { - select { - case <-ctx.Done(): - return nil, newContextAlreadyDoneError(ctx) - default: - } - pgConn.contextWatcher.Watch(ctx) - defer pgConn.contextWatcher.Unwatch() - } - - pgConn.frontend.SendParse(&pgproto3.Parse{Name: name, Query: sql, ParameterOIDs: paramOIDs}) - pgConn.frontend.SendDescribe(&pgproto3.Describe{ObjectType: 'S', Name: name}) - pgConn.frontend.SendSync(&pgproto3.Sync{}) - err := pgConn.flushWithPotentialWriteReadDeadlock() - if err != nil { - pgConn.asyncClose() - return nil, err - } - - psd := &StatementDescription{Name: name, SQL: sql} - - var parseErr error - -readloop: - for { - msg, err := pgConn.receiveMessage() - if err != nil { - pgConn.asyncClose() - return nil, normalizeTimeoutError(ctx, err) - } - - switch msg := msg.(type) { - case *pgproto3.ParameterDescription: - psd.ParamOIDs = make([]uint32, len(msg.ParameterOIDs)) - copy(psd.ParamOIDs, msg.ParameterOIDs) - case *pgproto3.RowDescription: - psd.Fields = pgConn.convertRowDescription(nil, msg) - case *pgproto3.ErrorResponse: - parseErr = ErrorResponseToPgError(msg) - case *pgproto3.ReadyForQuery: - break readloop - } - } - - if parseErr != nil { - return nil, parseErr - } - return psd, nil -} - -// Deallocate deallocates a prepared statement. -// -// Deallocate does not send a DEALLOCATE statement to the server. It uses the PostgreSQL Close protocol message -// directly. This has slightly different behavior than executing DEALLOCATE statement. -// - Deallocate can succeed in an aborted transaction. -// - Deallocating a non-existent prepared statement is not an error. -func (pgConn *PgConn) Deallocate(ctx context.Context, name string) error { - if err := pgConn.lock(); err != nil { - return err - } - defer pgConn.unlock() - - if ctx != context.Background() { - select { - case <-ctx.Done(): - return newContextAlreadyDoneError(ctx) - default: - } - pgConn.contextWatcher.Watch(ctx) - defer pgConn.contextWatcher.Unwatch() - } - - pgConn.frontend.SendClose(&pgproto3.Close{ObjectType: 'S', Name: name}) - pgConn.frontend.SendSync(&pgproto3.Sync{}) - err := pgConn.flushWithPotentialWriteReadDeadlock() - if err != nil { - pgConn.asyncClose() - return err - } - - for { - msg, err := pgConn.receiveMessage() - if err != nil { - pgConn.asyncClose() - return normalizeTimeoutError(ctx, err) - } - - switch msg := msg.(type) { - case *pgproto3.ErrorResponse: - return ErrorResponseToPgError(msg) - case *pgproto3.ReadyForQuery: - return nil - } - } -} - -// ErrorResponseToPgError converts a wire protocol error message to a *PgError. -func ErrorResponseToPgError(msg *pgproto3.ErrorResponse) *PgError { - return &PgError{ - Severity: msg.Severity, - SeverityUnlocalized: msg.SeverityUnlocalized, - Code: string(msg.Code), - Message: string(msg.Message), - Detail: string(msg.Detail), - Hint: msg.Hint, - Position: msg.Position, - InternalPosition: msg.InternalPosition, - InternalQuery: string(msg.InternalQuery), - Where: string(msg.Where), - SchemaName: string(msg.SchemaName), - TableName: string(msg.TableName), - ColumnName: string(msg.ColumnName), - DataTypeName: string(msg.DataTypeName), - ConstraintName: msg.ConstraintName, - File: string(msg.File), - Line: msg.Line, - Routine: string(msg.Routine), - } -} - -func noticeResponseToNotice(msg *pgproto3.NoticeResponse) *Notice { - pgerr := ErrorResponseToPgError((*pgproto3.ErrorResponse)(msg)) - return (*Notice)(pgerr) -} - -// CancelRequest sends a cancel request to the PostgreSQL server. It returns an error if unable to deliver the cancel -// request, but lack of an error does not ensure that the query was canceled. As specified in the documentation, there -// is no way to be sure a query was canceled. See https://www.postgresql.org/docs/11/protocol-flow.html#id-1.10.5.7.9 -func (pgConn *PgConn) CancelRequest(ctx context.Context) error { - // Open a cancellation request to the same server. The address is taken from the net.Conn directly instead of reusing - // the connection config. This is important in high availability configurations where fallback connections may be - // specified or DNS may be used to load balance. - serverAddr := pgConn.conn.RemoteAddr() - var serverNetwork string - var serverAddress string - if serverAddr.Network() == "unix" { - // for unix sockets, RemoteAddr() calls getpeername() which returns the name the - // server passed to bind(). For Postgres, this is always a relative path "./.s.PGSQL.5432" - // so connecting to it will fail. Fall back to the config's value - serverNetwork, serverAddress = NetworkAddress(pgConn.config.Host, pgConn.config.Port) - } else { - serverNetwork, serverAddress = serverAddr.Network(), serverAddr.String() - } - cancelConn, err := pgConn.config.DialFunc(ctx, serverNetwork, serverAddress) - if err != nil { - // In case of unix sockets, RemoteAddr() returns only the file part of the path. If the - // first connect failed, try the config. - if serverAddr.Network() != "unix" { - return err - } - serverNetwork, serverAddr := NetworkAddress(pgConn.config.Host, pgConn.config.Port) - cancelConn, err = pgConn.config.DialFunc(ctx, serverNetwork, serverAddr) - if err != nil { - return err - } - } - defer cancelConn.Close() - - if ctx != context.Background() { - contextWatcher := ctxwatch.NewContextWatcher(&DeadlineContextWatcherHandler{Conn: cancelConn}) - contextWatcher.Watch(ctx) - defer contextWatcher.Unwatch() - } - - buf := make([]byte, 16) - binary.BigEndian.PutUint32(buf[0:4], 16) - binary.BigEndian.PutUint32(buf[4:8], 80877102) - binary.BigEndian.PutUint32(buf[8:12], pgConn.pid) - binary.BigEndian.PutUint32(buf[12:16], pgConn.secretKey) - - if _, err := cancelConn.Write(buf); err != nil { - return fmt.Errorf("write to connection for cancellation: %w", err) - } - - // Wait for the cancel request to be acknowledged by the server. - // It copies the behavior of the libpq: https://github.com/postgres/postgres/blob/REL_16_0/src/interfaces/libpq/fe-connect.c#L4946-L4960 - _, _ = cancelConn.Read(buf) - - return nil -} - -// WaitForNotification waits for a LISTEN/NOTIFY message to be received. It returns an error if a notification was not -// received. -func (pgConn *PgConn) WaitForNotification(ctx context.Context) error { - if err := pgConn.lock(); err != nil { - return err - } - defer pgConn.unlock() - - if ctx != context.Background() { - select { - case <-ctx.Done(): - return newContextAlreadyDoneError(ctx) - default: - } - - pgConn.contextWatcher.Watch(ctx) - defer pgConn.contextWatcher.Unwatch() - } - - for { - msg, err := pgConn.receiveMessage() - if err != nil { - return normalizeTimeoutError(ctx, err) - } - - switch msg.(type) { - case *pgproto3.NotificationResponse: - return nil - } - } -} - -// Exec executes SQL via the PostgreSQL simple query protocol. SQL may contain multiple queries. Execution is -// implicitly wrapped in a transaction unless a transaction is already in progress or SQL contains transaction control -// statements. -// -// Prefer ExecParams unless executing arbitrary SQL that may contain multiple queries. -func (pgConn *PgConn) Exec(ctx context.Context, sql string) *MultiResultReader { - if err := pgConn.lock(); err != nil { - return &MultiResultReader{ - closed: true, - err: err, - } - } - - pgConn.multiResultReader = MultiResultReader{ - pgConn: pgConn, - ctx: ctx, - } - multiResult := &pgConn.multiResultReader - if ctx != context.Background() { - select { - case <-ctx.Done(): - multiResult.closed = true - multiResult.err = newContextAlreadyDoneError(ctx) - pgConn.unlock() - return multiResult - default: - } - pgConn.contextWatcher.Watch(ctx) - } - - pgConn.frontend.SendQuery(&pgproto3.Query{String: sql}) - err := pgConn.flushWithPotentialWriteReadDeadlock() - if err != nil { - pgConn.asyncClose() - pgConn.contextWatcher.Unwatch() - multiResult.closed = true - multiResult.err = err - pgConn.unlock() - return multiResult - } - - return multiResult -} - -// ExecParams executes a command via the PostgreSQL extended query protocol. -// -// sql is a SQL command string. It may only contain one query. Parameter substitution is positional using $1, $2, $3, -// etc. -// -// paramValues are the parameter values. It must be encoded in the format given by paramFormats. -// -// paramOIDs is a slice of data type OIDs for paramValues. If paramOIDs is nil, the server will infer the data type for -// all parameters. Any paramOID element that is 0 that will cause the server to infer the data type for that parameter. -// ExecParams will panic if len(paramOIDs) is not 0, 1, or len(paramValues). -// -// paramFormats is a slice of format codes determining for each paramValue column whether it is encoded in text or -// binary format. If paramFormats is nil all params are text format. ExecParams will panic if -// len(paramFormats) is not 0, 1, or len(paramValues). -// -// resultFormats is a slice of format codes determining for each result column whether it is encoded in text or -// binary format. If resultFormats is nil all results will be in text format. -// -// ResultReader must be closed before PgConn can be used again. -func (pgConn *PgConn) ExecParams(ctx context.Context, sql string, paramValues [][]byte, paramOIDs []uint32, paramFormats []int16, resultFormats []int16) *ResultReader { - result := pgConn.execExtendedPrefix(ctx, paramValues) - if result.closed { - return result - } - - pgConn.frontend.SendParse(&pgproto3.Parse{Query: sql, ParameterOIDs: paramOIDs}) - pgConn.frontend.SendBind(&pgproto3.Bind{ParameterFormatCodes: paramFormats, Parameters: paramValues, ResultFormatCodes: resultFormats}) - - pgConn.execExtendedSuffix(result) - - return result -} - -// ExecPrepared enqueues the execution of a prepared statement via the PostgreSQL extended query protocol. -// -// paramValues are the parameter values. It must be encoded in the format given by paramFormats. -// -// paramFormats is a slice of format codes determining for each paramValue column whether it is encoded in text or -// binary format. If paramFormats is nil all params are text format. ExecPrepared will panic if -// len(paramFormats) is not 0, 1, or len(paramValues). -// -// resultFormats is a slice of format codes determining for each result column whether it is encoded in text or -// binary format. If resultFormats is nil all results will be in text format. -// -// ResultReader must be closed before PgConn can be used again. -func (pgConn *PgConn) ExecPrepared(ctx context.Context, stmtName string, paramValues [][]byte, paramFormats []int16, resultFormats []int16) *ResultReader { - result := pgConn.execExtendedPrefix(ctx, paramValues) - if result.closed { - return result - } - - pgConn.frontend.SendBind(&pgproto3.Bind{PreparedStatement: stmtName, ParameterFormatCodes: paramFormats, Parameters: paramValues, ResultFormatCodes: resultFormats}) - - pgConn.execExtendedSuffix(result) - - return result -} - -func (pgConn *PgConn) execExtendedPrefix(ctx context.Context, paramValues [][]byte) *ResultReader { - pgConn.resultReader = ResultReader{ - pgConn: pgConn, - ctx: ctx, - } - result := &pgConn.resultReader - - if err := pgConn.lock(); err != nil { - result.concludeCommand(CommandTag{}, err) - result.closed = true - return result - } - - if len(paramValues) > math.MaxUint16 { - result.concludeCommand(CommandTag{}, fmt.Errorf("extended protocol limited to %v parameters", math.MaxUint16)) - result.closed = true - pgConn.unlock() - return result - } - - if ctx != context.Background() { - select { - case <-ctx.Done(): - result.concludeCommand(CommandTag{}, newContextAlreadyDoneError(ctx)) - result.closed = true - pgConn.unlock() - return result - default: - } - pgConn.contextWatcher.Watch(ctx) - } - - return result -} - -func (pgConn *PgConn) execExtendedSuffix(result *ResultReader) { - pgConn.frontend.SendDescribe(&pgproto3.Describe{ObjectType: 'P'}) - pgConn.frontend.SendExecute(&pgproto3.Execute{}) - pgConn.frontend.SendSync(&pgproto3.Sync{}) - - err := pgConn.flushWithPotentialWriteReadDeadlock() - if err != nil { - pgConn.asyncClose() - result.concludeCommand(CommandTag{}, err) - pgConn.contextWatcher.Unwatch() - result.closed = true - pgConn.unlock() - return - } - - result.readUntilRowDescription() -} - -// CopyTo executes the copy command sql and copies the results to w. -func (pgConn *PgConn) CopyTo(ctx context.Context, w io.Writer, sql string) (CommandTag, error) { - if err := pgConn.lock(); err != nil { - return CommandTag{}, err - } - - if ctx != context.Background() { - select { - case <-ctx.Done(): - pgConn.unlock() - return CommandTag{}, newContextAlreadyDoneError(ctx) - default: - } - pgConn.contextWatcher.Watch(ctx) - defer pgConn.contextWatcher.Unwatch() - } - - // Send copy to command - pgConn.frontend.SendQuery(&pgproto3.Query{String: sql}) - - err := pgConn.flushWithPotentialWriteReadDeadlock() - if err != nil { - pgConn.asyncClose() - pgConn.unlock() - return CommandTag{}, err - } - - // Read results - var commandTag CommandTag - var pgErr error - for { - msg, err := pgConn.receiveMessage() - if err != nil { - pgConn.asyncClose() - return CommandTag{}, normalizeTimeoutError(ctx, err) - } - - switch msg := msg.(type) { - case *pgproto3.CopyDone: - case *pgproto3.CopyData: - _, err := w.Write(msg.Data) - if err != nil { - pgConn.asyncClose() - return CommandTag{}, err - } - case *pgproto3.ReadyForQuery: - pgConn.unlock() - return commandTag, pgErr - case *pgproto3.CommandComplete: - commandTag = pgConn.makeCommandTag(msg.CommandTag) - case *pgproto3.ErrorResponse: - pgErr = ErrorResponseToPgError(msg) - } - } -} - -// CopyFrom executes the copy command sql and copies all of r to the PostgreSQL server. -// -// Note: context cancellation will only interrupt operations on the underlying PostgreSQL network connection. Reads on r -// could still block. -func (pgConn *PgConn) CopyFrom(ctx context.Context, r io.Reader, sql string) (CommandTag, error) { - if err := pgConn.lock(); err != nil { - return CommandTag{}, err - } - defer pgConn.unlock() - - if ctx != context.Background() { - select { - case <-ctx.Done(): - return CommandTag{}, newContextAlreadyDoneError(ctx) - default: - } - pgConn.contextWatcher.Watch(ctx) - defer pgConn.contextWatcher.Unwatch() - } - - // Send copy from query - pgConn.frontend.SendQuery(&pgproto3.Query{String: sql}) - err := pgConn.flushWithPotentialWriteReadDeadlock() - if err != nil { - pgConn.asyncClose() - return CommandTag{}, err - } - - // Send copy data - abortCopyChan := make(chan struct{}) - copyErrChan := make(chan error, 1) - signalMessageChan := pgConn.signalMessage() - var wg sync.WaitGroup - wg.Add(1) - - go func() { - defer wg.Done() - buf := iobufpool.Get(65536) - defer iobufpool.Put(buf) - (*buf)[0] = 'd' - - for { - n, readErr := r.Read((*buf)[5:cap(*buf)]) - if n > 0 { - *buf = (*buf)[0 : n+5] - pgio.SetInt32((*buf)[1:], int32(n+4)) - - writeErr := pgConn.frontend.SendUnbufferedEncodedCopyData(*buf) - if writeErr != nil { - // Write errors are always fatal, but we can't use asyncClose because we are in a different goroutine. Not - // setting pgConn.status or closing pgConn.cleanupDone for the same reason. - pgConn.conn.Close() - - copyErrChan <- writeErr - return - } - } - if readErr != nil { - copyErrChan <- readErr - return - } - - select { - case <-abortCopyChan: - return - default: - } - } - }() - - var pgErr error - var copyErr error - for copyErr == nil && pgErr == nil { - select { - case copyErr = <-copyErrChan: - case <-signalMessageChan: - // If pgConn.receiveMessage encounters an error it will call pgConn.asyncClose. But that is a race condition with - // the goroutine. So instead check pgConn.bufferingReceiveErr which will have been set by the signalMessage. If an - // error is found then forcibly close the connection without sending the Terminate message. - if err := pgConn.bufferingReceiveErr; err != nil { - pgConn.status = connStatusClosed - pgConn.conn.Close() - close(pgConn.cleanupDone) - return CommandTag{}, normalizeTimeoutError(ctx, err) - } - msg, _ := pgConn.receiveMessage() - - switch msg := msg.(type) { - case *pgproto3.ErrorResponse: - pgErr = ErrorResponseToPgError(msg) - default: - signalMessageChan = pgConn.signalMessage() - } - } - } - close(abortCopyChan) - // Make sure io goroutine finishes before writing. - wg.Wait() - - if copyErr == io.EOF || pgErr != nil { - pgConn.frontend.Send(&pgproto3.CopyDone{}) - } else { - pgConn.frontend.Send(&pgproto3.CopyFail{Message: copyErr.Error()}) - } - err = pgConn.flushWithPotentialWriteReadDeadlock() - if err != nil { - pgConn.asyncClose() - return CommandTag{}, err - } - - // Read results - var commandTag CommandTag - for { - msg, err := pgConn.receiveMessage() - if err != nil { - pgConn.asyncClose() - return CommandTag{}, normalizeTimeoutError(ctx, err) - } - - switch msg := msg.(type) { - case *pgproto3.ReadyForQuery: - return commandTag, pgErr - case *pgproto3.CommandComplete: - commandTag = pgConn.makeCommandTag(msg.CommandTag) - case *pgproto3.ErrorResponse: - pgErr = ErrorResponseToPgError(msg) - } - } -} - -// MultiResultReader is a reader for a command that could return multiple results such as Exec or ExecBatch. -type MultiResultReader struct { - pgConn *PgConn - ctx context.Context - pipeline *Pipeline - - rr *ResultReader - - closed bool - err error -} - -// ReadAll reads all available results. Calling ReadAll is mutually exclusive with all other MultiResultReader methods. -func (mrr *MultiResultReader) ReadAll() ([]*Result, error) { - var results []*Result - - for mrr.NextResult() { - results = append(results, mrr.ResultReader().Read()) - } - err := mrr.Close() - - return results, err -} - -func (mrr *MultiResultReader) receiveMessage() (pgproto3.BackendMessage, error) { - msg, err := mrr.pgConn.receiveMessage() - if err != nil { - mrr.pgConn.contextWatcher.Unwatch() - mrr.err = normalizeTimeoutError(mrr.ctx, err) - mrr.closed = true - mrr.pgConn.asyncClose() - return nil, mrr.err - } - - switch msg := msg.(type) { - case *pgproto3.ReadyForQuery: - mrr.closed = true - if mrr.pipeline != nil { - mrr.pipeline.expectedReadyForQueryCount-- - } else { - mrr.pgConn.contextWatcher.Unwatch() - mrr.pgConn.unlock() - } - case *pgproto3.ErrorResponse: - mrr.err = ErrorResponseToPgError(msg) - } - - return msg, nil -} - -// NextResult returns advances the MultiResultReader to the next result and returns true if a result is available. -func (mrr *MultiResultReader) NextResult() bool { - for !mrr.closed && mrr.err == nil { - msg, err := mrr.receiveMessage() - if err != nil { - return false - } - - switch msg := msg.(type) { - case *pgproto3.RowDescription: - mrr.pgConn.resultReader = ResultReader{ - pgConn: mrr.pgConn, - multiResultReader: mrr, - ctx: mrr.ctx, - fieldDescriptions: mrr.pgConn.convertRowDescription(mrr.pgConn.fieldDescriptions[:], msg), - } - - mrr.rr = &mrr.pgConn.resultReader - return true - case *pgproto3.CommandComplete: - mrr.pgConn.resultReader = ResultReader{ - commandTag: mrr.pgConn.makeCommandTag(msg.CommandTag), - commandConcluded: true, - closed: true, - } - mrr.rr = &mrr.pgConn.resultReader - return true - case *pgproto3.EmptyQueryResponse: - return false - } - } - - return false -} - -// ResultReader returns the current ResultReader. -func (mrr *MultiResultReader) ResultReader() *ResultReader { - return mrr.rr -} - -// Close closes the MultiResultReader and returns the first error that occurred during the MultiResultReader's use. -func (mrr *MultiResultReader) Close() error { - for !mrr.closed { - _, err := mrr.receiveMessage() - if err != nil { - return mrr.err - } - } - - return mrr.err -} - -// ResultReader is a reader for the result of a single query. -type ResultReader struct { - pgConn *PgConn - multiResultReader *MultiResultReader - pipeline *Pipeline - ctx context.Context - - fieldDescriptions []FieldDescription - rowValues [][]byte - commandTag CommandTag - commandConcluded bool - closed bool - err error -} - -// Result is the saved query response that is returned by calling Read on a ResultReader. -type Result struct { - FieldDescriptions []FieldDescription - Rows [][][]byte - CommandTag CommandTag - Err error -} - -// Read saves the query response to a Result. -func (rr *ResultReader) Read() *Result { - br := &Result{} - - for rr.NextRow() { - if br.FieldDescriptions == nil { - br.FieldDescriptions = make([]FieldDescription, len(rr.FieldDescriptions())) - copy(br.FieldDescriptions, rr.FieldDescriptions()) - } - - values := rr.Values() - row := make([][]byte, len(values)) - for i := range row { - if values[i] != nil { - row[i] = make([]byte, len(values[i])) - copy(row[i], values[i]) - } - } - br.Rows = append(br.Rows, row) - } - - br.CommandTag, br.Err = rr.Close() - - return br -} - -// NextRow advances the ResultReader to the next row and returns true if a row is available. -func (rr *ResultReader) NextRow() bool { - for !rr.commandConcluded { - msg, err := rr.receiveMessage() - if err != nil { - return false - } - - switch msg := msg.(type) { - case *pgproto3.DataRow: - rr.rowValues = msg.Values - return true - } - } - - return false -} - -// FieldDescriptions returns the field descriptions for the current result set. The returned slice is only valid until -// the ResultReader is closed. It may return nil (for example, if the query did not return a result set or an error was -// encountered.) -func (rr *ResultReader) FieldDescriptions() []FieldDescription { - return rr.fieldDescriptions -} - -// Values returns the current row data. NextRow must have been previously been called. The returned [][]byte is only -// valid until the next NextRow call or the ResultReader is closed. -func (rr *ResultReader) Values() [][]byte { - return rr.rowValues -} - -// Close consumes any remaining result data and returns the command tag or -// error. -func (rr *ResultReader) Close() (CommandTag, error) { - if rr.closed { - return rr.commandTag, rr.err - } - rr.closed = true - - for !rr.commandConcluded { - _, err := rr.receiveMessage() - if err != nil { - return CommandTag{}, rr.err - } - } - - if rr.multiResultReader == nil && rr.pipeline == nil { - for { - msg, err := rr.receiveMessage() - if err != nil { - return CommandTag{}, rr.err - } - - switch msg := msg.(type) { - // Detect a deferred constraint violation where the ErrorResponse is sent after CommandComplete. - case *pgproto3.ErrorResponse: - rr.err = ErrorResponseToPgError(msg) - case *pgproto3.ReadyForQuery: - rr.pgConn.contextWatcher.Unwatch() - rr.pgConn.unlock() - return rr.commandTag, rr.err - } - } - } - - return rr.commandTag, rr.err -} - -// readUntilRowDescription ensures the ResultReader's fieldDescriptions are loaded. It does not return an error as any -// error will be stored in the ResultReader. -func (rr *ResultReader) readUntilRowDescription() { - for !rr.commandConcluded { - // Peek before receive to avoid consuming a DataRow if the result set does not include a RowDescription method. - // This should never happen under normal pgconn usage, but it is possible if SendBytes and ReceiveResults are - // manually used to construct a query that does not issue a describe statement. - msg, _ := rr.pgConn.peekMessage() - if _, ok := msg.(*pgproto3.DataRow); ok { - return - } - - // Consume the message - msg, _ = rr.receiveMessage() - if _, ok := msg.(*pgproto3.RowDescription); ok { - return - } - } -} - -func (rr *ResultReader) receiveMessage() (msg pgproto3.BackendMessage, err error) { - if rr.multiResultReader == nil { - msg, err = rr.pgConn.receiveMessage() - } else { - msg, err = rr.multiResultReader.receiveMessage() - } - - if err != nil { - err = normalizeTimeoutError(rr.ctx, err) - rr.concludeCommand(CommandTag{}, err) - rr.pgConn.contextWatcher.Unwatch() - rr.closed = true - if rr.multiResultReader == nil { - rr.pgConn.asyncClose() - } - - return nil, rr.err - } - - switch msg := msg.(type) { - case *pgproto3.RowDescription: - rr.fieldDescriptions = rr.pgConn.convertRowDescription(rr.pgConn.fieldDescriptions[:], msg) - case *pgproto3.CommandComplete: - rr.concludeCommand(rr.pgConn.makeCommandTag(msg.CommandTag), nil) - case *pgproto3.EmptyQueryResponse: - rr.concludeCommand(CommandTag{}, nil) - case *pgproto3.ErrorResponse: - rr.concludeCommand(CommandTag{}, ErrorResponseToPgError(msg)) - } - - return msg, nil -} - -func (rr *ResultReader) concludeCommand(commandTag CommandTag, err error) { - // Keep the first error that is recorded. Store the error before checking if the command is already concluded to - // allow for receiving an error after CommandComplete but before ReadyForQuery. - if err != nil && rr.err == nil { - rr.err = err - } - - if rr.commandConcluded { - return - } - - rr.commandTag = commandTag - rr.rowValues = nil - rr.commandConcluded = true -} - -// Batch is a collection of queries that can be sent to the PostgreSQL server in a single round-trip. -type Batch struct { - buf []byte - err error -} - -// ExecParams appends an ExecParams command to the batch. See PgConn.ExecParams for parameter descriptions. -func (batch *Batch) ExecParams(sql string, paramValues [][]byte, paramOIDs []uint32, paramFormats []int16, resultFormats []int16) { - if batch.err != nil { - return - } - - batch.buf, batch.err = (&pgproto3.Parse{Query: sql, ParameterOIDs: paramOIDs}).Encode(batch.buf) - if batch.err != nil { - return - } - batch.ExecPrepared("", paramValues, paramFormats, resultFormats) -} - -// ExecPrepared appends an ExecPrepared e command to the batch. See PgConn.ExecPrepared for parameter descriptions. -func (batch *Batch) ExecPrepared(stmtName string, paramValues [][]byte, paramFormats []int16, resultFormats []int16) { - if batch.err != nil { - return - } - - batch.buf, batch.err = (&pgproto3.Bind{PreparedStatement: stmtName, ParameterFormatCodes: paramFormats, Parameters: paramValues, ResultFormatCodes: resultFormats}).Encode(batch.buf) - if batch.err != nil { - return - } - - batch.buf, batch.err = (&pgproto3.Describe{ObjectType: 'P'}).Encode(batch.buf) - if batch.err != nil { - return - } - - batch.buf, batch.err = (&pgproto3.Execute{}).Encode(batch.buf) - if batch.err != nil { - return - } -} - -// ExecBatch executes all the queries in batch in a single round-trip. Execution is implicitly transactional unless a -// transaction is already in progress or SQL contains transaction control statements. This is a simpler way of executing -// multiple queries in a single round trip than using pipeline mode. -func (pgConn *PgConn) ExecBatch(ctx context.Context, batch *Batch) *MultiResultReader { - if batch.err != nil { - return &MultiResultReader{ - closed: true, - err: batch.err, - } - } - - if err := pgConn.lock(); err != nil { - return &MultiResultReader{ - closed: true, - err: err, - } - } - - pgConn.multiResultReader = MultiResultReader{ - pgConn: pgConn, - ctx: ctx, - } - multiResult := &pgConn.multiResultReader - - if ctx != context.Background() { - select { - case <-ctx.Done(): - multiResult.closed = true - multiResult.err = newContextAlreadyDoneError(ctx) - pgConn.unlock() - return multiResult - default: - } - pgConn.contextWatcher.Watch(ctx) - } - - batch.buf, batch.err = (&pgproto3.Sync{}).Encode(batch.buf) - if batch.err != nil { - multiResult.closed = true - multiResult.err = batch.err - pgConn.unlock() - return multiResult - } - - pgConn.enterPotentialWriteReadDeadlock() - defer pgConn.exitPotentialWriteReadDeadlock() - _, err := pgConn.conn.Write(batch.buf) - if err != nil { - multiResult.closed = true - multiResult.err = err - pgConn.unlock() - return multiResult - } - - return multiResult -} - -// EscapeString escapes a string such that it can safely be interpolated into a SQL command string. It does not include -// the surrounding single quotes. -// -// The current implementation requires that standard_conforming_strings=on and client_encoding="UTF8". If these -// conditions are not met an error will be returned. It is possible these restrictions will be lifted in the future. -func (pgConn *PgConn) EscapeString(s string) (string, error) { - if pgConn.ParameterStatus("standard_conforming_strings") != "on" { - return "", errors.New("EscapeString must be run with standard_conforming_strings=on") - } - - if pgConn.ParameterStatus("client_encoding") != "UTF8" { - return "", errors.New("EscapeString must be run with client_encoding=UTF8") - } - - return strings.Replace(s, "'", "''", -1), nil -} - -// CheckConn checks the underlying connection without writing any bytes. This is currently implemented by doing a read -// with a very short deadline. This can be useful because a TCP connection can be broken such that a write will appear -// to succeed even though it will never actually reach the server. Reading immediately before a write will detect this -// condition. If this is done immediately before sending a query it reduces the chances a query will be sent that fails -// without the client knowing whether the server received it or not. -// -// Deprecated: CheckConn is deprecated in favor of Ping. CheckConn cannot detect all types of broken connections where -// the write would still appear to succeed. Prefer Ping unless on a high latency connection. -func (pgConn *PgConn) CheckConn() error { - ctx, cancel := context.WithTimeout(context.Background(), 1*time.Millisecond) - defer cancel() - - _, err := pgConn.ReceiveMessage(ctx) - if err != nil { - if !Timeout(err) { - return err - } - } - - return nil -} - -// Ping pings the server. This can be useful because a TCP connection can be broken such that a write will appear to -// succeed even though it will never actually reach the server. Pinging immediately before sending a query reduces the -// chances a query will be sent that fails without the client knowing whether the server received it or not. -func (pgConn *PgConn) Ping(ctx context.Context) error { - return pgConn.Exec(ctx, "-- ping").Close() -} - -// makeCommandTag makes a CommandTag. It does not retain a reference to buf or buf's underlying memory. -func (pgConn *PgConn) makeCommandTag(buf []byte) CommandTag { - return CommandTag{s: string(buf)} -} - -// enterPotentialWriteReadDeadlock must be called before a write that could deadlock if the server is simultaneously -// blocked writing to us. -func (pgConn *PgConn) enterPotentialWriteReadDeadlock() { - // The time to wait is somewhat arbitrary. A Write should only take as long as the syscall and memcpy to the OS - // outbound network buffer unless the buffer is full (which potentially is a block). It needs to be long enough for - // the normal case, but short enough not to kill performance if a block occurs. - // - // In addition, on Windows the default timer resolution is 15.6ms. So setting the timer to less than that is - // ineffective. - if pgConn.slowWriteTimer.Reset(15 * time.Millisecond) { - panic("BUG: slow write timer already active") - } -} - -// exitPotentialWriteReadDeadlock must be called after a call to enterPotentialWriteReadDeadlock. -func (pgConn *PgConn) exitPotentialWriteReadDeadlock() { - if !pgConn.slowWriteTimer.Stop() { - // The timer starts its function in a separate goroutine. It is necessary to ensure the background reader has - // started before calling Stop. Otherwise, the background reader may not be stopped. That on its own is not a - // serious problem. But what is a serious problem is that the background reader may start at an inopportune time in - // a subsequent query. For example, if a subsequent query was canceled then a deadline may be set on the net.Conn to - // interrupt an in-progress read. After the read is interrupted, but before the deadline is cleared, the background - // reader could start and read a deadline error. Then the next query would receive the an unexpected deadline error. - <-pgConn.bgReaderStarted - pgConn.bgReader.Stop() - } -} - -func (pgConn *PgConn) flushWithPotentialWriteReadDeadlock() error { - pgConn.enterPotentialWriteReadDeadlock() - defer pgConn.exitPotentialWriteReadDeadlock() - err := pgConn.frontend.Flush() - return err -} - -// SyncConn prepares the underlying net.Conn for direct use. PgConn may internally buffer reads or use goroutines for -// background IO. This means that any direct use of the underlying net.Conn may be corrupted if a read is already -// buffered or a read is in progress. SyncConn drains read buffers and stops background IO. In some cases this may -// require sending a ping to the server. ctx can be used to cancel this operation. This should be called before any -// operation that will use the underlying net.Conn directly. e.g. Before Conn() or Hijack(). -// -// This should not be confused with the PostgreSQL protocol Sync message. -func (pgConn *PgConn) SyncConn(ctx context.Context) error { - for i := 0; i < 10; i++ { - if pgConn.bgReader.Status() == bgreader.StatusStopped && pgConn.frontend.ReadBufferLen() == 0 { - return nil - } - - err := pgConn.Ping(ctx) - if err != nil { - return fmt.Errorf("SyncConn: Ping failed while syncing conn: %w", err) - } - } - - // This should never happen. Only way I can imagine this occurring is if the server is constantly sending data such as - // LISTEN/NOTIFY or log notifications such that we never can get an empty buffer. - return errors.New("SyncConn: conn never synchronized") -} - -// CustomData returns a map that can be used to associate custom data with the connection. -func (pgConn *PgConn) CustomData() map[string]any { - return pgConn.customData -} - -// HijackedConn is the result of hijacking a connection. -// -// Due to the necessary exposure of internal implementation details, it is not covered by the semantic versioning -// compatibility. -type HijackedConn struct { - Conn net.Conn - PID uint32 // backend pid - SecretKey uint32 // key to use to send a cancel query message to the server - ParameterStatuses map[string]string // parameters that have been reported by the server - TxStatus byte - Frontend *pgproto3.Frontend - Config *Config - CustomData map[string]any -} - -// Hijack extracts the internal connection data. pgConn must be in an idle state. SyncConn should be called immediately -// before Hijack. pgConn is unusable after hijacking. Hijacking is typically only useful when using pgconn to establish -// a connection, but taking complete control of the raw connection after that (e.g. a load balancer or proxy). -// -// Due to the necessary exposure of internal implementation details, it is not covered by the semantic versioning -// compatibility. -func (pgConn *PgConn) Hijack() (*HijackedConn, error) { - if err := pgConn.lock(); err != nil { - return nil, err - } - pgConn.status = connStatusClosed - - return &HijackedConn{ - Conn: pgConn.conn, - PID: pgConn.pid, - SecretKey: pgConn.secretKey, - ParameterStatuses: pgConn.parameterStatuses, - TxStatus: pgConn.txStatus, - Frontend: pgConn.frontend, - Config: pgConn.config, - CustomData: pgConn.customData, - }, nil -} - -// Construct created a PgConn from an already established connection to a PostgreSQL server. This is the inverse of -// PgConn.Hijack. The connection must be in an idle state. -// -// hc.Frontend is replaced by a new pgproto3.Frontend built by hc.Config.BuildFrontend. -// -// Due to the necessary exposure of internal implementation details, it is not covered by the semantic versioning -// compatibility. -func Construct(hc *HijackedConn) (*PgConn, error) { - pgConn := &PgConn{ - conn: hc.Conn, - pid: hc.PID, - secretKey: hc.SecretKey, - parameterStatuses: hc.ParameterStatuses, - txStatus: hc.TxStatus, - frontend: hc.Frontend, - config: hc.Config, - customData: hc.CustomData, - - status: connStatusIdle, - - cleanupDone: make(chan struct{}), - } - - pgConn.contextWatcher = ctxwatch.NewContextWatcher(hc.Config.BuildContextWatcherHandler(pgConn)) - pgConn.bgReader = bgreader.New(pgConn.conn) - pgConn.slowWriteTimer = time.AfterFunc(time.Duration(math.MaxInt64), - func() { - pgConn.bgReader.Start() - pgConn.bgReaderStarted <- struct{}{} - }, - ) - pgConn.slowWriteTimer.Stop() - pgConn.bgReaderStarted = make(chan struct{}) - pgConn.frontend = hc.Config.BuildFrontend(pgConn.bgReader, pgConn.conn) - - return pgConn, nil -} - -// Pipeline represents a connection in pipeline mode. -// -// SendPrepare, SendQueryParams, and SendQueryPrepared queue requests to the server. These requests are not written until -// pipeline is flushed by Flush or Sync. Sync must be called after the last request is queued. Requests between -// synchronization points are implicitly transactional unless explicit transaction control statements have been issued. -// -// The context the pipeline was started with is in effect for the entire life of the Pipeline. -// -// For a deeper understanding of pipeline mode see the PostgreSQL documentation for the extended query protocol -// (https://www.postgresql.org/docs/current/protocol-flow.html#PROTOCOL-FLOW-EXT-QUERY) and the libpq pipeline mode -// (https://www.postgresql.org/docs/current/libpq-pipeline-mode.html). -type Pipeline struct { - conn *PgConn - ctx context.Context - - expectedReadyForQueryCount int - pendingSync bool - - err error - closed bool -} - -// PipelineSync is returned by GetResults when a ReadyForQuery message is received. -type PipelineSync struct{} - -// CloseComplete is returned by GetResults when a CloseComplete message is received. -type CloseComplete struct{} - -// StartPipeline switches the connection to pipeline mode and returns a *Pipeline. In pipeline mode requests can be sent -// to the server without waiting for a response. Close must be called on the returned *Pipeline to return the connection -// to normal mode. While in pipeline mode, no methods that communicate with the server may be called except -// CancelRequest and Close. ctx is in effect for entire life of the *Pipeline. -// -// Prefer ExecBatch when only sending one group of queries at once. -func (pgConn *PgConn) StartPipeline(ctx context.Context) *Pipeline { - if err := pgConn.lock(); err != nil { - return &Pipeline{ - closed: true, - err: err, - } - } - - pgConn.pipeline = Pipeline{ - conn: pgConn, - ctx: ctx, - } - pipeline := &pgConn.pipeline - - if ctx != context.Background() { - select { - case <-ctx.Done(): - pipeline.closed = true - pipeline.err = newContextAlreadyDoneError(ctx) - pgConn.unlock() - return pipeline - default: - } - pgConn.contextWatcher.Watch(ctx) - } - - return pipeline -} - -// SendPrepare is the pipeline version of *PgConn.Prepare. -func (p *Pipeline) SendPrepare(name, sql string, paramOIDs []uint32) { - if p.closed { - return - } - p.pendingSync = true - - p.conn.frontend.SendParse(&pgproto3.Parse{Name: name, Query: sql, ParameterOIDs: paramOIDs}) - p.conn.frontend.SendDescribe(&pgproto3.Describe{ObjectType: 'S', Name: name}) -} - -// SendDeallocate deallocates a prepared statement. -func (p *Pipeline) SendDeallocate(name string) { - if p.closed { - return - } - p.pendingSync = true - - p.conn.frontend.SendClose(&pgproto3.Close{ObjectType: 'S', Name: name}) -} - -// SendQueryParams is the pipeline version of *PgConn.QueryParams. -func (p *Pipeline) SendQueryParams(sql string, paramValues [][]byte, paramOIDs []uint32, paramFormats []int16, resultFormats []int16) { - if p.closed { - return - } - p.pendingSync = true - - p.conn.frontend.SendParse(&pgproto3.Parse{Query: sql, ParameterOIDs: paramOIDs}) - p.conn.frontend.SendBind(&pgproto3.Bind{ParameterFormatCodes: paramFormats, Parameters: paramValues, ResultFormatCodes: resultFormats}) - p.conn.frontend.SendDescribe(&pgproto3.Describe{ObjectType: 'P'}) - p.conn.frontend.SendExecute(&pgproto3.Execute{}) -} - -// SendQueryPrepared is the pipeline version of *PgConn.QueryPrepared. -func (p *Pipeline) SendQueryPrepared(stmtName string, paramValues [][]byte, paramFormats []int16, resultFormats []int16) { - if p.closed { - return - } - p.pendingSync = true - - p.conn.frontend.SendBind(&pgproto3.Bind{PreparedStatement: stmtName, ParameterFormatCodes: paramFormats, Parameters: paramValues, ResultFormatCodes: resultFormats}) - p.conn.frontend.SendDescribe(&pgproto3.Describe{ObjectType: 'P'}) - p.conn.frontend.SendExecute(&pgproto3.Execute{}) -} - -// Flush flushes the queued requests without establishing a synchronization point. -func (p *Pipeline) Flush() error { - if p.closed { - if p.err != nil { - return p.err - } - return errors.New("pipeline closed") - } - - err := p.conn.flushWithPotentialWriteReadDeadlock() - if err != nil { - err = normalizeTimeoutError(p.ctx, err) - - p.conn.asyncClose() - - p.conn.contextWatcher.Unwatch() - p.conn.unlock() - p.closed = true - p.err = err - return err - } - - return nil -} - -// Sync establishes a synchronization point and flushes the queued requests. -func (p *Pipeline) Sync() error { - if p.closed { - if p.err != nil { - return p.err - } - return errors.New("pipeline closed") - } - - p.conn.frontend.SendSync(&pgproto3.Sync{}) - err := p.Flush() - if err != nil { - return err - } - - p.pendingSync = false - p.expectedReadyForQueryCount++ - - return nil -} - -// GetResults gets the next results. If results are present, results may be a *ResultReader, *StatementDescription, or -// *PipelineSync. If an ErrorResponse is received from the server, results will be nil and err will be a *PgError. If no -// results are available, results and err will both be nil. -func (p *Pipeline) GetResults() (results any, err error) { - if p.closed { - if p.err != nil { - return nil, p.err - } - return nil, errors.New("pipeline closed") - } - - if p.expectedReadyForQueryCount == 0 { - return nil, nil - } - - return p.getResults() -} - -func (p *Pipeline) getResults() (results any, err error) { - for { - msg, err := p.conn.receiveMessage() - if err != nil { - p.closed = true - p.err = err - p.conn.asyncClose() - return nil, normalizeTimeoutError(p.ctx, err) - } - - switch msg := msg.(type) { - case *pgproto3.RowDescription: - p.conn.resultReader = ResultReader{ - pgConn: p.conn, - pipeline: p, - ctx: p.ctx, - fieldDescriptions: p.conn.convertRowDescription(p.conn.fieldDescriptions[:], msg), - } - return &p.conn.resultReader, nil - case *pgproto3.CommandComplete: - p.conn.resultReader = ResultReader{ - commandTag: p.conn.makeCommandTag(msg.CommandTag), - commandConcluded: true, - closed: true, - } - return &p.conn.resultReader, nil - case *pgproto3.ParseComplete: - peekedMsg, err := p.conn.peekMessage() - if err != nil { - p.conn.asyncClose() - return nil, normalizeTimeoutError(p.ctx, err) - } - if _, ok := peekedMsg.(*pgproto3.ParameterDescription); ok { - return p.getResultsPrepare() - } - case *pgproto3.CloseComplete: - return &CloseComplete{}, nil - case *pgproto3.ReadyForQuery: - p.expectedReadyForQueryCount-- - return &PipelineSync{}, nil - case *pgproto3.ErrorResponse: - pgErr := ErrorResponseToPgError(msg) - return nil, pgErr - } - - } -} - -func (p *Pipeline) getResultsPrepare() (*StatementDescription, error) { - psd := &StatementDescription{} - - for { - msg, err := p.conn.receiveMessage() - if err != nil { - p.conn.asyncClose() - return nil, normalizeTimeoutError(p.ctx, err) - } - - switch msg := msg.(type) { - case *pgproto3.ParameterDescription: - psd.ParamOIDs = make([]uint32, len(msg.ParameterOIDs)) - copy(psd.ParamOIDs, msg.ParameterOIDs) - case *pgproto3.RowDescription: - psd.Fields = p.conn.convertRowDescription(nil, msg) - return psd, nil - - // NoData is returned instead of RowDescription when there is no expected result. e.g. An INSERT without a RETURNING - // clause. - case *pgproto3.NoData: - return psd, nil - - // These should never happen here. But don't take chances that could lead to a deadlock. - case *pgproto3.ErrorResponse: - pgErr := ErrorResponseToPgError(msg) - return nil, pgErr - case *pgproto3.CommandComplete: - p.conn.asyncClose() - return nil, errors.New("BUG: received CommandComplete while handling Describe") - case *pgproto3.ReadyForQuery: - p.conn.asyncClose() - return nil, errors.New("BUG: received ReadyForQuery while handling Describe") - } - } -} - -// Close closes the pipeline and returns the connection to normal mode. -func (p *Pipeline) Close() error { - if p.closed { - return p.err - } - - p.closed = true - - if p.pendingSync { - p.conn.asyncClose() - p.err = errors.New("pipeline has unsynced requests") - p.conn.contextWatcher.Unwatch() - p.conn.unlock() - - return p.err - } - - for p.expectedReadyForQueryCount > 0 { - _, err := p.getResults() - if err != nil { - p.err = err - var pgErr *PgError - if !errors.As(err, &pgErr) { - p.conn.asyncClose() - break - } - } - } - - p.conn.contextWatcher.Unwatch() - p.conn.unlock() - - return p.err -} - -// DeadlineContextWatcherHandler handles canceled contexts by setting a deadline on a net.Conn. -type DeadlineContextWatcherHandler struct { - Conn net.Conn - - // DeadlineDelay is the delay to set on the deadline set on net.Conn when the context is canceled. - DeadlineDelay time.Duration -} - -func (h *DeadlineContextWatcherHandler) HandleCancel(ctx context.Context) { - h.Conn.SetDeadline(time.Now().Add(h.DeadlineDelay)) -} - -func (h *DeadlineContextWatcherHandler) HandleUnwatchAfterCancel() { - h.Conn.SetDeadline(time.Time{}) -} - -// CancelRequestContextWatcherHandler handles canceled contexts by sending a cancel request to the server. It also sets -// a deadline on a net.Conn as a fallback. -type CancelRequestContextWatcherHandler struct { - Conn *PgConn - - // CancelRequestDelay is the delay before sending the cancel request to the server. - CancelRequestDelay time.Duration - - // DeadlineDelay is the delay to set on the deadline set on net.Conn when the context is canceled. - DeadlineDelay time.Duration - - cancelFinishedChan chan struct{} - handleUnwatchAfterCancelCalled func() -} - -func (h *CancelRequestContextWatcherHandler) HandleCancel(context.Context) { - h.cancelFinishedChan = make(chan struct{}) - var handleUnwatchedAfterCancelCalledCtx context.Context - handleUnwatchedAfterCancelCalledCtx, h.handleUnwatchAfterCancelCalled = context.WithCancel(context.Background()) - - deadline := time.Now().Add(h.DeadlineDelay) - h.Conn.conn.SetDeadline(deadline) - - go func() { - defer close(h.cancelFinishedChan) - - select { - case <-handleUnwatchedAfterCancelCalledCtx.Done(): - return - case <-time.After(h.CancelRequestDelay): - } - - cancelRequestCtx, cancel := context.WithDeadline(handleUnwatchedAfterCancelCalledCtx, deadline) - defer cancel() - h.Conn.CancelRequest(cancelRequestCtx) - - // CancelRequest is inherently racy. Even though the cancel request has been received by the server at this point, - // it hasn't necessarily been delivered to the other connection. If we immediately return and the connection is - // immediately used then it is possible the CancelRequest will actually cancel our next query. The - // TestCancelRequestContextWatcherHandler Stress test can produce this error without the sleep below. The sleep time - // is arbitrary, but should be sufficient to prevent this error case. - time.Sleep(100 * time.Millisecond) - }() -} - -func (h *CancelRequestContextWatcherHandler) HandleUnwatchAfterCancel() { - h.handleUnwatchAfterCancelCalled() - <-h.cancelFinishedChan - - h.Conn.conn.SetDeadline(time.Time{}) -} |