diff options
Diffstat (limited to 'internal/db/bundb/status.go')
| -rw-r--r-- | internal/db/bundb/status.go | 343 |
1 files changed, 244 insertions, 99 deletions
diff --git a/internal/db/bundb/status.go b/internal/db/bundb/status.go index cf4a2549a..81aba8726 100644 --- a/internal/db/bundb/status.go +++ b/internal/db/bundb/status.go @@ -21,11 +21,13 @@ import ( "context" "errors" "slices" + "strings" "code.superseriousbusiness.org/gotosocial/internal/db" "code.superseriousbusiness.org/gotosocial/internal/gtscontext" "code.superseriousbusiness.org/gotosocial/internal/gtserror" "code.superseriousbusiness.org/gotosocial/internal/gtsmodel" + "code.superseriousbusiness.org/gotosocial/internal/id" "code.superseriousbusiness.org/gotosocial/internal/log" "code.superseriousbusiness.org/gotosocial/internal/state" "code.superseriousbusiness.org/gotosocial/internal/util/xslices" @@ -335,115 +337,284 @@ func (s *statusDB) PutStatus(ctx context.Context, status *gtsmodel.Status) error // as the cache does not attempt a mutex lock until AFTER hook. // return s.db.RunInTx(ctx, nil, func(ctx context.Context, tx bun.Tx) error { - // create links between this status and any emojis it uses - for _, i := range status.EmojiIDs { - if _, err := tx. - NewInsert(). - Model(>smodel.StatusToEmoji{ - StatusID: status.ID, - EmojiID: i, - }). - On("CONFLICT (?, ?) DO NOTHING", bun.Ident("status_id"), bun.Ident("emoji_id")). - Exec(ctx); err != nil { - if !errors.Is(err, db.ErrAlreadyExists) { - return err - } + if status.BoostOfID != "" { + var threadID string + + // Boost wrappers always inherit thread + // of the origin status they're boosting. + if err := tx. + NewSelect(). + Table("statuses"). + Column("thread_id"). + Where("? = ?", bun.Ident("id"), status.BoostOfID). + Scan(ctx, &threadID); err != nil { + return gtserror.Newf("error selecting boosted status: %w", err) } + + // Set the selected thread. + status.ThreadID = threadID + + // They also require no further + // checks! Simply insert status here. + return insertStatus(ctx, tx, status) } - // create links between this status and any tags it uses - for _, i := range status.TagIDs { - if _, err := tx. - NewInsert(). - Model(>smodel.StatusToTag{ - StatusID: status.ID, - TagID: i, - }). - On("CONFLICT (?, ?) DO NOTHING", bun.Ident("status_id"), bun.Ident("tag_id")). - Exec(ctx); err != nil { - if !errors.Is(err, db.ErrAlreadyExists) { - return err - } + // Gather a list of possible thread IDs + // of all the possible related statuses + // to this one. If one exists we can use + // the end result, and if too many exist + // we can fix the status threading. + var threadIDs []string + + if status.InReplyToID != "" { + var threadID string + + // A stored parent status exists, + // select its thread ID to ideally + // inherit this for status. + if err := tx. + NewSelect(). + Table("statuses"). + Column("thread_id"). + Where("? = ?", bun.Ident("id"), status.InReplyToID). + Scan(ctx, &threadID); err != nil { + return gtserror.Newf("error selecting status parent: %w", err) + } + + // Append possible ID to threads slice. + threadIDs = append(threadIDs, threadID) + + } else if status.InReplyToURI != "" { + var ids []string + + // A parent status exists but is not + // yet stored. See if any siblings for + // this shared parent exist with their + // own thread IDs. + if err := tx. + NewSelect(). + Table("statuses"). + Column("thread_id"). + Where("? = ?", bun.Ident("in_reply_to_uri"), status.InReplyToURI). + Scan(ctx, &ids); err != nil && !errors.Is(err, db.ErrNoEntries) { + return gtserror.Newf("error selecting status siblings: %w", err) } + + // Append possible IDs to threads slice. + threadIDs = append(threadIDs, ids...) } - // change the status ID of the media - // attachments to the current status - for _, a := range status.Attachments { - a.StatusID = status.ID - if _, err := tx. - NewUpdate(). - Model(a). - Column("status_id"). - Where("? = ?", bun.Ident("media_attachment.id"), a.ID). - Exec(ctx); err != nil { - if !errors.Is(err, db.ErrAlreadyExists) { - return err - } + if !*status.Local { + var ids []string + + // For remote statuses specifically, check to + // see if any children are stored for this new + // stored parent with their own thread IDs. + if err := tx. + NewSelect(). + Table("statuses"). + Column("thread_id"). + Where("? = ?", bun.Ident("in_reply_to_uri"), status.URI). + Scan(ctx, &ids); err != nil && !errors.Is(err, db.ErrNoEntries) { + return gtserror.Newf("error selecting status children: %w", err) } + + // Append possible IDs to threads slice. + threadIDs = append(threadIDs, ids...) } - // If the status is threaded, create - // link between thread and status. - if status.ThreadID != "" { + // Ensure only *unique* posssible thread IDs. + threadIDs = xslices.Deduplicate(threadIDs) + switch len(threadIDs) { + + case 0: + // No related status with thread ID already exists, + // so create new thread ID from status creation time. + threadID := id.NewULIDFromTime(status.CreatedAt) + + // Insert new thread. if _, err := tx. NewInsert(). - Model(>smodel.ThreadToStatus{ - ThreadID: status.ThreadID, - StatusID: status.ID, - }). - On("CONFLICT (?, ?) DO NOTHING", bun.Ident("thread_id"), bun.Ident("status_id")). + Model(>smodel.Thread{ID: threadID}). Exec(ctx); err != nil { - if !errors.Is(err, db.ErrAlreadyExists) { - return err - } + return gtserror.Newf("error inserting thread: %w", err) + } + + // Update status thread ID. + status.ThreadID = threadID + + case 1: + // Inherit single known thread. + status.ThreadID = threadIDs[0] + + default: + var err error + log.Infof(ctx, "reconciling status threading for %s: [%s]", status.URI, strings.Join(threadIDs, ",")) + status.ThreadID, err = s.fixStatusThreading(ctx, tx, threadIDs) + if err != nil { + return err } } - // Finally, insert the status - _, err := tx.NewInsert(). - Model(status). - Exec(ctx) - return err + // And after threading, insert status. + // This will error if ThreadID is unset. + return insertStatus(ctx, tx, status) }) }) } +// fixStatusThreading can be called to reconcile statuses in the same thread but known to be using multiple given threads. +func (s *statusDB) fixStatusThreading(ctx context.Context, tx bun.Tx, threadIDs []string) (string, error) { + if len(threadIDs) <= 1 { + panic("invalid call to fixStatusThreading()") + } + + // Sort ascending, i.e. + // oldest thread ID first. + slices.Sort(threadIDs) + + // Drop the oldest thread ID + // from slice, we'll keep this. + threadID := threadIDs[0] + threadIDs = threadIDs[1:] + + // On updates, gather IDs of changed model + // IDs for later stage of cache invalidation, + // preallocating slices for worst-case scenarios. + statusIDs := make([]string, 0, 4*len(threadIDs)) + muteIDs := make([]string, 0, 4*len(threadIDs)) + + // Update all statuses with + // thread IDs to use oldest. + if _, err := tx. + NewUpdate(). + Table("statuses"). + Where("? IN (?)", bun.Ident("thread_id"), bun.In(threadIDs)). + Set("? = ?", bun.Ident("thread_id"), threadID). + Returning("?", bun.Ident("id")). + Exec(ctx, &statusIDs); err != nil && !errors.Is(err, db.ErrNoEntries) { + return "", gtserror.Newf("error updating statuses: %w", err) + } + + // Update all thread mutes with + // thread IDs to use oldest. + if _, err := tx. + NewUpdate(). + Table("thread_mutes"). + Where("? IN (?)", bun.Ident("thread_id"), bun.In(threadIDs)). + Set("? = ?", bun.Ident("thread_id"), threadID). + Returning("?", bun.Ident("id")). + Exec(ctx, &muteIDs); err != nil && !errors.Is(err, db.ErrNoEntries) { + return "", gtserror.Newf("error updating thread mutes: %w", err) + } + + // Delete all now + // unused thread IDs. + if _, err := tx. + NewDelete(). + Table("threads"). + Where("? IN (?)", bun.Ident("id"), bun.In(threadIDs)). + Exec(ctx); err != nil { + return "", gtserror.Newf("error deleting threads: %w", err) + } + + // Invalidate caches for changed statuses and mutes. + s.state.Caches.DB.Status.InvalidateIDs("ID", statusIDs) + s.state.Caches.DB.ThreadMute.InvalidateIDs("ID", muteIDs) + + return threadID, nil +} + +// insertStatus handles the base status insert logic, that is the status itself, +// any intermediary table links, and updating media attachments to point to status. +func insertStatus(ctx context.Context, tx bun.Tx, status *gtsmodel.Status) error { + + // create links between this + // status and any emojis it uses + for _, id := range status.EmojiIDs { + if _, err := tx. + NewInsert(). + Model(>smodel.StatusToEmoji{ + StatusID: status.ID, + EmojiID: id, + }). + Exec(ctx); err != nil { + return gtserror.Newf("error inserting status_to_emoji: %w", err) + } + } + + // create links between this + // status and any tags it uses + for _, id := range status.TagIDs { + if _, err := tx. + NewInsert(). + Model(>smodel.StatusToTag{ + StatusID: status.ID, + TagID: id, + }). + Exec(ctx); err != nil { + return gtserror.Newf("error inserting status_to_tag: %w", err) + } + } + + // change the status ID of the media + // attachments to the current status + for _, a := range status.Attachments { + a.StatusID = status.ID + if _, err := tx. + NewUpdate(). + Model(a). + Column("status_id"). + Where("? = ?", bun.Ident("media_attachment.id"), a.ID). + Exec(ctx); err != nil { + return gtserror.Newf("error updating media: %w", err) + } + } + + // Finally, insert the status + if _, err := tx.NewInsert(). + Model(status). + Exec(ctx); err != nil { + return gtserror.Newf("error inserting status: %w", err) + } + + return nil +} + func (s *statusDB) UpdateStatus(ctx context.Context, status *gtsmodel.Status, columns ...string) error { return s.state.Caches.DB.Status.Store(status, func() error { // It is safe to run this database transaction within cache.Store // as the cache does not attempt a mutex lock until AFTER hook. // return s.db.RunInTx(ctx, nil, func(ctx context.Context, tx bun.Tx) error { - // create links between this status and any emojis it uses - for _, i := range status.EmojiIDs { + + // create links between this + // status and any emojis it uses + for _, id := range status.EmojiIDs { if _, err := tx. NewInsert(). Model(>smodel.StatusToEmoji{ StatusID: status.ID, - EmojiID: i, + EmojiID: id, }). On("CONFLICT (?, ?) DO NOTHING", bun.Ident("status_id"), bun.Ident("emoji_id")). Exec(ctx); err != nil { - if !errors.Is(err, db.ErrAlreadyExists) { - return err - } + return err } } - // create links between this status and any tags it uses - for _, i := range status.TagIDs { + // create links between this + // status and any tags it uses + for _, id := range status.TagIDs { if _, err := tx. NewInsert(). Model(>smodel.StatusToTag{ StatusID: status.ID, - TagID: i, + TagID: id, }). On("CONFLICT (?, ?) DO NOTHING", bun.Ident("status_id"), bun.Ident("tag_id")). Exec(ctx); err != nil { - if !errors.Is(err, db.ErrAlreadyExists) { - return err - } + return err } } @@ -457,26 +628,7 @@ func (s *statusDB) UpdateStatus(ctx context.Context, status *gtsmodel.Status, co Column("status_id"). Where("? = ?", bun.Ident("media_attachment.id"), a.ID). Exec(ctx); err != nil { - if !errors.Is(err, db.ErrAlreadyExists) { - return err - } - } - } - - // If the status is threaded, create - // link between thread and status. - if status.ThreadID != "" { - if _, err := tx. - NewInsert(). - Model(>smodel.ThreadToStatus{ - ThreadID: status.ThreadID, - StatusID: status.ID, - }). - On("CONFLICT (?, ?) DO NOTHING", bun.Ident("thread_id"), bun.Ident("status_id")). - Exec(ctx); err != nil { - if !errors.Is(err, db.ErrAlreadyExists) { - return err - } + return err } } @@ -499,7 +651,9 @@ func (s *statusDB) DeleteStatusByID(ctx context.Context, id string) error { // Delete status from database and any related links in a transaction. if err := s.db.RunInTx(ctx, nil, func(ctx context.Context, tx bun.Tx) error { - // delete links between this status and any emojis it uses + + // delete links between this + // status and any emojis it uses if _, err := tx. NewDelete(). TableExpr("? AS ?", bun.Ident("status_to_emojis"), bun.Ident("status_to_emoji")). @@ -508,7 +662,8 @@ func (s *statusDB) DeleteStatusByID(ctx context.Context, id string) error { return err } - // delete links between this status and any tags it uses + // delete links between this + // status and any tags it uses if _, err := tx. NewDelete(). TableExpr("? AS ?", bun.Ident("status_to_tags"), bun.Ident("status_to_tag")). @@ -517,16 +672,6 @@ func (s *statusDB) DeleteStatusByID(ctx context.Context, id string) error { return err } - // Delete links between this status - // and any threads it was a part of. - if _, err := tx. - NewDelete(). - TableExpr("? AS ?", bun.Ident("thread_to_statuses"), bun.Ident("thread_to_status")). - Where("? = ?", bun.Ident("thread_to_status.status_id"), id). - Exec(ctx); err != nil { - return err - } - // delete the status itself if _, err := tx. NewDelete(). |
