diff options
Diffstat (limited to 'internal/db/bundb/db.go')
-rw-r--r-- | internal/db/bundb/db.go | 354 |
1 files changed, 354 insertions, 0 deletions
diff --git a/internal/db/bundb/db.go b/internal/db/bundb/db.go new file mode 100644 index 000000000..9b6edcefe --- /dev/null +++ b/internal/db/bundb/db.go @@ -0,0 +1,354 @@ +// GoToSocial +// Copyright (C) GoToSocial Authors admin@gotosocial.org +// SPDX-License-Identifier: AGPL-3.0-or-later +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <http://www.gnu.org/licenses/>. + +package bundb + +import ( + "context" + "database/sql" + "time" + + "github.com/superseriousbusiness/gotosocial/internal/db" + "github.com/superseriousbusiness/gotosocial/internal/gtserror" + "github.com/uptrace/bun" + "github.com/uptrace/bun/dialect" + "github.com/uptrace/bun/schema" +) + +// DB wraps a bun database instance +// to provide common per-dialect SQL error +// conversions to common types, and retries +// on returned busy (SQLite only). +type DB struct { + // our own wrapped db type + // with retry backoff support. + // kept separate to the *bun.DB + // type to be passed into query + // builders as bun.IConn iface + // (this prevents double firing + // bun query hooks). + // + // also holds per-dialect + // error hook function. + raw rawdb + + // bun DB interface we use + // for dialects, and improved + // struct marshal/unmarshaling. + bun *bun.DB +} + +// WrapDB wraps a bun database instance in our database type. +func WrapDB(db *bun.DB) *DB { + var errProc func(error) error + switch name := db.Dialect().Name(); name { + case dialect.PG: + errProc = processPostgresError + case dialect.SQLite: + errProc = processSQLiteError + default: + panic("unknown dialect name: " + name.String()) + } + return &DB{ + raw: rawdb{ + errHook: errProc, + DB: db.DB, + }, + bun: db, + } +} + +// Dialect is a direct call-through to bun.DB.Dialect(). +func (db *DB) Dialect() schema.Dialect { return db.bun.Dialect() } + +// AddQueryHook is a direct call-through to bun.DB.AddQueryHook(). +func (db *DB) AddQueryHook(hook bun.QueryHook) { db.bun.AddQueryHook(hook) } + +// RegisterModels is a direct call-through to bun.DB.RegisterModels(). +func (db *DB) RegisterModel(models ...any) { db.bun.RegisterModel(models...) } + +// PingContext is a direct call-through to bun.DB.PingContext(). +func (db *DB) PingContext(ctx context.Context) error { return db.bun.PingContext(ctx) } + +// Close is a direct call-through to bun.DB.Close(). +func (db *DB) Close() error { return db.bun.Close() } + +// BeginTx wraps bun.DB.BeginTx() with retry-busy timeout. +func (db *DB) BeginTx(ctx context.Context, opts *sql.TxOptions) (tx bun.Tx, err error) { + bundb := db.bun // use *bun.DB interface to return bun.Tx type + err = retryOnBusy(ctx, func() error { + tx, err = bundb.BeginTx(ctx, opts) + err = db.raw.errHook(err) + return err + }) + return +} + +// ExecContext wraps bun.DB.ExecContext() with retry-busy timeout. +func (db *DB) ExecContext(ctx context.Context, query string, args ...any) (result sql.Result, err error) { + bundb := db.bun // use underlying *bun.DB interface for their query formatting + err = retryOnBusy(ctx, func() error { + result, err = bundb.ExecContext(ctx, query, args...) + err = db.raw.errHook(err) + return err + }) + return +} + +// QueryContext wraps bun.DB.ExecContext() with retry-busy timeout. +func (db *DB) QueryContext(ctx context.Context, query string, args ...any) (rows *sql.Rows, err error) { + bundb := db.bun // use underlying *bun.DB interface for their query formatting + err = retryOnBusy(ctx, func() error { + rows, err = bundb.QueryContext(ctx, query, args...) + err = db.raw.errHook(err) + return err + }) + return +} + +// QueryRowContext wraps bun.DB.ExecContext() with retry-busy timeout. +func (db *DB) QueryRowContext(ctx context.Context, query string, args ...any) (row *sql.Row) { + bundb := db.bun // use underlying *bun.DB interface for their query formatting + _ = retryOnBusy(ctx, func() error { + row = bundb.QueryRowContext(ctx, query, args...) + err := db.raw.errHook(row.Err()) + return err + }) + return +} + +// RunInTx is functionally the same as bun.DB.RunInTx() but with retry-busy timeouts. +func (db *DB) RunInTx(ctx context.Context, fn func(bun.Tx) error) error { + // Attempt to start new transaction. + tx, err := db.BeginTx(ctx, nil) + if err != nil { + return err + } + + var done bool + + defer func() { + if !done { + // Rollback (with retry-backoff). + _ = retryOnBusy(ctx, func() error { + err := tx.Rollback() + return db.raw.errHook(err) + }) + } + }() + + // Perform supplied transaction + if err := fn(tx); err != nil { + return db.raw.errHook(err) + } + + // Commit (with retry-backoff). + err = retryOnBusy(ctx, func() error { + err := tx.Commit() + return db.raw.errHook(err) + }) + done = true + return err +} + +func (db *DB) NewValues(model interface{}) *bun.ValuesQuery { + // note: passing in rawdb as conn iface so no double query-hook + // firing when passed through the bun.DB.Query___() functions. + return bun.NewValuesQuery(db.bun, model).Conn(&db.raw) +} + +func (db *DB) NewMerge() *bun.MergeQuery { + // note: passing in rawdb as conn iface so no double query-hook + // firing when passed through the bun.DB.Query___() functions. + return bun.NewMergeQuery(db.bun).Conn(&db.raw) +} + +func (db *DB) NewSelect() *bun.SelectQuery { + // note: passing in rawdb as conn iface so no double query-hook + // firing when passed through the bun.DB.Query___() functions. + return bun.NewSelectQuery(db.bun).Conn(&db.raw) +} + +func (db *DB) NewInsert() *bun.InsertQuery { + // note: passing in rawdb as conn iface so no double query-hook + // firing when passed through the bun.DB.Query___() functions. + return bun.NewInsertQuery(db.bun).Conn(&db.raw) +} + +func (db *DB) NewUpdate() *bun.UpdateQuery { + // note: passing in rawdb as conn iface so no double query-hook + // firing when passed through the bun.DB.Query___() functions. + return bun.NewUpdateQuery(db.bun).Conn(&db.raw) +} + +func (db *DB) NewDelete() *bun.DeleteQuery { + // note: passing in rawdb as conn iface so no double query-hook + // firing when passed through the bun.DB.Query___() functions. + return bun.NewDeleteQuery(db.bun).Conn(&db.raw) +} + +func (db *DB) NewRaw(query string, args ...interface{}) *bun.RawQuery { + // note: passing in rawdb as conn iface so no double query-hook + // firing when passed through the bun.DB.Query___() functions. + return bun.NewRawQuery(db.bun, query, args...).Conn(&db.raw) +} + +func (db *DB) NewCreateTable() *bun.CreateTableQuery { + // note: passing in rawdb as conn iface so no double query-hook + // firing when passed through the bun.DB.Query___() functions. + return bun.NewCreateTableQuery(db.bun).Conn(&db.raw) +} + +func (db *DB) NewDropTable() *bun.DropTableQuery { + // note: passing in rawdb as conn iface so no double query-hook + // firing when passed through the bun.DB.Query___() functions. + return bun.NewDropTableQuery(db.bun).Conn(&db.raw) +} + +func (db *DB) NewCreateIndex() *bun.CreateIndexQuery { + // note: passing in rawdb as conn iface so no double query-hook + // firing when passed through the bun.DB.Query___() functions. + return bun.NewCreateIndexQuery(db.bun).Conn(&db.raw) +} + +func (db *DB) NewDropIndex() *bun.DropIndexQuery { + // note: passing in rawdb as conn iface so no double query-hook + // firing when passed through the bun.DB.Query___() functions. + return bun.NewDropIndexQuery(db.bun).Conn(&db.raw) +} + +func (db *DB) NewTruncateTable() *bun.TruncateTableQuery { + // note: passing in rawdb as conn iface so no double query-hook + // firing when passed through the bun.DB.Query___() functions. + return bun.NewTruncateTableQuery(db.bun).Conn(&db.raw) +} + +func (db *DB) NewAddColumn() *bun.AddColumnQuery { + // note: passing in rawdb as conn iface so no double query-hook + // firing when passed through the bun.DB.Query___() functions. + return bun.NewAddColumnQuery(db.bun).Conn(&db.raw) +} + +func (db *DB) NewDropColumn() *bun.DropColumnQuery { + // note: passing in rawdb as conn iface so no double query-hook + // firing when passed through the bun.DB.Query___() functions. + return bun.NewDropColumnQuery(db.bun).Conn(&db.raw) +} + +// Exists checks the results of a SelectQuery for the existence of the data in question, masking ErrNoEntries errors. +func (db *DB) Exists(ctx context.Context, query *bun.SelectQuery) (bool, error) { + exists, err := query.Exists(ctx) + switch err { + case nil: + return exists, nil + case sql.ErrNoRows: + return false, nil + default: + return false, err + } +} + +// NotExists checks the results of a SelectQuery for the non-existence of the data in question, masking ErrNoEntries errors. +func (db *DB) NotExists(ctx context.Context, query *bun.SelectQuery) (bool, error) { + exists, err := db.Exists(ctx, query) + return !exists, err +} + +type rawdb struct { + // dialect specific error + // processing function hook. + errHook func(error) error + + // embedded raw + // db interface + *sql.DB +} + +// ExecContext wraps sql.DB.ExecContext() with retry-busy timeout. +func (db *rawdb) ExecContext(ctx context.Context, query string, args ...any) (result sql.Result, err error) { + err = retryOnBusy(ctx, func() error { + result, err = db.DB.ExecContext(ctx, query, args...) + err = db.errHook(err) + return err + }) + return +} + +// QueryContext wraps sql.DB.QueryContext() with retry-busy timeout. +func (db *rawdb) QueryContext(ctx context.Context, query string, args ...any) (rows *sql.Rows, err error) { + err = retryOnBusy(ctx, func() error { + rows, err = db.DB.QueryContext(ctx, query, args...) + err = db.errHook(err) + return err + }) + return +} + +// QueryRowContext wraps sql.DB.QueryRowContext() with retry-busy timeout. +func (db *rawdb) QueryRowContext(ctx context.Context, query string, args ...any) (row *sql.Row) { + _ = retryOnBusy(ctx, func() error { + row = db.DB.QueryRowContext(ctx, query, args...) + err := db.errHook(row.Err()) + return err + }) + return +} + +// retryOnBusy will retry given function on returned 'errBusy'. +func retryOnBusy(ctx context.Context, fn func() error) error { + var backoff time.Duration + + for i := 0; ; i++ { + // Perform func. + err := fn() + + if err != errBusy { + // May be nil, or may be + // some other error, either + // way return here. + return err + } + + // backoff according to a multiplier of 2ms * 2^2n, + // up to a maximum possible backoff time of 5 minutes. + // + // this works out as the following: + // 4ms + // 16ms + // 64ms + // 256ms + // 1.024s + // 4.096s + // 16.384s + // 1m5.536s + // 4m22.144s + backoff = 2 * time.Millisecond * (1 << (2*i + 1)) + if backoff >= 5*time.Minute { + break + } + + select { + // Context cancelled. + case <-ctx.Done(): + + // Backoff for some time. + case <-time.After(backoff): + } + } + + return gtserror.Newf("%w (waited > %s)", db.ErrBusyTimeout, backoff) +} |