summaryrefslogtreecommitdiff
path: root/internal
diff options
context:
space:
mode:
Diffstat (limited to 'internal')
-rw-r--r--internal/db/bundb/account.go6
-rw-r--r--internal/db/bundb/db.go294
-rw-r--r--internal/db/bundb/emoji.go2
-rw-r--r--internal/db/bundb/list.go4
-rw-r--r--internal/db/bundb/marker.go2
-rw-r--r--internal/db/bundb/media.go2
-rw-r--r--internal/db/bundb/status.go6
7 files changed, 270 insertions, 46 deletions
diff --git a/internal/db/bundb/account.go b/internal/db/bundb/account.go
index c88edebbf..43e5055e1 100644
--- a/internal/db/bundb/account.go
+++ b/internal/db/bundb/account.go
@@ -298,7 +298,7 @@ func (a *accountDB) PutAccount(ctx context.Context, account *gtsmodel.Account) e
// It is safe to run this database transaction within cache.Store
// as the cache does not attempt a mutex lock until AFTER hook.
//
- return a.db.RunInTx(ctx, func(tx bun.Tx) error {
+ return a.db.RunInTx(ctx, func(tx Tx) error {
// create links between this account and any emojis it uses
for _, i := range account.EmojiIDs {
if _, err := tx.NewInsert().Model(&gtsmodel.AccountToEmoji{
@@ -327,7 +327,7 @@ func (a *accountDB) UpdateAccount(ctx context.Context, account *gtsmodel.Account
// It is safe to run this database transaction within cache.Store
// as the cache does not attempt a mutex lock until AFTER hook.
//
- return a.db.RunInTx(ctx, func(tx bun.Tx) error {
+ return a.db.RunInTx(ctx, func(tx Tx) error {
// create links between this account and any emojis it uses
// first clear out any old emoji links
if _, err := tx.
@@ -375,7 +375,7 @@ func (a *accountDB) DeleteAccount(ctx context.Context, id string) error {
return err
}
- return a.db.RunInTx(ctx, func(tx bun.Tx) error {
+ return a.db.RunInTx(ctx, func(tx Tx) error {
// clear out any emoji links
if _, err := tx.
NewDelete().
diff --git a/internal/db/bundb/db.go b/internal/db/bundb/db.go
index 9b6edcefe..2b19ba0c4 100644
--- a/internal/db/bundb/db.go
+++ b/internal/db/bundb/db.go
@@ -21,6 +21,7 @@ import (
"context"
"database/sql"
"time"
+ "unsafe"
"github.com/superseriousbusiness/gotosocial/internal/db"
"github.com/superseriousbusiness/gotosocial/internal/gtserror"
@@ -66,7 +67,7 @@ func WrapDB(db *bun.DB) *DB {
return &DB{
raw: rawdb{
errHook: errProc,
- DB: db.DB,
+ db: db.DB,
},
bun: db,
}
@@ -87,18 +88,7 @@ func (db *DB) PingContext(ctx context.Context) error { return db.bun.PingContext
// Close is a direct call-through to bun.DB.Close().
func (db *DB) Close() error { return db.bun.Close() }
-// BeginTx wraps bun.DB.BeginTx() with retry-busy timeout.
-func (db *DB) BeginTx(ctx context.Context, opts *sql.TxOptions) (tx bun.Tx, err error) {
- bundb := db.bun // use *bun.DB interface to return bun.Tx type
- err = retryOnBusy(ctx, func() error {
- tx, err = bundb.BeginTx(ctx, opts)
- err = db.raw.errHook(err)
- return err
- })
- return
-}
-
-// ExecContext wraps bun.DB.ExecContext() with retry-busy timeout.
+// ExecContext wraps bun.DB.ExecContext() with retry-busy timeout and our own error processing.
func (db *DB) ExecContext(ctx context.Context, query string, args ...any) (result sql.Result, err error) {
bundb := db.bun // use underlying *bun.DB interface for their query formatting
err = retryOnBusy(ctx, func() error {
@@ -109,7 +99,7 @@ func (db *DB) ExecContext(ctx context.Context, query string, args ...any) (resul
return
}
-// QueryContext wraps bun.DB.ExecContext() with retry-busy timeout.
+// QueryContext wraps bun.DB.ExecContext() with retry-busy timeout and our own error processing.
func (db *DB) QueryContext(ctx context.Context, query string, args ...any) (rows *sql.Rows, err error) {
bundb := db.bun // use underlying *bun.DB interface for their query formatting
err = retryOnBusy(ctx, func() error {
@@ -120,19 +110,40 @@ func (db *DB) QueryContext(ctx context.Context, query string, args ...any) (rows
return
}
-// QueryRowContext wraps bun.DB.ExecContext() with retry-busy timeout.
+// QueryRowContext wraps bun.DB.ExecContext() with retry-busy timeout and our own error processing.
func (db *DB) QueryRowContext(ctx context.Context, query string, args ...any) (row *sql.Row) {
bundb := db.bun // use underlying *bun.DB interface for their query formatting
_ = retryOnBusy(ctx, func() error {
row = bundb.QueryRowContext(ctx, query, args...)
- err := db.raw.errHook(row.Err())
+ if err := db.raw.errHook(row.Err()); err != nil {
+ updateRowError(row, err) // set new error
+ }
+ return row.Err()
+ })
+ return
+}
+
+// BeginTx wraps bun.DB.BeginTx() with retry-busy timeout and our own error processing.
+func (db *DB) BeginTx(ctx context.Context, opts *sql.TxOptions) (tx Tx, err error) {
+ var buntx bun.Tx // captured bun.Tx
+ bundb := db.bun // use *bun.DB interface to return bun.Tx type
+
+ err = retryOnBusy(ctx, func() error {
+ buntx, err = bundb.BeginTx(ctx, opts)
+ err = db.raw.errHook(err)
return err
})
+
+ if err == nil {
+ // Wrap bun.Tx in our type.
+ tx = wrapTx(db, &buntx)
+ }
+
return
}
// RunInTx is functionally the same as bun.DB.RunInTx() but with retry-busy timeouts.
-func (db *DB) RunInTx(ctx context.Context, fn func(bun.Tx) error) error {
+func (db *DB) RunInTx(ctx context.Context, fn func(Tx) error) error {
// Attempt to start new transaction.
tx, err := db.BeginTx(ctx, nil)
if err != nil {
@@ -143,24 +154,18 @@ func (db *DB) RunInTx(ctx context.Context, fn func(bun.Tx) error) error {
defer func() {
if !done {
- // Rollback (with retry-backoff).
- _ = retryOnBusy(ctx, func() error {
- err := tx.Rollback()
- return db.raw.errHook(err)
- })
+ // Rollback tx.
+ _ = tx.Rollback()
}
}()
// Perform supplied transaction
if err := fn(tx); err != nil {
- return db.raw.errHook(err)
+ return err
}
- // Commit (with retry-backoff).
- err = retryOnBusy(ctx, func() error {
- err := tx.Commit()
- return db.raw.errHook(err)
- })
+ // Commit tx.
+ err = tx.Commit()
done = true
return err
}
@@ -275,39 +280,258 @@ type rawdb struct {
// embedded raw
// db interface
- *sql.DB
+ db *sql.DB
}
-// ExecContext wraps sql.DB.ExecContext() with retry-busy timeout.
+// ExecContext wraps sql.DB.ExecContext() with retry-busy timeout and our own error processing.
func (db *rawdb) ExecContext(ctx context.Context, query string, args ...any) (result sql.Result, err error) {
err = retryOnBusy(ctx, func() error {
- result, err = db.DB.ExecContext(ctx, query, args...)
+ result, err = db.db.ExecContext(ctx, query, args...)
err = db.errHook(err)
return err
})
return
}
-// QueryContext wraps sql.DB.QueryContext() with retry-busy timeout.
+// QueryContext wraps sql.DB.QueryContext() with retry-busy timeout and our own error processing.
func (db *rawdb) QueryContext(ctx context.Context, query string, args ...any) (rows *sql.Rows, err error) {
err = retryOnBusy(ctx, func() error {
- rows, err = db.DB.QueryContext(ctx, query, args...)
+ rows, err = db.db.QueryContext(ctx, query, args...)
err = db.errHook(err)
return err
})
return
}
-// QueryRowContext wraps sql.DB.QueryRowContext() with retry-busy timeout.
+// QueryRowContext wraps sql.DB.QueryRowContext() with retry-busy timeout and our own error processing.
func (db *rawdb) QueryRowContext(ctx context.Context, query string, args ...any) (row *sql.Row) {
_ = retryOnBusy(ctx, func() error {
- row = db.DB.QueryRowContext(ctx, query, args...)
+ row = db.db.QueryRowContext(ctx, query, args...)
err := db.errHook(row.Err())
return err
})
return
}
+// Tx wraps a bun transaction instance
+// to provide common per-dialect SQL error
+// conversions to common types, and retries
+// on busy commit/rollback (SQLite only).
+type Tx struct {
+ // our own wrapped Tx type
+ // kept separate to the *bun.Tx
+ // type to be passed into query
+ // builders as bun.IConn iface
+ // (this prevents double firing
+ // bun query hooks).
+ //
+ // also holds per-dialect
+ // error hook function.
+ raw rawtx
+
+ // bun Tx interface we use
+ // for dialects, and improved
+ // struct marshal/unmarshaling.
+ bun *bun.Tx
+}
+
+// wrapTx wraps a given bun.Tx in our own wrapping Tx type.
+func wrapTx(db *DB, tx *bun.Tx) Tx {
+ return Tx{
+ raw: rawtx{
+ errHook: db.raw.errHook,
+ tx: tx.Tx,
+ },
+ bun: tx,
+ }
+}
+
+// ExecContext wraps bun.Tx.ExecContext() with our own error processing, WITHOUT retry-busy timeouts (as will be mid-transaction).
+func (tx Tx) ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error) {
+ buntx := tx.bun // use underlying *bun.Tx interface for their query formatting
+ res, err := buntx.ExecContext(ctx, query, args...)
+ err = tx.raw.errHook(err)
+ return res, err
+}
+
+// QueryContext wraps bun.Tx.QueryContext() with our own error processing, WITHOUT retry-busy timeouts (as will be mid-transaction).
+func (tx Tx) QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error) {
+ buntx := tx.bun // use underlying *bun.Tx interface for their query formatting
+ rows, err := buntx.QueryContext(ctx, query, args...)
+ err = tx.raw.errHook(err)
+ return rows, err
+}
+
+// QueryRowContext wraps bun.Tx.QueryRowContext() with our own error processing, WITHOUT retry-busy timeouts (as will be mid-transaction).
+func (tx Tx) QueryRowContext(ctx context.Context, query string, args ...interface{}) *sql.Row {
+ buntx := tx.bun // use underlying *bun.Tx interface for their query formatting
+ row := buntx.QueryRowContext(ctx, query, args...)
+ if err := tx.raw.errHook(row.Err()); err != nil {
+ updateRowError(row, err) // set new error
+ }
+ return row
+}
+
+// Commit wraps bun.Tx.Commit() with retry-busy timeout and our own error processing.
+func (tx Tx) Commit() (err error) {
+ buntx := tx.bun // use *bun.Tx interface
+ err = retryOnBusy(context.TODO(), func() error {
+ err = buntx.Commit()
+ err = tx.raw.errHook(err)
+ return err
+ })
+ return
+}
+
+// Rollback wraps bun.Tx.Rollback() with retry-busy timeout and our own error processing.
+func (tx Tx) Rollback() (err error) {
+ buntx := tx.bun // use *bun.Tx interface
+ err = retryOnBusy(context.TODO(), func() error {
+ err = buntx.Rollback()
+ err = tx.raw.errHook(err)
+ return err
+ })
+ return
+}
+
+// Dialect is a direct call-through to bun.DB.Dialect().
+func (tx Tx) Dialect() schema.Dialect {
+ return tx.bun.Dialect()
+}
+
+func (tx Tx) NewValues(model interface{}) *bun.ValuesQuery {
+ // note: passing in rawtx as conn iface so no double query-hook
+ // firing when passed through the bun.Tx.Query___() functions.
+ return tx.bun.NewValues(model).Conn(&tx.raw)
+}
+
+func (tx Tx) NewMerge() *bun.MergeQuery {
+ // note: passing in rawtx as conn iface so no double query-hook
+ // firing when passed through the bun.Tx.Query___() functions.
+ return tx.bun.NewMerge().Conn(&tx.raw)
+}
+
+func (tx Tx) NewSelect() *bun.SelectQuery {
+ // note: passing in rawtx as conn iface so no double query-hook
+ // firing when passed through the bun.Tx.Query___() functions.
+ return tx.bun.NewSelect().Conn(&tx.raw)
+}
+
+func (tx Tx) NewInsert() *bun.InsertQuery {
+ // note: passing in rawtx as conn iface so no double query-hook
+ // firing when passed through the bun.Tx.Query___() functions.
+ return tx.bun.NewInsert().Conn(&tx.raw)
+}
+
+func (tx Tx) NewUpdate() *bun.UpdateQuery {
+ // note: passing in rawtx as conn iface so no double query-hook
+ // firing when passed through the bun.Tx.Query___() functions.
+ return tx.bun.NewUpdate().Conn(&tx.raw)
+}
+
+func (tx Tx) NewDelete() *bun.DeleteQuery {
+ // note: passing in rawtx as conn iface so no double query-hook
+ // firing when passed through the bun.Tx.Query___() functions.
+ return tx.bun.NewDelete().Conn(&tx.raw)
+}
+
+func (tx Tx) NewRaw(query string, args ...interface{}) *bun.RawQuery {
+ // note: passing in rawtx as conn iface so no double query-hook
+ // firing when passed through the bun.Tx.Query___() functions.
+ return tx.bun.NewRaw(query, args...).Conn(&tx.raw)
+}
+
+func (tx Tx) NewCreateTable() *bun.CreateTableQuery {
+ // note: passing in rawtx as conn iface so no double query-hook
+ // firing when passed through the bun.Tx.Query___() functions.
+ return tx.bun.NewCreateTable().Conn(&tx.raw)
+}
+
+func (tx Tx) NewDropTable() *bun.DropTableQuery {
+ // note: passing in rawtx as conn iface so no double query-hook
+ // firing when passed through the bun.Tx.Query___() functions.
+ return tx.bun.NewDropTable().Conn(&tx.raw)
+}
+
+func (tx Tx) NewCreateIndex() *bun.CreateIndexQuery {
+ // note: passing in rawtx as conn iface so no double query-hook
+ // firing when passed through the bun.Tx.Query___() functions.
+ return tx.bun.NewCreateIndex().Conn(&tx.raw)
+}
+
+func (tx Tx) NewDropIndex() *bun.DropIndexQuery {
+ // note: passing in rawtx as conn iface so no double query-hook
+ // firing when passed through the bun.Tx.Query___() functions.
+ return tx.bun.NewDropIndex().Conn(&tx.raw)
+}
+
+func (tx Tx) NewTruncateTable() *bun.TruncateTableQuery {
+ // note: passing in rawtx as conn iface so no double query-hook
+ // firing when passed through the bun.Tx.Query___() functions.
+ return tx.bun.NewTruncateTable().Conn(&tx.raw)
+}
+
+func (tx Tx) NewAddColumn() *bun.AddColumnQuery {
+ // note: passing in rawtx as conn iface so no double query-hook
+ // firing when passed through the bun.Tx.Query___() functions.
+ return tx.bun.NewAddColumn().Conn(&tx.raw)
+}
+
+func (tx Tx) NewDropColumn() *bun.DropColumnQuery {
+ // note: passing in rawtx as conn iface so no double query-hook
+ // firing when passed through the bun.Tx.Query___() functions.
+ return tx.bun.NewDropColumn().Conn(&tx.raw)
+}
+
+type rawtx struct {
+ // dialect specific error
+ // processing function hook.
+ errHook func(error) error
+
+ // embedded raw
+ // tx interface
+ tx *sql.Tx
+}
+
+// ExecContext wraps sql.Tx.ExecContext() with our own error processing, WITHOUT retry-busy timeouts (as will be mid-transaction).
+func (tx *rawtx) ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error) {
+ res, err := tx.tx.ExecContext(ctx, query, args...)
+ err = tx.errHook(err)
+ return res, err
+}
+
+// QueryContext wraps sql.Tx.QueryContext() with our own error processing, WITHOUT retry-busy timeouts (as will be mid-transaction).
+func (tx *rawtx) QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error) {
+ rows, err := tx.tx.QueryContext(ctx, query, args...)
+ err = tx.errHook(err)
+ return rows, err
+}
+
+// QueryRowContext wraps sql.Tx.QueryRowContext() with our own error processing, WITHOUT retry-busy timeouts (as will be mid-transaction).
+func (tx *rawtx) QueryRowContext(ctx context.Context, query string, args ...interface{}) *sql.Row {
+ row := tx.tx.QueryRowContext(ctx, query, args...)
+ if err := tx.errHook(row.Err()); err != nil {
+ updateRowError(row, err) // set new error
+ }
+ return row
+}
+
+// updateRowError updates an sql.Row's internal error field using the unsafe package.
+func updateRowError(sqlrow *sql.Row, err error) {
+ type row struct {
+ err error
+ rows *sql.Rows
+ }
+
+ // compile-time check to ensure sql.Row not changed.
+ if unsafe.Sizeof(row{}) != unsafe.Sizeof(sql.Row{}) {
+ panic("sql.Row has changed definition")
+ }
+
+ // this code is awful and i must be shamed for this.
+ (*row)(unsafe.Pointer(sqlrow)).err = err
+}
+
// retryOnBusy will retry given function on returned 'errBusy'.
func retryOnBusy(ctx context.Context, fn func() error) error {
var backoff time.Duration
diff --git a/internal/db/bundb/emoji.go b/internal/db/bundb/emoji.go
index 2a3d91fe4..a3a19485d 100644
--- a/internal/db/bundb/emoji.go
+++ b/internal/db/bundb/emoji.go
@@ -105,7 +105,7 @@ func (e *emojiDB) DeleteEmojiByID(ctx context.Context, id string) error {
return err
}
- return e.db.RunInTx(ctx, func(tx bun.Tx) error {
+ return e.db.RunInTx(ctx, func(tx Tx) error {
// Delete relational links between this emoji
// and any statuses using it, returning the
// status IDs so we can later update them.
diff --git a/internal/db/bundb/list.go b/internal/db/bundb/list.go
index 23d9c13fb..7a117670a 100644
--- a/internal/db/bundb/list.go
+++ b/internal/db/bundb/list.go
@@ -206,7 +206,7 @@ func (l *listDB) DeleteListByID(ctx context.Context, id string) error {
}
}()
- return l.db.RunInTx(ctx, func(tx bun.Tx) error {
+ return l.db.RunInTx(ctx, func(tx Tx) error {
// Delete all entries attached to list.
if _, err := tx.NewDelete().
Table("list_entries").
@@ -423,7 +423,7 @@ func (l *listDB) PutListEntries(ctx context.Context, entries []*gtsmodel.ListEnt
}()
// Finally, insert each list entry into the database.
- return l.db.RunInTx(ctx, func(tx bun.Tx) error {
+ return l.db.RunInTx(ctx, func(tx Tx) error {
for _, entry := range entries {
entry := entry // rescope
if err := l.state.Caches.GTS.ListEntry().Store(entry, func() error {
diff --git a/internal/db/bundb/marker.go b/internal/db/bundb/marker.go
index 861f7de36..5d365e08a 100644
--- a/internal/db/bundb/marker.go
+++ b/internal/db/bundb/marker.go
@@ -87,7 +87,7 @@ func (m *markerDB) UpdateMarker(ctx context.Context, marker *gtsmodel.Marker) er
// Optimistic concurrency control: start a transaction, try to update a row with a previously retrieved version.
// If the update in the transaction fails to actually change anything, another update happened concurrently, and
// this update should be retried by the caller, which in this case involves sending HTTP 409 to the API client.
- return m.db.RunInTx(ctx, func(tx bun.Tx) error {
+ return m.db.RunInTx(ctx, func(tx Tx) error {
result, err := tx.NewUpdate().
Model(marker).
WherePK().
diff --git a/internal/db/bundb/media.go b/internal/db/bundb/media.go
index fe6aefa90..a2603eacc 100644
--- a/internal/db/bundb/media.go
+++ b/internal/db/bundb/media.go
@@ -122,7 +122,7 @@ func (m *mediaDB) DeleteAttachment(ctx context.Context, id string) error {
defer m.state.Caches.GTS.Media().Invalidate("ID", id)
// Delete media attachment in new transaction.
- err = m.db.RunInTx(ctx, func(tx bun.Tx) error {
+ err = m.db.RunInTx(ctx, func(tx Tx) error {
if media.AccountID != "" {
var account gtsmodel.Account
diff --git a/internal/db/bundb/status.go b/internal/db/bundb/status.go
index 0e97d32cc..26f0c1f38 100644
--- a/internal/db/bundb/status.go
+++ b/internal/db/bundb/status.go
@@ -276,7 +276,7 @@ func (s *statusDB) PutStatus(ctx context.Context, status *gtsmodel.Status) error
// It is safe to run this database transaction within cache.Store
// as the cache does not attempt a mutex lock until AFTER hook.
//
- return s.db.RunInTx(ctx, func(tx bun.Tx) error {
+ return s.db.RunInTx(ctx, func(tx Tx) error {
// create links between this status and any emojis it uses
for _, i := range status.EmojiIDs {
if _, err := tx.
@@ -342,7 +342,7 @@ func (s *statusDB) UpdateStatus(ctx context.Context, status *gtsmodel.Status, co
// It is safe to run this database transaction within cache.Store
// as the cache does not attempt a mutex lock until AFTER hook.
//
- return s.db.RunInTx(ctx, func(tx bun.Tx) error {
+ return s.db.RunInTx(ctx, func(tx Tx) error {
// create links between this status and any emojis it uses
for _, i := range status.EmojiIDs {
if _, err := tx.
@@ -420,7 +420,7 @@ func (s *statusDB) DeleteStatusByID(ctx context.Context, id string) error {
// On return ensure status invalidated from cache.
defer s.state.Caches.GTS.Status().Invalidate("ID", id)
- return s.db.RunInTx(ctx, func(tx bun.Tx) error {
+ return s.db.RunInTx(ctx, func(tx Tx) error {
// delete links between this status and any emojis it uses
if _, err := tx.
NewDelete().