diff options
| author | 2025-04-26 09:56:15 +0000 | |
|---|---|---|
| committer | 2025-04-26 09:56:15 +0000 | |
| commit | 6a6a4993338262f87df34c9be051bfaac75c1829 (patch) | |
| tree | bfbda090dc4b25efdd34145c016d7cc7b9c14d6e /internal/processing | |
| parent | [chore] Move deps to code.superseriousbusiness.org (#4054) (diff) | |
| download | gotosocial-6a6a4993338262f87df34c9be051bfaac75c1829.tar.xz | |
[performance] rewrite timelines to rely on new timeline cache type (#3941)
* start work rewriting timeline cache type
* further work rewriting timeline caching
* more work integration new timeline code
* remove old code
* add local timeline, fix up merge conflicts
* remove old use of go-bytes
* implement new timeline code into more areas of codebase, pull in latest go-mangler, go-mutexes, go-structr
* remove old timeline package, add local timeline cache
* remove references to old timeline types that needed starting up in tests
* start adding page validation
* fix test-identified timeline cache package issues
* fix up more tests, fix missing required changes, etc
* add exclusion for test.out in gitignore
* clarify some things better in code comments
* tweak cache size limits
* fix list timeline cache fetching
* further list timeline fixes
* linter, ssssssssshhhhhhhhhhhh please
* fix linter hints
* reslice the output if it's beyond length of 'lim'
* remove old timeline initialization code, bump go-structr to v0.9.4
* continued from previous commit
* improved code comments
* don't allow multiple entries for BoostOfID values to prevent repeated boosts of same boosts
* finish writing more code comments
* some variable renaming, for ease of following
* change the way we update lo,hi paging values during timeline load
* improved code comments for updated / returned lo , hi paging values
* finish writing code comments for the StatusTimeline{} type itself
* fill in more code comments
* update go-structr version to latest with changed timeline unique indexing logic
* have a local and public timeline *per user*
* rewrite calls to public / local timeline calls
* remove the zero length check, as lo, hi values might still be set
* simplify timeline cache loading, fix lo/hi returns, fix timeline invalidation side-effects missing for some federated actions
* swap the lo, hi values :facepalm:
* add (now) missing slice reverse of tag timeline statuses when paging ASC
* remove local / public caches (is out of scope for this work), share more timeline code
* remove unnecessary change
* again, remove more unused code
* remove unused function to appease the linter
* move boost checking to prepare function
* fix use of timeline.lastOrder, fix incorrect range functions used
* remove comments for repeat code
* remove the boost logic from prepare function
* do a maximum of 5 loads, not 10
* add repeat boost filtering logic, update go-structr, general improvements
* more code comments
* add important note
* fix timeline tests now that timelines are returned in page order
* remove unused field
* add StatusTimeline{} tests
* add more status timeline tests
* start adding preloading support
* ensure repeat boosts are marked in preloaded entries
* share a bunch of the database load code in timeline cache, don't clear timelines on relationship change
* add logic to allow dynamic clear / preloading of timelines
* comment-out unused functions, but leave in place as we might end-up using them
* fix timeline preload state check
* much improved status timeline code comments
* more code comments, don't bother inserting statuses if timeline not preloaded
* shift around some logic to make sure things aren't accidentally left set
* finish writing code comments
* remove trim-after-insert behaviour
* fix-up some comments referring to old logic
* remove unsetting of lo, hi
* fix preload repeatBoost checking logic
* don't return on status filter errors, these are usually transient
* better concurrency safety in Clear() and Done()
* fix test broken due to addition of preloader
* fix repeatBoost logic that doesn't account for already-hidden repeatBoosts
* ensure edit submodels are dropped on cache insertion
* update code-comment to expand CAS accronym
* use a plus1hULID() instead of 24h
* remove unused functions
* add note that public / local timeline requester can be nil
* fix incorrect visibility filtering of tag timeline statuses
* ensure we filter home timeline statuses on local only
* some small re-orderings to confirm query params in correct places
* fix the local only home timeline filter func
Diffstat (limited to 'internal/processing')
20 files changed, 731 insertions, 821 deletions
diff --git a/internal/processing/account/account_test.go b/internal/processing/account/account_test.go index 4173162cc..5b0c5f01e 100644 --- a/internal/processing/account/account_test.go +++ b/internal/processing/account/account_test.go @@ -95,12 +95,6 @@ func (suite *AccountStandardTestSuite) SetupTest() { suite.state.AdminActions = admin.New(suite.state.DB, &suite.state.Workers) suite.tc = typeutils.NewConverter(&suite.state) - testrig.StartTimelines( - &suite.state, - visibility.NewFilter(&suite.state), - suite.tc, - ) - suite.storage = testrig.NewInMemoryStorage() suite.state.Storage = suite.storage suite.mediaManager = testrig.NewTestMediaManager(&suite.state) diff --git a/internal/processing/admin/admin_test.go b/internal/processing/admin/admin_test.go index 804abbc62..93f20d5e6 100644 --- a/internal/processing/admin/admin_test.go +++ b/internal/processing/admin/admin_test.go @@ -92,12 +92,6 @@ func (suite *AdminStandardTestSuite) SetupTest() { suite.state.AdminActions = adminactions.New(suite.state.DB, &suite.state.Workers) suite.tc = typeutils.NewConverter(&suite.state) - testrig.StartTimelines( - &suite.state, - visibility.NewFilter(&suite.state), - suite.tc, - ) - suite.storage = testrig.NewInMemoryStorage() suite.state.Storage = suite.storage suite.mediaManager = testrig.NewTestMediaManager(&suite.state) diff --git a/internal/processing/common/status.go b/internal/processing/common/status.go index 01f2ab72d..532b531e5 100644 --- a/internal/processing/common/status.go +++ b/internal/processing/common/status.go @@ -306,25 +306,10 @@ func (p *Processor) InvalidateTimelinedStatus(ctx context.Context, accountID str return gtserror.Newf("db error getting lists for account %s: %w", accountID, err) } - // Start new log entry with - // the above calling func's name. - l := log. - WithContext(ctx). - WithField("caller", log.Caller(3)). - WithField("accountID", accountID). - WithField("statusID", statusID) - - // Unprepare item from home + list timelines, just log - // if something goes wrong since this is not a showstopper. - - if err := p.state.Timelines.Home.UnprepareItem(ctx, accountID, statusID); err != nil { - l.Errorf("error unpreparing item from home timeline: %v", err) - } - + // Unprepare item from home + list timelines. + p.state.Caches.Timelines.Home.MustGet(accountID).UnprepareByStatusIDs(statusID) for _, list := range lists { - if err := p.state.Timelines.List.UnprepareItem(ctx, list.ID, statusID); err != nil { - l.Errorf("error unpreparing item from list timeline %s: %v", list.ID, err) - } + p.state.Caches.Timelines.List.MustGet(list.ID).UnprepareByStatusIDs(statusID) } return nil diff --git a/internal/processing/conversations/conversations_test.go b/internal/processing/conversations/conversations_test.go index fecaf5666..06eef0e97 100644 --- a/internal/processing/conversations/conversations_test.go +++ b/internal/processing/conversations/conversations_test.go @@ -106,12 +106,6 @@ func (suite *ConversationsTestSuite) SetupTest() { suite.tc = typeutils.NewConverter(&suite.state) suite.filter = visibility.NewFilter(&suite.state) - testrig.StartTimelines( - &suite.state, - suite.filter, - suite.tc, - ) - suite.storage = testrig.NewInMemoryStorage() suite.state.Storage = suite.storage suite.mediaManager = testrig.NewTestMediaManager(&suite.state) diff --git a/internal/processing/processor_test.go b/internal/processing/processor_test.go index 4b6406b03..a743f75ee 100644 --- a/internal/processing/processor_test.go +++ b/internal/processing/processor_test.go @@ -109,12 +109,6 @@ func (suite *ProcessingStandardTestSuite) SetupTest() { suite.state.Storage = suite.storage suite.typeconverter = typeutils.NewConverter(&suite.state) - testrig.StartTimelines( - &suite.state, - visibility.NewFilter(&suite.state), - suite.typeconverter, - ) - suite.httpClient = testrig.NewMockHTTPClient(nil, "../../testrig/media") suite.httpClient.TestRemotePeople = testrig.NewTestFediPeople() suite.httpClient.TestRemoteStatuses = testrig.NewTestFediStatuses() diff --git a/internal/processing/status/status_test.go b/internal/processing/status/status_test.go index c163f95a7..19f3f5ebc 100644 --- a/internal/processing/status/status_test.go +++ b/internal/processing/status/status_test.go @@ -93,11 +93,6 @@ func (suite *StatusStandardTestSuite) SetupTest() { visFilter := visibility.NewFilter(&suite.state) intFilter := interaction.NewFilter(&suite.state) - testrig.StartTimelines( - &suite.state, - visFilter, - suite.typeConverter, - ) common := common.New(&suite.state, suite.mediaManager, suite.typeConverter, suite.federator, visFilter) polls := polls.New(&common, &suite.state, suite.typeConverter) diff --git a/internal/processing/timeline/common.go b/internal/processing/timeline/common.go deleted file mode 100644 index 6d29d81d6..000000000 --- a/internal/processing/timeline/common.go +++ /dev/null @@ -1,71 +0,0 @@ -// GoToSocial -// Copyright (C) GoToSocial Authors admin@gotosocial.org -// SPDX-License-Identifier: AGPL-3.0-or-later -// -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. -// -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see <http://www.gnu.org/licenses/>. - -package timeline - -import ( - "context" - - "github.com/superseriousbusiness/gotosocial/internal/timeline" -) - -// SkipInsert returns a function that satisifes SkipInsertFunction. -func SkipInsert() timeline.SkipInsertFunction { - // Gap to allow between a status or boost of status, - // and reinsertion of a new boost of that status. - // This is useful to avoid a heavily boosted status - // showing up way too often in a user's timeline. - const boostReinsertionDepth = 50 - - return func( - ctx context.Context, - newItemID string, - newItemAccountID string, - newItemBoostOfID string, - newItemBoostOfAccountID string, - nextItemID string, - nextItemAccountID string, - nextItemBoostOfID string, - nextItemBoostOfAccountID string, - depth int, - ) (bool, error) { - if newItemID == nextItemID { - // Don't insert duplicates. - return true, nil - } - - if newItemBoostOfID != "" { - if newItemBoostOfID == nextItemBoostOfID && - depth < boostReinsertionDepth { - // Don't insert boosts of items - // we've seen boosted recently. - return true, nil - } - - if newItemBoostOfID == nextItemID && - depth < boostReinsertionDepth { - // Don't insert boosts of items when - // we've seen the original recently. - return true, nil - } - } - - // Proceed with insertion - // (that's what she said!). - return false, nil - } -} diff --git a/internal/processing/timeline/faved.go b/internal/processing/timeline/faved.go index 6e915f4ef..bdafcac36 100644 --- a/internal/processing/timeline/faved.go +++ b/internal/processing/timeline/faved.go @@ -31,6 +31,7 @@ import ( "github.com/superseriousbusiness/gotosocial/internal/util" ) +// FavedTimelineGet ... func (p *Processor) FavedTimelineGet(ctx context.Context, authed *apiutil.Auth, maxID string, minID string, limit int) (*apimodel.PageableResponse, gtserror.WithCode) { statuses, nextMaxID, prevMinID, err := p.state.DB.GetFavedTimeline(ctx, authed.Account.ID, maxID, minID, limit) if err != nil && !errors.Is(err, db.ErrNoEntries) { diff --git a/internal/processing/timeline/home.go b/internal/processing/timeline/home.go index 38cf38405..61fef005b 100644 --- a/internal/processing/timeline/home.go +++ b/internal/processing/timeline/home.go @@ -19,132 +19,85 @@ package timeline import ( "context" - "errors" + "net/url" apimodel "github.com/superseriousbusiness/gotosocial/internal/api/model" - apiutil "github.com/superseriousbusiness/gotosocial/internal/api/util" - "github.com/superseriousbusiness/gotosocial/internal/db" statusfilter "github.com/superseriousbusiness/gotosocial/internal/filter/status" - "github.com/superseriousbusiness/gotosocial/internal/filter/usermute" - "github.com/superseriousbusiness/gotosocial/internal/filter/visibility" - "github.com/superseriousbusiness/gotosocial/internal/gtscontext" "github.com/superseriousbusiness/gotosocial/internal/gtserror" "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" - "github.com/superseriousbusiness/gotosocial/internal/state" - "github.com/superseriousbusiness/gotosocial/internal/timeline" - "github.com/superseriousbusiness/gotosocial/internal/typeutils" - "github.com/superseriousbusiness/gotosocial/internal/util" + "github.com/superseriousbusiness/gotosocial/internal/log" + "github.com/superseriousbusiness/gotosocial/internal/paging" ) -// HomeTimelineGrab returns a function that satisfies GrabFunction for home timelines. -func HomeTimelineGrab(state *state.State) timeline.GrabFunction { - return func(ctx context.Context, accountID string, maxID string, sinceID string, minID string, limit int) ([]timeline.Timelineable, bool, error) { - statuses, err := state.DB.GetHomeTimeline(ctx, accountID, maxID, sinceID, minID, limit, false) - if err != nil && !errors.Is(err, db.ErrNoEntries) { - err = gtserror.Newf("error getting statuses from db: %w", err) - return nil, false, err +// HomeTimelineGet gets a pageable timeline of statuses +// in the home timeline of the requesting account. +func (p *Processor) HomeTimelineGet( + ctx context.Context, + requester *gtsmodel.Account, + page *paging.Page, + local bool, +) ( + *apimodel.PageableResponse, + gtserror.WithCode, +) { + + var pageQuery url.Values + var postFilter func(*gtsmodel.Status) bool + if local { + // Set local = true query. + pageQuery = localOnlyTrue + postFilter = func(s *gtsmodel.Status) bool { + return !*s.Local } - - count := len(statuses) - if count == 0 { - // We just don't have enough statuses - // left in the db so return stop = true. - return nil, true, nil - } - - items := make([]timeline.Timelineable, count) - for i, s := range statuses { - items[i] = s - } - - return items, false, nil - } -} - -// HomeTimelineFilter returns a function that satisfies FilterFunction for home timelines. -func HomeTimelineFilter(state *state.State, visFilter *visibility.Filter) timeline.FilterFunction { - return func(ctx context.Context, accountID string, item timeline.Timelineable) (shouldIndex bool, err error) { - status, ok := item.(*gtsmodel.Status) - if !ok { - err = gtserror.New("could not convert item to *gtsmodel.Status") - return false, err - } - - requestingAccount, err := state.DB.GetAccountByID(ctx, accountID) - if err != nil { - err = gtserror.Newf("error getting account with id %s: %w", accountID, err) - return false, err - } - - timelineable, err := visFilter.StatusHomeTimelineable(ctx, requestingAccount, status) - if err != nil { - err = gtserror.Newf("error checking hometimelineability of status %s for account %s: %w", status.ID, accountID, err) - return false, err - } - - return timelineable, nil - } -} - -// HomeTimelineStatusPrepare returns a function that satisfies PrepareFunction for home timelines. -func HomeTimelineStatusPrepare(state *state.State, converter *typeutils.Converter) timeline.PrepareFunction { - return func(ctx context.Context, accountID string, itemID string) (timeline.Preparable, error) { - status, err := state.DB.GetStatusByID(ctx, itemID) - if err != nil { - err = gtserror.Newf("error getting status with id %s: %w", itemID, err) - return nil, err - } - - requestingAccount, err := state.DB.GetAccountByID(ctx, accountID) - if err != nil { - err = gtserror.Newf("error getting account with id %s: %w", accountID, err) - return nil, err - } - - filters, err := state.DB.GetFiltersForAccountID(ctx, requestingAccount.ID) - if err != nil { - err = gtserror.Newf("couldn't retrieve filters for account %s: %w", requestingAccount.ID, err) - return nil, err - } - - mutes, err := state.DB.GetAccountMutes(gtscontext.SetBarebones(ctx), requestingAccount.ID, nil) - if err != nil { - err = gtserror.Newf("couldn't retrieve mutes for account %s: %w", requestingAccount.ID, err) - return nil, err - } - compiledMutes := usermute.NewCompiledUserMuteList(mutes) - - return converter.StatusToAPIStatus(ctx, status, requestingAccount, statusfilter.FilterContextHome, filters, compiledMutes) + } else { + // Set local = false query. + pageQuery = localOnlyFalse + postFilter = nil } -} - -func (p *Processor) HomeTimelineGet(ctx context.Context, authed *apiutil.Auth, maxID string, sinceID string, minID string, limit int, local bool) (*apimodel.PageableResponse, gtserror.WithCode) { - statuses, err := p.state.Timelines.Home.GetTimeline(ctx, authed.Account.ID, maxID, sinceID, minID, limit, local) - if err != nil && !errors.Is(err, db.ErrNoEntries) { - err = gtserror.Newf("error getting statuses: %w", err) - return nil, gtserror.NewErrorInternalError(err) - } - - count := len(statuses) - if count == 0 { - return util.EmptyPageableResponse(), nil - } - - var ( - items = make([]interface{}, count) - nextMaxIDValue = statuses[count-1].GetID() - prevMinIDValue = statuses[0].GetID() + return p.getStatusTimeline(ctx, + + // Auth'd + // account. + requester, + + // Keyed-by-account-ID, home timeline cache. + p.state.Caches.Timelines.Home.MustGet(requester.ID), + + // Current + // page. + page, + + // Home timeline endpoint. + "/api/v1/timelines/home", + + // Set local-only timeline + // page query flag, (this map + // later gets copied before + // any further usage). + pageQuery, + + // Status filter context. + statusfilter.FilterContextHome, + + // Database load function. + func(pg *paging.Page) (statuses []*gtsmodel.Status, err error) { + return p.state.DB.GetHomeTimeline(ctx, requester.ID, pg) + }, + + // Filtering function, + // i.e. filter before caching. + func(s *gtsmodel.Status) bool { + + // Check the visibility of passed status to requesting user. + ok, err := p.visFilter.StatusHomeTimelineable(ctx, requester, s) + if err != nil { + log.Errorf(ctx, "error filtering status %s: %v", s.URI, err) + } + return !ok + }, + + // Post filtering funtion, + // i.e. filter after caching. + postFilter, ) - - for i := range statuses { - items[i] = statuses[i] - } - - return util.PackagePageableResponse(util.PageableResponseParams{ - Items: items, - Path: "/api/v1/timelines/home", - NextMaxIDValue: nextMaxIDValue, - PrevMinIDValue: prevMinIDValue, - Limit: limit, - }) } diff --git a/internal/processing/timeline/home_test.go b/internal/processing/timeline/home_test.go index ea56418f6..50025b9a8 100644 --- a/internal/processing/timeline/home_test.go +++ b/internal/processing/timeline/home_test.go @@ -23,13 +23,9 @@ import ( "github.com/stretchr/testify/suite" apimodel "github.com/superseriousbusiness/gotosocial/internal/api/model" - apiutil "github.com/superseriousbusiness/gotosocial/internal/api/util" - "github.com/superseriousbusiness/gotosocial/internal/filter/visibility" "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" "github.com/superseriousbusiness/gotosocial/internal/id" - tlprocessor "github.com/superseriousbusiness/gotosocial/internal/processing/timeline" - "github.com/superseriousbusiness/gotosocial/internal/timeline" - "github.com/superseriousbusiness/gotosocial/internal/typeutils" + "github.com/superseriousbusiness/gotosocial/internal/paging" "github.com/superseriousbusiness/gotosocial/internal/util" ) @@ -37,25 +33,7 @@ type HomeTestSuite struct { TimelineStandardTestSuite } -func (suite *HomeTestSuite) SetupTest() { - suite.TimelineStandardTestSuite.SetupTest() - - suite.state.Timelines.Home = timeline.NewManager( - tlprocessor.HomeTimelineGrab(&suite.state), - tlprocessor.HomeTimelineFilter(&suite.state, visibility.NewFilter(&suite.state)), - tlprocessor.HomeTimelineStatusPrepare(&suite.state, typeutils.NewConverter(&suite.state)), - tlprocessor.SkipInsert(), - ) - if err := suite.state.Timelines.Home.Start(); err != nil { - suite.FailNow(err.Error()) - } -} - func (suite *HomeTestSuite) TearDownTest() { - if err := suite.state.Timelines.Home.Stop(); err != nil { - suite.FailNow(err.Error()) - } - suite.TimelineStandardTestSuite.TearDownTest() } @@ -64,7 +42,6 @@ func (suite *HomeTestSuite) TestHomeTimelineGetHideFiltered() { var ( ctx = context.Background() requester = suite.testAccounts["local_account_1"] - authed = &apiutil.Auth{Account: requester} maxID = "" sinceID = "" minID = "01F8MHAAY43M6RJ473VQFCVH36" // 1 before filteredStatus @@ -97,11 +74,12 @@ func (suite *HomeTestSuite) TestHomeTimelineGetHideFiltered() { // Fetch the timeline to make sure the status we're going to filter is in that section of it. resp, errWithCode := suite.timeline.HomeTimelineGet( ctx, - authed, - maxID, - sinceID, - minID, - limit, + requester, + &paging.Page{ + Min: paging.EitherMinID(minID, sinceID), + Max: paging.MaxID(maxID), + Limit: limit, + }, local, ) suite.NoError(errWithCode) @@ -114,10 +92,9 @@ func (suite *HomeTestSuite) TestHomeTimelineGetHideFiltered() { if !filteredStatusFound { suite.FailNow("precondition failed: status we would filter isn't present in unfiltered timeline") } - // Prune the timeline to drop cached prepared statuses, a side effect of this precondition check. - if _, err := suite.state.Timelines.Home.Prune(ctx, requester.ID, 0, 0); err != nil { - suite.FailNow(err.Error()) - } + + // Clear the timeline to drop all cached statuses. + suite.state.Caches.Timelines.Home.Clear(requester.ID) // Create a filter to hide one status on the timeline. if err := suite.db.PutFilter(ctx, filter); err != nil { @@ -127,11 +104,12 @@ func (suite *HomeTestSuite) TestHomeTimelineGetHideFiltered() { // Fetch the timeline again with the filter in place. resp, errWithCode = suite.timeline.HomeTimelineGet( ctx, - authed, - maxID, - sinceID, - minID, - limit, + requester, + &paging.Page{ + Min: paging.EitherMinID(minID, sinceID), + Max: paging.MaxID(maxID), + Limit: limit, + }, local, ) diff --git a/internal/processing/timeline/list.go b/internal/processing/timeline/list.go index 147f87ab4..10a7bb388 100644 --- a/internal/processing/timeline/list.go +++ b/internal/processing/timeline/list.go @@ -22,155 +22,93 @@ import ( "errors" apimodel "github.com/superseriousbusiness/gotosocial/internal/api/model" - apiutil "github.com/superseriousbusiness/gotosocial/internal/api/util" "github.com/superseriousbusiness/gotosocial/internal/db" statusfilter "github.com/superseriousbusiness/gotosocial/internal/filter/status" - "github.com/superseriousbusiness/gotosocial/internal/filter/usermute" - "github.com/superseriousbusiness/gotosocial/internal/filter/visibility" "github.com/superseriousbusiness/gotosocial/internal/gtscontext" "github.com/superseriousbusiness/gotosocial/internal/gtserror" "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" - "github.com/superseriousbusiness/gotosocial/internal/state" - "github.com/superseriousbusiness/gotosocial/internal/timeline" - "github.com/superseriousbusiness/gotosocial/internal/typeutils" - "github.com/superseriousbusiness/gotosocial/internal/util" + "github.com/superseriousbusiness/gotosocial/internal/log" + "github.com/superseriousbusiness/gotosocial/internal/paging" ) -// ListTimelineGrab returns a function that satisfies GrabFunction for list timelines. -func ListTimelineGrab(state *state.State) timeline.GrabFunction { - return func(ctx context.Context, listID string, maxID string, sinceID string, minID string, limit int) ([]timeline.Timelineable, bool, error) { - statuses, err := state.DB.GetListTimeline(ctx, listID, maxID, sinceID, minID, limit) - if err != nil && !errors.Is(err, db.ErrNoEntries) { - err = gtserror.Newf("error getting statuses from db: %w", err) - return nil, false, err - } - - count := len(statuses) - if count == 0 { - // We just don't have enough statuses - // left in the db so return stop = true. - return nil, true, nil - } - - items := make([]timeline.Timelineable, count) - for i, s := range statuses { - items[i] = s - } - - return items, false, nil +// ListTimelineGet gets a pageable timeline of statuses +// in the list timeline of ID by the requesting account. +func (p *Processor) ListTimelineGet( + ctx context.Context, + requester *gtsmodel.Account, + listID string, + page *paging.Page, +) ( + *apimodel.PageableResponse, + gtserror.WithCode, +) { + // Fetch the requested list with ID. + list, err := p.state.DB.GetListByID( + gtscontext.SetBarebones(ctx), + listID, + ) + if err != nil && !errors.Is(err, db.ErrNoEntries) { + return nil, gtserror.NewErrorInternalError(err) } -} -// ListTimelineFilter returns a function that satisfies FilterFunction for list timelines. -func ListTimelineFilter(state *state.State, visFilter *visibility.Filter) timeline.FilterFunction { - return func(ctx context.Context, listID string, item timeline.Timelineable) (shouldIndex bool, err error) { - status, ok := item.(*gtsmodel.Status) - if !ok { - err = gtserror.New("could not convert item to *gtsmodel.Status") - return false, err - } - - list, err := state.DB.GetListByID(ctx, listID) - if err != nil { - err = gtserror.Newf("error getting list with id %s: %w", listID, err) - return false, err - } - - requestingAccount, err := state.DB.GetAccountByID(ctx, list.AccountID) - if err != nil { - err = gtserror.Newf("error getting account with id %s: %w", list.AccountID, err) - return false, err - } - - timelineable, err := visFilter.StatusHomeTimelineable(ctx, requestingAccount, status) - if err != nil { - err = gtserror.Newf("error checking hometimelineability of status %s for account %s: %w", status.ID, list.AccountID, err) - return false, err - } - - return timelineable, nil + // Check exists. + if list == nil { + const text = "list not found" + return nil, gtserror.NewErrorNotFound( + errors.New(text), + text, + ) } -} -// ListTimelineStatusPrepare returns a function that satisfies PrepareFunction for list timelines. -func ListTimelineStatusPrepare(state *state.State, converter *typeutils.Converter) timeline.PrepareFunction { - return func(ctx context.Context, listID string, itemID string) (timeline.Preparable, error) { - status, err := state.DB.GetStatusByID(ctx, itemID) - if err != nil { - err = gtserror.Newf("error getting status with id %s: %w", itemID, err) - return nil, err - } - - list, err := state.DB.GetListByID(ctx, listID) - if err != nil { - err = gtserror.Newf("error getting list with id %s: %w", listID, err) - return nil, err - } - - requestingAccount, err := state.DB.GetAccountByID(ctx, list.AccountID) - if err != nil { - err = gtserror.Newf("error getting account with id %s: %w", list.AccountID, err) - return nil, err - } - - filters, err := state.DB.GetFiltersForAccountID(ctx, requestingAccount.ID) - if err != nil { - err = gtserror.Newf("couldn't retrieve filters for account %s: %w", requestingAccount.ID, err) - return nil, err - } - - mutes, err := state.DB.GetAccountMutes(gtscontext.SetBarebones(ctx), requestingAccount.ID, nil) - if err != nil { - err = gtserror.Newf("couldn't retrieve mutes for account %s: %w", requestingAccount.ID, err) - return nil, err - } - compiledMutes := usermute.NewCompiledUserMuteList(mutes) - - return converter.StatusToAPIStatus(ctx, status, requestingAccount, statusfilter.FilterContextHome, filters, compiledMutes) + // Check list owned by auth'd account. + if list.AccountID != requester.ID { + err := gtserror.New("list does not belong to account") + return nil, gtserror.NewErrorNotFound(err) } -} -func (p *Processor) ListTimelineGet(ctx context.Context, authed *apiutil.Auth, listID string, maxID string, sinceID string, minID string, limit int) (*apimodel.PageableResponse, gtserror.WithCode) { - // Ensure list exists + is owned by this account. - list, err := p.state.DB.GetListByID(ctx, listID) - if err != nil { - if errors.Is(err, db.ErrNoEntries) { - return nil, gtserror.NewErrorNotFound(err) - } - return nil, gtserror.NewErrorInternalError(err) - } + // Fetch status timeline for list. + return p.getStatusTimeline(ctx, - if list.AccountID != authed.Account.ID { - err = gtserror.Newf("list with id %s does not belong to account %s", list.ID, authed.Account.ID) - return nil, gtserror.NewErrorNotFound(err) - } + // Auth'd + // account. + requester, - statuses, err := p.state.Timelines.List.GetTimeline(ctx, listID, maxID, sinceID, minID, limit, false) - if err != nil && !errors.Is(err, db.ErrNoEntries) { - err = gtserror.Newf("error getting statuses: %w", err) - return nil, gtserror.NewErrorInternalError(err) - } + // Keyed-by-list-ID, list timeline cache. + p.state.Caches.Timelines.List.MustGet(listID), - count := len(statuses) - if count == 0 { - return util.EmptyPageableResponse(), nil - } + // Current + // page. + page, - var ( - items = make([]interface{}, count) - nextMaxIDValue = statuses[count-1].GetID() - prevMinIDValue = statuses[0].GetID() - ) + // List timeline ID's endpoint. + "/api/v1/timelines/list/"+listID, - for i := range statuses { - items[i] = statuses[i] - } + // No page + // query. + nil, - return util.PackagePageableResponse(util.PageableResponseParams{ - Items: items, - Path: "/api/v1/timelines/list/" + listID, - NextMaxIDValue: nextMaxIDValue, - PrevMinIDValue: prevMinIDValue, - Limit: limit, - }) + // Status filter context. + statusfilter.FilterContextHome, + + // Database load function. + func(pg *paging.Page) (statuses []*gtsmodel.Status, err error) { + return p.state.DB.GetListTimeline(ctx, listID, pg) + }, + + // Filtering function, + // i.e. filter before caching. + func(s *gtsmodel.Status) bool { + + // Check the visibility of passed status to requesting user. + ok, err := p.visFilter.StatusHomeTimelineable(ctx, requester, s) + if err != nil { + log.Errorf(ctx, "error filtering status %s: %v", s.URI, err) + } + return !ok + }, + + // Post filtering funtion, + // i.e. filter after caching. + nil, + ) } diff --git a/internal/processing/timeline/notification.go b/internal/processing/timeline/notification.go index 04a898198..ba1e3dba8 100644 --- a/internal/processing/timeline/notification.go +++ b/internal/processing/timeline/notification.go @@ -36,6 +36,7 @@ import ( "github.com/superseriousbusiness/gotosocial/internal/util" ) +// NotificationsGet ... func (p *Processor) NotificationsGet( ctx context.Context, authed *apiutil.Auth, diff --git a/internal/processing/timeline/public.go b/internal/processing/timeline/public.go index dc00688e3..0e675da14 100644 --- a/internal/processing/timeline/public.go +++ b/internal/processing/timeline/public.go @@ -19,152 +19,143 @@ package timeline import ( "context" - "errors" - "strconv" apimodel "github.com/superseriousbusiness/gotosocial/internal/api/model" - "github.com/superseriousbusiness/gotosocial/internal/db" statusfilter "github.com/superseriousbusiness/gotosocial/internal/filter/status" - "github.com/superseriousbusiness/gotosocial/internal/filter/usermute" - "github.com/superseriousbusiness/gotosocial/internal/gtscontext" "github.com/superseriousbusiness/gotosocial/internal/gtserror" "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" "github.com/superseriousbusiness/gotosocial/internal/log" - "github.com/superseriousbusiness/gotosocial/internal/util" + "github.com/superseriousbusiness/gotosocial/internal/paging" ) +// PublicTimelineGet gets a pageable timeline of public statuses +// for the given requesting account. It ensures that each status +// in timeline is visible to the account before returning it. +// +// The local argument limits this to local-only statuses. func (p *Processor) PublicTimelineGet( ctx context.Context, requester *gtsmodel.Account, - maxID string, - sinceID string, - minID string, - limit int, + page *paging.Page, local bool, -) (*apimodel.PageableResponse, gtserror.WithCode) { - const maxAttempts = 3 - var ( - nextMaxIDValue string - prevMinIDValue string - items = make([]any, 0, limit) - ) - - var filters []*gtsmodel.Filter - var compiledMutes *usermute.CompiledUserMuteList - if requester != nil { - var err error - filters, err = p.state.DB.GetFiltersForAccountID(ctx, requester.ID) - if err != nil { - err = gtserror.Newf("couldn't retrieve filters for account %s: %w", requester.ID, err) - return nil, gtserror.NewErrorInternalError(err) - } - - mutes, err := p.state.DB.GetAccountMutes(gtscontext.SetBarebones(ctx), requester.ID, nil) - if err != nil { - err = gtserror.Newf("couldn't retrieve mutes for account %s: %w", requester.ID, err) - return nil, gtserror.NewErrorInternalError(err) - } - compiledMutes = usermute.NewCompiledUserMuteList(mutes) +) ( + *apimodel.PageableResponse, + gtserror.WithCode, +) { + if local { + return p.localTimelineGet(ctx, requester, page) } + return p.publicTimelineGet(ctx, requester, page) +} - // Try a few times to select appropriate public - // statuses from the db, paging up or down to - // reattempt if nothing suitable is found. -outer: - for attempts := 1; ; attempts++ { - // Select slightly more than the limit to try to avoid situations where - // we filter out all the entries, and have to make another db call. - // It's cheaper to select more in 1 query than it is to do multiple queries. - statuses, err := p.state.DB.GetPublicTimeline(ctx, maxID, sinceID, minID, limit+5, local) - if err != nil && !errors.Is(err, db.ErrNoEntries) { - err = gtserror.Newf("db error getting statuses: %w", err) - return nil, gtserror.NewErrorInternalError(err) - } - - count := len(statuses) - if count == 0 { - // Nothing relevant (left) in the db. - return util.EmptyPageableResponse(), nil - } - - // Page up from first status in slice - // (ie., one with the highest ID). - prevMinIDValue = statuses[0].ID - - inner: - for _, s := range statuses { - // Push back the next page down ID to - // this status, regardless of whether - // we end up filtering it out or not. - nextMaxIDValue = s.ID - - timelineable, err := p.visFilter.StatusPublicTimelineable(ctx, requester, s) - if err != nil { - log.Errorf(ctx, "error checking status visibility: %v", err) - continue inner - } +func (p *Processor) publicTimelineGet( + ctx context.Context, + requester *gtsmodel.Account, + page *paging.Page, +) ( + *apimodel.PageableResponse, + gtserror.WithCode, +) { + return p.getStatusTimeline(ctx, + + // Auth acconut, + // can be nil. + requester, + + // No cache. + nil, + + // Current + // page. + page, + + // Public timeline endpoint. + "/api/v1/timelines/public", + + // Set local-only timeline + // page query flag, (this map + // later gets copied before + // any further usage). + localOnlyFalse, + + // Status filter context. + statusfilter.FilterContextPublic, + + // Database load function. + func(pg *paging.Page) (statuses []*gtsmodel.Status, err error) { + return p.state.DB.GetPublicTimeline(ctx, pg) + }, - if !timelineable { - continue inner - } + // Pre-filtering function, + // i.e. filter before caching. + func(s *gtsmodel.Status) bool { - apiStatus, err := p.converter.StatusToAPIStatus(ctx, s, requester, statusfilter.FilterContextPublic, filters, compiledMutes) - if errors.Is(err, statusfilter.ErrHideStatus) { - continue - } + // Check the visibility of passed status to requesting user. + ok, err := p.visFilter.StatusPublicTimelineable(ctx, requester, s) if err != nil { - log.Errorf(ctx, "error converting to api status: %v", err) - continue inner + log.Errorf(ctx, "error filtering status %s: %v", s.URI, err) } + return !ok + }, - // Looks good, add this. - items = append(items, apiStatus) + // Post filtering funtion, + // i.e. filter after caching. + nil, + ) +} - // We called the db with a little - // more than the desired limit. - // - // Ensure we don't return more - // than the caller asked for. - if len(items) == limit { - break outer - } - } - - if len(items) != 0 { - // We've got some items left after - // filtering, happily break + return. - break - } - - if attempts >= maxAttempts { - // We reached our attempts limit. - // Be nice + warn about it. - log.Warn(ctx, "reached max attempts to find items in public timeline") - break - } - - // We filtered out all items before we - // found anything we could return, but - // we still have attempts left to try - // fetching again. Set paging params - // and allow loop to continue. - if minID != "" { - // Paging up. - minID = prevMinIDValue - } else { - // Paging down. - maxID = nextMaxIDValue - } - } +func (p *Processor) localTimelineGet( + ctx context.Context, + requester *gtsmodel.Account, + page *paging.Page, +) ( + *apimodel.PageableResponse, + gtserror.WithCode, +) { + return p.getStatusTimeline(ctx, + + // Auth acconut, + // can be nil. + requester, + + // No cache. + nil, + + // Current + // page. + page, + + // Public timeline endpoint. + "/api/v1/timelines/public", + + // Set local-only timeline + // page query flag, (this map + // later gets copied before + // any further usage). + localOnlyTrue, + + // Status filter context. + statusfilter.FilterContextPublic, + + // Database load function. + func(pg *paging.Page) (statuses []*gtsmodel.Status, err error) { + return p.state.DB.GetLocalTimeline(ctx, pg) + }, + + // Filtering function, + // i.e. filter before caching. + func(s *gtsmodel.Status) bool { - return util.PackagePageableResponse(util.PageableResponseParams{ - Items: items, - Path: "/api/v1/timelines/public", - NextMaxIDValue: nextMaxIDValue, - PrevMinIDValue: prevMinIDValue, - Limit: limit, - ExtraQueryParams: []string{ - "local=" + strconv.FormatBool(local), + // Check the visibility of passed status to requesting user. + ok, err := p.visFilter.StatusPublicTimelineable(ctx, requester, s) + if err != nil { + log.Errorf(ctx, "error filtering status %s: %v", s.URI, err) + } + return !ok }, - }) + + // Post filtering funtion, + // i.e. filter after caching. + nil, + ) } diff --git a/internal/processing/timeline/public_test.go b/internal/processing/timeline/public_test.go index ab8e33429..b5017af71 100644 --- a/internal/processing/timeline/public_test.go +++ b/internal/processing/timeline/public_test.go @@ -25,6 +25,7 @@ import ( apimodel "github.com/superseriousbusiness/gotosocial/internal/api/model" "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" "github.com/superseriousbusiness/gotosocial/internal/id" + "github.com/superseriousbusiness/gotosocial/internal/paging" "github.com/superseriousbusiness/gotosocial/internal/util" ) @@ -46,10 +47,11 @@ func (suite *PublicTestSuite) TestPublicTimelineGet() { resp, errWithCode := suite.timeline.PublicTimelineGet( ctx, requester, - maxID, - sinceID, - minID, - limit, + &paging.Page{ + Min: paging.EitherMinID(minID, sinceID), + Max: paging.MaxID(maxID), + Limit: limit, + }, local, ) @@ -79,10 +81,11 @@ func (suite *PublicTestSuite) TestPublicTimelineGetNotEmpty() { resp, errWithCode := suite.timeline.PublicTimelineGet( ctx, requester, - maxID, - sinceID, - minID, - limit, + &paging.Page{ + Min: paging.EitherMinID(minID, sinceID), + Max: paging.MaxID(maxID), + Limit: limit, + }, local, ) @@ -90,9 +93,9 @@ func (suite *PublicTestSuite) TestPublicTimelineGetNotEmpty() { // some other statuses were filtered out. suite.NoError(errWithCode) suite.Len(resp.Items, 1) - suite.Equal(`<http://localhost:8080/api/v1/timelines/public?limit=1&max_id=01F8MHCP5P2NWYQ416SBA0XSEV&local=false>; rel="next", <http://localhost:8080/api/v1/timelines/public?limit=1&min_id=01HE7XJ1CG84TBKH5V9XKBVGF5&local=false>; rel="prev"`, resp.LinkHeader) - suite.Equal(`http://localhost:8080/api/v1/timelines/public?limit=1&max_id=01F8MHCP5P2NWYQ416SBA0XSEV&local=false`, resp.NextLink) - suite.Equal(`http://localhost:8080/api/v1/timelines/public?limit=1&min_id=01HE7XJ1CG84TBKH5V9XKBVGF5&local=false`, resp.PrevLink) + suite.Equal(`<http://localhost:8080/api/v1/timelines/public?limit=1&local=false&max_id=01F8MHCP5P2NWYQ416SBA0XSEV>; rel="next", <http://localhost:8080/api/v1/timelines/public?limit=1&local=false&min_id=01HE7XJ1CG84TBKH5V9XKBVGF5>; rel="prev"`, resp.LinkHeader) + suite.Equal(`http://localhost:8080/api/v1/timelines/public?limit=1&local=false&max_id=01F8MHCP5P2NWYQ416SBA0XSEV`, resp.NextLink) + suite.Equal(`http://localhost:8080/api/v1/timelines/public?limit=1&local=false&min_id=01HE7XJ1CG84TBKH5V9XKBVGF5`, resp.PrevLink) } // A timeline containing a status hidden due to filtering should return other statuses with no error. @@ -133,10 +136,11 @@ func (suite *PublicTestSuite) TestPublicTimelineGetHideFiltered() { resp, errWithCode := suite.timeline.PublicTimelineGet( ctx, requester, - maxID, - sinceID, - minID, - limit, + &paging.Page{ + Min: paging.EitherMinID(minID, sinceID), + Max: paging.MaxID(maxID), + Limit: limit, + }, local, ) suite.NoError(errWithCode) @@ -149,8 +153,6 @@ func (suite *PublicTestSuite) TestPublicTimelineGetHideFiltered() { if !filteredStatusFound { suite.FailNow("precondition failed: status we would filter isn't present in unfiltered timeline") } - // The public timeline has no prepared status cache and doesn't need to be pruned, - // as in the home timeline version of this test. // Create a filter to hide one status on the timeline. if err := suite.db.PutFilter(ctx, filter); err != nil { @@ -161,10 +163,11 @@ func (suite *PublicTestSuite) TestPublicTimelineGetHideFiltered() { resp, errWithCode = suite.timeline.PublicTimelineGet( ctx, requester, - maxID, - sinceID, - minID, - limit, + &paging.Page{ + Min: paging.EitherMinID(minID, sinceID), + Max: paging.MaxID(maxID), + Limit: limit, + }, local, ) diff --git a/internal/processing/timeline/tag.go b/internal/processing/timeline/tag.go index 811d0bb33..685bac376 100644 --- a/internal/processing/timeline/tag.go +++ b/internal/processing/timeline/tag.go @@ -20,18 +20,16 @@ package timeline import ( "context" "errors" - "fmt" + "net/http" apimodel "github.com/superseriousbusiness/gotosocial/internal/api/model" "github.com/superseriousbusiness/gotosocial/internal/db" statusfilter "github.com/superseriousbusiness/gotosocial/internal/filter/status" - "github.com/superseriousbusiness/gotosocial/internal/filter/usermute" - "github.com/superseriousbusiness/gotosocial/internal/gtscontext" "github.com/superseriousbusiness/gotosocial/internal/gtserror" "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" "github.com/superseriousbusiness/gotosocial/internal/log" + "github.com/superseriousbusiness/gotosocial/internal/paging" "github.com/superseriousbusiness/gotosocial/internal/text" - "github.com/superseriousbusiness/gotosocial/internal/util" ) // TagTimelineGet gets a pageable timeline for the given @@ -40,37 +38,77 @@ import ( // to requestingAcct before returning it. func (p *Processor) TagTimelineGet( ctx context.Context, - requestingAcct *gtsmodel.Account, + requester *gtsmodel.Account, tagName string, maxID string, sinceID string, minID string, limit int, ) (*apimodel.PageableResponse, gtserror.WithCode) { + + // Fetch the requested tag with name. tag, errWithCode := p.getTag(ctx, tagName) if errWithCode != nil { return nil, errWithCode } + // Check for a useable returned tag for endpoint. if tag == nil || !*tag.Useable || !*tag.Listable { + // Obey mastodon API by returning 404 for this. - err := fmt.Errorf("tag was not found, or not useable/listable on this instance") - return nil, gtserror.NewErrorNotFound(err, err.Error()) + const text = "tag was not found, or not useable/listable on this instance" + return nil, gtserror.NewWithCode(http.StatusNotFound, text) } - statuses, err := p.state.DB.GetTagTimeline(ctx, tag.ID, maxID, sinceID, minID, limit) - if err != nil && !errors.Is(err, db.ErrNoEntries) { - err = gtserror.Newf("db error getting statuses: %w", err) - return nil, gtserror.NewErrorInternalError(err) - } + // Fetch status timeline for tag. + return p.getStatusTimeline(ctx, + + // Auth'd + // account. + requester, + + // No + // cache. + nil, - return p.packageTagResponse( - ctx, - requestingAcct, - statuses, - limit, - // Use API URL for tag. + // Current + // page. + &paging.Page{ + Min: paging.EitherMinID(minID, sinceID), + Max: paging.MaxID(maxID), + Limit: limit, + }, + + // Tag timeline name's endpoint. "/api/v1/timelines/tag/"+tagName, + + // No page + // query. + nil, + + // Status filter context. + statusfilter.FilterContextPublic, + + // Database load function. + func(pg *paging.Page) (statuses []*gtsmodel.Status, err error) { + return p.state.DB.GetTagTimeline(ctx, tag.ID, pg) + }, + + // Filtering function, + // i.e. filter before caching. + func(s *gtsmodel.Status) bool { + + // Check the visibility of passed status to requesting user. + ok, err := p.visFilter.StatusPublicTimelineable(ctx, requester, s) + if err != nil { + log.Errorf(ctx, "error filtering status %s: %v", s.URI, err) + } + return !ok + }, + + // Post filtering funtion, + // i.e. filter after caching. + nil, ) } @@ -92,69 +130,3 @@ func (p *Processor) getTag(ctx context.Context, tagName string) (*gtsmodel.Tag, return tag, nil } - -func (p *Processor) packageTagResponse( - ctx context.Context, - requestingAcct *gtsmodel.Account, - statuses []*gtsmodel.Status, - limit int, - requestPath string, -) (*apimodel.PageableResponse, gtserror.WithCode) { - count := len(statuses) - if count == 0 { - return util.EmptyPageableResponse(), nil - } - - var ( - items = make([]interface{}, 0, count) - - // Set next + prev values before filtering and API - // converting, so caller can still page properly. - nextMaxIDValue = statuses[count-1].ID - prevMinIDValue = statuses[0].ID - ) - - filters, err := p.state.DB.GetFiltersForAccountID(ctx, requestingAcct.ID) - if err != nil { - err = gtserror.Newf("couldn't retrieve filters for account %s: %w", requestingAcct.ID, err) - return nil, gtserror.NewErrorInternalError(err) - } - - mutes, err := p.state.DB.GetAccountMutes(gtscontext.SetBarebones(ctx), requestingAcct.ID, nil) - if err != nil { - err = gtserror.Newf("couldn't retrieve mutes for account %s: %w", requestingAcct.ID, err) - return nil, gtserror.NewErrorInternalError(err) - } - compiledMutes := usermute.NewCompiledUserMuteList(mutes) - - for _, s := range statuses { - timelineable, err := p.visFilter.StatusTagTimelineable(ctx, requestingAcct, s) - if err != nil { - log.Errorf(ctx, "error checking status visibility: %v", err) - continue - } - - if !timelineable { - continue - } - - apiStatus, err := p.converter.StatusToAPIStatus(ctx, s, requestingAcct, statusfilter.FilterContextPublic, filters, compiledMutes) - if errors.Is(err, statusfilter.ErrHideStatus) { - continue - } - if err != nil { - log.Errorf(ctx, "error converting to api status: %v", err) - continue - } - - items = append(items, apiStatus) - } - - return util.PackagePageableResponse(util.PageableResponseParams{ - Items: items, - Path: requestPath, - NextMaxIDValue: nextMaxIDValue, - PrevMinIDValue: prevMinIDValue, - Limit: limit, - }) -} diff --git a/internal/processing/timeline/timeline.go b/internal/processing/timeline/timeline.go index 5966fe864..54ea2cccd 100644 --- a/internal/processing/timeline/timeline.go +++ b/internal/processing/timeline/timeline.go @@ -18,9 +18,33 @@ package timeline import ( + "context" + "errors" + "net/http" + "net/url" + + apimodel "github.com/superseriousbusiness/gotosocial/internal/api/model" + timelinepkg "github.com/superseriousbusiness/gotosocial/internal/cache/timeline" + "github.com/superseriousbusiness/gotosocial/internal/db" + statusfilter "github.com/superseriousbusiness/gotosocial/internal/filter/status" + "github.com/superseriousbusiness/gotosocial/internal/filter/usermute" "github.com/superseriousbusiness/gotosocial/internal/filter/visibility" + "github.com/superseriousbusiness/gotosocial/internal/gtserror" + "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" + "github.com/superseriousbusiness/gotosocial/internal/id" + "github.com/superseriousbusiness/gotosocial/internal/paging" "github.com/superseriousbusiness/gotosocial/internal/state" "github.com/superseriousbusiness/gotosocial/internal/typeutils" + "github.com/superseriousbusiness/gotosocial/internal/util/xslices" +) + +var ( + // pre-prepared URL values to be passed in to + // paging response forms. The paging package always + // copies values before any modifications so it's + // safe to only use a single map variable for these. + localOnlyTrue = url.Values{"local": {"true"}} + localOnlyFalse = url.Values{"local": {"false"}} ) type Processor struct { @@ -36,3 +60,114 @@ func New(state *state.State, converter *typeutils.Converter, visFilter *visibili visFilter: visFilter, } } + +func (p *Processor) getStatusTimeline( + ctx context.Context, + requester *gtsmodel.Account, + timeline *timelinepkg.StatusTimeline, + page *paging.Page, + pagePath string, + pageQuery url.Values, + filterCtx statusfilter.FilterContext, + loadPage func(*paging.Page) (statuses []*gtsmodel.Status, err error), + filter func(*gtsmodel.Status) (delete bool), + postFilter func(*gtsmodel.Status) (remove bool), +) ( + *apimodel.PageableResponse, + gtserror.WithCode, +) { + var err error + var filters []*gtsmodel.Filter + var mutes *usermute.CompiledUserMuteList + + if requester != nil { + // Fetch all filters relevant for requesting account. + filters, err = p.state.DB.GetFiltersForAccountID(ctx, + requester.ID, + ) + if err != nil && !errors.Is(err, db.ErrNoEntries) { + err := gtserror.Newf("error getting account filters: %w", err) + return nil, gtserror.NewErrorInternalError(err) + } + + // Get a list of all account mutes for requester. + allMutes, err := p.state.DB.GetAccountMutes(ctx, + requester.ID, + nil, // i.e. all + ) + if err != nil && !errors.Is(err, db.ErrNoEntries) { + err := gtserror.Newf("error getting account mutes: %w", err) + return nil, gtserror.NewErrorInternalError(err) + } + + // Compile all account mutes to useable form. + mutes = usermute.NewCompiledUserMuteList(allMutes) + } + + // Ensure we have valid + // input paging cursor. + id.ValidatePage(page) + + // Load status page via timeline cache, also + // getting lo, hi values for next, prev pages. + // + // NOTE: this safely handles the case of a nil + // input timeline, i.e. uncached timeline type. + apiStatuses, lo, hi, err := timeline.Load(ctx, + + // Status page + // to load. + page, + + // Caller provided database + // status page loading function. + loadPage, + + // Status load function for cached timeline entries. + func(ids []string) ([]*gtsmodel.Status, error) { + return p.state.DB.GetStatusesByIDs(ctx, ids) + }, + + // Call provided status + // filtering function. + filter, + + // Frontend API model preparation function. + func(status *gtsmodel.Status) (*apimodel.Status, error) { + + // Check if status needs filtering OUTSIDE of caching stage. + // TODO: this will be moved to separate postFilter hook when + // all filtering has been removed from the type converter. + if postFilter != nil && postFilter(status) { + return nil, nil + } + + // Finally, pass status to get converted to API model. + apiStatus, err := p.converter.StatusToAPIStatus(ctx, + status, + requester, + filterCtx, + filters, + mutes, + ) + if err != nil && !errors.Is(err, statusfilter.ErrHideStatus) { + return nil, err + } + return apiStatus, nil + }, + ) + + if err != nil { + err := gtserror.Newf("error loading timeline: %w", err) + return nil, gtserror.WrapWithCode(http.StatusInternalServerError, err) + } + + // Package returned API statuses as pageable response. + return paging.PackageResponse(paging.ResponseParams{ + Items: xslices.ToAny(apiStatuses), + Path: pagePath, + Next: page.Next(lo, hi), + Prev: page.Prev(lo, hi), + Query: pageQuery, + }), nil +} diff --git a/internal/processing/workers/fromclientapi.go b/internal/processing/workers/fromclientapi.go index 28a2b37b9..661fea866 100644 --- a/internal/processing/workers/fromclientapi.go +++ b/internal/processing/workers/fromclientapi.go @@ -371,7 +371,7 @@ func (p *clientAPI) CreateStatus(ctx context.Context, cMsg *messages.FromClientA if status.InReplyToID != "" { // Interaction counts changed on the replied status; // uncache the prepared version from all timelines. - p.surface.invalidateStatusFromTimelines(ctx, status.InReplyToID) + p.surface.invalidateStatusFromTimelines(status.InReplyToID) } return nil @@ -413,7 +413,7 @@ func (p *clientAPI) CreatePollVote(ctx context.Context, cMsg *messages.FromClien } // Interaction counts changed on the source status, uncache from timelines. - p.surface.invalidateStatusFromTimelines(ctx, vote.Poll.StatusID) + p.surface.invalidateStatusFromTimelines(vote.Poll.StatusID) return nil } @@ -565,7 +565,7 @@ func (p *clientAPI) CreateLike(ctx context.Context, cMsg *messages.FromClientAPI // Interaction counts changed on the faved status; // uncache the prepared version from all timelines. - p.surface.invalidateStatusFromTimelines(ctx, fave.StatusID) + p.surface.invalidateStatusFromTimelines(fave.StatusID) return nil } @@ -671,7 +671,7 @@ func (p *clientAPI) CreateAnnounce(ctx context.Context, cMsg *messages.FromClien // Interaction counts changed on the boosted status; // uncache the prepared version from all timelines. - p.surface.invalidateStatusFromTimelines(ctx, boost.BoostOfID) + p.surface.invalidateStatusFromTimelines(boost.BoostOfID) return nil } @@ -682,22 +682,20 @@ func (p *clientAPI) CreateBlock(ctx context.Context, cMsg *messages.FromClientAP return gtserror.Newf("%T not parseable as *gtsmodel.Block", cMsg.GTSModel) } - // Remove blockee's statuses from blocker's timeline. - if err := p.state.Timelines.Home.WipeItemsFromAccountID( - ctx, - block.AccountID, - block.TargetAccountID, - ); err != nil { - return gtserror.Newf("error wiping timeline items for block: %w", err) + if block.Account.IsLocal() { + // Remove posts by target from origin's timelines. + p.surface.removeRelationshipFromTimelines(ctx, + block.AccountID, + block.TargetAccountID, + ) } - // Remove blocker's statuses from blockee's timeline. - if err := p.state.Timelines.Home.WipeItemsFromAccountID( - ctx, - block.TargetAccountID, - block.AccountID, - ); err != nil { - return gtserror.Newf("error wiping timeline items for block: %w", err) + if block.TargetAccount.IsLocal() { + // Remove posts by origin from target's timelines. + p.surface.removeRelationshipFromTimelines(ctx, + block.TargetAccountID, + block.AccountID, + ) } // TODO: same with notifications? @@ -737,7 +735,7 @@ func (p *clientAPI) UpdateStatus(ctx context.Context, cMsg *messages.FromClientA } // Status representation has changed, invalidate from timelines. - p.surface.invalidateStatusFromTimelines(ctx, status.ID) + p.surface.invalidateStatusFromTimelines(status.ID) return nil } @@ -858,6 +856,22 @@ func (p *clientAPI) UndoFollow(ctx context.Context, cMsg *messages.FromClientAPI log.Errorf(ctx, "error updating account stats: %v", err) } + if follow.Account.IsLocal() { + // Remove posts by target from origin's timelines. + p.surface.removeRelationshipFromTimelines(ctx, + follow.AccountID, + follow.TargetAccountID, + ) + } + + if follow.TargetAccount.IsLocal() { + // Remove posts by origin from target's timelines. + p.surface.removeRelationshipFromTimelines(ctx, + follow.TargetAccountID, + follow.AccountID, + ) + } + if err := p.federate.UndoFollow(ctx, follow); err != nil { log.Errorf(ctx, "error federating follow undo: %v", err) } @@ -890,7 +904,7 @@ func (p *clientAPI) UndoFave(ctx context.Context, cMsg *messages.FromClientAPI) // Interaction counts changed on the faved status; // uncache the prepared version from all timelines. - p.surface.invalidateStatusFromTimelines(ctx, statusFave.StatusID) + p.surface.invalidateStatusFromTimelines(statusFave.StatusID) return nil } @@ -910,9 +924,8 @@ func (p *clientAPI) UndoAnnounce(ctx context.Context, cMsg *messages.FromClientA log.Errorf(ctx, "error updating account stats: %v", err) } - if err := p.surface.deleteStatusFromTimelines(ctx, status.ID); err != nil { - log.Errorf(ctx, "error removing timelined status: %v", err) - } + // Delete the boost wrapper status from timelines. + p.surface.deleteStatusFromTimelines(ctx, status.ID) if err := p.federate.UndoAnnounce(ctx, status); err != nil { log.Errorf(ctx, "error federating announce undo: %v", err) @@ -920,7 +933,7 @@ func (p *clientAPI) UndoAnnounce(ctx context.Context, cMsg *messages.FromClientA // Interaction counts changed on the boosted status; // uncache the prepared version from all timelines. - p.surface.invalidateStatusFromTimelines(ctx, status.BoostOfID) + p.surface.invalidateStatusFromTimelines(status.BoostOfID) return nil } @@ -983,7 +996,7 @@ func (p *clientAPI) DeleteStatus(ctx context.Context, cMsg *messages.FromClientA if status.InReplyToID != "" { // Interaction counts changed on the replied status; // uncache the prepared version from all timelines. - p.surface.invalidateStatusFromTimelines(ctx, status.InReplyToID) + p.surface.invalidateStatusFromTimelines(status.InReplyToID) } return nil @@ -1026,6 +1039,23 @@ func (p *clientAPI) DeleteAccountOrUser(ctx context.Context, cMsg *messages.From p.state.Workers.Federator.Queue.Delete("Receiving.ID", account.ID) p.state.Workers.Federator.Queue.Delete("TargetURI", account.URI) + // Remove any entries authored by account from timelines. + p.surface.removeTimelineEntriesByAccount(account.ID) + + // Remove any of their cached timelines. + p.state.Caches.Timelines.Home.Delete(account.ID) + + // Get the IDs of all the lists owned by the given account ID. + listIDs, err := p.state.DB.GetListIDsByAccountID(ctx, account.ID) + if err != nil { + log.Errorf(ctx, "error getting lists for account %s: %v", account.ID, err) + } + + // Remove list timelines of account. + for _, listID := range listIDs { + p.state.Caches.Timelines.List.Delete(listID) + } + if err := p.federate.DeleteAccount(ctx, cMsg.Target); err != nil { log.Errorf(ctx, "error federating account delete: %v", err) } @@ -1169,7 +1199,7 @@ func (p *clientAPI) AcceptLike(ctx context.Context, cMsg *messages.FromClientAPI // Interaction counts changed on the faved status; // uncache the prepared version from all timelines. - p.surface.invalidateStatusFromTimelines(ctx, req.Like.StatusID) + p.surface.invalidateStatusFromTimelines(req.Like.StatusID) return nil } @@ -1202,7 +1232,7 @@ func (p *clientAPI) AcceptReply(ctx context.Context, cMsg *messages.FromClientAP // Interaction counts changed on the replied status; // uncache the prepared version from all timelines. - p.surface.invalidateStatusFromTimelines(ctx, reply.InReplyToID) + p.surface.invalidateStatusFromTimelines(reply.InReplyToID) return nil } @@ -1240,7 +1270,7 @@ func (p *clientAPI) AcceptAnnounce(ctx context.Context, cMsg *messages.FromClien // Interaction counts changed on the original status; // uncache the prepared version from all timelines. - p.surface.invalidateStatusFromTimelines(ctx, boost.BoostOfID) + p.surface.invalidateStatusFromTimelines(boost.BoostOfID) return nil } diff --git a/internal/processing/workers/fromfediapi.go b/internal/processing/workers/fromfediapi.go index 2e513449b..3e0f0ba59 100644 --- a/internal/processing/workers/fromfediapi.go +++ b/internal/processing/workers/fromfediapi.go @@ -197,9 +197,22 @@ func (p *Processor) ProcessFromFediAPI(ctx context.Context, fMsg *messages.FromF // UNDO SOMETHING case ap.ActivityUndo: + switch fMsg.APObjectType { + // UNDO FOLLOW + case ap.ActivityFollow: + return p.fediAPI.UndoFollow(ctx, fMsg) + + // UNDO BLOCK + case ap.ActivityBlock: + return p.fediAPI.UndoBlock(ctx, fMsg) + // UNDO ANNOUNCE - if fMsg.APObjectType == ap.ActivityAnnounce { + case ap.ActivityAnnounce: return p.fediAPI.UndoAnnounce(ctx, fMsg) + + // UNDO LIKE + case ap.ActivityLike: + return p.fediAPI.UndoFave(ctx, fMsg) } } @@ -346,7 +359,7 @@ func (p *fediAPI) CreateStatus(ctx context.Context, fMsg *messages.FromFediAPI) // Interaction counts changed on the replied status; uncache the // prepared version from all timelines. The status dereferencer // functions will ensure necessary ancestors exist before this point. - p.surface.invalidateStatusFromTimelines(ctx, status.InReplyToID) + p.surface.invalidateStatusFromTimelines(status.InReplyToID) } return nil @@ -393,7 +406,7 @@ func (p *fediAPI) CreatePollVote(ctx context.Context, fMsg *messages.FromFediAPI } // Interaction counts changed, uncache from timelines. - p.surface.invalidateStatusFromTimelines(ctx, status.ID) + p.surface.invalidateStatusFromTimelines(status.ID) return nil } @@ -428,7 +441,7 @@ func (p *fediAPI) UpdatePollVote(ctx context.Context, fMsg *messages.FromFediAPI } // Interaction counts changed, uncache from timelines. - p.surface.invalidateStatusFromTimelines(ctx, status.ID) + p.surface.invalidateStatusFromTimelines(status.ID) return nil } @@ -573,7 +586,7 @@ func (p *fediAPI) CreateLike(ctx context.Context, fMsg *messages.FromFediAPI) er // Interaction counts changed on the faved status; // uncache the prepared version from all timelines. - p.surface.invalidateStatusFromTimelines(ctx, fave.StatusID) + p.surface.invalidateStatusFromTimelines(fave.StatusID) return nil } @@ -690,7 +703,7 @@ func (p *fediAPI) CreateAnnounce(ctx context.Context, fMsg *messages.FromFediAPI // Interaction counts changed on the original status; // uncache the prepared version from all timelines. - p.surface.invalidateStatusFromTimelines(ctx, boost.BoostOfID) + p.surface.invalidateStatusFromTimelines(boost.BoostOfID) return nil } @@ -701,53 +714,32 @@ func (p *fediAPI) CreateBlock(ctx context.Context, fMsg *messages.FromFediAPI) e return gtserror.Newf("%T not parseable as *gtsmodel.Block", fMsg.GTSModel) } - // Remove each account's posts from the other's timelines. - // - // First home timelines. - if err := p.state.Timelines.Home.WipeItemsFromAccountID( - ctx, - block.AccountID, - block.TargetAccountID, - ); err != nil { - log.Errorf(ctx, "error wiping items from block -> target's home timeline: %v", err) - } - - if err := p.state.Timelines.Home.WipeItemsFromAccountID( - ctx, - block.TargetAccountID, - block.AccountID, - ); err != nil { - log.Errorf(ctx, "error wiping items from target -> block's home timeline: %v", err) - } - - // Now list timelines. - if err := p.state.Timelines.List.WipeItemsFromAccountID( - ctx, - block.AccountID, - block.TargetAccountID, - ); err != nil { - log.Errorf(ctx, "error wiping items from block -> target's list timeline(s): %v", err) + if block.Account.IsLocal() { + // Remove posts by target from origin's timelines. + p.surface.removeRelationshipFromTimelines(ctx, + block.AccountID, + block.TargetAccountID, + ) } - if err := p.state.Timelines.List.WipeItemsFromAccountID( - ctx, - block.TargetAccountID, - block.AccountID, - ); err != nil { - log.Errorf(ctx, "error wiping items from target -> block's list timeline(s): %v", err) + if block.TargetAccount.IsLocal() { + // Remove posts by origin from target's timelines. + p.surface.removeRelationshipFromTimelines(ctx, + block.TargetAccountID, + block.AccountID, + ) } // Remove any follows that existed between blocker + blockee. - if err := p.state.DB.DeleteFollow( - ctx, + // (note this handles removing any necessary list entries). + if err := p.state.DB.DeleteFollow(ctx, block.AccountID, block.TargetAccountID, ); err != nil { log.Errorf(ctx, "error deleting follow from block -> target: %v", err) } - if err := p.state.DB.DeleteFollow( - ctx, + if err := p.state.DB.DeleteFollow(ctx, block.TargetAccountID, block.AccountID, ); err != nil { @@ -755,16 +747,14 @@ func (p *fediAPI) CreateBlock(ctx context.Context, fMsg *messages.FromFediAPI) e } // Remove any follow requests that existed between blocker + blockee. - if err := p.state.DB.DeleteFollowRequest( - ctx, + if err := p.state.DB.DeleteFollowRequest(ctx, block.AccountID, block.TargetAccountID, ); err != nil { log.Errorf(ctx, "error deleting follow request from block -> target: %v", err) } - if err := p.state.DB.DeleteFollowRequest( - ctx, + if err := p.state.DB.DeleteFollowRequest(ctx, block.TargetAccountID, block.AccountID, ); err != nil { @@ -871,7 +861,7 @@ func (p *fediAPI) AcceptReply(ctx context.Context, fMsg *messages.FromFediAPI) e // Interaction counts changed on the replied-to status; // uncache the prepared version from all timelines. - p.surface.invalidateStatusFromTimelines(ctx, status.InReplyToID) + p.surface.invalidateStatusFromTimelines(status.InReplyToID) return nil } @@ -920,11 +910,11 @@ func (p *fediAPI) AcceptRemoteStatus(ctx context.Context, fMsg *messages.FromFed // Interaction counts changed on the interacted status; // uncache the prepared version from all timelines. if status.InReplyToID != "" { - p.surface.invalidateStatusFromTimelines(ctx, status.InReplyToID) + p.surface.invalidateStatusFromTimelines(status.InReplyToID) } if status.BoostOfID != "" { - p.surface.invalidateStatusFromTimelines(ctx, status.BoostOfID) + p.surface.invalidateStatusFromTimelines(status.BoostOfID) } return nil @@ -953,7 +943,7 @@ func (p *fediAPI) AcceptAnnounce(ctx context.Context, fMsg *messages.FromFediAPI // Interaction counts changed on the boosted status; // uncache the prepared version from all timelines. - p.surface.invalidateStatusFromTimelines(ctx, boost.BoostOfID) + p.surface.invalidateStatusFromTimelines(boost.BoostOfID) return nil } @@ -1004,7 +994,7 @@ func (p *fediAPI) UpdateStatus(ctx context.Context, fMsg *messages.FromFediAPI) } // Status representation was refetched, uncache from timelines. - p.surface.invalidateStatusFromTimelines(ctx, status.ID) + p.surface.invalidateStatusFromTimelines(status.ID) return nil } @@ -1063,7 +1053,7 @@ func (p *fediAPI) DeleteStatus(ctx context.Context, fMsg *messages.FromFediAPI) if status.InReplyToID != "" { // Interaction counts changed on the replied status; // uncache the prepared version from all timelines. - p.surface.invalidateStatusFromTimelines(ctx, status.InReplyToID) + p.surface.invalidateStatusFromTimelines(status.InReplyToID) } return nil @@ -1090,6 +1080,9 @@ func (p *fediAPI) DeleteAccount(ctx context.Context, fMsg *messages.FromFediAPI) p.state.Workers.Federator.Queue.Delete("Requesting.ID", account.ID) p.state.Workers.Federator.Queue.Delete("TargetURI", account.URI) + // Remove any entries authored by account from timelines. + p.surface.removeTimelineEntriesByAccount(account.ID) + // First perform the actual account deletion. if err := p.account.Delete(ctx, account, account.ID); err != nil { log.Errorf(ctx, "error deleting account: %v", err) @@ -1208,6 +1201,42 @@ func (p *fediAPI) RejectAnnounce(ctx context.Context, fMsg *messages.FromFediAPI return nil } +func (p *fediAPI) UndoFollow(ctx context.Context, fMsg *messages.FromFediAPI) error { + follow, ok := fMsg.GTSModel.(*gtsmodel.Follow) + if !ok { + return gtserror.Newf("%T not parseable as *gtsmodel.Follow", fMsg.GTSModel) + } + + if follow.Account.IsLocal() { + // Remove posts by target from origin's timelines. + p.surface.removeRelationshipFromTimelines(ctx, + follow.AccountID, + follow.TargetAccountID, + ) + } + + if follow.TargetAccount.IsLocal() { + // Remove posts by origin from target's timelines. + p.surface.removeRelationshipFromTimelines(ctx, + follow.TargetAccountID, + follow.AccountID, + ) + } + + return nil +} + +func (p *fediAPI) UndoBlock(ctx context.Context, fMsg *messages.FromFediAPI) error { + _, ok := fMsg.GTSModel.(*gtsmodel.Block) + if !ok { + return gtserror.Newf("%T not parseable as *gtsmodel.Block", fMsg.GTSModel) + } + + // TODO: any required changes + + return nil +} + func (p *fediAPI) UndoAnnounce( ctx context.Context, fMsg *messages.FromFediAPI, @@ -1228,13 +1257,24 @@ func (p *fediAPI) UndoAnnounce( } // Remove the boost wrapper from all timelines. - if err := p.surface.deleteStatusFromTimelines(ctx, boost.ID); err != nil { - log.Errorf(ctx, "error removing timelined boost: %v", err) - } + p.surface.deleteStatusFromTimelines(ctx, boost.ID) // Interaction counts changed on the boosted status; // uncache the prepared version from all timelines. - p.surface.invalidateStatusFromTimelines(ctx, boost.BoostOfID) + p.surface.invalidateStatusFromTimelines(boost.BoostOfID) + + return nil +} + +func (p *fediAPI) UndoFave(ctx context.Context, fMsg *messages.FromFediAPI) error { + statusFave, ok := fMsg.GTSModel.(*gtsmodel.StatusFave) + if !ok { + return gtserror.Newf("%T not parseable as *gtsmodel.StatusFave", fMsg.GTSModel) + } + + // Interaction counts changed on the faved status; + // uncache the prepared version from all timelines. + p.surface.invalidateStatusFromTimelines(statusFave.StatusID) return nil } diff --git a/internal/processing/workers/surfacetimeline.go b/internal/processing/workers/surfacetimeline.go index b071bd72e..0f2e80d0f 100644 --- a/internal/processing/workers/surfacetimeline.go +++ b/internal/processing/workers/surfacetimeline.go @@ -21,6 +21,7 @@ import ( "context" "errors" + "github.com/superseriousbusiness/gotosocial/internal/cache/timeline" statusfilter "github.com/superseriousbusiness/gotosocial/internal/filter/status" "github.com/superseriousbusiness/gotosocial/internal/filter/usermute" "github.com/superseriousbusiness/gotosocial/internal/gtscontext" @@ -28,7 +29,6 @@ import ( "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" "github.com/superseriousbusiness/gotosocial/internal/log" "github.com/superseriousbusiness/gotosocial/internal/stream" - "github.com/superseriousbusiness/gotosocial/internal/timeline" "github.com/superseriousbusiness/gotosocial/internal/util" ) @@ -161,21 +161,16 @@ func (s *Surface) timelineAndNotifyStatusForFollowers( // Add status to home timeline for owner of // this follow (origin account), if applicable. - homeTimelined, err = s.timelineStatus(ctx, - s.State.Timelines.Home.IngestOne, - follow.AccountID, // home timelines are keyed by account ID + if homeTimelined := s.timelineStatus(ctx, + s.State.Caches.Timelines.Home.MustGet(follow.AccountID), follow.Account, status, stream.TimelineHome, + statusfilter.FilterContextHome, filters, mutes, - ) - if err != nil { - log.Errorf(ctx, "error home timelining status: %v", err) - continue - } + ); homeTimelined { - if homeTimelined { // If hometimelined, add to list of returned account IDs. homeTimelinedAccountIDs = append(homeTimelinedAccountIDs, follow.AccountID) } @@ -261,22 +256,16 @@ func (s *Surface) listTimelineStatusForFollow( exclusive = exclusive || *list.Exclusive // At this point we are certain this status - // should be included in the timeline of the - // list that this list entry belongs to. - listTimelined, err := s.timelineStatus( - ctx, - s.State.Timelines.List.IngestOne, - list.ID, // list timelines are keyed by list ID + // should be included in timeline of this list. + listTimelined := s.timelineStatus(ctx, + s.State.Caches.Timelines.List.MustGet(list.ID), follow.Account, status, stream.TimelineList+":"+list.ID, // key streamType to this specific list + statusfilter.FilterContextHome, filters, mutes, ) - if err != nil { - log.Errorf(ctx, "error adding status to list timeline: %v", err) - continue - } // Update flag based on if timelined. timelined = timelined || listTimelined @@ -367,53 +356,48 @@ func (s *Surface) listEligible( } } -// timelineStatus uses the provided ingest function to put the given -// status in a timeline with the given ID, if it's timelineable. -// -// If the status was inserted into the timeline, true will be returned -// + it will also be streamed to the user using the given streamType. +// timelineStatus will insert the given status into the given timeline, if it's +// timelineable. if the status was inserted into the timeline, true will be returned. func (s *Surface) timelineStatus( ctx context.Context, - ingest func(context.Context, string, timeline.Timelineable) (bool, error), - timelineID string, + timeline *timeline.StatusTimeline, account *gtsmodel.Account, status *gtsmodel.Status, streamType string, + filterCtx statusfilter.FilterContext, filters []*gtsmodel.Filter, mutes *usermute.CompiledUserMuteList, -) (bool, error) { - - // Ingest status into given timeline using provided function. - if inserted, err := ingest(ctx, timelineID, status); err != nil && - !errors.Is(err, statusfilter.ErrHideStatus) { - err := gtserror.Newf("error ingesting status %s: %w", status.ID, err) - return false, err - } else if !inserted { - // Nothing more to do. - return false, nil - } +) bool { - // Convert updated database model to frontend model. - apiStatus, err := s.Converter.StatusToAPIStatus(ctx, + // Attempt to convert status to frontend API representation, + // this will check whether status is filtered / muted. + apiModel, err := s.Converter.StatusToAPIStatus(ctx, status, account, - statusfilter.FilterContextHome, + filterCtx, filters, mutes, ) if err != nil && !errors.Is(err, statusfilter.ErrHideStatus) { - err := gtserror.Newf("error converting status %s to frontend representation: %w", status.ID, err) - return true, err + log.Error(ctx, "error converting status %s to frontend: %v", status.URI, err) } - if apiStatus != nil { - // The status was inserted so stream it to the user. - s.Stream.Update(ctx, account, apiStatus, streamType) - return true, nil + // Insert status to timeline cache regardless of + // if API model was succesfully prepared or not. + repeatBoost := timeline.InsertOne(status, apiModel) + + if apiModel == nil { + // Status was + // filtered / muted. + return false + } + + if !repeatBoost { + // Only stream if not repeated boost of recent status. + s.Stream.Update(ctx, account, apiModel, streamType) } - // Status was hidden. - return false, nil + return true } // timelineAndNotifyStatusForTagFollowers inserts the status into the @@ -444,23 +428,15 @@ func (s *Surface) timelineAndNotifyStatusForTagFollowers( continue } - if _, err := s.timelineStatus( - ctx, - s.State.Timelines.Home.IngestOne, - tagFollowerAccount.ID, // home timelines are keyed by account ID + _ = s.timelineStatus(ctx, + s.State.Caches.Timelines.Home.MustGet(tagFollowerAccount.ID), tagFollowerAccount, status, stream.TimelineHome, + statusfilter.FilterContextHome, filters, mutes, - ); err != nil { - errs.Appendf( - "error inserting status %s into home timeline for account %s: %w", - status.ID, - tagFollowerAccount.ID, - err, - ) - } + ) } return errs.Combine() @@ -550,39 +526,6 @@ func (s *Surface) tagFollowersForStatus( return visibleTagFollowerAccounts, errs.Combine() } -// deleteStatusFromTimelines completely removes the given status from all timelines. -// It will also stream deletion of the status to all open streams. -func (s *Surface) deleteStatusFromTimelines(ctx context.Context, statusID string) error { - if err := s.State.Timelines.Home.WipeItemFromAllTimelines(ctx, statusID); err != nil { - return err - } - if err := s.State.Timelines.List.WipeItemFromAllTimelines(ctx, statusID); err != nil { - return err - } - s.Stream.Delete(ctx, statusID) - return nil -} - -// invalidateStatusFromTimelines does cache invalidation on the given status by -// unpreparing it from all timelines, forcing it to be prepared again (with updated -// stats, boost counts, etc) next time it's fetched by the timeline owner. This goes -// both for the status itself, and for any boosts of the status. -func (s *Surface) invalidateStatusFromTimelines(ctx context.Context, statusID string) { - if err := s.State.Timelines.Home.UnprepareItemFromAllTimelines(ctx, statusID); err != nil { - log. - WithContext(ctx). - WithField("statusID", statusID). - Errorf("error unpreparing status from home timelines: %v", err) - } - - if err := s.State.Timelines.List.UnprepareItemFromAllTimelines(ctx, statusID); err != nil { - log. - WithContext(ctx). - WithField("statusID", statusID). - Errorf("error unpreparing status from list timelines: %v", err) - } -} - // timelineStatusUpdate looks up HOME and LIST timelines of accounts // that follow the the status author or tags and pushes edit messages into any // active streams. @@ -859,3 +802,47 @@ func (s *Surface) timelineStatusUpdateForTagFollowers( } return errs.Combine() } + +// deleteStatusFromTimelines completely removes the given status from all timelines. +// It will also stream deletion of the status to all open streams. +func (s *Surface) deleteStatusFromTimelines(ctx context.Context, statusID string) { + s.State.Caches.Timelines.Home.RemoveByStatusIDs(statusID) + s.State.Caches.Timelines.List.RemoveByStatusIDs(statusID) + s.Stream.Delete(ctx, statusID) +} + +// invalidateStatusFromTimelines does cache invalidation on the given status by +// unpreparing it from all timelines, forcing it to be prepared again (with updated +// stats, boost counts, etc) next time it's fetched by the timeline owner. This goes +// both for the status itself, and for any boosts of the status. +func (s *Surface) invalidateStatusFromTimelines(statusID string) { + s.State.Caches.Timelines.Home.UnprepareByStatusIDs(statusID) + s.State.Caches.Timelines.List.UnprepareByStatusIDs(statusID) +} + +// removeTimelineEntriesByAccount removes all cached timeline entries authored by account ID. +func (s *Surface) removeTimelineEntriesByAccount(accountID string) { + s.State.Caches.Timelines.Home.RemoveByAccountIDs(accountID) + s.State.Caches.Timelines.List.RemoveByAccountIDs(accountID) +} + +func (s *Surface) removeRelationshipFromTimelines(ctx context.Context, timelineAccountID string, targetAccountID string) { + // Remove all statuses by target account + // from given account's home timeline. + s.State.Caches.Timelines.Home. + MustGet(timelineAccountID). + RemoveByAccountIDs(targetAccountID) + + // Get the IDs of all the lists owned by the given account ID. + listIDs, err := s.State.DB.GetListIDsByAccountID(ctx, timelineAccountID) + if err != nil { + log.Errorf(ctx, "error getting lists for account %s: %v", timelineAccountID, err) + } + + for _, listID := range listIDs { + // Remove all statuses by target account + // from given account's list timelines. + s.State.Caches.Timelines.List.MustGet(listID). + RemoveByAccountIDs(targetAccountID) + } +} diff --git a/internal/processing/workers/util.go b/internal/processing/workers/util.go index b358dc951..d844ab762 100644 --- a/internal/processing/workers/util.go +++ b/internal/processing/workers/util.go @@ -172,15 +172,11 @@ func (u *utils) wipeStatus( } // Remove the boost from any and all timelines. - if err := u.surface.deleteStatusFromTimelines(ctx, boost.ID); err != nil { - errs.Appendf("error deleting boost from timelines: %w", err) - } + u.surface.deleteStatusFromTimelines(ctx, boost.ID) } // Delete the status itself from any and all timelines. - if err := u.surface.deleteStatusFromTimelines(ctx, status.ID); err != nil { - errs.Appendf("error deleting status from timelines: %w", err) - } + u.surface.deleteStatusFromTimelines(ctx, status.ID) // Delete this status from any conversations it's part of. if err := u.state.DB.DeleteStatusFromConversations(ctx, status.ID); err != nil { |
