diff options
Diffstat (limited to 'vendor/github.com/go-pg/pg/v10/internal/pool')
8 files changed, 1670 insertions, 0 deletions
diff --git a/vendor/github.com/go-pg/pg/v10/internal/pool/conn.go b/vendor/github.com/go-pg/pg/v10/internal/pool/conn.go new file mode 100644 index 000000000..91045245b --- /dev/null +++ b/vendor/github.com/go-pg/pg/v10/internal/pool/conn.go @@ -0,0 +1,158 @@ +package pool + +import ( + "context" + "net" + "strconv" + "sync/atomic" + "time" +) + +var noDeadline = time.Time{} + +type Conn struct { + netConn net.Conn + rd *ReaderContext + + ProcessID int32 + SecretKey int32 + lastID int64 + + createdAt time.Time + usedAt uint32 // atomic + pooled bool + Inited bool +} + +func NewConn(netConn net.Conn) *Conn { + cn := &Conn{ + createdAt: time.Now(), + } + cn.SetNetConn(netConn) + cn.SetUsedAt(time.Now()) + return cn +} + +func (cn *Conn) UsedAt() time.Time { + unix := atomic.LoadUint32(&cn.usedAt) + return time.Unix(int64(unix), 0) +} + +func (cn *Conn) SetUsedAt(tm time.Time) { + atomic.StoreUint32(&cn.usedAt, uint32(tm.Unix())) +} + +func (cn *Conn) RemoteAddr() net.Addr { + return cn.netConn.RemoteAddr() +} + +func (cn *Conn) SetNetConn(netConn net.Conn) { + cn.netConn = netConn + if cn.rd != nil { + cn.rd.Reset(netConn) + } +} + +func (cn *Conn) LockReader() { + if cn.rd != nil { + panic("not reached") + } + cn.rd = NewReaderContext() + cn.rd.Reset(cn.netConn) +} + +func (cn *Conn) NetConn() net.Conn { + return cn.netConn +} + +func (cn *Conn) NextID() string { + cn.lastID++ + return strconv.FormatInt(cn.lastID, 10) +} + +func (cn *Conn) WithReader( + ctx context.Context, timeout time.Duration, fn func(rd *ReaderContext) error, +) error { + if err := cn.netConn.SetReadDeadline(cn.deadline(ctx, timeout)); err != nil { + return err + } + + rd := cn.rd + if rd == nil { + rd = GetReaderContext() + defer PutReaderContext(rd) + + rd.Reset(cn.netConn) + } + + rd.bytesRead = 0 + + if err := fn(rd); err != nil { + return err + } + + return nil +} + +func (cn *Conn) WithWriter( + ctx context.Context, timeout time.Duration, fn func(wb *WriteBuffer) error, +) error { + wb := GetWriteBuffer() + defer PutWriteBuffer(wb) + + if err := fn(wb); err != nil { + return err + } + + return cn.writeBuffer(ctx, timeout, wb) +} + +func (cn *Conn) WriteBuffer(ctx context.Context, timeout time.Duration, wb *WriteBuffer) error { + return cn.writeBuffer(ctx, timeout, wb) +} + +func (cn *Conn) writeBuffer( + ctx context.Context, + timeout time.Duration, + wb *WriteBuffer, +) error { + if err := cn.netConn.SetWriteDeadline(cn.deadline(ctx, timeout)); err != nil { + return err + } + if _, err := cn.netConn.Write(wb.Bytes); err != nil { + return err + } + return nil +} + +func (cn *Conn) Close() error { + return cn.netConn.Close() +} + +func (cn *Conn) deadline(ctx context.Context, timeout time.Duration) time.Time { + tm := time.Now() + cn.SetUsedAt(tm) + + if timeout > 0 { + tm = tm.Add(timeout) + } + + if ctx != nil { + deadline, ok := ctx.Deadline() + if ok { + if timeout == 0 { + return deadline + } + if deadline.Before(tm) { + return deadline + } + return tm + } + } + + if timeout > 0 { + return tm + } + + return noDeadline +} diff --git a/vendor/github.com/go-pg/pg/v10/internal/pool/pool.go b/vendor/github.com/go-pg/pg/v10/internal/pool/pool.go new file mode 100644 index 000000000..59f2c72d0 --- /dev/null +++ b/vendor/github.com/go-pg/pg/v10/internal/pool/pool.go @@ -0,0 +1,506 @@ +package pool + +import ( + "context" + "errors" + "net" + "sync" + "sync/atomic" + "time" + + "github.com/go-pg/pg/v10/internal" +) + +var ( + ErrClosed = errors.New("pg: database is closed") + ErrPoolTimeout = errors.New("pg: connection pool timeout") +) + +var timers = sync.Pool{ + New: func() interface{} { + t := time.NewTimer(time.Hour) + t.Stop() + return t + }, +} + +// Stats contains pool state information and accumulated stats. +type Stats struct { + Hits uint32 // number of times free connection was found in the pool + Misses uint32 // number of times free connection was NOT found in the pool + Timeouts uint32 // number of times a wait timeout occurred + + TotalConns uint32 // number of total connections in the pool + IdleConns uint32 // number of idle connections in the pool + StaleConns uint32 // number of stale connections removed from the pool +} + +type Pooler interface { + NewConn(context.Context) (*Conn, error) + CloseConn(*Conn) error + + Get(context.Context) (*Conn, error) + Put(context.Context, *Conn) + Remove(context.Context, *Conn, error) + + Len() int + IdleLen() int + Stats() *Stats + + Close() error +} + +type Options struct { + Dialer func(context.Context) (net.Conn, error) + OnClose func(*Conn) error + + PoolSize int + MinIdleConns int + MaxConnAge time.Duration + PoolTimeout time.Duration + IdleTimeout time.Duration + IdleCheckFrequency time.Duration +} + +type ConnPool struct { + opt *Options + + dialErrorsNum uint32 // atomic + + _closed uint32 // atomic + + lastDialErrorMu sync.RWMutex + lastDialError error + + queue chan struct{} + + stats Stats + + connsMu sync.Mutex + conns []*Conn + idleConns []*Conn + + poolSize int + idleConnsLen int +} + +var _ Pooler = (*ConnPool)(nil) + +func NewConnPool(opt *Options) *ConnPool { + p := &ConnPool{ + opt: opt, + + queue: make(chan struct{}, opt.PoolSize), + conns: make([]*Conn, 0, opt.PoolSize), + idleConns: make([]*Conn, 0, opt.PoolSize), + } + + p.connsMu.Lock() + p.checkMinIdleConns() + p.connsMu.Unlock() + + if opt.IdleTimeout > 0 && opt.IdleCheckFrequency > 0 { + go p.reaper(opt.IdleCheckFrequency) + } + + return p +} + +func (p *ConnPool) checkMinIdleConns() { + if p.opt.MinIdleConns == 0 { + return + } + for p.poolSize < p.opt.PoolSize && p.idleConnsLen < p.opt.MinIdleConns { + p.poolSize++ + p.idleConnsLen++ + go func() { + err := p.addIdleConn() + if err != nil { + p.connsMu.Lock() + p.poolSize-- + p.idleConnsLen-- + p.connsMu.Unlock() + } + }() + } +} + +func (p *ConnPool) addIdleConn() error { + cn, err := p.dialConn(context.TODO(), true) + if err != nil { + return err + } + + p.connsMu.Lock() + p.conns = append(p.conns, cn) + p.idleConns = append(p.idleConns, cn) + p.connsMu.Unlock() + return nil +} + +func (p *ConnPool) NewConn(c context.Context) (*Conn, error) { + return p.newConn(c, false) +} + +func (p *ConnPool) newConn(c context.Context, pooled bool) (*Conn, error) { + cn, err := p.dialConn(c, pooled) + if err != nil { + return nil, err + } + + p.connsMu.Lock() + + p.conns = append(p.conns, cn) + if pooled { + // If pool is full remove the cn on next Put. + if p.poolSize >= p.opt.PoolSize { + cn.pooled = false + } else { + p.poolSize++ + } + } + + p.connsMu.Unlock() + return cn, nil +} + +func (p *ConnPool) dialConn(c context.Context, pooled bool) (*Conn, error) { + if p.closed() { + return nil, ErrClosed + } + + if atomic.LoadUint32(&p.dialErrorsNum) >= uint32(p.opt.PoolSize) { + return nil, p.getLastDialError() + } + + netConn, err := p.opt.Dialer(c) + if err != nil { + p.setLastDialError(err) + if atomic.AddUint32(&p.dialErrorsNum, 1) == uint32(p.opt.PoolSize) { + go p.tryDial() + } + return nil, err + } + + cn := NewConn(netConn) + cn.pooled = pooled + return cn, nil +} + +func (p *ConnPool) tryDial() { + for { + if p.closed() { + return + } + + conn, err := p.opt.Dialer(context.TODO()) + if err != nil { + p.setLastDialError(err) + time.Sleep(time.Second) + continue + } + + atomic.StoreUint32(&p.dialErrorsNum, 0) + _ = conn.Close() + return + } +} + +func (p *ConnPool) setLastDialError(err error) { + p.lastDialErrorMu.Lock() + p.lastDialError = err + p.lastDialErrorMu.Unlock() +} + +func (p *ConnPool) getLastDialError() error { + p.lastDialErrorMu.RLock() + err := p.lastDialError + p.lastDialErrorMu.RUnlock() + return err +} + +// Get returns existed connection from the pool or creates a new one. +func (p *ConnPool) Get(ctx context.Context) (*Conn, error) { + if p.closed() { + return nil, ErrClosed + } + + err := p.waitTurn(ctx) + if err != nil { + return nil, err + } + + for { + p.connsMu.Lock() + cn := p.popIdle() + p.connsMu.Unlock() + + if cn == nil { + break + } + + if p.isStaleConn(cn) { + _ = p.CloseConn(cn) + continue + } + + atomic.AddUint32(&p.stats.Hits, 1) + return cn, nil + } + + atomic.AddUint32(&p.stats.Misses, 1) + + newcn, err := p.newConn(ctx, true) + if err != nil { + p.freeTurn() + return nil, err + } + + return newcn, nil +} + +func (p *ConnPool) getTurn() { + p.queue <- struct{}{} +} + +func (p *ConnPool) waitTurn(c context.Context) error { + select { + case <-c.Done(): + return c.Err() + default: + } + + select { + case p.queue <- struct{}{}: + return nil + default: + } + + timer := timers.Get().(*time.Timer) + timer.Reset(p.opt.PoolTimeout) + + select { + case <-c.Done(): + if !timer.Stop() { + <-timer.C + } + timers.Put(timer) + return c.Err() + case p.queue <- struct{}{}: + if !timer.Stop() { + <-timer.C + } + timers.Put(timer) + return nil + case <-timer.C: + timers.Put(timer) + atomic.AddUint32(&p.stats.Timeouts, 1) + return ErrPoolTimeout + } +} + +func (p *ConnPool) freeTurn() { + <-p.queue +} + +func (p *ConnPool) popIdle() *Conn { + if len(p.idleConns) == 0 { + return nil + } + + idx := len(p.idleConns) - 1 + cn := p.idleConns[idx] + p.idleConns = p.idleConns[:idx] + p.idleConnsLen-- + p.checkMinIdleConns() + return cn +} + +func (p *ConnPool) Put(ctx context.Context, cn *Conn) { + if !cn.pooled { + p.Remove(ctx, cn, nil) + return + } + + p.connsMu.Lock() + p.idleConns = append(p.idleConns, cn) + p.idleConnsLen++ + p.connsMu.Unlock() + p.freeTurn() +} + +func (p *ConnPool) Remove(ctx context.Context, cn *Conn, reason error) { + p.removeConnWithLock(cn) + p.freeTurn() + _ = p.closeConn(cn) +} + +func (p *ConnPool) CloseConn(cn *Conn) error { + p.removeConnWithLock(cn) + return p.closeConn(cn) +} + +func (p *ConnPool) removeConnWithLock(cn *Conn) { + p.connsMu.Lock() + p.removeConn(cn) + p.connsMu.Unlock() +} + +func (p *ConnPool) removeConn(cn *Conn) { + for i, c := range p.conns { + if c == cn { + p.conns = append(p.conns[:i], p.conns[i+1:]...) + if cn.pooled { + p.poolSize-- + p.checkMinIdleConns() + } + return + } + } +} + +func (p *ConnPool) closeConn(cn *Conn) error { + if p.opt.OnClose != nil { + _ = p.opt.OnClose(cn) + } + return cn.Close() +} + +// Len returns total number of connections. +func (p *ConnPool) Len() int { + p.connsMu.Lock() + n := len(p.conns) + p.connsMu.Unlock() + return n +} + +// IdleLen returns number of idle connections. +func (p *ConnPool) IdleLen() int { + p.connsMu.Lock() + n := p.idleConnsLen + p.connsMu.Unlock() + return n +} + +func (p *ConnPool) Stats() *Stats { + idleLen := p.IdleLen() + return &Stats{ + Hits: atomic.LoadUint32(&p.stats.Hits), + Misses: atomic.LoadUint32(&p.stats.Misses), + Timeouts: atomic.LoadUint32(&p.stats.Timeouts), + + TotalConns: uint32(p.Len()), + IdleConns: uint32(idleLen), + StaleConns: atomic.LoadUint32(&p.stats.StaleConns), + } +} + +func (p *ConnPool) closed() bool { + return atomic.LoadUint32(&p._closed) == 1 +} + +func (p *ConnPool) Filter(fn func(*Conn) bool) error { + var firstErr error + p.connsMu.Lock() + for _, cn := range p.conns { + if fn(cn) { + if err := p.closeConn(cn); err != nil && firstErr == nil { + firstErr = err + } + } + } + p.connsMu.Unlock() + return firstErr +} + +func (p *ConnPool) Close() error { + if !atomic.CompareAndSwapUint32(&p._closed, 0, 1) { + return ErrClosed + } + + var firstErr error + p.connsMu.Lock() + for _, cn := range p.conns { + if err := p.closeConn(cn); err != nil && firstErr == nil { + firstErr = err + } + } + p.conns = nil + p.poolSize = 0 + p.idleConns = nil + p.idleConnsLen = 0 + p.connsMu.Unlock() + + return firstErr +} + +func (p *ConnPool) reaper(frequency time.Duration) { + ticker := time.NewTicker(frequency) + defer ticker.Stop() + + for range ticker.C { + if p.closed() { + break + } + n, err := p.ReapStaleConns() + if err != nil { + internal.Logger.Printf(context.TODO(), "ReapStaleConns failed: %s", err) + continue + } + atomic.AddUint32(&p.stats.StaleConns, uint32(n)) + } +} + +func (p *ConnPool) ReapStaleConns() (int, error) { + var n int + for { + p.getTurn() + + p.connsMu.Lock() + cn := p.reapStaleConn() + p.connsMu.Unlock() + + p.freeTurn() + + if cn != nil { + _ = p.closeConn(cn) + n++ + } else { + break + } + } + return n, nil +} + +func (p *ConnPool) reapStaleConn() *Conn { + if len(p.idleConns) == 0 { + return nil + } + + cn := p.idleConns[0] + if !p.isStaleConn(cn) { + return nil + } + + p.idleConns = append(p.idleConns[:0], p.idleConns[1:]...) + p.idleConnsLen-- + p.removeConn(cn) + + return cn +} + +func (p *ConnPool) isStaleConn(cn *Conn) bool { + if p.opt.IdleTimeout == 0 && p.opt.MaxConnAge == 0 { + return false + } + + now := time.Now() + if p.opt.IdleTimeout > 0 && now.Sub(cn.UsedAt()) >= p.opt.IdleTimeout { + return true + } + if p.opt.MaxConnAge > 0 && now.Sub(cn.createdAt) >= p.opt.MaxConnAge { + return true + } + + return false +} diff --git a/vendor/github.com/go-pg/pg/v10/internal/pool/pool_single.go b/vendor/github.com/go-pg/pg/v10/internal/pool/pool_single.go new file mode 100644 index 000000000..5a3fde191 --- /dev/null +++ b/vendor/github.com/go-pg/pg/v10/internal/pool/pool_single.go @@ -0,0 +1,58 @@ +package pool + +import "context" + +type SingleConnPool struct { + pool Pooler + cn *Conn + stickyErr error +} + +var _ Pooler = (*SingleConnPool)(nil) + +func NewSingleConnPool(pool Pooler, cn *Conn) *SingleConnPool { + return &SingleConnPool{ + pool: pool, + cn: cn, + } +} + +func (p *SingleConnPool) NewConn(ctx context.Context) (*Conn, error) { + return p.pool.NewConn(ctx) +} + +func (p *SingleConnPool) CloseConn(cn *Conn) error { + return p.pool.CloseConn(cn) +} + +func (p *SingleConnPool) Get(ctx context.Context) (*Conn, error) { + if p.stickyErr != nil { + return nil, p.stickyErr + } + return p.cn, nil +} + +func (p *SingleConnPool) Put(ctx context.Context, cn *Conn) {} + +func (p *SingleConnPool) Remove(ctx context.Context, cn *Conn, reason error) { + p.cn = nil + p.stickyErr = reason +} + +func (p *SingleConnPool) Close() error { + p.cn = nil + p.stickyErr = ErrClosed + return nil +} + +func (p *SingleConnPool) Len() int { + return 0 +} + +func (p *SingleConnPool) IdleLen() int { + return 0 +} + +func (p *SingleConnPool) Stats() *Stats { + return &Stats{} +} diff --git a/vendor/github.com/go-pg/pg/v10/internal/pool/pool_sticky.go b/vendor/github.com/go-pg/pg/v10/internal/pool/pool_sticky.go new file mode 100644 index 000000000..0415b5e87 --- /dev/null +++ b/vendor/github.com/go-pg/pg/v10/internal/pool/pool_sticky.go @@ -0,0 +1,202 @@ +package pool + +import ( + "context" + "errors" + "fmt" + "sync/atomic" +) + +const ( + stateDefault = 0 + stateInited = 1 + stateClosed = 2 +) + +type BadConnError struct { + wrapped error +} + +var _ error = (*BadConnError)(nil) + +func (e BadConnError) Error() string { + s := "pg: Conn is in a bad state" + if e.wrapped != nil { + s += ": " + e.wrapped.Error() + } + return s +} + +func (e BadConnError) Unwrap() error { + return e.wrapped +} + +//------------------------------------------------------------------------------ + +type StickyConnPool struct { + pool Pooler + shared int32 // atomic + + state uint32 // atomic + ch chan *Conn + + _badConnError atomic.Value +} + +var _ Pooler = (*StickyConnPool)(nil) + +func NewStickyConnPool(pool Pooler) *StickyConnPool { + p, ok := pool.(*StickyConnPool) + if !ok { + p = &StickyConnPool{ + pool: pool, + ch: make(chan *Conn, 1), + } + } + atomic.AddInt32(&p.shared, 1) + return p +} + +func (p *StickyConnPool) NewConn(ctx context.Context) (*Conn, error) { + return p.pool.NewConn(ctx) +} + +func (p *StickyConnPool) CloseConn(cn *Conn) error { + return p.pool.CloseConn(cn) +} + +func (p *StickyConnPool) Get(ctx context.Context) (*Conn, error) { + // In worst case this races with Close which is not a very common operation. + for i := 0; i < 1000; i++ { + switch atomic.LoadUint32(&p.state) { + case stateDefault: + cn, err := p.pool.Get(ctx) + if err != nil { + return nil, err + } + if atomic.CompareAndSwapUint32(&p.state, stateDefault, stateInited) { + return cn, nil + } + p.pool.Remove(ctx, cn, ErrClosed) + case stateInited: + if err := p.badConnError(); err != nil { + return nil, err + } + cn, ok := <-p.ch + if !ok { + return nil, ErrClosed + } + return cn, nil + case stateClosed: + return nil, ErrClosed + default: + panic("not reached") + } + } + return nil, fmt.Errorf("pg: StickyConnPool.Get: infinite loop") +} + +func (p *StickyConnPool) Put(ctx context.Context, cn *Conn) { + defer func() { + if recover() != nil { + p.freeConn(ctx, cn) + } + }() + p.ch <- cn +} + +func (p *StickyConnPool) freeConn(ctx context.Context, cn *Conn) { + if err := p.badConnError(); err != nil { + p.pool.Remove(ctx, cn, err) + } else { + p.pool.Put(ctx, cn) + } +} + +func (p *StickyConnPool) Remove(ctx context.Context, cn *Conn, reason error) { + defer func() { + if recover() != nil { + p.pool.Remove(ctx, cn, ErrClosed) + } + }() + p._badConnError.Store(BadConnError{wrapped: reason}) + p.ch <- cn +} + +func (p *StickyConnPool) Close() error { + if shared := atomic.AddInt32(&p.shared, -1); shared > 0 { + return nil + } + + for i := 0; i < 1000; i++ { + state := atomic.LoadUint32(&p.state) + if state == stateClosed { + return ErrClosed + } + if atomic.CompareAndSwapUint32(&p.state, state, stateClosed) { + close(p.ch) + cn, ok := <-p.ch + if ok { + p.freeConn(context.TODO(), cn) + } + return nil + } + } + + return errors.New("pg: StickyConnPool.Close: infinite loop") +} + +func (p *StickyConnPool) Reset(ctx context.Context) error { + if p.badConnError() == nil { + return nil + } + + select { + case cn, ok := <-p.ch: + if !ok { + return ErrClosed + } + p.pool.Remove(ctx, cn, ErrClosed) + p._badConnError.Store(BadConnError{wrapped: nil}) + default: + return errors.New("pg: StickyConnPool does not have a Conn") + } + + if !atomic.CompareAndSwapUint32(&p.state, stateInited, stateDefault) { + state := atomic.LoadUint32(&p.state) + return fmt.Errorf("pg: invalid StickyConnPool state: %d", state) + } + + return nil +} + +func (p *StickyConnPool) badConnError() error { + if v := p._badConnError.Load(); v != nil { + err := v.(BadConnError) + if err.wrapped != nil { + return err + } + } + return nil +} + +func (p *StickyConnPool) Len() int { + switch atomic.LoadUint32(&p.state) { + case stateDefault: + return 0 + case stateInited: + return 1 + case stateClosed: + return 0 + default: + panic("not reached") + } +} + +func (p *StickyConnPool) IdleLen() int { + return len(p.ch) +} + +func (p *StickyConnPool) Stats() *Stats { + return &Stats{} +} diff --git a/vendor/github.com/go-pg/pg/v10/internal/pool/reader.go b/vendor/github.com/go-pg/pg/v10/internal/pool/reader.go new file mode 100644 index 000000000..b5d00807d --- /dev/null +++ b/vendor/github.com/go-pg/pg/v10/internal/pool/reader.go @@ -0,0 +1,80 @@ +package pool + +import ( + "sync" +) + +type Reader interface { + Buffered() int + + Bytes() []byte + Read([]byte) (int, error) + ReadByte() (byte, error) + UnreadByte() error + ReadSlice(byte) ([]byte, error) + Discard(int) (int, error) + + // ReadBytes(fn func(byte) bool) ([]byte, error) + // ReadN(int) ([]byte, error) + ReadFull() ([]byte, error) + ReadFullTemp() ([]byte, error) +} + +type ColumnInfo struct { + Index int16 + DataType int32 + Name string +} + +type ColumnAlloc struct { + columns []ColumnInfo +} + +func NewColumnAlloc() *ColumnAlloc { + return new(ColumnAlloc) +} + +func (c *ColumnAlloc) Reset() { + c.columns = c.columns[:0] +} + +func (c *ColumnAlloc) New(index int16, name []byte) *ColumnInfo { + c.columns = append(c.columns, ColumnInfo{ + Index: index, + Name: string(name), + }) + return &c.columns[len(c.columns)-1] +} + +func (c *ColumnAlloc) Columns() []ColumnInfo { + return c.columns +} + +type ReaderContext struct { + *BufReader + ColumnAlloc *ColumnAlloc +} + +func NewReaderContext() *ReaderContext { + const bufSize = 1 << 20 // 1mb + return &ReaderContext{ + BufReader: NewBufReader(bufSize), + ColumnAlloc: NewColumnAlloc(), + } +} + +var readerPool = sync.Pool{ + New: func() interface{} { + return NewReaderContext() + }, +} + +func GetReaderContext() *ReaderContext { + rd := readerPool.Get().(*ReaderContext) + return rd +} + +func PutReaderContext(rd *ReaderContext) { + rd.ColumnAlloc.Reset() + readerPool.Put(rd) +} diff --git a/vendor/github.com/go-pg/pg/v10/internal/pool/reader_buf.go b/vendor/github.com/go-pg/pg/v10/internal/pool/reader_buf.go new file mode 100644 index 000000000..3172e8b05 --- /dev/null +++ b/vendor/github.com/go-pg/pg/v10/internal/pool/reader_buf.go @@ -0,0 +1,431 @@ +// Copyright 2009 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package pool + +import ( + "bufio" + "bytes" + "io" +) + +type BufReader struct { + rd io.Reader // reader provided by the client + + buf []byte + r, w int // buf read and write positions + lastByte int + bytesRead int64 + err error + + available int // bytes available for reading + brd BytesReader // reusable bytes reader +} + +func NewBufReader(bufSize int) *BufReader { + return &BufReader{ + buf: make([]byte, bufSize), + available: -1, + } +} + +func (b *BufReader) BytesReader(n int) *BytesReader { + if n == -1 { + n = 0 + } + buf := b.buf[b.r : b.r+n] + b.r += n + b.brd.Reset(buf) + return &b.brd +} + +func (b *BufReader) SetAvailable(n int) { + b.available = n +} + +func (b *BufReader) Available() int { + return b.available +} + +func (b *BufReader) changeAvailable(n int) { + if b.available != -1 { + b.available += n + } +} + +func (b *BufReader) Reset(rd io.Reader) { + b.rd = rd + b.r, b.w = 0, 0 + b.err = nil +} + +// Buffered returns the number of bytes that can be read from the current buffer. +func (b *BufReader) Buffered() int { + buffered := b.w - b.r + if b.available == -1 || buffered <= b.available { + return buffered + } + return b.available +} + +func (b *BufReader) Bytes() []byte { + if b.available == -1 { + return b.buf[b.r:b.w] + } + w := b.r + b.available + if w > b.w { + w = b.w + } + return b.buf[b.r:w] +} + +func (b *BufReader) flush() []byte { + if b.available == -1 { + buf := b.buf[b.r:b.w] + b.r = b.w + return buf + } + + w := b.r + b.available + if w > b.w { + w = b.w + } + buf := b.buf[b.r:w] + b.r = w + b.changeAvailable(-len(buf)) + return buf +} + +// fill reads a new chunk into the buffer. +func (b *BufReader) fill() { + // Slide existing data to beginning. + if b.r > 0 { + copy(b.buf, b.buf[b.r:b.w]) + b.w -= b.r + b.r = 0 + } + + if b.w >= len(b.buf) { + panic("bufio: tried to fill full buffer") + } + if b.available == 0 { + b.err = io.EOF + return + } + + // Read new data: try a limited number of times. + const maxConsecutiveEmptyReads = 100 + for i := maxConsecutiveEmptyReads; i > 0; i-- { + n, err := b.read(b.buf[b.w:]) + b.w += n + if err != nil { + b.err = err + return + } + if n > 0 { + return + } + } + b.err = io.ErrNoProgress +} + +func (b *BufReader) readErr() error { + err := b.err + b.err = nil + return err +} + +func (b *BufReader) Read(p []byte) (n int, err error) { + if len(p) == 0 { + return 0, b.readErr() + } + + if b.available != -1 { + if b.available == 0 { + return 0, io.EOF + } + if len(p) > b.available { + p = p[:b.available] + } + } + + if b.r == b.w { + if b.err != nil { + return 0, b.readErr() + } + + if len(p) >= len(b.buf) { + // Large read, empty buffer. + // Read directly into p to avoid copy. + n, err = b.read(p) + if n > 0 { + b.changeAvailable(-n) + b.lastByte = int(p[n-1]) + } + return n, err + } + + // One read. + // Do not use b.fill, which will loop. + b.r = 0 + b.w = 0 + n, b.err = b.read(b.buf) + if n == 0 { + return 0, b.readErr() + } + b.w += n + } + + // copy as much as we can + n = copy(p, b.Bytes()) + b.r += n + b.changeAvailable(-n) + b.lastByte = int(b.buf[b.r-1]) + return n, nil +} + +// ReadSlice reads until the first occurrence of delim in the input, +// returning a slice pointing at the bytes in the buffer. +// The bytes stop being valid at the next read. +// If ReadSlice encounters an error before finding a delimiter, +// it returns all the data in the buffer and the error itself (often io.EOF). +// ReadSlice fails with error ErrBufferFull if the buffer fills without a delim. +// Because the data returned from ReadSlice will be overwritten +// by the next I/O operation, most clients should use +// ReadBytes or ReadString instead. +// ReadSlice returns err != nil if and only if line does not end in delim. +func (b *BufReader) ReadSlice(delim byte) (line []byte, err error) { + for { + // Search buffer. + if i := bytes.IndexByte(b.Bytes(), delim); i >= 0 { + i++ + line = b.buf[b.r : b.r+i] + b.r += i + b.changeAvailable(-i) + break + } + + // Pending error? + if b.err != nil { + line = b.flush() + err = b.readErr() + break + } + + buffered := b.Buffered() + + // Out of available. + if b.available != -1 && buffered >= b.available { + line = b.flush() + err = io.EOF + break + } + + // Buffer full? + if buffered >= len(b.buf) { + line = b.flush() + err = bufio.ErrBufferFull + break + } + + b.fill() // buffer is not full + } + + // Handle last byte, if any. + if i := len(line) - 1; i >= 0 { + b.lastByte = int(line[i]) + } + + return line, err +} + +func (b *BufReader) ReadBytes(fn func(byte) bool) (line []byte, err error) { + for { + for i, c := range b.Bytes() { + if !fn(c) { + i-- + line = b.buf[b.r : b.r+i] //nolint + b.r += i + b.changeAvailable(-i) + break + } + } + + // Pending error? + if b.err != nil { + line = b.flush() + err = b.readErr() + break + } + + buffered := b.Buffered() + + // Out of available. + if b.available != -1 && buffered >= b.available { + line = b.flush() + err = io.EOF + break + } + + // Buffer full? + if buffered >= len(b.buf) { + line = b.flush() + err = bufio.ErrBufferFull + break + } + + b.fill() // buffer is not full + } + + // Handle last byte, if any. + if i := len(line) - 1; i >= 0 { + b.lastByte = int(line[i]) + } + + return line, err +} + +func (b *BufReader) ReadByte() (byte, error) { + if b.available == 0 { + return 0, io.EOF + } + for b.r == b.w { + if b.err != nil { + return 0, b.readErr() + } + b.fill() // buffer is empty + } + c := b.buf[b.r] + b.r++ + b.lastByte = int(c) + b.changeAvailable(-1) + return c, nil +} + +func (b *BufReader) UnreadByte() error { + if b.lastByte < 0 || b.r == 0 && b.w > 0 { + return bufio.ErrInvalidUnreadByte + } + // b.r > 0 || b.w == 0 + if b.r > 0 { + b.r-- + } else { + // b.r == 0 && b.w == 0 + b.w = 1 + } + b.buf[b.r] = byte(b.lastByte) + b.lastByte = -1 + b.changeAvailable(+1) + return nil +} + +// Discard skips the next n bytes, returning the number of bytes discarded. +// +// If Discard skips fewer than n bytes, it also returns an error. +// If 0 <= n <= b.Buffered(), Discard is guaranteed to succeed without +// reading from the underlying io.BufReader. +func (b *BufReader) Discard(n int) (discarded int, err error) { + if n < 0 { + return 0, bufio.ErrNegativeCount + } + if n == 0 { + return + } + remain := n + for { + skip := b.Buffered() + if skip == 0 { + b.fill() + skip = b.Buffered() + } + if skip > remain { + skip = remain + } + b.r += skip + b.changeAvailable(-skip) + remain -= skip + if remain == 0 { + return n, nil + } + if b.err != nil { + return n - remain, b.readErr() + } + } +} + +func (b *BufReader) ReadN(n int) (line []byte, err error) { + if n < 0 { + return nil, bufio.ErrNegativeCount + } + if n == 0 { + return + } + + nn := n + if b.available != -1 && nn > b.available { + nn = b.available + } + + for { + buffered := b.Buffered() + + if buffered >= nn { + line = b.buf[b.r : b.r+nn] + b.r += nn + b.changeAvailable(-nn) + if n > nn { + err = io.EOF + } + break + } + + // Pending error? + if b.err != nil { + line = b.flush() + err = b.readErr() + break + } + + // Buffer full? + if buffered >= len(b.buf) { + line = b.flush() + err = bufio.ErrBufferFull + break + } + + b.fill() // buffer is not full + } + + // Handle last byte, if any. + if i := len(line) - 1; i >= 0 { + b.lastByte = int(line[i]) + } + + return line, err +} + +func (b *BufReader) ReadFull() ([]byte, error) { + if b.available == -1 { + panic("not reached") + } + buf := make([]byte, b.available) + _, err := io.ReadFull(b, buf) + return buf, err +} + +func (b *BufReader) ReadFullTemp() ([]byte, error) { + if b.available == -1 { + panic("not reached") + } + if b.available <= len(b.buf) { + return b.ReadN(b.available) + } + return b.ReadFull() +} + +func (b *BufReader) read(buf []byte) (int, error) { + n, err := b.rd.Read(buf) + b.bytesRead += int64(n) + return n, err +} diff --git a/vendor/github.com/go-pg/pg/v10/internal/pool/reader_bytes.go b/vendor/github.com/go-pg/pg/v10/internal/pool/reader_bytes.go new file mode 100644 index 000000000..93646b1da --- /dev/null +++ b/vendor/github.com/go-pg/pg/v10/internal/pool/reader_bytes.go @@ -0,0 +1,121 @@ +// Copyright 2012 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package pool + +import ( + "bytes" + "errors" + "io" +) + +type BytesReader struct { + s []byte + i int +} + +func NewBytesReader(b []byte) *BytesReader { + return &BytesReader{ + s: b, + } +} + +func (r *BytesReader) Reset(b []byte) { + r.s = b + r.i = 0 +} + +func (r *BytesReader) Buffered() int { + return len(r.s) - r.i +} + +func (r *BytesReader) Bytes() []byte { + return r.s[r.i:] +} + +func (r *BytesReader) Read(b []byte) (n int, err error) { + if r.i >= len(r.s) { + return 0, io.EOF + } + n = copy(b, r.s[r.i:]) + r.i += n + return +} + +func (r *BytesReader) ReadByte() (byte, error) { + if r.i >= len(r.s) { + return 0, io.EOF + } + b := r.s[r.i] + r.i++ + return b, nil +} + +func (r *BytesReader) UnreadByte() error { + if r.i <= 0 { + return errors.New("UnreadByte: at beginning of slice") + } + r.i-- + return nil +} + +func (r *BytesReader) ReadSlice(delim byte) ([]byte, error) { + if i := bytes.IndexByte(r.s[r.i:], delim); i >= 0 { + i++ + line := r.s[r.i : r.i+i] + r.i += i + return line, nil + } + + line := r.s[r.i:] + r.i = len(r.s) + return line, io.EOF +} + +func (r *BytesReader) ReadBytes(fn func(byte) bool) ([]byte, error) { + for i, c := range r.s[r.i:] { + if !fn(c) { + i++ + line := r.s[r.i : r.i+i] + r.i += i + return line, nil + } + } + + line := r.s[r.i:] + r.i = len(r.s) + return line, io.EOF +} + +func (r *BytesReader) Discard(n int) (int, error) { + b, err := r.ReadN(n) + return len(b), err +} + +func (r *BytesReader) ReadN(n int) ([]byte, error) { + nn := n + if nn > len(r.s) { + nn = len(r.s) + } + + b := r.s[r.i : r.i+nn] + r.i += nn + if n > nn { + return b, io.EOF + } + return b, nil +} + +func (r *BytesReader) ReadFull() ([]byte, error) { + b := make([]byte, len(r.s)-r.i) + copy(b, r.s[r.i:]) + r.i = len(r.s) + return b, nil +} + +func (r *BytesReader) ReadFullTemp() ([]byte, error) { + b := r.s[r.i:] + r.i = len(r.s) + return b, nil +} diff --git a/vendor/github.com/go-pg/pg/v10/internal/pool/write_buffer.go b/vendor/github.com/go-pg/pg/v10/internal/pool/write_buffer.go new file mode 100644 index 000000000..6981d3f4c --- /dev/null +++ b/vendor/github.com/go-pg/pg/v10/internal/pool/write_buffer.go @@ -0,0 +1,114 @@ +package pool + +import ( + "encoding/binary" + "io" + "sync" +) + +const defaultBufSize = 65 << 10 // 65kb + +var wbPool = sync.Pool{ + New: func() interface{} { + return NewWriteBuffer() + }, +} + +func GetWriteBuffer() *WriteBuffer { + wb := wbPool.Get().(*WriteBuffer) + return wb +} + +func PutWriteBuffer(wb *WriteBuffer) { + wb.Reset() + wbPool.Put(wb) +} + +type WriteBuffer struct { + Bytes []byte + + msgStart int + paramStart int +} + +func NewWriteBuffer() *WriteBuffer { + return &WriteBuffer{ + Bytes: make([]byte, 0, defaultBufSize), + } +} + +func (buf *WriteBuffer) Reset() { + buf.Bytes = buf.Bytes[:0] +} + +func (buf *WriteBuffer) StartMessage(c byte) { + if c == 0 { + buf.msgStart = len(buf.Bytes) + buf.Bytes = append(buf.Bytes, 0, 0, 0, 0) + } else { + buf.msgStart = len(buf.Bytes) + 1 + buf.Bytes = append(buf.Bytes, c, 0, 0, 0, 0) + } +} + +func (buf *WriteBuffer) FinishMessage() { + binary.BigEndian.PutUint32( + buf.Bytes[buf.msgStart:], uint32(len(buf.Bytes)-buf.msgStart)) +} + +func (buf *WriteBuffer) Query() []byte { + return buf.Bytes[buf.msgStart+4 : len(buf.Bytes)-1] +} + +func (buf *WriteBuffer) StartParam() { + buf.paramStart = len(buf.Bytes) + buf.Bytes = append(buf.Bytes, 0, 0, 0, 0) +} + +func (buf *WriteBuffer) FinishParam() { + binary.BigEndian.PutUint32( + buf.Bytes[buf.paramStart:], uint32(len(buf.Bytes)-buf.paramStart-4)) +} + +var nullParamLength = int32(-1) + +func (buf *WriteBuffer) FinishNullParam() { + binary.BigEndian.PutUint32( + buf.Bytes[buf.paramStart:], uint32(nullParamLength)) +} + +func (buf *WriteBuffer) Write(b []byte) (int, error) { + buf.Bytes = append(buf.Bytes, b...) + return len(b), nil +} + +func (buf *WriteBuffer) WriteInt16(num int16) { + buf.Bytes = append(buf.Bytes, 0, 0) + binary.BigEndian.PutUint16(buf.Bytes[len(buf.Bytes)-2:], uint16(num)) +} + +func (buf *WriteBuffer) WriteInt32(num int32) { + buf.Bytes = append(buf.Bytes, 0, 0, 0, 0) + binary.BigEndian.PutUint32(buf.Bytes[len(buf.Bytes)-4:], uint32(num)) +} + +func (buf *WriteBuffer) WriteString(s string) { + buf.Bytes = append(buf.Bytes, s...) + buf.Bytes = append(buf.Bytes, 0) +} + +func (buf *WriteBuffer) WriteBytes(b []byte) { + buf.Bytes = append(buf.Bytes, b...) + buf.Bytes = append(buf.Bytes, 0) +} + +func (buf *WriteBuffer) WriteByte(c byte) error { + buf.Bytes = append(buf.Bytes, c) + return nil +} + +func (buf *WriteBuffer) ReadFrom(r io.Reader) (int64, error) { + n, err := r.Read(buf.Bytes[len(buf.Bytes):cap(buf.Bytes)]) + buf.Bytes = buf.Bytes[:len(buf.Bytes)+n] + return int64(n), err +} |
