summaryrefslogtreecommitdiff
path: root/internal/processing/workers
diff options
context:
space:
mode:
authorLibravatar kim <89579420+NyaaaWhatsUpDoc@users.noreply.github.com>2025-04-26 09:56:15 +0000
committerLibravatar GitHub <noreply@github.com>2025-04-26 09:56:15 +0000
commit6a6a4993338262f87df34c9be051bfaac75c1829 (patch)
treebfbda090dc4b25efdd34145c016d7cc7b9c14d6e /internal/processing/workers
parent[chore] Move deps to code.superseriousbusiness.org (#4054) (diff)
downloadgotosocial-6a6a4993338262f87df34c9be051bfaac75c1829.tar.xz
[performance] rewrite timelines to rely on new timeline cache type (#3941)
* start work rewriting timeline cache type * further work rewriting timeline caching * more work integration new timeline code * remove old code * add local timeline, fix up merge conflicts * remove old use of go-bytes * implement new timeline code into more areas of codebase, pull in latest go-mangler, go-mutexes, go-structr * remove old timeline package, add local timeline cache * remove references to old timeline types that needed starting up in tests * start adding page validation * fix test-identified timeline cache package issues * fix up more tests, fix missing required changes, etc * add exclusion for test.out in gitignore * clarify some things better in code comments * tweak cache size limits * fix list timeline cache fetching * further list timeline fixes * linter, ssssssssshhhhhhhhhhhh please * fix linter hints * reslice the output if it's beyond length of 'lim' * remove old timeline initialization code, bump go-structr to v0.9.4 * continued from previous commit * improved code comments * don't allow multiple entries for BoostOfID values to prevent repeated boosts of same boosts * finish writing more code comments * some variable renaming, for ease of following * change the way we update lo,hi paging values during timeline load * improved code comments for updated / returned lo , hi paging values * finish writing code comments for the StatusTimeline{} type itself * fill in more code comments * update go-structr version to latest with changed timeline unique indexing logic * have a local and public timeline *per user* * rewrite calls to public / local timeline calls * remove the zero length check, as lo, hi values might still be set * simplify timeline cache loading, fix lo/hi returns, fix timeline invalidation side-effects missing for some federated actions * swap the lo, hi values :facepalm: * add (now) missing slice reverse of tag timeline statuses when paging ASC * remove local / public caches (is out of scope for this work), share more timeline code * remove unnecessary change * again, remove more unused code * remove unused function to appease the linter * move boost checking to prepare function * fix use of timeline.lastOrder, fix incorrect range functions used * remove comments for repeat code * remove the boost logic from prepare function * do a maximum of 5 loads, not 10 * add repeat boost filtering logic, update go-structr, general improvements * more code comments * add important note * fix timeline tests now that timelines are returned in page order * remove unused field * add StatusTimeline{} tests * add more status timeline tests * start adding preloading support * ensure repeat boosts are marked in preloaded entries * share a bunch of the database load code in timeline cache, don't clear timelines on relationship change * add logic to allow dynamic clear / preloading of timelines * comment-out unused functions, but leave in place as we might end-up using them * fix timeline preload state check * much improved status timeline code comments * more code comments, don't bother inserting statuses if timeline not preloaded * shift around some logic to make sure things aren't accidentally left set * finish writing code comments * remove trim-after-insert behaviour * fix-up some comments referring to old logic * remove unsetting of lo, hi * fix preload repeatBoost checking logic * don't return on status filter errors, these are usually transient * better concurrency safety in Clear() and Done() * fix test broken due to addition of preloader * fix repeatBoost logic that doesn't account for already-hidden repeatBoosts * ensure edit submodels are dropped on cache insertion * update code-comment to expand CAS accronym * use a plus1hULID() instead of 24h * remove unused functions * add note that public / local timeline requester can be nil * fix incorrect visibility filtering of tag timeline statuses * ensure we filter home timeline statuses on local only * some small re-orderings to confirm query params in correct places * fix the local only home timeline filter func
Diffstat (limited to 'internal/processing/workers')
-rw-r--r--internal/processing/workers/fromclientapi.go86
-rw-r--r--internal/processing/workers/fromfediapi.go152
-rw-r--r--internal/processing/workers/surfacetimeline.go175
-rw-r--r--internal/processing/workers/util.go8
4 files changed, 237 insertions, 184 deletions
diff --git a/internal/processing/workers/fromclientapi.go b/internal/processing/workers/fromclientapi.go
index 28a2b37b9..661fea866 100644
--- a/internal/processing/workers/fromclientapi.go
+++ b/internal/processing/workers/fromclientapi.go
@@ -371,7 +371,7 @@ func (p *clientAPI) CreateStatus(ctx context.Context, cMsg *messages.FromClientA
if status.InReplyToID != "" {
// Interaction counts changed on the replied status;
// uncache the prepared version from all timelines.
- p.surface.invalidateStatusFromTimelines(ctx, status.InReplyToID)
+ p.surface.invalidateStatusFromTimelines(status.InReplyToID)
}
return nil
@@ -413,7 +413,7 @@ func (p *clientAPI) CreatePollVote(ctx context.Context, cMsg *messages.FromClien
}
// Interaction counts changed on the source status, uncache from timelines.
- p.surface.invalidateStatusFromTimelines(ctx, vote.Poll.StatusID)
+ p.surface.invalidateStatusFromTimelines(vote.Poll.StatusID)
return nil
}
@@ -565,7 +565,7 @@ func (p *clientAPI) CreateLike(ctx context.Context, cMsg *messages.FromClientAPI
// Interaction counts changed on the faved status;
// uncache the prepared version from all timelines.
- p.surface.invalidateStatusFromTimelines(ctx, fave.StatusID)
+ p.surface.invalidateStatusFromTimelines(fave.StatusID)
return nil
}
@@ -671,7 +671,7 @@ func (p *clientAPI) CreateAnnounce(ctx context.Context, cMsg *messages.FromClien
// Interaction counts changed on the boosted status;
// uncache the prepared version from all timelines.
- p.surface.invalidateStatusFromTimelines(ctx, boost.BoostOfID)
+ p.surface.invalidateStatusFromTimelines(boost.BoostOfID)
return nil
}
@@ -682,22 +682,20 @@ func (p *clientAPI) CreateBlock(ctx context.Context, cMsg *messages.FromClientAP
return gtserror.Newf("%T not parseable as *gtsmodel.Block", cMsg.GTSModel)
}
- // Remove blockee's statuses from blocker's timeline.
- if err := p.state.Timelines.Home.WipeItemsFromAccountID(
- ctx,
- block.AccountID,
- block.TargetAccountID,
- ); err != nil {
- return gtserror.Newf("error wiping timeline items for block: %w", err)
+ if block.Account.IsLocal() {
+ // Remove posts by target from origin's timelines.
+ p.surface.removeRelationshipFromTimelines(ctx,
+ block.AccountID,
+ block.TargetAccountID,
+ )
}
- // Remove blocker's statuses from blockee's timeline.
- if err := p.state.Timelines.Home.WipeItemsFromAccountID(
- ctx,
- block.TargetAccountID,
- block.AccountID,
- ); err != nil {
- return gtserror.Newf("error wiping timeline items for block: %w", err)
+ if block.TargetAccount.IsLocal() {
+ // Remove posts by origin from target's timelines.
+ p.surface.removeRelationshipFromTimelines(ctx,
+ block.TargetAccountID,
+ block.AccountID,
+ )
}
// TODO: same with notifications?
@@ -737,7 +735,7 @@ func (p *clientAPI) UpdateStatus(ctx context.Context, cMsg *messages.FromClientA
}
// Status representation has changed, invalidate from timelines.
- p.surface.invalidateStatusFromTimelines(ctx, status.ID)
+ p.surface.invalidateStatusFromTimelines(status.ID)
return nil
}
@@ -858,6 +856,22 @@ func (p *clientAPI) UndoFollow(ctx context.Context, cMsg *messages.FromClientAPI
log.Errorf(ctx, "error updating account stats: %v", err)
}
+ if follow.Account.IsLocal() {
+ // Remove posts by target from origin's timelines.
+ p.surface.removeRelationshipFromTimelines(ctx,
+ follow.AccountID,
+ follow.TargetAccountID,
+ )
+ }
+
+ if follow.TargetAccount.IsLocal() {
+ // Remove posts by origin from target's timelines.
+ p.surface.removeRelationshipFromTimelines(ctx,
+ follow.TargetAccountID,
+ follow.AccountID,
+ )
+ }
+
if err := p.federate.UndoFollow(ctx, follow); err != nil {
log.Errorf(ctx, "error federating follow undo: %v", err)
}
@@ -890,7 +904,7 @@ func (p *clientAPI) UndoFave(ctx context.Context, cMsg *messages.FromClientAPI)
// Interaction counts changed on the faved status;
// uncache the prepared version from all timelines.
- p.surface.invalidateStatusFromTimelines(ctx, statusFave.StatusID)
+ p.surface.invalidateStatusFromTimelines(statusFave.StatusID)
return nil
}
@@ -910,9 +924,8 @@ func (p *clientAPI) UndoAnnounce(ctx context.Context, cMsg *messages.FromClientA
log.Errorf(ctx, "error updating account stats: %v", err)
}
- if err := p.surface.deleteStatusFromTimelines(ctx, status.ID); err != nil {
- log.Errorf(ctx, "error removing timelined status: %v", err)
- }
+ // Delete the boost wrapper status from timelines.
+ p.surface.deleteStatusFromTimelines(ctx, status.ID)
if err := p.federate.UndoAnnounce(ctx, status); err != nil {
log.Errorf(ctx, "error federating announce undo: %v", err)
@@ -920,7 +933,7 @@ func (p *clientAPI) UndoAnnounce(ctx context.Context, cMsg *messages.FromClientA
// Interaction counts changed on the boosted status;
// uncache the prepared version from all timelines.
- p.surface.invalidateStatusFromTimelines(ctx, status.BoostOfID)
+ p.surface.invalidateStatusFromTimelines(status.BoostOfID)
return nil
}
@@ -983,7 +996,7 @@ func (p *clientAPI) DeleteStatus(ctx context.Context, cMsg *messages.FromClientA
if status.InReplyToID != "" {
// Interaction counts changed on the replied status;
// uncache the prepared version from all timelines.
- p.surface.invalidateStatusFromTimelines(ctx, status.InReplyToID)
+ p.surface.invalidateStatusFromTimelines(status.InReplyToID)
}
return nil
@@ -1026,6 +1039,23 @@ func (p *clientAPI) DeleteAccountOrUser(ctx context.Context, cMsg *messages.From
p.state.Workers.Federator.Queue.Delete("Receiving.ID", account.ID)
p.state.Workers.Federator.Queue.Delete("TargetURI", account.URI)
+ // Remove any entries authored by account from timelines.
+ p.surface.removeTimelineEntriesByAccount(account.ID)
+
+ // Remove any of their cached timelines.
+ p.state.Caches.Timelines.Home.Delete(account.ID)
+
+ // Get the IDs of all the lists owned by the given account ID.
+ listIDs, err := p.state.DB.GetListIDsByAccountID(ctx, account.ID)
+ if err != nil {
+ log.Errorf(ctx, "error getting lists for account %s: %v", account.ID, err)
+ }
+
+ // Remove list timelines of account.
+ for _, listID := range listIDs {
+ p.state.Caches.Timelines.List.Delete(listID)
+ }
+
if err := p.federate.DeleteAccount(ctx, cMsg.Target); err != nil {
log.Errorf(ctx, "error federating account delete: %v", err)
}
@@ -1169,7 +1199,7 @@ func (p *clientAPI) AcceptLike(ctx context.Context, cMsg *messages.FromClientAPI
// Interaction counts changed on the faved status;
// uncache the prepared version from all timelines.
- p.surface.invalidateStatusFromTimelines(ctx, req.Like.StatusID)
+ p.surface.invalidateStatusFromTimelines(req.Like.StatusID)
return nil
}
@@ -1202,7 +1232,7 @@ func (p *clientAPI) AcceptReply(ctx context.Context, cMsg *messages.FromClientAP
// Interaction counts changed on the replied status;
// uncache the prepared version from all timelines.
- p.surface.invalidateStatusFromTimelines(ctx, reply.InReplyToID)
+ p.surface.invalidateStatusFromTimelines(reply.InReplyToID)
return nil
}
@@ -1240,7 +1270,7 @@ func (p *clientAPI) AcceptAnnounce(ctx context.Context, cMsg *messages.FromClien
// Interaction counts changed on the original status;
// uncache the prepared version from all timelines.
- p.surface.invalidateStatusFromTimelines(ctx, boost.BoostOfID)
+ p.surface.invalidateStatusFromTimelines(boost.BoostOfID)
return nil
}
diff --git a/internal/processing/workers/fromfediapi.go b/internal/processing/workers/fromfediapi.go
index 2e513449b..3e0f0ba59 100644
--- a/internal/processing/workers/fromfediapi.go
+++ b/internal/processing/workers/fromfediapi.go
@@ -197,9 +197,22 @@ func (p *Processor) ProcessFromFediAPI(ctx context.Context, fMsg *messages.FromF
// UNDO SOMETHING
case ap.ActivityUndo:
+ switch fMsg.APObjectType {
+ // UNDO FOLLOW
+ case ap.ActivityFollow:
+ return p.fediAPI.UndoFollow(ctx, fMsg)
+
+ // UNDO BLOCK
+ case ap.ActivityBlock:
+ return p.fediAPI.UndoBlock(ctx, fMsg)
+
// UNDO ANNOUNCE
- if fMsg.APObjectType == ap.ActivityAnnounce {
+ case ap.ActivityAnnounce:
return p.fediAPI.UndoAnnounce(ctx, fMsg)
+
+ // UNDO LIKE
+ case ap.ActivityLike:
+ return p.fediAPI.UndoFave(ctx, fMsg)
}
}
@@ -346,7 +359,7 @@ func (p *fediAPI) CreateStatus(ctx context.Context, fMsg *messages.FromFediAPI)
// Interaction counts changed on the replied status; uncache the
// prepared version from all timelines. The status dereferencer
// functions will ensure necessary ancestors exist before this point.
- p.surface.invalidateStatusFromTimelines(ctx, status.InReplyToID)
+ p.surface.invalidateStatusFromTimelines(status.InReplyToID)
}
return nil
@@ -393,7 +406,7 @@ func (p *fediAPI) CreatePollVote(ctx context.Context, fMsg *messages.FromFediAPI
}
// Interaction counts changed, uncache from timelines.
- p.surface.invalidateStatusFromTimelines(ctx, status.ID)
+ p.surface.invalidateStatusFromTimelines(status.ID)
return nil
}
@@ -428,7 +441,7 @@ func (p *fediAPI) UpdatePollVote(ctx context.Context, fMsg *messages.FromFediAPI
}
// Interaction counts changed, uncache from timelines.
- p.surface.invalidateStatusFromTimelines(ctx, status.ID)
+ p.surface.invalidateStatusFromTimelines(status.ID)
return nil
}
@@ -573,7 +586,7 @@ func (p *fediAPI) CreateLike(ctx context.Context, fMsg *messages.FromFediAPI) er
// Interaction counts changed on the faved status;
// uncache the prepared version from all timelines.
- p.surface.invalidateStatusFromTimelines(ctx, fave.StatusID)
+ p.surface.invalidateStatusFromTimelines(fave.StatusID)
return nil
}
@@ -690,7 +703,7 @@ func (p *fediAPI) CreateAnnounce(ctx context.Context, fMsg *messages.FromFediAPI
// Interaction counts changed on the original status;
// uncache the prepared version from all timelines.
- p.surface.invalidateStatusFromTimelines(ctx, boost.BoostOfID)
+ p.surface.invalidateStatusFromTimelines(boost.BoostOfID)
return nil
}
@@ -701,53 +714,32 @@ func (p *fediAPI) CreateBlock(ctx context.Context, fMsg *messages.FromFediAPI) e
return gtserror.Newf("%T not parseable as *gtsmodel.Block", fMsg.GTSModel)
}
- // Remove each account's posts from the other's timelines.
- //
- // First home timelines.
- if err := p.state.Timelines.Home.WipeItemsFromAccountID(
- ctx,
- block.AccountID,
- block.TargetAccountID,
- ); err != nil {
- log.Errorf(ctx, "error wiping items from block -> target's home timeline: %v", err)
- }
-
- if err := p.state.Timelines.Home.WipeItemsFromAccountID(
- ctx,
- block.TargetAccountID,
- block.AccountID,
- ); err != nil {
- log.Errorf(ctx, "error wiping items from target -> block's home timeline: %v", err)
- }
-
- // Now list timelines.
- if err := p.state.Timelines.List.WipeItemsFromAccountID(
- ctx,
- block.AccountID,
- block.TargetAccountID,
- ); err != nil {
- log.Errorf(ctx, "error wiping items from block -> target's list timeline(s): %v", err)
+ if block.Account.IsLocal() {
+ // Remove posts by target from origin's timelines.
+ p.surface.removeRelationshipFromTimelines(ctx,
+ block.AccountID,
+ block.TargetAccountID,
+ )
}
- if err := p.state.Timelines.List.WipeItemsFromAccountID(
- ctx,
- block.TargetAccountID,
- block.AccountID,
- ); err != nil {
- log.Errorf(ctx, "error wiping items from target -> block's list timeline(s): %v", err)
+ if block.TargetAccount.IsLocal() {
+ // Remove posts by origin from target's timelines.
+ p.surface.removeRelationshipFromTimelines(ctx,
+ block.TargetAccountID,
+ block.AccountID,
+ )
}
// Remove any follows that existed between blocker + blockee.
- if err := p.state.DB.DeleteFollow(
- ctx,
+ // (note this handles removing any necessary list entries).
+ if err := p.state.DB.DeleteFollow(ctx,
block.AccountID,
block.TargetAccountID,
); err != nil {
log.Errorf(ctx, "error deleting follow from block -> target: %v", err)
}
- if err := p.state.DB.DeleteFollow(
- ctx,
+ if err := p.state.DB.DeleteFollow(ctx,
block.TargetAccountID,
block.AccountID,
); err != nil {
@@ -755,16 +747,14 @@ func (p *fediAPI) CreateBlock(ctx context.Context, fMsg *messages.FromFediAPI) e
}
// Remove any follow requests that existed between blocker + blockee.
- if err := p.state.DB.DeleteFollowRequest(
- ctx,
+ if err := p.state.DB.DeleteFollowRequest(ctx,
block.AccountID,
block.TargetAccountID,
); err != nil {
log.Errorf(ctx, "error deleting follow request from block -> target: %v", err)
}
- if err := p.state.DB.DeleteFollowRequest(
- ctx,
+ if err := p.state.DB.DeleteFollowRequest(ctx,
block.TargetAccountID,
block.AccountID,
); err != nil {
@@ -871,7 +861,7 @@ func (p *fediAPI) AcceptReply(ctx context.Context, fMsg *messages.FromFediAPI) e
// Interaction counts changed on the replied-to status;
// uncache the prepared version from all timelines.
- p.surface.invalidateStatusFromTimelines(ctx, status.InReplyToID)
+ p.surface.invalidateStatusFromTimelines(status.InReplyToID)
return nil
}
@@ -920,11 +910,11 @@ func (p *fediAPI) AcceptRemoteStatus(ctx context.Context, fMsg *messages.FromFed
// Interaction counts changed on the interacted status;
// uncache the prepared version from all timelines.
if status.InReplyToID != "" {
- p.surface.invalidateStatusFromTimelines(ctx, status.InReplyToID)
+ p.surface.invalidateStatusFromTimelines(status.InReplyToID)
}
if status.BoostOfID != "" {
- p.surface.invalidateStatusFromTimelines(ctx, status.BoostOfID)
+ p.surface.invalidateStatusFromTimelines(status.BoostOfID)
}
return nil
@@ -953,7 +943,7 @@ func (p *fediAPI) AcceptAnnounce(ctx context.Context, fMsg *messages.FromFediAPI
// Interaction counts changed on the boosted status;
// uncache the prepared version from all timelines.
- p.surface.invalidateStatusFromTimelines(ctx, boost.BoostOfID)
+ p.surface.invalidateStatusFromTimelines(boost.BoostOfID)
return nil
}
@@ -1004,7 +994,7 @@ func (p *fediAPI) UpdateStatus(ctx context.Context, fMsg *messages.FromFediAPI)
}
// Status representation was refetched, uncache from timelines.
- p.surface.invalidateStatusFromTimelines(ctx, status.ID)
+ p.surface.invalidateStatusFromTimelines(status.ID)
return nil
}
@@ -1063,7 +1053,7 @@ func (p *fediAPI) DeleteStatus(ctx context.Context, fMsg *messages.FromFediAPI)
if status.InReplyToID != "" {
// Interaction counts changed on the replied status;
// uncache the prepared version from all timelines.
- p.surface.invalidateStatusFromTimelines(ctx, status.InReplyToID)
+ p.surface.invalidateStatusFromTimelines(status.InReplyToID)
}
return nil
@@ -1090,6 +1080,9 @@ func (p *fediAPI) DeleteAccount(ctx context.Context, fMsg *messages.FromFediAPI)
p.state.Workers.Federator.Queue.Delete("Requesting.ID", account.ID)
p.state.Workers.Federator.Queue.Delete("TargetURI", account.URI)
+ // Remove any entries authored by account from timelines.
+ p.surface.removeTimelineEntriesByAccount(account.ID)
+
// First perform the actual account deletion.
if err := p.account.Delete(ctx, account, account.ID); err != nil {
log.Errorf(ctx, "error deleting account: %v", err)
@@ -1208,6 +1201,42 @@ func (p *fediAPI) RejectAnnounce(ctx context.Context, fMsg *messages.FromFediAPI
return nil
}
+func (p *fediAPI) UndoFollow(ctx context.Context, fMsg *messages.FromFediAPI) error {
+ follow, ok := fMsg.GTSModel.(*gtsmodel.Follow)
+ if !ok {
+ return gtserror.Newf("%T not parseable as *gtsmodel.Follow", fMsg.GTSModel)
+ }
+
+ if follow.Account.IsLocal() {
+ // Remove posts by target from origin's timelines.
+ p.surface.removeRelationshipFromTimelines(ctx,
+ follow.AccountID,
+ follow.TargetAccountID,
+ )
+ }
+
+ if follow.TargetAccount.IsLocal() {
+ // Remove posts by origin from target's timelines.
+ p.surface.removeRelationshipFromTimelines(ctx,
+ follow.TargetAccountID,
+ follow.AccountID,
+ )
+ }
+
+ return nil
+}
+
+func (p *fediAPI) UndoBlock(ctx context.Context, fMsg *messages.FromFediAPI) error {
+ _, ok := fMsg.GTSModel.(*gtsmodel.Block)
+ if !ok {
+ return gtserror.Newf("%T not parseable as *gtsmodel.Block", fMsg.GTSModel)
+ }
+
+ // TODO: any required changes
+
+ return nil
+}
+
func (p *fediAPI) UndoAnnounce(
ctx context.Context,
fMsg *messages.FromFediAPI,
@@ -1228,13 +1257,24 @@ func (p *fediAPI) UndoAnnounce(
}
// Remove the boost wrapper from all timelines.
- if err := p.surface.deleteStatusFromTimelines(ctx, boost.ID); err != nil {
- log.Errorf(ctx, "error removing timelined boost: %v", err)
- }
+ p.surface.deleteStatusFromTimelines(ctx, boost.ID)
// Interaction counts changed on the boosted status;
// uncache the prepared version from all timelines.
- p.surface.invalidateStatusFromTimelines(ctx, boost.BoostOfID)
+ p.surface.invalidateStatusFromTimelines(boost.BoostOfID)
+
+ return nil
+}
+
+func (p *fediAPI) UndoFave(ctx context.Context, fMsg *messages.FromFediAPI) error {
+ statusFave, ok := fMsg.GTSModel.(*gtsmodel.StatusFave)
+ if !ok {
+ return gtserror.Newf("%T not parseable as *gtsmodel.StatusFave", fMsg.GTSModel)
+ }
+
+ // Interaction counts changed on the faved status;
+ // uncache the prepared version from all timelines.
+ p.surface.invalidateStatusFromTimelines(statusFave.StatusID)
return nil
}
diff --git a/internal/processing/workers/surfacetimeline.go b/internal/processing/workers/surfacetimeline.go
index b071bd72e..0f2e80d0f 100644
--- a/internal/processing/workers/surfacetimeline.go
+++ b/internal/processing/workers/surfacetimeline.go
@@ -21,6 +21,7 @@ import (
"context"
"errors"
+ "github.com/superseriousbusiness/gotosocial/internal/cache/timeline"
statusfilter "github.com/superseriousbusiness/gotosocial/internal/filter/status"
"github.com/superseriousbusiness/gotosocial/internal/filter/usermute"
"github.com/superseriousbusiness/gotosocial/internal/gtscontext"
@@ -28,7 +29,6 @@ import (
"github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
"github.com/superseriousbusiness/gotosocial/internal/log"
"github.com/superseriousbusiness/gotosocial/internal/stream"
- "github.com/superseriousbusiness/gotosocial/internal/timeline"
"github.com/superseriousbusiness/gotosocial/internal/util"
)
@@ -161,21 +161,16 @@ func (s *Surface) timelineAndNotifyStatusForFollowers(
// Add status to home timeline for owner of
// this follow (origin account), if applicable.
- homeTimelined, err = s.timelineStatus(ctx,
- s.State.Timelines.Home.IngestOne,
- follow.AccountID, // home timelines are keyed by account ID
+ if homeTimelined := s.timelineStatus(ctx,
+ s.State.Caches.Timelines.Home.MustGet(follow.AccountID),
follow.Account,
status,
stream.TimelineHome,
+ statusfilter.FilterContextHome,
filters,
mutes,
- )
- if err != nil {
- log.Errorf(ctx, "error home timelining status: %v", err)
- continue
- }
+ ); homeTimelined {
- if homeTimelined {
// If hometimelined, add to list of returned account IDs.
homeTimelinedAccountIDs = append(homeTimelinedAccountIDs, follow.AccountID)
}
@@ -261,22 +256,16 @@ func (s *Surface) listTimelineStatusForFollow(
exclusive = exclusive || *list.Exclusive
// At this point we are certain this status
- // should be included in the timeline of the
- // list that this list entry belongs to.
- listTimelined, err := s.timelineStatus(
- ctx,
- s.State.Timelines.List.IngestOne,
- list.ID, // list timelines are keyed by list ID
+ // should be included in timeline of this list.
+ listTimelined := s.timelineStatus(ctx,
+ s.State.Caches.Timelines.List.MustGet(list.ID),
follow.Account,
status,
stream.TimelineList+":"+list.ID, // key streamType to this specific list
+ statusfilter.FilterContextHome,
filters,
mutes,
)
- if err != nil {
- log.Errorf(ctx, "error adding status to list timeline: %v", err)
- continue
- }
// Update flag based on if timelined.
timelined = timelined || listTimelined
@@ -367,53 +356,48 @@ func (s *Surface) listEligible(
}
}
-// timelineStatus uses the provided ingest function to put the given
-// status in a timeline with the given ID, if it's timelineable.
-//
-// If the status was inserted into the timeline, true will be returned
-// + it will also be streamed to the user using the given streamType.
+// timelineStatus will insert the given status into the given timeline, if it's
+// timelineable. if the status was inserted into the timeline, true will be returned.
func (s *Surface) timelineStatus(
ctx context.Context,
- ingest func(context.Context, string, timeline.Timelineable) (bool, error),
- timelineID string,
+ timeline *timeline.StatusTimeline,
account *gtsmodel.Account,
status *gtsmodel.Status,
streamType string,
+ filterCtx statusfilter.FilterContext,
filters []*gtsmodel.Filter,
mutes *usermute.CompiledUserMuteList,
-) (bool, error) {
-
- // Ingest status into given timeline using provided function.
- if inserted, err := ingest(ctx, timelineID, status); err != nil &&
- !errors.Is(err, statusfilter.ErrHideStatus) {
- err := gtserror.Newf("error ingesting status %s: %w", status.ID, err)
- return false, err
- } else if !inserted {
- // Nothing more to do.
- return false, nil
- }
+) bool {
- // Convert updated database model to frontend model.
- apiStatus, err := s.Converter.StatusToAPIStatus(ctx,
+ // Attempt to convert status to frontend API representation,
+ // this will check whether status is filtered / muted.
+ apiModel, err := s.Converter.StatusToAPIStatus(ctx,
status,
account,
- statusfilter.FilterContextHome,
+ filterCtx,
filters,
mutes,
)
if err != nil && !errors.Is(err, statusfilter.ErrHideStatus) {
- err := gtserror.Newf("error converting status %s to frontend representation: %w", status.ID, err)
- return true, err
+ log.Error(ctx, "error converting status %s to frontend: %v", status.URI, err)
}
- if apiStatus != nil {
- // The status was inserted so stream it to the user.
- s.Stream.Update(ctx, account, apiStatus, streamType)
- return true, nil
+ // Insert status to timeline cache regardless of
+ // if API model was succesfully prepared or not.
+ repeatBoost := timeline.InsertOne(status, apiModel)
+
+ if apiModel == nil {
+ // Status was
+ // filtered / muted.
+ return false
+ }
+
+ if !repeatBoost {
+ // Only stream if not repeated boost of recent status.
+ s.Stream.Update(ctx, account, apiModel, streamType)
}
- // Status was hidden.
- return false, nil
+ return true
}
// timelineAndNotifyStatusForTagFollowers inserts the status into the
@@ -444,23 +428,15 @@ func (s *Surface) timelineAndNotifyStatusForTagFollowers(
continue
}
- if _, err := s.timelineStatus(
- ctx,
- s.State.Timelines.Home.IngestOne,
- tagFollowerAccount.ID, // home timelines are keyed by account ID
+ _ = s.timelineStatus(ctx,
+ s.State.Caches.Timelines.Home.MustGet(tagFollowerAccount.ID),
tagFollowerAccount,
status,
stream.TimelineHome,
+ statusfilter.FilterContextHome,
filters,
mutes,
- ); err != nil {
- errs.Appendf(
- "error inserting status %s into home timeline for account %s: %w",
- status.ID,
- tagFollowerAccount.ID,
- err,
- )
- }
+ )
}
return errs.Combine()
@@ -550,39 +526,6 @@ func (s *Surface) tagFollowersForStatus(
return visibleTagFollowerAccounts, errs.Combine()
}
-// deleteStatusFromTimelines completely removes the given status from all timelines.
-// It will also stream deletion of the status to all open streams.
-func (s *Surface) deleteStatusFromTimelines(ctx context.Context, statusID string) error {
- if err := s.State.Timelines.Home.WipeItemFromAllTimelines(ctx, statusID); err != nil {
- return err
- }
- if err := s.State.Timelines.List.WipeItemFromAllTimelines(ctx, statusID); err != nil {
- return err
- }
- s.Stream.Delete(ctx, statusID)
- return nil
-}
-
-// invalidateStatusFromTimelines does cache invalidation on the given status by
-// unpreparing it from all timelines, forcing it to be prepared again (with updated
-// stats, boost counts, etc) next time it's fetched by the timeline owner. This goes
-// both for the status itself, and for any boosts of the status.
-func (s *Surface) invalidateStatusFromTimelines(ctx context.Context, statusID string) {
- if err := s.State.Timelines.Home.UnprepareItemFromAllTimelines(ctx, statusID); err != nil {
- log.
- WithContext(ctx).
- WithField("statusID", statusID).
- Errorf("error unpreparing status from home timelines: %v", err)
- }
-
- if err := s.State.Timelines.List.UnprepareItemFromAllTimelines(ctx, statusID); err != nil {
- log.
- WithContext(ctx).
- WithField("statusID", statusID).
- Errorf("error unpreparing status from list timelines: %v", err)
- }
-}
-
// timelineStatusUpdate looks up HOME and LIST timelines of accounts
// that follow the the status author or tags and pushes edit messages into any
// active streams.
@@ -859,3 +802,47 @@ func (s *Surface) timelineStatusUpdateForTagFollowers(
}
return errs.Combine()
}
+
+// deleteStatusFromTimelines completely removes the given status from all timelines.
+// It will also stream deletion of the status to all open streams.
+func (s *Surface) deleteStatusFromTimelines(ctx context.Context, statusID string) {
+ s.State.Caches.Timelines.Home.RemoveByStatusIDs(statusID)
+ s.State.Caches.Timelines.List.RemoveByStatusIDs(statusID)
+ s.Stream.Delete(ctx, statusID)
+}
+
+// invalidateStatusFromTimelines does cache invalidation on the given status by
+// unpreparing it from all timelines, forcing it to be prepared again (with updated
+// stats, boost counts, etc) next time it's fetched by the timeline owner. This goes
+// both for the status itself, and for any boosts of the status.
+func (s *Surface) invalidateStatusFromTimelines(statusID string) {
+ s.State.Caches.Timelines.Home.UnprepareByStatusIDs(statusID)
+ s.State.Caches.Timelines.List.UnprepareByStatusIDs(statusID)
+}
+
+// removeTimelineEntriesByAccount removes all cached timeline entries authored by account ID.
+func (s *Surface) removeTimelineEntriesByAccount(accountID string) {
+ s.State.Caches.Timelines.Home.RemoveByAccountIDs(accountID)
+ s.State.Caches.Timelines.List.RemoveByAccountIDs(accountID)
+}
+
+func (s *Surface) removeRelationshipFromTimelines(ctx context.Context, timelineAccountID string, targetAccountID string) {
+ // Remove all statuses by target account
+ // from given account's home timeline.
+ s.State.Caches.Timelines.Home.
+ MustGet(timelineAccountID).
+ RemoveByAccountIDs(targetAccountID)
+
+ // Get the IDs of all the lists owned by the given account ID.
+ listIDs, err := s.State.DB.GetListIDsByAccountID(ctx, timelineAccountID)
+ if err != nil {
+ log.Errorf(ctx, "error getting lists for account %s: %v", timelineAccountID, err)
+ }
+
+ for _, listID := range listIDs {
+ // Remove all statuses by target account
+ // from given account's list timelines.
+ s.State.Caches.Timelines.List.MustGet(listID).
+ RemoveByAccountIDs(targetAccountID)
+ }
+}
diff --git a/internal/processing/workers/util.go b/internal/processing/workers/util.go
index b358dc951..d844ab762 100644
--- a/internal/processing/workers/util.go
+++ b/internal/processing/workers/util.go
@@ -172,15 +172,11 @@ func (u *utils) wipeStatus(
}
// Remove the boost from any and all timelines.
- if err := u.surface.deleteStatusFromTimelines(ctx, boost.ID); err != nil {
- errs.Appendf("error deleting boost from timelines: %w", err)
- }
+ u.surface.deleteStatusFromTimelines(ctx, boost.ID)
}
// Delete the status itself from any and all timelines.
- if err := u.surface.deleteStatusFromTimelines(ctx, status.ID); err != nil {
- errs.Appendf("error deleting status from timelines: %w", err)
- }
+ u.surface.deleteStatusFromTimelines(ctx, status.ID)
// Delete this status from any conversations it's part of.
if err := u.state.DB.DeleteStatusFromConversations(ctx, status.ID); err != nil {