diff options
Diffstat (limited to 'vendor/github.com/jackc/pgconn')
-rw-r--r-- | vendor/github.com/jackc/pgconn/.gitignore | 3 | ||||
-rw-r--r-- | vendor/github.com/jackc/pgconn/CHANGELOG.md | 136 | ||||
-rw-r--r-- | vendor/github.com/jackc/pgconn/LICENSE | 22 | ||||
-rw-r--r-- | vendor/github.com/jackc/pgconn/README.md | 56 | ||||
-rw-r--r-- | vendor/github.com/jackc/pgconn/auth_scram.go | 266 | ||||
-rw-r--r-- | vendor/github.com/jackc/pgconn/config.go | 783 | ||||
-rw-r--r-- | vendor/github.com/jackc/pgconn/defaults.go | 64 | ||||
-rw-r--r-- | vendor/github.com/jackc/pgconn/defaults_windows.go | 59 | ||||
-rw-r--r-- | vendor/github.com/jackc/pgconn/doc.go | 29 | ||||
-rw-r--r-- | vendor/github.com/jackc/pgconn/errors.go | 221 | ||||
-rw-r--r-- | vendor/github.com/jackc/pgconn/internal/ctxwatch/context_watcher.go | 73 | ||||
-rw-r--r-- | vendor/github.com/jackc/pgconn/pgconn.go | 1723 | ||||
-rw-r--r-- | vendor/github.com/jackc/pgconn/stmtcache/lru.go | 165 | ||||
-rw-r--r-- | vendor/github.com/jackc/pgconn/stmtcache/stmtcache.go | 58 |
14 files changed, 0 insertions, 3658 deletions
diff --git a/vendor/github.com/jackc/pgconn/.gitignore b/vendor/github.com/jackc/pgconn/.gitignore deleted file mode 100644 index e980f5555..000000000 --- a/vendor/github.com/jackc/pgconn/.gitignore +++ /dev/null @@ -1,3 +0,0 @@ -.envrc -vendor/ -.vscode diff --git a/vendor/github.com/jackc/pgconn/CHANGELOG.md b/vendor/github.com/jackc/pgconn/CHANGELOG.md deleted file mode 100644 index a37eecfe1..000000000 --- a/vendor/github.com/jackc/pgconn/CHANGELOG.md +++ /dev/null @@ -1,136 +0,0 @@ -# 1.11.0 (February 7, 2022) - -* Support port in ip from LookupFunc to override config (James Hartig) -* Fix TLS connection timeout (Blake Embrey) -* Add support for read-only, primary, standby, prefer-standby target_session_attributes (Oscar) -* Fix connect when receiving NoticeResponse - -# 1.10.1 (November 20, 2021) - -* Close without waiting for response (Kei Kamikawa) -* Save waiting for network round-trip in CopyFrom (Rueian) -* Fix concurrency issue with ContextWatcher -* LRU.Get always checks context for cancellation / expiration (Georges Varouchas) - -# 1.10.0 (July 24, 2021) - -* net.Timeout errors are no longer returned when a query is canceled via context. A wrapped context error is returned. - -# 1.9.0 (July 10, 2021) - -* pgconn.Timeout only is true for errors originating in pgconn (Michael Darr) -* Add defaults for sslcert, sslkey, and sslrootcert (Joshua Brindle) -* Solve issue with 'sslmode=verify-full' when there are multiple hosts (mgoddard) -* Fix default host when parsing URL without host but with port -* Allow dbname query parameter in URL conn string -* Update underlying dependencies - -# 1.8.1 (March 25, 2021) - -* Better connection string sanitization (ip.novikov) -* Use proper pgpass location on Windows (Moshe Katz) -* Use errors instead of golang.org/x/xerrors -* Resume fallback on server error in Connect (Andrey Borodin) - -# 1.8.0 (December 3, 2020) - -* Add StatementErrored method to stmtcache.Cache. This allows the cache to purge invalidated prepared statements. (Ethan Pailes) - -# 1.7.2 (November 3, 2020) - -* Fix data value slices into work buffer with capacities larger than length. - -# 1.7.1 (October 31, 2020) - -* Do not asyncClose after receiving FATAL error from PostgreSQL server - -# 1.7.0 (September 26, 2020) - -* Exec(Params|Prepared) return ResultReader with FieldDescriptions loaded -* Add ReceiveResults (Sebastiaan Mannem) -* Fix parsing DSN connection with bad backslash -* Add PgConn.CleanupDone so connection pools can determine when async close is complete - -# 1.6.4 (July 29, 2020) - -* Fix deadlock on error after CommandComplete but before ReadyForQuery -* Fix panic on parsing DSN with trailing '=' - -# 1.6.3 (July 22, 2020) - -* Fix error message after AppendCertsFromPEM failure (vahid-sohrabloo) - -# 1.6.2 (July 14, 2020) - -* Update pgservicefile library - -# 1.6.1 (June 27, 2020) - -* Update golang.org/x/crypto to latest -* Update golang.org/x/text to 0.3.3 -* Fix error handling for bad PGSERVICE definition -* Redact passwords in ParseConfig errors (Lukas Vogel) - -# 1.6.0 (June 6, 2020) - -* Fix panic when closing conn during cancellable query -* Fix behavior of sslmode=require with sslrootcert present (Petr Jediný) -* Fix field descriptions available after command concluded (Tobias Salzmann) -* Support connect_timeout (georgysavva) -* Handle IPv6 in connection URLs (Lukas Vogel) -* Fix ValidateConnect with cancelable context -* Improve CopyFrom performance -* Add Config.Copy (georgysavva) - -# 1.5.0 (March 30, 2020) - -* Update golang.org/x/crypto for security fix -* Implement "verify-ca" SSL mode (Greg Curtis) - -# 1.4.0 (March 7, 2020) - -* Fix ExecParams and ExecPrepared handling of empty query. -* Support reading config from PostgreSQL service files. - -# 1.3.2 (February 14, 2020) - -* Update chunkreader to v2.0.1 for optimized default buffer size. - -# 1.3.1 (February 5, 2020) - -* Fix CopyFrom deadlock when multiple NoticeResponse received during copy - -# 1.3.0 (January 23, 2020) - -* Add Hijack and Construct. -* Update pgproto3 to v2.0.1. - -# 1.2.1 (January 13, 2020) - -* Fix data race in context cancellation introduced in v1.2.0. - -# 1.2.0 (January 11, 2020) - -## Features - -* Add Insert(), Update(), Delete(), and Select() statement type query methods to CommandTag. -* Add PgError.SQLState method. This could be used for compatibility with other drivers and databases. - -## Performance - -* Improve performance when context.Background() is used. (bakape) -* CommandTag.RowsAffected is faster and does not allocate. - -## Fixes - -* Try to cancel any in-progress query when a conn is closed by ctx cancel. -* Handle NoticeResponse during CopyFrom. -* Ignore errors sending Terminate message while closing connection. This mimics the behavior of libpq PGfinish. - -# 1.1.0 (October 12, 2019) - -* Add PgConn.IsBusy() method. - -# 1.0.1 (September 19, 2019) - -* Fix statement cache not properly cleaning discarded statements. diff --git a/vendor/github.com/jackc/pgconn/LICENSE b/vendor/github.com/jackc/pgconn/LICENSE deleted file mode 100644 index aebadd6c4..000000000 --- a/vendor/github.com/jackc/pgconn/LICENSE +++ /dev/null @@ -1,22 +0,0 @@ -Copyright (c) 2019-2021 Jack Christensen - -MIT License - -Permission is hereby granted, free of charge, to any person obtaining -a copy of this software and associated documentation files (the -"Software"), to deal in the Software without restriction, including -without limitation the rights to use, copy, modify, merge, publish, -distribute, sublicense, and/or sell copies of the Software, and to -permit persons to whom the Software is furnished to do so, subject to -the following conditions: - -The above copyright notice and this permission notice shall be -included in all copies or substantial portions of the Software. - -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, -EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF -MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND -NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE -LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION -OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION -WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. diff --git a/vendor/github.com/jackc/pgconn/README.md b/vendor/github.com/jackc/pgconn/README.md deleted file mode 100644 index 1c698a118..000000000 --- a/vendor/github.com/jackc/pgconn/README.md +++ /dev/null @@ -1,56 +0,0 @@ -[](https://godoc.org/github.com/jackc/pgconn) - - -# pgconn - -Package pgconn is a low-level PostgreSQL database driver. It operates at nearly the same level as the C library libpq. -It is primarily intended to serve as the foundation for higher level libraries such as https://github.com/jackc/pgx. -Applications should handle normal queries with a higher level library and only use pgconn directly when required for -low-level access to PostgreSQL functionality. - -## Example Usage - -```go -pgConn, err := pgconn.Connect(context.Background(), os.Getenv("DATABASE_URL")) -if err != nil { - log.Fatalln("pgconn failed to connect:", err) -} -defer pgConn.Close(context.Background()) - -result := pgConn.ExecParams(context.Background(), "SELECT email FROM users WHERE id=$1", [][]byte{[]byte("123")}, nil, nil, nil) -for result.NextRow() { - fmt.Println("User 123 has email:", string(result.Values()[0])) -} -_, err = result.Close() -if err != nil { - log.Fatalln("failed reading result:", err) -} -``` - -## Testing - -The pgconn tests require a PostgreSQL database. It will connect to the database specified in the `PGX_TEST_CONN_STRING` -environment variable. The `PGX_TEST_CONN_STRING` environment variable can be a URL or DSN. In addition, the standard `PG*` -environment variables will be respected. Consider using [direnv](https://github.com/direnv/direnv) to simplify -environment variable handling. - -### Example Test Environment - -Connect to your PostgreSQL server and run: - -``` -create database pgx_test; -``` - -Now you can run the tests: - -```bash -PGX_TEST_CONN_STRING="host=/var/run/postgresql dbname=pgx_test" go test ./... -``` - -### Connection and Authentication Tests - -Pgconn supports multiple connection types and means of authentication. These tests are optional. They -will only run if the appropriate environment variable is set. Run `go test -v | grep SKIP` to see if any tests are being -skipped. Most developers will not need to enable these tests. See `ci/setup_test.bash` for an example set up if you need change -authentication code. diff --git a/vendor/github.com/jackc/pgconn/auth_scram.go b/vendor/github.com/jackc/pgconn/auth_scram.go deleted file mode 100644 index 6a143fcdc..000000000 --- a/vendor/github.com/jackc/pgconn/auth_scram.go +++ /dev/null @@ -1,266 +0,0 @@ -// SCRAM-SHA-256 authentication -// -// Resources: -// https://tools.ietf.org/html/rfc5802 -// https://tools.ietf.org/html/rfc8265 -// https://www.postgresql.org/docs/current/sasl-authentication.html -// -// Inspiration drawn from other implementations: -// https://github.com/lib/pq/pull/608 -// https://github.com/lib/pq/pull/788 -// https://github.com/lib/pq/pull/833 - -package pgconn - -import ( - "bytes" - "crypto/hmac" - "crypto/rand" - "crypto/sha256" - "encoding/base64" - "errors" - "fmt" - "strconv" - - "github.com/jackc/pgproto3/v2" - "golang.org/x/crypto/pbkdf2" - "golang.org/x/text/secure/precis" -) - -const clientNonceLen = 18 - -// Perform SCRAM authentication. -func (c *PgConn) scramAuth(serverAuthMechanisms []string) error { - sc, err := newScramClient(serverAuthMechanisms, c.config.Password) - if err != nil { - return err - } - - // Send client-first-message in a SASLInitialResponse - saslInitialResponse := &pgproto3.SASLInitialResponse{ - AuthMechanism: "SCRAM-SHA-256", - Data: sc.clientFirstMessage(), - } - _, err = c.conn.Write(saslInitialResponse.Encode(nil)) - if err != nil { - return err - } - - // Receive server-first-message payload in a AuthenticationSASLContinue. - saslContinue, err := c.rxSASLContinue() - if err != nil { - return err - } - err = sc.recvServerFirstMessage(saslContinue.Data) - if err != nil { - return err - } - - // Send client-final-message in a SASLResponse - saslResponse := &pgproto3.SASLResponse{ - Data: []byte(sc.clientFinalMessage()), - } - _, err = c.conn.Write(saslResponse.Encode(nil)) - if err != nil { - return err - } - - // Receive server-final-message payload in a AuthenticationSASLFinal. - saslFinal, err := c.rxSASLFinal() - if err != nil { - return err - } - return sc.recvServerFinalMessage(saslFinal.Data) -} - -func (c *PgConn) rxSASLContinue() (*pgproto3.AuthenticationSASLContinue, error) { - msg, err := c.receiveMessage() - if err != nil { - return nil, err - } - saslContinue, ok := msg.(*pgproto3.AuthenticationSASLContinue) - if ok { - return saslContinue, nil - } - - return nil, errors.New("expected AuthenticationSASLContinue message but received unexpected message") -} - -func (c *PgConn) rxSASLFinal() (*pgproto3.AuthenticationSASLFinal, error) { - msg, err := c.receiveMessage() - if err != nil { - return nil, err - } - saslFinal, ok := msg.(*pgproto3.AuthenticationSASLFinal) - if ok { - return saslFinal, nil - } - - return nil, errors.New("expected AuthenticationSASLFinal message but received unexpected message") -} - -type scramClient struct { - serverAuthMechanisms []string - password []byte - clientNonce []byte - - clientFirstMessageBare []byte - - serverFirstMessage []byte - clientAndServerNonce []byte - salt []byte - iterations int - - saltedPassword []byte - authMessage []byte -} - -func newScramClient(serverAuthMechanisms []string, password string) (*scramClient, error) { - sc := &scramClient{ - serverAuthMechanisms: serverAuthMechanisms, - } - - // Ensure server supports SCRAM-SHA-256 - hasScramSHA256 := false - for _, mech := range sc.serverAuthMechanisms { - if mech == "SCRAM-SHA-256" { - hasScramSHA256 = true - break - } - } - if !hasScramSHA256 { - return nil, errors.New("server does not support SCRAM-SHA-256") - } - - // precis.OpaqueString is equivalent to SASLprep for password. - var err error - sc.password, err = precis.OpaqueString.Bytes([]byte(password)) - if err != nil { - // PostgreSQL allows passwords invalid according to SCRAM / SASLprep. - sc.password = []byte(password) - } - - buf := make([]byte, clientNonceLen) - _, err = rand.Read(buf) - if err != nil { - return nil, err - } - sc.clientNonce = make([]byte, base64.RawStdEncoding.EncodedLen(len(buf))) - base64.RawStdEncoding.Encode(sc.clientNonce, buf) - - return sc, nil -} - -func (sc *scramClient) clientFirstMessage() []byte { - sc.clientFirstMessageBare = []byte(fmt.Sprintf("n=,r=%s", sc.clientNonce)) - return []byte(fmt.Sprintf("n,,%s", sc.clientFirstMessageBare)) -} - -func (sc *scramClient) recvServerFirstMessage(serverFirstMessage []byte) error { - sc.serverFirstMessage = serverFirstMessage - buf := serverFirstMessage - if !bytes.HasPrefix(buf, []byte("r=")) { - return errors.New("invalid SCRAM server-first-message received from server: did not include r=") - } - buf = buf[2:] - - idx := bytes.IndexByte(buf, ',') - if idx == -1 { - return errors.New("invalid SCRAM server-first-message received from server: did not include s=") - } - sc.clientAndServerNonce = buf[:idx] - buf = buf[idx+1:] - - if !bytes.HasPrefix(buf, []byte("s=")) { - return errors.New("invalid SCRAM server-first-message received from server: did not include s=") - } - buf = buf[2:] - - idx = bytes.IndexByte(buf, ',') - if idx == -1 { - return errors.New("invalid SCRAM server-first-message received from server: did not include i=") - } - saltStr := buf[:idx] - buf = buf[idx+1:] - - if !bytes.HasPrefix(buf, []byte("i=")) { - return errors.New("invalid SCRAM server-first-message received from server: did not include i=") - } - buf = buf[2:] - iterationsStr := buf - - var err error - sc.salt, err = base64.StdEncoding.DecodeString(string(saltStr)) - if err != nil { - return fmt.Errorf("invalid SCRAM salt received from server: %w", err) - } - - sc.iterations, err = strconv.Atoi(string(iterationsStr)) - if err != nil || sc.iterations <= 0 { - return fmt.Errorf("invalid SCRAM iteration count received from server: %w", err) - } - - if !bytes.HasPrefix(sc.clientAndServerNonce, sc.clientNonce) { - return errors.New("invalid SCRAM nonce: did not start with client nonce") - } - - if len(sc.clientAndServerNonce) <= len(sc.clientNonce) { - return errors.New("invalid SCRAM nonce: did not include server nonce") - } - - return nil -} - -func (sc *scramClient) clientFinalMessage() string { - clientFinalMessageWithoutProof := []byte(fmt.Sprintf("c=biws,r=%s", sc.clientAndServerNonce)) - - sc.saltedPassword = pbkdf2.Key([]byte(sc.password), sc.salt, sc.iterations, 32, sha256.New) - sc.authMessage = bytes.Join([][]byte{sc.clientFirstMessageBare, sc.serverFirstMessage, clientFinalMessageWithoutProof}, []byte(",")) - - clientProof := computeClientProof(sc.saltedPassword, sc.authMessage) - - return fmt.Sprintf("%s,p=%s", clientFinalMessageWithoutProof, clientProof) -} - -func (sc *scramClient) recvServerFinalMessage(serverFinalMessage []byte) error { - if !bytes.HasPrefix(serverFinalMessage, []byte("v=")) { - return errors.New("invalid SCRAM server-final-message received from server") - } - - serverSignature := serverFinalMessage[2:] - - if !hmac.Equal(serverSignature, computeServerSignature(sc.saltedPassword, sc.authMessage)) { - return errors.New("invalid SCRAM ServerSignature received from server") - } - - return nil -} - -func computeHMAC(key, msg []byte) []byte { - mac := hmac.New(sha256.New, key) - mac.Write(msg) - return mac.Sum(nil) -} - -func computeClientProof(saltedPassword, authMessage []byte) []byte { - clientKey := computeHMAC(saltedPassword, []byte("Client Key")) - storedKey := sha256.Sum256(clientKey) - clientSignature := computeHMAC(storedKey[:], authMessage) - - clientProof := make([]byte, len(clientSignature)) - for i := 0; i < len(clientSignature); i++ { - clientProof[i] = clientKey[i] ^ clientSignature[i] - } - - buf := make([]byte, base64.StdEncoding.EncodedLen(len(clientProof))) - base64.StdEncoding.Encode(buf, clientProof) - return buf -} - -func computeServerSignature(saltedPassword []byte, authMessage []byte) []byte { - serverKey := computeHMAC(saltedPassword, []byte("Server Key")) - serverSignature := computeHMAC(serverKey, authMessage) - buf := make([]byte, base64.StdEncoding.EncodedLen(len(serverSignature))) - base64.StdEncoding.Encode(buf, serverSignature) - return buf -} diff --git a/vendor/github.com/jackc/pgconn/config.go b/vendor/github.com/jackc/pgconn/config.go deleted file mode 100644 index 0eab23af9..000000000 --- a/vendor/github.com/jackc/pgconn/config.go +++ /dev/null @@ -1,783 +0,0 @@ -package pgconn - -import ( - "context" - "crypto/tls" - "crypto/x509" - "errors" - "fmt" - "io" - "io/ioutil" - "math" - "net" - "net/url" - "os" - "path/filepath" - "strconv" - "strings" - "time" - - "github.com/jackc/chunkreader/v2" - "github.com/jackc/pgpassfile" - "github.com/jackc/pgproto3/v2" - "github.com/jackc/pgservicefile" -) - -type AfterConnectFunc func(ctx context.Context, pgconn *PgConn) error -type ValidateConnectFunc func(ctx context.Context, pgconn *PgConn) error - -// Config is the settings used to establish a connection to a PostgreSQL server. It must be created by ParseConfig. A -// manually initialized Config will cause ConnectConfig to panic. -type Config struct { - Host string // host (e.g. localhost) or absolute path to unix domain socket directory (e.g. /private/tmp) - Port uint16 - Database string - User string - Password string - TLSConfig *tls.Config // nil disables TLS - ConnectTimeout time.Duration - DialFunc DialFunc // e.g. net.Dialer.DialContext - LookupFunc LookupFunc // e.g. net.Resolver.LookupHost - BuildFrontend BuildFrontendFunc - RuntimeParams map[string]string // Run-time parameters to set on connection as session default values (e.g. search_path or application_name) - - Fallbacks []*FallbackConfig - - // ValidateConnect is called during a connection attempt after a successful authentication with the PostgreSQL server. - // It can be used to validate that the server is acceptable. If this returns an error the connection is closed and the next - // fallback config is tried. This allows implementing high availability behavior such as libpq does with target_session_attrs. - ValidateConnect ValidateConnectFunc - - // AfterConnect is called after ValidateConnect. It can be used to set up the connection (e.g. Set session variables - // or prepare statements). If this returns an error the connection attempt fails. - AfterConnect AfterConnectFunc - - // OnNotice is a callback function called when a notice response is received. - OnNotice NoticeHandler - - // OnNotification is a callback function called when a notification from the LISTEN/NOTIFY system is received. - OnNotification NotificationHandler - - createdByParseConfig bool // Used to enforce created by ParseConfig rule. -} - -// Copy returns a deep copy of the config that is safe to use and modify. -// The only exception is the TLSConfig field: -// according to the tls.Config docs it must not be modified after creation. -func (c *Config) Copy() *Config { - newConf := new(Config) - *newConf = *c - if newConf.TLSConfig != nil { - newConf.TLSConfig = c.TLSConfig.Clone() - } - if newConf.RuntimeParams != nil { - newConf.RuntimeParams = make(map[string]string, len(c.RuntimeParams)) - for k, v := range c.RuntimeParams { - newConf.RuntimeParams[k] = v - } - } - if newConf.Fallbacks != nil { - newConf.Fallbacks = make([]*FallbackConfig, len(c.Fallbacks)) - for i, fallback := range c.Fallbacks { - newFallback := new(FallbackConfig) - *newFallback = *fallback - if newFallback.TLSConfig != nil { - newFallback.TLSConfig = fallback.TLSConfig.Clone() - } - newConf.Fallbacks[i] = newFallback - } - } - return newConf -} - -// FallbackConfig is additional settings to attempt a connection with when the primary Config fails to establish a -// network connection. It is used for TLS fallback such as sslmode=prefer and high availability (HA) connections. -type FallbackConfig struct { - Host string // host (e.g. localhost) or path to unix domain socket directory (e.g. /private/tmp) - Port uint16 - TLSConfig *tls.Config // nil disables TLS -} - -// NetworkAddress converts a PostgreSQL host and port into network and address suitable for use with -// net.Dial. -func NetworkAddress(host string, port uint16) (network, address string) { - if strings.HasPrefix(host, "/") { - network = "unix" - address = filepath.Join(host, ".s.PGSQL.") + strconv.FormatInt(int64(port), 10) - } else { - network = "tcp" - address = net.JoinHostPort(host, strconv.Itoa(int(port))) - } - return network, address -} - -// ParseConfig builds a *Config with similar behavior to the PostgreSQL standard C library libpq. It uses the same -// defaults as libpq (e.g. port=5432) and understands most PG* environment variables. ParseConfig closely matches -// the parsing behavior of libpq. connString may either be in URL format or keyword = value format (DSN style). See -// https://www.postgresql.org/docs/current/libpq-connect.html#LIBPQ-CONNSTRING for details. connString also may be -// empty to only read from the environment. If a password is not supplied it will attempt to read the .pgpass file. -// -// # Example DSN -// user=jack password=secret host=pg.example.com port=5432 dbname=mydb sslmode=verify-ca -// -// # Example URL -// postgres://jack:secret@pg.example.com:5432/mydb?sslmode=verify-ca -// -// The returned *Config may be modified. However, it is strongly recommended that any configuration that can be done -// through the connection string be done there. In particular the fields Host, Port, TLSConfig, and Fallbacks can be -// interdependent (e.g. TLSConfig needs knowledge of the host to validate the server certificate). These fields should -// not be modified individually. They should all be modified or all left unchanged. -// -// ParseConfig supports specifying multiple hosts in similar manner to libpq. Host and port may include comma separated -// values that will be tried in order. This can be used as part of a high availability system. See -// https://www.postgresql.org/docs/11/libpq-connect.html#LIBPQ-MULTIPLE-HOSTS for more information. -// -// # Example URL -// postgres://jack:secret@foo.example.com:5432,bar.example.com:5432/mydb -// -// ParseConfig currently recognizes the following environment variable and their parameter key word equivalents passed -// via database URL or DSN: -// -// PGHOST -// PGPORT -// PGDATABASE -// PGUSER -// PGPASSWORD -// PGPASSFILE -// PGSERVICE -// PGSERVICEFILE -// PGSSLMODE -// PGSSLCERT -// PGSSLKEY -// PGSSLROOTCERT -// PGAPPNAME -// PGCONNECT_TIMEOUT -// PGTARGETSESSIONATTRS -// -// See http://www.postgresql.org/docs/11/static/libpq-envars.html for details on the meaning of environment variables. -// -// See https://www.postgresql.org/docs/11/libpq-connect.html#LIBPQ-PARAMKEYWORDS for parameter key word names. They are -// usually but not always the environment variable name downcased and without the "PG" prefix. -// -// Important Security Notes: -// -// ParseConfig tries to match libpq behavior with regard to PGSSLMODE. This includes defaulting to "prefer" behavior if -// not set. -// -// See http://www.postgresql.org/docs/11/static/libpq-ssl.html#LIBPQ-SSL-PROTECTION for details on what level of -// security each sslmode provides. -// -// The sslmode "prefer" (the default), sslmode "allow", and multiple hosts are implemented via the Fallbacks field of -// the Config struct. If TLSConfig is manually changed it will not affect the fallbacks. For example, in the case of -// sslmode "prefer" this means it will first try the main Config settings which use TLS, then it will try the fallback -// which does not use TLS. This can lead to an unexpected unencrypted connection if the main TLS config is manually -// changed later but the unencrypted fallback is present. Ensure there are no stale fallbacks when manually setting -// TLCConfig. -// -// Other known differences with libpq: -// -// If a host name resolves into multiple addresses, libpq will try all addresses. pgconn will only try the first. -// -// When multiple hosts are specified, libpq allows them to have different passwords set via the .pgpass file. pgconn -// does not. -// -// In addition, ParseConfig accepts the following options: -// -// min_read_buffer_size -// The minimum size of the internal read buffer. Default 8192. -// servicefile -// libpq only reads servicefile from the PGSERVICEFILE environment variable. ParseConfig accepts servicefile as a -// part of the connection string. -func ParseConfig(connString string) (*Config, error) { - defaultSettings := defaultSettings() - envSettings := parseEnvSettings() - - connStringSettings := make(map[string]string) - if connString != "" { - var err error - // connString may be a database URL or a DSN - if strings.HasPrefix(connString, "postgres://") || strings.HasPrefix(connString, "postgresql://") { - connStringSettings, err = parseURLSettings(connString) - if err != nil { - return nil, &parseConfigError{connString: connString, msg: "failed to parse as URL", err: err} - } - } else { - connStringSettings, err = parseDSNSettings(connString) - if err != nil { - return nil, &parseConfigError{connString: connString, msg: "failed to parse as DSN", err: err} - } - } - } - - settings := mergeSettings(defaultSettings, envSettings, connStringSettings) - if service, present := settings["service"]; present { - serviceSettings, err := parseServiceSettings(settings["servicefile"], service) - if err != nil { - return nil, &parseConfigError{connString: connString, msg: "failed to read service", err: err} - } - - settings = mergeSettings(defaultSettings, envSettings, serviceSettings, connStringSettings) - } - - minReadBufferSize, err := strconv.ParseInt(settings["min_read_buffer_size"], 10, 32) - if err != nil { - return nil, &parseConfigError{connString: connString, msg: "cannot parse min_read_buffer_size", err: err} - } - - config := &Config{ - createdByParseConfig: true, - Database: settings["database"], - User: settings["user"], - Password: settings["password"], - RuntimeParams: make(map[string]string), - BuildFrontend: makeDefaultBuildFrontendFunc(int(minReadBufferSize)), - } - - if connectTimeoutSetting, present := settings["connect_timeout"]; present { - connectTimeout, err := parseConnectTimeoutSetting(connectTimeoutSetting) - if err != nil { - return nil, &parseConfigError{connString: connString, msg: "invalid connect_timeout", err: err} - } - config.ConnectTimeout = connectTimeout - config.DialFunc = makeConnectTimeoutDialFunc(connectTimeout) - } else { - defaultDialer := makeDefaultDialer() - config.DialFunc = defaultDialer.DialContext - } - - config.LookupFunc = makeDefaultResolver().LookupHost - - notRuntimeParams := map[string]struct{}{ - "host": {}, - "port": {}, - "database": {}, - "user": {}, - "password": {}, - "passfile": {}, - "connect_timeout": {}, - "sslmode": {}, - "sslkey": {}, - "sslcert": {}, - "sslrootcert": {}, - "target_session_attrs": {}, - "min_read_buffer_size": {}, - "service": {}, - "servicefile": {}, - } - - for k, v := range settings { - if _, present := notRuntimeParams[k]; present { - continue - } - config.RuntimeParams[k] = v - } - - fallbacks := []*FallbackConfig{} - - hosts := strings.Split(settings["host"], ",") - ports := strings.Split(settings["port"], ",") - - for i, host := range hosts { - var portStr string - if i < len(ports) { - portStr = ports[i] - } else { - portStr = ports[0] - } - - port, err := parsePort(portStr) - if err != nil { - return nil, &parseConfigError{connString: connString, msg: "invalid port", err: err} - } - - var tlsConfigs []*tls.Config - - // Ignore TLS settings if Unix domain socket like libpq - if network, _ := NetworkAddress(host, port); network == "unix" { - tlsConfigs = append(tlsConfigs, nil) - } else { - var err error - tlsConfigs, err = configTLS(settings, host) - if err != nil { - return nil, &parseConfigError{connString: connString, msg: "failed to configure TLS", err: err} - } - } - - for _, tlsConfig := range tlsConfigs { - fallbacks = append(fallbacks, &FallbackConfig{ - Host: host, - Port: port, - TLSConfig: tlsConfig, - }) - } - } - - config.Host = fallbacks[0].Host - config.Port = fallbacks[0].Port - config.TLSConfig = fallbacks[0].TLSConfig - config.Fallbacks = fallbacks[1:] - - passfile, err := pgpassfile.ReadPassfile(settings["passfile"]) - if err == nil { - if config.Password == "" { - host := config.Host - if network, _ := NetworkAddress(config.Host, config.Port); network == "unix" { - host = "localhost" - } - - config.Password = passfile.FindPassword(host, strconv.Itoa(int(config.Port)), config.Database, config.User) - } - } - - switch tsa := settings["target_session_attrs"]; tsa { - case "read-write": - config.ValidateConnect = ValidateConnectTargetSessionAttrsReadWrite - case "read-only": - config.ValidateConnect = ValidateConnectTargetSessionAttrsReadOnly - case "primary": - config.ValidateConnect = ValidateConnectTargetSessionAttrsPrimary - case "standby": - config.ValidateConnect = ValidateConnectTargetSessionAttrsStandby - case "any", "prefer-standby": - // do nothing - default: - return nil, &parseConfigError{connString: connString, msg: fmt.Sprintf("unknown target_session_attrs value: %v", tsa)} - } - - return config, nil -} - -func mergeSettings(settingSets ...map[string]string) map[string]string { - settings := make(map[string]string) - - for _, s2 := range settingSets { - for k, v := range s2 { - settings[k] = v - } - } - - return settings -} - -func parseEnvSettings() map[string]string { - settings := make(map[string]string) - - nameMap := map[string]string{ - "PGHOST": "host", - "PGPORT": "port", - "PGDATABASE": "database", - "PGUSER": "user", - "PGPASSWORD": "password", - "PGPASSFILE": "passfile", - "PGAPPNAME": "application_name", - "PGCONNECT_TIMEOUT": "connect_timeout", - "PGSSLMODE": "sslmode", - "PGSSLKEY": "sslkey", - "PGSSLCERT": "sslcert", - "PGSSLROOTCERT": "sslrootcert", - "PGTARGETSESSIONATTRS": "target_session_attrs", - "PGSERVICE": "service", - "PGSERVICEFILE": "servicefile", - } - - for envname, realname := range nameMap { - value := os.Getenv(envname) - if value != "" { - settings[realname] = value - } - } - - return settings -} - -func parseURLSettings(connString string) (map[string]string, error) { - settings := make(map[string]string) - - url, err := url.Parse(connString) - if err != nil { - return nil, err - } - - if url.User != nil { - settings["user"] = url.User.Username() - if password, present := url.User.Password(); present { - settings["password"] = password - } - } - - // Handle multiple host:port's in url.Host by splitting them into host,host,host and port,port,port. - var hosts []string - var ports []string - for _, host := range strings.Split(url.Host, ",") { - if host == "" { - continue - } - if isIPOnly(host) { - hosts = append(hosts, strings.Trim(host, "[]")) - continue - } - h, p, err := net.SplitHostPort(host) - if err != nil { - return nil, fmt.Errorf("failed to split host:port in '%s', err: %w", host, err) - } - if h != "" { - hosts = append(hosts, h) - } - if p != "" { - ports = append(ports, p) - } - } - if len(hosts) > 0 { - settings["host"] = strings.Join(hosts, ",") - } - if len(ports) > 0 { - settings["port"] = strings.Join(ports, ",") - } - - database := strings.TrimLeft(url.Path, "/") - if database != "" { - settings["database"] = database - } - - nameMap := map[string]string{ - "dbname": "database", - } - - for k, v := range url.Query() { - if k2, present := nameMap[k]; present { - k = k2 - } - - settings[k] = v[0] - } - - return settings, nil -} - -func isIPOnly(host string) bool { - return net.ParseIP(strings.Trim(host, "[]")) != nil || !strings.Contains(host, ":") -} - -var asciiSpace = [256]uint8{'\t': 1, '\n': 1, '\v': 1, '\f': 1, '\r': 1, ' ': 1} - -func parseDSNSettings(s string) (map[string]string, error) { - settings := make(map[string]string) - - nameMap := map[string]string{ - "dbname": "database", - } - - for len(s) > 0 { - var key, val string - eqIdx := strings.IndexRune(s, '=') - if eqIdx < 0 { - return nil, errors.New("invalid dsn") - } - - key = strings.Trim(s[:eqIdx], " \t\n\r\v\f") - s = strings.TrimLeft(s[eqIdx+1:], " \t\n\r\v\f") - if len(s) == 0 { - } else if s[0] != '\'' { - end := 0 - for ; end < len(s); end++ { - if asciiSpace[s[end]] == 1 { - break - } - if s[end] == '\\' { - end++ - if end == len(s) { - return nil, errors.New("invalid backslash") - } - } - } - val = strings.Replace(strings.Replace(s[:end], "\\\\", "\\", -1), "\\'", "'", -1) - if end == len(s) { - s = "" - } else { - s = s[end+1:] - } - } else { // quoted string - s = s[1:] - end := 0 - for ; end < len(s); end++ { - if s[end] == '\'' { - break - } - if s[end] == '\\' { - end++ - } - } - if end == len(s) { - return nil, errors.New("unterminated quoted string in connection info string") - } - val = strings.Replace(strings.Replace(s[:end], "\\\\", "\\", -1), "\\'", "'", -1) - if end == len(s) { - s = "" - } else { - s = s[end+1:] - } - } - - if k, ok := nameMap[key]; ok { - key = k - } - - if key == "" { - return nil, errors.New("invalid dsn") - } - - settings[key] = val - } - - return settings, nil -} - -func parseServiceSettings(servicefilePath, serviceName string) (map[string]string, error) { - servicefile, err := pgservicefile.ReadServicefile(servicefilePath) - if err != nil { - return nil, fmt.Errorf("failed to read service file: %v", servicefilePath) - } - - service, err := servicefile.GetService(serviceName) - if err != nil { - return nil, fmt.Errorf("unable to find service: %v", serviceName) - } - - nameMap := map[string]string{ - "dbname": "database", - } - - settings := make(map[string]string, len(service.Settings)) - for k, v := range service.Settings { - if k2, present := nameMap[k]; present { - k = k2 - } - settings[k] = v - } - - return settings, nil -} - -// configTLS uses libpq's TLS parameters to construct []*tls.Config. It is -// necessary to allow returning multiple TLS configs as sslmode "allow" and -// "prefer" allow fallback. -func configTLS(settings map[string]string, thisHost string) ([]*tls.Config, error) { - host := thisHost - sslmode := settings["sslmode"] - sslrootcert := settings["sslrootcert"] - sslcert := settings["sslcert"] - sslkey := settings["sslkey"] - - // Match libpq default behavior - if sslmode == "" { - sslmode = "prefer" - } - - tlsConfig := &tls.Config{} - - switch sslmode { - case "disable": - return []*tls.Config{nil}, nil - case "allow", "prefer": - tlsConfig.InsecureSkipVerify = true - case "require": - // According to PostgreSQL documentation, if a root CA file exists, - // the behavior of sslmode=require should be the same as that of verify-ca - // - // See https://www.postgresql.org/docs/12/libpq-ssl.html - if sslrootcert != "" { - goto nextCase - } - tlsConfig.InsecureSkipVerify = true - break - nextCase: - fallthrough - case "verify-ca": - // Don't perform the default certificate verification because it - // will verify the hostname. Instead, verify the server's - // certificate chain ourselves in VerifyPeerCertificate and - // ignore the server name. This emulates libpq's verify-ca - // behavior. - // - // See https://github.com/golang/go/issues/21971#issuecomment-332693931 - // and https://pkg.go.dev/crypto/tls?tab=doc#example-Config-VerifyPeerCertificate - // for more info. - tlsConfig.InsecureSkipVerify = true - tlsConfig.VerifyPeerCertificate = func(certificates [][]byte, _ [][]*x509.Certificate) error { - certs := make([]*x509.Certificate, len(certificates)) - for i, asn1Data := range certificates { - cert, err := x509.ParseCertificate(asn1Data) - if err != nil { - return errors.New("failed to parse certificate from server: " + err.Error()) - } - certs[i] = cert - } - - // Leave DNSName empty to skip hostname verification. - opts := x509.VerifyOptions{ - Roots: tlsConfig.RootCAs, - Intermediates: x509.NewCertPool(), - } - // Skip the first cert because it's the leaf. All others - // are intermediates. - for _, cert := range certs[1:] { - opts.Intermediates.AddCert(cert) - } - _, err := certs[0].Verify(opts) - return err - } - case "verify-full": - tlsConfig.ServerName = host - default: - return nil, errors.New("sslmode is invalid") - } - - if sslrootcert != "" { - caCertPool := x509.NewCertPool() - - caPath := sslrootcert - caCert, err := ioutil.ReadFile(caPath) - if err != nil { - return nil, fmt.Errorf("unable to read CA file: %w", err) - } - - if !caCertPool.AppendCertsFromPEM(caCert) { - return nil, errors.New("unable to add CA to cert pool") - } - - tlsConfig.RootCAs = caCertPool - tlsConfig.ClientCAs = caCertPool - } - - if (sslcert != "" && sslkey == "") || (sslcert == "" && sslkey != "") { - return nil, errors.New(`both "sslcert" and "sslkey" are required`) - } - - if sslcert != "" && sslkey != "" { - cert, err := tls.LoadX509KeyPair(sslcert, sslkey) - if err != nil { - return nil, fmt.Errorf("unable to read cert: %w", err) - } - - tlsConfig.Certificates = []tls.Certificate{cert} - } - - switch sslmode { - case "allow": - return []*tls.Config{nil, tlsConfig}, nil - case "prefer": - return []*tls.Config{tlsConfig, nil}, nil - case "require", "verify-ca", "verify-full": - return []*tls.Config{tlsConfig}, nil - default: - panic("BUG: bad sslmode should already have been caught") - } -} - -func parsePort(s string) (uint16, error) { - port, err := strconv.ParseUint(s, 10, 16) - if err != nil { - return 0, err - } - if port < 1 || port > math.MaxUint16 { - return 0, errors.New("outside range") - } - return uint16(port), nil -} - -func makeDefaultDialer() *net.Dialer { - return &net.Dialer{KeepAlive: 5 * time.Minute} -} - -func makeDefaultResolver() *net.Resolver { - return net.DefaultResolver -} - -func makeDefaultBuildFrontendFunc(minBufferLen int) BuildFrontendFunc { - return func(r io.Reader, w io.Writer) Frontend { - cr, err := chunkreader.NewConfig(r, chunkreader.Config{MinBufLen: minBufferLen}) - if err != nil { - panic(fmt.Sprintf("BUG: chunkreader.NewConfig failed: %v", err)) - } - frontend := pgproto3.NewFrontend(cr, w) - - return frontend - } -} - -func parseConnectTimeoutSetting(s string) (time.Duration, error) { - timeout, err := strconv.ParseInt(s, 10, 64) - if err != nil { - return 0, err - } - if timeout < 0 { - return 0, errors.New("negative timeout") - } - return time.Duration(timeout) * time.Second, nil -} - -func makeConnectTimeoutDialFunc(timeout time.Duration) DialFunc { - d := makeDefaultDialer() - d.Timeout = timeout - return d.DialContext -} - -// ValidateConnectTargetSessionAttrsReadWrite is an ValidateConnectFunc that implements libpq compatible -// target_session_attrs=read-write. -func ValidateConnectTargetSessionAttrsReadWrite(ctx context.Context, pgConn *PgConn) error { - result := pgConn.ExecParams(ctx, "show transaction_read_only", nil, nil, nil, nil).Read() - if result.Err != nil { - return result.Err - } - - if string(result.Rows[0][0]) == "on" { - return errors.New("read only connection") - } - - return nil -} - -// ValidateConnectTargetSessionAttrsReadOnly is an ValidateConnectFunc that implements libpq compatible -// target_session_attrs=read-only. -func ValidateConnectTargetSessionAttrsReadOnly(ctx context.Context, pgConn *PgConn) error { - result := pgConn.ExecParams(ctx, "show transaction_read_only", nil, nil, nil, nil).Read() - if result.Err != nil { - return result.Err - } - - if string(result.Rows[0][0]) != "on" { - return errors.New("connection is not read only") - } - - return nil -} - -// ValidateConnectTargetSessionAttrsStandby is an ValidateConnectFunc that implements libpq compatible -// target_session_attrs=standby. -func ValidateConnectTargetSessionAttrsStandby(ctx context.Context, pgConn *PgConn) error { - result := pgConn.ExecParams(ctx, "select pg_is_in_recovery()", nil, nil, nil, nil).Read() - if result.Err != nil { - return result.Err - } - - if string(result.Rows[0][0]) != "t" { - return errors.New("server is not in hot standby mode") - } - - return nil -} - -// ValidateConnectTargetSessionAttrsPrimary is an ValidateConnectFunc that implements libpq compatible -// target_session_attrs=primary. -func ValidateConnectTargetSessionAttrsPrimary(ctx context.Context, pgConn *PgConn) error { - result := pgConn.ExecParams(ctx, "select pg_is_in_recovery()", nil, nil, nil, nil).Read() - if result.Err != nil { - return result.Err - } - - if string(result.Rows[0][0]) == "t" { - return errors.New("server is in standby mode") - } - - return nil -} diff --git a/vendor/github.com/jackc/pgconn/defaults.go b/vendor/github.com/jackc/pgconn/defaults.go deleted file mode 100644 index f69cad317..000000000 --- a/vendor/github.com/jackc/pgconn/defaults.go +++ /dev/null @@ -1,64 +0,0 @@ -// +build !windows - -package pgconn - -import ( - "os" - "os/user" - "path/filepath" -) - -func defaultSettings() map[string]string { - settings := make(map[string]string) - - settings["host"] = defaultHost() - settings["port"] = "5432" - - // Default to the OS user name. Purposely ignoring err getting user name from - // OS. The client application will simply have to specify the user in that - // case (which they typically will be doing anyway). - user, err := user.Current() - if err == nil { - settings["user"] = user.Username - settings["passfile"] = filepath.Join(user.HomeDir, ".pgpass") - settings["servicefile"] = filepath.Join(user.HomeDir, ".pg_service.conf") - sslcert := filepath.Join(user.HomeDir, ".postgresql", "postgresql.crt") - sslkey := filepath.Join(user.HomeDir, ".postgresql", "postgresql.key") - if _, err := os.Stat(sslcert); err == nil { - if _, err := os.Stat(sslkey); err == nil { - // Both the cert and key must be present to use them, or do not use either - settings["sslcert"] = sslcert - settings["sslkey"] = sslkey - } - } - sslrootcert := filepath.Join(user.HomeDir, ".postgresql", "root.crt") - if _, err := os.Stat(sslrootcert); err == nil { - settings["sslrootcert"] = sslrootcert - } - } - - settings["target_session_attrs"] = "any" - - settings["min_read_buffer_size"] = "8192" - - return settings -} - -// defaultHost attempts to mimic libpq's default host. libpq uses the default unix socket location on *nix and localhost -// on Windows. The default socket location is compiled into libpq. Since pgx does not have access to that default it -// checks the existence of common locations. -func defaultHost() string { - candidatePaths := []string{ - "/var/run/postgresql", // Debian - "/private/tmp", // OSX - homebrew - "/tmp", // standard PostgreSQL - } - - for _, path := range candidatePaths { - if _, err := os.Stat(path); err == nil { - return path - } - } - - return "localhost" -} diff --git a/vendor/github.com/jackc/pgconn/defaults_windows.go b/vendor/github.com/jackc/pgconn/defaults_windows.go deleted file mode 100644 index 71eb77dba..000000000 --- a/vendor/github.com/jackc/pgconn/defaults_windows.go +++ /dev/null @@ -1,59 +0,0 @@ -package pgconn - -import ( - "os" - "os/user" - "path/filepath" - "strings" -) - -func defaultSettings() map[string]string { - settings := make(map[string]string) - - settings["host"] = defaultHost() - settings["port"] = "5432" - - // Default to the OS user name. Purposely ignoring err getting user name from - // OS. The client application will simply have to specify the user in that - // case (which they typically will be doing anyway). - user, err := user.Current() - appData := os.Getenv("APPDATA") - if err == nil { - // Windows gives us the username here as `DOMAIN\user` or `LOCALPCNAME\user`, - // but the libpq default is just the `user` portion, so we strip off the first part. - username := user.Username - if strings.Contains(username, "\\") { - username = username[strings.LastIndex(username, "\\")+1:] - } - - settings["user"] = username - settings["passfile"] = filepath.Join(appData, "postgresql", "pgpass.conf") - settings["servicefile"] = filepath.Join(user.HomeDir, ".pg_service.conf") - sslcert := filepath.Join(appData, "postgresql", "postgresql.crt") - sslkey := filepath.Join(appData, "postgresql", "postgresql.key") - if _, err := os.Stat(sslcert); err == nil { - if _, err := os.Stat(sslkey); err == nil { - // Both the cert and key must be present to use them, or do not use either - settings["sslcert"] = sslcert - settings["sslkey"] = sslkey - } - } - sslrootcert := filepath.Join(appData, "postgresql", "root.crt") - if _, err := os.Stat(sslrootcert); err == nil { - settings["sslrootcert"] = sslrootcert - } - } - - settings["target_session_attrs"] = "any" - - settings["min_read_buffer_size"] = "8192" - - return settings -} - -// defaultHost attempts to mimic libpq's default host. libpq uses the default unix socket location on *nix and localhost -// on Windows. The default socket location is compiled into libpq. Since pgx does not have access to that default it -// checks the existence of common locations. -func defaultHost() string { - return "localhost" -} diff --git a/vendor/github.com/jackc/pgconn/doc.go b/vendor/github.com/jackc/pgconn/doc.go deleted file mode 100644 index cde58cd89..000000000 --- a/vendor/github.com/jackc/pgconn/doc.go +++ /dev/null @@ -1,29 +0,0 @@ -// Package pgconn is a low-level PostgreSQL database driver. -/* -pgconn provides lower level access to a PostgreSQL connection than a database/sql or pgx connection. It operates at -nearly the same level is the C library libpq. - -Establishing a Connection - -Use Connect to establish a connection. It accepts a connection string in URL or DSN and will read the environment for -libpq style environment variables. - -Executing a Query - -ExecParams and ExecPrepared execute a single query. They return readers that iterate over each row. The Read method -reads all rows into memory. - -Executing Multiple Queries in a Single Round Trip - -Exec and ExecBatch can execute multiple queries in a single round trip. They return readers that iterate over each query -result. The ReadAll method reads all query results into memory. - -Context Support - -All potentially blocking operations take a context.Context. If a context is canceled while the method is in progress the -method immediately returns. In most circumstances, this will close the underlying connection. - -The CancelRequest method may be used to request the PostgreSQL server cancel an in-progress query without forcing the -client to abort. -*/ -package pgconn diff --git a/vendor/github.com/jackc/pgconn/errors.go b/vendor/github.com/jackc/pgconn/errors.go deleted file mode 100644 index a32b29c92..000000000 --- a/vendor/github.com/jackc/pgconn/errors.go +++ /dev/null @@ -1,221 +0,0 @@ -package pgconn - -import ( - "context" - "errors" - "fmt" - "net" - "net/url" - "regexp" - "strings" -) - -// SafeToRetry checks if the err is guaranteed to have occurred before sending any data to the server. -func SafeToRetry(err error) bool { - if e, ok := err.(interface{ SafeToRetry() bool }); ok { - return e.SafeToRetry() - } - return false -} - -// Timeout checks if err was was caused by a timeout. To be specific, it is true if err was caused within pgconn by a -// context.Canceled, context.DeadlineExceeded or an implementer of net.Error where Timeout() is true. -func Timeout(err error) bool { - var timeoutErr *errTimeout - return errors.As(err, &timeoutErr) -} - -// PgError represents an error reported by the PostgreSQL server. See -// http://www.postgresql.org/docs/11/static/protocol-error-fields.html for -// detailed field description. -type PgError struct { - Severity string - Code string - Message string - Detail string - Hint string - Position int32 - InternalPosition int32 - InternalQuery string - Where string - SchemaName string - TableName string - ColumnName string - DataTypeName string - ConstraintName string - File string - Line int32 - Routine string -} - -func (pe *PgError) Error() string { - return pe.Severity + ": " + pe.Message + " (SQLSTATE " + pe.Code + ")" -} - -// SQLState returns the SQLState of the error. -func (pe *PgError) SQLState() string { - return pe.Code -} - -type connectError struct { - config *Config - msg string - err error -} - -func (e *connectError) Error() string { - sb := &strings.Builder{} - fmt.Fprintf(sb, "failed to connect to `host=%s user=%s database=%s`: %s", e.config.Host, e.config.User, e.config.Database, e.msg) - if e.err != nil { - fmt.Fprintf(sb, " (%s)", e.err.Error()) - } - return sb.String() -} - -func (e *connectError) Unwrap() error { - return e.err -} - -type connLockError struct { - status string -} - -func (e *connLockError) SafeToRetry() bool { - return true // a lock failure by definition happens before the connection is used. -} - -func (e *connLockError) Error() string { - return e.status -} - -type parseConfigError struct { - connString string - msg string - err error -} - -func (e *parseConfigError) Error() string { - connString := redactPW(e.connString) - if e.err == nil { - return fmt.Sprintf("cannot parse `%s`: %s", connString, e.msg) - } - return fmt.Sprintf("cannot parse `%s`: %s (%s)", connString, e.msg, e.err.Error()) -} - -func (e *parseConfigError) Unwrap() error { - return e.err -} - -// preferContextOverNetTimeoutError returns ctx.Err() if ctx.Err() is present and err is a net.Error with Timeout() == -// true. Otherwise returns err. -func preferContextOverNetTimeoutError(ctx context.Context, err error) error { - if err, ok := err.(net.Error); ok && err.Timeout() && ctx.Err() != nil { - return &errTimeout{err: ctx.Err()} - } - return err -} - -type pgconnError struct { - msg string - err error - safeToRetry bool -} - -func (e *pgconnError) Error() string { - if e.msg == "" { - return e.err.Error() - } - if e.err == nil { - return e.msg - } - return fmt.Sprintf("%s: %s", e.msg, e.err.Error()) -} - -func (e *pgconnError) SafeToRetry() bool { - return e.safeToRetry -} - -func (e *pgconnError) Unwrap() error { - return e.err -} - -// errTimeout occurs when an error was caused by a timeout. Specifically, it wraps an error which is -// context.Canceled, context.DeadlineExceeded, or an implementer of net.Error where Timeout() is true. -type errTimeout struct { - err error -} - -func (e *errTimeout) Error() string { - return fmt.Sprintf("timeout: %s", e.err.Error()) -} - -func (e *errTimeout) SafeToRetry() bool { - return SafeToRetry(e.err) -} - -func (e *errTimeout) Unwrap() error { - return e.err -} - -type contextAlreadyDoneError struct { - err error -} - -func (e *contextAlreadyDoneError) Error() string { - return fmt.Sprintf("context already done: %s", e.err.Error()) -} - -func (e *contextAlreadyDoneError) SafeToRetry() bool { - return true -} - -func (e *contextAlreadyDoneError) Unwrap() error { - return e.err -} - -// newContextAlreadyDoneError double-wraps a context error in `contextAlreadyDoneError` and `errTimeout`. -func newContextAlreadyDoneError(ctx context.Context) (err error) { - return &errTimeout{&contextAlreadyDoneError{err: ctx.Err()}} -} - -type writeError struct { - err error - safeToRetry bool -} - -func (e *writeError) Error() string { - return fmt.Sprintf("write failed: %s", e.err.Error()) -} - -func (e *writeError) SafeToRetry() bool { - return e.safeToRetry -} - -func (e *writeError) Unwrap() error { - return e.err -} - -func redactPW(connString string) string { - if strings.HasPrefix(connString, "postgres://") || strings.HasPrefix(connString, "postgresql://") { - if u, err := url.Parse(connString); err == nil { - return redactURL(u) - } - } - quotedDSN := regexp.MustCompile(`password='[^']*'`) - connString = quotedDSN.ReplaceAllLiteralString(connString, "password=xxxxx") - plainDSN := regexp.MustCompile(`password=[^ ]*`) - connString = plainDSN.ReplaceAllLiteralString(connString, "password=xxxxx") - brokenURL := regexp.MustCompile(`:[^:@]+?@`) - connString = brokenURL.ReplaceAllLiteralString(connString, ":xxxxxx@") - return connString -} - -func redactURL(u *url.URL) string { - if u == nil { - return "" - } - if _, pwSet := u.User.Password(); pwSet { - u.User = url.UserPassword(u.User.Username(), "xxxxx") - } - return u.String() -} diff --git a/vendor/github.com/jackc/pgconn/internal/ctxwatch/context_watcher.go b/vendor/github.com/jackc/pgconn/internal/ctxwatch/context_watcher.go deleted file mode 100644 index b39cb3ee5..000000000 --- a/vendor/github.com/jackc/pgconn/internal/ctxwatch/context_watcher.go +++ /dev/null @@ -1,73 +0,0 @@ -package ctxwatch - -import ( - "context" - "sync" -) - -// ContextWatcher watches a context and performs an action when the context is canceled. It can watch one context at a -// time. -type ContextWatcher struct { - onCancel func() - onUnwatchAfterCancel func() - unwatchChan chan struct{} - - lock sync.Mutex - watchInProgress bool - onCancelWasCalled bool -} - -// NewContextWatcher returns a ContextWatcher. onCancel will be called when a watched context is canceled. -// OnUnwatchAfterCancel will be called when Unwatch is called and the watched context had already been canceled and -// onCancel called. -func NewContextWatcher(onCancel func(), onUnwatchAfterCancel func()) *ContextWatcher { - cw := &ContextWatcher{ - onCancel: onCancel, - onUnwatchAfterCancel: onUnwatchAfterCancel, - unwatchChan: make(chan struct{}), - } - - return cw -} - -// Watch starts watching ctx. If ctx is canceled then the onCancel function passed to NewContextWatcher will be called. -func (cw *ContextWatcher) Watch(ctx context.Context) { - cw.lock.Lock() - defer cw.lock.Unlock() - - if cw.watchInProgress { - panic("Watch already in progress") - } - - cw.onCancelWasCalled = false - - if ctx.Done() != nil { - cw.watchInProgress = true - go func() { - select { - case <-ctx.Done(): - cw.onCancel() - cw.onCancelWasCalled = true - <-cw.unwatchChan - case <-cw.unwatchChan: - } - }() - } else { - cw.watchInProgress = false - } -} - -// Unwatch stops watching the previously watched context. If the onCancel function passed to NewContextWatcher was -// called then onUnwatchAfterCancel will also be called. -func (cw *ContextWatcher) Unwatch() { - cw.lock.Lock() - defer cw.lock.Unlock() - - if cw.watchInProgress { - cw.unwatchChan <- struct{}{} - if cw.onCancelWasCalled { - cw.onUnwatchAfterCancel() - } - cw.watchInProgress = false - } -} diff --git a/vendor/github.com/jackc/pgconn/pgconn.go b/vendor/github.com/jackc/pgconn/pgconn.go deleted file mode 100644 index 7bf2f20ef..000000000 --- a/vendor/github.com/jackc/pgconn/pgconn.go +++ /dev/null @@ -1,1723 +0,0 @@ -package pgconn - -import ( - "context" - "crypto/md5" - "crypto/tls" - "encoding/binary" - "encoding/hex" - "errors" - "fmt" - "io" - "math" - "net" - "strconv" - "strings" - "sync" - "time" - - "github.com/jackc/pgconn/internal/ctxwatch" - "github.com/jackc/pgio" - "github.com/jackc/pgproto3/v2" -) - -const ( - connStatusUninitialized = iota - connStatusConnecting - connStatusClosed - connStatusIdle - connStatusBusy -) - -const wbufLen = 1024 - -// Notice represents a notice response message reported by the PostgreSQL server. Be aware that this is distinct from -// LISTEN/NOTIFY notification. -type Notice PgError - -// Notification is a message received from the PostgreSQL LISTEN/NOTIFY system -type Notification struct { - PID uint32 // backend pid that sent the notification - Channel string // channel from which notification was received - Payload string -} - -// DialFunc is a function that can be used to connect to a PostgreSQL server. -type DialFunc func(ctx context.Context, network, addr string) (net.Conn, error) - -// LookupFunc is a function that can be used to lookup IPs addrs from host. Optionally an ip:port combination can be -// returned in order to override the connection string's port. -type LookupFunc func(ctx context.Context, host string) (addrs []string, err error) - -// BuildFrontendFunc is a function that can be used to create Frontend implementation for connection. -type BuildFrontendFunc func(r io.Reader, w io.Writer) Frontend - -// NoticeHandler is a function that can handle notices received from the PostgreSQL server. Notices can be received at -// any time, usually during handling of a query response. The *PgConn is provided so the handler is aware of the origin -// of the notice, but it must not invoke any query method. Be aware that this is distinct from LISTEN/NOTIFY -// notification. -type NoticeHandler func(*PgConn, *Notice) - -// NotificationHandler is a function that can handle notifications received from the PostgreSQL server. Notifications -// can be received at any time, usually during handling of a query response. The *PgConn is provided so the handler is -// aware of the origin of the notice, but it must not invoke any query method. Be aware that this is distinct from a -// notice event. -type NotificationHandler func(*PgConn, *Notification) - -// Frontend used to receive messages from backend. -type Frontend interface { - Receive() (pgproto3.BackendMessage, error) -} - -// PgConn is a low-level PostgreSQL connection handle. It is not safe for concurrent usage. -type PgConn struct { - conn net.Conn // the underlying TCP or unix domain socket connection - pid uint32 // backend pid - secretKey uint32 // key to use to send a cancel query message to the server - parameterStatuses map[string]string // parameters that have been reported by the server - txStatus byte - frontend Frontend - - config *Config - - status byte // One of connStatus* constants - - bufferingReceive bool - bufferingReceiveMux sync.Mutex - bufferingReceiveMsg pgproto3.BackendMessage - bufferingReceiveErr error - - peekedMsg pgproto3.BackendMessage - - // Reusable / preallocated resources - wbuf []byte // write buffer - resultReader ResultReader - multiResultReader MultiResultReader - contextWatcher *ctxwatch.ContextWatcher - - cleanupDone chan struct{} -} - -// Connect establishes a connection to a PostgreSQL server using the environment and connString (in URL or DSN format) -// to provide configuration. See documention for ParseConfig for details. ctx can be used to cancel a connect attempt. -func Connect(ctx context.Context, connString string) (*PgConn, error) { - config, err := ParseConfig(connString) - if err != nil { - return nil, err - } - - return ConnectConfig(ctx, config) -} - -// Connect establishes a connection to a PostgreSQL server using config. config must have been constructed with -// ParseConfig. ctx can be used to cancel a connect attempt. -// -// If config.Fallbacks are present they will sequentially be tried in case of error establishing network connection. An -// authentication error will terminate the chain of attempts (like libpq: -// https://www.postgresql.org/docs/11/libpq-connect.html#LIBPQ-MULTIPLE-HOSTS) and be returned as the error. Otherwise, -// if all attempts fail the last error is returned. -func ConnectConfig(ctx context.Context, config *Config) (pgConn *PgConn, err error) { - // Default values are set in ParseConfig. Enforce initial creation by ParseConfig rather than setting defaults from - // zero values. - if !config.createdByParseConfig { - panic("config must be created by ParseConfig") - } - - // ConnectTimeout restricts the whole connection process. - if config.ConnectTimeout != 0 { - var cancel context.CancelFunc - ctx, cancel = context.WithTimeout(ctx, config.ConnectTimeout) - defer cancel() - } - // Simplify usage by treating primary config and fallbacks the same. - fallbackConfigs := []*FallbackConfig{ - { - Host: config.Host, - Port: config.Port, - TLSConfig: config.TLSConfig, - }, - } - fallbackConfigs = append(fallbackConfigs, config.Fallbacks...) - - fallbackConfigs, err = expandWithIPs(ctx, config.LookupFunc, fallbackConfigs) - if err != nil { - return nil, &connectError{config: config, msg: "hostname resolving error", err: err} - } - - if len(fallbackConfigs) == 0 { - return nil, &connectError{config: config, msg: "hostname resolving error", err: errors.New("ip addr wasn't found")} - } - - for _, fc := range fallbackConfigs { - pgConn, err = connect(ctx, config, fc) - if err == nil { - break - } else if pgerr, ok := err.(*PgError); ok { - err = &connectError{config: config, msg: "server error", err: pgerr} - ERRCODE_INVALID_PASSWORD := "28P01" // worng password - ERRCODE_INVALID_AUTHORIZATION_SPECIFICATION := "28000" // db does not exist - if pgerr.Code == ERRCODE_INVALID_PASSWORD || pgerr.Code == ERRCODE_INVALID_AUTHORIZATION_SPECIFICATION { - break - } - } - } - - if err != nil { - return nil, err // no need to wrap in connectError because it will already be wrapped in all cases except PgError - } - - if config.AfterConnect != nil { - err := config.AfterConnect(ctx, pgConn) - if err != nil { - pgConn.conn.Close() - return nil, &connectError{config: config, msg: "AfterConnect error", err: err} - } - } - - return pgConn, nil -} - -func expandWithIPs(ctx context.Context, lookupFn LookupFunc, fallbacks []*FallbackConfig) ([]*FallbackConfig, error) { - var configs []*FallbackConfig - - for _, fb := range fallbacks { - // skip resolve for unix sockets - if strings.HasPrefix(fb.Host, "/") { - configs = append(configs, &FallbackConfig{ - Host: fb.Host, - Port: fb.Port, - TLSConfig: fb.TLSConfig, - }) - - continue - } - - ips, err := lookupFn(ctx, fb.Host) - if err != nil { - return nil, err - } - - for _, ip := range ips { - splitIP, splitPort, err := net.SplitHostPort(ip) - if err == nil { - port, err := strconv.ParseUint(splitPort, 10, 16) - if err != nil { - return nil, fmt.Errorf("error parsing port (%s) from lookup: %w", splitPort, err) - } - configs = append(configs, &FallbackConfig{ - Host: splitIP, - Port: uint16(port), - TLSConfig: fb.TLSConfig, - }) - } else { - configs = append(configs, &FallbackConfig{ - Host: ip, - Port: fb.Port, - TLSConfig: fb.TLSConfig, - }) - } - } - } - - return configs, nil -} - -func connect(ctx context.Context, config *Config, fallbackConfig *FallbackConfig) (*PgConn, error) { - pgConn := new(PgConn) - pgConn.config = config - pgConn.wbuf = make([]byte, 0, wbufLen) - pgConn.cleanupDone = make(chan struct{}) - - var err error - network, address := NetworkAddress(fallbackConfig.Host, fallbackConfig.Port) - netConn, err := config.DialFunc(ctx, network, address) - if err != nil { - var netErr net.Error - if errors.As(err, &netErr) && netErr.Timeout() { - err = &errTimeout{err: err} - } - return nil, &connectError{config: config, msg: "dial error", err: err} - } - - pgConn.conn = netConn - pgConn.contextWatcher = newContextWatcher(netConn) - pgConn.contextWatcher.Watch(ctx) - - if fallbackConfig.TLSConfig != nil { - tlsConn, err := startTLS(netConn, fallbackConfig.TLSConfig) - pgConn.contextWatcher.Unwatch() // Always unwatch `netConn` after TLS. - if err != nil { - netConn.Close() - return nil, &connectError{config: config, msg: "tls error", err: err} - } - - pgConn.conn = tlsConn - pgConn.contextWatcher = newContextWatcher(tlsConn) - pgConn.contextWatcher.Watch(ctx) - } - - defer pgConn.contextWatcher.Unwatch() - - pgConn.parameterStatuses = make(map[string]string) - pgConn.status = connStatusConnecting - pgConn.frontend = config.BuildFrontend(pgConn.conn, pgConn.conn) - - startupMsg := pgproto3.StartupMessage{ - ProtocolVersion: pgproto3.ProtocolVersionNumber, - Parameters: make(map[string]string), - } - - // Copy default run-time params - for k, v := range config.RuntimeParams { - startupMsg.Parameters[k] = v - } - - startupMsg.Parameters["user"] = config.User - if config.Database != "" { - startupMsg.Parameters["database"] = config.Database - } - - if _, err := pgConn.conn.Write(startupMsg.Encode(pgConn.wbuf)); err != nil { - pgConn.conn.Close() - return nil, &connectError{config: config, msg: "failed to write startup message", err: err} - } - - for { - msg, err := pgConn.receiveMessage() - if err != nil { - pgConn.conn.Close() - if err, ok := err.(*PgError); ok { - return nil, err - } - return nil, &connectError{config: config, msg: "failed to receive message", err: preferContextOverNetTimeoutError(ctx, err)} - } - - switch msg := msg.(type) { - case *pgproto3.BackendKeyData: - pgConn.pid = msg.ProcessID - pgConn.secretKey = msg.SecretKey - - case *pgproto3.AuthenticationOk: - case *pgproto3.AuthenticationCleartextPassword: - err = pgConn.txPasswordMessage(pgConn.config.Password) - if err != nil { - pgConn.conn.Close() - return nil, &connectError{config: config, msg: "failed to write password message", err: err} - } - case *pgproto3.AuthenticationMD5Password: - digestedPassword := "md5" + hexMD5(hexMD5(pgConn.config.Password+pgConn.config.User)+string(msg.Salt[:])) - err = pgConn.txPasswordMessage(digestedPassword) - if err != nil { - pgConn.conn.Close() - return nil, &connectError{config: config, msg: "failed to write password message", err: err} - } - case *pgproto3.AuthenticationSASL: - err = pgConn.scramAuth(msg.AuthMechanisms) - if err != nil { - pgConn.conn.Close() - return nil, &connectError{config: config, msg: "failed SASL auth", err: err} - } - - case *pgproto3.ReadyForQuery: - pgConn.status = connStatusIdle - if config.ValidateConnect != nil { - // ValidateConnect may execute commands that cause the context to be watched again. Unwatch first to avoid - // the watch already in progress panic. This is that last thing done by this method so there is no need to - // restart the watch after ValidateConnect returns. - // - // See https://github.com/jackc/pgconn/issues/40. - pgConn.contextWatcher.Unwatch() - - err := config.ValidateConnect(ctx, pgConn) - if err != nil { - pgConn.conn.Close() - return nil, &connectError{config: config, msg: "ValidateConnect failed", err: err} - } - } - return pgConn, nil - case *pgproto3.ParameterStatus, *pgproto3.NoticeResponse: - // handled by ReceiveMessage - case *pgproto3.ErrorResponse: - pgConn.conn.Close() - return nil, ErrorResponseToPgError(msg) - default: - pgConn.conn.Close() - return nil, &connectError{config: config, msg: "received unexpected message", err: err} - } - } -} - -func newContextWatcher(conn net.Conn) *ctxwatch.ContextWatcher { - return ctxwatch.NewContextWatcher( - func() { conn.SetDeadline(time.Date(1, 1, 1, 1, 1, 1, 1, time.UTC)) }, - func() { conn.SetDeadline(time.Time{}) }, - ) -} - -func startTLS(conn net.Conn, tlsConfig *tls.Config) (net.Conn, error) { - err := binary.Write(conn, binary.BigEndian, []int32{8, 80877103}) - if err != nil { - return nil, err - } - - response := make([]byte, 1) - if _, err = io.ReadFull(conn, response); err != nil { - return nil, err - } - - if response[0] != 'S' { - return nil, errors.New("server refused TLS connection") - } - - return tls.Client(conn, tlsConfig), nil -} - -func (pgConn *PgConn) txPasswordMessage(password string) (err error) { - msg := &pgproto3.PasswordMessage{Password: password} - _, err = pgConn.conn.Write(msg.Encode(pgConn.wbuf)) - return err -} - -func hexMD5(s string) string { - hash := md5.New() - io.WriteString(hash, s) - return hex.EncodeToString(hash.Sum(nil)) -} - -func (pgConn *PgConn) signalMessage() chan struct{} { - if pgConn.bufferingReceive { - panic("BUG: signalMessage when already in progress") - } - - pgConn.bufferingReceive = true - pgConn.bufferingReceiveMux.Lock() - - ch := make(chan struct{}) - go func() { - pgConn.bufferingReceiveMsg, pgConn.bufferingReceiveErr = pgConn.frontend.Receive() - pgConn.bufferingReceiveMux.Unlock() - close(ch) - }() - - return ch -} - -// SendBytes sends buf to the PostgreSQL server. It must only be used when the connection is not busy. e.g. It is as -// error to call SendBytes while reading the result of a query. -// -// This is a very low level method that requires deep understanding of the PostgreSQL wire protocol to use correctly. -// See https://www.postgresql.org/docs/current/protocol.html. -func (pgConn *PgConn) SendBytes(ctx context.Context, buf []byte) error { - if err := pgConn.lock(); err != nil { - return err - } - defer pgConn.unlock() - - if ctx != context.Background() { - select { - case <-ctx.Done(): - return newContextAlreadyDoneError(ctx) - default: - } - pgConn.contextWatcher.Watch(ctx) - defer pgConn.contextWatcher.Unwatch() - } - - n, err := pgConn.conn.Write(buf) - if err != nil { - pgConn.asyncClose() - return &writeError{err: err, safeToRetry: n == 0} - } - - return nil -} - -// ReceiveMessage receives one wire protocol message from the PostgreSQL server. It must only be used when the -// connection is not busy. e.g. It is an error to call ReceiveMessage while reading the result of a query. The messages -// are still handled by the core pgconn message handling system so receiving a NotificationResponse will still trigger -// the OnNotification callback. -// -// This is a very low level method that requires deep understanding of the PostgreSQL wire protocol to use correctly. -// See https://www.postgresql.org/docs/current/protocol.html. -func (pgConn *PgConn) ReceiveMessage(ctx context.Context) (pgproto3.BackendMessage, error) { - if err := pgConn.lock(); err != nil { - return nil, err - } - defer pgConn.unlock() - - if ctx != context.Background() { - select { - case <-ctx.Done(): - return nil, newContextAlreadyDoneError(ctx) - default: - } - pgConn.contextWatcher.Watch(ctx) - defer pgConn.contextWatcher.Unwatch() - } - - msg, err := pgConn.receiveMessage() - if err != nil { - err = &pgconnError{ - msg: "receive message failed", - err: preferContextOverNetTimeoutError(ctx, err), - safeToRetry: true} - } - return msg, err -} - -// peekMessage peeks at the next message without setting up context cancellation. -func (pgConn *PgConn) peekMessage() (pgproto3.BackendMessage, error) { - if pgConn.peekedMsg != nil { - return pgConn.peekedMsg, nil - } - - var msg pgproto3.BackendMessage - var err error - if pgConn.bufferingReceive { - pgConn.bufferingReceiveMux.Lock() - msg = pgConn.bufferingReceiveMsg - err = pgConn.bufferingReceiveErr - pgConn.bufferingReceiveMux.Unlock() - pgConn.bufferingReceive = false - - // If a timeout error happened in the background try the read again. - var netErr net.Error - if errors.As(err, &netErr) && netErr.Timeout() { - msg, err = pgConn.frontend.Receive() - } - } else { - msg, err = pgConn.frontend.Receive() - } - - if err != nil { - // Close on anything other than timeout error - everything else is fatal - var netErr net.Error - isNetErr := errors.As(err, &netErr) - if !(isNetErr && netErr.Timeout()) { - pgConn.asyncClose() - } - - return nil, err - } - - pgConn.peekedMsg = msg - return msg, nil -} - -// receiveMessage receives a message without setting up context cancellation -func (pgConn *PgConn) receiveMessage() (pgproto3.BackendMessage, error) { - msg, err := pgConn.peekMessage() - if err != nil { - // Close on anything other than timeout error - everything else is fatal - var netErr net.Error - isNetErr := errors.As(err, &netErr) - if !(isNetErr && netErr.Timeout()) { - pgConn.asyncClose() - } - - return nil, err - } - pgConn.peekedMsg = nil - - switch msg := msg.(type) { - case *pgproto3.ReadyForQuery: - pgConn.txStatus = msg.TxStatus - case *pgproto3.ParameterStatus: - pgConn.parameterStatuses[msg.Name] = msg.Value - case *pgproto3.ErrorResponse: - if msg.Severity == "FATAL" { - pgConn.status = connStatusClosed - pgConn.conn.Close() // Ignore error as the connection is already broken and there is already an error to return. - close(pgConn.cleanupDone) - return nil, ErrorResponseToPgError(msg) - } - case *pgproto3.NoticeResponse: - if pgConn.config.OnNotice != nil { - pgConn.config.OnNotice(pgConn, noticeResponseToNotice(msg)) - } - case *pgproto3.NotificationResponse: - if pgConn.config.OnNotification != nil { - pgConn.config.OnNotification(pgConn, &Notification{PID: msg.PID, Channel: msg.Channel, Payload: msg.Payload}) - } - } - - return msg, nil -} - -// Conn returns the underlying net.Conn. -func (pgConn *PgConn) Conn() net.Conn { - return pgConn.conn -} - -// PID returns the backend PID. -func (pgConn *PgConn) PID() uint32 { - return pgConn.pid -} - -// TxStatus returns the current TxStatus as reported by the server in the ReadyForQuery message. -// -// Possible return values: -// 'I' - idle / not in transaction -// 'T' - in a transaction -// 'E' - in a failed transaction -// -// See https://www.postgresql.org/docs/current/protocol-message-formats.html. -func (pgConn *PgConn) TxStatus() byte { - return pgConn.txStatus -} - -// SecretKey returns the backend secret key used to send a cancel query message to the server. -func (pgConn *PgConn) SecretKey() uint32 { - return pgConn.secretKey -} - -// Close closes a connection. It is safe to call Close on a already closed connection. Close attempts a clean close by -// sending the exit message to PostgreSQL. However, this could block so ctx is available to limit the time to wait. The -// underlying net.Conn.Close() will always be called regardless of any other errors. -func (pgConn *PgConn) Close(ctx context.Context) error { - if pgConn.status == connStatusClosed { - return nil - } - pgConn.status = connStatusClosed - - defer close(pgConn.cleanupDone) - defer pgConn.conn.Close() - - if ctx != context.Background() { - // Close may be called while a cancellable query is in progress. This will most often be triggered by panic when - // a defer closes the connection (possibly indirectly via a transaction or a connection pool). Unwatch to end any - // previous watch. It is safe to Unwatch regardless of whether a watch is already is progress. - // - // See https://github.com/jackc/pgconn/issues/29 - pgConn.contextWatcher.Unwatch() - - pgConn.contextWatcher.Watch(ctx) - defer pgConn.contextWatcher.Unwatch() - } - - // Ignore any errors sending Terminate message and waiting for server to close connection. - // This mimics the behavior of libpq PQfinish. It calls closePGconn which calls sendTerminateConn which purposefully - // ignores errors. - // - // See https://github.com/jackc/pgx/issues/637 - pgConn.conn.Write([]byte{'X', 0, 0, 0, 4}) - - return pgConn.conn.Close() -} - -// asyncClose marks the connection as closed and asynchronously sends a cancel query message and closes the underlying -// connection. -func (pgConn *PgConn) asyncClose() { - if pgConn.status == connStatusClosed { - return - } - pgConn.status = connStatusClosed - - go func() { - defer close(pgConn.cleanupDone) - defer pgConn.conn.Close() - - deadline := time.Now().Add(time.Second * 15) - - ctx, cancel := context.WithDeadline(context.Background(), deadline) - defer cancel() - - pgConn.CancelRequest(ctx) - - pgConn.conn.SetDeadline(deadline) - - pgConn.conn.Write([]byte{'X', 0, 0, 0, 4}) - }() -} - -// CleanupDone returns a channel that will be closed after all underlying resources have been cleaned up. A closed -// connection is no longer usable, but underlying resources, in particular the net.Conn, may not have finished closing -// yet. This is because certain errors such as a context cancellation require that the interrupted function call return -// immediately, but the error may also cause the connection to be closed. In these cases the underlying resources are -// closed asynchronously. -// -// This is only likely to be useful to connection pools. It gives them a way avoid establishing a new connection while -// an old connection is still being cleaned up and thereby exceeding the maximum pool size. -func (pgConn *PgConn) CleanupDone() chan (struct{}) { - return pgConn.cleanupDone -} - -// IsClosed reports if the connection has been closed. -// -// CleanupDone() can be used to determine if all cleanup has been completed. -func (pgConn *PgConn) IsClosed() bool { - return pgConn.status < connStatusIdle -} - -// IsBusy reports if the connection is busy. -func (pgConn *PgConn) IsBusy() bool { - return pgConn.status == connStatusBusy -} - -// lock locks the connection. -func (pgConn *PgConn) lock() error { - switch pgConn.status { - case connStatusBusy: - return &connLockError{status: "conn busy"} // This only should be possible in case of an application bug. - case connStatusClosed: - return &connLockError{status: "conn closed"} - case connStatusUninitialized: - return &connLockError{status: "conn uninitialized"} - } - pgConn.status = connStatusBusy - return nil -} - -func (pgConn *PgConn) unlock() { - switch pgConn.status { - case connStatusBusy: - pgConn.status = connStatusIdle - case connStatusClosed: - default: - panic("BUG: cannot unlock unlocked connection") // This should only be possible if there is a bug in this package. - } -} - -// ParameterStatus returns the value of a parameter reported by the server (e.g. -// server_version). Returns an empty string for unknown parameters. -func (pgConn *PgConn) ParameterStatus(key string) string { - return pgConn.parameterStatuses[key] -} - -// CommandTag is the result of an Exec function -type CommandTag []byte - -// RowsAffected returns the number of rows affected. If the CommandTag was not -// for a row affecting command (e.g. "CREATE TABLE") then it returns 0. -func (ct CommandTag) RowsAffected() int64 { - // Find last non-digit - idx := -1 - for i := len(ct) - 1; i >= 0; i-- { - if ct[i] >= '0' && ct[i] <= '9' { - idx = i - } else { - break - } - } - - if idx == -1 { - return 0 - } - - var n int64 - for _, b := range ct[idx:] { - n = n*10 + int64(b-'0') - } - - return n -} - -func (ct CommandTag) String() string { - return string(ct) -} - -// Insert is true if the command tag starts with "INSERT". -func (ct CommandTag) Insert() bool { - return len(ct) >= 6 && - ct[0] == 'I' && - ct[1] == 'N' && - ct[2] == 'S' && - ct[3] == 'E' && - ct[4] == 'R' && - ct[5] == 'T' -} - -// Update is true if the command tag starts with "UPDATE". -func (ct CommandTag) Update() bool { - return len(ct) >= 6 && - ct[0] == 'U' && - ct[1] == 'P' && - ct[2] == 'D' && - ct[3] == 'A' && - ct[4] == 'T' && - ct[5] == 'E' -} - -// Delete is true if the command tag starts with "DELETE". -func (ct CommandTag) Delete() bool { - return len(ct) >= 6 && - ct[0] == 'D' && - ct[1] == 'E' && - ct[2] == 'L' && - ct[3] == 'E' && - ct[4] == 'T' && - ct[5] == 'E' -} - -// Select is true if the command tag starts with "SELECT". -func (ct CommandTag) Select() bool { - return len(ct) >= 6 && - ct[0] == 'S' && - ct[1] == 'E' && - ct[2] == 'L' && - ct[3] == 'E' && - ct[4] == 'C' && - ct[5] == 'T' -} - -type StatementDescription struct { - Name string - SQL string - ParamOIDs []uint32 - Fields []pgproto3.FieldDescription -} - -// Prepare creates a prepared statement. If the name is empty, the anonymous prepared statement will be used. This -// allows Prepare to also to describe statements without creating a server-side prepared statement. -func (pgConn *PgConn) Prepare(ctx context.Context, name, sql string, paramOIDs []uint32) (*StatementDescription, error) { - if err := pgConn.lock(); err != nil { - return nil, err - } - defer pgConn.unlock() - - if ctx != context.Background() { - select { - case <-ctx.Done(): - return nil, newContextAlreadyDoneError(ctx) - default: - } - pgConn.contextWatcher.Watch(ctx) - defer pgConn.contextWatcher.Unwatch() - } - - buf := pgConn.wbuf - buf = (&pgproto3.Parse{Name: name, Query: sql, ParameterOIDs: paramOIDs}).Encode(buf) - buf = (&pgproto3.Describe{ObjectType: 'S', Name: name}).Encode(buf) - buf = (&pgproto3.Sync{}).Encode(buf) - - n, err := pgConn.conn.Write(buf) - if err != nil { - pgConn.asyncClose() - return nil, &writeError{err: err, safeToRetry: n == 0} - } - - psd := &StatementDescription{Name: name, SQL: sql} - - var parseErr error - -readloop: - for { - msg, err := pgConn.receiveMessage() - if err != nil { - pgConn.asyncClose() - return nil, preferContextOverNetTimeoutError(ctx, err) - } - - switch msg := msg.(type) { - case *pgproto3.ParameterDescription: - psd.ParamOIDs = make([]uint32, len(msg.ParameterOIDs)) - copy(psd.ParamOIDs, msg.ParameterOIDs) - case *pgproto3.RowDescription: - psd.Fields = make([]pgproto3.FieldDescription, len(msg.Fields)) - copy(psd.Fields, msg.Fields) - case *pgproto3.ErrorResponse: - parseErr = ErrorResponseToPgError(msg) - case *pgproto3.ReadyForQuery: - break readloop - } - } - - if parseErr != nil { - return nil, parseErr - } - return psd, nil -} - -// ErrorResponseToPgError converts a wire protocol error message to a *PgError. -func ErrorResponseToPgError(msg *pgproto3.ErrorResponse) *PgError { - return &PgError{ - Severity: msg.Severity, - Code: string(msg.Code), - Message: string(msg.Message), - Detail: string(msg.Detail), - Hint: msg.Hint, - Position: msg.Position, - InternalPosition: msg.InternalPosition, - InternalQuery: string(msg.InternalQuery), - Where: string(msg.Where), - SchemaName: string(msg.SchemaName), - TableName: string(msg.TableName), - ColumnName: string(msg.ColumnName), - DataTypeName: string(msg.DataTypeName), - ConstraintName: msg.ConstraintName, - File: string(msg.File), - Line: msg.Line, - Routine: string(msg.Routine), - } -} - -func noticeResponseToNotice(msg *pgproto3.NoticeResponse) *Notice { - pgerr := ErrorResponseToPgError((*pgproto3.ErrorResponse)(msg)) - return (*Notice)(pgerr) -} - -// CancelRequest sends a cancel request to the PostgreSQL server. It returns an error if unable to deliver the cancel -// request, but lack of an error does not ensure that the query was canceled. As specified in the documentation, there -// is no way to be sure a query was canceled. See https://www.postgresql.org/docs/11/protocol-flow.html#id-1.10.5.7.9 -func (pgConn *PgConn) CancelRequest(ctx context.Context) error { - // Open a cancellation request to the same server. The address is taken from the net.Conn directly instead of reusing - // the connection config. This is important in high availability configurations where fallback connections may be - // specified or DNS may be used to load balance. - serverAddr := pgConn.conn.RemoteAddr() - cancelConn, err := pgConn.config.DialFunc(ctx, serverAddr.Network(), serverAddr.String()) - if err != nil { - return err - } - defer cancelConn.Close() - - if ctx != context.Background() { - contextWatcher := ctxwatch.NewContextWatcher( - func() { cancelConn.SetDeadline(time.Date(1, 1, 1, 1, 1, 1, 1, time.UTC)) }, - func() { cancelConn.SetDeadline(time.Time{}) }, - ) - contextWatcher.Watch(ctx) - defer contextWatcher.Unwatch() - } - - buf := make([]byte, 16) - binary.BigEndian.PutUint32(buf[0:4], 16) - binary.BigEndian.PutUint32(buf[4:8], 80877102) - binary.BigEndian.PutUint32(buf[8:12], uint32(pgConn.pid)) - binary.BigEndian.PutUint32(buf[12:16], uint32(pgConn.secretKey)) - _, err = cancelConn.Write(buf) - if err != nil { - return err - } - - _, err = cancelConn.Read(buf) - if err != io.EOF { - return err - } - - return nil -} - -// WaitForNotification waits for a LISTON/NOTIFY message to be received. It returns an error if a notification was not -// received. -func (pgConn *PgConn) WaitForNotification(ctx context.Context) error { - if err := pgConn.lock(); err != nil { - return err - } - defer pgConn.unlock() - - if ctx != context.Background() { - select { - case <-ctx.Done(): - return newContextAlreadyDoneError(ctx) - default: - } - - pgConn.contextWatcher.Watch(ctx) - defer pgConn.contextWatcher.Unwatch() - } - - for { - msg, err := pgConn.receiveMessage() - if err != nil { - return preferContextOverNetTimeoutError(ctx, err) - } - - switch msg.(type) { - case *pgproto3.NotificationResponse: - return nil - } - } -} - -// Exec executes SQL via the PostgreSQL simple query protocol. SQL may contain multiple queries. Execution is -// implicitly wrapped in a transaction unless a transaction is already in progress or SQL contains transaction control -// statements. -// -// Prefer ExecParams unless executing arbitrary SQL that may contain multiple queries. -func (pgConn *PgConn) Exec(ctx context.Context, sql string) *MultiResultReader { - if err := pgConn.lock(); err != nil { - return &MultiResultReader{ - closed: true, - err: err, - } - } - - pgConn.multiResultReader = MultiResultReader{ - pgConn: pgConn, - ctx: ctx, - } - multiResult := &pgConn.multiResultReader - if ctx != context.Background() { - select { - case <-ctx.Done(): - multiResult.closed = true - multiResult.err = newContextAlreadyDoneError(ctx) - pgConn.unlock() - return multiResult - default: - } - pgConn.contextWatcher.Watch(ctx) - } - - buf := pgConn.wbuf - buf = (&pgproto3.Query{String: sql}).Encode(buf) - - n, err := pgConn.conn.Write(buf) - if err != nil { - pgConn.asyncClose() - pgConn.contextWatcher.Unwatch() - multiResult.closed = true - multiResult.err = &writeError{err: err, safeToRetry: n == 0} - pgConn.unlock() - return multiResult - } - - return multiResult -} - -// ReceiveResults reads the result that might be returned by Postgres after a SendBytes -// (e.a. after sending a CopyDone in a copy-both situation). -// -// This is a very low level method that requires deep understanding of the PostgreSQL wire protocol to use correctly. -// See https://www.postgresql.org/docs/current/protocol.html. -func (pgConn *PgConn) ReceiveResults(ctx context.Context) *MultiResultReader { - if err := pgConn.lock(); err != nil { - return &MultiResultReader{ - closed: true, - err: err, - } - } - - pgConn.multiResultReader = MultiResultReader{ - pgConn: pgConn, - ctx: ctx, - } - multiResult := &pgConn.multiResultReader - if ctx != context.Background() { - select { - case <-ctx.Done(): - multiResult.closed = true - multiResult.err = newContextAlreadyDoneError(ctx) - pgConn.unlock() - return multiResult - default: - } - pgConn.contextWatcher.Watch(ctx) - } - - return multiResult -} - -// ExecParams executes a command via the PostgreSQL extended query protocol. -// -// sql is a SQL command string. It may only contain one query. Parameter substitution is positional using $1, $2, $3, -// etc. -// -// paramValues are the parameter values. It must be encoded in the format given by paramFormats. -// -// paramOIDs is a slice of data type OIDs for paramValues. If paramOIDs is nil, the server will infer the data type for -// all parameters. Any paramOID element that is 0 that will cause the server to infer the data type for that parameter. -// ExecParams will panic if len(paramOIDs) is not 0, 1, or len(paramValues). -// -// paramFormats is a slice of format codes determining for each paramValue column whether it is encoded in text or -// binary format. If paramFormats is nil all params are text format. ExecParams will panic if -// len(paramFormats) is not 0, 1, or len(paramValues). -// -// resultFormats is a slice of format codes determining for each result column whether it is encoded in text or -// binary format. If resultFormats is nil all results will be in text format. -// -// ResultReader must be closed before PgConn can be used again. -func (pgConn *PgConn) ExecParams(ctx context.Context, sql string, paramValues [][]byte, paramOIDs []uint32, paramFormats []int16, resultFormats []int16) *ResultReader { - result := pgConn.execExtendedPrefix(ctx, paramValues) - if result.closed { - return result - } - - buf := pgConn.wbuf - buf = (&pgproto3.Parse{Query: sql, ParameterOIDs: paramOIDs}).Encode(buf) - buf = (&pgproto3.Bind{ParameterFormatCodes: paramFormats, Parameters: paramValues, ResultFormatCodes: resultFormats}).Encode(buf) - - pgConn.execExtendedSuffix(buf, result) - - return result -} - -// ExecPrepared enqueues the execution of a prepared statement via the PostgreSQL extended query protocol. -// -// paramValues are the parameter values. It must be encoded in the format given by paramFormats. -// -// paramFormats is a slice of format codes determining for each paramValue column whether it is encoded in text or -// binary format. If paramFormats is nil all params are text format. ExecPrepared will panic if -// len(paramFormats) is not 0, 1, or len(paramValues). -// -// resultFormats is a slice of format codes determining for each result column whether it is encoded in text or -// binary format. If resultFormats is nil all results will be in text format. -// -// ResultReader must be closed before PgConn can be used again. -func (pgConn *PgConn) ExecPrepared(ctx context.Context, stmtName string, paramValues [][]byte, paramFormats []int16, resultFormats []int16) *ResultReader { - result := pgConn.execExtendedPrefix(ctx, paramValues) - if result.closed { - return result - } - - buf := pgConn.wbuf - buf = (&pgproto3.Bind{PreparedStatement: stmtName, ParameterFormatCodes: paramFormats, Parameters: paramValues, ResultFormatCodes: resultFormats}).Encode(buf) - - pgConn.execExtendedSuffix(buf, result) - - return result -} - -func (pgConn *PgConn) execExtendedPrefix(ctx context.Context, paramValues [][]byte) *ResultReader { - pgConn.resultReader = ResultReader{ - pgConn: pgConn, - ctx: ctx, - } - result := &pgConn.resultReader - - if err := pgConn.lock(); err != nil { - result.concludeCommand(nil, err) - result.closed = true - return result - } - - if len(paramValues) > math.MaxUint16 { - result.concludeCommand(nil, fmt.Errorf("extended protocol limited to %v parameters", math.MaxUint16)) - result.closed = true - pgConn.unlock() - return result - } - - if ctx != context.Background() { - select { - case <-ctx.Done(): - result.concludeCommand(nil, newContextAlreadyDoneError(ctx)) - result.closed = true - pgConn.unlock() - return result - default: - } - pgConn.contextWatcher.Watch(ctx) - } - - return result -} - -func (pgConn *PgConn) execExtendedSuffix(buf []byte, result *ResultReader) { - buf = (&pgproto3.Describe{ObjectType: 'P'}).Encode(buf) - buf = (&pgproto3.Execute{}).Encode(buf) - buf = (&pgproto3.Sync{}).Encode(buf) - - n, err := pgConn.conn.Write(buf) - if err != nil { - pgConn.asyncClose() - result.concludeCommand(nil, &writeError{err: err, safeToRetry: n == 0}) - pgConn.contextWatcher.Unwatch() - result.closed = true - pgConn.unlock() - return - } - - result.readUntilRowDescription() -} - -// CopyTo executes the copy command sql and copies the results to w. -func (pgConn *PgConn) CopyTo(ctx context.Context, w io.Writer, sql string) (CommandTag, error) { - if err := pgConn.lock(); err != nil { - return nil, err - } - - if ctx != context.Background() { - select { - case <-ctx.Done(): - pgConn.unlock() - return nil, newContextAlreadyDoneError(ctx) - default: - } - pgConn.contextWatcher.Watch(ctx) - defer pgConn.contextWatcher.Unwatch() - } - - // Send copy to command - buf := pgConn.wbuf - buf = (&pgproto3.Query{String: sql}).Encode(buf) - - n, err := pgConn.conn.Write(buf) - if err != nil { - pgConn.asyncClose() - pgConn.unlock() - return nil, &writeError{err: err, safeToRetry: n == 0} - } - - // Read results - var commandTag CommandTag - var pgErr error - for { - msg, err := pgConn.receiveMessage() - if err != nil { - pgConn.asyncClose() - return nil, preferContextOverNetTimeoutError(ctx, err) - } - - switch msg := msg.(type) { - case *pgproto3.CopyDone: - case *pgproto3.CopyData: - _, err := w.Write(msg.Data) - if err != nil { - pgConn.asyncClose() - return nil, err - } - case *pgproto3.ReadyForQuery: - pgConn.unlock() - return commandTag, pgErr - case *pgproto3.CommandComplete: - commandTag = CommandTag(msg.CommandTag) - case *pgproto3.ErrorResponse: - pgErr = ErrorResponseToPgError(msg) - } - } -} - -// CopyFrom executes the copy command sql and copies all of r to the PostgreSQL server. -// -// Note: context cancellation will only interrupt operations on the underlying PostgreSQL network connection. Reads on r -// could still block. -func (pgConn *PgConn) CopyFrom(ctx context.Context, r io.Reader, sql string) (CommandTag, error) { - if err := pgConn.lock(); err != nil { - return nil, err - } - defer pgConn.unlock() - - if ctx != context.Background() { - select { - case <-ctx.Done(): - return nil, newContextAlreadyDoneError(ctx) - default: - } - pgConn.contextWatcher.Watch(ctx) - defer pgConn.contextWatcher.Unwatch() - } - - // Send copy to command - buf := pgConn.wbuf - buf = (&pgproto3.Query{String: sql}).Encode(buf) - - n, err := pgConn.conn.Write(buf) - if err != nil { - pgConn.asyncClose() - return nil, &writeError{err: err, safeToRetry: n == 0} - } - - // Send copy data - abortCopyChan := make(chan struct{}) - copyErrChan := make(chan error, 1) - signalMessageChan := pgConn.signalMessage() - - go func() { - buf := make([]byte, 0, 65536) - buf = append(buf, 'd') - sp := len(buf) - - for { - n, readErr := r.Read(buf[5:cap(buf)]) - if n > 0 { - buf = buf[0 : n+5] - pgio.SetInt32(buf[sp:], int32(n+4)) - - _, writeErr := pgConn.conn.Write(buf) - if writeErr != nil { - // Write errors are always fatal, but we can't use asyncClose because we are in a different goroutine. - pgConn.conn.Close() - - copyErrChan <- writeErr - return - } - } - if readErr != nil { - copyErrChan <- readErr - return - } - - select { - case <-abortCopyChan: - return - default: - } - } - }() - - var pgErr error - var copyErr error - for copyErr == nil && pgErr == nil { - select { - case copyErr = <-copyErrChan: - case <-signalMessageChan: - msg, err := pgConn.receiveMessage() - if err != nil { - pgConn.asyncClose() - return nil, preferContextOverNetTimeoutError(ctx, err) - } - - switch msg := msg.(type) { - case *pgproto3.ErrorResponse: - pgErr = ErrorResponseToPgError(msg) - default: - signalMessageChan = pgConn.signalMessage() - } - } - } - close(abortCopyChan) - - buf = buf[:0] - if copyErr == io.EOF || pgErr != nil { - copyDone := &pgproto3.CopyDone{} - buf = copyDone.Encode(buf) - } else { - copyFail := &pgproto3.CopyFail{Message: copyErr.Error()} - buf = copyFail.Encode(buf) - } - _, err = pgConn.conn.Write(buf) - if err != nil { - pgConn.asyncClose() - return nil, err - } - - // Read results - var commandTag CommandTag - for { - msg, err := pgConn.receiveMessage() - if err != nil { - pgConn.asyncClose() - return nil, preferContextOverNetTimeoutError(ctx, err) - } - - switch msg := msg.(type) { - case *pgproto3.ReadyForQuery: - return commandTag, pgErr - case *pgproto3.CommandComplete: - commandTag = CommandTag(msg.CommandTag) - case *pgproto3.ErrorResponse: - pgErr = ErrorResponseToPgError(msg) - } - } -} - -// MultiResultReader is a reader for a command that could return multiple results such as Exec or ExecBatch. -type MultiResultReader struct { - pgConn *PgConn - ctx context.Context - - rr *ResultReader - - closed bool - err error -} - -// ReadAll reads all available results. Calling ReadAll is mutually exclusive with all other MultiResultReader methods. -func (mrr *MultiResultReader) ReadAll() ([]*Result, error) { - var results []*Result - - for mrr.NextResult() { - results = append(results, mrr.ResultReader().Read()) - } - err := mrr.Close() - - return results, err -} - -func (mrr *MultiResultReader) receiveMessage() (pgproto3.BackendMessage, error) { - msg, err := mrr.pgConn.receiveMessage() - - if err != nil { - mrr.pgConn.contextWatcher.Unwatch() - mrr.err = preferContextOverNetTimeoutError(mrr.ctx, err) - mrr.closed = true - mrr.pgConn.asyncClose() - return nil, mrr.err - } - - switch msg := msg.(type) { - case *pgproto3.ReadyForQuery: - mrr.pgConn.contextWatcher.Unwatch() - mrr.closed = true - mrr.pgConn.unlock() - case *pgproto3.ErrorResponse: - mrr.err = ErrorResponseToPgError(msg) - } - - return msg, nil -} - -// NextResult returns advances the MultiResultReader to the next result and returns true if a result is available. -func (mrr *MultiResultReader) NextResult() bool { - for !mrr.closed && mrr.err == nil { - msg, err := mrr.receiveMessage() - if err != nil { - return false - } - - switch msg := msg.(type) { - case *pgproto3.RowDescription: - mrr.pgConn.resultReader = ResultReader{ - pgConn: mrr.pgConn, - multiResultReader: mrr, - ctx: mrr.ctx, - fieldDescriptions: msg.Fields, - } - mrr.rr = &mrr.pgConn.resultReader - return true - case *pgproto3.CommandComplete: - mrr.pgConn.resultReader = ResultReader{ - commandTag: CommandTag(msg.CommandTag), - commandConcluded: true, - closed: true, - } - mrr.rr = &mrr.pgConn.resultReader - return true - case *pgproto3.EmptyQueryResponse: - return false - } - } - - return false -} - -// ResultReader returns the current ResultReader. -func (mrr *MultiResultReader) ResultReader() *ResultReader { - return mrr.rr -} - -// Close closes the MultiResultReader and returns the first error that occurred during the MultiResultReader's use. -func (mrr *MultiResultReader) Close() error { - for !mrr.closed { - _, err := mrr.receiveMessage() - if err != nil { - return mrr.err - } - } - - return mrr.err -} - -// ResultReader is a reader for the result of a single query. -type ResultReader struct { - pgConn *PgConn - multiResultReader *MultiResultReader - ctx context.Context - - fieldDescriptions []pgproto3.FieldDescription - rowValues [][]byte - commandTag CommandTag - commandConcluded bool - closed bool - err error -} - -// Result is the saved query response that is returned by calling Read on a ResultReader. -type Result struct { - FieldDescriptions []pgproto3.FieldDescription - Rows [][][]byte - CommandTag CommandTag - Err error -} - -// Read saves the query response to a Result. -func (rr *ResultReader) Read() *Result { - br := &Result{} - - for rr.NextRow() { - if br.FieldDescriptions == nil { - br.FieldDescriptions = make([]pgproto3.FieldDescription, len(rr.FieldDescriptions())) - copy(br.FieldDescriptions, rr.FieldDescriptions()) - } - - row := make([][]byte, len(rr.Values())) - copy(row, rr.Values()) - br.Rows = append(br.Rows, row) - } - - br.CommandTag, br.Err = rr.Close() - - return br -} - -// NextRow advances the ResultReader to the next row and returns true if a row is available. -func (rr *ResultReader) NextRow() bool { - for !rr.commandConcluded { - msg, err := rr.receiveMessage() - if err != nil { - return false - } - - switch msg := msg.(type) { - case *pgproto3.DataRow: - rr.rowValues = msg.Values - return true - } - } - - return false -} - -// FieldDescriptions returns the field descriptions for the current result set. The returned slice is only valid until -// the ResultReader is closed. -func (rr *ResultReader) FieldDescriptions() []pgproto3.FieldDescription { - return rr.fieldDescriptions -} - -// Values returns the current row data. NextRow must have been previously been called. The returned [][]byte is only -// valid until the next NextRow call or the ResultReader is closed. However, the underlying byte data is safe to -// retain a reference to and mutate. -func (rr *ResultReader) Values() [][]byte { - return rr.rowValues -} - -// Close consumes any remaining result data and returns the command tag or -// error. -func (rr *ResultReader) Close() (CommandTag, error) { - if rr.closed { - return rr.commandTag, rr.err - } - rr.closed = true - - for !rr.commandConcluded { - _, err := rr.receiveMessage() - if err != nil { - return nil, rr.err - } - } - - if rr.multiResultReader == nil { - for { - msg, err := rr.receiveMessage() - if err != nil { - return nil, rr.err - } - - switch msg := msg.(type) { - // Detect a deferred constraint violation where the ErrorResponse is sent after CommandComplete. - case *pgproto3.ErrorResponse: - rr.err = ErrorResponseToPgError(msg) - case *pgproto3.ReadyForQuery: - rr.pgConn.contextWatcher.Unwatch() - rr.pgConn.unlock() - return rr.commandTag, rr.err - } - } - } - - return rr.commandTag, rr.err -} - -// readUntilRowDescription ensures the ResultReader's fieldDescriptions are loaded. It does not return an error as any -// error will be stored in the ResultReader. -func (rr *ResultReader) readUntilRowDescription() { - for !rr.commandConcluded { - // Peek before receive to avoid consuming a DataRow if the result set does not include a RowDescription method. - // This should never happen under normal pgconn usage, but it is possible if SendBytes and ReceiveResults are - // manually used to construct a query that does not issue a describe statement. - msg, _ := rr.pgConn.peekMessage() - if _, ok := msg.(*pgproto3.DataRow); ok { - return - } - - // Consume the message - msg, _ = rr.receiveMessage() - if _, ok := msg.(*pgproto3.RowDescription); ok { - return - } - } -} - -func (rr *ResultReader) receiveMessage() (msg pgproto3.BackendMessage, err error) { - if rr.multiResultReader == nil { - msg, err = rr.pgConn.receiveMessage() - } else { - msg, err = rr.multiResultReader.receiveMessage() - } - - if err != nil { - err = preferContextOverNetTimeoutError(rr.ctx, err) - rr.concludeCommand(nil, err) - rr.pgConn.contextWatcher.Unwatch() - rr.closed = true - if rr.multiResultReader == nil { - rr.pgConn.asyncClose() - } - - return nil, rr.err - } - - switch msg := msg.(type) { - case *pgproto3.RowDescription: - rr.fieldDescriptions = msg.Fields - case *pgproto3.CommandComplete: - rr.concludeCommand(CommandTag(msg.CommandTag), nil) - case *pgproto3.EmptyQueryResponse: - rr.concludeCommand(nil, nil) - case *pgproto3.ErrorResponse: - rr.concludeCommand(nil, ErrorResponseToPgError(msg)) - } - - return msg, nil -} - -func (rr *ResultReader) concludeCommand(commandTag CommandTag, err error) { - // Keep the first error that is recorded. Store the error before checking if the command is already concluded to - // allow for receiving an error after CommandComplete but before ReadyForQuery. - if err != nil && rr.err == nil { - rr.err = err - } - - if rr.commandConcluded { - return - } - - rr.commandTag = commandTag - rr.rowValues = nil - rr.commandConcluded = true -} - -// Batch is a collection of queries that can be sent to the PostgreSQL server in a single round-trip. -type Batch struct { - buf []byte -} - -// ExecParams appends an ExecParams command to the batch. See PgConn.ExecParams for parameter descriptions. -func (batch *Batch) ExecParams(sql string, paramValues [][]byte, paramOIDs []uint32, paramFormats []int16, resultFormats []int16) { - batch.buf = (&pgproto3.Parse{Query: sql, ParameterOIDs: paramOIDs}).Encode(batch.buf) - batch.ExecPrepared("", paramValues, paramFormats, resultFormats) -} - -// ExecPrepared appends an ExecPrepared e command to the batch. See PgConn.ExecPrepared for parameter descriptions. -func (batch *Batch) ExecPrepared(stmtName string, paramValues [][]byte, paramFormats []int16, resultFormats []int16) { - batch.buf = (&pgproto3.Bind{PreparedStatement: stmtName, ParameterFormatCodes: paramFormats, Parameters: paramValues, ResultFormatCodes: resultFormats}).Encode(batch.buf) - batch.buf = (&pgproto3.Describe{ObjectType: 'P'}).Encode(batch.buf) - batch.buf = (&pgproto3.Execute{}).Encode(batch.buf) -} - -// ExecBatch executes all the queries in batch in a single round-trip. Execution is implicitly transactional unless a -// transaction is already in progress or SQL contains transaction control statements. -func (pgConn *PgConn) ExecBatch(ctx context.Context, batch *Batch) *MultiResultReader { - if err := pgConn.lock(); err != nil { - return &MultiResultReader{ - closed: true, - err: err, - } - } - - pgConn.multiResultReader = MultiResultReader{ - pgConn: pgConn, - ctx: ctx, - } - multiResult := &pgConn.multiResultReader - - if ctx != context.Background() { - select { - case <-ctx.Done(): - multiResult.closed = true - multiResult.err = newContextAlreadyDoneError(ctx) - pgConn.unlock() - return multiResult - default: - } - pgConn.contextWatcher.Watch(ctx) - } - - batch.buf = (&pgproto3.Sync{}).Encode(batch.buf) - - // A large batch can deadlock without concurrent reading and writing. If the Write fails the underlying net.Conn is - // closed. This is all that can be done without introducing a race condition or adding a concurrent safe communication - // channel to relay the error back. The practical effect of this is that the underlying Write error is not reported. - // The error the code reading the batch results receives will be a closed connection error. - // - // See https://github.com/jackc/pgx/issues/374. - go func() { - _, err := pgConn.conn.Write(batch.buf) - if err != nil { - pgConn.conn.Close() - } - }() - - return multiResult -} - -// EscapeString escapes a string such that it can safely be interpolated into a SQL command string. It does not include -// the surrounding single quotes. -// -// The current implementation requires that standard_conforming_strings=on and client_encoding="UTF8". If these -// conditions are not met an error will be returned. It is possible these restrictions will be lifted in the future. -func (pgConn *PgConn) EscapeString(s string) (string, error) { - if pgConn.ParameterStatus("standard_conforming_strings") != "on" { - return "", errors.New("EscapeString must be run with standard_conforming_strings=on") - } - - if pgConn.ParameterStatus("client_encoding") != "UTF8" { - return "", errors.New("EscapeString must be run with client_encoding=UTF8") - } - - return strings.Replace(s, "'", "''", -1), nil -} - -// HijackedConn is the result of hijacking a connection. -// -// Due to the necessary exposure of internal implementation details, it is not covered by the semantic versioning -// compatibility. -type HijackedConn struct { - Conn net.Conn // the underlying TCP or unix domain socket connection - PID uint32 // backend pid - SecretKey uint32 // key to use to send a cancel query message to the server - ParameterStatuses map[string]string // parameters that have been reported by the server - TxStatus byte - Frontend Frontend - Config *Config -} - -// Hijack extracts the internal connection data. pgConn must be in an idle state. pgConn is unusable after hijacking. -// Hijacking is typically only useful when using pgconn to establish a connection, but taking complete control of the -// raw connection after that (e.g. a load balancer or proxy). -// -// Due to the necessary exposure of internal implementation details, it is not covered by the semantic versioning -// compatibility. -func (pgConn *PgConn) Hijack() (*HijackedConn, error) { - if err := pgConn.lock(); err != nil { - return nil, err - } - pgConn.status = connStatusClosed - - return &HijackedConn{ - Conn: pgConn.conn, - PID: pgConn.pid, - SecretKey: pgConn.secretKey, - ParameterStatuses: pgConn.parameterStatuses, - TxStatus: pgConn.txStatus, - Frontend: pgConn.frontend, - Config: pgConn.config, - }, nil -} - -// Construct created a PgConn from an already established connection to a PostgreSQL server. This is the inverse of -// PgConn.Hijack. The connection must be in an idle state. -// -// Due to the necessary exposure of internal implementation details, it is not covered by the semantic versioning -// compatibility. -func Construct(hc *HijackedConn) (*PgConn, error) { - pgConn := &PgConn{ - conn: hc.Conn, - pid: hc.PID, - secretKey: hc.SecretKey, - parameterStatuses: hc.ParameterStatuses, - txStatus: hc.TxStatus, - frontend: hc.Frontend, - config: hc.Config, - - status: connStatusIdle, - - wbuf: make([]byte, 0, wbufLen), - cleanupDone: make(chan struct{}), - } - - pgConn.contextWatcher = newContextWatcher(pgConn.conn) - - return pgConn, nil -} diff --git a/vendor/github.com/jackc/pgconn/stmtcache/lru.go b/vendor/github.com/jackc/pgconn/stmtcache/lru.go deleted file mode 100644 index 90fb76c2f..000000000 --- a/vendor/github.com/jackc/pgconn/stmtcache/lru.go +++ /dev/null @@ -1,165 +0,0 @@ -package stmtcache - -import ( - "container/list" - "context" - "fmt" - "sync/atomic" - - "github.com/jackc/pgconn" -) - -var lruCount uint64 - -// LRU implements Cache with a Least Recently Used (LRU) cache. -type LRU struct { - conn *pgconn.PgConn - mode int - cap int - prepareCount int - m map[string]*list.Element - l *list.List - psNamePrefix string - stmtsToClear []string -} - -// NewLRU creates a new LRU. mode is either ModePrepare or ModeDescribe. cap is the maximum size of the cache. -func NewLRU(conn *pgconn.PgConn, mode int, cap int) *LRU { - mustBeValidMode(mode) - mustBeValidCap(cap) - - n := atomic.AddUint64(&lruCount, 1) - - return &LRU{ - conn: conn, - mode: mode, - cap: cap, - m: make(map[string]*list.Element), - l: list.New(), - psNamePrefix: fmt.Sprintf("lrupsc_%d", n), - } -} - -// Get returns the prepared statement description for sql preparing or describing the sql on the server as needed. -func (c *LRU) Get(ctx context.Context, sql string) (*pgconn.StatementDescription, error) { - if ctx != context.Background() { - select { - case <-ctx.Done(): - return nil, ctx.Err() - default: - } - } - - // flush an outstanding bad statements - txStatus := c.conn.TxStatus() - if (txStatus == 'I' || txStatus == 'T') && len(c.stmtsToClear) > 0 { - for _, stmt := range c.stmtsToClear { - err := c.clearStmt(ctx, stmt) - if err != nil { - return nil, err - } - } - } - - if el, ok := c.m[sql]; ok { - c.l.MoveToFront(el) - return el.Value.(*pgconn.StatementDescription), nil - } - - if c.l.Len() == c.cap { - err := c.removeOldest(ctx) - if err != nil { - return nil, err - } - } - - psd, err := c.prepare(ctx, sql) - if err != nil { - return nil, err - } - - el := c.l.PushFront(psd) - c.m[sql] = el - - return psd, nil -} - -// Clear removes all entries in the cache. Any prepared statements will be deallocated from the PostgreSQL session. -func (c *LRU) Clear(ctx context.Context) error { - for c.l.Len() > 0 { - err := c.removeOldest(ctx) - if err != nil { - return err - } - } - - return nil -} - -func (c *LRU) StatementErrored(sql string, err error) { - pgErr, ok := err.(*pgconn.PgError) - if !ok { - return - } - - isInvalidCachedPlanError := pgErr.Severity == "ERROR" && - pgErr.Code == "0A000" && - pgErr.Message == "cached plan must not change result type" - if isInvalidCachedPlanError { - c.stmtsToClear = append(c.stmtsToClear, sql) - } -} - -func (c *LRU) clearStmt(ctx context.Context, sql string) error { - elem, inMap := c.m[sql] - if !inMap { - // The statement probably fell off the back of the list. In that case, we've - // ensured that it isn't in the cache, so we can declare victory. - return nil - } - - c.l.Remove(elem) - - psd := elem.Value.(*pgconn.StatementDescription) - delete(c.m, psd.SQL) - if c.mode == ModePrepare { - return c.conn.Exec(ctx, fmt.Sprintf("deallocate %s", psd.Name)).Close() - } - return nil -} - -// Len returns the number of cached prepared statement descriptions. -func (c *LRU) Len() int { - return c.l.Len() -} - -// Cap returns the maximum number of cached prepared statement descriptions. -func (c *LRU) Cap() int { - return c.cap -} - -// Mode returns the mode of the cache (ModePrepare or ModeDescribe) -func (c *LRU) Mode() int { - return c.mode -} - -func (c *LRU) prepare(ctx context.Context, sql string) (*pgconn.StatementDescription, error) { - var name string - if c.mode == ModePrepare { - name = fmt.Sprintf("%s_%d", c.psNamePrefix, c.prepareCount) - c.prepareCount += 1 - } - - return c.conn.Prepare(ctx, name, sql, nil) -} - -func (c *LRU) removeOldest(ctx context.Context) error { - oldest := c.l.Back() - c.l.Remove(oldest) - psd := oldest.Value.(*pgconn.StatementDescription) - delete(c.m, psd.SQL) - if c.mode == ModePrepare { - return c.conn.Exec(ctx, fmt.Sprintf("deallocate %s", psd.Name)).Close() - } - return nil -} diff --git a/vendor/github.com/jackc/pgconn/stmtcache/stmtcache.go b/vendor/github.com/jackc/pgconn/stmtcache/stmtcache.go deleted file mode 100644 index d083e1b4f..000000000 --- a/vendor/github.com/jackc/pgconn/stmtcache/stmtcache.go +++ /dev/null @@ -1,58 +0,0 @@ -// Package stmtcache is a cache that can be used to implement lazy prepared statements. -package stmtcache - -import ( - "context" - - "github.com/jackc/pgconn" -) - -const ( - ModePrepare = iota // Cache should prepare named statements. - ModeDescribe // Cache should prepare the anonymous prepared statement to only fetch the description of the statement. -) - -// Cache prepares and caches prepared statement descriptions. -type Cache interface { - // Get returns the prepared statement description for sql preparing or describing the sql on the server as needed. - Get(ctx context.Context, sql string) (*pgconn.StatementDescription, error) - - // Clear removes all entries in the cache. Any prepared statements will be deallocated from the PostgreSQL session. - Clear(ctx context.Context) error - - // StatementErrored informs the cache that the given statement resulted in an error when it - // was last used against the database. In some cases, this will cause the cache to maer that - // statement as bad. The bad statement will instead be flushed during the next call to Get - // that occurs outside of a failed transaction. - StatementErrored(sql string, err error) - - // Len returns the number of cached prepared statement descriptions. - Len() int - - // Cap returns the maximum number of cached prepared statement descriptions. - Cap() int - - // Mode returns the mode of the cache (ModePrepare or ModeDescribe) - Mode() int -} - -// New returns the preferred cache implementation for mode and cap. mode is either ModePrepare or ModeDescribe. cap is -// the maximum size of the cache. -func New(conn *pgconn.PgConn, mode int, cap int) Cache { - mustBeValidMode(mode) - mustBeValidCap(cap) - - return NewLRU(conn, mode, cap) -} - -func mustBeValidMode(mode int) { - if mode != ModePrepare && mode != ModeDescribe { - panic("mode must be ModePrepare or ModeDescribe") - } -} - -func mustBeValidCap(cap int) { - if cap < 1 { - panic("cache must have cap of >= 1") - } -} |