Diffstat (limited to 'vendor/github.com/jackc/pgx/v5/internal')
-rw-r--r--  vendor/github.com/jackc/pgx/v5/internal/anynil/anynil.go                  36
-rw-r--r--  vendor/github.com/jackc/pgx/v5/internal/iobufpool/iobufpool.go            70
-rw-r--r--  vendor/github.com/jackc/pgx/v5/internal/nbconn/bufferqueue.go             70
-rw-r--r--  vendor/github.com/jackc/pgx/v5/internal/nbconn/nbconn.go                 520
-rw-r--r--  vendor/github.com/jackc/pgx/v5/internal/nbconn/nbconn_fake_non_block.go   11
-rw-r--r--  vendor/github.com/jackc/pgx/v5/internal/nbconn/nbconn_real_non_block.go   81
-rw-r--r--  vendor/github.com/jackc/pgx/v5/internal/pgio/README.md                     6
-rw-r--r--  vendor/github.com/jackc/pgx/v5/internal/pgio/doc.go                         6
-rw-r--r--  vendor/github.com/jackc/pgx/v5/internal/pgio/write.go                      40
-rw-r--r--  vendor/github.com/jackc/pgx/v5/internal/sanitize/sanitize.go              322
-rw-r--r--  vendor/github.com/jackc/pgx/v5/internal/stmtcache/lru_cache.go             98
-rw-r--r--  vendor/github.com/jackc/pgx/v5/internal/stmtcache/stmtcache.go             57
-rw-r--r--  vendor/github.com/jackc/pgx/v5/internal/stmtcache/unlimited_cache.go       71
13 files changed, 1388 insertions, 0 deletions
diff --git a/vendor/github.com/jackc/pgx/v5/internal/anynil/anynil.go b/vendor/github.com/jackc/pgx/v5/internal/anynil/anynil.go
new file mode 100644
index 000000000..9a48c1a84
--- /dev/null
+++ b/vendor/github.com/jackc/pgx/v5/internal/anynil/anynil.go
@@ -0,0 +1,36 @@
+package anynil
+
+import "reflect"
+
+// Is returns true if value is any type of nil, e.g. nil or []byte(nil).
+func Is(value any) bool {
+ if value == nil {
+ return true
+ }
+
+ refVal := reflect.ValueOf(value)
+ switch refVal.Kind() {
+ case reflect.Chan, reflect.Func, reflect.Map, reflect.Ptr, reflect.UnsafePointer, reflect.Interface, reflect.Slice:
+ return refVal.IsNil()
+ default:
+ return false
+ }
+}
+
+// Normalize converts typed nils (e.g. []byte(nil)) into untyped nil. Other values are returned unmodified.
+func Normalize(v any) any {
+ if Is(v) {
+ return nil
+ }
+ return v
+}
+
+// NormalizeSlice converts all typed nils (e.g. []byte(nil)) in s into untyped nils. Other values are unmodified. s is
+// mutated in place.
+func NormalizeSlice(s []any) {
+ for i := range s {
+ if Is(s[i]) {
+ s[i] = nil
+ }
+ }
+}
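
The typed-nil pitfall this package guards against is worth seeing concretely: an interface value holding a nil slice is itself non-nil. A minimal sketch -- the is helper below mirrors anynil.Is so the snippet needs no internal import:

package main

import (
	"fmt"
	"reflect"
)

// is mirrors anynil.Is: it reports whether value is an untyped nil or a typed
// nil (an interface holding a nil chan/func/map/pointer/slice).
func is(value any) bool {
	if value == nil {
		return true
	}
	refVal := reflect.ValueOf(value)
	switch refVal.Kind() {
	case reflect.Chan, reflect.Func, reflect.Map, reflect.Ptr, reflect.UnsafePointer, reflect.Interface, reflect.Slice:
		return refVal.IsNil()
	default:
		return false
	}
}

func main() {
	var b []byte // nil slice
	var v any = b

	fmt.Println(v == nil) // false: the interface carries the type []byte, so it is not untyped nil
	fmt.Println(is(v))    // true: reflection sees the nil value inside
}

Normalize applies this check so a typed nil passed as a query argument can be treated as NULL downstream rather than being encoded as a zero value.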
diff --git a/vendor/github.com/jackc/pgx/v5/internal/iobufpool/iobufpool.go b/vendor/github.com/jackc/pgx/v5/internal/iobufpool/iobufpool.go
new file mode 100644
index 000000000..89e0c2273
--- /dev/null
+++ b/vendor/github.com/jackc/pgx/v5/internal/iobufpool/iobufpool.go
@@ -0,0 +1,70 @@
+// Package iobufpool implements a global segregated-fit pool of buffers for IO.
+//
+// It uses *[]byte instead of []byte to avoid the allocation that sync.Pool.Put incurs when boxing a slice into an
+// interface. Unfortunately, using a pointer to avoid that allocation is purposely not documented. https://github.com/golang/go/issues/16323
+package iobufpool
+
+import "sync"
+
+const minPoolExpOf2 = 8
+
+var pools [18]*sync.Pool
+
+func init() {
+ for i := range pools {
+ bufLen := 1 << (minPoolExpOf2 + i)
+ pools[i] = &sync.Pool{
+ New: func() any {
+ buf := make([]byte, bufLen)
+ return &buf
+ },
+ }
+ }
+}
+
+// Get gets a []byte of len size with cap <= size*2.
+func Get(size int) *[]byte {
+ i := getPoolIdx(size)
+ if i >= len(pools) {
+ buf := make([]byte, size)
+ return &buf
+ }
+
+ ptrBuf := (pools[i].Get().(*[]byte))
+ *ptrBuf = (*ptrBuf)[:size]
+
+ return ptrBuf
+}
+
+func getPoolIdx(size int) int {
+ size--
+ size >>= minPoolExpOf2
+ i := 0
+ for size > 0 {
+ size >>= 1
+ i++
+ }
+
+ return i
+}
+
+// Put returns buf to the pool.
+func Put(buf *[]byte) {
+ i := putPoolIdx(cap(*buf))
+ if i < 0 {
+ return
+ }
+
+ pools[i].Put(buf)
+}
+
+func putPoolIdx(size int) int {
+ minPoolSize := 1 << minPoolExpOf2
+ for i := range pools {
+ if size == minPoolSize<<i {
+ return i
+ }
+ }
+
+ return -1
+}
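
The size-class arithmetic above is easiest to check with concrete numbers. A small sketch, copying getPoolIdx verbatim, that prints the class each request maps to and confirms the cap <= size*2 guarantee documented on Get:

package main

import "fmt"

const minPoolExpOf2 = 8 // smallest size class is 1<<8 = 256 bytes

// getPoolIdx is copied from above: it returns the index of the smallest
// power-of-two size class that can hold size bytes.
func getPoolIdx(size int) int {
	size--
	size >>= minPoolExpOf2
	i := 0
	for size > 0 {
		size >>= 1
		i++
	}
	return i
}

func main() {
	for _, size := range []int{1, 256, 257, 4096, 5000} {
		i := getPoolIdx(size)
		fmt.Printf("size %4d -> pool %d (bufLen %5d)\n", size, i, 1<<(minPoolExpOf2+i))
	}
	// size    1 -> pool 0 (bufLen   256)
	// size  256 -> pool 0 (bufLen   256)
	// size  257 -> pool 1 (bufLen   512)
	// size 4096 -> pool 4 (bufLen  4096)
	// size 5000 -> pool 5 (bufLen  8192)
}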
diff --git a/vendor/github.com/jackc/pgx/v5/internal/nbconn/bufferqueue.go b/vendor/github.com/jackc/pgx/v5/internal/nbconn/bufferqueue.go
new file mode 100644
index 000000000..4bf25481c
--- /dev/null
+++ b/vendor/github.com/jackc/pgx/v5/internal/nbconn/bufferqueue.go
@@ -0,0 +1,70 @@
+package nbconn
+
+import (
+ "sync"
+)
+
+const minBufferQueueLen = 8
+
+type bufferQueue struct {
+ lock sync.Mutex
+ queue []*[]byte
+ r, w int
+}
+
+func (bq *bufferQueue) pushBack(buf *[]byte) {
+ bq.lock.Lock()
+ defer bq.lock.Unlock()
+
+ if bq.w >= len(bq.queue) {
+ bq.growQueue()
+ }
+ bq.queue[bq.w] = buf
+ bq.w++
+}
+
+func (bq *bufferQueue) pushFront(buf *[]byte) {
+ bq.lock.Lock()
+ defer bq.lock.Unlock()
+
+ if bq.w >= len(bq.queue) {
+ bq.growQueue()
+ }
+ copy(bq.queue[bq.r+1:bq.w+1], bq.queue[bq.r:bq.w])
+ bq.queue[bq.r] = buf
+ bq.w++
+}
+
+func (bq *bufferQueue) popFront() *[]byte {
+ bq.lock.Lock()
+ defer bq.lock.Unlock()
+
+ if bq.r == bq.w {
+ return nil
+ }
+
+ buf := bq.queue[bq.r]
+ bq.queue[bq.r] = nil // Clear reference so it can be garbage collected.
+ bq.r++
+
+ if bq.r == bq.w {
+ bq.r = 0
+ bq.w = 0
+ if len(bq.queue) > minBufferQueueLen {
+ bq.queue = make([]*[]byte, minBufferQueueLen)
+ }
+ }
+
+ return buf
+}
+
+func (bq *bufferQueue) growQueue() {
+ desiredLen := (len(bq.queue) + 1) * 3 / 2
+ if desiredLen < minBufferQueueLen {
+ desiredLen = minBufferQueueLen
+ }
+
+ newQueue := make([]*[]byte, desiredLen)
+ copy(newQueue, bq.queue)
+ bq.queue = newQueue
+}
diff --git a/vendor/github.com/jackc/pgx/v5/internal/nbconn/nbconn.go b/vendor/github.com/jackc/pgx/v5/internal/nbconn/nbconn.go
new file mode 100644
index 000000000..7a38383f0
--- /dev/null
+++ b/vendor/github.com/jackc/pgx/v5/internal/nbconn/nbconn.go
@@ -0,0 +1,520 @@
+// Package nbconn implements a non-blocking net.Conn wrapper.
+//
+// It is designed to solve three problems.
+//
+// The first is resolving the deadlock that can occur when both sides of a connection are blocked writing because all
+// buffers between are full. See https://github.com/jackc/pgconn/issues/27 for discussion.
+//
+// The second is the inability to use a write deadline with a tls.Conn without killing the connection.
+//
+// The third is to efficiently check if a connection has been closed via a non-blocking read.
+package nbconn
+
+import (
+ "crypto/tls"
+ "errors"
+ "net"
+ "os"
+ "sync"
+ "sync/atomic"
+ "syscall"
+ "time"
+
+ "github.com/jackc/pgx/v5/internal/iobufpool"
+)
+
+var errClosed = errors.New("closed")
+var ErrWouldBlock = new(wouldBlockError)
+
+const fakeNonblockingWriteWaitDuration = 100 * time.Millisecond
+const minNonblockingReadWaitDuration = time.Microsecond
+const maxNonblockingReadWaitDuration = 100 * time.Millisecond
+
+// NonBlockingDeadline is a magic value that when passed to Set[Read]Deadline places the connection in non-blocking read
+// mode.
+var NonBlockingDeadline = time.Date(1900, 1, 1, 0, 0, 0, 608536336, time.UTC)
+
+// disableSetDeadlineDeadline is a magic value that when passed to Set[Read|Write]Deadline causes those methods to
+// ignore all future calls.
+var disableSetDeadlineDeadline = time.Date(1900, 1, 1, 0, 0, 0, 968549727, time.UTC)
+
+// wouldBlockError implements net.Error so tls.Conn will recognize ErrWouldBlock as a temporary error.
+type wouldBlockError struct{}
+
+func (*wouldBlockError) Error() string {
+ return "would block"
+}
+
+func (*wouldBlockError) Timeout() bool { return true }
+func (*wouldBlockError) Temporary() bool { return true }
+
+// Conn is a net.Conn where Write never blocks and always succeeds. Flush or Read must be called to actually write to
+// the underlying connection.
+type Conn interface {
+ net.Conn
+
+ // Flush flushes any buffered writes.
+ Flush() error
+
+ // BufferReadUntilBlock reads and buffers any successfully read bytes until the read would block.
+ BufferReadUntilBlock() error
+}
+
+// NetConn is a non-blocking net.Conn wrapper. It implements net.Conn.
+type NetConn struct {
+ // 64 bit fields accessed with atomics must be at beginning of struct to guarantee alignment for certain 32-bit
+ // architectures. See BUGS section of https://pkg.go.dev/sync/atomic and https://github.com/jackc/pgx/issues/1288 and
+ // https://github.com/jackc/pgx/issues/1307. Only access with atomics
+ closed int64 // 0 = not closed, 1 = closed
+
+ conn net.Conn
+ rawConn syscall.RawConn
+
+ readQueue bufferQueue
+ writeQueue bufferQueue
+
+ readFlushLock sync.Mutex
+	// non-blocking writes with syscall.RawConn are done with a callback function. By using these fields instead of the
+	// callback function's closure to pass the buf argument and receive the n and err results, we avoid some allocations.
+ nonblockWriteFunc func(fd uintptr) (done bool)
+ nonblockWriteBuf []byte
+ nonblockWriteErr error
+ nonblockWriteN int
+
+	// non-blocking reads with syscall.RawConn are done with a callback function. By using these fields instead of the
+	// callback function's closure to pass the buf argument and receive the n and err results, we avoid some allocations.
+ nonblockReadFunc func(fd uintptr) (done bool)
+ nonblockReadBuf []byte
+ nonblockReadErr error
+ nonblockReadN int
+
+ readDeadlineLock sync.Mutex
+ readDeadline time.Time
+ readNonblocking bool
+ fakeNonBlockingShortReadCount int
+ fakeNonblockingReadWaitDuration time.Duration
+
+ writeDeadlineLock sync.Mutex
+ writeDeadline time.Time
+}
+
+func NewNetConn(conn net.Conn, fakeNonBlockingIO bool) *NetConn {
+ nc := &NetConn{
+ conn: conn,
+ fakeNonblockingReadWaitDuration: maxNonblockingReadWaitDuration,
+ }
+
+ if !fakeNonBlockingIO {
+ if sc, ok := conn.(syscall.Conn); ok {
+ if rawConn, err := sc.SyscallConn(); err == nil {
+ nc.rawConn = rawConn
+ }
+ }
+ }
+
+ return nc
+}
+
+// Read implements io.Reader.
+func (c *NetConn) Read(b []byte) (n int, err error) {
+ if c.isClosed() {
+ return 0, errClosed
+ }
+
+ c.readFlushLock.Lock()
+ defer c.readFlushLock.Unlock()
+
+ err = c.flush()
+ if err != nil {
+ return 0, err
+ }
+
+ for n < len(b) {
+ buf := c.readQueue.popFront()
+ if buf == nil {
+ break
+ }
+ copiedN := copy(b[n:], *buf)
+ if copiedN < len(*buf) {
+ *buf = (*buf)[copiedN:]
+ c.readQueue.pushFront(buf)
+ } else {
+ iobufpool.Put(buf)
+ }
+ n += copiedN
+ }
+
+	// If any bytes were already buffered, return them without trying to do a Read. Otherwise, when the caller is trying
+	// to Read up to len(b) bytes but all available bytes have already been buffered, the underlying Read would block.
+ if n > 0 {
+ return n, nil
+ }
+
+ var readNonblocking bool
+ c.readDeadlineLock.Lock()
+ readNonblocking = c.readNonblocking
+ c.readDeadlineLock.Unlock()
+
+ var readN int
+ if readNonblocking {
+ readN, err = c.nonblockingRead(b[n:])
+ } else {
+ readN, err = c.conn.Read(b[n:])
+ }
+ n += readN
+ return n, err
+}
+
+// Write implements io.Writer. It never blocks due to buffering all writes. It will only return an error if the Conn is
+// closed. Call Flush to actually write to the underlying connection.
+func (c *NetConn) Write(b []byte) (n int, err error) {
+ if c.isClosed() {
+ return 0, errClosed
+ }
+
+ buf := iobufpool.Get(len(b))
+ copy(*buf, b)
+ c.writeQueue.pushBack(buf)
+ return len(b), nil
+}
+
+func (c *NetConn) Close() (err error) {
+ swapped := atomic.CompareAndSwapInt64(&c.closed, 0, 1)
+ if !swapped {
+ return errClosed
+ }
+
+ defer func() {
+ closeErr := c.conn.Close()
+ if err == nil {
+ err = closeErr
+ }
+ }()
+
+ c.readFlushLock.Lock()
+ defer c.readFlushLock.Unlock()
+ err = c.flush()
+ if err != nil {
+ return err
+ }
+
+ return nil
+}
+
+func (c *NetConn) LocalAddr() net.Addr {
+ return c.conn.LocalAddr()
+}
+
+func (c *NetConn) RemoteAddr() net.Addr {
+ return c.conn.RemoteAddr()
+}
+
+// SetDeadline is the equivalent of calling SetReadDeadline(t) and SetWriteDeadline(t).
+func (c *NetConn) SetDeadline(t time.Time) error {
+ err := c.SetReadDeadline(t)
+ if err != nil {
+ return err
+ }
+ return c.SetWriteDeadline(t)
+}
+
+// SetReadDeadline sets the read deadline as t. If t == NonBlockingDeadline then future reads will be non-blocking.
+func (c *NetConn) SetReadDeadline(t time.Time) error {
+ if c.isClosed() {
+ return errClosed
+ }
+
+ c.readDeadlineLock.Lock()
+ defer c.readDeadlineLock.Unlock()
+ if c.readDeadline == disableSetDeadlineDeadline {
+ return nil
+ }
+ if t == disableSetDeadlineDeadline {
+ c.readDeadline = t
+ return nil
+ }
+
+ if t == NonBlockingDeadline {
+ c.readNonblocking = true
+ t = time.Time{}
+ } else {
+ c.readNonblocking = false
+ }
+
+ c.readDeadline = t
+
+ return c.conn.SetReadDeadline(t)
+}
+
+func (c *NetConn) SetWriteDeadline(t time.Time) error {
+ if c.isClosed() {
+ return errClosed
+ }
+
+ c.writeDeadlineLock.Lock()
+ defer c.writeDeadlineLock.Unlock()
+ if c.writeDeadline == disableSetDeadlineDeadline {
+ return nil
+ }
+ if t == disableSetDeadlineDeadline {
+ c.writeDeadline = t
+ return nil
+ }
+
+ c.writeDeadline = t
+
+ return c.conn.SetWriteDeadline(t)
+}
+
+func (c *NetConn) Flush() error {
+ if c.isClosed() {
+ return errClosed
+ }
+
+ c.readFlushLock.Lock()
+ defer c.readFlushLock.Unlock()
+ return c.flush()
+}
+
+// flush does the actual work of flushing the writeQueue. readFlushLock must already be held.
+func (c *NetConn) flush() error {
+ var stopChan chan struct{}
+ var errChan chan error
+
+ defer func() {
+ if stopChan != nil {
+ select {
+ case stopChan <- struct{}{}:
+ case <-errChan:
+ }
+ }
+ }()
+
+ for buf := c.writeQueue.popFront(); buf != nil; buf = c.writeQueue.popFront() {
+ remainingBuf := *buf
+ for len(remainingBuf) > 0 {
+ n, err := c.nonblockingWrite(remainingBuf)
+ remainingBuf = remainingBuf[n:]
+ if err != nil {
+ if !errors.Is(err, ErrWouldBlock) {
+ *buf = (*buf)[:len(remainingBuf)]
+ copy(*buf, remainingBuf)
+ c.writeQueue.pushFront(buf)
+ return err
+ }
+
+ // Writing was blocked. Reading might unblock it.
+ if stopChan == nil {
+ stopChan, errChan = c.bufferNonblockingRead()
+ }
+
+ select {
+ case err := <-errChan:
+ stopChan = nil
+ return err
+ default:
+ }
+
+ }
+ }
+ iobufpool.Put(buf)
+ }
+
+ return nil
+}
+
+func (c *NetConn) BufferReadUntilBlock() error {
+ for {
+ buf := iobufpool.Get(8 * 1024)
+ n, err := c.nonblockingRead(*buf)
+ if n > 0 {
+ *buf = (*buf)[:n]
+ c.readQueue.pushBack(buf)
+ } else if n == 0 {
+ iobufpool.Put(buf)
+ }
+
+ if err != nil {
+ if errors.Is(err, ErrWouldBlock) {
+ return nil
+ } else {
+ return err
+ }
+ }
+ }
+}
+
+func (c *NetConn) bufferNonblockingRead() (stopChan chan struct{}, errChan chan error) {
+ stopChan = make(chan struct{})
+ errChan = make(chan error, 1)
+
+ go func() {
+ for {
+ err := c.BufferReadUntilBlock()
+ if err != nil {
+ errChan <- err
+ return
+ }
+
+ select {
+ case <-stopChan:
+ return
+ default:
+ }
+ }
+ }()
+
+ return stopChan, errChan
+}
+
+func (c *NetConn) isClosed() bool {
+ closed := atomic.LoadInt64(&c.closed)
+ return closed == 1
+}
+
+func (c *NetConn) nonblockingWrite(b []byte) (n int, err error) {
+ if c.rawConn == nil {
+ return c.fakeNonblockingWrite(b)
+ } else {
+ return c.realNonblockingWrite(b)
+ }
+}
+
+func (c *NetConn) fakeNonblockingWrite(b []byte) (n int, err error) {
+ c.writeDeadlineLock.Lock()
+ defer c.writeDeadlineLock.Unlock()
+
+ deadline := time.Now().Add(fakeNonblockingWriteWaitDuration)
+ if c.writeDeadline.IsZero() || deadline.Before(c.writeDeadline) {
+ err = c.conn.SetWriteDeadline(deadline)
+ if err != nil {
+ return 0, err
+ }
+ defer func() {
+ // Ignoring error resetting deadline as there is nothing that can reasonably be done if it fails.
+ c.conn.SetWriteDeadline(c.writeDeadline)
+
+ if err != nil {
+ if errors.Is(err, os.ErrDeadlineExceeded) {
+ err = ErrWouldBlock
+ }
+ }
+ }()
+ }
+
+ return c.conn.Write(b)
+}
+
+func (c *NetConn) nonblockingRead(b []byte) (n int, err error) {
+ if c.rawConn == nil {
+ return c.fakeNonblockingRead(b)
+ } else {
+ return c.realNonblockingRead(b)
+ }
+}
+
+func (c *NetConn) fakeNonblockingRead(b []byte) (n int, err error) {
+ c.readDeadlineLock.Lock()
+ defer c.readDeadlineLock.Unlock()
+
+	// The first 5 reads only read 1 byte at a time. This should give us 4 chances to read when we are sure the bytes are
+	// already in Go or the OS's receive buffer.
+	if c.fakeNonBlockingShortReadCount < 5 && len(b) > 0 {
+ b = b[:1]
+ }
+
+ startTime := time.Now()
+ deadline := startTime.Add(c.fakeNonblockingReadWaitDuration)
+ if c.readDeadline.IsZero() || deadline.Before(c.readDeadline) {
+ err = c.conn.SetReadDeadline(deadline)
+ if err != nil {
+ return 0, err
+ }
+ defer func() {
+ // If the read was successful and the wait duration is not already the minimum
+ if err == nil && c.fakeNonblockingReadWaitDuration > minNonblockingReadWaitDuration {
+ endTime := time.Now()
+
+ if n > 0 && c.fakeNonBlockingShortReadCount < 5 {
+ c.fakeNonBlockingShortReadCount++
+ }
+
+ // The wait duration should be 2x the fastest read that has occurred. This should give reasonable assurance that
+ // a Read deadline will not block a read before it has a chance to read data already in Go or the OS's receive
+ // buffer.
+ proposedWait := endTime.Sub(startTime) * 2
+ if proposedWait < minNonblockingReadWaitDuration {
+ proposedWait = minNonblockingReadWaitDuration
+ }
+ if proposedWait < c.fakeNonblockingReadWaitDuration {
+ c.fakeNonblockingReadWaitDuration = proposedWait
+ }
+ }
+
+ // Ignoring error resetting deadline as there is nothing that can reasonably be done if it fails.
+ c.conn.SetReadDeadline(c.readDeadline)
+
+ if err != nil {
+ if errors.Is(err, os.ErrDeadlineExceeded) {
+ err = ErrWouldBlock
+ }
+ }
+ }()
+ }
+
+ return c.conn.Read(b)
+}
+
+// TLSClient establishes a TLS connection as a client over conn using config.
+//
+// To avoid the first Read on the returned *TLSConn also triggering a Write due to the TLS handshake, and thereby
+// potentially causing read and write deadlines to behave unexpectedly, Handshake is called explicitly before the
+// *TLSConn is returned.
+func TLSClient(conn *NetConn, config *tls.Config) (*TLSConn, error) {
+ tc := tls.Client(conn, config)
+ err := tc.Handshake()
+ if err != nil {
+ return nil, err
+ }
+
+ // Ensure last written part of Handshake is actually sent.
+ err = conn.Flush()
+ if err != nil {
+ return nil, err
+ }
+
+ return &TLSConn{
+ tlsConn: tc,
+ nbConn: conn,
+ }, nil
+}
+
+// TLSConn is a TLS wrapper around a *NetConn. It works around a temporary write error (such as a timeout) being fatal to a
+// tls.Conn.
+type TLSConn struct {
+ tlsConn *tls.Conn
+ nbConn *NetConn
+}
+
+func (tc *TLSConn) Read(b []byte) (n int, err error) { return tc.tlsConn.Read(b) }
+func (tc *TLSConn) Write(b []byte) (n int, err error) { return tc.tlsConn.Write(b) }
+func (tc *TLSConn) BufferReadUntilBlock() error { return tc.nbConn.BufferReadUntilBlock() }
+func (tc *TLSConn) Flush() error { return tc.nbConn.Flush() }
+func (tc *TLSConn) LocalAddr() net.Addr { return tc.tlsConn.LocalAddr() }
+func (tc *TLSConn) RemoteAddr() net.Addr { return tc.tlsConn.RemoteAddr() }
+
+func (tc *TLSConn) Close() error {
+ // tls.Conn.closeNotify() sets a 5 second deadline to avoid blocking, sends a TLS alert close notification, and then
+ // sets the deadline to now. This causes NetConn's Close not to be able to flush the write buffer. Instead we set our
+ // own 5 second deadline then make all set deadlines no-op.
+ tc.tlsConn.SetDeadline(time.Now().Add(time.Second * 5))
+ tc.tlsConn.SetDeadline(disableSetDeadlineDeadline)
+
+ return tc.tlsConn.Close()
+}
+
+func (tc *TLSConn) SetDeadline(t time.Time) error { return tc.tlsConn.SetDeadline(t) }
+func (tc *TLSConn) SetReadDeadline(t time.Time) error { return tc.tlsConn.SetReadDeadline(t) }
+func (tc *TLSConn) SetWriteDeadline(t time.Time) error { return tc.tlsConn.SetWriteDeadline(t) }
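
A hedged usage sketch of the API above. The internal import path would not compile outside the pgx module and is shown for illustration only; connIsClosed is a hypothetical helper, not part of the package:

package main

import (
	"fmt"
	"net"

	"github.com/jackc/pgx/v5/internal/nbconn" // internal package; illustration only
)

// connIsClosed demonstrates the package doc's third goal: checking whether the
// peer closed the connection, without blocking and without losing bytes the
// protocol still needs.
func connIsClosed(conn net.Conn) bool {
	nb := nbconn.NewNetConn(conn, false)

	// BufferReadUntilBlock drains anything readable into the internal read
	// queue (later Reads still see those bytes) and returns nil once a read
	// would block; any other error, such as EOF, means the conn is dead.
	return nb.BufferReadUntilBlock() != nil
}

func main() {
	server, client := net.Pipe()
	server.Close() // the peer goes away

	fmt.Println(connIsClosed(client)) // true: the non-blocking read fails instead of blocking
}

Write works the same way in the other direction: it only queues bytes, so it can never deadlock against a peer that is also writing. Flush moves the queue to the wire, and flush itself falls back to buffering inbound data when a write would block.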
diff --git a/vendor/github.com/jackc/pgx/v5/internal/nbconn/nbconn_fake_non_block.go b/vendor/github.com/jackc/pgx/v5/internal/nbconn/nbconn_fake_non_block.go
new file mode 100644
index 000000000..4915c6219
--- /dev/null
+++ b/vendor/github.com/jackc/pgx/v5/internal/nbconn/nbconn_fake_non_block.go
@@ -0,0 +1,11 @@
+//go:build !unix
+
+package nbconn
+
+func (c *NetConn) realNonblockingWrite(b []byte) (n int, err error) {
+ return c.fakeNonblockingWrite(b)
+}
+
+func (c *NetConn) realNonblockingRead(b []byte) (n int, err error) {
+ return c.fakeNonblockingRead(b)
+}
diff --git a/vendor/github.com/jackc/pgx/v5/internal/nbconn/nbconn_real_non_block.go b/vendor/github.com/jackc/pgx/v5/internal/nbconn/nbconn_real_non_block.go
new file mode 100644
index 000000000..e93372f25
--- /dev/null
+++ b/vendor/github.com/jackc/pgx/v5/internal/nbconn/nbconn_real_non_block.go
@@ -0,0 +1,81 @@
+//go:build unix
+
+package nbconn
+
+import (
+ "errors"
+ "io"
+ "syscall"
+)
+
+// realNonblockingWrite does a non-blocking write. readFlushLock must already be held.
+func (c *NetConn) realNonblockingWrite(b []byte) (n int, err error) {
+ if c.nonblockWriteFunc == nil {
+ c.nonblockWriteFunc = func(fd uintptr) (done bool) {
+ c.nonblockWriteN, c.nonblockWriteErr = syscall.Write(int(fd), c.nonblockWriteBuf)
+ return true
+ }
+ }
+ c.nonblockWriteBuf = b
+ c.nonblockWriteN = 0
+ c.nonblockWriteErr = nil
+
+ err = c.rawConn.Write(c.nonblockWriteFunc)
+ n = c.nonblockWriteN
+ c.nonblockWriteBuf = nil // ensure that no reference to b is kept.
+ if err == nil && c.nonblockWriteErr != nil {
+ if errors.Is(c.nonblockWriteErr, syscall.EWOULDBLOCK) {
+ err = ErrWouldBlock
+ } else {
+ err = c.nonblockWriteErr
+ }
+ }
+ if err != nil {
+ // n may be -1 when an error occurs.
+ if n < 0 {
+ n = 0
+ }
+
+ return n, err
+ }
+
+ return n, nil
+}
+
+func (c *NetConn) realNonblockingRead(b []byte) (n int, err error) {
+ if c.nonblockReadFunc == nil {
+ c.nonblockReadFunc = func(fd uintptr) (done bool) {
+ c.nonblockReadN, c.nonblockReadErr = syscall.Read(int(fd), c.nonblockReadBuf)
+ return true
+ }
+ }
+ c.nonblockReadBuf = b
+ c.nonblockReadN = 0
+ c.nonblockReadErr = nil
+
+ err = c.rawConn.Read(c.nonblockReadFunc)
+ n = c.nonblockReadN
+ c.nonblockReadBuf = nil // ensure that no reference to b is kept.
+ if err == nil && c.nonblockReadErr != nil {
+ if errors.Is(c.nonblockReadErr, syscall.EWOULDBLOCK) {
+ err = ErrWouldBlock
+ } else {
+ err = c.nonblockReadErr
+ }
+ }
+ if err != nil {
+ // n may be -1 when an error occurs.
+ if n < 0 {
+ n = 0
+ }
+
+ return n, err
+ }
+
+	// A syscall read that returns no error and 0 bytes means the connection reached EOF.
+ if n == 0 {
+ return 0, io.EOF
+ }
+
+ return n, nil
+}
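
The shape of both functions follows from the syscall.RawConn contract: the runtime re-invokes the callback, parking the goroutine until the descriptor is ready, unless the callback returns true. Returning true unconditionally therefore turns Read/Write into a single non-blocking attempt, with EWOULDBLOCK carried out through the captured err fields. A standalone sketch of the same pattern against a UDP socket (Unix only, independent of pgx):

//go:build unix

package main

import (
	"errors"
	"fmt"
	"net"
	"syscall"
)

func main() {
	conn, err := net.ListenUDP("udp", &net.UDPAddr{IP: net.IPv4(127, 0, 0, 1)})
	if err != nil {
		panic(err)
	}
	defer conn.Close()

	rawConn, err := conn.SyscallConn()
	if err != nil {
		panic(err)
	}

	buf := make([]byte, 1024)
	var n int
	var readErr error
	// Returning true unconditionally makes this a single non-blocking attempt;
	// returning false would make the runtime park us until the fd is readable.
	err = rawConn.Read(func(fd uintptr) (done bool) {
		n, readErr = syscall.Read(int(fd), buf)
		return true
	})
	if err != nil {
		panic(err)
	}

	if errors.Is(readErr, syscall.EWOULDBLOCK) {
		fmt.Println("no datagram ready; a blocking read would have stalled here")
	} else {
		fmt.Printf("read %d bytes, err = %v\n", n, readErr)
	}
}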
diff --git a/vendor/github.com/jackc/pgx/v5/internal/pgio/README.md b/vendor/github.com/jackc/pgx/v5/internal/pgio/README.md
new file mode 100644
index 000000000..b2fc58014
--- /dev/null
+++ b/vendor/github.com/jackc/pgx/v5/internal/pgio/README.md
@@ -0,0 +1,6 @@
+# pgio
+
+Package pgio is a low-level toolkit for building messages in the PostgreSQL wire protocol.
+
+pgio provides functions for appending integers to a []byte while doing byte
+order conversion.
diff --git a/vendor/github.com/jackc/pgx/v5/internal/pgio/doc.go b/vendor/github.com/jackc/pgx/v5/internal/pgio/doc.go
new file mode 100644
index 000000000..ef2dcc7f7
--- /dev/null
+++ b/vendor/github.com/jackc/pgx/v5/internal/pgio/doc.go
@@ -0,0 +1,6 @@
+// Package pgio is a low-level toolkit for building messages in the PostgreSQL wire protocol.
+/*
+pgio provides functions for appending integers to a []byte while doing byte
+order conversion.
+*/
+package pgio
diff --git a/vendor/github.com/jackc/pgx/v5/internal/pgio/write.go b/vendor/github.com/jackc/pgx/v5/internal/pgio/write.go
new file mode 100644
index 000000000..96aedf9dd
--- /dev/null
+++ b/vendor/github.com/jackc/pgx/v5/internal/pgio/write.go
@@ -0,0 +1,40 @@
+package pgio
+
+import "encoding/binary"
+
+func AppendUint16(buf []byte, n uint16) []byte {
+ wp := len(buf)
+ buf = append(buf, 0, 0)
+ binary.BigEndian.PutUint16(buf[wp:], n)
+ return buf
+}
+
+func AppendUint32(buf []byte, n uint32) []byte {
+ wp := len(buf)
+ buf = append(buf, 0, 0, 0, 0)
+ binary.BigEndian.PutUint32(buf[wp:], n)
+ return buf
+}
+
+func AppendUint64(buf []byte, n uint64) []byte {
+ wp := len(buf)
+ buf = append(buf, 0, 0, 0, 0, 0, 0, 0, 0)
+ binary.BigEndian.PutUint64(buf[wp:], n)
+ return buf
+}
+
+func AppendInt16(buf []byte, n int16) []byte {
+ return AppendUint16(buf, uint16(n))
+}
+
+func AppendInt32(buf []byte, n int32) []byte {
+ return AppendUint32(buf, uint32(n))
+}
+
+func AppendInt64(buf []byte, n int64) []byte {
+ return AppendUint64(buf, uint64(n))
+}
+
+func SetInt32(buf []byte, n int32) {
+ binary.BigEndian.PutUint32(buf, uint32(n))
+}
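
What these helpers are for is clearest with a concrete frame. PostgreSQL frontend messages carry a 1-byte type, then a big-endian int32 length that counts itself but not the type byte; the length is unknown until the body is written, which is why SetInt32 exists alongside the Append* functions. A self-contained sketch, with the two helpers copied from above so the snippet needs no internal import:

package main

import (
	"encoding/binary"
	"fmt"
)

// appendUint32 and setInt32 are copied from pgio above.
func appendUint32(buf []byte, n uint32) []byte {
	wp := len(buf)
	buf = append(buf, 0, 0, 0, 0)
	binary.BigEndian.PutUint32(buf[wp:], n)
	return buf
}

func setInt32(buf []byte, n int32) {
	binary.BigEndian.PutUint32(buf, uint32(n))
}

func main() {
	buf := []byte{'Q'}         // simple Query message
	lenPos := len(buf)
	buf = appendUint32(buf, 0) // reserve 4 bytes for the length
	buf = append(buf, "select 1"...)
	buf = append(buf, 0) // the SQL string is null-terminated

	// Patch the reserved length in place: the body plus the length field itself.
	setInt32(buf[lenPos:], int32(len(buf)-lenPos))

	fmt.Printf("% x\n", buf) // 51 00 00 00 0d 73 65 6c 65 63 74 20 31 00
}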
diff --git a/vendor/github.com/jackc/pgx/v5/internal/sanitize/sanitize.go b/vendor/github.com/jackc/pgx/v5/internal/sanitize/sanitize.go
new file mode 100644
index 000000000..e9e6d2287
--- /dev/null
+++ b/vendor/github.com/jackc/pgx/v5/internal/sanitize/sanitize.go
@@ -0,0 +1,322 @@
+package sanitize
+
+import (
+ "bytes"
+ "encoding/hex"
+ "fmt"
+ "strconv"
+ "strings"
+ "time"
+ "unicode/utf8"
+)
+
+// Part is either a string or an int. A string is raw SQL. An int is an
+// argument placeholder.
+type Part any
+
+type Query struct {
+ Parts []Part
+}
+
+// utf8.DecodeRune returns utf8.RuneError for decoding errors, but that is actually the rune U+FFFD -- the Unicode
+// replacement character. A RuneError decoded with width 3 is a genuine U+FFFD in the input, not an error.
+//
+// https://github.com/jackc/pgx/issues/1380
+const replacementcharacterwidth = 3
+
+func (q *Query) Sanitize(args ...any) (string, error) {
+ argUse := make([]bool, len(args))
+ buf := &bytes.Buffer{}
+
+ for _, part := range q.Parts {
+ var str string
+ switch part := part.(type) {
+ case string:
+ str = part
+ case int:
+ argIdx := part - 1
+ if argIdx >= len(args) {
+ return "", fmt.Errorf("insufficient arguments")
+ }
+ arg := args[argIdx]
+ switch arg := arg.(type) {
+ case nil:
+ str = "null"
+ case int64:
+ str = strconv.FormatInt(arg, 10)
+ case float64:
+ str = strconv.FormatFloat(arg, 'f', -1, 64)
+ case bool:
+ str = strconv.FormatBool(arg)
+ case []byte:
+ str = QuoteBytes(arg)
+ case string:
+ str = QuoteString(arg)
+ case time.Time:
+ str = arg.Truncate(time.Microsecond).Format("'2006-01-02 15:04:05.999999999Z07:00:00'")
+ default:
+ return "", fmt.Errorf("invalid arg type: %T", arg)
+ }
+ argUse[argIdx] = true
+ default:
+ return "", fmt.Errorf("invalid Part type: %T", part)
+ }
+ buf.WriteString(str)
+ }
+
+ for i, used := range argUse {
+ if !used {
+ return "", fmt.Errorf("unused argument: %d", i)
+ }
+ }
+ return buf.String(), nil
+}
+
+func NewQuery(sql string) (*Query, error) {
+ l := &sqlLexer{
+ src: sql,
+ stateFn: rawState,
+ }
+
+ for l.stateFn != nil {
+ l.stateFn = l.stateFn(l)
+ }
+
+ query := &Query{Parts: l.parts}
+
+ return query, nil
+}
+
+func QuoteString(str string) string {
+ return "'" + strings.ReplaceAll(str, "'", "''") + "'"
+}
+
+func QuoteBytes(buf []byte) string {
+ return `'\x` + hex.EncodeToString(buf) + "'"
+}
+
+type sqlLexer struct {
+ src string
+ start int
+ pos int
+ nested int // multiline comment nesting level.
+ stateFn stateFn
+ parts []Part
+}
+
+type stateFn func(*sqlLexer) stateFn
+
+func rawState(l *sqlLexer) stateFn {
+ for {
+ r, width := utf8.DecodeRuneInString(l.src[l.pos:])
+ l.pos += width
+
+ switch r {
+ case 'e', 'E':
+ nextRune, width := utf8.DecodeRuneInString(l.src[l.pos:])
+ if nextRune == '\'' {
+ l.pos += width
+ return escapeStringState
+ }
+ case '\'':
+ return singleQuoteState
+ case '"':
+ return doubleQuoteState
+ case '$':
+ nextRune, _ := utf8.DecodeRuneInString(l.src[l.pos:])
+ if '0' <= nextRune && nextRune <= '9' {
+ if l.pos-l.start > 0 {
+ l.parts = append(l.parts, l.src[l.start:l.pos-width])
+ }
+ l.start = l.pos
+ return placeholderState
+ }
+ case '-':
+ nextRune, width := utf8.DecodeRuneInString(l.src[l.pos:])
+ if nextRune == '-' {
+ l.pos += width
+ return oneLineCommentState
+ }
+ case '/':
+ nextRune, width := utf8.DecodeRuneInString(l.src[l.pos:])
+ if nextRune == '*' {
+ l.pos += width
+ return multilineCommentState
+ }
+ case utf8.RuneError:
+ if width != replacementcharacterwidth {
+ if l.pos-l.start > 0 {
+ l.parts = append(l.parts, l.src[l.start:l.pos])
+ l.start = l.pos
+ }
+ return nil
+ }
+ }
+ }
+}
+
+func singleQuoteState(l *sqlLexer) stateFn {
+ for {
+ r, width := utf8.DecodeRuneInString(l.src[l.pos:])
+ l.pos += width
+
+ switch r {
+ case '\'':
+ nextRune, width := utf8.DecodeRuneInString(l.src[l.pos:])
+ if nextRune != '\'' {
+ return rawState
+ }
+ l.pos += width
+ case utf8.RuneError:
+ if width != replacementcharacterwidth {
+ if l.pos-l.start > 0 {
+ l.parts = append(l.parts, l.src[l.start:l.pos])
+ l.start = l.pos
+ }
+ return nil
+ }
+ }
+ }
+}
+
+func doubleQuoteState(l *sqlLexer) stateFn {
+ for {
+ r, width := utf8.DecodeRuneInString(l.src[l.pos:])
+ l.pos += width
+
+ switch r {
+ case '"':
+ nextRune, width := utf8.DecodeRuneInString(l.src[l.pos:])
+ if nextRune != '"' {
+ return rawState
+ }
+ l.pos += width
+ case utf8.RuneError:
+ if width != replacementcharacterwidth {
+ if l.pos-l.start > 0 {
+ l.parts = append(l.parts, l.src[l.start:l.pos])
+ l.start = l.pos
+ }
+ return nil
+ }
+ }
+ }
+}
+
+// placeholderState consumes a placeholder value. The $ must already have been
+// consumed. The first rune must be a digit.
+func placeholderState(l *sqlLexer) stateFn {
+ num := 0
+
+ for {
+ r, width := utf8.DecodeRuneInString(l.src[l.pos:])
+ l.pos += width
+
+ if '0' <= r && r <= '9' {
+ num *= 10
+ num += int(r - '0')
+ } else {
+ l.parts = append(l.parts, num)
+ l.pos -= width
+ l.start = l.pos
+ return rawState
+ }
+ }
+}
+
+func escapeStringState(l *sqlLexer) stateFn {
+ for {
+ r, width := utf8.DecodeRuneInString(l.src[l.pos:])
+ l.pos += width
+
+ switch r {
+ case '\\':
+ _, width = utf8.DecodeRuneInString(l.src[l.pos:])
+ l.pos += width
+ case '\'':
+ nextRune, width := utf8.DecodeRuneInString(l.src[l.pos:])
+ if nextRune != '\'' {
+ return rawState
+ }
+ l.pos += width
+ case utf8.RuneError:
+ if width != replacementcharacterwidth {
+ if l.pos-l.start > 0 {
+ l.parts = append(l.parts, l.src[l.start:l.pos])
+ l.start = l.pos
+ }
+ return nil
+ }
+ }
+ }
+}
+
+func oneLineCommentState(l *sqlLexer) stateFn {
+ for {
+ r, width := utf8.DecodeRuneInString(l.src[l.pos:])
+ l.pos += width
+
+ switch r {
+ case '\\':
+ _, width = utf8.DecodeRuneInString(l.src[l.pos:])
+ l.pos += width
+ case '\n', '\r':
+ return rawState
+ case utf8.RuneError:
+ if width != replacementcharacterwidth {
+ if l.pos-l.start > 0 {
+ l.parts = append(l.parts, l.src[l.start:l.pos])
+ l.start = l.pos
+ }
+ return nil
+ }
+ }
+ }
+}
+
+func multilineCommentState(l *sqlLexer) stateFn {
+ for {
+ r, width := utf8.DecodeRuneInString(l.src[l.pos:])
+ l.pos += width
+
+ switch r {
+ case '/':
+ nextRune, width := utf8.DecodeRuneInString(l.src[l.pos:])
+ if nextRune == '*' {
+ l.pos += width
+ l.nested++
+ }
+ case '*':
+ nextRune, width := utf8.DecodeRuneInString(l.src[l.pos:])
+ if nextRune != '/' {
+ continue
+ }
+
+ l.pos += width
+ if l.nested == 0 {
+ return rawState
+ }
+ l.nested--
+
+ case utf8.RuneError:
+ if width != replacementcharacterwidth {
+ if l.pos-l.start > 0 {
+ l.parts = append(l.parts, l.src[l.start:l.pos])
+ l.start = l.pos
+ }
+ return nil
+ }
+ }
+ }
+}
+
+// SanitizeSQL replaces placeholder values with args. It quotes and escapes args
+// as necessary. This function is only safe when standard_conforming_strings is
+// on.
+func SanitizeSQL(sql string, args ...any) (string, error) {
+ query, err := NewQuery(sql)
+ if err != nil {
+ return "", err
+ }
+ return query.Sanitize(args...)
+}
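
End to end, the lexer splits the SQL into Parts and Sanitize splices quoted arguments back in. A short usage sketch; the internal import path would not compile outside the pgx module and is shown for illustration only:

package main

import (
	"fmt"

	"github.com/jackc/pgx/v5/internal/sanitize" // internal package; illustration only
)

func main() {
	sql, err := sanitize.SanitizeSQL(
		"select * from users where name = $1 and is_deleted = $2",
		"O'Brien", false,
	)
	if err != nil {
		panic(err)
	}

	// QuoteString doubles embedded single quotes, so the value cannot
	// terminate the literal early:
	// select * from users where name = 'O''Brien' and is_deleted = false
	fmt.Println(sql)
}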
diff --git a/vendor/github.com/jackc/pgx/v5/internal/stmtcache/lru_cache.go b/vendor/github.com/jackc/pgx/v5/internal/stmtcache/lru_cache.go
new file mode 100644
index 000000000..a25cc8b1c
--- /dev/null
+++ b/vendor/github.com/jackc/pgx/v5/internal/stmtcache/lru_cache.go
@@ -0,0 +1,98 @@
+package stmtcache
+
+import (
+ "container/list"
+
+ "github.com/jackc/pgx/v5/pgconn"
+)
+
+// LRUCache implements Cache with a Least Recently Used (LRU) cache.
+type LRUCache struct {
+ cap int
+ m map[string]*list.Element
+ l *list.List
+ invalidStmts []*pgconn.StatementDescription
+}
+
+// NewLRUCache creates a new LRUCache. cap is the maximum size of the cache.
+func NewLRUCache(cap int) *LRUCache {
+ return &LRUCache{
+ cap: cap,
+ m: make(map[string]*list.Element),
+ l: list.New(),
+ }
+}
+
+// Get returns the statement description for sql. Returns nil if not found.
+func (c *LRUCache) Get(key string) *pgconn.StatementDescription {
+ if el, ok := c.m[key]; ok {
+ c.l.MoveToFront(el)
+ return el.Value.(*pgconn.StatementDescription)
+ }
+
+	return nil
+}
+
+// Put stores sd in the cache. Put panics if sd.SQL is "". Put does nothing if sd.SQL already exists in the cache.
+func (c *LRUCache) Put(sd *pgconn.StatementDescription) {
+ if sd.SQL == "" {
+ panic("cannot store statement description with empty SQL")
+ }
+
+ if _, present := c.m[sd.SQL]; present {
+ return
+ }
+
+ if c.l.Len() == c.cap {
+ c.invalidateOldest()
+ }
+
+ el := c.l.PushFront(sd)
+ c.m[sd.SQL] = el
+}
+
+// Invalidate invalidates statement description identified by sql. Does nothing if not found.
+func (c *LRUCache) Invalidate(sql string) {
+ if el, ok := c.m[sql]; ok {
+ delete(c.m, sql)
+ c.invalidStmts = append(c.invalidStmts, el.Value.(*pgconn.StatementDescription))
+ c.l.Remove(el)
+ }
+}
+
+// InvalidateAll invalidates all statement descriptions.
+func (c *LRUCache) InvalidateAll() {
+ el := c.l.Front()
+ for el != nil {
+ c.invalidStmts = append(c.invalidStmts, el.Value.(*pgconn.StatementDescription))
+ el = el.Next()
+ }
+
+ c.m = make(map[string]*list.Element)
+ c.l = list.New()
+}
+
+func (c *LRUCache) HandleInvalidated() []*pgconn.StatementDescription {
+ invalidStmts := c.invalidStmts
+ c.invalidStmts = nil
+ return invalidStmts
+}
+
+// Len returns the number of cached prepared statement descriptions.
+func (c *LRUCache) Len() int {
+ return c.l.Len()
+}
+
+// Cap returns the maximum number of cached prepared statement descriptions.
+func (c *LRUCache) Cap() int {
+ return c.cap
+}
+
+func (c *LRUCache) invalidateOldest() {
+ oldest := c.l.Back()
+ sd := oldest.Value.(*pgconn.StatementDescription)
+ c.invalidStmts = append(c.invalidStmts, sd)
+ delete(c.m, sd.SQL)
+ c.l.Remove(oldest)
+}
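
A sketch of the eviction and invalidation flow. The internal import is shown for illustration only, and the statement names are arbitrary:

package main

import (
	"fmt"

	"github.com/jackc/pgx/v5/internal/stmtcache" // internal package; illustration only
	"github.com/jackc/pgx/v5/pgconn"
)

func main() {
	cache := stmtcache.NewLRUCache(2)

	cache.Put(&pgconn.StatementDescription{Name: "stmt_1", SQL: "select 1"})
	cache.Put(&pgconn.StatementDescription{Name: "stmt_2", SQL: "select 2"})

	cache.Get("select 1") // touch: "select 1" is now most recently used

	// The cache is at capacity, so this evicts the least recently used entry
	// ("select 2") into the invalid list instead of silently dropping it.
	cache.Put(&pgconn.StatementDescription{Name: "stmt_3", SQL: "select 3"})

	for _, sd := range cache.HandleInvalidated() {
		fmt.Println("deallocate on the server:", sd.Name) // stmt_2
	}
}

Keeping evicted descriptions around matters because the prepared statement still exists server-side: the caller is expected to drain HandleInvalidated and close those statements on the connection.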
diff --git a/vendor/github.com/jackc/pgx/v5/internal/stmtcache/stmtcache.go b/vendor/github.com/jackc/pgx/v5/internal/stmtcache/stmtcache.go
new file mode 100644
index 000000000..e1bdcba57
--- /dev/null
+++ b/vendor/github.com/jackc/pgx/v5/internal/stmtcache/stmtcache.go
@@ -0,0 +1,57 @@
+// Package stmtcache is a cache for statement descriptions.
+package stmtcache
+
+import (
+ "strconv"
+ "sync/atomic"
+
+ "github.com/jackc/pgx/v5/pgconn"
+)
+
+var stmtCounter int64
+
+// NextStatementName returns a statement name that will be unique for the lifetime of the program.
+func NextStatementName() string {
+ n := atomic.AddInt64(&stmtCounter, 1)
+ return "stmtcache_" + strconv.FormatInt(n, 10)
+}
+
+// Cache caches statement descriptions.
+type Cache interface {
+ // Get returns the statement description for sql. Returns nil if not found.
+ Get(sql string) *pgconn.StatementDescription
+
+ // Put stores sd in the cache. Put panics if sd.SQL is "". Put does nothing if sd.SQL already exists in the cache.
+ Put(sd *pgconn.StatementDescription)
+
+ // Invalidate invalidates statement description identified by sql. Does nothing if not found.
+ Invalidate(sql string)
+
+ // InvalidateAll invalidates all statement descriptions.
+ InvalidateAll()
+
+ // HandleInvalidated returns a slice of all statement descriptions invalidated since the last call to HandleInvalidated.
+ HandleInvalidated() []*pgconn.StatementDescription
+
+ // Len returns the number of cached prepared statement descriptions.
+ Len() int
+
+ // Cap returns the maximum number of cached prepared statement descriptions.
+ Cap() int
+}
+
+func IsStatementInvalid(err error) bool {
+ pgErr, ok := err.(*pgconn.PgError)
+ if !ok {
+ return false
+ }
+
+ // https://github.com/jackc/pgx/issues/1162
+ //
+ // We used to look for the message "cached plan must not change result type". However, that message can be localized.
+ // Unfortunately, error code "0A000" - "FEATURE NOT SUPPORTED" is used for many different errors and the only way to
+	// tell the difference is by the message. But the worst case is clearing a statement that we otherwise wouldn't
+	// have cleared, so it should be safe.
+ possibleInvalidCachedPlanError := pgErr.Code == "0A000"
+ return possibleInvalidCachedPlanError
+}
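
A sketch of the check in use. The helper reimplements IsStatementInvalid with errors.As so it also matches wrapped errors; that wrapping behavior is an assumption of this sketch, since the code above uses a plain type assertion:

package main

import (
	"errors"
	"fmt"

	"github.com/jackc/pgx/v5/pgconn"
)

// isStatementInvalid mirrors IsStatementInvalid above, but uses errors.As so
// a wrapped *pgconn.PgError is matched too.
func isStatementInvalid(err error) bool {
	var pgErr *pgconn.PgError
	if !errors.As(err, &pgErr) {
		return false
	}
	return pgErr.Code == "0A000" // FEATURE NOT SUPPORTED; see the caveat above
}

func main() {
	err := fmt.Errorf("exec failed: %w", &pgconn.PgError{
		Code:    "0A000",
		Message: "cached plan must not change result type",
	})
	fmt.Println(isStatementInvalid(err)) // true: drop the cached description and re-prepare
}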
diff --git a/vendor/github.com/jackc/pgx/v5/internal/stmtcache/unlimited_cache.go b/vendor/github.com/jackc/pgx/v5/internal/stmtcache/unlimited_cache.go
new file mode 100644
index 000000000..f5f59396e
--- /dev/null
+++ b/vendor/github.com/jackc/pgx/v5/internal/stmtcache/unlimited_cache.go
@@ -0,0 +1,71 @@
+package stmtcache
+
+import (
+ "math"
+
+ "github.com/jackc/pgx/v5/pgconn"
+)
+
+// UnlimitedCache implements Cache with no capacity limit.
+type UnlimitedCache struct {
+ m map[string]*pgconn.StatementDescription
+ invalidStmts []*pgconn.StatementDescription
+}
+
+// NewUnlimitedCache creates a new UnlimitedCache.
+func NewUnlimitedCache() *UnlimitedCache {
+ return &UnlimitedCache{
+ m: make(map[string]*pgconn.StatementDescription),
+ }
+}
+
+// Get returns the statement description for sql. Returns nil if not found.
+func (c *UnlimitedCache) Get(sql string) *pgconn.StatementDescription {
+ return c.m[sql]
+}
+
+// Put stores sd in the cache. Put panics if sd.SQL is "". Put does nothing if sd.SQL already exists in the cache.
+func (c *UnlimitedCache) Put(sd *pgconn.StatementDescription) {
+ if sd.SQL == "" {
+ panic("cannot store statement description with empty SQL")
+ }
+
+ if _, present := c.m[sd.SQL]; present {
+ return
+ }
+
+ c.m[sd.SQL] = sd
+}
+
+// Invalidate invalidates statement description identified by sql. Does nothing if not found.
+func (c *UnlimitedCache) Invalidate(sql string) {
+ if sd, ok := c.m[sql]; ok {
+ delete(c.m, sql)
+ c.invalidStmts = append(c.invalidStmts, sd)
+ }
+}
+
+// InvalidateAll invalidates all statement descriptions.
+func (c *UnlimitedCache) InvalidateAll() {
+ for _, sd := range c.m {
+ c.invalidStmts = append(c.invalidStmts, sd)
+ }
+
+ c.m = make(map[string]*pgconn.StatementDescription)
+}
+
+func (c *UnlimitedCache) HandleInvalidated() []*pgconn.StatementDescription {
+ invalidStmts := c.invalidStmts
+ c.invalidStmts = nil
+ return invalidStmts
+}
+
+// Len returns the number of cached prepared statement descriptions.
+func (c *UnlimitedCache) Len() int {
+ return len(c.m)
+}
+
+// Cap returns the maximum number of cached prepared statement descriptions.
+func (c *UnlimitedCache) Cap() int {
+ return math.MaxInt
+}