diff options
Diffstat (limited to 'internal/db/bundb/wrap.go')
-rw-r--r-- | internal/db/bundb/wrap.go | 258 |
1 files changed, 258 insertions, 0 deletions
diff --git a/internal/db/bundb/wrap.go b/internal/db/bundb/wrap.go new file mode 100644 index 000000000..a5039914a --- /dev/null +++ b/internal/db/bundb/wrap.go @@ -0,0 +1,258 @@ +// GoToSocial +// Copyright (C) GoToSocial Authors admin@gotosocial.org +// SPDX-License-Identifier: AGPL-3.0-or-later +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <http://www.gnu.org/licenses/>. + +package bundb + +import ( + "context" + "database/sql" + "time" + + "github.com/superseriousbusiness/gotosocial/internal/db" + "github.com/superseriousbusiness/gotosocial/internal/gtserror" + "github.com/uptrace/bun" + "github.com/uptrace/bun/dialect" +) + +// WrappedDB wraps a bun database instance +// to provide common per-dialect SQL error +// conversions to common types, and retries +// on returned busy errors (SQLite only for now). +type WrappedDB struct { + errHook func(error) error + *bun.DB // underlying conn +} + +// WrapDB wraps a bun database instance in our own WrappedDB type. +func WrapDB(db *bun.DB) *WrappedDB { + var errProc func(error) error + switch name := db.Dialect().Name(); name { + case dialect.PG: + errProc = processPostgresError + case dialect.SQLite: + errProc = processSQLiteError + default: + panic("unknown dialect name: " + name.String()) + } + return &WrappedDB{ + errHook: errProc, + DB: db, + } +} + +// BeginTx wraps bun.DB.BeginTx() with retry-busy timeout. +func (db *WrappedDB) BeginTx(ctx context.Context, opts *sql.TxOptions) (tx bun.Tx, err error) { + err = retryOnBusy(ctx, func() error { + tx, err = db.DB.BeginTx(ctx, opts) + err = db.ProcessError(err) + return err + }) + return +} + +// ExecContext wraps bun.DB.ExecContext() with retry-busy timeout. +func (db *WrappedDB) ExecContext(ctx context.Context, query string, args ...any) (result sql.Result, err error) { + err = retryOnBusy(ctx, func() error { + result, err = db.DB.ExecContext(ctx, query, args...) + err = db.ProcessError(err) + return err + }) + return +} + +// QueryContext wraps bun.DB.QueryContext() with retry-busy timeout. +func (db *WrappedDB) QueryContext(ctx context.Context, query string, args ...any) (rows *sql.Rows, err error) { + err = retryOnBusy(ctx, func() error { + rows, err = db.DB.QueryContext(ctx, query, args...) + err = db.ProcessError(err) + return err + }) + return +} + +// QueryRowContext wraps bun.DB.QueryRowContext() with retry-busy timeout. +func (db *WrappedDB) QueryRowContext(ctx context.Context, query string, args ...any) (row *sql.Row) { + _ = retryOnBusy(ctx, func() error { + row = db.DB.QueryRowContext(ctx, query, args...) + err := db.ProcessError(row.Err()) + return err + }) + return +} + +// RunInTx is functionally the same as bun.DB.RunInTx() but with retry-busy timeouts. +func (db *WrappedDB) RunInTx(ctx context.Context, fn func(bun.Tx) error) error { + // Attempt to start new transaction. + tx, err := db.BeginTx(ctx, nil) + if err != nil { + return err + } + + var done bool + + defer func() { + if !done { + // Rollback (with retry-backoff). + _ = retryOnBusy(ctx, func() error { + err := tx.Rollback() + return db.errHook(err) + }) + } + }() + + // Perform supplied transaction + if err := fn(tx); err != nil { + return db.errHook(err) + } + + // Commit (with retry-backoff). + err = retryOnBusy(ctx, func() error { + err := tx.Commit() + return db.errHook(err) + }) + done = true + return err +} + +func (db *WrappedDB) NewValues(model interface{}) *bun.ValuesQuery { + return bun.NewValuesQuery(db.DB, model).Conn(db) +} + +func (db *WrappedDB) NewMerge() *bun.MergeQuery { + return bun.NewMergeQuery(db.DB).Conn(db) +} + +func (db *WrappedDB) NewSelect() *bun.SelectQuery { + return bun.NewSelectQuery(db.DB).Conn(db) +} + +func (db *WrappedDB) NewInsert() *bun.InsertQuery { + return bun.NewInsertQuery(db.DB).Conn(db) +} + +func (db *WrappedDB) NewUpdate() *bun.UpdateQuery { + return bun.NewUpdateQuery(db.DB).Conn(db) +} + +func (db *WrappedDB) NewDelete() *bun.DeleteQuery { + return bun.NewDeleteQuery(db.DB).Conn(db) +} + +func (db *WrappedDB) NewRaw(query string, args ...interface{}) *bun.RawQuery { + return bun.NewRawQuery(db.DB, query, args...).Conn(db) +} + +func (db *WrappedDB) NewCreateTable() *bun.CreateTableQuery { + return bun.NewCreateTableQuery(db.DB).Conn(db) +} + +func (db *WrappedDB) NewDropTable() *bun.DropTableQuery { + return bun.NewDropTableQuery(db.DB).Conn(db) +} + +func (db *WrappedDB) NewCreateIndex() *bun.CreateIndexQuery { + return bun.NewCreateIndexQuery(db.DB).Conn(db) +} + +func (db *WrappedDB) NewDropIndex() *bun.DropIndexQuery { + return bun.NewDropIndexQuery(db.DB).Conn(db) +} + +func (db *WrappedDB) NewTruncateTable() *bun.TruncateTableQuery { + return bun.NewTruncateTableQuery(db.DB).Conn(db) +} + +func (db *WrappedDB) NewAddColumn() *bun.AddColumnQuery { + return bun.NewAddColumnQuery(db.DB).Conn(db) +} + +func (db *WrappedDB) NewDropColumn() *bun.DropColumnQuery { + return bun.NewDropColumnQuery(db.DB).Conn(db) +} + +// ProcessError processes an error to replace any known values with our own error types, +// making it easier to catch specific situations (e.g. no rows, already exists, etc) +func (db *WrappedDB) ProcessError(err error) error { + if err == nil { + return nil + } + return db.errHook(err) +} + +// Exists checks the results of a SelectQuery for the existence of the data in question, masking ErrNoEntries errors +func (db *WrappedDB) Exists(ctx context.Context, query *bun.SelectQuery) (bool, error) { + exists, err := query.Exists(ctx) + switch err { + case nil: + return exists, nil + case sql.ErrNoRows: + return false, nil + default: + return false, err + } +} + +// NotExists is the functional opposite of conn.Exists() +func (db *WrappedDB) NotExists(ctx context.Context, query *bun.SelectQuery) (bool, error) { + exists, err := db.Exists(ctx, query) + return !exists, err +} + +// retryOnBusy will retry given function on returned 'errBusy'. +func retryOnBusy(ctx context.Context, fn func() error) error { + var backoff time.Duration + + for i := 0; ; i++ { + // Perform func. + err := fn() + + if err != errBusy { + // May be nil, or may be + // some other error, either + // way return here. + return err + } + + // backoff according to a multiplier of 2ms * 2^2n, + // up to a maximum possible backoff time of 5 minutes. + // + // this works out as the following: + // 4ms + // 16ms + // 64ms + // 256ms + // 1.024s + // 4.096s + // 16.384s + // 1m5.536s + // 4m22.144s + backoff = 2 * time.Millisecond * (1 << (2*i + 1)) + if backoff >= 5*time.Minute { + break + } + + select { + // Context cancelled. + case <-ctx.Done(): + + // Backoff for some time. + case <-time.After(backoff): + } + } + + return gtserror.Newf("%w (waited > %s)", db.ErrBusyTimeout, backoff) +} |