diff options
Diffstat (limited to 'internal/db/bundb/migrations/util.go')
-rw-r--r-- | internal/db/bundb/migrations/util.go | 85 |
1 files changed, 77 insertions, 8 deletions
diff --git a/internal/db/bundb/migrations/util.go b/internal/db/bundb/migrations/util.go index 7f8b57c42..6ffcdd09d 100644 --- a/internal/db/bundb/migrations/util.go +++ b/internal/db/bundb/migrations/util.go @@ -22,11 +22,13 @@ import ( "errors" "fmt" "reflect" + "slices" "strconv" "strings" "codeberg.org/gruf/go-byteutil" "github.com/superseriousbusiness/gotosocial/internal/gtserror" + "github.com/superseriousbusiness/gotosocial/internal/id" "github.com/superseriousbusiness/gotosocial/internal/log" "github.com/uptrace/bun" "github.com/uptrace/bun/dialect" @@ -46,6 +48,7 @@ func convertEnums[OldType ~string, NewType ~int16]( mapping map[OldType]NewType, defaultValue *NewType, indexCleanupCallback func(context.Context, bun.Tx) error, + batchByColumn string, ) error { if len(mapping) == 0 { return errors.New("empty mapping") @@ -87,7 +90,7 @@ func convertEnums[OldType ~string, NewType ~int16]( var qbuf byteutil.Buffer // Prepare a singular UPDATE statement using - // SET $newColumn = (CASE $column WHEN $old THEN $new ... END) + // SET $newColumn = (CASE $column WHEN $old THEN $new ... END). qbuf.B = append(qbuf.B, "UPDATE ? SET ? = (CASE ? "...) args = append(args, bun.Ident(table)) args = append(args, bun.Ident(newColumn)) @@ -99,16 +102,82 @@ func convertEnums[OldType ~string, NewType ~int16]( qbuf.B = append(qbuf.B, "ELSE ? END)"...) args = append(args, *defaultValue) - // Execute the prepared raw query with arguments. - res, err := tx.NewRaw(qbuf.String(), args...).Exec(ctx) - if err != nil { - return gtserror.Newf("error updating old column values: %w", err) + // Serialize it here to be + // used as the base for each + // set of batch queries below. + baseQStr := string(qbuf.B) + baseArgs := args + + // Query batch size + // in number of rows. + const batchsz = 5000 + + // Stores highest batch value + // used in iterate queries, + // starting at highest possible. + highest := id.Highest + + // Total updated rows. + var updated int + + for { + // Limit to batchsz + // items at once. + batchQ := tx. + NewSelect(). + Table(table). + Column(batchByColumn). + Where("? < ?", bun.Ident(batchByColumn), highest). + OrderExpr("? DESC", bun.Ident(batchByColumn)). + Limit(batchsz) + + // Finalize UPDATE to operate on this batch only. + qStr := baseQStr + " WHERE ? IN (?)" + args := append( + slices.Clone(baseArgs), + bun.Ident(batchByColumn), + batchQ, + ) + + // Execute the prepared raw query with arguments. + res, err := tx.NewRaw(qStr, args...).Exec(ctx) + if err != nil { + return gtserror.Newf("error updating old column values: %w", err) + } + + // Check how many items we updated. + thisUpdated, err := res.RowsAffected() + if err != nil { + return gtserror.Newf("error counting affected rows: %w", err) + } + + if thisUpdated == 0 { + // Nothing updated + // means we're done. + break + } + + // Update the overall count. + updated += int(thisUpdated) + + // Log helpful message to admin. + log.Infof(ctx, "migrated %d of %d %s (up to %s)", + updated, total, table, highest) + + // Get next highest + // id for next batch. + if err := tx. + NewSelect(). + With("batch_query", batchQ). + ColumnExpr("min(?) FROM ?", bun.Ident(batchByColumn), bun.Ident("batch_query")). + Scan(ctx, &highest); err != nil { + return gtserror.Newf("error selecting next highest: %w", err) + } } - // Count number items updated. - updated, _ := res.RowsAffected() if total != int(updated) { - log.Warnf(ctx, "total=%d does not match updated=%d", total, updated) + // Return error here in order to rollback the whole transaction. + return fmt.Errorf("total=%d does not match updated=%d", total, updated) } // Run index cleanup callback if set. |