summary | refs | log | tree | commit | diff
path: root/internal/db/bundb
diff options
context:
space:
mode:
Diffstat (limited to 'internal/db/bundb')
-rw-r--r-- internal/db/bundb/bundb.go | 1
-rw-r--r-- internal/db/bundb/migrations/20231016113235_mute_status_thread.go | 2
-rw-r--r-- internal/db/bundb/migrations/20231016113235_mute_status_thread/thread.go | 32
-rw-r--r-- internal/db/bundb/migrations/20231016113235_mute_status_thread/threadmute.go | 29
-rw-r--r-- internal/db/bundb/migrations/20250415111056_thread_all_statuses.go | 580
-rw-r--r-- internal/db/bundb/migrations/20250415111056_thread_all_statuses/new/status.go | 133
-rw-r--r-- internal/db/bundb/migrations/20250415111056_thread_all_statuses/new/thread.go | 24
-rw-r--r-- internal/db/bundb/migrations/20250415111056_thread_all_statuses/old/status.go | 131
-rw-r--r-- internal/db/bundb/migrations/util.go | 107
-rw-r--r-- internal/db/bundb/status.go | 343
-rw-r--r-- internal/db/bundb/status_test.go | 300
11 files changed, 1581 insertions, 101 deletions
diff --git a/internal/db/bundb/bundb.go b/internal/db/bundb/bundb.go
index 8a3108ef2..bccf5ec98 100644
--- a/internal/db/bundb/bundb.go
+++ b/internal/db/bundb/bundb.go
@@ -336,7 +336,6 @@ func bunDB(sqldb *sql.DB, dialect func() schema.Dialect) *bun.DB {
&gtsmodel.ConversationToStatus{},
&gtsmodel.StatusToEmoji{},
&gtsmodel.StatusToTag{},
- &gtsmodel.ThreadToStatus{},
} {
db.RegisterModel(t)
}
diff --git a/internal/db/bundb/migrations/20231016113235_mute_status_thread.go b/internal/db/bundb/migrations/20231016113235_mute_status_thread.go
index 44eed5c1d..6f7518ba1 100644
--- a/internal/db/bundb/migrations/20231016113235_mute_status_thread.go
+++ b/internal/db/bundb/migrations/20231016113235_mute_status_thread.go
@@ -21,7 +21,7 @@ import (
"context"
"strings"
- gtsmodel "code.superseriousbusiness.org/gotosocial/internal/gtsmodel"
+ gtsmodel "code.superseriousbusiness.org/gotosocial/internal/db/bundb/migrations/20231016113235_mute_status_thread"
"code.superseriousbusiness.org/gotosocial/internal/log"
"github.com/uptrace/bun"
"github.com/uptrace/bun/dialect"
diff --git a/internal/db/bundb/migrations/20231016113235_mute_status_thread/thread.go b/internal/db/bundb/migrations/20231016113235_mute_status_thread/thread.go
new file mode 100644
index 000000000..5d5af1993
--- /dev/null
+++ b/internal/db/bundb/migrations/20231016113235_mute_status_thread/thread.go
@@ -0,0 +1,32 @@
+// GoToSocial
+// Copyright (C) GoToSocial Authors admin@gotosocial.org
+// SPDX-License-Identifier: AGPL-3.0-or-later
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+package gtsmodel
+
+// Thread represents one thread of statuses.
+//
+// This copy of the model lives inside the 20231016113235_mute_status_thread
+// migration package, which that migration imports under the gtsmodel alias.
+// TODO: add more fields here if necessary.
+type Thread struct {
+ ID string `bun:"type:CHAR(26),pk,nullzero,notnull,unique"` // id of this item in the database
+ StatusIDs []string `bun:"-"` // ids of statuses belonging to this thread (order not guaranteed); tagged bun:"-", so not a database column
+}
+
+// ThreadToStatus is an intermediate struct to facilitate the
+// many2many relationship between a thread and one or more statuses.
+// Both columns share the "statusthread" unique group, so a given
+// (thread ID, status ID) pair can only be stored once.
+type ThreadToStatus struct {
+ ThreadID string `bun:"type:CHAR(26),unique:statusthread,nullzero,notnull"` // id of the thread
+ StatusID string `bun:"type:CHAR(26),unique:statusthread,nullzero,notnull"` // id of a status in that thread
+}
diff --git a/internal/db/bundb/migrations/20231016113235_mute_status_thread/threadmute.go b/internal/db/bundb/migrations/20231016113235_mute_status_thread/threadmute.go
new file mode 100644
index 000000000..170f568a1
--- /dev/null
+++ b/internal/db/bundb/migrations/20231016113235_mute_status_thread/threadmute.go
@@ -0,0 +1,29 @@
+// GoToSocial
+// Copyright (C) GoToSocial Authors admin@gotosocial.org
+// SPDX-License-Identifier: AGPL-3.0-or-later
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+package gtsmodel
+
+import "time"
+
+// ThreadMute represents an account-level mute of a thread of statuses.
+// ThreadID and AccountID share the "thread_mute_thread_id_account_id"
+// unique group, so an account can hold at most one mute per thread.
+type ThreadMute struct {
+ ID string `bun:"type:CHAR(26),pk,nullzero,notnull,unique"` // id of this item in the database
+ CreatedAt time.Time `bun:"type:timestamptz,nullzero,notnull,default:current_timestamp"` // when was item created
+ UpdatedAt time.Time `bun:"type:timestamptz,nullzero,notnull,default:current_timestamp"` // when was item last updated
+ ThreadID string `bun:"type:CHAR(26),nullzero,notnull,unique:thread_mute_thread_id_account_id"` // ID of the muted thread
+ AccountID string `bun:"type:CHAR(26),nullzero,notnull,unique:thread_mute_thread_id_account_id"` // Account ID of the creator of this mute
+}
diff --git a/internal/db/bundb/migrations/20250415111056_thread_all_statuses.go b/internal/db/bundb/migrations/20250415111056_thread_all_statuses.go
new file mode 100644
index 000000000..4213da4f2
--- /dev/null
+++ b/internal/db/bundb/migrations/20250415111056_thread_all_statuses.go
@@ -0,0 +1,580 @@
+// GoToSocial
+// Copyright (C) GoToSocial Authors admin@gotosocial.org
+// SPDX-License-Identifier: AGPL-3.0-or-later
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+package migrations
+
+import (
+ "context"
+ "database/sql"
+ "errors"
+ "reflect"
+ "slices"
+ "strings"
+
+ "code.superseriousbusiness.org/gotosocial/internal/db"
+ newmodel "code.superseriousbusiness.org/gotosocial/internal/db/bundb/migrations/20250415111056_thread_all_statuses/new"
+ oldmodel "code.superseriousbusiness.org/gotosocial/internal/db/bundb/migrations/20250415111056_thread_all_statuses/old"
+ "code.superseriousbusiness.org/gotosocial/internal/gtserror"
+ "code.superseriousbusiness.org/gotosocial/internal/id"
+ "code.superseriousbusiness.org/gotosocial/internal/log"
+ "github.com/uptrace/bun"
+)
+
+// init registers the "thread all statuses" migration. The up step:
+//
+//  1. rethreads every top-level status (plus whatever rethreadStatus()
+//     reaches from it), in batches of 5000 paged downwards by ID;
+//  2. rethreads any statuses still left with a NULL thread_id,
+//     in batches of 1000;
+//  3. drops the old thread_to_statuses table;
+//  4. swaps statuses.thread_id for a column built from the new model's
+//     ThreadID definition, copying values across via thread_id_new;
+//  5. recreates the statuses_thread_id_idx index.
+//
+// The down step is a no-op. Registration failure panics.
+func init() {
+	up := func(ctx context.Context, db *bun.DB) error {
+		newType := reflect.TypeOf(&newmodel.Status{})
+
+		// Get the new column definition with not-null thread_id.
+		newColDef, err := getBunColumnDef(db, newType, "ThreadID")
+		if err != nil {
+			return gtserror.Newf("error getting bun column def: %w", err)
+		}
+
+		// Update column def to use '${name}_new'.
+		newColDef = strings.Replace(newColDef,
+			"thread_id", "thread_id_new", 1)
+
+		var sr statusRethreader
+		var total uint64
+		var maxID string
+		var statuses []*oldmodel.Status
+
+		// Start at largest
+		// possible ULID value.
+		maxID = id.Highest
+
+		log.Warn(ctx, "rethreading top-level statuses, this will take a *long* time")
+		for /* TOP LEVEL STATUS LOOP */ {
+
+			// Reset slice.
+			clear(statuses)
+			statuses = statuses[:0]
+
+			// Select top-level statuses.
+			if err := db.NewSelect().
+				Model(&statuses).
+				Column("id", "thread_id").
+
+				// We specifically use in_reply_to_account_id instead of in_reply_to_id as
+				// they should both be set / unset in unison, but we specifically have an
+				// index on in_reply_to_account_id with ID ordering, unlike in_reply_to_id.
+				Where("? IS NULL", bun.Ident("in_reply_to_account_id")).
+				Where("? < ?", bun.Ident("id"), maxID).
+				OrderExpr("? DESC", bun.Ident("id")).
+				Limit(5000).
+				Scan(ctx); err != nil && !errors.Is(err, sql.ErrNoRows) {
+				return gtserror.Newf("error selecting top level statuses: %w", err)
+			}
+
+			// Reached end of block.
+			if len(statuses) == 0 {
+				break
+			}
+
+			// Set next maxID value from statuses.
+			maxID = statuses[len(statuses)-1].ID
+
+			// Rethread each selected batch of top-level statuses in a transaction.
+			if err := db.RunInTx(ctx, nil, func(ctx context.Context, tx bun.Tx) error {
+
+				// Rethread each top-level status.
+				for _, status := range statuses {
+					n, err := sr.rethreadStatus(ctx, tx, status)
+					if err != nil {
+						// Identify the status by ID: the URI column is
+						// never selected here, so status.URI is empty.
+						return gtserror.Newf("error rethreading status %s: %w", status.ID, err)
+					}
+					total += n
+				}
+
+				return nil
+			}); err != nil {
+				return err
+			}
+
+			log.Infof(ctx, "[%d] rethreading statuses (top-level)", total)
+		}
+
+		log.Warn(ctx, "rethreading straggler statuses, this will take a *long* time")
+		for /* STRAGGLER STATUS LOOP */ {
+
+			// Reset slice.
+			clear(statuses)
+			statuses = statuses[:0]
+
+			// Select straggler statuses. No ID paging is needed:
+			// each pass gives the selected rows a thread_id, so
+			// they no longer match the IS NULL filter next time.
+			if err := db.NewSelect().
+				Model(&statuses).
+				Column("id", "in_reply_to_id", "thread_id").
+				Where("? IS NULL", bun.Ident("thread_id")).
+
+				// We select in smaller batches for this part
+				// of the migration as there is a chance that
+				// we may be fetching statuses that might be
+				// part of the same thread, i.e. one call to
+				// rethreadStatus() may affect other statuses
+				// later in the slice.
+				Limit(1000).
+				Scan(ctx); err != nil && !errors.Is(err, sql.ErrNoRows) {
+				return gtserror.Newf("error selecting straggler statuses: %w", err)
+			}
+
+			// Reached end of block.
+			if len(statuses) == 0 {
+				break
+			}
+
+			// Rethread each selected batch of straggler statuses in a transaction.
+			if err := db.RunInTx(ctx, nil, func(ctx context.Context, tx bun.Tx) error {
+
+				// Rethread each straggler status.
+				for _, status := range statuses {
+					n, err := sr.rethreadStatus(ctx, tx, status)
+					if err != nil {
+						// Identify the status by ID: the URI column is
+						// never selected here, so status.URI is empty.
+						return gtserror.Newf("error rethreading status %s: %w", status.ID, err)
+					}
+					total += n
+				}
+
+				return nil
+			}); err != nil {
+				return err
+			}
+
+			log.Infof(ctx, "[%d] rethreading statuses (stragglers)", total)
+		}
+
+		// Attempt to merge any sqlite write-ahead-log.
+		if err := doWALCheckpoint(ctx, db); err != nil {
+			return err
+		}
+
+		log.Info(ctx, "dropping old thread_to_statuses table")
+		if _, err := db.NewDropTable().
+			Table("thread_to_statuses").
+			IfExists().
+			Exec(ctx); err != nil {
+			return gtserror.Newf("error dropping old thread_to_statuses table: %w", err)
+		}
+
+		// Run the majority of the thread_id_new -> thread_id migration in a tx.
+		if err := db.RunInTx(ctx, nil, func(ctx context.Context, tx bun.Tx) error {
+			log.Info(ctx, "creating new statuses thread_id column")
+			if _, err := tx.NewAddColumn().
+				Table("statuses").
+				ColumnExpr(newColDef).
+				Exec(ctx); err != nil {
+				return gtserror.Newf("error creating new thread_id column: %w", err)
+			}
+
+			log.Info(ctx, "setting thread_id_new = thread_id (this may take a while...)")
+			if err := batchUpdateByID(ctx, tx,
+				"statuses",           // table
+				"id",                 // batchByCol
+				"UPDATE ? SET ? = ?", // updateQuery
+				[]any{bun.Ident("statuses"),
+					bun.Ident("thread_id_new"),
+					bun.Ident("thread_id")},
+			); err != nil {
+				return err
+			}
+
+			log.Info(ctx, "dropping old statuses thread_id index")
+			if _, err := tx.NewDropIndex().
+				Index("statuses_thread_id_idx").
+				Exec(ctx); err != nil {
+				return gtserror.Newf("error dropping old thread_id index: %w", err)
+			}
+
+			log.Info(ctx, "dropping old statuses thread_id column")
+			if _, err := tx.NewDropColumn().
+				Table("statuses").
+				Column("thread_id").
+				Exec(ctx); err != nil {
+				return gtserror.Newf("error dropping old thread_id column: %w", err)
+			}
+
+			log.Info(ctx, "renaming thread_id_new to thread_id")
+			if _, err := tx.NewRaw(
+				"ALTER TABLE ? RENAME COLUMN ? TO ?",
+				bun.Ident("statuses"),
+				bun.Ident("thread_id_new"),
+				bun.Ident("thread_id"),
+			).Exec(ctx); err != nil {
+				return gtserror.Newf("error renaming new column: %w", err)
+			}
+
+			return nil
+		}); err != nil {
+			return err
+		}
+
+		// Attempt to merge any sqlite write-ahead-log.
+		if err := doWALCheckpoint(ctx, db); err != nil {
+			return err
+		}
+
+		log.Info(ctx, "creating new statuses thread_id index")
+		if _, err := db.NewCreateIndex().
+			Table("statuses").
+			Index("statuses_thread_id_idx").
+			Column("thread_id").
+			IfNotExists().
+			Exec(ctx); err != nil {
+			return gtserror.Newf("error creating new thread_id index: %w", err)
+		}
+
+		return nil
+	}
+
+	// This migration is not reversible; down does nothing.
+	down := func(ctx context.Context, db *bun.DB) error {
+		return nil
+	}
+
+	if err := Migrations.Register(up, down); err != nil {
+		panic(err)
+	}
+}
+
+// statusRethreader holds the reusable working state for rethreadStatus().
+// the migration declares a single value and passes statuses to it one at
+// a time; rethreadStatus() resets statusIDs, threadIDs, statuses, seenIDs
+// and allThreaded at the start of each call, getParents() resets inReplyToIDs.
+type statusRethreader struct {
+ // the unique status and thread IDs
+ // of all models passed to append().
+ // these are later used to update all
+ // statuses to a single thread ID, and
+ // update all thread related models to
+ // use the new updated thread ID.
+ statusIDs []string
+ threadIDs []string
+
+ // stores the unseen IDs of status
+ // InReplyTos newly tracked in append(),
+ // which is then used for a SELECT query
+ // in getParents(), then promptly reset.
+ inReplyToIDs []string
+
+ // statuses simply provides a reusable
+ // slice of status models for selects.
+ // its contents are ephemeral.
+ statuses []*oldmodel.Status
+
+ // seenIDs tracks the unique status and
+ // thread IDs we have seen, ensuring we
+ // don't append duplicates to statusIDs
+ // or threadIDs slices. also helps prevent
+ // adding duplicate parents to inReplyToIDs.
+ // allocated lazily by rethreadStatus().
+ seenIDs map[string]struct{}
+
+ // allThreaded tracks whether every status
+ // passed to append() has a thread ID set.
+ // together with len(threadIDs) this can
+ // determine if already threaded correctly.
+ allThreaded bool
+}
+
+// rethreadStatus is the main logic handler for statusRethreader{}. this is what gets called from the migration
+// in order to trigger a status rethreading operation for the given status, returning total number rethreaded.
+//
+// it refreshes the given status' in_reply_to_id and thread_id from the database, then repeatedly gathers
+// parents, children and stragglers (statuses sharing an already-seen thread ID) until no new statuses or
+// threads turn up. all gathered statuses are then updated to the lowest-sorting thread ID seen, or to a newly
+// inserted thread if none of them had one, and thread mutes on the other seen thread IDs are repointed to it.
+//
+// returns 0 without writing anything if every gathered status already shares exactly one thread ID.
+// all queries run on the passed transaction; on error the returned count is 0.
+func (sr *statusRethreader) rethreadStatus(ctx context.Context, tx bun.Tx, status *oldmodel.Status) (uint64, error) {
+
+ // Zero slice and
+ // map ptr values.
+ clear(sr.statusIDs)
+ clear(sr.threadIDs)
+ clear(sr.statuses)
+ clear(sr.seenIDs)
+
+ // Reset slices and values for use.
+ sr.statusIDs = sr.statusIDs[:0]
+ sr.threadIDs = sr.threadIDs[:0]
+ sr.statuses = sr.statuses[:0]
+ sr.allThreaded = true
+
+ if sr.seenIDs == nil {
+ // Allocate new hash set for status IDs.
+ sr.seenIDs = make(map[string]struct{})
+ }
+
+ // Ensure the passed status
+ // has up-to-date information.
+ // This may have changed from
+ // the initial batch selection
+ // to the rethreadStatus() call.
+ if err := tx.NewSelect().
+ Model(status).
+ Column("in_reply_to_id", "thread_id").
+ Where("? = ?", bun.Ident("id"), status.ID).
+ Scan(ctx); err != nil {
+ return 0, gtserror.Newf("error selecting status: %w", err)
+ }
+
+ // status and thread ID cursor
+ // index values. these are used
+ // to keep track of newly loaded
+ // status / thread IDs between
+ // loop iterations.
+ var statusIdx int
+ var threadIdx int
+
+ // Append given status as
+ // first to our ID slices.
+ sr.append(status)
+
+ for {
+ // Fetch parents for newly seen in_reply_tos since last loop.
+ if err := sr.getParents(ctx, tx); err != nil {
+ return 0, gtserror.Newf("error getting parents: %w", err)
+ }
+
+ // Fetch children for newly seen statuses since last loop.
+ if err := sr.getChildren(ctx, tx, statusIdx); err != nil {
+ return 0, gtserror.Newf("error getting children: %w", err)
+ }
+
+ // Check for newly picked-up threads
+ // to find stragglers for below. Else
+ // we've reached end of what we can do.
+ if threadIdx >= len(sr.threadIDs) {
+ break
+ }
+
+ // Update status IDs cursor.
+ statusIdx = len(sr.statusIDs)
+
+ // Fetch any stragglers for newly seen threads since last loop.
+ if err := sr.getStragglers(ctx, tx, threadIdx); err != nil {
+ return 0, gtserror.Newf("error getting stragglers: %w", err)
+ }
+
+ // Check for newly picked-up straggling statuses / replies to
+ // find parents / children for. Else we've done all we can do.
+ if statusIdx >= len(sr.statusIDs) && len(sr.inReplyToIDs) == 0 {
+ break
+ }
+
+ // Update thread IDs cursor.
+ threadIdx = len(sr.threadIDs)
+ }
+
+ // Total number of
+ // statuses threaded.
+ total := len(sr.statusIDs)
+
+ // Check for the case where the entire
+ // batch of statuses is already correctly
+ // threaded. Then we have nothing to do!
+ if sr.allThreaded && len(sr.threadIDs) == 1 {
+ return 0, nil
+ }
+
+ // Sort all of the threads and
+ // status IDs by age; old -> new.
+ slices.Sort(sr.threadIDs)
+ slices.Sort(sr.statusIDs)
+
+ var threadID string
+
+ if len(sr.threadIDs) > 0 {
+ // Regardless of whether there ended up being
+ // multiple threads, we take the oldest value
+ // thread ID to use for entire batch of them.
+ // What remains in sr.threadIDs afterwards are
+ // the thread IDs being replaced (see mutes below).
+ threadID = sr.threadIDs[0]
+ sr.threadIDs = sr.threadIDs[1:]
+ }
+
+ if threadID == "" {
+ // None of the previous parents were threaded, we instead
+ // generate new thread with ID based on oldest creation time.
+ // (statusIDs is sorted and always holds at least the given status.)
+ createdAt, err := id.TimeFromULID(sr.statusIDs[0])
+ if err != nil {
+ return 0, gtserror.Newf("error parsing status ulid: %w", err)
+ }
+
+ // Generate thread ID from parsed time.
+ threadID = id.NewULIDFromTime(createdAt)
+
+ // We need to create a
+ // new thread table entry.
+ if _, err = tx.NewInsert().
+ Model(&newmodel.Thread{ID: threadID}).
+ Exec(ctx); err != nil {
+ return 0, gtserror.Newf("error creating new thread: %w", err)
+ }
+ }
+
+ // Update all the statuses to
+ // use determined thread_id.
+ if _, err := tx.NewUpdate().
+ Table("statuses").
+ Where("? IN (?)", bun.Ident("id"), bun.In(sr.statusIDs)).
+ Set("? = ?", bun.Ident("thread_id"), threadID).
+ Exec(ctx); err != nil {
+ return 0, gtserror.Newf("error updating status thread ids: %w", err)
+ }
+
+ if len(sr.threadIDs) > 0 {
+ // Update any existing thread
+ // mutes to use latest thread_id.
+ // NOTE(review): thread_mutes has a unique (thread_id, account_id)
+ // group in the 2023 migration model; if one account muted two of
+ // the threads merged here this UPDATE looks like it could conflict
+ // with that constraint — confirm.
+ if _, err := tx.NewUpdate().
+ Table("thread_mutes").
+ Where("? IN (?)", bun.Ident("thread_id"), bun.In(sr.threadIDs)).
+ Set("? = ?", bun.Ident("thread_id"), threadID).
+ Exec(ctx); err != nil {
+ return 0, gtserror.Newf("error updating mute thread ids: %w", err)
+ }
+ }
+
+ return uint64(total), nil
+}
+
+// append will append the given status to the internal tracking of statusRethreader{} for
+// potential future operations, checking for uniqueness. it tracks the inReplyToID value
+// for the next call to getParents(), it tracks the status ID for list of statuses that
+// need updating, the thread ID for the list of thread links and mutes that need updating,
+// and whether all the statuses all have a provided thread ID (i.e. allThreaded).
+//
+// statuses whose ID was already seen are ignored entirely. sr.seenIDs must be non-nil
+// (rethreadStatus() allocates it before the first call).
+func (sr *statusRethreader) append(status *oldmodel.Status) {
+
+	// Check if status already seen before.
+	if _, ok := sr.seenIDs[status.ID]; ok {
+		return
+	}
+
+	// Add status ID to map of seen IDs, and
+	// to the list of statuses needing update.
+	sr.seenIDs[status.ID] = struct{}{}
+	sr.statusIDs = append(sr.statusIDs, status.ID)
+
+	if status.InReplyToID != "" {
+		// Status has a parent, add any parent ID we
+		// have NOT yet seen to the list of reply IDs
+		// that need to be queried. (Parents we have
+		// already seen are already tracked as statuses.)
+		if _, ok := sr.seenIDs[status.InReplyToID]; !ok {
+			sr.inReplyToIDs = append(sr.inReplyToIDs, status.InReplyToID)
+		}
+	}
+
+	if status.ThreadID == "" {
+		// Status was not threaded,
+		// we now know not all statuses
+		// found were threaded.
+		sr.allThreaded = false
+		return
+	}
+
+	// Status was threaded, add any unique thread ID
+	// to our list of known status thread IDs. The ID
+	// is recorded in seenIDs too, otherwise every
+	// status of the same thread would append it again
+	// and len(sr.threadIDs) would overcount threads.
+	if _, ok := sr.seenIDs[status.ThreadID]; !ok {
+		sr.seenIDs[status.ThreadID] = struct{}{}
+		sr.threadIDs = append(sr.threadIDs, status.ThreadID)
+	}
+}
+
+// getParents selects the parent status for each ID queued in sr.inReplyToIDs
+// and passes every parent found to append(), which may in turn queue that
+// parent's own parent. Missing parents are skipped. The queue is emptied
+// before returning. Returns the first unexpected database error.
+func (sr *statusRethreader) getParents(ctx context.Context, tx bun.Tx) error {
+	// Iteratively query parent for each stored
+	// reply ID. The length is re-read each pass
+	// since append() may queue further parents.
+	for i := 0; i < len(sr.inReplyToIDs); i++ {
+
+		// Get next parent status ID.
+		parentID := sr.inReplyToIDs[i]
+
+		// Skip parents that have been tracked since
+		// they were queued, append() would ignore them.
+		if _, ok := sr.seenIDs[parentID]; ok {
+			continue
+		}
+
+		// Use a fresh model for each query so that a
+		// missing row cannot leave behind the fields
+		// of a previously selected parent.
+		var parent oldmodel.Status
+
+		// Select next parent status.
+		if err := tx.NewSelect().
+			Model(&parent).
+			Column("id", "in_reply_to_id", "thread_id").
+			Where("? = ?", bun.Ident("id"), parentID).
+			Scan(ctx); err != nil && !errors.Is(err, db.ErrNoEntries) {
+			return err
+		}
+
+		// Parent was missing.
+		if parent.ID == "" {
+			continue
+		}
+
+		// Add to slices.
+		sr.append(&parent)
+	}
+
+	// Reset reply slice.
+	clear(sr.inReplyToIDs)
+	sr.inReplyToIDs = sr.inReplyToIDs[:0]
+
+	return nil
+}
+
+// getChildren selects the statuses replying to each tracked status ID from
+// index idx onwards, and hands every one of them to append(). Children that
+// append() newly tracks extend sr.statusIDs and are therefore visited later
+// in the same call. Returns the first unexpected database error.
+func (sr *statusRethreader) getChildren(ctx context.Context, tx bun.Tx, idx int) error {
+	// Plain index loop, re-reading the length on every pass:
+	// append() may grow sr.statusIDs while we walk it, and
+	// 'seenIDs' guarantees each ID appears in it only once.
+	for cursor := idx; cursor < len(sr.statusIDs); cursor++ {
+		parentID := sr.statusIDs[cursor]
+
+		// Empty the reusable model slice, dropping
+		// stale pointers but keeping its capacity.
+		clear(sr.statuses)
+		sr.statuses = sr.statuses[:0]
+
+		// Query for statuses replying to parentID.
+		query := tx.NewSelect().
+			Model(&sr.statuses).
+			Column("id", "thread_id").
+			Where("? = ?", bun.Ident("in_reply_to_id"), parentID)
+
+		err := query.Scan(ctx)
+		if err != nil && err != db.ErrNoEntries {
+			return err
+		}
+
+		// Track every child found.
+		for j := range sr.statuses {
+			sr.append(sr.statuses[j])
+		}
+	}
+
+	return nil
+}
+
+// getStragglers selects statuses whose thread_id is one of the tracked
+// thread IDs from index idx onwards, but whose ID is not yet tracked, and
+// hands each of them to append(). It does nothing when there are no thread
+// IDs at or beyond idx. Returns any unexpected database error.
+func (sr *statusRethreader) getStragglers(ctx context.Context, tx bun.Tx, idx int) error {
+	// Without newly seen threads
+	// there is nothing to query.
+	if len(sr.threadIDs) <= idx {
+		return nil
+	}
+
+	// Thread IDs not yet checked for stragglers.
+	pending := sr.threadIDs[idx:]
+
+	// Empty the reusable model slice.
+	clear(sr.statuses)
+	sr.statuses = sr.statuses[:0]
+
+	// Select statuses in the pending threads
+	// that we are not already tracking.
+	err := tx.NewSelect().
+		Model(&sr.statuses).
+		Column("id", "thread_id", "in_reply_to_id").
+		Where("? IN (?) AND ? NOT IN (?)",
+			bun.Ident("thread_id"),
+			bun.In(pending),
+			bun.Ident("id"),
+			bun.In(sr.statusIDs),
+		).
+		Scan(ctx)
+	if !(err == nil || err == db.ErrNoEntries) {
+		return err
+	}
+
+	// Track every straggler found.
+	for i := range sr.statuses {
+		sr.append(sr.statuses[i])
+	}
+
+	return nil
+}
diff --git a/internal/db/bundb/migrations/20250415111056_thread_all_statuses/new/status.go b/internal/db/bundb/migrations/20250415111056_thread_all_statuses/new/status.go
new file mode 100644
index 000000000..a03e93859
--- /dev/null
+++ b/internal/db/bundb/migrations/20250415111056_thread_all_statuses/new/status.go
@@ -0,0 +1,133 @@
+// GoToSocial
+// Copyright (C) GoToSocial Authors admin@gotosocial.org
+// SPDX-License-Identifier: AGPL-3.0-or-later
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+package gtsmodel
+
+import (
+ "time"
+)
+
+// Status represents a user-created 'post' or 'status' in the database, either remote or local
+//
+// This is the "new" copy of the model used by the 20250415111056_thread_all_statuses
+// migration (imported there as newmodel); the migration reads the ThreadID
+// field's column definition from it. Fields tagged bun:"-" are not stored.
+type Status struct {
+ ID string `bun:"type:CHAR(26),pk,nullzero,notnull,unique"` // id of this item in the database
+ CreatedAt time.Time `bun:"type:timestamptz,nullzero,notnull,default:current_timestamp"` // when was item created
+ EditedAt time.Time `bun:"type:timestamptz,nullzero"` // when this status was last edited (if set)
+ FetchedAt time.Time `bun:"type:timestamptz,nullzero"` // when was item (remote) last fetched.
+ PinnedAt time.Time `bun:"type:timestamptz,nullzero"` // Status was pinned by owning account at this time.
+ URI string `bun:",unique,nullzero,notnull"` // activitypub URI of this status
+ URL string `bun:",nullzero"` // web url for viewing this status
+ Content string `bun:""` // Content HTML for this status.
+ AttachmentIDs []string `bun:"attachments,array"` // Database IDs of any media attachments associated with this status
+ TagIDs []string `bun:"tags,array"` // Database IDs of any tags used in this status
+ MentionIDs []string `bun:"mentions,array"` // Database IDs of any mentions in this status
+ EmojiIDs []string `bun:"emojis,array"` // Database IDs of any emojis used in this status
+ Local *bool `bun:",nullzero,notnull,default:false"` // is this status from a local account?
+ AccountID string `bun:"type:CHAR(26),nullzero,notnull"` // which account posted this status?
+ AccountURI string `bun:",nullzero,notnull"` // activitypub uri of the owner of this status
+ InReplyToID string `bun:"type:CHAR(26),nullzero"` // id of the status this status replies to
+ InReplyToURI string `bun:",nullzero"` // activitypub uri of the status this status is a reply to
+ InReplyToAccountID string `bun:"type:CHAR(26),nullzero"` // id of the account that this status replies to
+ InReplyTo *Status `bun:"-"` // status corresponding to inReplyToID
+ BoostOfID string `bun:"type:CHAR(26),nullzero"` // id of the status this status is a boost of
+ BoostOfURI string `bun:"-"` // URI of the status this status is a boost of; field not inserted in the db, just for dereferencing purposes.
+ BoostOfAccountID string `bun:"type:CHAR(26),nullzero"` // id of the account that owns the boosted status
+ BoostOf *Status `bun:"-"` // status that corresponds to boostOfID
+ ThreadID string `bun:"type:CHAR(26),nullzero,notnull,default:00000000000000000000000000"` // id of the thread to which this status belongs
+ EditIDs []string `bun:"edits,array"` // presumably database IDs of edits of this status (stored in the "edits" array column) — TODO confirm
+ PollID string `bun:"type:CHAR(26),nullzero"` // presumably database ID of a poll attached to this status — TODO confirm
+ ContentWarning string `bun:",nullzero"` // Content warning HTML for this status.
+ ContentWarningText string `bun:""` // Original text of the content warning without formatting
+ Visibility Visibility `bun:",nullzero,notnull"` // visibility entry for this status
+ Sensitive *bool `bun:",nullzero,notnull,default:false"` // mark the status as sensitive?
+ Language string `bun:",nullzero"` // what language is this status written in?
+ CreatedWithApplicationID string `bun:"type:CHAR(26),nullzero"` // Which application was used to create this status?
+ ActivityStreamsType string `bun:",nullzero,notnull"` // What is the activitystreams type of this status? See: https://www.w3.org/TR/activitystreams-vocabulary/#object-types. Will probably almost always be Note but who knows!.
+ Text string `bun:""` // Original text of the status without formatting
+ ContentType StatusContentType `bun:",nullzero"` // Content type used to process the original text of the status
+ Federated *bool `bun:",notnull"` // This status will be federated beyond the local timeline(s)
+ PendingApproval *bool `bun:",nullzero,notnull,default:false"` // If true then status is a reply or boost wrapper that must be Approved by the reply-ee or boost-ee before being fully distributed.
+ PreApproved bool `bun:"-"` // If true, then status is a reply to or boost wrapper of a status on our instance, has permission to do the interaction, and an Accept should be sent out for it immediately. Field not stored in the DB.
+ ApprovedByURI string `bun:",nullzero"` // URI of an Accept Activity that approves the Announce or Create Activity that this status was/will be attached to.
+}
+
+// enumType is the type we (at least, should) use
+// for database enum types. it is the largest size
+// supported by a PostgreSQL SMALLINT, since an
+// SQLite SMALLINT is actually variable in size.
+// Visibility and StatusContentType are built on it.
+type enumType int16
+
+// Visibility represents the
+// visibility granularity of a status.
+//
+// Named values start at 1; the zero value is not a named
+// Visibility, and calling String() on it panics.
+type Visibility enumType
+
+const (
+ // VisibilityNone means nobody can see this.
+ // It's only used for web status visibility.
+ VisibilityNone Visibility = 1
+
+ // VisibilityPublic means this status will
+ // be visible to everyone on all timelines.
+ VisibilityPublic Visibility = 2
+
+ // VisibilityUnlocked means this status will be visible to everyone,
+ // but will only show on home timeline to followers, and in lists.
+ VisibilityUnlocked Visibility = 3
+
+ // VisibilityFollowersOnly means this status is viewable to followers only.
+ VisibilityFollowersOnly Visibility = 4
+
+ // VisibilityMutualsOnly means this status
+ // is visible to mutual followers only.
+ VisibilityMutualsOnly Visibility = 5
+
+ // VisibilityDirect means this status is
+ // visible only to mentioned recipients.
+ VisibilityDirect Visibility = 6
+
+ // VisibilityDefault is used when no other setting can be found.
+ // It is an alias for VisibilityUnlocked.
+ VisibilityDefault Visibility = VisibilityUnlocked
+)
+
+// String returns a stringified, frontend API compatible form of Visibility.
+func (v Visibility) String() string {
+ switch v {
+ case VisibilityNone:
+ return "none"
+ case VisibilityPublic:
+ return "public"
+ case VisibilityUnlocked:
+ return "unlocked"
+ case VisibilityFollowersOnly:
+ return "followers_only"
+ case VisibilityMutualsOnly:
+ return "mutuals_only"
+ case VisibilityDirect:
+ return "direct"
+ default:
+ panic("invalid visibility")
+ }
+}
+
+// StatusContentType is the content type with which a status's text is
+// parsed. Can be either plain or markdown. Empty will default to plain.
+type StatusContentType enumType
+
+const (
+ StatusContentTypePlain StatusContentType = 1 // text is parsed as plain
+ StatusContentTypeMarkdown StatusContentType = 2 // text is parsed as markdown
+ StatusContentTypeDefault = StatusContentTypePlain // alias for the plain content type
+)
diff --git a/internal/db/bundb/migrations/20250415111056_thread_all_statuses/new/thread.go b/internal/db/bundb/migrations/20250415111056_thread_all_statuses/new/thread.go
new file mode 100644
index 000000000..319752476
--- /dev/null
+++ b/internal/db/bundb/migrations/20250415111056_thread_all_statuses/new/thread.go
@@ -0,0 +1,24 @@
+// GoToSocial
+// Copyright (C) GoToSocial Authors admin@gotosocial.org
+// SPDX-License-Identifier: AGPL-3.0-or-later
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+package gtsmodel
+
+// Thread represents one thread of statuses.
+//
+// This "new" copy of the model holds only the thread's ID; the
+// thread-all-statuses migration inserts rows of it for newly created threads.
+// TODO: add more fields here if necessary.
+type Thread struct {
+ ID string `bun:"type:CHAR(26),pk,nullzero,notnull,unique"` // id of this item in the database
+}
diff --git a/internal/db/bundb/migrations/20250415111056_thread_all_statuses/old/status.go b/internal/db/bundb/migrations/20250415111056_thread_all_statuses/old/status.go
new file mode 100644
index 000000000..f33a2b29e
--- /dev/null
+++ b/internal/db/bundb/migrations/20250415111056_thread_all_statuses/old/status.go
@@ -0,0 +1,131 @@
+// GoToSocial
+// Copyright (C) GoToSocial Authors admin@gotosocial.org
+// SPDX-License-Identifier: AGPL-3.0-or-later
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+package gtsmodel
+
+import (
+ "time"
+)
+
+// Status represents a user-created 'post' or 'status' in the database, either remote or local.
+//
+// NOTE(review): this copy sits under the migration's "old" directory, so it is presumably a
+// frozen snapshot of the model as it looked before the thread_all_statuses migration — confirm.
+// Fields tagged `bun:"-"` are not stored in the database.
+type Status struct {
+ ID string `bun:"type:CHAR(26),pk,nullzero,notnull,unique"` // id of this item in the database
+ CreatedAt time.Time `bun:"type:timestamptz,nullzero,notnull,default:current_timestamp"` // when was item created
+ EditedAt time.Time `bun:"type:timestamptz,nullzero"` // when this status was last edited (if set)
+ FetchedAt time.Time `bun:"type:timestamptz,nullzero"` // when was item (remote) last fetched.
+ PinnedAt time.Time `bun:"type:timestamptz,nullzero"` // Status was pinned by owning account at this time.
+ URI string `bun:",unique,nullzero,notnull"` // activitypub URI of this status
+ URL string `bun:",nullzero"` // web url for viewing this status
+ Content string `bun:""` // Content HTML for this status.
+ AttachmentIDs []string `bun:"attachments,array"` // Database IDs of any media attachments associated with this status
+ TagIDs []string `bun:"tags,array"` // Database IDs of any tags used in this status
+ MentionIDs []string `bun:"mentions,array"` // Database IDs of any mentions in this status
+ EmojiIDs []string `bun:"emojis,array"` // Database IDs of any emojis used in this status
+ Local *bool `bun:",nullzero,notnull,default:false"` // is this status from a local account?
+ AccountID string `bun:"type:CHAR(26),nullzero,notnull"` // which account posted this status?
+ AccountURI string `bun:",nullzero,notnull"` // activitypub uri of the owner of this status
+ InReplyToID string `bun:"type:CHAR(26),nullzero"` // id of the status this status replies to
+ InReplyToURI string `bun:",nullzero"` // activitypub uri of the status this status is a reply to
+ InReplyToAccountID string `bun:"type:CHAR(26),nullzero"` // id of the account that this status replies to
+ BoostOfID string `bun:"type:CHAR(26),nullzero"` // id of the status this status is a boost of
+ BoostOfURI string `bun:"-"` // URI of the status this status is a boost of; field not inserted in the db, just for dereferencing purposes.
+ BoostOfAccountID string `bun:"type:CHAR(26),nullzero"` // id of the account that owns the boosted status
+ ThreadID string `bun:"type:CHAR(26),nullzero"` // id of the thread to which this status belongs; only set for remote statuses if a local account is involved at some point in the thread, otherwise null
+ EditIDs []string `bun:"edits,array"` // presumably database IDs of any stored edits of this status — TODO confirm
+ PollID string `bun:"type:CHAR(26),nullzero"` // presumably id of the poll attached to this status, if any — TODO confirm
+ ContentWarning string `bun:",nullzero"` // Content warning HTML for this status.
+ ContentWarningText string `bun:""` // Original text of the content warning without formatting
+ Visibility Visibility `bun:",nullzero,notnull"` // visibility entry for this status
+ Sensitive *bool `bun:",nullzero,notnull,default:false"` // mark the status as sensitive?
+ Language string `bun:",nullzero"` // what language is this status written in?
+ CreatedWithApplicationID string `bun:"type:CHAR(26),nullzero"` // Which application was used to create this status?
+ ActivityStreamsType string `bun:",nullzero,notnull"` // What is the activitystreams type of this status? See: https://www.w3.org/TR/activitystreams-vocabulary/#object-types. Will probably almost always be Note but who knows!.
+ Text string `bun:""` // Original text of the status without formatting
+ ContentType StatusContentType `bun:",nullzero"` // Content type used to process the original text of the status
+ Federated *bool `bun:",notnull"` // This status will be federated beyond the local timeline(s)
+ PendingApproval *bool `bun:",nullzero,notnull,default:false"` // If true then status is a reply or boost wrapper that must be Approved by the reply-ee or boost-ee before being fully distributed.
+ PreApproved bool `bun:"-"` // If true, then status is a reply to or boost wrapper of a status on our instance, has permission to do the interaction, and an Accept should be sent out for it immediately. Field not stored in the DB.
+ ApprovedByURI string `bun:",nullzero"` // URI of an Accept Activity that approves the Announce or Create Activity that this status was/will be attached to.
+}
+
+// enumType is the type we (at least, should) use
+// for database enum types. It is an int16: the largest
+// size supported by a PostgreSQL SMALLINT, since an
+// SQLite SMALLINT is actually variable in size.
+type enumType int16
+
+// Visibility represents the
+// visibility granularity of a status.
+//
+// Defined values start at 1, so the zero
+// value matches none of the constants below.
+type Visibility enumType
+
+const (
+ // VisibilityNone means nobody can see this.
+ // It's only used for web status visibility.
+ VisibilityNone Visibility = 1
+
+ // VisibilityPublic means this status will
+ // be visible to everyone on all timelines.
+ VisibilityPublic Visibility = 2
+
+ // VisibilityUnlocked means this status will be visible to everyone,
+ // but will only show on home timeline to followers, and in lists.
+ VisibilityUnlocked Visibility = 3
+
+ // VisibilityFollowersOnly means this status is viewable to followers only.
+ VisibilityFollowersOnly Visibility = 4
+
+ // VisibilityMutualsOnly means this status
+ // is visible to mutual followers only.
+ VisibilityMutualsOnly Visibility = 5
+
+ // VisibilityDirect means this status is
+ // visible only to mentioned recipients.
+ VisibilityDirect Visibility = 6
+
+ // VisibilityDefault is used when no other setting can be found.
+ // It has the same value as VisibilityUnlocked.
+ VisibilityDefault Visibility = VisibilityUnlocked
+)
+
+// String returns a stringified, frontend API compatible form of Visibility.
+func (v Visibility) String() string {
+ switch v {
+ case VisibilityNone:
+ return "none"
+ case VisibilityPublic:
+ return "public"
+ case VisibilityUnlocked:
+ return "unlocked"
+ case VisibilityFollowersOnly:
+ return "followers_only"
+ case VisibilityMutualsOnly:
+ return "mutuals_only"
+ case VisibilityDirect:
+ return "direct"
+ default:
+ panic("invalid visibility")
+ }
+}
+
+// StatusContentType is the content type with which a status's text is
+// parsed. Can be either plain or markdown. Empty will default to plain.
+type StatusContentType enumType
+
+const (
+ // StatusContentTypePlain: status text is parsed as plain text.
+ StatusContentTypePlain StatusContentType = 1
+ // StatusContentTypeMarkdown: status text is parsed as markdown.
+ StatusContentTypeMarkdown StatusContentType = 2
+ // StatusContentTypeDefault has the same value as StatusContentTypePlain.
+ StatusContentTypeDefault = StatusContentTypePlain
+)
diff --git a/internal/db/bundb/migrations/util.go b/internal/db/bundb/migrations/util.go
index 3219a8aa7..8da861df7 100644
--- a/internal/db/bundb/migrations/util.go
+++ b/internal/db/bundb/migrations/util.go
@@ -26,6 +26,7 @@ import (
"strconv"
"strings"
+ "code.superseriousbusiness.org/gotosocial/internal/config"
"code.superseriousbusiness.org/gotosocial/internal/gtserror"
"code.superseriousbusiness.org/gotosocial/internal/id"
"code.superseriousbusiness.org/gotosocial/internal/log"
@@ -37,6 +38,112 @@ import (
"github.com/uptrace/bun/schema"
)
+// doWALCheckpoint attempts to force a WAL file merge on SQLite3,
+// which can be useful given how much can build up in the WAL.
+// It does nothing (and returns nil) when the database dialect is
+// not SQLite, or when the configured journal mode is not "WAL".
+//
+// see: https://www.sqlite.org/pragma.html#pragma_wal_checkpoint
+func doWALCheckpoint(ctx context.Context, db *bun.DB) error {
+	// Only relevant for SQLite.
+	if db.Dialect().Name() != dialect.SQLite {
+		return nil
+	}
+
+	// Only relevant in WAL journal mode.
+	if !strings.EqualFold(config.GetDbSqliteJournalMode(), "WAL") {
+		return nil
+	}
+
+	// Request a checkpoint in RESTART mode.
+	if _, err := db.ExecContext(ctx, "PRAGMA wal_checkpoint(RESTART);"); err != nil {
+		return gtserror.Newf("error performing wal_checkpoint: %w", err)
+	}
+
+	return nil
+}
+
+// batchUpdateByID runs updateQuery (with updateArgs) over the entire
+// given table, in batches selected by descending value of batchByCol.
+// A " WHERE <batchByCol> IN (<batch>)" clause is appended to updateQuery
+// for each batch, so updateQuery is expected to not end in its own WHERE.
+//
+// An error is returned when the number of updated rows differs from
+// the table row count taken at the start, so that the caller's
+// transaction can be rolled back.
+func batchUpdateByID(
+	ctx context.Context,
+	tx bun.Tx,
+	table string,
+	batchByCol string,
+	updateQuery string,
+	updateArgs []any,
+) error {
+	// Count all rows in table up-front, used for
+	// progress logging and the final sanity check.
+	total, err := tx.NewSelect().Table(table).Count(ctx)
+	if err != nil {
+		return gtserror.Newf("error selecting total count: %w", err)
+	}
+
+	// Rows per batch.
+	const batchsz = 5000
+
+	var (
+		// Exclusive upper bound on batchByCol for
+		// the next batch. Starts at highest possible
+		// value and moves down after each batch.
+		highest = id.Highest
+
+		// Running total of updated rows.
+		updated int
+	)
+
+	// Identifier of the batching column,
+	// reused across all built queries.
+	col := bun.Ident(batchByCol)
+
+	for {
+		// Select (at most) the next batchsz column
+		// values below the current upper bound.
+		batchQ := tx.
+			NewSelect().
+			Table(table).
+			Column(batchByCol).
+			Where("? < ?", col, highest).
+			OrderExpr("? DESC", col).
+			Limit(batchsz)
+
+		// Restrict the UPDATE to this batch, working on
+		// a copy of updateArgs so the input is untouched.
+		args := slices.Clone(updateArgs)
+		args = append(args, col, batchQ)
+
+		// Execute the finalized raw query.
+		res, err := tx.NewRaw(updateQuery+" WHERE ? IN (?)", args...).Exec(ctx)
+		if err != nil {
+			return gtserror.Newf("error updating old column values: %w", err)
+		}
+
+		// Determine rows touched by this batch.
+		count, err := res.RowsAffected()
+		if err != nil {
+			return gtserror.Newf("error counting affected rows: %w", err)
+		}
+
+		// A batch updating nothing
+		// means the table is done.
+		if count == 0 {
+			break
+		}
+
+		updated += int(count)
+
+		// Log helpful message to admin.
+		log.Infof(ctx, "migrated %d of %d %s (up to %s)",
+			updated, total, table, highest)
+
+		// Lowest value in this batch becomes
+		// the upper bound for the next one.
+		err = tx.
+			NewSelect().
+			With("batch_query", batchQ).
+			ColumnExpr("min(?) FROM ?", col, bun.Ident("batch_query")).
+			Scan(ctx, &highest)
+		if err != nil {
+			return gtserror.Newf("error selecting next highest: %w", err)
+		}
+	}
+
+	if total != updated {
+		// Return error here in order to rollback the whole transaction.
+		return fmt.Errorf("total=%d does not match updated=%d", total, updated)
+	}
+
+	return nil
+}
+
// convertEnums performs a transaction that converts
// a table's column of our old-style enums (strings) to
// more performant and space-saving integer types.
diff --git a/internal/db/bundb/status.go b/internal/db/bundb/status.go
index cf4a2549a..81aba8726 100644
--- a/internal/db/bundb/status.go
+++ b/internal/db/bundb/status.go
@@ -21,11 +21,13 @@ import (
"context"
"errors"
"slices"
+ "strings"
"code.superseriousbusiness.org/gotosocial/internal/db"
"code.superseriousbusiness.org/gotosocial/internal/gtscontext"
"code.superseriousbusiness.org/gotosocial/internal/gtserror"
"code.superseriousbusiness.org/gotosocial/internal/gtsmodel"
+ "code.superseriousbusiness.org/gotosocial/internal/id"
"code.superseriousbusiness.org/gotosocial/internal/log"
"code.superseriousbusiness.org/gotosocial/internal/state"
"code.superseriousbusiness.org/gotosocial/internal/util/xslices"
@@ -335,115 +337,284 @@ func (s *statusDB) PutStatus(ctx context.Context, status *gtsmodel.Status) error
// as the cache does not attempt a mutex lock until AFTER hook.
//
return s.db.RunInTx(ctx, nil, func(ctx context.Context, tx bun.Tx) error {
- // create links between this status and any emojis it uses
- for _, i := range status.EmojiIDs {
- if _, err := tx.
- NewInsert().
- Model(&gtsmodel.StatusToEmoji{
- StatusID: status.ID,
- EmojiID: i,
- }).
- On("CONFLICT (?, ?) DO NOTHING", bun.Ident("status_id"), bun.Ident("emoji_id")).
- Exec(ctx); err != nil {
- if !errors.Is(err, db.ErrAlreadyExists) {
- return err
- }
+ if status.BoostOfID != "" {
+ var threadID string
+
+ // Boost wrappers always inherit thread
+ // of the origin status they're boosting.
+ if err := tx.
+ NewSelect().
+ Table("statuses").
+ Column("thread_id").
+ Where("? = ?", bun.Ident("id"), status.BoostOfID).
+ Scan(ctx, &threadID); err != nil {
+ return gtserror.Newf("error selecting boosted status: %w", err)
}
+
+ // Set the selected thread.
+ status.ThreadID = threadID
+
+ // They also require no further
+ // checks! Simply insert status here.
+ return insertStatus(ctx, tx, status)
}
- // create links between this status and any tags it uses
- for _, i := range status.TagIDs {
- if _, err := tx.
- NewInsert().
- Model(&gtsmodel.StatusToTag{
- StatusID: status.ID,
- TagID: i,
- }).
- On("CONFLICT (?, ?) DO NOTHING", bun.Ident("status_id"), bun.Ident("tag_id")).
- Exec(ctx); err != nil {
- if !errors.Is(err, db.ErrAlreadyExists) {
- return err
- }
+ // Gather a list of possible thread IDs
+ // of all the possible related statuses
+ // to this one. If one exists we can use
+ // the end result, and if too many exist
+ // we can fix the status threading.
+ var threadIDs []string
+
+ if status.InReplyToID != "" {
+ var threadID string
+
+ // A stored parent status exists,
+ // select its thread ID to ideally
+ // inherit this for status.
+ if err := tx.
+ NewSelect().
+ Table("statuses").
+ Column("thread_id").
+ Where("? = ?", bun.Ident("id"), status.InReplyToID).
+ Scan(ctx, &threadID); err != nil {
+ return gtserror.Newf("error selecting status parent: %w", err)
+ }
+
+ // Append possible ID to threads slice.
+ threadIDs = append(threadIDs, threadID)
+
+ } else if status.InReplyToURI != "" {
+ var ids []string
+
+ // A parent status exists but is not
+ // yet stored. See if any siblings for
+ // this shared parent exist with their
+ // own thread IDs.
+ if err := tx.
+ NewSelect().
+ Table("statuses").
+ Column("thread_id").
+ Where("? = ?", bun.Ident("in_reply_to_uri"), status.InReplyToURI).
+ Scan(ctx, &ids); err != nil && !errors.Is(err, db.ErrNoEntries) {
+ return gtserror.Newf("error selecting status siblings: %w", err)
}
+
+ // Append possible IDs to threads slice.
+ threadIDs = append(threadIDs, ids...)
}
- // change the status ID of the media
- // attachments to the current status
- for _, a := range status.Attachments {
- a.StatusID = status.ID
- if _, err := tx.
- NewUpdate().
- Model(a).
- Column("status_id").
- Where("? = ?", bun.Ident("media_attachment.id"), a.ID).
- Exec(ctx); err != nil {
- if !errors.Is(err, db.ErrAlreadyExists) {
- return err
- }
+ if !*status.Local {
+ var ids []string
+
+ // For remote statuses specifically, check to
+ // see if any children are stored for this new
+ // stored parent with their own thread IDs.
+ if err := tx.
+ NewSelect().
+ Table("statuses").
+ Column("thread_id").
+ Where("? = ?", bun.Ident("in_reply_to_uri"), status.URI).
+ Scan(ctx, &ids); err != nil && !errors.Is(err, db.ErrNoEntries) {
+ return gtserror.Newf("error selecting status children: %w", err)
}
+
+ // Append possible IDs to threads slice.
+ threadIDs = append(threadIDs, ids...)
}
- // If the status is threaded, create
- // link between thread and status.
- if status.ThreadID != "" {
+ // Ensure only *unique* possible thread IDs.
+ threadIDs = xslices.Deduplicate(threadIDs)
+ switch len(threadIDs) {
+
+ case 0:
+ // No related status with thread ID already exists,
+ // so create new thread ID from status creation time.
+ threadID := id.NewULIDFromTime(status.CreatedAt)
+
+ // Insert new thread.
if _, err := tx.
NewInsert().
- Model(&gtsmodel.ThreadToStatus{
- ThreadID: status.ThreadID,
- StatusID: status.ID,
- }).
- On("CONFLICT (?, ?) DO NOTHING", bun.Ident("thread_id"), bun.Ident("status_id")).
+ Model(&gtsmodel.Thread{ID: threadID}).
Exec(ctx); err != nil {
- if !errors.Is(err, db.ErrAlreadyExists) {
- return err
- }
+ return gtserror.Newf("error inserting thread: %w", err)
+ }
+
+ // Update status thread ID.
+ status.ThreadID = threadID
+
+ case 1:
+ // Inherit single known thread.
+ status.ThreadID = threadIDs[0]
+
+ default:
+ var err error
+ log.Infof(ctx, "reconciling status threading for %s: [%s]", status.URI, strings.Join(threadIDs, ","))
+ status.ThreadID, err = s.fixStatusThreading(ctx, tx, threadIDs)
+ if err != nil {
+ return err
}
}
- // Finally, insert the status
- _, err := tx.NewInsert().
- Model(status).
- Exec(ctx)
- return err
+ // And after threading, insert status.
+ // This will error if ThreadID is unset.
+ return insertStatus(ctx, tx, status)
})
})
}
+// fixStatusThreading can be called to reconcile statuses in the same thread but known to be using multiple given threads.
+func (s *statusDB) fixStatusThreading(ctx context.Context, tx bun.Tx, threadIDs []string) (string, error) {
+ if len(threadIDs) <= 1 {
+ panic("invalid call to fixStatusThreading()")
+ }
+
+ // Sort ascending, i.e.
+ // oldest thread ID first.
+ slices.Sort(threadIDs)
+
+ // Drop the oldest thread ID
+ // from slice, we'll keep this.
+ threadID := threadIDs[0]
+ threadIDs = threadIDs[1:]
+
+ // On updates, gather IDs of changed model
+ // IDs for later stage of cache invalidation,
+ // preallocating slices for worst-case scenarios.
+ statusIDs := make([]string, 0, 4*len(threadIDs))
+ muteIDs := make([]string, 0, 4*len(threadIDs))
+
+ // Update all statuses with
+ // thread IDs to use oldest.
+ if _, err := tx.
+ NewUpdate().
+ Table("statuses").
+ Where("? IN (?)", bun.Ident("thread_id"), bun.In(threadIDs)).
+ Set("? = ?", bun.Ident("thread_id"), threadID).
+ Returning("?", bun.Ident("id")).
+ Exec(ctx, &statusIDs); err != nil && !errors.Is(err, db.ErrNoEntries) {
+ return "", gtserror.Newf("error updating statuses: %w", err)
+ }
+
+ // Update all thread mutes with
+ // thread IDs to use oldest.
+ if _, err := tx.
+ NewUpdate().
+ Table("thread_mutes").
+ Where("? IN (?)", bun.Ident("thread_id"), bun.In(threadIDs)).
+ Set("? = ?", bun.Ident("thread_id"), threadID).
+ Returning("?", bun.Ident("id")).
+ Exec(ctx, &muteIDs); err != nil && !errors.Is(err, db.ErrNoEntries) {
+ return "", gtserror.Newf("error updating thread mutes: %w", err)
+ }
+
+ // Delete all now
+ // unused thread IDs.
+ if _, err := tx.
+ NewDelete().
+ Table("threads").
+ Where("? IN (?)", bun.Ident("id"), bun.In(threadIDs)).
+ Exec(ctx); err != nil {
+ return "", gtserror.Newf("error deleting threads: %w", err)
+ }
+
+ // Invalidate caches for changed statuses and mutes.
+ s.state.Caches.DB.Status.InvalidateIDs("ID", statusIDs)
+ s.state.Caches.DB.ThreadMute.InvalidateIDs("ID", muteIDs)
+
+ return threadID, nil
+}
+
+// insertStatus handles the base status insert logic within the given
+// transaction. In order, it:
+//
+//   - inserts a status_to_emoji link for each of status.EmojiIDs
+//   - inserts a status_to_tag link for each of status.TagIDs
+//   - sets status_id on each of status.Attachments to this status
+//   - inserts the status itself
+//
+// The first error encountered is wrapped and returned,
+// no later steps are attempted after an error.
+func insertStatus(ctx context.Context, tx bun.Tx, status *gtsmodel.Status) error {
+
+	// create links between this
+	// status and any emojis it uses
+	//
+	// (loop variable deliberately not named
+	// "id", to not shadow the id package).
+	for _, emojiID := range status.EmojiIDs {
+		if _, err := tx.
+			NewInsert().
+			Model(&gtsmodel.StatusToEmoji{
+				StatusID: status.ID,
+				EmojiID:  emojiID,
+			}).
+			Exec(ctx); err != nil {
+			return gtserror.Newf("error inserting status_to_emoji: %w", err)
+		}
+	}
+
+	// create links between this
+	// status and any tags it uses
+	for _, tagID := range status.TagIDs {
+		if _, err := tx.
+			NewInsert().
+			Model(&gtsmodel.StatusToTag{
+				StatusID: status.ID,
+				TagID:    tagID,
+			}).
+			Exec(ctx); err != nil {
+			return gtserror.Newf("error inserting status_to_tag: %w", err)
+		}
+	}
+
+	// change the status ID of the media
+	// attachments to the current status,
+	// updating only the status_id column.
+	for _, a := range status.Attachments {
+		a.StatusID = status.ID
+		if _, err := tx.
+			NewUpdate().
+			Model(a).
+			Column("status_id").
+			Where("? = ?", bun.Ident("media_attachment.id"), a.ID).
+			Exec(ctx); err != nil {
+			return gtserror.Newf("error updating media: %w", err)
+		}
+	}
+
+	// Finally, insert the status
+	if _, err := tx.NewInsert().
+		Model(status).
+		Exec(ctx); err != nil {
+		return gtserror.Newf("error inserting status: %w", err)
+	}
+
+	return nil
+}
+
func (s *statusDB) UpdateStatus(ctx context.Context, status *gtsmodel.Status, columns ...string) error {
return s.state.Caches.DB.Status.Store(status, func() error {
// It is safe to run this database transaction within cache.Store
// as the cache does not attempt a mutex lock until AFTER hook.
//
return s.db.RunInTx(ctx, nil, func(ctx context.Context, tx bun.Tx) error {
- // create links between this status and any emojis it uses
- for _, i := range status.EmojiIDs {
+
+ // create links between this
+ // status and any emojis it uses
+ for _, id := range status.EmojiIDs {
if _, err := tx.
NewInsert().
Model(&gtsmodel.StatusToEmoji{
StatusID: status.ID,
- EmojiID: i,
+ EmojiID: id,
}).
On("CONFLICT (?, ?) DO NOTHING", bun.Ident("status_id"), bun.Ident("emoji_id")).
Exec(ctx); err != nil {
- if !errors.Is(err, db.ErrAlreadyExists) {
- return err
- }
+ return err
}
}
- // create links between this status and any tags it uses
- for _, i := range status.TagIDs {
+ // create links between this
+ // status and any tags it uses
+ for _, id := range status.TagIDs {
if _, err := tx.
NewInsert().
Model(&gtsmodel.StatusToTag{
StatusID: status.ID,
- TagID: i,
+ TagID: id,
}).
On("CONFLICT (?, ?) DO NOTHING", bun.Ident("status_id"), bun.Ident("tag_id")).
Exec(ctx); err != nil {
- if !errors.Is(err, db.ErrAlreadyExists) {
- return err
- }
+ return err
}
}
@@ -457,26 +628,7 @@ func (s *statusDB) UpdateStatus(ctx context.Context, status *gtsmodel.Status, co
Column("status_id").
Where("? = ?", bun.Ident("media_attachment.id"), a.ID).
Exec(ctx); err != nil {
- if !errors.Is(err, db.ErrAlreadyExists) {
- return err
- }
- }
- }
-
- // If the status is threaded, create
- // link between thread and status.
- if status.ThreadID != "" {
- if _, err := tx.
- NewInsert().
- Model(&gtsmodel.ThreadToStatus{
- ThreadID: status.ThreadID,
- StatusID: status.ID,
- }).
- On("CONFLICT (?, ?) DO NOTHING", bun.Ident("thread_id"), bun.Ident("status_id")).
- Exec(ctx); err != nil {
- if !errors.Is(err, db.ErrAlreadyExists) {
- return err
- }
+ return err
}
}
@@ -499,7 +651,9 @@ func (s *statusDB) DeleteStatusByID(ctx context.Context, id string) error {
// Delete status from database and any related links in a transaction.
if err := s.db.RunInTx(ctx, nil, func(ctx context.Context, tx bun.Tx) error {
- // delete links between this status and any emojis it uses
+
+ // delete links between this
+ // status and any emojis it uses
if _, err := tx.
NewDelete().
TableExpr("? AS ?", bun.Ident("status_to_emojis"), bun.Ident("status_to_emoji")).
@@ -508,7 +662,8 @@ func (s *statusDB) DeleteStatusByID(ctx context.Context, id string) error {
return err
}
- // delete links between this status and any tags it uses
+ // delete links between this
+ // status and any tags it uses
if _, err := tx.
NewDelete().
TableExpr("? AS ?", bun.Ident("status_to_tags"), bun.Ident("status_to_tag")).
@@ -517,16 +672,6 @@ func (s *statusDB) DeleteStatusByID(ctx context.Context, id string) error {
return err
}
- // Delete links between this status
- // and any threads it was a part of.
- if _, err := tx.
- NewDelete().
- TableExpr("? AS ?", bun.Ident("thread_to_statuses"), bun.Ident("thread_to_status")).
- Where("? = ?", bun.Ident("thread_to_status.status_id"), id).
- Exec(ctx); err != nil {
- return err
- }
-
// delete the status itself
if _, err := tx.
NewDelete().
diff --git a/internal/db/bundb/status_test.go b/internal/db/bundb/status_test.go
index 9c1eb73bd..7d33763df 100644
--- a/internal/db/bundb/status_test.go
+++ b/internal/db/bundb/status_test.go
@@ -21,8 +21,12 @@ import (
"testing"
"time"
+ "code.superseriousbusiness.org/gotosocial/internal/ap"
"code.superseriousbusiness.org/gotosocial/internal/db"
+ "code.superseriousbusiness.org/gotosocial/internal/gtscontext"
"code.superseriousbusiness.org/gotosocial/internal/gtsmodel"
+ "code.superseriousbusiness.org/gotosocial/internal/id"
+ "code.superseriousbusiness.org/gotosocial/internal/util"
"github.com/stretchr/testify/suite"
)
@@ -253,6 +257,302 @@ func (suite *StatusTestSuite) TestPutPopulatedStatus() {
)
}
+// TestPutStatusThreadingBoostOfIDSet checks that a boost wrapper
+// stored with PutStatus ends up with the same ThreadID as the
+// (already stored) status that it boosts.
+func (suite *StatusTestSuite) TestPutStatusThreadingBoostOfIDSet() {
+ ctx := suite.T().Context()
+
+ // Fake account details.
+ accountID := id.NewULID()
+ accountURI := "https://example.com/users/" + accountID
+
+ var err error
+
+ // Prepare new status.
+ statusID := id.NewULID()
+ statusURI := accountURI + "/statuses/" + statusID
+ status := &gtsmodel.Status{
+ ID: statusID,
+ URI: statusURI,
+ AccountID: accountID,
+ AccountURI: accountURI,
+ Local: util.Ptr(false),
+ Federated: util.Ptr(true),
+ ActivityStreamsType: ap.ObjectNote,
+ }
+
+ // Insert original status into database.
+ // It has no relations, so PutStatus is
+ // expected to assign it a thread ID.
+ err = suite.db.PutStatus(ctx, status)
+ suite.NoError(err)
+ suite.NotEmpty(status.ThreadID)
+
+ // Prepare new boost.
+ boostID := id.NewULID()
+ boostURI := accountURI + "/statuses/" + boostID
+ boost := &gtsmodel.Status{
+ ID: boostID,
+ URI: boostURI,
+ AccountID: accountID,
+ AccountURI: accountURI,
+ BoostOfID: statusID,
+ BoostOfAccountID: accountID,
+ Local: util.Ptr(false),
+ Federated: util.Ptr(true),
+ ActivityStreamsType: ap.ObjectNote,
+ }
+
+ // Insert boost wrapper into database.
+ err = suite.db.PutStatus(ctx, boost)
+ suite.NoError(err)
+
+ // Boost wrapper should have inherited thread.
+ suite.Equal(status.ThreadID, boost.ThreadID)
+}
+
+// TestPutStatusThreadingInReplyToIDSet checks that a reply with
+// InReplyToID set, stored with PutStatus, ends up with the same
+// ThreadID as the (already stored) status that it replies to.
+func (suite *StatusTestSuite) TestPutStatusThreadingInReplyToIDSet() {
+ ctx := suite.T().Context()
+
+ // Fake account details.
+ accountID := id.NewULID()
+ accountURI := "https://example.com/users/" + accountID
+
+ var err error
+
+ // Prepare new status.
+ statusID := id.NewULID()
+ statusURI := accountURI + "/statuses/" + statusID
+ status := &gtsmodel.Status{
+ ID: statusID,
+ URI: statusURI,
+ AccountID: accountID,
+ AccountURI: accountURI,
+ Local: util.Ptr(false),
+ Federated: util.Ptr(true),
+ ActivityStreamsType: ap.ObjectNote,
+ }
+
+ // Insert original status into database.
+ // It has no relations, so PutStatus is
+ // expected to assign it a thread ID.
+ err = suite.db.PutStatus(ctx, status)
+ suite.NoError(err)
+ suite.NotEmpty(status.ThreadID)
+
+ // Prepare new reply.
+ replyID := id.NewULID()
+ replyURI := accountURI + "/statuses/" + replyID
+ reply := &gtsmodel.Status{
+ ID: replyID,
+ URI: replyURI,
+ AccountID: accountID,
+ AccountURI: accountURI,
+ InReplyToID: statusID,
+ InReplyToURI: statusURI,
+ InReplyToAccountID: accountID,
+ Local: util.Ptr(false),
+ Federated: util.Ptr(true),
+ ActivityStreamsType: ap.ObjectNote,
+ }
+
+ // Insert status reply into database.
+ err = suite.db.PutStatus(ctx, reply)
+ suite.NoError(err)
+
+ // Status reply should have inherited thread.
+ suite.Equal(status.ThreadID, reply.ThreadID)
+}
+
+// TestPutStatusThreadingSiblings checks that sibling replies which
+// only share an InReplyToURI (their parent is not yet stored) all
+// get the same ThreadID when stored with PutStatus, and that the
+// parent joins that same thread when it is stored afterwards.
+func (suite *StatusTestSuite) TestPutStatusThreadingSiblings() {
+ ctx := suite.T().Context()
+
+ // Fake account details.
+ accountID := id.NewULID()
+ accountURI := "https://example.com/users/" + accountID
+
+ // Main parent status; prepared here
+ // but only inserted after its children.
+ statusID := id.NewULID()
+ statusURI := accountURI + "/statuses/" + statusID
+ status := &gtsmodel.Status{
+ ID: statusID,
+ URI: statusURI,
+ AccountID: accountID,
+ AccountURI: accountURI,
+ Local: util.Ptr(false),
+ Federated: util.Ptr(true),
+ ActivityStreamsType: ap.ObjectNote,
+ }
+
+ const siblingCount = 10
+ var statuses []*gtsmodel.Status
+ for range siblingCount {
+ id := id.NewULID()
+ uri := accountURI + "/statuses/" + id
+
+ // Note here that InReplyToID is not set,
+ // so as they get inserted it's as if children
+ // are being dereferenced ahead of stored parent.
+ //
+ // Which is where out-of-sync threads can occur.
+ statuses = append(statuses, &gtsmodel.Status{
+ ID: id,
+ URI: uri,
+ AccountID: accountID,
+ AccountURI: accountURI,
+ InReplyToURI: statusURI,
+ Local: util.Ptr(false),
+ Federated: util.Ptr(true),
+ ActivityStreamsType: ap.ObjectNote,
+ })
+ }
+
+ var err error
+ var threadID string
+
+ // Insert all of the sibling children
+ // into the database, they should all
+ // still get correctly threaded together.
+ for _, child := range statuses {
+ err = suite.db.PutStatus(ctx, child)
+ suite.NoError(err)
+ suite.NotEmpty(child.ThreadID)
+ if threadID == "" {
+ // First child sets the expected thread.
+ threadID = child.ThreadID
+ } else {
+ suite.Equal(threadID, child.ThreadID)
+ }
+ }
+
+ // Finally, insert the parent status.
+ err = suite.db.PutStatus(ctx, status)
+ suite.NoError(err)
+
+ // Parent should have inherited thread.
+ suite.Equal(threadID, status.ThreadID)
+}
+
+// TestPutStatusThreadingReconcile builds one reply chain, inserts it with
+// PutStatus from both ends inward so that the head and the tail are expected
+// to end up in two separate threads, then inserts the remaining statuses.
+// Afterwards every status in the chain, as fetched from the database, is
+// expected to use the smaller (older) of the two thread IDs.
+func (suite *StatusTestSuite) TestPutStatusThreadingReconcile() {
+ ctx := suite.T().Context()
+
+ // Fake account details.
+ accountID := id.NewULID()
+ accountURI := "https://example.com/users/" + accountID
+
+ const threadLength = 10
+ var statuses []*gtsmodel.Status
+ var lastURI, lastID string
+
+ // Generate front-half of thread.
+ for range threadLength / 2 {
+ id := id.NewULID()
+ uri := accountURI + "/statuses/" + id
+ statuses = append(statuses, &gtsmodel.Status{
+ ID: id,
+ URI: uri,
+ AccountID: accountID,
+ AccountURI: accountURI,
+ InReplyToID: lastID,
+ InReplyToURI: lastURI,
+ Local: util.Ptr(false),
+ Federated: util.Ptr(true),
+ ActivityStreamsType: ap.ObjectNote,
+ })
+ lastURI = uri
+ lastID = id
+ }
+
+ // Generate back-half of thread.
+ //
+ // Note here that InReplyToID is not set past
+ // the first item, so as they get inserted it's
+ // as if the children are dereferenced ahead of
+ // the stored parent, i.e. an out-of-sync thread.
+ for range threadLength / 2 {
+ id := id.NewULID()
+ uri := accountURI + "/statuses/" + id
+ statuses = append(statuses, &gtsmodel.Status{
+ ID: id,
+ URI: uri,
+ AccountID: accountID,
+ AccountURI: accountURI,
+ InReplyToID: lastID,
+ InReplyToURI: lastURI,
+ Local: util.Ptr(false),
+ Federated: util.Ptr(true),
+ ActivityStreamsType: ap.ObjectNote,
+ })
+ lastURI = uri
+ lastID = ""
+ }
+
+ var err error
+
+ // Thread IDs we expect to see for
+ // head statuses as we add them, and
+ // for tail statuses as we add them.
+ var thread0, threadN string
+
+ // Insert status thread from head and tail,
+ // specifically stopping before the middle.
+ // These should each get threaded separately.
+ for i := range (threadLength / 2) - 1 {
+ i0, iN := i, len(statuses)-1-i
+
+ // Insert i'th status from the start.
+ err = suite.db.PutStatus(ctx, statuses[i0])
+ suite.NoError(err)
+ suite.NotEmpty(statuses[i0].ThreadID)
+
+ // Check i0 thread.
+ if thread0 == "" {
+ thread0 = statuses[i0].ThreadID
+ } else {
+ suite.Equal(thread0, statuses[i0].ThreadID)
+ }
+
+ // Insert i'th status from the end.
+ err = suite.db.PutStatus(ctx, statuses[iN])
+ suite.NoError(err)
+ suite.NotEmpty(statuses[iN].ThreadID)
+
+ // Check iN thread.
+ if threadN == "" {
+ threadN = statuses[iN].ThreadID
+ } else {
+ suite.Equal(threadN, statuses[iN].ThreadID)
+ }
+ }
+
+ // Finally, insert remaining statuses,
+ // at some point among these it should
+ // trigger a status thread reconcile.
+ for _, status := range statuses {
+
+ if status.ThreadID != "" {
+ // already inserted
+ continue
+ }
+
+ // Insert remaining status into db.
+ err = suite.db.PutStatus(ctx, status)
+ suite.NoError(err)
+ }
+
+ // The reconcile should pick the older,
+ // i.e. smaller of two ULID thread IDs.
+ finalThreadID := min(thread0, threadN)
+ for _, status := range statuses {
+
+ // Get ID of status.
+ id := status.ID
+
+ // Fetch the latest status from the database,
+ // since the in-memory model may still hold a
+ // thread ID from before the reconcile.
+ status, err := suite.db.GetStatusByID(
+ gtscontext.SetBarebones(ctx),
+ id,
+ )
+ suite.NoError(err)
+
+ // Ensure after reconcile uses expected thread.
+ suite.Equal(finalThreadID, status.ThreadID)
+ }
+}
+
func TestStatusTestSuite(t *testing.T) {
suite.Run(t, new(StatusTestSuite))
}