summaryrefslogtreecommitdiff
path: root/internal/db/bundb/migrations/util.go
diff options
context:
space:
mode:
Diffstat (limited to 'internal/db/bundb/migrations/util.go')
-rw-r--r--internal/db/bundb/migrations/util.go85
1 files changed, 77 insertions, 8 deletions
diff --git a/internal/db/bundb/migrations/util.go b/internal/db/bundb/migrations/util.go
index 7f8b57c42..6ffcdd09d 100644
--- a/internal/db/bundb/migrations/util.go
+++ b/internal/db/bundb/migrations/util.go
@@ -22,11 +22,13 @@ import (
"errors"
"fmt"
"reflect"
+ "slices"
"strconv"
"strings"
"codeberg.org/gruf/go-byteutil"
"github.com/superseriousbusiness/gotosocial/internal/gtserror"
+ "github.com/superseriousbusiness/gotosocial/internal/id"
"github.com/superseriousbusiness/gotosocial/internal/log"
"github.com/uptrace/bun"
"github.com/uptrace/bun/dialect"
@@ -46,6 +48,7 @@ func convertEnums[OldType ~string, NewType ~int16](
mapping map[OldType]NewType,
defaultValue *NewType,
indexCleanupCallback func(context.Context, bun.Tx) error,
+ batchByColumn string,
) error {
if len(mapping) == 0 {
return errors.New("empty mapping")
@@ -87,7 +90,7 @@ func convertEnums[OldType ~string, NewType ~int16](
var qbuf byteutil.Buffer
// Prepare a singular UPDATE statement using
- // SET $newColumn = (CASE $column WHEN $old THEN $new ... END)
+ // SET $newColumn = (CASE $column WHEN $old THEN $new ... END).
qbuf.B = append(qbuf.B, "UPDATE ? SET ? = (CASE ? "...)
args = append(args, bun.Ident(table))
args = append(args, bun.Ident(newColumn))
@@ -99,16 +102,82 @@ func convertEnums[OldType ~string, NewType ~int16](
qbuf.B = append(qbuf.B, "ELSE ? END)"...)
args = append(args, *defaultValue)
- // Execute the prepared raw query with arguments.
- res, err := tx.NewRaw(qbuf.String(), args...).Exec(ctx)
- if err != nil {
- return gtserror.Newf("error updating old column values: %w", err)
+ // Serialize it here to be
+ // used as the base for each
+ // set of batch queries below.
+ baseQStr := string(qbuf.B)
+ baseArgs := args
+
+ // Query batch size
+ // in number of rows.
+ const batchsz = 5000
+
+ // Stores highest batch value
+ // used in iterate queries,
+ // starting at highest possible.
+ highest := id.Highest
+
+ // Total updated rows.
+ var updated int
+
+ for {
+ // Limit to batchsz
+ // items at once.
+ batchQ := tx.
+ NewSelect().
+ Table(table).
+ Column(batchByColumn).
+ Where("? < ?", bun.Ident(batchByColumn), highest).
+ OrderExpr("? DESC", bun.Ident(batchByColumn)).
+ Limit(batchsz)
+
+ // Finalize UPDATE to operate on this batch only.
+ qStr := baseQStr + " WHERE ? IN (?)"
+ args := append(
+ slices.Clone(baseArgs),
+ bun.Ident(batchByColumn),
+ batchQ,
+ )
+
+ // Execute the prepared raw query with arguments.
+ res, err := tx.NewRaw(qStr, args...).Exec(ctx)
+ if err != nil {
+ return gtserror.Newf("error updating old column values: %w", err)
+ }
+
+ // Check how many items we updated.
+ thisUpdated, err := res.RowsAffected()
+ if err != nil {
+ return gtserror.Newf("error counting affected rows: %w", err)
+ }
+
+ if thisUpdated == 0 {
+ // Nothing updated
+ // means we're done.
+ break
+ }
+
+ // Update the overall count.
+ updated += int(thisUpdated)
+
+ // Log helpful message to admin.
+ log.Infof(ctx, "migrated %d of %d %s (up to %s)",
+ updated, total, table, highest)
+
+ // Get next highest
+ // id for next batch.
+ if err := tx.
+ NewSelect().
+ With("batch_query", batchQ).
+ ColumnExpr("min(?) FROM ?", bun.Ident(batchByColumn), bun.Ident("batch_query")).
+ Scan(ctx, &highest); err != nil {
+ return gtserror.Newf("error selecting next highest: %w", err)
+ }
}
- // Count number items updated.
- updated, _ := res.RowsAffected()
if total != int(updated) {
- log.Warnf(ctx, "total=%d does not match updated=%d", total, updated)
+ // Return error here in order to rollback the whole transaction.
+ return fmt.Errorf("total=%d does not match updated=%d", total, updated)
}
// Run index cleanup callback if set.