summaryrefslogtreecommitdiff
path: root/vendor/github.com/jackc/pgx/v5/internal/nbconn
diff options
context:
space:
mode:
authorLibravatar dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>2023-06-19 09:56:15 +0200
committerLibravatar GitHub <noreply@github.com>2023-06-19 09:56:15 +0200
commit70357a770f7e84eed1d3d6a0dfc2ce9f30e5c5be (patch)
tree7462b5e97a94a874b488404e2029fd182e9ae030 /vendor/github.com/jackc/pgx/v5/internal/nbconn
parent[chore]: Bump golang.org/x/net from 0.10.0 to 0.11.0 (#1909) (diff)
downloadgotosocial-70357a770f7e84eed1d3d6a0dfc2ce9f30e5c5be.tar.xz
[chore]: Bump github.com/jackc/pgx/v5 from 5.3.1 to 5.4.1 (#1907)
Bumps [github.com/jackc/pgx/v5](https://github.com/jackc/pgx) from 5.3.1 to 5.4.1. - [Changelog](https://github.com/jackc/pgx/blob/master/CHANGELOG.md) - [Commits](https://github.com/jackc/pgx/compare/v5.3.1...v5.4.1) --- updated-dependencies: - dependency-name: github.com/jackc/pgx/v5 dependency-type: direct:production update-type: version-update:semver-minor ... Signed-off-by: dependabot[bot] <support@github.com> Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Diffstat (limited to 'vendor/github.com/jackc/pgx/v5/internal/nbconn')
-rw-r--r--vendor/github.com/jackc/pgx/v5/internal/nbconn/bufferqueue.go70
-rw-r--r--vendor/github.com/jackc/pgx/v5/internal/nbconn/nbconn.go520
-rw-r--r--vendor/github.com/jackc/pgx/v5/internal/nbconn/nbconn_fake_non_block.go11
-rw-r--r--vendor/github.com/jackc/pgx/v5/internal/nbconn/nbconn_real_non_block.go81
4 files changed, 0 insertions, 682 deletions
diff --git a/vendor/github.com/jackc/pgx/v5/internal/nbconn/bufferqueue.go b/vendor/github.com/jackc/pgx/v5/internal/nbconn/bufferqueue.go
deleted file mode 100644
index 4bf25481c..000000000
--- a/vendor/github.com/jackc/pgx/v5/internal/nbconn/bufferqueue.go
+++ /dev/null
@@ -1,70 +0,0 @@
-package nbconn
-
-import (
- "sync"
-)
-
-const minBufferQueueLen = 8
-
-type bufferQueue struct {
- lock sync.Mutex
- queue []*[]byte
- r, w int
-}
-
-func (bq *bufferQueue) pushBack(buf *[]byte) {
- bq.lock.Lock()
- defer bq.lock.Unlock()
-
- if bq.w >= len(bq.queue) {
- bq.growQueue()
- }
- bq.queue[bq.w] = buf
- bq.w++
-}
-
-func (bq *bufferQueue) pushFront(buf *[]byte) {
- bq.lock.Lock()
- defer bq.lock.Unlock()
-
- if bq.w >= len(bq.queue) {
- bq.growQueue()
- }
- copy(bq.queue[bq.r+1:bq.w+1], bq.queue[bq.r:bq.w])
- bq.queue[bq.r] = buf
- bq.w++
-}
-
-func (bq *bufferQueue) popFront() *[]byte {
- bq.lock.Lock()
- defer bq.lock.Unlock()
-
- if bq.r == bq.w {
- return nil
- }
-
- buf := bq.queue[bq.r]
- bq.queue[bq.r] = nil // Clear reference so it can be garbage collected.
- bq.r++
-
- if bq.r == bq.w {
- bq.r = 0
- bq.w = 0
- if len(bq.queue) > minBufferQueueLen {
- bq.queue = make([]*[]byte, minBufferQueueLen)
- }
- }
-
- return buf
-}
-
-func (bq *bufferQueue) growQueue() {
- desiredLen := (len(bq.queue) + 1) * 3 / 2
- if desiredLen < minBufferQueueLen {
- desiredLen = minBufferQueueLen
- }
-
- newQueue := make([]*[]byte, desiredLen)
- copy(newQueue, bq.queue)
- bq.queue = newQueue
-}
diff --git a/vendor/github.com/jackc/pgx/v5/internal/nbconn/nbconn.go b/vendor/github.com/jackc/pgx/v5/internal/nbconn/nbconn.go
deleted file mode 100644
index 7a38383f0..000000000
--- a/vendor/github.com/jackc/pgx/v5/internal/nbconn/nbconn.go
+++ /dev/null
@@ -1,520 +0,0 @@
-// Package nbconn implements a non-blocking net.Conn wrapper.
-//
-// It is designed to solve three problems.
-//
-// The first is resolving the deadlock that can occur when both sides of a connection are blocked writing because all
-// buffers between are full. See https://github.com/jackc/pgconn/issues/27 for discussion.
-//
-// The second is the inability to use a write deadline with a TLS.Conn without killing the connection.
-//
-// The third is to efficiently check if a connection has been closed via a non-blocking read.
-package nbconn
-
-import (
- "crypto/tls"
- "errors"
- "net"
- "os"
- "sync"
- "sync/atomic"
- "syscall"
- "time"
-
- "github.com/jackc/pgx/v5/internal/iobufpool"
-)
-
-var errClosed = errors.New("closed")
-var ErrWouldBlock = new(wouldBlockError)
-
-const fakeNonblockingWriteWaitDuration = 100 * time.Millisecond
-const minNonblockingReadWaitDuration = time.Microsecond
-const maxNonblockingReadWaitDuration = 100 * time.Millisecond
-
-// NonBlockingDeadline is a magic value that when passed to Set[Read]Deadline places the connection in non-blocking read
-// mode.
-var NonBlockingDeadline = time.Date(1900, 1, 1, 0, 0, 0, 608536336, time.UTC)
-
-// disableSetDeadlineDeadline is a magic value that when passed to Set[Read|Write]Deadline causes those methods to
-// ignore all future calls.
-var disableSetDeadlineDeadline = time.Date(1900, 1, 1, 0, 0, 0, 968549727, time.UTC)
-
-// wouldBlockError implements net.Error so tls.Conn will recognize ErrWouldBlock as a temporary error.
-type wouldBlockError struct{}
-
-func (*wouldBlockError) Error() string {
- return "would block"
-}
-
-func (*wouldBlockError) Timeout() bool { return true }
-func (*wouldBlockError) Temporary() bool { return true }
-
-// Conn is a net.Conn where Write never blocks and always succeeds. Flush or Read must be called to actually write to
-// the underlying connection.
-type Conn interface {
- net.Conn
-
- // Flush flushes any buffered writes.
- Flush() error
-
- // BufferReadUntilBlock reads and buffers any successfully read bytes until the read would block.
- BufferReadUntilBlock() error
-}
-
-// NetConn is a non-blocking net.Conn wrapper. It implements net.Conn.
-type NetConn struct {
- // 64 bit fields accessed with atomics must be at beginning of struct to guarantee alignment for certain 32-bit
- // architectures. See BUGS section of https://pkg.go.dev/sync/atomic and https://github.com/jackc/pgx/issues/1288 and
- // https://github.com/jackc/pgx/issues/1307. Only access with atomics
- closed int64 // 0 = not closed, 1 = closed
-
- conn net.Conn
- rawConn syscall.RawConn
-
- readQueue bufferQueue
- writeQueue bufferQueue
-
- readFlushLock sync.Mutex
- // non-blocking writes with syscall.RawConn are done with a callback function. By using these fields instead of the
- // callback functions closure to pass the buf argument and receive the n and err results we avoid some allocations.
- nonblockWriteFunc func(fd uintptr) (done bool)
- nonblockWriteBuf []byte
- nonblockWriteErr error
- nonblockWriteN int
-
- // non-blocking reads with syscall.RawConn are done with a callback function. By using these fields instead of the
- // callback functions closure to pass the buf argument and receive the n and err results we avoid some allocations.
- nonblockReadFunc func(fd uintptr) (done bool)
- nonblockReadBuf []byte
- nonblockReadErr error
- nonblockReadN int
-
- readDeadlineLock sync.Mutex
- readDeadline time.Time
- readNonblocking bool
- fakeNonBlockingShortReadCount int
- fakeNonblockingReadWaitDuration time.Duration
-
- writeDeadlineLock sync.Mutex
- writeDeadline time.Time
-}
-
-func NewNetConn(conn net.Conn, fakeNonBlockingIO bool) *NetConn {
- nc := &NetConn{
- conn: conn,
- fakeNonblockingReadWaitDuration: maxNonblockingReadWaitDuration,
- }
-
- if !fakeNonBlockingIO {
- if sc, ok := conn.(syscall.Conn); ok {
- if rawConn, err := sc.SyscallConn(); err == nil {
- nc.rawConn = rawConn
- }
- }
- }
-
- return nc
-}
-
-// Read implements io.Reader.
-func (c *NetConn) Read(b []byte) (n int, err error) {
- if c.isClosed() {
- return 0, errClosed
- }
-
- c.readFlushLock.Lock()
- defer c.readFlushLock.Unlock()
-
- err = c.flush()
- if err != nil {
- return 0, err
- }
-
- for n < len(b) {
- buf := c.readQueue.popFront()
- if buf == nil {
- break
- }
- copiedN := copy(b[n:], *buf)
- if copiedN < len(*buf) {
- *buf = (*buf)[copiedN:]
- c.readQueue.pushFront(buf)
- } else {
- iobufpool.Put(buf)
- }
- n += copiedN
- }
-
- // If any bytes were already buffered return them without trying to do a Read. Otherwise, when the caller is trying to
- // Read up to len(b) bytes but all available bytes have already been buffered the underlying Read would block.
- if n > 0 {
- return n, nil
- }
-
- var readNonblocking bool
- c.readDeadlineLock.Lock()
- readNonblocking = c.readNonblocking
- c.readDeadlineLock.Unlock()
-
- var readN int
- if readNonblocking {
- readN, err = c.nonblockingRead(b[n:])
- } else {
- readN, err = c.conn.Read(b[n:])
- }
- n += readN
- return n, err
-}
-
-// Write implements io.Writer. It never blocks due to buffering all writes. It will only return an error if the Conn is
-// closed. Call Flush to actually write to the underlying connection.
-func (c *NetConn) Write(b []byte) (n int, err error) {
- if c.isClosed() {
- return 0, errClosed
- }
-
- buf := iobufpool.Get(len(b))
- copy(*buf, b)
- c.writeQueue.pushBack(buf)
- return len(b), nil
-}
-
-func (c *NetConn) Close() (err error) {
- swapped := atomic.CompareAndSwapInt64(&c.closed, 0, 1)
- if !swapped {
- return errClosed
- }
-
- defer func() {
- closeErr := c.conn.Close()
- if err == nil {
- err = closeErr
- }
- }()
-
- c.readFlushLock.Lock()
- defer c.readFlushLock.Unlock()
- err = c.flush()
- if err != nil {
- return err
- }
-
- return nil
-}
-
-func (c *NetConn) LocalAddr() net.Addr {
- return c.conn.LocalAddr()
-}
-
-func (c *NetConn) RemoteAddr() net.Addr {
- return c.conn.RemoteAddr()
-}
-
-// SetDeadline is the equivalent of calling SetReadDealine(t) and SetWriteDeadline(t).
-func (c *NetConn) SetDeadline(t time.Time) error {
- err := c.SetReadDeadline(t)
- if err != nil {
- return err
- }
- return c.SetWriteDeadline(t)
-}
-
-// SetReadDeadline sets the read deadline as t. If t == NonBlockingDeadline then future reads will be non-blocking.
-func (c *NetConn) SetReadDeadline(t time.Time) error {
- if c.isClosed() {
- return errClosed
- }
-
- c.readDeadlineLock.Lock()
- defer c.readDeadlineLock.Unlock()
- if c.readDeadline == disableSetDeadlineDeadline {
- return nil
- }
- if t == disableSetDeadlineDeadline {
- c.readDeadline = t
- return nil
- }
-
- if t == NonBlockingDeadline {
- c.readNonblocking = true
- t = time.Time{}
- } else {
- c.readNonblocking = false
- }
-
- c.readDeadline = t
-
- return c.conn.SetReadDeadline(t)
-}
-
-func (c *NetConn) SetWriteDeadline(t time.Time) error {
- if c.isClosed() {
- return errClosed
- }
-
- c.writeDeadlineLock.Lock()
- defer c.writeDeadlineLock.Unlock()
- if c.writeDeadline == disableSetDeadlineDeadline {
- return nil
- }
- if t == disableSetDeadlineDeadline {
- c.writeDeadline = t
- return nil
- }
-
- c.writeDeadline = t
-
- return c.conn.SetWriteDeadline(t)
-}
-
-func (c *NetConn) Flush() error {
- if c.isClosed() {
- return errClosed
- }
-
- c.readFlushLock.Lock()
- defer c.readFlushLock.Unlock()
- return c.flush()
-}
-
-// flush does the actual work of flushing the writeQueue. readFlushLock must already be held.
-func (c *NetConn) flush() error {
- var stopChan chan struct{}
- var errChan chan error
-
- defer func() {
- if stopChan != nil {
- select {
- case stopChan <- struct{}{}:
- case <-errChan:
- }
- }
- }()
-
- for buf := c.writeQueue.popFront(); buf != nil; buf = c.writeQueue.popFront() {
- remainingBuf := *buf
- for len(remainingBuf) > 0 {
- n, err := c.nonblockingWrite(remainingBuf)
- remainingBuf = remainingBuf[n:]
- if err != nil {
- if !errors.Is(err, ErrWouldBlock) {
- *buf = (*buf)[:len(remainingBuf)]
- copy(*buf, remainingBuf)
- c.writeQueue.pushFront(buf)
- return err
- }
-
- // Writing was blocked. Reading might unblock it.
- if stopChan == nil {
- stopChan, errChan = c.bufferNonblockingRead()
- }
-
- select {
- case err := <-errChan:
- stopChan = nil
- return err
- default:
- }
-
- }
- }
- iobufpool.Put(buf)
- }
-
- return nil
-}
-
-func (c *NetConn) BufferReadUntilBlock() error {
- for {
- buf := iobufpool.Get(8 * 1024)
- n, err := c.nonblockingRead(*buf)
- if n > 0 {
- *buf = (*buf)[:n]
- c.readQueue.pushBack(buf)
- } else if n == 0 {
- iobufpool.Put(buf)
- }
-
- if err != nil {
- if errors.Is(err, ErrWouldBlock) {
- return nil
- } else {
- return err
- }
- }
- }
-}
-
-func (c *NetConn) bufferNonblockingRead() (stopChan chan struct{}, errChan chan error) {
- stopChan = make(chan struct{})
- errChan = make(chan error, 1)
-
- go func() {
- for {
- err := c.BufferReadUntilBlock()
- if err != nil {
- errChan <- err
- return
- }
-
- select {
- case <-stopChan:
- return
- default:
- }
- }
- }()
-
- return stopChan, errChan
-}
-
-func (c *NetConn) isClosed() bool {
- closed := atomic.LoadInt64(&c.closed)
- return closed == 1
-}
-
-func (c *NetConn) nonblockingWrite(b []byte) (n int, err error) {
- if c.rawConn == nil {
- return c.fakeNonblockingWrite(b)
- } else {
- return c.realNonblockingWrite(b)
- }
-}
-
-func (c *NetConn) fakeNonblockingWrite(b []byte) (n int, err error) {
- c.writeDeadlineLock.Lock()
- defer c.writeDeadlineLock.Unlock()
-
- deadline := time.Now().Add(fakeNonblockingWriteWaitDuration)
- if c.writeDeadline.IsZero() || deadline.Before(c.writeDeadline) {
- err = c.conn.SetWriteDeadline(deadline)
- if err != nil {
- return 0, err
- }
- defer func() {
- // Ignoring error resetting deadline as there is nothing that can reasonably be done if it fails.
- c.conn.SetWriteDeadline(c.writeDeadline)
-
- if err != nil {
- if errors.Is(err, os.ErrDeadlineExceeded) {
- err = ErrWouldBlock
- }
- }
- }()
- }
-
- return c.conn.Write(b)
-}
-
-func (c *NetConn) nonblockingRead(b []byte) (n int, err error) {
- if c.rawConn == nil {
- return c.fakeNonblockingRead(b)
- } else {
- return c.realNonblockingRead(b)
- }
-}
-
-func (c *NetConn) fakeNonblockingRead(b []byte) (n int, err error) {
- c.readDeadlineLock.Lock()
- defer c.readDeadlineLock.Unlock()
-
- // The first 5 reads only read 1 byte at a time. This should give us 4 chances to read when we are sure the bytes are
- // already in Go or the OS's receive buffer.
- if c.fakeNonBlockingShortReadCount < 5 && len(b) > 0 && c.fakeNonblockingReadWaitDuration < minNonblockingReadWaitDuration {
- b = b[:1]
- }
-
- startTime := time.Now()
- deadline := startTime.Add(c.fakeNonblockingReadWaitDuration)
- if c.readDeadline.IsZero() || deadline.Before(c.readDeadline) {
- err = c.conn.SetReadDeadline(deadline)
- if err != nil {
- return 0, err
- }
- defer func() {
- // If the read was successful and the wait duration is not already the minimum
- if err == nil && c.fakeNonblockingReadWaitDuration > minNonblockingReadWaitDuration {
- endTime := time.Now()
-
- if n > 0 && c.fakeNonBlockingShortReadCount < 5 {
- c.fakeNonBlockingShortReadCount++
- }
-
- // The wait duration should be 2x the fastest read that has occurred. This should give reasonable assurance that
- // a Read deadline will not block a read before it has a chance to read data already in Go or the OS's receive
- // buffer.
- proposedWait := endTime.Sub(startTime) * 2
- if proposedWait < minNonblockingReadWaitDuration {
- proposedWait = minNonblockingReadWaitDuration
- }
- if proposedWait < c.fakeNonblockingReadWaitDuration {
- c.fakeNonblockingReadWaitDuration = proposedWait
- }
- }
-
- // Ignoring error resetting deadline as there is nothing that can reasonably be done if it fails.
- c.conn.SetReadDeadline(c.readDeadline)
-
- if err != nil {
- if errors.Is(err, os.ErrDeadlineExceeded) {
- err = ErrWouldBlock
- }
- }
- }()
- }
-
- return c.conn.Read(b)
-}
-
-// syscall.Conn is interface
-
-// TLSClient establishes a TLS connection as a client over conn using config.
-//
-// To avoid the first Read on the returned *TLSConn also triggering a Write due to the TLS handshake and thereby
-// potentially causing a read and write deadlines to behave unexpectedly, Handshake is called explicitly before the
-// *TLSConn is returned.
-func TLSClient(conn *NetConn, config *tls.Config) (*TLSConn, error) {
- tc := tls.Client(conn, config)
- err := tc.Handshake()
- if err != nil {
- return nil, err
- }
-
- // Ensure last written part of Handshake is actually sent.
- err = conn.Flush()
- if err != nil {
- return nil, err
- }
-
- return &TLSConn{
- tlsConn: tc,
- nbConn: conn,
- }, nil
-}
-
-// TLSConn is a TLS wrapper around a *Conn. It works around a temporary write error (such as a timeout) being fatal to a
-// tls.Conn.
-type TLSConn struct {
- tlsConn *tls.Conn
- nbConn *NetConn
-}
-
-func (tc *TLSConn) Read(b []byte) (n int, err error) { return tc.tlsConn.Read(b) }
-func (tc *TLSConn) Write(b []byte) (n int, err error) { return tc.tlsConn.Write(b) }
-func (tc *TLSConn) BufferReadUntilBlock() error { return tc.nbConn.BufferReadUntilBlock() }
-func (tc *TLSConn) Flush() error { return tc.nbConn.Flush() }
-func (tc *TLSConn) LocalAddr() net.Addr { return tc.tlsConn.LocalAddr() }
-func (tc *TLSConn) RemoteAddr() net.Addr { return tc.tlsConn.RemoteAddr() }
-
-func (tc *TLSConn) Close() error {
- // tls.Conn.closeNotify() sets a 5 second deadline to avoid blocking, sends a TLS alert close notification, and then
- // sets the deadline to now. This causes NetConn's Close not to be able to flush the write buffer. Instead we set our
- // own 5 second deadline then make all set deadlines no-op.
- tc.tlsConn.SetDeadline(time.Now().Add(time.Second * 5))
- tc.tlsConn.SetDeadline(disableSetDeadlineDeadline)
-
- return tc.tlsConn.Close()
-}
-
-func (tc *TLSConn) SetDeadline(t time.Time) error { return tc.tlsConn.SetDeadline(t) }
-func (tc *TLSConn) SetReadDeadline(t time.Time) error { return tc.tlsConn.SetReadDeadline(t) }
-func (tc *TLSConn) SetWriteDeadline(t time.Time) error { return tc.tlsConn.SetWriteDeadline(t) }
diff --git a/vendor/github.com/jackc/pgx/v5/internal/nbconn/nbconn_fake_non_block.go b/vendor/github.com/jackc/pgx/v5/internal/nbconn/nbconn_fake_non_block.go
deleted file mode 100644
index 4915c6219..000000000
--- a/vendor/github.com/jackc/pgx/v5/internal/nbconn/nbconn_fake_non_block.go
+++ /dev/null
@@ -1,11 +0,0 @@
-//go:build !unix
-
-package nbconn
-
-func (c *NetConn) realNonblockingWrite(b []byte) (n int, err error) {
- return c.fakeNonblockingWrite(b)
-}
-
-func (c *NetConn) realNonblockingRead(b []byte) (n int, err error) {
- return c.fakeNonblockingRead(b)
-}
diff --git a/vendor/github.com/jackc/pgx/v5/internal/nbconn/nbconn_real_non_block.go b/vendor/github.com/jackc/pgx/v5/internal/nbconn/nbconn_real_non_block.go
deleted file mode 100644
index e93372f25..000000000
--- a/vendor/github.com/jackc/pgx/v5/internal/nbconn/nbconn_real_non_block.go
+++ /dev/null
@@ -1,81 +0,0 @@
-//go:build unix
-
-package nbconn
-
-import (
- "errors"
- "io"
- "syscall"
-)
-
-// realNonblockingWrite does a non-blocking write. readFlushLock must already be held.
-func (c *NetConn) realNonblockingWrite(b []byte) (n int, err error) {
- if c.nonblockWriteFunc == nil {
- c.nonblockWriteFunc = func(fd uintptr) (done bool) {
- c.nonblockWriteN, c.nonblockWriteErr = syscall.Write(int(fd), c.nonblockWriteBuf)
- return true
- }
- }
- c.nonblockWriteBuf = b
- c.nonblockWriteN = 0
- c.nonblockWriteErr = nil
-
- err = c.rawConn.Write(c.nonblockWriteFunc)
- n = c.nonblockWriteN
- c.nonblockWriteBuf = nil // ensure that no reference to b is kept.
- if err == nil && c.nonblockWriteErr != nil {
- if errors.Is(c.nonblockWriteErr, syscall.EWOULDBLOCK) {
- err = ErrWouldBlock
- } else {
- err = c.nonblockWriteErr
- }
- }
- if err != nil {
- // n may be -1 when an error occurs.
- if n < 0 {
- n = 0
- }
-
- return n, err
- }
-
- return n, nil
-}
-
-func (c *NetConn) realNonblockingRead(b []byte) (n int, err error) {
- if c.nonblockReadFunc == nil {
- c.nonblockReadFunc = func(fd uintptr) (done bool) {
- c.nonblockReadN, c.nonblockReadErr = syscall.Read(int(fd), c.nonblockReadBuf)
- return true
- }
- }
- c.nonblockReadBuf = b
- c.nonblockReadN = 0
- c.nonblockReadErr = nil
-
- err = c.rawConn.Read(c.nonblockReadFunc)
- n = c.nonblockReadN
- c.nonblockReadBuf = nil // ensure that no reference to b is kept.
- if err == nil && c.nonblockReadErr != nil {
- if errors.Is(c.nonblockReadErr, syscall.EWOULDBLOCK) {
- err = ErrWouldBlock
- } else {
- err = c.nonblockReadErr
- }
- }
- if err != nil {
- // n may be -1 when an error occurs.
- if n < 0 {
- n = 0
- }
-
- return n, err
- }
-
- // syscall read did not return an error and 0 bytes were read means EOF.
- if n == 0 {
- return 0, io.EOF
- }
-
- return n, nil
-}