summaryrefslogtreecommitdiff
path: root/internal/db/bundb/status.go
diff options
context:
space:
mode:
Diffstat (limited to 'internal/db/bundb/status.go')
-rw-r--r--internal/db/bundb/status.go343
1 files changed, 244 insertions, 99 deletions
diff --git a/internal/db/bundb/status.go b/internal/db/bundb/status.go
index cf4a2549a..81aba8726 100644
--- a/internal/db/bundb/status.go
+++ b/internal/db/bundb/status.go
@@ -21,11 +21,13 @@ import (
"context"
"errors"
"slices"
+ "strings"
"code.superseriousbusiness.org/gotosocial/internal/db"
"code.superseriousbusiness.org/gotosocial/internal/gtscontext"
"code.superseriousbusiness.org/gotosocial/internal/gtserror"
"code.superseriousbusiness.org/gotosocial/internal/gtsmodel"
+ "code.superseriousbusiness.org/gotosocial/internal/id"
"code.superseriousbusiness.org/gotosocial/internal/log"
"code.superseriousbusiness.org/gotosocial/internal/state"
"code.superseriousbusiness.org/gotosocial/internal/util/xslices"
@@ -335,115 +337,284 @@ func (s *statusDB) PutStatus(ctx context.Context, status *gtsmodel.Status) error
// as the cache does not attempt a mutex lock until AFTER hook.
//
return s.db.RunInTx(ctx, nil, func(ctx context.Context, tx bun.Tx) error {
- // create links between this status and any emojis it uses
- for _, i := range status.EmojiIDs {
- if _, err := tx.
- NewInsert().
- Model(&gtsmodel.StatusToEmoji{
- StatusID: status.ID,
- EmojiID: i,
- }).
- On("CONFLICT (?, ?) DO NOTHING", bun.Ident("status_id"), bun.Ident("emoji_id")).
- Exec(ctx); err != nil {
- if !errors.Is(err, db.ErrAlreadyExists) {
- return err
- }
+ if status.BoostOfID != "" {
+ var threadID string
+
+ // Boost wrappers always inherit thread
+ // of the origin status they're boosting.
+ if err := tx.
+ NewSelect().
+ Table("statuses").
+ Column("thread_id").
+ Where("? = ?", bun.Ident("id"), status.BoostOfID).
+ Scan(ctx, &threadID); err != nil {
+ return gtserror.Newf("error selecting boosted status: %w", err)
}
+
+ // Set the selected thread.
+ status.ThreadID = threadID
+
+ // They also require no further
+ // checks! Simply insert status here.
+ return insertStatus(ctx, tx, status)
}
- // create links between this status and any tags it uses
- for _, i := range status.TagIDs {
- if _, err := tx.
- NewInsert().
- Model(&gtsmodel.StatusToTag{
- StatusID: status.ID,
- TagID: i,
- }).
- On("CONFLICT (?, ?) DO NOTHING", bun.Ident("status_id"), bun.Ident("tag_id")).
- Exec(ctx); err != nil {
- if !errors.Is(err, db.ErrAlreadyExists) {
- return err
- }
+ // Gather a list of possible thread IDs
+ // of all the possible related statuses
+ // to this one. If one exists we can use
+ // the end result, and if too many exist
+ // we can fix the status threading.
+ var threadIDs []string
+
+ if status.InReplyToID != "" {
+ var threadID string
+
+ // A stored parent status exists,
+ // select its thread ID to ideally
+ // inherit this for status.
+ if err := tx.
+ NewSelect().
+ Table("statuses").
+ Column("thread_id").
+ Where("? = ?", bun.Ident("id"), status.InReplyToID).
+ Scan(ctx, &threadID); err != nil {
+ return gtserror.Newf("error selecting status parent: %w", err)
+ }
+
+ // Append possible ID to threads slice.
+ threadIDs = append(threadIDs, threadID)
+
+ } else if status.InReplyToURI != "" {
+ var ids []string
+
+ // A parent status exists but is not
+ // yet stored. See if any siblings for
+ // this shared parent exist with their
+ // own thread IDs.
+ if err := tx.
+ NewSelect().
+ Table("statuses").
+ Column("thread_id").
+ Where("? = ?", bun.Ident("in_reply_to_uri"), status.InReplyToURI).
+ Scan(ctx, &ids); err != nil && !errors.Is(err, db.ErrNoEntries) {
+ return gtserror.Newf("error selecting status siblings: %w", err)
}
+
+ // Append possible IDs to threads slice.
+ threadIDs = append(threadIDs, ids...)
}
- // change the status ID of the media
- // attachments to the current status
- for _, a := range status.Attachments {
- a.StatusID = status.ID
- if _, err := tx.
- NewUpdate().
- Model(a).
- Column("status_id").
- Where("? = ?", bun.Ident("media_attachment.id"), a.ID).
- Exec(ctx); err != nil {
- if !errors.Is(err, db.ErrAlreadyExists) {
- return err
- }
+ if !*status.Local {
+ var ids []string
+
+ // For remote statuses specifically, check to
+ // see if any children are stored for this new
+ // stored parent with their own thread IDs.
+ if err := tx.
+ NewSelect().
+ Table("statuses").
+ Column("thread_id").
+ Where("? = ?", bun.Ident("in_reply_to_uri"), status.URI).
+ Scan(ctx, &ids); err != nil && !errors.Is(err, db.ErrNoEntries) {
+ return gtserror.Newf("error selecting status children: %w", err)
}
+
+ // Append possible IDs to threads slice.
+ threadIDs = append(threadIDs, ids...)
}
- // If the status is threaded, create
- // link between thread and status.
- if status.ThreadID != "" {
+ // Ensure only *unique* posssible thread IDs.
+ threadIDs = xslices.Deduplicate(threadIDs)
+ switch len(threadIDs) {
+
+ case 0:
+ // No related status with thread ID already exists,
+ // so create new thread ID from status creation time.
+ threadID := id.NewULIDFromTime(status.CreatedAt)
+
+ // Insert new thread.
if _, err := tx.
NewInsert().
- Model(&gtsmodel.ThreadToStatus{
- ThreadID: status.ThreadID,
- StatusID: status.ID,
- }).
- On("CONFLICT (?, ?) DO NOTHING", bun.Ident("thread_id"), bun.Ident("status_id")).
+ Model(&gtsmodel.Thread{ID: threadID}).
Exec(ctx); err != nil {
- if !errors.Is(err, db.ErrAlreadyExists) {
- return err
- }
+ return gtserror.Newf("error inserting thread: %w", err)
+ }
+
+ // Update status thread ID.
+ status.ThreadID = threadID
+
+ case 1:
+ // Inherit single known thread.
+ status.ThreadID = threadIDs[0]
+
+ default:
+ var err error
+ log.Infof(ctx, "reconciling status threading for %s: [%s]", status.URI, strings.Join(threadIDs, ","))
+ status.ThreadID, err = s.fixStatusThreading(ctx, tx, threadIDs)
+ if err != nil {
+ return err
}
}
- // Finally, insert the status
- _, err := tx.NewInsert().
- Model(status).
- Exec(ctx)
- return err
+ // And after threading, insert status.
+ // This will error if ThreadID is unset.
+ return insertStatus(ctx, tx, status)
})
})
}
+// fixStatusThreading can be called to reconcile statuses in the same thread but known to be using multiple given threads.
+func (s *statusDB) fixStatusThreading(ctx context.Context, tx bun.Tx, threadIDs []string) (string, error) {
+ if len(threadIDs) <= 1 {
+ panic("invalid call to fixStatusThreading()")
+ }
+
+ // Sort ascending, i.e.
+ // oldest thread ID first.
+ slices.Sort(threadIDs)
+
+ // Drop the oldest thread ID
+ // from slice, we'll keep this.
+ threadID := threadIDs[0]
+ threadIDs = threadIDs[1:]
+
+ // On updates, gather IDs of changed model
+ // IDs for later stage of cache invalidation,
+ // preallocating slices for worst-case scenarios.
+ statusIDs := make([]string, 0, 4*len(threadIDs))
+ muteIDs := make([]string, 0, 4*len(threadIDs))
+
+ // Update all statuses with
+ // thread IDs to use oldest.
+ if _, err := tx.
+ NewUpdate().
+ Table("statuses").
+ Where("? IN (?)", bun.Ident("thread_id"), bun.In(threadIDs)).
+ Set("? = ?", bun.Ident("thread_id"), threadID).
+ Returning("?", bun.Ident("id")).
+ Exec(ctx, &statusIDs); err != nil && !errors.Is(err, db.ErrNoEntries) {
+ return "", gtserror.Newf("error updating statuses: %w", err)
+ }
+
+ // Update all thread mutes with
+ // thread IDs to use oldest.
+ if _, err := tx.
+ NewUpdate().
+ Table("thread_mutes").
+ Where("? IN (?)", bun.Ident("thread_id"), bun.In(threadIDs)).
+ Set("? = ?", bun.Ident("thread_id"), threadID).
+ Returning("?", bun.Ident("id")).
+ Exec(ctx, &muteIDs); err != nil && !errors.Is(err, db.ErrNoEntries) {
+ return "", gtserror.Newf("error updating thread mutes: %w", err)
+ }
+
+ // Delete all now
+ // unused thread IDs.
+ if _, err := tx.
+ NewDelete().
+ Table("threads").
+ Where("? IN (?)", bun.Ident("id"), bun.In(threadIDs)).
+ Exec(ctx); err != nil {
+ return "", gtserror.Newf("error deleting threads: %w", err)
+ }
+
+ // Invalidate caches for changed statuses and mutes.
+ s.state.Caches.DB.Status.InvalidateIDs("ID", statusIDs)
+ s.state.Caches.DB.ThreadMute.InvalidateIDs("ID", muteIDs)
+
+ return threadID, nil
+}
+
+// insertStatus handles the base status insert logic, that is the status itself,
+// any intermediary table links, and updating media attachments to point to status.
+func insertStatus(ctx context.Context, tx bun.Tx, status *gtsmodel.Status) error {
+
+ // create links between this
+ // status and any emojis it uses
+ for _, id := range status.EmojiIDs {
+ if _, err := tx.
+ NewInsert().
+ Model(&gtsmodel.StatusToEmoji{
+ StatusID: status.ID,
+ EmojiID: id,
+ }).
+ Exec(ctx); err != nil {
+ return gtserror.Newf("error inserting status_to_emoji: %w", err)
+ }
+ }
+
+ // create links between this
+ // status and any tags it uses
+ for _, id := range status.TagIDs {
+ if _, err := tx.
+ NewInsert().
+ Model(&gtsmodel.StatusToTag{
+ StatusID: status.ID,
+ TagID: id,
+ }).
+ Exec(ctx); err != nil {
+ return gtserror.Newf("error inserting status_to_tag: %w", err)
+ }
+ }
+
+ // change the status ID of the media
+ // attachments to the current status
+ for _, a := range status.Attachments {
+ a.StatusID = status.ID
+ if _, err := tx.
+ NewUpdate().
+ Model(a).
+ Column("status_id").
+ Where("? = ?", bun.Ident("media_attachment.id"), a.ID).
+ Exec(ctx); err != nil {
+ return gtserror.Newf("error updating media: %w", err)
+ }
+ }
+
+ // Finally, insert the status
+ if _, err := tx.NewInsert().
+ Model(status).
+ Exec(ctx); err != nil {
+ return gtserror.Newf("error inserting status: %w", err)
+ }
+
+ return nil
+}
+
func (s *statusDB) UpdateStatus(ctx context.Context, status *gtsmodel.Status, columns ...string) error {
return s.state.Caches.DB.Status.Store(status, func() error {
// It is safe to run this database transaction within cache.Store
// as the cache does not attempt a mutex lock until AFTER hook.
//
return s.db.RunInTx(ctx, nil, func(ctx context.Context, tx bun.Tx) error {
- // create links between this status and any emojis it uses
- for _, i := range status.EmojiIDs {
+
+ // create links between this
+ // status and any emojis it uses
+ for _, id := range status.EmojiIDs {
if _, err := tx.
NewInsert().
Model(&gtsmodel.StatusToEmoji{
StatusID: status.ID,
- EmojiID: i,
+ EmojiID: id,
}).
On("CONFLICT (?, ?) DO NOTHING", bun.Ident("status_id"), bun.Ident("emoji_id")).
Exec(ctx); err != nil {
- if !errors.Is(err, db.ErrAlreadyExists) {
- return err
- }
+ return err
}
}
- // create links between this status and any tags it uses
- for _, i := range status.TagIDs {
+ // create links between this
+ // status and any tags it uses
+ for _, id := range status.TagIDs {
if _, err := tx.
NewInsert().
Model(&gtsmodel.StatusToTag{
StatusID: status.ID,
- TagID: i,
+ TagID: id,
}).
On("CONFLICT (?, ?) DO NOTHING", bun.Ident("status_id"), bun.Ident("tag_id")).
Exec(ctx); err != nil {
- if !errors.Is(err, db.ErrAlreadyExists) {
- return err
- }
+ return err
}
}
@@ -457,26 +628,7 @@ func (s *statusDB) UpdateStatus(ctx context.Context, status *gtsmodel.Status, co
Column("status_id").
Where("? = ?", bun.Ident("media_attachment.id"), a.ID).
Exec(ctx); err != nil {
- if !errors.Is(err, db.ErrAlreadyExists) {
- return err
- }
- }
- }
-
- // If the status is threaded, create
- // link between thread and status.
- if status.ThreadID != "" {
- if _, err := tx.
- NewInsert().
- Model(&gtsmodel.ThreadToStatus{
- ThreadID: status.ThreadID,
- StatusID: status.ID,
- }).
- On("CONFLICT (?, ?) DO NOTHING", bun.Ident("thread_id"), bun.Ident("status_id")).
- Exec(ctx); err != nil {
- if !errors.Is(err, db.ErrAlreadyExists) {
- return err
- }
+ return err
}
}
@@ -499,7 +651,9 @@ func (s *statusDB) DeleteStatusByID(ctx context.Context, id string) error {
// Delete status from database and any related links in a transaction.
if err := s.db.RunInTx(ctx, nil, func(ctx context.Context, tx bun.Tx) error {
- // delete links between this status and any emojis it uses
+
+ // delete links between this
+ // status and any emojis it uses
if _, err := tx.
NewDelete().
TableExpr("? AS ?", bun.Ident("status_to_emojis"), bun.Ident("status_to_emoji")).
@@ -508,7 +662,8 @@ func (s *statusDB) DeleteStatusByID(ctx context.Context, id string) error {
return err
}
- // delete links between this status and any tags it uses
+ // delete links between this
+ // status and any tags it uses
if _, err := tx.
NewDelete().
TableExpr("? AS ?", bun.Ident("status_to_tags"), bun.Ident("status_to_tag")).
@@ -517,16 +672,6 @@ func (s *statusDB) DeleteStatusByID(ctx context.Context, id string) error {
return err
}
- // Delete links between this status
- // and any threads it was a part of.
- if _, err := tx.
- NewDelete().
- TableExpr("? AS ?", bun.Ident("thread_to_statuses"), bun.Ident("thread_to_status")).
- Where("? = ?", bun.Ident("thread_to_status.status_id"), id).
- Exec(ctx); err != nil {
- return err
- }
-
// delete the status itself
if _, err := tx.
NewDelete().