summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--internal/cache/db.go1
-rw-r--r--internal/cache/invalidate.go3
-rw-r--r--internal/db/bundb/user.go72
-rw-r--r--internal/processing/workers/fromclientapi_test.go121
-rw-r--r--internal/processing/workers/surfacetimeline.go110
5 files changed, 274 insertions, 33 deletions
diff --git a/internal/cache/db.go b/internal/cache/db.go
index 6b482d5f8..d31017ccd 100644
--- a/internal/cache/db.go
+++ b/internal/cache/db.go
@@ -154,6 +154,7 @@ type DBCaches struct {
Domains atomic.Pointer[int]
Statuses atomic.Pointer[int]
Users atomic.Pointer[int]
+ UserIDs atomic.Pointer[[]string]
}
// InteractionRequest provides access to the gtsmodel InteractionRequest database cache.
diff --git a/internal/cache/invalidate.go b/internal/cache/invalidate.go
index 569238e9b..863719b77 100644
--- a/internal/cache/invalidate.go
+++ b/internal/cache/invalidate.go
@@ -365,7 +365,8 @@ func (c *Caches) OnInvalidateUser(user *gtsmodel.User) {
c.Visibility.Invalidate("ItemID", user.AccountID)
c.Visibility.Invalidate("RequesterID", user.AccountID)
- // Invalidate the local users count.
+ // Invalidate the local user IDs / count.
+ c.DB.LocalInstance.UserIDs.Store(nil)
c.DB.LocalInstance.Users.Store(nil)
}
diff --git a/internal/db/bundb/user.go b/internal/db/bundb/user.go
index 2800a32e9..f51d1bf74 100644
--- a/internal/db/bundb/user.go
+++ b/internal/db/bundb/user.go
@@ -19,12 +19,15 @@ package bundb
import (
"context"
+ "slices"
"time"
"code.superseriousbusiness.org/gotosocial/internal/gtscontext"
"code.superseriousbusiness.org/gotosocial/internal/gtserror"
"code.superseriousbusiness.org/gotosocial/internal/gtsmodel"
+ "code.superseriousbusiness.org/gotosocial/internal/log"
"code.superseriousbusiness.org/gotosocial/internal/state"
+ "code.superseriousbusiness.org/gotosocial/internal/util/xslices"
"github.com/uptrace/bun"
)
@@ -45,27 +48,47 @@ func (u *userDB) GetUserByID(ctx context.Context, id string) (*gtsmodel.User, er
}
func (u *userDB) GetUsersByIDs(ctx context.Context, ids []string) ([]*gtsmodel.User, error) {
- var (
- users = make([]*gtsmodel.User, 0, len(ids))
-
- // Collect errors instead of
- // returning early on any.
- errs gtserror.MultiError
+ // Load all input user IDs via cache loader callback.
+ users, err := u.state.Caches.DB.User.LoadIDs("ID",
+ ids,
+ func(uncached []string) ([]*gtsmodel.User, error) {
+ // Preallocate expected length of uncached users.
+ users := make([]*gtsmodel.User, 0, len(uncached))
+
+ // Perform database query scanning
+ // the remaining (uncached) user IDs.
+ if err := u.db.NewSelect().
+ Model(&users).
+ Where("? IN (?)", bun.Ident("id"), bun.In(uncached)).
+ Scan(ctx); err != nil {
+ return nil, err
+ }
+
+ return users, nil
+ },
)
+ if err != nil {
+ return nil, err
+ }
- for _, id := range ids {
- // Attempt to fetch user from DB.
- user, err := u.GetUserByID(ctx, id)
- if err != nil {
- errs.Appendf("error getting user %s: %w", id, err)
- continue
- }
+ // Reorder the users by their
+ // IDs to ensure in correct order.
+ getID := func(s *gtsmodel.User) string { return s.ID }
+ xslices.OrderBy(users, ids, getID)
- // Append user to return slice.
- users = append(users, user)
+ if gtscontext.Barebones(ctx) {
+ // no need to fully populate.
+ return users, nil
}
- return users, errs.Combine()
+ // Populate all loaded users.
+ for _, user := range users {
+ if err := u.PopulateUser(ctx, user); err != nil {
+ log.Errorf(ctx, "error populating user %s: %v", user.ID, err)
+ }
+ }
+
+ return users, nil
}
func (u *userDB) GetUserByAccountID(ctx context.Context, accountID string) (*gtsmodel.User, error) {
@@ -161,7 +184,11 @@ func (u *userDB) PopulateUser(ctx context.Context, user *gtsmodel.User) error {
return errs.Combine()
}
-func (u *userDB) GetAllUsers(ctx context.Context) ([]*gtsmodel.User, error) {
+func (u *userDB) GetAllUserIDs(ctx context.Context) ([]string, error) {
+ if p := u.state.Caches.DB.LocalInstance.UserIDs.Load(); p != nil {
+ return slices.Clone(*p), nil
+ }
+
var userIDs []string
// Scan all user IDs into slice.
@@ -172,7 +199,16 @@ func (u *userDB) GetAllUsers(ctx context.Context) ([]*gtsmodel.User, error) {
return nil, err
}
- // Transform user IDs into user slice.
+ // Store the scanned user IDs in our local cache ptr.
+ u.state.Caches.DB.LocalInstance.UserIDs.Store(&userIDs)
+ return userIDs, nil
+}
+
+func (u *userDB) GetAllUsers(ctx context.Context) ([]*gtsmodel.User, error) {
+ userIDs, err := u.GetAllUserIDs(ctx)
+ if err != nil {
+ return nil, err
+ }
return u.GetUsersByIDs(ctx, userIDs)
}
diff --git a/internal/processing/workers/fromclientapi_test.go b/internal/processing/workers/fromclientapi_test.go
index 7da34ff42..5967d4d34 100644
--- a/internal/processing/workers/fromclientapi_test.go
+++ b/internal/processing/workers/fromclientapi_test.go
@@ -262,9 +262,10 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusWithNotification() {
receivingAccount,
[]string{testList.ID},
)
- homeStream = streams[stream.TimelineHome]
- listStream = streams[stream.TimelineList+":"+testList.ID]
- notifStream = streams[stream.TimelineNotifications]
+ publicStream = streams[stream.TimelinePublic]
+ homeStream = streams[stream.TimelineHome]
+ listStream = streams[stream.TimelineList+":"+testList.ID]
+ notifStream = streams[stream.TimelineNotifications]
// Admin account posts a new top-level status.
status = suite.newStatus(
@@ -310,6 +311,14 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusWithNotification() {
receivingAccount,
)
+ // Check message in public stream.
+ suite.checkStreamed(
+ publicStream,
+ true,
+ statusJSON,
+ stream.EventTypeUpdate,
+ )
+
// Check message in home stream.
suite.checkStreamed(
homeStream,
@@ -379,9 +388,10 @@ func (suite *FromClientAPITestSuite) TestProcessCreateBackfilledStatusWithNotifi
receivingAccount,
[]string{testList.ID},
)
- homeStream = streams[stream.TimelineHome]
- listStream = streams[stream.TimelineList+":"+testList.ID]
- notifStream = streams[stream.TimelineNotifications]
+ publicStream = streams[stream.TimelinePublic]
+ homeStream = streams[stream.TimelineHome]
+ listStream = streams[stream.TimelineList+":"+testList.ID]
+ notifStream = streams[stream.TimelineNotifications]
// Admin account posts a new top-level status.
status = suite.newStatus(
@@ -420,6 +430,14 @@ func (suite *FromClientAPITestSuite) TestProcessCreateBackfilledStatusWithNotifi
suite.FailNow(err.Error())
}
+ // There should be no message in public stream.
+ suite.checkStreamed(
+ publicStream,
+ false,
+ "",
+ "",
+ )
+
// There should be no message in the home stream.
suite.checkStreamed(
homeStream,
@@ -530,6 +548,7 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusReply() {
receivingAccount = suite.testAccounts["local_account_1"]
testList = suite.testLists["local_account_1_list_1"]
streams = suite.openStreams(ctx, testStructs.Processor, receivingAccount, []string{testList.ID})
+ publicStream = streams[stream.TimelinePublic]
homeStream = streams[stream.TimelineHome]
listStream = streams[stream.TimelineList+":"+testList.ID]
@@ -571,6 +590,14 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusReply() {
receivingAccount,
)
+ // Check message *not* in public stream.
+ suite.checkStreamed(
+ publicStream,
+ false,
+ "",
+ "",
+ )
+
// Check message in home stream.
suite.checkStreamed(
homeStream,
@@ -732,6 +759,7 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusListRepliesPolicyLis
postingAccount = suite.testAccounts["admin_account"]
receivingAccount = suite.testAccounts["local_account_1"]
streams = suite.openStreams(ctx, testStructs.Processor, receivingAccount, []string{testList.ID})
+ publicStream = streams[stream.TimelinePublic]
homeStream = streams[stream.TimelineHome]
listStream = streams[stream.TimelineList+":"+testList.ID]
@@ -778,6 +806,14 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusListRepliesPolicyLis
receivingAccount,
)
+ // Check message *not* in public stream.
+ suite.checkStreamed(
+ publicStream,
+ false,
+ "",
+ "",
+ )
+
// Check message in home stream.
suite.checkStreamed(
homeStream,
@@ -811,6 +847,7 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusListRepliesPolicyLis
postingAccount = suite.testAccounts["admin_account"]
receivingAccount = suite.testAccounts["local_account_1"]
streams = suite.openStreams(ctx, testStructs.Processor, receivingAccount, []string{testList.ID})
+ publicStream = streams[stream.TimelinePublic]
homeStream = streams[stream.TimelineHome]
listStream = streams[stream.TimelineList+":"+testList.ID]
@@ -863,6 +900,14 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusListRepliesPolicyLis
receivingAccount,
)
+ // Check message *not* in public stream.
+ suite.checkStreamed(
+ publicStream,
+ false,
+ "",
+ "",
+ )
+
// Check message in home stream.
suite.checkStreamed(
homeStream,
@@ -896,6 +941,7 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusReplyListRepliesPoli
postingAccount = suite.testAccounts["admin_account"]
receivingAccount = suite.testAccounts["local_account_1"]
streams = suite.openStreams(ctx, testStructs.Processor, receivingAccount, []string{testList.ID})
+ publicStream = streams[stream.TimelinePublic]
homeStream = streams[stream.TimelineHome]
listStream = streams[stream.TimelineList+":"+testList.ID]
@@ -942,6 +988,14 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusReplyListRepliesPoli
receivingAccount,
)
+ // Check message *not* in public stream.
+ suite.checkStreamed(
+ publicStream,
+ false,
+ "",
+ "",
+ )
+
// Check message in home stream.
suite.checkStreamed(
homeStream,
@@ -972,6 +1026,7 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusBoost() {
receivingAccount = suite.testAccounts["local_account_1"]
testList = suite.testLists["local_account_1_list_1"]
streams = suite.openStreams(ctx, testStructs.Processor, receivingAccount, []string{testList.ID})
+ publicStream = streams[stream.TimelinePublic]
homeStream = streams[stream.TimelineHome]
listStream = streams[stream.TimelineList+":"+testList.ID]
@@ -1009,6 +1064,14 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusBoost() {
receivingAccount,
)
+ // Check message *not* in public stream.
+ suite.checkStreamed(
+ publicStream,
+ false,
+ "",
+ "",
+ )
+
// Check message in home stream.
suite.checkStreamed(
homeStream,
@@ -1039,6 +1102,7 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusBoostNoReblogs() {
receivingAccount = suite.testAccounts["local_account_1"]
testList = suite.testLists["local_account_1_list_1"]
streams = suite.openStreams(ctx, testStructs.Processor, receivingAccount, []string{testList.ID})
+ publicStream = streams[stream.TimelinePublic]
homeStream = streams[stream.TimelineHome]
listStream = streams[stream.TimelineList+":"+testList.ID]
@@ -1078,6 +1142,14 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusBoostNoReblogs() {
suite.FailNow(err.Error())
}
+ // Check message *not* in public stream.
+ suite.checkStreamed(
+ publicStream,
+ false,
+ "",
+ "",
+ )
+
// Check message NOT in home stream.
suite.checkStreamed(
homeStream,
@@ -1763,8 +1835,9 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusWithAuthorOnExclusiv
receivingAccount,
[]string{testList.ID},
)
- homeStream = streams[stream.TimelineHome]
- listStream = streams[stream.TimelineList+":"+testList.ID]
+ publicStream = streams[stream.TimelinePublic]
+ homeStream = streams[stream.TimelineHome]
+ listStream = streams[stream.TimelineList+":"+testList.ID]
// postingAccount posts a new public status not mentioning anyone.
status = suite.newStatus(
@@ -1802,6 +1875,14 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusWithAuthorOnExclusiv
suite.FailNow(err.Error())
}
+ // Check status in public stream.
+ suite.checkStreamed(
+ publicStream,
+ true,
+ "",
+ stream.EventTypeUpdate,
+ )
+
// Check status in list stream.
suite.checkStreamed(
listStream,
@@ -1857,6 +1938,7 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusWithAuthorOnExclusiv
testExclusiveList.ID,
},
)
+ publicStream = streams[stream.TimelinePublic]
homeStream = streams[stream.TimelineHome]
inclusiveListStream = streams[stream.TimelineList+":"+testInclusiveList.ID]
exclusiveListStream = streams[stream.TimelineList+":"+testExclusiveList.ID]
@@ -1911,6 +1993,14 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusWithAuthorOnExclusiv
suite.FailNow(err.Error())
}
+ // Check status in public stream.
+ suite.checkStreamed(
+ publicStream,
+ true,
+ "",
+ stream.EventTypeUpdate,
+ )
+
// Check status in inclusive list stream.
suite.checkStreamed(
inclusiveListStream,
@@ -1957,9 +2047,10 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusWithAuthorOnExclusiv
receivingAccount,
[]string{testList.ID},
)
- homeStream = streams[stream.TimelineHome]
- listStream = streams[stream.TimelineList+":"+testList.ID]
- notifStream = streams[stream.TimelineNotifications]
+ publicStream = streams[stream.TimelinePublic]
+ homeStream = streams[stream.TimelineHome]
+ listStream = streams[stream.TimelineList+":"+testList.ID]
+ notifStream = streams[stream.TimelineNotifications]
// postingAccount posts a new public status not mentioning anyone.
status = suite.newStatus(
@@ -2005,6 +2096,14 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusWithAuthorOnExclusiv
suite.FailNow(err.Error())
}
+ // Check status in public stream.
+ suite.checkStreamed(
+ publicStream,
+ true,
+ "",
+ stream.EventTypeUpdate,
+ )
+
// Check status in list stream.
suite.checkStreamed(
listStream,
diff --git a/internal/processing/workers/surfacetimeline.go b/internal/processing/workers/surfacetimeline.go
index b1177cd28..0e30f54f7 100644
--- a/internal/processing/workers/surfacetimeline.go
+++ b/internal/processing/workers/surfacetimeline.go
@@ -20,6 +20,7 @@ package workers
import (
"context"
+ apimodel "code.superseriousbusiness.org/gotosocial/internal/api/model"
"code.superseriousbusiness.org/gotosocial/internal/cache/timeline"
"code.superseriousbusiness.org/gotosocial/internal/gtscontext"
"code.superseriousbusiness.org/gotosocial/internal/gtserror"
@@ -37,6 +38,7 @@ import (
// the account, notifications for any local accounts that want
// to know when this account posts, and conversations containing the status.
func (s *Surface) timelineAndNotifyStatus(ctx context.Context, status *gtsmodel.Status) error {
+
// Ensure status fully populated; including account, mentions, etc.
if err := s.State.DB.PopulateStatus(ctx, status); err != nil {
return gtserror.Newf("error populating status with id %s: %w", status.ID, err)
@@ -59,6 +61,11 @@ func (s *Surface) timelineAndNotifyStatus(ctx context.Context, status *gtsmodel.
})
}
+ // Stream the status for public timelines for all local users as update msg.
+ if err := s.timelineStatusForPublic(ctx, status, s.Stream.Update); err != nil {
+ return err
+ }
+
// Timeline the status for each local follower of this account. This will
// also handle notifying any followers with notify set to true on their follow.
homeTimelinedAccountIDs := s.timelineAndNotifyStatusForFollowers(ctx, status, follows)
@@ -68,16 +75,18 @@ func (s *Surface) timelineAndNotifyStatus(ctx context.Context, status *gtsmodel.
return gtserror.Newf("error timelining status %s for tag followers: %w", status.ID, err)
}
- // Notify each local account that's mentioned by this status.
+ // Notify each local account mentioned by status.
if err := s.notifyMentions(ctx, status); err != nil {
return gtserror.Newf("error notifying status mentions for status %s: %w", status.ID, err)
}
- // Update any conversations containing this status, and send conversation notifications.
+ // Update any conversations containing this status, and get notifications for them.
notifications, err := s.Conversations.UpdateConversationsForStatus(ctx, status)
if err != nil {
return gtserror.Newf("error updating conversations for status %s: %w", status.ID, err)
}
+
+ // Stream these conversation notfications.
for _, notification := range notifications {
s.Stream.Conversation(ctx, notification.AccountID, notification.Conversation)
}
@@ -85,6 +94,95 @@ func (s *Surface) timelineAndNotifyStatus(ctx context.Context, status *gtsmodel.
return nil
}
+// timelineStatusForPublic timelines the given status
+// to LOCAL and PUBLIC (i.e. federated) timelines.
+func (s *Surface) timelineStatusForPublic(
+ ctx context.Context,
+ status *gtsmodel.Status,
+ streamFn func(context.Context, *gtsmodel.Account, *apimodel.Status, string),
+) error {
+ // Nil check function
+ // outside main loop.
+ if streamFn == nil {
+ panic("nil func")
+ }
+
+ if status.Visibility != gtsmodel.VisibilityPublic ||
+ status.BoostOfID != "" {
+ // Fast code path, if it's not "public"
+ // or a boost, don't public timeline it.
+ return nil
+ }
+
+ // Get a list of all our local users.
+ users, err := s.State.DB.GetAllUsers(ctx)
+ if err != nil {
+ return gtserror.Newf("error getting local users: %v", err)
+ }
+
+ // Iterate our list of users.
+ isLocal := status.IsLocal()
+ for _, user := range users {
+
+ // Check whether this status should be visible this user on public timelines.
+ visible, err := s.VisFilter.StatusPublicTimelineable(ctx, user.Account, status)
+ if err != nil {
+ log.Errorf(ctx, "error checking status %s visibility: %v", status.URI, err)
+ continue
+ }
+
+ if !visible {
+ continue
+ }
+
+ // Check whether this status is muted in any form by this user.
+ muted, err := s.MuteFilter.StatusMuted(ctx, user.Account, status)
+ if err != nil {
+ log.Errorf(ctx, "error checking status %s mutes: %v", status.URI, err)
+ continue
+ }
+
+ if muted {
+ continue
+ }
+
+ // Get status-filter results for this status in context by this user.
+ filtered, hidden, err := s.StatusFilter.StatusFilterResultsInContext(ctx,
+ user.Account,
+ status,
+ gtsmodel.FilterContextPublic,
+ )
+ if err != nil {
+ log.Errorf(ctx, "error getting status %s filter results: %v", status.URI, err)
+ continue
+ }
+
+ if hidden {
+ continue
+ }
+
+ // Now all checks / filters are passed, convert status to frontend model.
+ apiStatus, err := s.Converter.StatusToAPIStatus(ctx, status, user.Account)
+ if err != nil {
+ log.Errorf(ctx, "error converting status %s: %v", status.URI, err)
+ continue
+ }
+
+ // Set API model filter results.
+ apiStatus.Filtered = filtered
+
+ if isLocal {
+ // This is local status, send it to local timeline stream.
+ streamFn(ctx, user.Account, apiStatus, stream.TimelineLocal)
+ }
+
+ // For public timeline stream, send all local / remote statuses.
+ streamFn(ctx, user.Account, apiStatus, stream.TimelinePublic)
+ }
+
+ return nil
+}
+
// timelineAndNotifyStatusForFollowers iterates through the given
// slice of followers of the account that posted the given status,
// adding the status to list timelines + home timelines of each
@@ -409,8 +507,9 @@ func (s *Surface) timelineAndNotifyStatusForTagFollowers(
status = status.BoostOf
}
+ var errs gtserror.MultiError
+
// Insert the status into the home timeline of each tag follower.
- errs := gtserror.MultiError{}
for _, tagFollowerAccount := range tagFollowerAccounts {
_ = s.timelineStatus(ctx,
s.State.Caches.Timelines.Home.MustGet(tagFollowerAccount.ID),
@@ -537,6 +636,11 @@ func (s *Surface) timelineStatusUpdate(ctx context.Context, status *gtsmodel.Sta
})
}
+ // Stream the status update for public timelines for all of our local users.
+ if err := s.timelineStatusForPublic(ctx, status, s.Stream.StatusUpdate); err != nil {
+ return err
+ }
+
// Push updated status to streams for each local follower of this account.
homeTimelinedAccountIDs := s.timelineStatusUpdateForFollowers(ctx, status, follows)