summaryrefslogtreecommitdiff
path: root/vendor/github.com/jackc/pgx/v5/pgconn
diff options
context:
space:
mode:
authorLibravatar dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>2023-07-21 14:22:20 +0000
committerLibravatar GitHub <noreply@github.com>2023-07-21 14:22:20 +0000
commitfa57c699fe9d2db81d7806d6e30c0e388fa01673 (patch)
treedd0f2d52be007f8207d3ff4b851d13e46ec26625 /vendor/github.com/jackc/pgx/v5/pgconn
parent[chore]: Bump github.com/minio/minio-go/v7 from 7.0.59 to 7.0.60 (#1992) (diff)
downloadgotosocial-fa57c699fe9d2db81d7806d6e30c0e388fa01673.tar.xz
[chore]: Bump github.com/jackc/pgx/v5 from 5.4.1 to 5.4.2 (#1991)
Diffstat (limited to 'vendor/github.com/jackc/pgx/v5/pgconn')
-rw-r--r--vendor/github.com/jackc/pgx/v5/pgconn/internal/bgreader/bgreader.go47
-rw-r--r--vendor/github.com/jackc/pgx/v5/pgconn/pgconn.go63
2 files changed, 75 insertions, 35 deletions
diff --git a/vendor/github.com/jackc/pgx/v5/pgconn/internal/bgreader/bgreader.go b/vendor/github.com/jackc/pgx/v5/pgconn/internal/bgreader/bgreader.go
index aa1a3d39c..e65c2c2bf 100644
--- a/vendor/github.com/jackc/pgx/v5/pgconn/internal/bgreader/bgreader.go
+++ b/vendor/github.com/jackc/pgx/v5/pgconn/internal/bgreader/bgreader.go
@@ -9,18 +9,18 @@ import (
)
const (
- bgReaderStatusStopped = iota
- bgReaderStatusRunning
- bgReaderStatusStopping
+ StatusStopped = iota
+ StatusRunning
+ StatusStopping
)
// BGReader is an io.Reader that can optionally buffer reads in the background. It is safe for concurrent use.
type BGReader struct {
r io.Reader
- cond *sync.Cond
- bgReaderStatus int32
- readResults []readResult
+ cond *sync.Cond
+ status int32
+ readResults []readResult
}
type readResult struct {
@@ -34,14 +34,14 @@ func (r *BGReader) Start() {
r.cond.L.Lock()
defer r.cond.L.Unlock()
- switch r.bgReaderStatus {
- case bgReaderStatusStopped:
- r.bgReaderStatus = bgReaderStatusRunning
+ switch r.status {
+ case StatusStopped:
+ r.status = StatusRunning
go r.bgRead()
- case bgReaderStatusRunning:
+ case StatusRunning:
// no-op
- case bgReaderStatusStopping:
- r.bgReaderStatus = bgReaderStatusRunning
+ case StatusStopping:
+ r.status = StatusRunning
}
}
@@ -51,16 +51,23 @@ func (r *BGReader) Stop() {
r.cond.L.Lock()
defer r.cond.L.Unlock()
- switch r.bgReaderStatus {
- case bgReaderStatusStopped:
+ switch r.status {
+ case StatusStopped:
// no-op
- case bgReaderStatusRunning:
- r.bgReaderStatus = bgReaderStatusStopping
- case bgReaderStatusStopping:
+ case StatusRunning:
+ r.status = StatusStopping
+ case StatusStopping:
// no-op
}
}
+// Status returns the current status of the background reader.
+func (r *BGReader) Status() int32 {
+ r.cond.L.Lock()
+ defer r.cond.L.Unlock()
+ return r.status
+}
+
func (r *BGReader) bgRead() {
keepReading := true
for keepReading {
@@ -70,8 +77,8 @@ func (r *BGReader) bgRead() {
r.cond.L.Lock()
r.readResults = append(r.readResults, readResult{buf: buf, err: err})
- if r.bgReaderStatus == bgReaderStatusStopping || err != nil {
- r.bgReaderStatus = bgReaderStatusStopped
+ if r.status == StatusStopping || err != nil {
+ r.status = StatusStopped
keepReading = false
}
r.cond.L.Unlock()
@@ -89,7 +96,7 @@ func (r *BGReader) Read(p []byte) (int, error) {
}
// There are no unread background read results and the background reader is stopped.
- if r.bgReaderStatus == bgReaderStatusStopped {
+ if r.status == StatusStopped {
return r.r.Read(p)
}
diff --git a/vendor/github.com/jackc/pgx/v5/pgconn/pgconn.go b/vendor/github.com/jackc/pgx/v5/pgconn/pgconn.go
index 9f84605fe..12357751a 100644
--- a/vendor/github.com/jackc/pgx/v5/pgconn/pgconn.go
+++ b/vendor/github.com/jackc/pgx/v5/pgconn/pgconn.go
@@ -174,7 +174,7 @@ func ConnectConfig(octx context.Context, config *Config) (pgConn *PgConn, err er
const ERRCODE_INVALID_CATALOG_NAME = "3D000" // db does not exist
const ERRCODE_INSUFFICIENT_PRIVILEGE = "42501" // missing connect privilege
if pgerr.Code == ERRCODE_INVALID_PASSWORD ||
- pgerr.Code == ERRCODE_INVALID_AUTHORIZATION_SPECIFICATION ||
+ pgerr.Code == ERRCODE_INVALID_AUTHORIZATION_SPECIFICATION && fc.TLSConfig != nil ||
pgerr.Code == ERRCODE_INVALID_CATALOG_NAME ||
pgerr.Code == ERRCODE_INSUFFICIENT_PRIVILEGE {
break
@@ -263,7 +263,8 @@ func expandWithIPs(ctx context.Context, lookupFn LookupFunc, fallbacks []*Fallba
}
func connect(ctx context.Context, config *Config, fallbackConfig *FallbackConfig,
- ignoreNotPreferredErr bool) (*PgConn, error) {
+ ignoreNotPreferredErr bool,
+) (*PgConn, error) {
pgConn := new(PgConn)
pgConn.config = config
pgConn.cleanupDone = make(chan struct{})
@@ -298,6 +299,7 @@ func connect(ctx context.Context, config *Config, fallbackConfig *FallbackConfig
pgConn.status = connStatusConnecting
pgConn.bgReader = bgreader.New(pgConn.conn)
pgConn.slowWriteTimer = time.AfterFunc(time.Duration(math.MaxInt64), pgConn.bgReader.Start)
+ pgConn.slowWriteTimer.Stop()
pgConn.frontend = config.BuildFrontend(pgConn.bgReader, pgConn.conn)
startupMsg := pgproto3.StartupMessage{
@@ -476,7 +478,8 @@ func (pgConn *PgConn) ReceiveMessage(ctx context.Context) (pgproto3.BackendMessa
err = &pgconnError{
msg: "receive message failed",
err: normalizeTimeoutError(ctx, err),
- safeToRetry: true}
+ safeToRetry: true,
+ }
}
return msg, err
}
@@ -553,7 +556,8 @@ func (pgConn *PgConn) receiveMessage() (pgproto3.BackendMessage, error) {
return msg, nil
}
-// Conn returns the underlying net.Conn. This rarely necessary.
+// Conn returns the underlying net.Conn. This rarely necessary. If the connection will be directly used for reading or
+// writing then SyncConn should usually be called before Conn.
func (pgConn *PgConn) Conn() net.Conn {
return pgConn.conn
}
@@ -1336,7 +1340,6 @@ func (mrr *MultiResultReader) ReadAll() ([]*Result, error) {
func (mrr *MultiResultReader) receiveMessage() (pgproto3.BackendMessage, error) {
msg, err := mrr.pgConn.receiveMessage()
-
if err != nil {
mrr.pgConn.contextWatcher.Unwatch()
mrr.err = normalizeTimeoutError(mrr.ctx, err)
@@ -1647,8 +1650,8 @@ func (pgConn *PgConn) ExecBatch(ctx context.Context, batch *Batch) *MultiResultR
batch.buf = (&pgproto3.Sync{}).Encode(batch.buf)
pgConn.enterPotentialWriteReadDeadlock()
+ defer pgConn.exitPotentialWriteReadDeadlock()
_, err := pgConn.conn.Write(batch.buf)
- pgConn.exitPotentialWriteReadDeadlock()
if err != nil {
multiResult.closed = true
multiResult.err = err
@@ -1719,23 +1722,50 @@ func (pgConn *PgConn) enterPotentialWriteReadDeadlock() {
//
// In addition, on Windows the default timer resolution is 15.6ms. So setting the timer to less than that is
// ineffective.
- pgConn.slowWriteTimer.Reset(15 * time.Millisecond)
+ if pgConn.slowWriteTimer.Reset(15 * time.Millisecond) {
+ panic("BUG: slow write timer already active")
+ }
}
// exitPotentialWriteReadDeadlock must be called after a call to enterPotentialWriteReadDeadlock.
func (pgConn *PgConn) exitPotentialWriteReadDeadlock() {
- if !pgConn.slowWriteTimer.Reset(time.Duration(math.MaxInt64)) {
- pgConn.slowWriteTimer.Stop()
- }
+ // The state of the timer is not relevant upon exiting the potential slow write. It may both
+ // fire (due to a slow write), or not fire (due to a fast write).
+ _ = pgConn.slowWriteTimer.Stop()
+ pgConn.bgReader.Stop()
}
func (pgConn *PgConn) flushWithPotentialWriteReadDeadlock() error {
pgConn.enterPotentialWriteReadDeadlock()
+ defer pgConn.exitPotentialWriteReadDeadlock()
err := pgConn.frontend.Flush()
- pgConn.exitPotentialWriteReadDeadlock()
return err
}
+// SyncConn prepares the underlying net.Conn for direct use. PgConn may internally buffer reads or use goroutines for
+// background IO. This means that any direct use of the underlying net.Conn may be corrupted if a read is already
+// buffered or a read is in progress. SyncConn drains read buffers and stops background IO. In some cases this may
+// require sending a ping to the server. ctx can be used to cancel this operation. This should be called before any
+// operation that will use the underlying net.Conn directly. e.g. Before Conn() or Hijack().
+//
+// This should not be confused with the PostgreSQL protocol Sync message.
+func (pgConn *PgConn) SyncConn(ctx context.Context) error {
+ for i := 0; i < 10; i++ {
+ if pgConn.bgReader.Status() == bgreader.StatusStopped && pgConn.frontend.ReadBufferLen() == 0 {
+ return nil
+ }
+
+ err := pgConn.Ping(ctx)
+ if err != nil {
+ return fmt.Errorf("SyncConn: Ping failed while syncing conn: %w", err)
+ }
+ }
+
+ // This should never happen. Only way I can imagine this occuring is if the server is constantly sending data such as
+ // LISTEN/NOTIFY or log notifications such that we never can get an empty buffer.
+ return errors.New("SyncConn: conn never synchronized")
+}
+
// HijackedConn is the result of hijacking a connection.
//
// Due to the necessary exposure of internal implementation details, it is not covered by the semantic versioning
@@ -1750,9 +1780,9 @@ type HijackedConn struct {
Config *Config
}
-// Hijack extracts the internal connection data. pgConn must be in an idle state. pgConn is unusable after hijacking.
-// Hijacking is typically only useful when using pgconn to establish a connection, but taking complete control of the
-// raw connection after that (e.g. a load balancer or proxy).
+// Hijack extracts the internal connection data. pgConn must be in an idle state. SyncConn should be called immediately
+// before Hijack. pgConn is unusable after hijacking. Hijacking is typically only useful when using pgconn to establish
+// a connection, but taking complete control of the raw connection after that (e.g. a load balancer or proxy).
//
// Due to the necessary exposure of internal implementation details, it is not covered by the semantic versioning
// compatibility.
@@ -1776,6 +1806,8 @@ func (pgConn *PgConn) Hijack() (*HijackedConn, error) {
// Construct created a PgConn from an already established connection to a PostgreSQL server. This is the inverse of
// PgConn.Hijack. The connection must be in an idle state.
//
+// hc.Frontend is replaced by a new pgproto3.Frontend built by hc.Config.BuildFrontend.
+//
// Due to the necessary exposure of internal implementation details, it is not covered by the semantic versioning
// compatibility.
func Construct(hc *HijackedConn) (*PgConn, error) {
@@ -1796,6 +1828,8 @@ func Construct(hc *HijackedConn) (*PgConn, error) {
pgConn.contextWatcher = newContextWatcher(pgConn.conn)
pgConn.bgReader = bgreader.New(pgConn.conn)
pgConn.slowWriteTimer = time.AfterFunc(time.Duration(math.MaxInt64), pgConn.bgReader.Start)
+ pgConn.slowWriteTimer.Stop()
+ pgConn.frontend = hc.Config.BuildFrontend(pgConn.bgReader, pgConn.conn)
return pgConn, nil
}
@@ -1997,7 +2031,6 @@ func (p *Pipeline) GetResults() (results any, err error) {
}
}
-
}
func (p *Pipeline) getResultsPrepare() (*StatementDescription, error) {