summaryrefslogtreecommitdiff
path: root/internal/processing/workers
diff options
context:
space:
mode:
authorLibravatar Sam Lade <sam@sentynel.com>2023-12-16 11:55:49 +0000
committerLibravatar GitHub <noreply@github.com>2023-12-16 12:55:49 +0100
commit285d55dda8b4de70661b16db4986d47e4e586ea2 (patch)
tree17e460f916370898d3568c6ba569686e9350aedd /internal/processing/workers
parent[feature] Run ANALYZE after migrations on SQLite (#2428) (diff)
downloadgotosocial-285d55dda8b4de70661b16db4986d47e4e586ea2.tar.xz
[feature] Push status edit messages into open streams (#2418)
* push status edit messages into open streams * fix a few comments * test++ * commented out code? moi?
Diffstat (limited to 'internal/processing/workers')
-rw-r--r--internal/processing/workers/fromclientapi.go5
-rw-r--r--internal/processing/workers/fromfediapi.go5
-rw-r--r--internal/processing/workers/surfacetimeline.go173
3 files changed, 183 insertions, 0 deletions
diff --git a/internal/processing/workers/fromclientapi.go b/internal/processing/workers/fromclientapi.go
index e3f1e2d76..05b9acc1f 100644
--- a/internal/processing/workers/fromclientapi.go
+++ b/internal/processing/workers/fromclientapi.go
@@ -416,6 +416,11 @@ func (p *clientAPI) UpdateStatus(ctx context.Context, cMsg messages.FromClientAP
}
}
+ // Push message that the status has been edited to streams.
+ if err := p.surface.timelineStatusUpdate(ctx, status); err != nil {
+ log.Errorf(ctx, "error streaming status edit: %v", err)
+ }
+
return nil
}
diff --git a/internal/processing/workers/fromfediapi.go b/internal/processing/workers/fromfediapi.go
index d04e4ab8d..6dd4e543d 100644
--- a/internal/processing/workers/fromfediapi.go
+++ b/internal/processing/workers/fromfediapi.go
@@ -530,6 +530,11 @@ func (p *fediAPI) UpdateStatus(ctx context.Context, fMsg messages.FromFediAPI) e
}
}
+ // Push message that the status has been edited to streams.
+ if err := p.surface.timelineStatusUpdate(ctx, status); err != nil {
+ log.Errorf(ctx, "error streaming status edit: %v", err)
+ }
+
return nil
}
diff --git a/internal/processing/workers/surfacetimeline.go b/internal/processing/workers/surfacetimeline.go
index baebdbc66..e63b8a7c0 100644
--- a/internal/processing/workers/surfacetimeline.go
+++ b/internal/processing/workers/surfacetimeline.go
@@ -390,3 +390,176 @@ func (s *surface) invalidateStatusFromTimelines(ctx context.Context, statusID st
Errorf("error unpreparing status from list timelines: %v", err)
}
}
+
+// timelineStatusUpdate looks up HOME and LIST timelines of accounts
+// that follow the the status author and pushes edit messages into any
+// active streams.
+// Note that calling invalidateStatusFromTimelines takes care of the
+// state in general, we just need to do this for any streams that are
+// open right now.
+func (s *surface) timelineStatusUpdate(ctx context.Context, status *gtsmodel.Status) error {
+ // Ensure status fully populated; including account, mentions, etc.
+ if err := s.state.DB.PopulateStatus(ctx, status); err != nil {
+ return gtserror.Newf("error populating status with id %s: %w", status.ID, err)
+ }
+
+ // Get all local followers of the account that posted the status.
+ follows, err := s.state.DB.GetAccountLocalFollowers(ctx, status.AccountID)
+ if err != nil {
+ return gtserror.Newf("error getting local followers of account %s: %w", status.AccountID, err)
+ }
+
+ // If the poster is also local, add a fake entry for them
+ // so they can see their own status in their timeline.
+ if status.Account.IsLocal() {
+ follows = append(follows, &gtsmodel.Follow{
+ AccountID: status.AccountID,
+ Account: status.Account,
+ Notify: func() *bool { b := false; return &b }(), // Account shouldn't notify itself.
+ ShowReblogs: func() *bool { b := true; return &b }(), // Account should show own reblogs.
+ })
+ }
+
+ // Push to streams for each local follower of this account.
+ if err := s.timelineStatusUpdateForFollowers(ctx, status, follows); err != nil {
+ return gtserror.Newf("error timelining status %s for followers: %w", status.ID, err)
+ }
+
+ return nil
+}
+
+// timelineStatusUpdateForFollowers iterates through the given
+// slice of followers of the account that posted the given status,
+// pushing update messages into open list/home streams of each
+// follower.
+func (s *surface) timelineStatusUpdateForFollowers(
+ ctx context.Context,
+ status *gtsmodel.Status,
+ follows []*gtsmodel.Follow,
+) error {
+ var (
+ errs gtserror.MultiError
+ )
+
+ for _, follow := range follows {
+ // Check to see if the status is timelineable for this follower,
+ // taking account of its visibility, who it replies to, and, if
+ // it's a reblog, whether follower account wants to see reblogs.
+ //
+ // If it's not timelineable, we can just stop early, since lists
+ // are prettymuch subsets of the home timeline, so if it shouldn't
+ // appear there, it shouldn't appear in lists either.
+ timelineable, err := s.filter.StatusHomeTimelineable(
+ ctx, follow.Account, status,
+ )
+ if err != nil {
+ errs.Appendf("error checking status %s hometimelineability: %w", status.ID, err)
+ continue
+ }
+
+ if !timelineable {
+ // Nothing to do.
+ continue
+ }
+
+ // Add status to any relevant lists
+ // for this follow, if applicable.
+ s.listTimelineStatusUpdateForFollow(
+ ctx,
+ status,
+ follow,
+ &errs,
+ )
+
+ // Add status to home timeline for owner
+ // of this follow, if applicable.
+ err = s.timelineStreamStatusUpdate(
+ ctx,
+ follow.Account,
+ status,
+ stream.TimelineHome,
+ )
+ if err != nil {
+ errs.Appendf("error home timelining status: %w", err)
+ continue
+ }
+ }
+
+ return errs.Combine()
+}
+
+// listTimelineStatusUpdateForFollow pushes edits of the given status
+// into any eligible lists streams opened by the given follower.
+func (s *surface) listTimelineStatusUpdateForFollow(
+ ctx context.Context,
+ status *gtsmodel.Status,
+ follow *gtsmodel.Follow,
+ errs *gtserror.MultiError,
+) {
+ // To put this status in appropriate list timelines,
+ // we need to get each listEntry that pertains to
+ // this follow. Then, we want to iterate through all
+ // those list entries, and add the status to the list
+ // that the entry belongs to if it meets criteria for
+ // inclusion in the list.
+
+ // Get every list entry that targets this follow's ID.
+ listEntries, err := s.state.DB.GetListEntriesForFollowID(
+ // We only need the list IDs.
+ gtscontext.SetBarebones(ctx),
+ follow.ID,
+ )
+ if err != nil && !errors.Is(err, db.ErrNoEntries) {
+ errs.Appendf("error getting list entries: %w", err)
+ return
+ }
+
+ // Check eligibility for each list entry (if any).
+ for _, listEntry := range listEntries {
+ eligible, err := s.listEligible(ctx, listEntry, status)
+ if err != nil {
+ errs.Appendf("error checking list eligibility: %w", err)
+ continue
+ }
+
+ if !eligible {
+ // Don't add this.
+ continue
+ }
+
+ // At this point we are certain this status
+ // should be included in the timeline of the
+ // list that this list entry belongs to.
+ if err := s.timelineStreamStatusUpdate(
+ ctx,
+ follow.Account,
+ status,
+ stream.TimelineList+":"+listEntry.ListID, // key streamType to this specific list
+ ); err != nil {
+ errs.Appendf("error adding status to timeline for list %s: %w", listEntry.ListID, err)
+ // implicit continue
+ }
+ }
+}
+
+// timelineStatusUpdate streams the edited status to the user using the
+// given streamType.
+func (s *surface) timelineStreamStatusUpdate(
+ ctx context.Context,
+ account *gtsmodel.Account,
+ status *gtsmodel.Status,
+ streamType string,
+) error {
+ apiStatus, err := s.converter.StatusToAPIStatus(ctx, status, account)
+ if err != nil {
+ err = gtserror.Newf("error converting status %s to frontend representation: %w", status.ID, err)
+ return err
+ }
+
+ if err := s.stream.StatusUpdate(apiStatus, account, []string{streamType}); err != nil {
+ err = gtserror.Newf("error streaming update for status %s: %w", status.ID, err)
+ return err
+ }
+
+ return nil
+}