summaryrefslogtreecommitdiff
path: root/vendor/github.com/jackc/pgx/v5/batch.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/github.com/jackc/pgx/v5/batch.go')
-rw-r--r--vendor/github.com/jackc/pgx/v5/batch.go443
1 files changed, 0 insertions, 443 deletions
diff --git a/vendor/github.com/jackc/pgx/v5/batch.go b/vendor/github.com/jackc/pgx/v5/batch.go
deleted file mode 100644
index c3c2834f2..000000000
--- a/vendor/github.com/jackc/pgx/v5/batch.go
+++ /dev/null
@@ -1,443 +0,0 @@
-package pgx
-
-import (
- "context"
- "errors"
- "fmt"
-
- "github.com/jackc/pgx/v5/pgconn"
-)
-
-// QueuedQuery is a query that has been queued for execution via a Batch.
-type QueuedQuery struct {
- SQL string
- Arguments []any
- Fn batchItemFunc
- sd *pgconn.StatementDescription
-}
-
-type batchItemFunc func(br BatchResults) error
-
-// Query sets fn to be called when the response to qq is received.
-func (qq *QueuedQuery) Query(fn func(rows Rows) error) {
- qq.Fn = func(br BatchResults) error {
- rows, _ := br.Query()
- defer rows.Close()
-
- err := fn(rows)
- if err != nil {
- return err
- }
- rows.Close()
-
- return rows.Err()
- }
-}
-
-// Query sets fn to be called when the response to qq is received.
-func (qq *QueuedQuery) QueryRow(fn func(row Row) error) {
- qq.Fn = func(br BatchResults) error {
- row := br.QueryRow()
- return fn(row)
- }
-}
-
-// Exec sets fn to be called when the response to qq is received.
-func (qq *QueuedQuery) Exec(fn func(ct pgconn.CommandTag) error) {
- qq.Fn = func(br BatchResults) error {
- ct, err := br.Exec()
- if err != nil {
- return err
- }
-
- return fn(ct)
- }
-}
-
-// Batch queries are a way of bundling multiple queries together to avoid
-// unnecessary network round trips. A Batch must only be sent once.
-type Batch struct {
- QueuedQueries []*QueuedQuery
-}
-
-// Queue queues a query to batch b. query can be an SQL query or the name of a prepared statement. The only pgx option
-// argument that is supported is QueryRewriter. Queries are executed using the connection's DefaultQueryExecMode.
-//
-// While query can contain multiple statements if the connection's DefaultQueryExecMode is QueryModeSimple, this should
-// be avoided. QueuedQuery.Fn must not be set as it will only be called for the first query. That is, QueuedQuery.Query,
-// QueuedQuery.QueryRow, and QueuedQuery.Exec must not be called. In addition, any error messages or tracing that
-// include the current query may reference the wrong query.
-func (b *Batch) Queue(query string, arguments ...any) *QueuedQuery {
- qq := &QueuedQuery{
- SQL: query,
- Arguments: arguments,
- }
- b.QueuedQueries = append(b.QueuedQueries, qq)
- return qq
-}
-
-// Len returns number of queries that have been queued so far.
-func (b *Batch) Len() int {
- return len(b.QueuedQueries)
-}
-
-type BatchResults interface {
- // Exec reads the results from the next query in the batch as if the query has been sent with Conn.Exec. Prefer
- // calling Exec on the QueuedQuery.
- Exec() (pgconn.CommandTag, error)
-
- // Query reads the results from the next query in the batch as if the query has been sent with Conn.Query. Prefer
- // calling Query on the QueuedQuery.
- Query() (Rows, error)
-
- // QueryRow reads the results from the next query in the batch as if the query has been sent with Conn.QueryRow.
- // Prefer calling QueryRow on the QueuedQuery.
- QueryRow() Row
-
- // Close closes the batch operation. All unread results are read and any callback functions registered with
- // QueuedQuery.Query, QueuedQuery.QueryRow, or QueuedQuery.Exec will be called. If a callback function returns an
- // error or the batch encounters an error subsequent callback functions will not be called.
- //
- // Close must be called before the underlying connection can be used again. Any error that occurred during a batch
- // operation may have made it impossible to resyncronize the connection with the server. In this case the underlying
- // connection will have been closed.
- //
- // Close is safe to call multiple times. If it returns an error subsequent calls will return the same error. Callback
- // functions will not be rerun.
- Close() error
-}
-
-type batchResults struct {
- ctx context.Context
- conn *Conn
- mrr *pgconn.MultiResultReader
- err error
- b *Batch
- qqIdx int
- closed bool
- endTraced bool
-}
-
-// Exec reads the results from the next query in the batch as if the query has been sent with Exec.
-func (br *batchResults) Exec() (pgconn.CommandTag, error) {
- if br.err != nil {
- return pgconn.CommandTag{}, br.err
- }
- if br.closed {
- return pgconn.CommandTag{}, fmt.Errorf("batch already closed")
- }
-
- query, arguments, _ := br.nextQueryAndArgs()
-
- if !br.mrr.NextResult() {
- err := br.mrr.Close()
- if err == nil {
- err = errors.New("no more results in batch")
- }
- if br.conn.batchTracer != nil {
- br.conn.batchTracer.TraceBatchQuery(br.ctx, br.conn, TraceBatchQueryData{
- SQL: query,
- Args: arguments,
- Err: err,
- })
- }
- return pgconn.CommandTag{}, err
- }
-
- commandTag, err := br.mrr.ResultReader().Close()
- if err != nil {
- br.err = err
- br.mrr.Close()
- }
-
- if br.conn.batchTracer != nil {
- br.conn.batchTracer.TraceBatchQuery(br.ctx, br.conn, TraceBatchQueryData{
- SQL: query,
- Args: arguments,
- CommandTag: commandTag,
- Err: br.err,
- })
- }
-
- return commandTag, br.err
-}
-
-// Query reads the results from the next query in the batch as if the query has been sent with Query.
-func (br *batchResults) Query() (Rows, error) {
- query, arguments, ok := br.nextQueryAndArgs()
- if !ok {
- query = "batch query"
- }
-
- if br.err != nil {
- return &baseRows{err: br.err, closed: true}, br.err
- }
-
- if br.closed {
- alreadyClosedErr := fmt.Errorf("batch already closed")
- return &baseRows{err: alreadyClosedErr, closed: true}, alreadyClosedErr
- }
-
- rows := br.conn.getRows(br.ctx, query, arguments)
- rows.batchTracer = br.conn.batchTracer
-
- if !br.mrr.NextResult() {
- rows.err = br.mrr.Close()
- if rows.err == nil {
- rows.err = errors.New("no more results in batch")
- }
- rows.closed = true
-
- if br.conn.batchTracer != nil {
- br.conn.batchTracer.TraceBatchQuery(br.ctx, br.conn, TraceBatchQueryData{
- SQL: query,
- Args: arguments,
- Err: rows.err,
- })
- }
-
- return rows, rows.err
- }
-
- rows.resultReader = br.mrr.ResultReader()
- return rows, nil
-}
-
-// QueryRow reads the results from the next query in the batch as if the query has been sent with QueryRow.
-func (br *batchResults) QueryRow() Row {
- rows, _ := br.Query()
- return (*connRow)(rows.(*baseRows))
-
-}
-
-// Close closes the batch operation. Any error that occurred during a batch operation may have made it impossible to
-// resyncronize the connection with the server. In this case the underlying connection will have been closed.
-func (br *batchResults) Close() error {
- defer func() {
- if !br.endTraced {
- if br.conn != nil && br.conn.batchTracer != nil {
- br.conn.batchTracer.TraceBatchEnd(br.ctx, br.conn, TraceBatchEndData{Err: br.err})
- }
- br.endTraced = true
- }
- }()
-
- if br.err != nil {
- return br.err
- }
-
- if br.closed {
- return nil
- }
-
- // Read and run fn for all remaining items
- for br.err == nil && !br.closed && br.b != nil && br.qqIdx < len(br.b.QueuedQueries) {
- if br.b.QueuedQueries[br.qqIdx].Fn != nil {
- err := br.b.QueuedQueries[br.qqIdx].Fn(br)
- if err != nil {
- br.err = err
- }
- } else {
- br.Exec()
- }
- }
-
- br.closed = true
-
- err := br.mrr.Close()
- if br.err == nil {
- br.err = err
- }
-
- return br.err
-}
-
-func (br *batchResults) earlyError() error {
- return br.err
-}
-
-func (br *batchResults) nextQueryAndArgs() (query string, args []any, ok bool) {
- if br.b != nil && br.qqIdx < len(br.b.QueuedQueries) {
- bi := br.b.QueuedQueries[br.qqIdx]
- query = bi.SQL
- args = bi.Arguments
- ok = true
- br.qqIdx++
- }
- return
-}
-
-type pipelineBatchResults struct {
- ctx context.Context
- conn *Conn
- pipeline *pgconn.Pipeline
- lastRows *baseRows
- err error
- b *Batch
- qqIdx int
- closed bool
- endTraced bool
-}
-
-// Exec reads the results from the next query in the batch as if the query has been sent with Exec.
-func (br *pipelineBatchResults) Exec() (pgconn.CommandTag, error) {
- if br.err != nil {
- return pgconn.CommandTag{}, br.err
- }
- if br.closed {
- return pgconn.CommandTag{}, fmt.Errorf("batch already closed")
- }
- if br.lastRows != nil && br.lastRows.err != nil {
- return pgconn.CommandTag{}, br.err
- }
-
- query, arguments, err := br.nextQueryAndArgs()
- if err != nil {
- return pgconn.CommandTag{}, err
- }
-
- results, err := br.pipeline.GetResults()
- if err != nil {
- br.err = err
- return pgconn.CommandTag{}, br.err
- }
- var commandTag pgconn.CommandTag
- switch results := results.(type) {
- case *pgconn.ResultReader:
- commandTag, br.err = results.Close()
- default:
- return pgconn.CommandTag{}, fmt.Errorf("unexpected pipeline result: %T", results)
- }
-
- if br.conn.batchTracer != nil {
- br.conn.batchTracer.TraceBatchQuery(br.ctx, br.conn, TraceBatchQueryData{
- SQL: query,
- Args: arguments,
- CommandTag: commandTag,
- Err: br.err,
- })
- }
-
- return commandTag, br.err
-}
-
-// Query reads the results from the next query in the batch as if the query has been sent with Query.
-func (br *pipelineBatchResults) Query() (Rows, error) {
- if br.err != nil {
- return &baseRows{err: br.err, closed: true}, br.err
- }
-
- if br.closed {
- alreadyClosedErr := fmt.Errorf("batch already closed")
- return &baseRows{err: alreadyClosedErr, closed: true}, alreadyClosedErr
- }
-
- if br.lastRows != nil && br.lastRows.err != nil {
- br.err = br.lastRows.err
- return &baseRows{err: br.err, closed: true}, br.err
- }
-
- query, arguments, err := br.nextQueryAndArgs()
- if err != nil {
- return &baseRows{err: err, closed: true}, err
- }
-
- rows := br.conn.getRows(br.ctx, query, arguments)
- rows.batchTracer = br.conn.batchTracer
- br.lastRows = rows
-
- results, err := br.pipeline.GetResults()
- if err != nil {
- br.err = err
- rows.err = err
- rows.closed = true
-
- if br.conn.batchTracer != nil {
- br.conn.batchTracer.TraceBatchQuery(br.ctx, br.conn, TraceBatchQueryData{
- SQL: query,
- Args: arguments,
- Err: err,
- })
- }
- } else {
- switch results := results.(type) {
- case *pgconn.ResultReader:
- rows.resultReader = results
- default:
- err = fmt.Errorf("unexpected pipeline result: %T", results)
- br.err = err
- rows.err = err
- rows.closed = true
- }
- }
-
- return rows, rows.err
-}
-
-// QueryRow reads the results from the next query in the batch as if the query has been sent with QueryRow.
-func (br *pipelineBatchResults) QueryRow() Row {
- rows, _ := br.Query()
- return (*connRow)(rows.(*baseRows))
-
-}
-
-// Close closes the batch operation. Any error that occurred during a batch operation may have made it impossible to
-// resyncronize the connection with the server. In this case the underlying connection will have been closed.
-func (br *pipelineBatchResults) Close() error {
- defer func() {
- if !br.endTraced {
- if br.conn.batchTracer != nil {
- br.conn.batchTracer.TraceBatchEnd(br.ctx, br.conn, TraceBatchEndData{Err: br.err})
- }
- br.endTraced = true
- }
- }()
-
- if br.err == nil && br.lastRows != nil && br.lastRows.err != nil {
- br.err = br.lastRows.err
- return br.err
- }
-
- if br.closed {
- return br.err
- }
-
- // Read and run fn for all remaining items
- for br.err == nil && !br.closed && br.b != nil && br.qqIdx < len(br.b.QueuedQueries) {
- if br.b.QueuedQueries[br.qqIdx].Fn != nil {
- err := br.b.QueuedQueries[br.qqIdx].Fn(br)
- if err != nil {
- br.err = err
- }
- } else {
- br.Exec()
- }
- }
-
- br.closed = true
-
- err := br.pipeline.Close()
- if br.err == nil {
- br.err = err
- }
-
- return br.err
-}
-
-func (br *pipelineBatchResults) earlyError() error {
- return br.err
-}
-
-func (br *pipelineBatchResults) nextQueryAndArgs() (query string, args []any, err error) {
- if br.b == nil {
- return "", nil, errors.New("no reference to batch")
- }
-
- if br.qqIdx >= len(br.b.QueuedQueries) {
- return "", nil, errors.New("no more results in batch")
- }
-
- bi := br.b.QueuedQueries[br.qqIdx]
- br.qqIdx++
- return bi.SQL, bi.Arguments, nil
-}