diff options
author | 2023-12-16 11:55:49 +0000 | |
---|---|---|
committer | 2023-12-16 12:55:49 +0100 | |
commit | 285d55dda8b4de70661b16db4986d47e4e586ea2 (patch) | |
tree | 17e460f916370898d3568c6ba569686e9350aedd /internal/processing/workers | |
parent | [feature] Run ANALYZE after migrations on SQLite (#2428) (diff) | |
download | gotosocial-285d55dda8b4de70661b16db4986d47e4e586ea2.tar.xz |
[feature] Push status edit messages into open streams (#2418)
* push status edit messages into open streams
* fix a few comments
* test++
* commented out code? moi?
Diffstat (limited to 'internal/processing/workers')
-rw-r--r-- | internal/processing/workers/fromclientapi.go | 5 | ||||
-rw-r--r-- | internal/processing/workers/fromfediapi.go | 5 | ||||
-rw-r--r-- | internal/processing/workers/surfacetimeline.go | 173 |
3 files changed, 183 insertions, 0 deletions
diff --git a/internal/processing/workers/fromclientapi.go b/internal/processing/workers/fromclientapi.go index e3f1e2d76..05b9acc1f 100644 --- a/internal/processing/workers/fromclientapi.go +++ b/internal/processing/workers/fromclientapi.go @@ -416,6 +416,11 @@ func (p *clientAPI) UpdateStatus(ctx context.Context, cMsg messages.FromClientAP } } + // Push message that the status has been edited to streams. + if err := p.surface.timelineStatusUpdate(ctx, status); err != nil { + log.Errorf(ctx, "error streaming status edit: %v", err) + } + return nil } diff --git a/internal/processing/workers/fromfediapi.go b/internal/processing/workers/fromfediapi.go index d04e4ab8d..6dd4e543d 100644 --- a/internal/processing/workers/fromfediapi.go +++ b/internal/processing/workers/fromfediapi.go @@ -530,6 +530,11 @@ func (p *fediAPI) UpdateStatus(ctx context.Context, fMsg messages.FromFediAPI) e } } + // Push message that the status has been edited to streams. + if err := p.surface.timelineStatusUpdate(ctx, status); err != nil { + log.Errorf(ctx, "error streaming status edit: %v", err) + } + return nil } diff --git a/internal/processing/workers/surfacetimeline.go b/internal/processing/workers/surfacetimeline.go index baebdbc66..e63b8a7c0 100644 --- a/internal/processing/workers/surfacetimeline.go +++ b/internal/processing/workers/surfacetimeline.go @@ -390,3 +390,176 @@ func (s *surface) invalidateStatusFromTimelines(ctx context.Context, statusID st Errorf("error unpreparing status from list timelines: %v", err) } } + +// timelineStatusUpdate looks up HOME and LIST timelines of accounts +// that follow the the status author and pushes edit messages into any +// active streams. +// Note that calling invalidateStatusFromTimelines takes care of the +// state in general, we just need to do this for any streams that are +// open right now. +func (s *surface) timelineStatusUpdate(ctx context.Context, status *gtsmodel.Status) error { + // Ensure status fully populated; including account, mentions, etc. + if err := s.state.DB.PopulateStatus(ctx, status); err != nil { + return gtserror.Newf("error populating status with id %s: %w", status.ID, err) + } + + // Get all local followers of the account that posted the status. + follows, err := s.state.DB.GetAccountLocalFollowers(ctx, status.AccountID) + if err != nil { + return gtserror.Newf("error getting local followers of account %s: %w", status.AccountID, err) + } + + // If the poster is also local, add a fake entry for them + // so they can see their own status in their timeline. + if status.Account.IsLocal() { + follows = append(follows, >smodel.Follow{ + AccountID: status.AccountID, + Account: status.Account, + Notify: func() *bool { b := false; return &b }(), // Account shouldn't notify itself. + ShowReblogs: func() *bool { b := true; return &b }(), // Account should show own reblogs. + }) + } + + // Push to streams for each local follower of this account. + if err := s.timelineStatusUpdateForFollowers(ctx, status, follows); err != nil { + return gtserror.Newf("error timelining status %s for followers: %w", status.ID, err) + } + + return nil +} + +// timelineStatusUpdateForFollowers iterates through the given +// slice of followers of the account that posted the given status, +// pushing update messages into open list/home streams of each +// follower. +func (s *surface) timelineStatusUpdateForFollowers( + ctx context.Context, + status *gtsmodel.Status, + follows []*gtsmodel.Follow, +) error { + var ( + errs gtserror.MultiError + ) + + for _, follow := range follows { + // Check to see if the status is timelineable for this follower, + // taking account of its visibility, who it replies to, and, if + // it's a reblog, whether follower account wants to see reblogs. + // + // If it's not timelineable, we can just stop early, since lists + // are prettymuch subsets of the home timeline, so if it shouldn't + // appear there, it shouldn't appear in lists either. + timelineable, err := s.filter.StatusHomeTimelineable( + ctx, follow.Account, status, + ) + if err != nil { + errs.Appendf("error checking status %s hometimelineability: %w", status.ID, err) + continue + } + + if !timelineable { + // Nothing to do. + continue + } + + // Add status to any relevant lists + // for this follow, if applicable. + s.listTimelineStatusUpdateForFollow( + ctx, + status, + follow, + &errs, + ) + + // Add status to home timeline for owner + // of this follow, if applicable. + err = s.timelineStreamStatusUpdate( + ctx, + follow.Account, + status, + stream.TimelineHome, + ) + if err != nil { + errs.Appendf("error home timelining status: %w", err) + continue + } + } + + return errs.Combine() +} + +// listTimelineStatusUpdateForFollow pushes edits of the given status +// into any eligible lists streams opened by the given follower. +func (s *surface) listTimelineStatusUpdateForFollow( + ctx context.Context, + status *gtsmodel.Status, + follow *gtsmodel.Follow, + errs *gtserror.MultiError, +) { + // To put this status in appropriate list timelines, + // we need to get each listEntry that pertains to + // this follow. Then, we want to iterate through all + // those list entries, and add the status to the list + // that the entry belongs to if it meets criteria for + // inclusion in the list. + + // Get every list entry that targets this follow's ID. + listEntries, err := s.state.DB.GetListEntriesForFollowID( + // We only need the list IDs. + gtscontext.SetBarebones(ctx), + follow.ID, + ) + if err != nil && !errors.Is(err, db.ErrNoEntries) { + errs.Appendf("error getting list entries: %w", err) + return + } + + // Check eligibility for each list entry (if any). + for _, listEntry := range listEntries { + eligible, err := s.listEligible(ctx, listEntry, status) + if err != nil { + errs.Appendf("error checking list eligibility: %w", err) + continue + } + + if !eligible { + // Don't add this. + continue + } + + // At this point we are certain this status + // should be included in the timeline of the + // list that this list entry belongs to. + if err := s.timelineStreamStatusUpdate( + ctx, + follow.Account, + status, + stream.TimelineList+":"+listEntry.ListID, // key streamType to this specific list + ); err != nil { + errs.Appendf("error adding status to timeline for list %s: %w", listEntry.ListID, err) + // implicit continue + } + } +} + +// timelineStatusUpdate streams the edited status to the user using the +// given streamType. +func (s *surface) timelineStreamStatusUpdate( + ctx context.Context, + account *gtsmodel.Account, + status *gtsmodel.Status, + streamType string, +) error { + apiStatus, err := s.converter.StatusToAPIStatus(ctx, status, account) + if err != nil { + err = gtserror.Newf("error converting status %s to frontend representation: %w", status.ID, err) + return err + } + + if err := s.stream.StatusUpdate(apiStatus, account, []string{streamType}); err != nil { + err = gtserror.Newf("error streaming update for status %s: %w", status.ID, err) + return err + } + + return nil +} |