summaryrefslogtreecommitdiff
path: root/vendor/github.com/go-pg/pg/v10/base.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/github.com/go-pg/pg/v10/base.go')
-rw-r--r--vendor/github.com/go-pg/pg/v10/base.go618
1 files changed, 0 insertions, 618 deletions
diff --git a/vendor/github.com/go-pg/pg/v10/base.go b/vendor/github.com/go-pg/pg/v10/base.go
deleted file mode 100644
index d13997464..000000000
--- a/vendor/github.com/go-pg/pg/v10/base.go
+++ /dev/null
@@ -1,618 +0,0 @@
-package pg
-
-import (
- "context"
- "io"
- "time"
-
- "github.com/go-pg/pg/v10/internal"
- "github.com/go-pg/pg/v10/internal/pool"
- "github.com/go-pg/pg/v10/orm"
- "github.com/go-pg/pg/v10/types"
-)
-
-type baseDB struct {
- db orm.DB
- opt *Options
- pool pool.Pooler
-
- fmter *orm.Formatter
- queryHooks []QueryHook
-}
-
-// PoolStats contains the stats of a connection pool.
-type PoolStats pool.Stats
-
-// PoolStats returns connection pool stats.
-func (db *baseDB) PoolStats() *PoolStats {
- stats := db.pool.Stats()
- return (*PoolStats)(stats)
-}
-
-func (db *baseDB) clone() *baseDB {
- return &baseDB{
- db: db.db,
- opt: db.opt,
- pool: db.pool,
-
- fmter: db.fmter,
- queryHooks: copyQueryHooks(db.queryHooks),
- }
-}
-
-func (db *baseDB) withPool(p pool.Pooler) *baseDB {
- cp := db.clone()
- cp.pool = p
- return cp
-}
-
-func (db *baseDB) WithTimeout(d time.Duration) *baseDB {
- newopt := *db.opt
- newopt.ReadTimeout = d
- newopt.WriteTimeout = d
-
- cp := db.clone()
- cp.opt = &newopt
- return cp
-}
-
-func (db *baseDB) WithParam(param string, value interface{}) *baseDB {
- cp := db.clone()
- cp.fmter = db.fmter.WithParam(param, value)
- return cp
-}
-
-// Param returns value for the param.
-func (db *baseDB) Param(param string) interface{} {
- return db.fmter.Param(param)
-}
-
-func (db *baseDB) retryBackoff(retry int) time.Duration {
- return internal.RetryBackoff(retry, db.opt.MinRetryBackoff, db.opt.MaxRetryBackoff)
-}
-
-func (db *baseDB) getConn(ctx context.Context) (*pool.Conn, error) {
- cn, err := db.pool.Get(ctx)
- if err != nil {
- return nil, err
- }
-
- if cn.Inited {
- return cn, nil
- }
-
- if err := db.initConn(ctx, cn); err != nil {
- db.pool.Remove(ctx, cn, err)
- // It is safe to reset StickyConnPool if conn can't be initialized.
- if p, ok := db.pool.(*pool.StickyConnPool); ok {
- _ = p.Reset(ctx)
- }
- if err := internal.Unwrap(err); err != nil {
- return nil, err
- }
- return nil, err
- }
-
- return cn, nil
-}
-
-func (db *baseDB) initConn(ctx context.Context, cn *pool.Conn) error {
- if cn.Inited {
- return nil
- }
- cn.Inited = true
-
- if db.opt.TLSConfig != nil {
- err := db.enableSSL(ctx, cn, db.opt.TLSConfig)
- if err != nil {
- return err
- }
- }
-
- err := db.startup(ctx, cn, db.opt.User, db.opt.Password, db.opt.Database, db.opt.ApplicationName)
- if err != nil {
- return err
- }
-
- if db.opt.OnConnect != nil {
- p := pool.NewSingleConnPool(db.pool, cn)
- return db.opt.OnConnect(ctx, newConn(ctx, db.withPool(p)))
- }
-
- return nil
-}
-
-func (db *baseDB) releaseConn(ctx context.Context, cn *pool.Conn, err error) {
- if isBadConn(err, false) {
- db.pool.Remove(ctx, cn, err)
- } else {
- db.pool.Put(ctx, cn)
- }
-}
-
-func (db *baseDB) withConn(
- ctx context.Context, fn func(context.Context, *pool.Conn) error,
-) error {
- cn, err := db.getConn(ctx)
- if err != nil {
- return err
- }
-
- var fnDone chan struct{}
- if ctx != nil && ctx.Done() != nil {
- fnDone = make(chan struct{})
- go func() {
- select {
- case <-fnDone: // fn has finished, skip cancel
- case <-ctx.Done():
- err := db.cancelRequest(cn.ProcessID, cn.SecretKey)
- if err != nil {
- internal.Logger.Printf(ctx, "cancelRequest failed: %s", err)
- }
- // Signal end of conn use.
- fnDone <- struct{}{}
- }
- }()
- }
-
- defer func() {
- if fnDone == nil {
- db.releaseConn(ctx, cn, err)
- return
- }
-
- select {
- case <-fnDone: // wait for cancel to finish request
- // Looks like the canceled connection must be always removed from the pool.
- db.pool.Remove(ctx, cn, err)
- case fnDone <- struct{}{}: // signal fn finish, skip cancel goroutine
- db.releaseConn(ctx, cn, err)
- }
- }()
-
- err = fn(ctx, cn)
- return err
-}
-
-func (db *baseDB) shouldRetry(err error) bool {
- switch err {
- case io.EOF, io.ErrUnexpectedEOF:
- return true
- case nil, context.Canceled, context.DeadlineExceeded:
- return false
- }
-
- if pgerr, ok := err.(Error); ok {
- switch pgerr.Field('C') {
- case "40001", // serialization_failure
- "53300", // too_many_connections
- "55000": // attempted to delete invisible tuple
- return true
- case "57014": // statement_timeout
- return db.opt.RetryStatementTimeout
- default:
- return false
- }
- }
-
- if _, ok := err.(timeoutError); ok {
- return true
- }
-
- return false
-}
-
-// Close closes the database client, releasing any open resources.
-//
-// It is rare to Close a DB, as the DB handle is meant to be
-// long-lived and shared between many goroutines.
-func (db *baseDB) Close() error {
- return db.pool.Close()
-}
-
-// Exec executes a query ignoring returned rows. The params are for any
-// placeholders in the query.
-func (db *baseDB) Exec(query interface{}, params ...interface{}) (res Result, err error) {
- return db.exec(db.db.Context(), query, params...)
-}
-
-func (db *baseDB) ExecContext(c context.Context, query interface{}, params ...interface{}) (Result, error) {
- return db.exec(c, query, params...)
-}
-
-func (db *baseDB) exec(ctx context.Context, query interface{}, params ...interface{}) (Result, error) {
- wb := pool.GetWriteBuffer()
- defer pool.PutWriteBuffer(wb)
-
- if err := writeQueryMsg(wb, db.fmter, query, params...); err != nil {
- return nil, err
- }
-
- ctx, evt, err := db.beforeQuery(ctx, db.db, nil, query, params, wb.Query())
- if err != nil {
- return nil, err
- }
-
- var res Result
- var lastErr error
- for attempt := 0; attempt <= db.opt.MaxRetries; attempt++ {
- if attempt > 0 {
- if err := internal.Sleep(ctx, db.retryBackoff(attempt-1)); err != nil {
- return nil, err
- }
- }
-
- lastErr = db.withConn(ctx, func(ctx context.Context, cn *pool.Conn) error {
- res, err = db.simpleQuery(ctx, cn, wb)
- return err
- })
- if !db.shouldRetry(lastErr) {
- break
- }
- }
-
- if err := db.afterQuery(ctx, evt, res, lastErr); err != nil {
- return nil, err
- }
- return res, lastErr
-}
-
-// ExecOne acts like Exec, but query must affect only one row. It
-// returns ErrNoRows error when query returns zero rows or
-// ErrMultiRows when query returns multiple rows.
-func (db *baseDB) ExecOne(query interface{}, params ...interface{}) (Result, error) {
- return db.execOne(db.db.Context(), query, params...)
-}
-
-func (db *baseDB) ExecOneContext(ctx context.Context, query interface{}, params ...interface{}) (Result, error) {
- return db.execOne(ctx, query, params...)
-}
-
-func (db *baseDB) execOne(c context.Context, query interface{}, params ...interface{}) (Result, error) {
- res, err := db.ExecContext(c, query, params...)
- if err != nil {
- return nil, err
- }
-
- if err := internal.AssertOneRow(res.RowsAffected()); err != nil {
- return nil, err
- }
- return res, nil
-}
-
-// Query executes a query that returns rows, typically a SELECT.
-// The params are for any placeholders in the query.
-func (db *baseDB) Query(model, query interface{}, params ...interface{}) (res Result, err error) {
- return db.query(db.db.Context(), model, query, params...)
-}
-
-func (db *baseDB) QueryContext(c context.Context, model, query interface{}, params ...interface{}) (Result, error) {
- return db.query(c, model, query, params...)
-}
-
-func (db *baseDB) query(ctx context.Context, model, query interface{}, params ...interface{}) (Result, error) {
- wb := pool.GetWriteBuffer()
- defer pool.PutWriteBuffer(wb)
-
- if err := writeQueryMsg(wb, db.fmter, query, params...); err != nil {
- return nil, err
- }
-
- ctx, evt, err := db.beforeQuery(ctx, db.db, model, query, params, wb.Query())
- if err != nil {
- return nil, err
- }
-
- var res Result
- var lastErr error
- for attempt := 0; attempt <= db.opt.MaxRetries; attempt++ {
- if attempt > 0 {
- if err := internal.Sleep(ctx, db.retryBackoff(attempt-1)); err != nil {
- return nil, err
- }
- }
-
- lastErr = db.withConn(ctx, func(ctx context.Context, cn *pool.Conn) error {
- res, err = db.simpleQueryData(ctx, cn, model, wb)
- return err
- })
- if !db.shouldRetry(lastErr) {
- break
- }
- }
-
- if err := db.afterQuery(ctx, evt, res, lastErr); err != nil {
- return nil, err
- }
- return res, lastErr
-}
-
-// QueryOne acts like Query, but query must return only one row. It
-// returns ErrNoRows error when query returns zero rows or
-// ErrMultiRows when query returns multiple rows.
-func (db *baseDB) QueryOne(model, query interface{}, params ...interface{}) (Result, error) {
- return db.queryOne(db.db.Context(), model, query, params...)
-}
-
-func (db *baseDB) QueryOneContext(
- ctx context.Context, model, query interface{}, params ...interface{},
-) (Result, error) {
- return db.queryOne(ctx, model, query, params...)
-}
-
-func (db *baseDB) queryOne(ctx context.Context, model, query interface{}, params ...interface{}) (Result, error) {
- res, err := db.QueryContext(ctx, model, query, params...)
- if err != nil {
- return nil, err
- }
-
- if err := internal.AssertOneRow(res.RowsAffected()); err != nil {
- return nil, err
- }
- return res, nil
-}
-
-// CopyFrom copies data from reader to a table.
-func (db *baseDB) CopyFrom(r io.Reader, query interface{}, params ...interface{}) (res Result, err error) {
- c := db.db.Context()
- err = db.withConn(c, func(c context.Context, cn *pool.Conn) error {
- res, err = db.copyFrom(c, cn, r, query, params...)
- return err
- })
- return res, err
-}
-
-// TODO: don't get/put conn in the pool.
-func (db *baseDB) copyFrom(
- ctx context.Context, cn *pool.Conn, r io.Reader, query interface{}, params ...interface{},
-) (res Result, err error) {
- var evt *QueryEvent
-
- wb := pool.GetWriteBuffer()
- defer pool.PutWriteBuffer(wb)
-
- if err := writeQueryMsg(wb, db.fmter, query, params...); err != nil {
- return nil, err
- }
-
- var model interface{}
- if len(params) > 0 {
- model, _ = params[len(params)-1].(orm.TableModel)
- }
-
- ctx, evt, err = db.beforeQuery(ctx, db.db, model, query, params, wb.Query())
- if err != nil {
- return nil, err
- }
-
- // Note that afterQuery uses the err.
- defer func() {
- if afterQueryErr := db.afterQuery(ctx, evt, res, err); afterQueryErr != nil {
- err = afterQueryErr
- }
- }()
-
- err = cn.WithWriter(ctx, db.opt.WriteTimeout, func(wb *pool.WriteBuffer) error {
- return writeQueryMsg(wb, db.fmter, query, params...)
- })
- if err != nil {
- return nil, err
- }
-
- err = cn.WithReader(ctx, db.opt.ReadTimeout, readCopyInResponse)
- if err != nil {
- return nil, err
- }
-
- for {
- err = cn.WithWriter(ctx, db.opt.WriteTimeout, func(wb *pool.WriteBuffer) error {
- return writeCopyData(wb, r)
- })
- if err != nil {
- if err == io.EOF {
- break
- }
- return nil, err
- }
- }
-
- err = cn.WithWriter(ctx, db.opt.WriteTimeout, func(wb *pool.WriteBuffer) error {
- writeCopyDone(wb)
- return nil
- })
- if err != nil {
- return nil, err
- }
-
- err = cn.WithReader(ctx, db.opt.ReadTimeout, func(rd *pool.ReaderContext) error {
- res, err = readReadyForQuery(rd)
- return err
- })
- if err != nil {
- return nil, err
- }
-
- return res, nil
-}
-
-// CopyTo copies data from a table to writer.
-func (db *baseDB) CopyTo(w io.Writer, query interface{}, params ...interface{}) (res Result, err error) {
- c := db.db.Context()
- err = db.withConn(c, func(c context.Context, cn *pool.Conn) error {
- res, err = db.copyTo(c, cn, w, query, params...)
- return err
- })
- return res, err
-}
-
-func (db *baseDB) copyTo(
- ctx context.Context, cn *pool.Conn, w io.Writer, query interface{}, params ...interface{},
-) (res Result, err error) {
- var evt *QueryEvent
-
- wb := pool.GetWriteBuffer()
- defer pool.PutWriteBuffer(wb)
-
- if err := writeQueryMsg(wb, db.fmter, query, params...); err != nil {
- return nil, err
- }
-
- var model interface{}
- if len(params) > 0 {
- model, _ = params[len(params)-1].(orm.TableModel)
- }
-
- ctx, evt, err = db.beforeQuery(ctx, db.db, model, query, params, wb.Query())
- if err != nil {
- return nil, err
- }
-
- // Note that afterQuery uses the err.
- defer func() {
- if afterQueryErr := db.afterQuery(ctx, evt, res, err); afterQueryErr != nil {
- err = afterQueryErr
- }
- }()
-
- err = cn.WithWriter(ctx, db.opt.WriteTimeout, func(wb *pool.WriteBuffer) error {
- return writeQueryMsg(wb, db.fmter, query, params...)
- })
- if err != nil {
- return nil, err
- }
-
- err = cn.WithReader(ctx, db.opt.ReadTimeout, func(rd *pool.ReaderContext) error {
- err := readCopyOutResponse(rd)
- if err != nil {
- return err
- }
-
- res, err = readCopyData(rd, w)
- return err
- })
- if err != nil {
- return nil, err
- }
-
- return res, nil
-}
-
-// Ping verifies a connection to the database is still alive,
-// establishing a connection if necessary.
-func (db *baseDB) Ping(ctx context.Context) error {
- _, err := db.ExecContext(ctx, "SELECT 1")
- return err
-}
-
-// Model returns new query for the model.
-func (db *baseDB) Model(model ...interface{}) *Query {
- return orm.NewQuery(db.db, model...)
-}
-
-func (db *baseDB) ModelContext(c context.Context, model ...interface{}) *Query {
- return orm.NewQueryContext(c, db.db, model...)
-}
-
-func (db *baseDB) Formatter() orm.QueryFormatter {
- return db.fmter
-}
-
-func (db *baseDB) cancelRequest(processID, secretKey int32) error {
- c := context.TODO()
-
- cn, err := db.pool.NewConn(c)
- if err != nil {
- return err
- }
- defer func() {
- _ = db.pool.CloseConn(cn)
- }()
-
- return cn.WithWriter(c, db.opt.WriteTimeout, func(wb *pool.WriteBuffer) error {
- writeCancelRequestMsg(wb, processID, secretKey)
- return nil
- })
-}
-
-func (db *baseDB) simpleQuery(
- c context.Context, cn *pool.Conn, wb *pool.WriteBuffer,
-) (*result, error) {
- if err := cn.WriteBuffer(c, db.opt.WriteTimeout, wb); err != nil {
- return nil, err
- }
-
- var res *result
- if err := cn.WithReader(c, db.opt.ReadTimeout, func(rd *pool.ReaderContext) error {
- var err error
- res, err = readSimpleQuery(rd)
- return err
- }); err != nil {
- return nil, err
- }
-
- return res, nil
-}
-
-func (db *baseDB) simpleQueryData(
- c context.Context, cn *pool.Conn, model interface{}, wb *pool.WriteBuffer,
-) (*result, error) {
- if err := cn.WriteBuffer(c, db.opt.WriteTimeout, wb); err != nil {
- return nil, err
- }
-
- var res *result
- if err := cn.WithReader(c, db.opt.ReadTimeout, func(rd *pool.ReaderContext) error {
- var err error
- res, err = readSimpleQueryData(c, rd, model)
- return err
- }); err != nil {
- return nil, err
- }
-
- return res, nil
-}
-
-// Prepare creates a prepared statement for later queries or
-// executions. Multiple queries or executions may be run concurrently
-// from the returned statement.
-func (db *baseDB) Prepare(q string) (*Stmt, error) {
- return prepareStmt(db.withPool(pool.NewStickyConnPool(db.pool)), q)
-}
-
-func (db *baseDB) prepare(
- c context.Context, cn *pool.Conn, q string,
-) (string, []types.ColumnInfo, error) {
- name := cn.NextID()
- err := cn.WithWriter(c, db.opt.WriteTimeout, func(wb *pool.WriteBuffer) error {
- writeParseDescribeSyncMsg(wb, name, q)
- return nil
- })
- if err != nil {
- return "", nil, err
- }
-
- var columns []types.ColumnInfo
- err = cn.WithReader(c, db.opt.ReadTimeout, func(rd *pool.ReaderContext) error {
- columns, err = readParseDescribeSync(rd)
- return err
- })
- if err != nil {
- return "", nil, err
- }
-
- return name, columns, nil
-}
-
-func (db *baseDB) closeStmt(c context.Context, cn *pool.Conn, name string) error {
- err := cn.WithWriter(c, db.opt.WriteTimeout, func(wb *pool.WriteBuffer) error {
- writeCloseMsg(wb, name)
- writeFlushMsg(wb)
- return nil
- })
- if err != nil {
- return err
- }
-
- err = cn.WithReader(c, db.opt.ReadTimeout, readCloseCompleteMsg)
- return err
-}