diff options
| author | 2025-04-26 09:56:15 +0000 | |
|---|---|---|
| committer | 2025-04-26 09:56:15 +0000 | |
| commit | 6a6a4993338262f87df34c9be051bfaac75c1829 (patch) | |
| tree | bfbda090dc4b25efdd34145c016d7cc7b9c14d6e /internal/cache | |
| parent | [chore] Move deps to code.superseriousbusiness.org (#4054) (diff) | |
| download | gotosocial-6a6a4993338262f87df34c9be051bfaac75c1829.tar.xz | |
[performance] rewrite timelines to rely on new timeline cache type (#3941)
* start work rewriting timeline cache type
* further work rewriting timeline caching
* more work integrating new timeline code
* remove old code
* add local timeline, fix up merge conflicts
* remove old use of go-bytes
* implement new timeline code into more areas of codebase, pull in latest go-mangler, go-mutexes, go-structr
* remove old timeline package, add local timeline cache
* remove references to old timeline types that needed starting up in tests
* start adding page validation
* fix test-identified timeline cache package issues
* fix up more tests, fix missing required changes, etc
* add exclusion for test.out in gitignore
* clarify some things better in code comments
* tweak cache size limits
* fix list timeline cache fetching
* further list timeline fixes
* linter, ssssssssshhhhhhhhhhhh please
* fix linter hints
* reslice the output if it's beyond length of 'lim'
* remove old timeline initialization code, bump go-structr to v0.9.4
* continued from previous commit
* improved code comments
* don't allow multiple entries for BoostOfID values to prevent repeated boosts of same boosts
* finish writing more code comments
* some variable renaming, for ease of following
* change the way we update lo,hi paging values during timeline load
* improved code comments for updated / returned lo, hi paging values
* finish writing code comments for the StatusTimeline{} type itself
* fill in more code comments
* update go-structr version to latest with changed timeline unique indexing logic
* have a local and public timeline *per user*
* rewrite calls to public / local timeline calls
* remove the zero length check, as lo, hi values might still be set
* simplify timeline cache loading, fix lo/hi returns, fix timeline invalidation side-effects missing for some federated actions
* swap the lo, hi values :facepalm:
* add (now) missing slice reverse of tag timeline statuses when paging ASC
* remove local / public caches (is out of scope for this work), share more timeline code
* remove unnecessary change
* again, remove more unused code
* remove unused function to appease the linter
* move boost checking to prepare function
* fix use of timeline.lastOrder, fix incorrect range functions used
* remove comments for repeat code
* remove the boost logic from prepare function
* do a maximum of 5 loads, not 10
* add repeat boost filtering logic, update go-structr, general improvements
* more code comments
* add important note
* fix timeline tests now that timelines are returned in page order
* remove unused field
* add StatusTimeline{} tests
* add more status timeline tests
* start adding preloading support
* ensure repeat boosts are marked in preloaded entries
* share a bunch of the database load code in timeline cache, don't clear timelines on relationship change
* add logic to allow dynamic clear / preloading of timelines
* comment-out unused functions, but leave in place as we might end up using them
* fix timeline preload state check
* much improved status timeline code comments
* more code comments, don't bother inserting statuses if timeline not preloaded
* shift around some logic to make sure things aren't accidentally left set
* finish writing code comments
* remove trim-after-insert behaviour
* fix-up some comments referring to old logic
* remove unsetting of lo, hi
* fix preload repeatBoost checking logic
* don't return on status filter errors, these are usually transient
* better concurrency safety in Clear() and Done()
* fix test broken due to addition of preloader
* fix repeatBoost logic that doesn't account for already-hidden repeatBoosts
* ensure edit submodels are dropped on cache insertion
* update code-comment to expand CAS acronym
* use a plus1hULID() instead of 24h
* remove unused functions
* add note that public / local timeline requester can be nil
* fix incorrect visibility filtering of tag timeline statuses
* ensure we filter home timeline statuses on local only
* some small re-orderings to confirm query params in correct places
* fix the local only home timeline filter func
Diffstat (limited to 'internal/cache')
| -rw-r--r-- | internal/cache/cache.go | 9 | ||||
| -rw-r--r-- | internal/cache/timeline.go | 51 | ||||
| -rw-r--r-- | internal/cache/timeline/preload.go | 152 | ||||
| -rw-r--r-- | internal/cache/timeline/status.go | 842 | ||||
| -rw-r--r-- | internal/cache/timeline/status_map.go | 198 | ||||
| -rw-r--r-- | internal/cache/timeline/status_test.go | 361 | ||||
| -rw-r--r-- | internal/cache/timeline/timeline.go | 59 | ||||
| -rw-r--r-- | internal/cache/wrappers.go | 84 |
8 files changed, 1688 insertions, 68 deletions
diff --git a/internal/cache/cache.go b/internal/cache/cache.go index 7844c03f8..e3fd0d1fe 100644 --- a/internal/cache/cache.go +++ b/internal/cache/cache.go @@ -46,6 +46,9 @@ type Caches struct { // `[status.ID][status.UpdatedAt.Unix()]` StatusesFilterableFields *ttl.Cache[string, []string] + // Timelines ... + Timelines TimelineCaches + // Visibility provides access to the item visibility // cache. (used by the visibility filter). Visibility VisibilityCache @@ -87,12 +90,14 @@ func (c *Caches) Init() { c.initFollowRequest() c.initFollowRequestIDs() c.initFollowingTagIDs() + c.initHomeTimelines() c.initInReplyToIDs() c.initInstance() c.initInteractionRequest() c.initList() c.initListIDs() c.initListedIDs() + c.initListTimelines() c.initMarker() c.initMedia() c.initMention() @@ -109,6 +114,7 @@ func (c *Caches) Init() { c.initStatusEdit() c.initStatusFave() c.initStatusFaveIDs() + c.initStatusesFilterableFields() c.initTag() c.initThreadMute() c.initToken() @@ -120,7 +126,6 @@ func (c *Caches) Init() { c.initWebPushSubscription() c.initWebPushSubscriptionIDs() c.initVisibility() - c.initStatusesFilterableFields() } // Start will start any caches that require a background @@ -207,6 +212,8 @@ func (c *Caches) Sweep(threshold float64) { c.DB.User.Trim(threshold) c.DB.UserMute.Trim(threshold) c.DB.UserMuteIDs.Trim(threshold) + c.Timelines.Home.Trim() + c.Timelines.List.Trim() c.Visibility.Trim(threshold) } diff --git a/internal/cache/timeline.go b/internal/cache/timeline.go new file mode 100644 index 000000000..3e6a68558 --- /dev/null +++ b/internal/cache/timeline.go @@ -0,0 +1,51 @@ +// GoToSocial +// Copyright (C) GoToSocial Authors admin@gotosocial.org +// SPDX-License-Identifier: AGPL-3.0-or-later +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. 
+// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <http://www.gnu.org/licenses/>. + +package cache + +import ( + "github.com/superseriousbusiness/gotosocial/internal/cache/timeline" + "github.com/superseriousbusiness/gotosocial/internal/log" +) + +type TimelineCaches struct { + // Home provides a concurrency-safe map of status timeline + // caches for home timelines, keyed by home's account ID. + Home timeline.StatusTimelines + + // List provides a concurrency-safe map of status + // timeline caches for lists, keyed by list ID. + List timeline.StatusTimelines +} + +func (c *Caches) initHomeTimelines() { + // TODO: configurable + cap := 800 + + log.Infof(nil, "cache size = %d", cap) + + c.Timelines.Home.Init(cap) +} + +func (c *Caches) initListTimelines() { + // TODO: configurable + cap := 800 + + log.Infof(nil, "cache size = %d", cap) + + c.Timelines.List.Init(cap) +} diff --git a/internal/cache/timeline/preload.go b/internal/cache/timeline/preload.go new file mode 100644 index 000000000..b941a8b0c --- /dev/null +++ b/internal/cache/timeline/preload.go @@ -0,0 +1,152 @@ +// GoToSocial +// Copyright (C) GoToSocial Authors admin@gotosocial.org +// SPDX-License-Identifier: AGPL-3.0-or-later +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. 
+// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <http://www.gnu.org/licenses/>. + +package timeline + +import ( + "sync" + "sync/atomic" + + "github.com/superseriousbusiness/gotosocial/internal/log" +) + +// preloader provides a means of synchronising the +// initial fill, or "preload", of a timeline cache. +// it has 4 possible states in the atomic pointer: +// - preloading = &(interface{}(*sync.WaitGroup)) +// - preloaded = &(interface{}(nil)) +// - needs preload = &(interface{}(false)) +// - brand-new = nil (functionally same as 'needs preload') +type preloader struct{ p atomic.Pointer[any] } + +// Check will return the current preload state, +// waiting if a preload is currently in progress. +func (p *preloader) Check() bool { + for { + // Get state ptr. + ptr := p.p.Load() + + // Check if requires preloading. + if ptr == nil || *ptr == false { + return false + } + + // Check for a preload currently in progress. + if wg, _ := (*ptr).(*sync.WaitGroup); wg != nil { + wg.Wait() + continue + } + + // Anything else + // means success. + return true + } +} + +// CheckPreload will safely check the preload state, +// and if needed call the provided function. if a +// preload is in progress, it will wait until complete. +func (p *preloader) CheckPreload(preload func(*any)) { + for { + // Get state ptr. + ptr := p.p.Load() + + if ptr == nil || *ptr == false { + // Needs preloading, start it. + ok := p.start(ptr, preload) + + if !ok { + // Failed to acquire start, + // other thread beat us to it. + continue + } + + // Success! + return + } + + // Check for a preload currently in progress. 
+ if wg, _ := (*ptr).(*sync.WaitGroup); wg != nil { + wg.Wait() + continue + } + + // Anything else + // means success. + return + } +} + +// start attempts to start the given preload function, by performing +// a compare and swap operation with 'old'. return is success. +func (p *preloader) start(old *any, preload func(*any)) bool { + + // Optimistically setup a + // new waitgroup to set as + // the preload waiter. + var wg sync.WaitGroup + wg.Add(1) + defer wg.Done() + + // Wrap waitgroup in + // 'any' for pointer. + new := any(&wg) + ptr := &new + + // Attempt CAS operation to claim start. + started := p.p.CompareAndSwap(old, ptr) + if !started { + return false + } + + // Start. + preload(ptr) + return true +} + +// done marks state as preloaded, +// i.e. no more preload required. +func (p *preloader) Done(ptr *any) { + if !p.p.CompareAndSwap(ptr, new(any)) { + log.Errorf(nil, "BUG: invalid preloader state: %#v", (*p.p.Load())) + } +} + +// clear will clear the state, marking a "preload" as required. +// i.e. next call to Check() will call provided preload func. +func (p *preloader) Clear() { + b := false + a := any(b) + for { + // Load current ptr. + ptr := p.p.Load() + if ptr == nil { + return // was brand-new + } + + // Check for a preload currently in progress. + if wg, _ := (*ptr).(*sync.WaitGroup); wg != nil { + wg.Wait() + continue + } + + // Try mark as needing preload. 
+ if p.p.CompareAndSwap(ptr, &a) { + return + } + } +} diff --git a/internal/cache/timeline/status.go b/internal/cache/timeline/status.go new file mode 100644 index 000000000..071fc5a36 --- /dev/null +++ b/internal/cache/timeline/status.go @@ -0,0 +1,842 @@ +// GoToSocial +// Copyright (C) GoToSocial Authors admin@gotosocial.org +// SPDX-License-Identifier: AGPL-3.0-or-later +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <http://www.gnu.org/licenses/>. + +package timeline + +import ( + "context" + "slices" + + "codeberg.org/gruf/go-structr" + + apimodel "github.com/superseriousbusiness/gotosocial/internal/api/model" + "github.com/superseriousbusiness/gotosocial/internal/gtserror" + "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" + "github.com/superseriousbusiness/gotosocial/internal/log" + "github.com/superseriousbusiness/gotosocial/internal/paging" + "github.com/superseriousbusiness/gotosocial/internal/util" + "github.com/superseriousbusiness/gotosocial/internal/util/xslices" +) + +// repeatBoostDepth determines the minimum count +// of statuses after which repeat boosts, or boosts +// of the original, may appear. This is may not end +// up *exact*, as small races between insert and the +// repeatBoost calculation may allow 1 or so extra +// to sneak in ahead of time. but it mostly works! 
+const repeatBoostDepth = 40 + +// StatusMeta contains minimum viable metadata +// about a Status in order to cache a timeline. +type StatusMeta struct { + ID string + AccountID string + BoostOfID string + BoostOfAccountID string + + // is an internal flag that may be set on + // a StatusMeta object that will prevent + // preparation of its apimodel.Status, due + // to it being a recently repeated boost. + repeatBoost bool + + // prepared contains prepared frontend API + // model for the referenced status. This may + // or may-not be nil depending on whether the + // status has been "unprepared" since the last + // call to "prepare" the frontend model. + prepared *apimodel.Status + + // loaded is a temporary field that may be + // set for a newly loaded timeline status + // so that statuses don't need to be loaded + // from the database twice in succession. + // + // i.e. this will only be set if the status + // was newly inserted into the timeline cache. + // for existing cache items this will be nil. + loaded *gtsmodel.Status +} + +// StatusTimeline provides a concurrency-safe sliding-window +// cache of the freshest statuses in a timeline. Internally, +// only StatusMeta{} objects themselves are stored, loading +// the actual statuses when necessary, but caching prepared +// frontend API models where possible. +// +// Notes on design: +// +// Previously, and initially when designing this newer type, +// we had status timeline caches that would dynamically fill +// themselves with statuses on call to Load() with statuses +// at *any* location in the timeline, while simultaneously +// accepting new input of statuses from the background workers. +// This unfortunately can lead to situations where posts need +// to be fetched from the database, but the cache isn't aware +// they exist and instead returns an incomplete selection. 
+// This problem is best outlined by the follow simple example: +// +// "what if my timeline cache contains posts 0-to-6 and 8-to-12, +// and i make a request for posts between 4-and-10 with no limit, +// how is it to know that it's missing post 7?" +// +// The solution is to unfortunately remove a lot of the caching +// of "older areas" of the timeline, and instead just have it +// be a sliding window of the freshest posts of that timeline. +// It gets preloaded initially on start / first-call, and kept +// up-to-date with new posts by streamed inserts from background +// workers. Any requests for posts outside this we know therefore +// must hit the database, (which we then *don't* cache). +type StatusTimeline struct { + + // underlying timeline cache of *StatusMeta{}, + // primary-keyed by ID, with extra indices below. + cache structr.Timeline[*StatusMeta, string] + + // preloader synchronizes preload + // state of the timeline cache. + preloader preloader + + // fast-access cache indices. + idx_ID *structr.Index //nolint:revive + idx_AccountID *structr.Index //nolint:revive + idx_BoostOfID *structr.Index //nolint:revive + idx_BoostOfAccountID *structr.Index //nolint:revive + + // cutoff and maximum item lengths. + // the timeline is trimmed back to + // cutoff on each call to Trim(), + // and maximum len triggers a Trim(). + // + // the timeline itself does not + // limit items due to complexities + // it would introduce, so we apply + // a 'cut-off' at regular intervals. + cut, max int +} + +// Init will initialize the timeline for usage, +// by preparing internal indices etc. This also +// sets the given max capacity for Trim() operations. +func (t *StatusTimeline) Init(cap int) { + t.cache.Init(structr.TimelineConfig[*StatusMeta, string]{ + + // Timeline item primary key field. + PKey: structr.IndexConfig{Fields: "ID"}, + + // Additional indexed fields. 
+ Indices: []structr.IndexConfig{ + {Fields: "AccountID", Multiple: true}, + {Fields: "BoostOfAccountID", Multiple: true}, + {Fields: "BoostOfID", Multiple: true}, + }, + + // Timeline item copy function. + Copy: func(s *StatusMeta) *StatusMeta { + var prepared *apimodel.Status + if s.prepared != nil { + prepared = new(apimodel.Status) + *prepared = *s.prepared + } + return &StatusMeta{ + ID: s.ID, + AccountID: s.AccountID, + BoostOfID: s.BoostOfID, + BoostOfAccountID: s.BoostOfAccountID, + repeatBoost: s.repeatBoost, + loaded: nil, // NEVER stored + prepared: prepared, + } + }, + }) + + // Get fast index lookup ptrs. + t.idx_ID = t.cache.Index("ID") + t.idx_AccountID = t.cache.Index("AccountID") + t.idx_BoostOfID = t.cache.Index("BoostOfID") + t.idx_BoostOfAccountID = t.cache.Index("BoostOfAccountID") + + // Set maximum capacity and + // cutoff threshold we trim to. + t.cut = int(0.60 * float64(cap)) + t.max = cap +} + +// Preload will fill with StatusTimeline{} cache with +// the latest sliding window of status metadata for the +// timeline type returned by database 'loadPage' function. +// +// This function is concurrency-safe and repeated calls to +// it when already preloaded will be no-ops. To trigger a +// preload as being required, call .Clear(). +func (t *StatusTimeline) Preload( + + // loadPage should load the timeline of given page for cache hydration. + loadPage func(page *paging.Page) (statuses []*gtsmodel.Status, err error), + + // filter can be used to perform filtering of returned + // statuses BEFORE insert into cache. i.e. this will effect + // what actually gets stored in the timeline cache. + filter func(each *gtsmodel.Status) (delete bool), +) ( + n int, + err error, +) { + t.preloader.CheckPreload(func(ptr *any) { + n, err = t.preload(loadPage, filter) + if err != nil { + return + } + + // Mark as preloaded. + t.preloader.Done(ptr) + }) + return +} + +// preload contains the core logic of +// Preload(), without t.preloader checks. 
+func (t *StatusTimeline) preload( + + // loadPage should load the timeline of given page for cache hydration. + loadPage func(page *paging.Page) (statuses []*gtsmodel.Status, err error), + + // filter can be used to perform filtering of returned + // statuses BEFORE insert into cache. i.e. this will effect + // what actually gets stored in the timeline cache. + filter func(each *gtsmodel.Status) (delete bool), +) (int, error) { + if loadPage == nil { + panic("nil load page func") + } + + // Clear timeline + // before preload. + t.cache.Clear() + + // Our starting, page at the top + // of the possible timeline. + page := new(paging.Page) + order := paging.OrderDescending + page.Max.Order = order + page.Max.Value = plus1hULID() + page.Min.Order = order + page.Min.Value = "" + page.Limit = 100 + + // Prepare a slice for gathering status meta. + metas := make([]*StatusMeta, 0, page.Limit) + + var n int + for n < t.cut { + // Load page of timeline statuses. + statuses, err := loadPage(page) + if err != nil { + return n, gtserror.Newf("error loading statuses: %w", err) + } + + // No more statuses from + // load function = at end. + if len(statuses) == 0 { + break + } + + // Update our next page cursor from statuses. + page.Max.Value = statuses[len(statuses)-1].ID + + // Perform any filtering on newly loaded statuses. + statuses = doStatusFilter(statuses, filter) + + // After filtering no more + // statuses remain, retry. + if len(statuses) == 0 { + continue + } + + // Convert statuses to meta and insert. + metas = toStatusMeta(metas[:0], statuses) + n = t.cache.Insert(metas...) + } + + // This is a potentially 100-1000s size map, + // but still easily manageable memory-wise. + recentBoosts := make(map[string]int, t.cut) + + // Iterate timeline ascending (i.e. oldest -> newest), marking + // entry IDs and marking down if boosts have been seen recently. + for idx, value := range t.cache.RangeUnsafe(structr.Asc) { + + // Store current ID in map. 
+ recentBoosts[value.ID] = idx + + // If it's a boost, check if the original, + // or a boost of it has been seen recently. + if id := value.BoostOfID; id != "" { + + // Check if seen recently. + last, ok := recentBoosts[id] + repeat := ok && (idx-last) < 40 + value.repeatBoost = repeat + + // Update last-seen idx. + recentBoosts[id] = idx + } + } + + return n, nil +} + +// Load will load given page of timeline statuses. First it +// will prioritize fetching statuses from the sliding window +// that is the timeline cache of latest statuses, else it will +// fall back to loading from the database using callback funcs. +// The returned string values are the low / high status ID +// paging values, used in calculating next / prev page links. +func (t *StatusTimeline) Load( + ctx context.Context, + page *paging.Page, + + // loadPage should load the timeline of given page for cache hydration. + loadPage func(page *paging.Page) (statuses []*gtsmodel.Status, err error), + + // loadIDs should load status models with given IDs, this is used + // to load status models of already cached entries in the timeline. + loadIDs func(ids []string) (statuses []*gtsmodel.Status, err error), + + // filter performs filtering of returned statuses. + filter func(each *gtsmodel.Status) (delete bool), + + // prepareAPI should prepare internal status model to frontend API model. + prepareAPI func(status *gtsmodel.Status) (apiStatus *apimodel.Status, err error), +) ( + []*apimodel.Status, + string, // lo + string, // hi + error, +) { + var err error + + // Get paging details. + lo := page.Min.Value + hi := page.Max.Value + limit := page.Limit + order := page.Order() + dir := toDirection(order) + + // Use a copy of current page so + // we can repeatedly update it. + nextPg := new(paging.Page) + *nextPg = *page + nextPg.Min.Value = lo + nextPg.Max.Value = hi + + // Interstitial meta objects. + var metas []*StatusMeta + + // Returned frontend API statuses. 
+ var apiStatuses []*apimodel.Status + + // TODO: we can remove this nil + // check when we've updated all + // our timeline endpoints to have + // streamed timeline caches. + if t != nil { + + // Ensure timeline has been preloaded. + _, err = t.Preload(loadPage, filter) + if err != nil { + return nil, "", "", err + } + + // First we attempt to load status + // metadata entries from the timeline + // cache, up to given limit. + metas = t.cache.Select( + util.PtrIf(lo), + util.PtrIf(hi), + util.PtrIf(limit), + dir, + ) + + if len(metas) > 0 { + // Before we can do any filtering, we need + // to load status models for cached entries. + err = loadStatuses(metas, loadIDs) + if err != nil { + return nil, "", "", gtserror.Newf("error loading statuses: %w", err) + } + + // Set returned lo, hi values. + lo = metas[len(metas)-1].ID + hi = metas[0].ID + + // Allocate slice of expected required API models. + apiStatuses = make([]*apimodel.Status, 0, len(metas)) + + // Prepare frontend API models for + // the cached statuses. For now this + // also does its own extra filtering. + apiStatuses = prepareStatuses(ctx, + metas, + prepareAPI, + apiStatuses, + limit, + ) + } + } + + // If no cached timeline statuses + // were found for page, we need to + // call through to the database. + if len(apiStatuses) == 0 { + + // Pass through to main timeline db load function. + apiStatuses, lo, hi, err = loadStatusTimeline(ctx, + nextPg, + metas, + apiStatuses, + loadPage, + filter, + prepareAPI, + ) + if err != nil { + return nil, "", "", err + } + } + + if order.Ascending() { + // The caller always expects the statuses + // to be returned in DESC order, but we + // build the status slice in paging order. + // If paging ASC, we need to reverse the + // returned statuses and paging values. 
+ slices.Reverse(apiStatuses) + lo, hi = hi, lo + } + + return apiStatuses, lo, hi, nil +} + +// loadStatusTimeline encapsulates the logic of iteratively +// attempting to load a status timeline page from the database, +// that is in the form of given callback functions. these will +// then be prepared to frontend API models for return. +// +// in time it may make sense to move this logic +// into the StatusTimeline{}.Load() function. +func loadStatusTimeline( + ctx context.Context, + nextPg *paging.Page, + metas []*StatusMeta, + apiStatuses []*apimodel.Status, + loadPage func(page *paging.Page) (statuses []*gtsmodel.Status, err error), + filter func(each *gtsmodel.Status) (delete bool), + prepareAPI func(status *gtsmodel.Status) (apiStatus *apimodel.Status, err error), +) ( + []*apimodel.Status, + string, // lo + string, // hi + error, +) { + if loadPage == nil { + panic("nil load page func") + } + + // Lowest and highest ID + // vals of loaded statuses. + var lo, hi string + + // Extract paging params. + order := nextPg.Order() + limit := nextPg.Limit + + // Load a little more than + // limit to reduce db calls. + nextPg.Limit += 10 + + // Ensure we have a slice of meta objects to + // use in later preparation of the API models. + metas = xslices.GrowJust(metas[:0], nextPg.Limit) + + // Ensure we have a slice of required frontend API models. + apiStatuses = xslices.GrowJust(apiStatuses[:0], nextPg.Limit) + + // Perform maximum of 5 load + // attempts fetching statuses. + for i := 0; i < 5; i++ { + + // Load next timeline statuses. + statuses, err := loadPage(nextPg) + if err != nil { + return nil, "", "", gtserror.Newf("error loading timeline: %w", err) + } + + // No more statuses from + // load function = at end. + if len(statuses) == 0 { + break + } + + if hi == "" { + // Set hi returned paging + // value if not already set. + hi = statuses[0].ID + } + + // Update nextPg cursor parameter for next database query. 
+ nextPageParams(nextPg, statuses[len(statuses)-1].ID, order) + + // Perform any filtering on newly loaded statuses. + statuses = doStatusFilter(statuses, filter) + + // After filtering no more + // statuses remain, retry. + if len(statuses) == 0 { + continue + } + + // Convert to our interstitial meta type. + metas = toStatusMeta(metas[:0], statuses) + + // Prepare frontend API models for + // the loaded statuses. For now this + // also does its own extra filtering. + apiStatuses = prepareStatuses(ctx, + metas, + prepareAPI, + apiStatuses, + limit, + ) + + // If we have anything, return + // here. Even if below limit. + if len(apiStatuses) > 0 { + + // Set returned lo status paging value. + lo = apiStatuses[len(apiStatuses)-1].ID + break + } + } + + return apiStatuses, lo, hi, nil +} + +// InsertOne allows you to insert a single status into the timeline, with optional prepared API model. +// The return value indicates whether status should be skipped from streams, e.g. if already boosted recently. +func (t *StatusTimeline) InsertOne(status *gtsmodel.Status, prepared *apimodel.Status) (skip bool) { + + // If timeline no preloaded, i.e. + // no-one using it, don't insert. + if !t.preloader.Check() { + return false + } + + if status.BoostOfID != "" { + // Check through top $repeatBoostDepth number of items. + for i, value := range t.cache.RangeUnsafe(structr.Desc) { + if i >= repeatBoostDepth { + break + } + + // We don't care about values that have + // already been hidden as repeat boosts. + if value.repeatBoost { + continue + } + + // If inserted status has already been boosted, or original was posted + // within last $repeatBoostDepth, we indicate it as a repeated boost. + if value.ID == status.BoostOfID || value.BoostOfID == status.BoostOfID { + skip = true + break + } + } + } + + // Insert new timeline status. 
+ t.cache.Insert(&StatusMeta{ + ID: status.ID, + AccountID: status.AccountID, + BoostOfID: status.BoostOfID, + BoostOfAccountID: status.BoostOfAccountID, + repeatBoost: skip, + loaded: nil, + prepared: prepared, + }) + + return +} + +// RemoveByStatusID removes all cached timeline entries pertaining to +// status ID, including those that may be a boost of the given status. +func (t *StatusTimeline) RemoveByStatusIDs(statusIDs ...string) { + keys := make([]structr.Key, len(statusIDs)) + + // Nil check indices outside loops. + if t.idx_ID == nil || + t.idx_BoostOfID == nil { + panic("indices are nil") + } + + // Convert statusIDs to index keys. + for i, id := range statusIDs { + keys[i] = t.idx_ID.Key(id) + } + + // Invalidate all cached entries with IDs. + t.cache.Invalidate(t.idx_ID, keys...) + + // Convert statusIDs to index keys. + for i, id := range statusIDs { + keys[i] = t.idx_BoostOfID.Key(id) + } + + // Invalidate all cached entries as boost of IDs. + t.cache.Invalidate(t.idx_BoostOfID, keys...) +} + +// RemoveByAccountID removes all cached timeline entries authored by +// account ID, including those that may be boosted by account ID. +func (t *StatusTimeline) RemoveByAccountIDs(accountIDs ...string) { + keys := make([]structr.Key, len(accountIDs)) + + // Nil check indices outside loops. + if t.idx_AccountID == nil || + t.idx_BoostOfAccountID == nil { + panic("indices are nil") + } + + // Convert accountIDs to index keys. + for i, id := range accountIDs { + keys[i] = t.idx_AccountID.Key(id) + } + + // Invalidate all cached entries as by IDs. + t.cache.Invalidate(t.idx_AccountID, keys...) + + // Convert accountIDs to index keys. + for i, id := range accountIDs { + keys[i] = t.idx_BoostOfAccountID.Key(id) + } + + // Invalidate all cached entries as boosted by IDs. + t.cache.Invalidate(t.idx_BoostOfAccountID, keys...) 
+} + +// UnprepareByStatusIDs removes cached frontend API models for all cached +// timeline entries pertaining to status ID, including boosts of given status. +func (t *StatusTimeline) UnprepareByStatusIDs(statusIDs ...string) { + keys := make([]structr.Key, len(statusIDs)) + + // Nil check indices outside loops. + if t.idx_ID == nil || + t.idx_BoostOfID == nil { + panic("indices are nil") + } + + // Convert statusIDs to index keys. + for i, id := range statusIDs { + keys[i] = t.idx_ID.Key(id) + } + + // Unprepare all statuses stored under StatusMeta.ID. + for meta := range t.cache.RangeKeysUnsafe(t.idx_ID, keys...) { + meta.prepared = nil + } + + // Convert statusIDs to index keys. + for i, id := range statusIDs { + keys[i] = t.idx_BoostOfID.Key(id) + } + + // Unprepare all statuses stored under StatusMeta.BoostOfID. + for meta := range t.cache.RangeKeysUnsafe(t.idx_BoostOfID, keys...) { + meta.prepared = nil + } +} + +// UnprepareByAccountIDs removes cached frontend API models for all cached +// timeline entries authored by account ID, including boosts by account ID. +func (t *StatusTimeline) UnprepareByAccountIDs(accountIDs ...string) { + keys := make([]structr.Key, len(accountIDs)) + + // Nil check indices outside loops. + if t.idx_AccountID == nil || + t.idx_BoostOfAccountID == nil { + panic("indices are nil") + } + + // Convert accountIDs to index keys. + for i, id := range accountIDs { + keys[i] = t.idx_AccountID.Key(id) + } + + // Unprepare all statuses stored under StatusMeta.AccountID. + for meta := range t.cache.RangeKeysUnsafe(t.idx_AccountID, keys...) { + meta.prepared = nil + } + + // Convert accountIDs to index keys. + for i, id := range accountIDs { + keys[i] = t.idx_BoostOfAccountID.Key(id) + } + + // Unprepare all statuses stored under StatusMeta.BoostOfAccountID. + for meta := range t.cache.RangeKeysUnsafe(t.idx_BoostOfAccountID, keys...) 
{ + meta.prepared = nil + } +} + +// UnprepareAll removes cached frontend API +// models for all cached timeline entries. +func (t *StatusTimeline) UnprepareAll() { + for _, value := range t.cache.RangeUnsafe(structr.Asc) { + value.prepared = nil + } +} + +// Trim will ensure that receiving timeline is less than or +// equal in length to the given threshold percentage of the +// timeline's preconfigured maximum capacity. This will always +// trim from the bottom-up to prioritize streamed inserts. +func (t *StatusTimeline) Trim() { t.cache.Trim(t.cut, structr.Asc) } + +// Clear will mark the entire timeline as requiring preload, +// which will trigger a clear and reload of the entire thing. +func (t *StatusTimeline) Clear() { t.preloader.Clear() } + +// prepareStatuses takes a slice of cached (or, freshly loaded!) StatusMeta{} +// models, and use given function to return prepared frontend API models. +func prepareStatuses( + ctx context.Context, + meta []*StatusMeta, + prepareAPI func(*gtsmodel.Status) (*apimodel.Status, error), + apiStatuses []*apimodel.Status, + limit int, +) []*apimodel.Status { + switch { //nolint:gocritic + case prepareAPI == nil: + panic("nil prepare fn") + } + + // Iterate the given StatusMeta objects for pre-prepared + // frontend models, otherwise attempting to prepare them. + for _, meta := range meta { + + // Check if we have prepared enough + // API statuses for caller to return. + if len(apiStatuses) >= limit { + break + } + + if meta.loaded == nil { + // We failed loading this + // status, skip preparing. + continue + } + + if meta.repeatBoost { + // This is a repeat boost in + // short timespan, skip it. + continue + } + + if meta.prepared == nil { + var err error + + // Prepare the provided status to frontend. + meta.prepared, err = prepareAPI(meta.loaded) + if err != nil { + log.Errorf(ctx, "error preparing status %s: %v", meta.loaded.URI, err) + continue + } + } + + // Append to return slice. 
+ if meta.prepared != nil { + apiStatuses = append(apiStatuses, meta.prepared) + } + } + + return apiStatuses +} + +// loadStatuses loads statuses using provided callback +// for the statuses in meta slice that aren't loaded. +// the amount very much depends on whether meta objects +// are yet-to-be-cached (i.e. newly loaded, with status), +// or are from the timeline cache (unloaded status). +func loadStatuses( + metas []*StatusMeta, + loadIDs func([]string) ([]*gtsmodel.Status, error), +) error { + + // Determine which of our passed status + // meta objects still need statuses loading. + toLoadIDs := make([]string, len(metas)) + loadedMap := make(map[string]*StatusMeta, len(metas)) + for i, meta := range metas { + if meta.loaded == nil { + toLoadIDs[i] = meta.ID + loadedMap[meta.ID] = meta + } + } + + // Load statuses with given IDs. + loaded, err := loadIDs(toLoadIDs) + if err != nil { + return gtserror.Newf("error loading statuses: %w", err) + } + + // Update returned StatusMeta objects + // with newly loaded statuses by IDs. + for i := range loaded { + status := loaded[i] + meta := loadedMap[status.ID] + meta.loaded = status + } + + return nil +} + +// toStatusMeta converts a slice of database model statuses +// into our cache wrapper type, a slice of []StatusMeta{}. +func toStatusMeta(in []*StatusMeta, statuses []*gtsmodel.Status) []*StatusMeta { + return xslices.Gather(in, statuses, func(s *gtsmodel.Status) *StatusMeta { + return &StatusMeta{ + ID: s.ID, + AccountID: s.AccountID, + BoostOfID: s.BoostOfID, + BoostOfAccountID: s.BoostOfAccountID, + loaded: s, + prepared: nil, + } + }) +} + +// doStatusFilter performs given filter function on provided statuses, +func doStatusFilter(statuses []*gtsmodel.Status, filter func(*gtsmodel.Status) bool) []*gtsmodel.Status { + + // Check for provided + // filter function. + if filter == nil { + return statuses + } + + // Filter the provided input statuses. 
+ return slices.DeleteFunc(statuses, filter) +} diff --git a/internal/cache/timeline/status_map.go b/internal/cache/timeline/status_map.go new file mode 100644 index 000000000..e402883af --- /dev/null +++ b/internal/cache/timeline/status_map.go @@ -0,0 +1,198 @@ +// GoToSocial +// Copyright (C) GoToSocial Authors admin@gotosocial.org +// SPDX-License-Identifier: AGPL-3.0-or-later +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <http://www.gnu.org/licenses/>. + +package timeline + +import ( + "maps" + "sync/atomic" +) + +// StatusTimelines is a concurrency safe map of StatusTimeline{} +// objects, optimizing *very heavily* for reads over writes. +type StatusTimelines struct { + ptr atomic.Pointer[map[string]*StatusTimeline] // ronly except by CAS + cap int +} + +// Init stores the given argument(s) such that any created StatusTimeline{} +// objects by MustGet() will initialize them with the given arguments. +func (t *StatusTimelines) Init(cap int) { t.cap = cap } + +// MustGet will attempt to fetch StatusTimeline{} stored under key, else creating one. +func (t *StatusTimelines) MustGet(key string) *StatusTimeline { + var tt *StatusTimeline + + for { + // Load current ptr. + cur := t.ptr.Load() + + // Get timeline map to work on. + var m map[string]*StatusTimeline + + if cur != nil { + // Look for existing + // timeline in cache. 
+ tt = (*cur)[key] + if tt != nil { + return tt + } + + // Get clone of current + // before modifications. + m = maps.Clone(*cur) + } else { + // Allocate new timeline map for below. + m = make(map[string]*StatusTimeline) + } + + if tt == nil { + // Allocate new timeline. + tt = new(StatusTimeline) + tt.Init(t.cap) + } + + // Store timeline + // in new map. + m[key] = tt + + // Attempt to update the map ptr. + if !t.ptr.CompareAndSwap(cur, &m) { + + // We failed the + // CAS, reloop. + continue + } + + // Successfully inserted + // new timeline model. + return tt + } +} + +// Delete will delete the stored StatusTimeline{} under key, if any. +func (t *StatusTimelines) Delete(key string) { + for { + // Load current ptr. + cur := t.ptr.Load() + + // Check for empty map / not in map. + if cur == nil || (*cur)[key] == nil { + return + } + + // Get clone of current + // before modifications. + m := maps.Clone(*cur) + + // Delete ID. + delete(m, key) + + // Attempt to update the map ptr. + if !t.ptr.CompareAndSwap(cur, &m) { + + // We failed the + // CAS, reloop. + continue + } + + // Successfully + // deleted ID. + return + } +} + +// RemoveByStatusIDs calls RemoveByStatusIDs() for each of the stored StatusTimeline{}s. +func (t *StatusTimelines) RemoveByStatusIDs(statusIDs ...string) { + if p := t.ptr.Load(); p != nil { + for _, tt := range *p { + tt.RemoveByStatusIDs(statusIDs...) + } + } +} + +// RemoveByAccountIDs calls RemoveByAccountIDs() for each of the stored StatusTimeline{}s. +func (t *StatusTimelines) RemoveByAccountIDs(accountIDs ...string) { + if p := t.ptr.Load(); p != nil { + for _, tt := range *p { + tt.RemoveByAccountIDs(accountIDs...) + } + } +} + +// UnprepareByStatusIDs calls UnprepareByStatusIDs() for each of the stored StatusTimeline{}s. +func (t *StatusTimelines) UnprepareByStatusIDs(statusIDs ...string) { + if p := t.ptr.Load(); p != nil { + for _, tt := range *p { + tt.UnprepareByStatusIDs(statusIDs...) 
+ } + } +} + +// UnprepareByAccountIDs calls UnprepareByAccountIDs() for each of the stored StatusTimeline{}s. +func (t *StatusTimelines) UnprepareByAccountIDs(accountIDs ...string) { + if p := t.ptr.Load(); p != nil { + for _, tt := range *p { + tt.UnprepareByAccountIDs(accountIDs...) + } + } +} + +// Unprepare attempts to call UnprepareAll() for StatusTimeline{} under key. +func (t *StatusTimelines) Unprepare(key string) { + if p := t.ptr.Load(); p != nil { + if tt := (*p)[key]; tt != nil { + tt.UnprepareAll() + } + } +} + +// UnprepareAll calls UnprepareAll() for each of the stored StatusTimeline{}s. +func (t *StatusTimelines) UnprepareAll() { + if p := t.ptr.Load(); p != nil { + for _, tt := range *p { + tt.UnprepareAll() + } + } +} + +// Trim calls Trim() for each of the stored StatusTimeline{}s. +func (t *StatusTimelines) Trim() { + if p := t.ptr.Load(); p != nil { + for _, tt := range *p { + tt.Trim() + } + } +} + +// Clear attempts to call Clear() for StatusTimeline{} under key. +func (t *StatusTimelines) Clear(key string) { + if p := t.ptr.Load(); p != nil { + if tt := (*p)[key]; tt != nil { + tt.Clear() + } + } +} + +// ClearAll calls Clear() for each of the stored StatusTimeline{}s. +func (t *StatusTimelines) ClearAll() { + if p := t.ptr.Load(); p != nil { + for _, tt := range *p { + tt.Clear() + } + } +} diff --git a/internal/cache/timeline/status_test.go b/internal/cache/timeline/status_test.go new file mode 100644 index 000000000..3e53d8256 --- /dev/null +++ b/internal/cache/timeline/status_test.go @@ -0,0 +1,361 @@ +// GoToSocial +// Copyright (C) GoToSocial Authors admin@gotosocial.org +// SPDX-License-Identifier: AGPL-3.0-or-later +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. 
+// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <http://www.gnu.org/licenses/>. + +package timeline + +import ( + "slices" + "testing" + + "codeberg.org/gruf/go-structr" + "github.com/stretchr/testify/assert" + apimodel "github.com/superseriousbusiness/gotosocial/internal/api/model" + "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" +) + +var testStatusMeta = []*StatusMeta{ + { + ID: "06B19VYTHEG01F3YW13RQE0QM8", + AccountID: "06B1A61MZEBBVDSNPRJAA8F2C4", + BoostOfID: "06B1A5KQWGQ1ABM3FA7TDX1PK8", + BoostOfAccountID: "06B1A6707818050PCK8SJAEC6G", + }, + { + ID: "06B19VYTJFT0KDWT5C1CPY0XNC", + AccountID: "06B1A61MZN3ZQPZVNGEFBNYBJW", + BoostOfID: "06B1A5KQWSGFN4NNRV34KV5S9R", + BoostOfAccountID: "06B1A6707HY8RAXG7JPCWR7XD4", + }, + { + ID: "06B19VYTJ6WZQPRVNJHPEZH04W", + AccountID: "06B1A61MZY7E0YB6G01VJX8ERR", + BoostOfID: "06B1A5KQX5NPGSYGH8NC7HR1GR", + BoostOfAccountID: "06B1A6707XCSAF0MVCGGYF9160", + }, + { + ID: "06B19VYTJPKGG8JYCR1ENAV7KC", + AccountID: "06B1A61N07K1GC35PJ3CZ4M020", + BoostOfID: "06B1A5KQXG6ZCWE1R7C7KR7RYW", + BoostOfAccountID: "06B1A67084W6SB6P6HJB7K5DSG", + }, + { + ID: "06B19VYTHRR8S35QXC5A6VE2YW", + AccountID: "06B1A61N0P1TGQDVKANNG4AKP4", + BoostOfID: "06B1A5KQY3K839Z6S5HHAJKSWW", + BoostOfAccountID: "06B1A6708SPJC3X3ZG3SGG8BN8", + }, +} + +func TestStatusTimelineUnprepare(t *testing.T) { + var tt StatusTimeline + tt.Init(1000) + + // Clone the input test status data. + data := slices.Clone(testStatusMeta) + + // Bodge some 'prepared' + // models on test data. + for _, meta := range data { + meta.prepared = &apimodel.Status{} + } + + // Insert test data into timeline. 
+ _ = tt.cache.Insert(data...) + + for _, meta := range data { + // Unprepare this status with ID. + tt.UnprepareByStatusIDs(meta.ID) + + // Check the item is unprepared. + value := getStatusByID(&tt, meta.ID) + assert.Nil(t, value.prepared) + } + + // Clear and reinsert. + tt.cache.Clear() + tt.cache.Insert(data...) + + for _, meta := range data { + // Unprepare this status with boost ID. + tt.UnprepareByStatusIDs(meta.BoostOfID) + + // Check the item is unprepared. + value := getStatusByID(&tt, meta.ID) + assert.Nil(t, value.prepared) + } + + // Clear and reinsert. + tt.cache.Clear() + tt.cache.Insert(data...) + + for _, meta := range data { + // Unprepare this status with account ID. + tt.UnprepareByAccountIDs(meta.AccountID) + + // Check the item is unprepared. + value := getStatusByID(&tt, meta.ID) + assert.Nil(t, value.prepared) + } + + // Clear and reinsert. + tt.cache.Clear() + tt.cache.Insert(data...) + + for _, meta := range data { + // Unprepare this status with boost account ID. + tt.UnprepareByAccountIDs(meta.BoostOfAccountID) + + // Check the item is unprepared. + value := getStatusByID(&tt, meta.ID) + assert.Nil(t, value.prepared) + } +} + +func TestStatusTimelineRemove(t *testing.T) { + var tt StatusTimeline + tt.Init(1000) + + // Clone the input test status data. + data := slices.Clone(testStatusMeta) + + // Insert test data into timeline. + _ = tt.cache.Insert(data...) + + for _, meta := range data { + // Remove this status with ID. + tt.RemoveByStatusIDs(meta.ID) + + // Check the item is now gone. + value := getStatusByID(&tt, meta.ID) + assert.Nil(t, value) + } + + // Clear and reinsert. + tt.cache.Clear() + tt.cache.Insert(data...) + + for _, meta := range data { + // Remove this status with boost ID. + tt.RemoveByStatusIDs(meta.BoostOfID) + + // Check the item is now gone. + value := getStatusByID(&tt, meta.ID) + assert.Nil(t, value) + } + + // Clear and reinsert. + tt.cache.Clear() + tt.cache.Insert(data...) 
+ + for _, meta := range data { + // Remove this status with account ID. + tt.RemoveByAccountIDs(meta.AccountID) + + // Check the item is now gone. + value := getStatusByID(&tt, meta.ID) + assert.Nil(t, value) + } + + // Clear and reinsert. + tt.cache.Clear() + tt.cache.Insert(data...) + + for _, meta := range data { + // Remove this status with boost account ID. + tt.RemoveByAccountIDs(meta.BoostOfAccountID) + + // Check the item is now gone. + value := getStatusByID(&tt, meta.ID) + assert.Nil(t, value) + } +} + +func TestStatusTimelineInserts(t *testing.T) { + var tt StatusTimeline + tt.Init(1000) + + // Clone the input test status data. + data := slices.Clone(testStatusMeta) + + // Insert test data into timeline. + l := tt.cache.Insert(data...) + assert.Equal(t, len(data), l) + + // Ensure 'min' value status + // in the timeline is expected. + minID := minStatusID(data) + assert.Equal(t, minID, minStatus(&tt).ID) + + // Ensure 'max' value status + // in the timeline is expected. + maxID := maxStatusID(data) + assert.Equal(t, maxID, maxStatus(&tt).ID) + + // Manually mark timeline as 'preloaded'. + tt.preloader.CheckPreload(tt.preloader.Done) + + // Specifically craft a boost of latest (i.e. max) status in timeline. + boost := >smodel.Status{ID: "06B1A00PQWDZZH9WK9P5VND35C", BoostOfID: maxID} + + // Insert boost into the timeline + // checking for 'repeatBoost' notifier. + repeatBoost := tt.InsertOne(boost, nil) + assert.True(t, repeatBoost) + + // This should be the new 'max' + // and have 'repeatBoost' set. + newMax := maxStatus(&tt) + assert.Equal(t, boost.ID, newMax.ID) + assert.True(t, newMax.repeatBoost) + + // Specifically craft 2 boosts of some unseen status in the timeline. + boost1 := >smodel.Status{ID: "06B1A121YEX02S0AY48X93JMDW", BoostOfID: "unseen"} + boost2 := >smodel.Status{ID: "06B1A12TG2NTJC9P270EQXS08M", BoostOfID: "unseen"} + + // Insert boosts into the timeline, ensuring + // first is not 'repeat', but second one is. 
+ repeatBoost1 := tt.InsertOne(boost1, nil) + repeatBoost2 := tt.InsertOne(boost2, nil) + assert.False(t, repeatBoost1) + assert.True(t, repeatBoost2) +} + +func TestStatusTimelineTrim(t *testing.T) { + var tt StatusTimeline + tt.Init(1000) + + // Clone the input test status data. + data := slices.Clone(testStatusMeta) + + // Insert test data into timeline. + _ = tt.cache.Insert(data...) + + // From here it'll be easier to have DESC sorted + // test data for reslicing and checking against. + slices.SortFunc(data, func(a, b *StatusMeta) int { + const k = +1 + switch { + case a.ID < b.ID: + return +k + case b.ID < a.ID: + return -k + default: + return 0 + } + }) + + // Set manual cutoff for trim. + tt.cut = len(data) - 1 + + // Perform trim. + tt.Trim() + + // The post trim length should be tt.cut + assert.Equal(t, tt.cut, tt.cache.Len()) + + // It specifically should have removed + // the oldest (i.e. min) status element. + minID := data[len(data)-1].ID + assert.NotEqual(t, minID, minStatus(&tt).ID) + assert.False(t, containsStatusID(&tt, minID)) + + // Drop trimmed status. + data = data[:len(data)-1] + + // Set smaller cutoff for trim. + tt.cut = len(data) - 2 + + // Perform trim. + tt.Trim() + + // The post trim length should be tt.cut + assert.Equal(t, tt.cut, tt.cache.Len()) + + // It specifically should have removed + // the oldest 2 (i.e. min) status elements. + minID1 := data[len(data)-1].ID + minID2 := data[len(data)-2].ID + assert.NotEqual(t, minID1, minStatus(&tt).ID) + assert.NotEqual(t, minID2, minStatus(&tt).ID) + assert.False(t, containsStatusID(&tt, minID1)) + assert.False(t, containsStatusID(&tt, minID2)) + + // Trim at desired length + // should cause no change. + before := tt.cache.Len() + tt.Trim() + assert.Equal(t, before, tt.cache.Len()) +} + +// containsStatusID returns whether timeline contains a status with ID. 
+func containsStatusID(t *StatusTimeline, id string) bool { + return getStatusByID(t, id) != nil +} + +// getStatusByID attempts to fetch status with given ID from timeline. +func getStatusByID(t *StatusTimeline, id string) *StatusMeta { + for _, value := range t.cache.Range(structr.Desc) { + if value.ID == id { + return value + } + } + return nil +} + +// maxStatus returns the newest (i.e. highest value ID) status in timeline. +func maxStatus(t *StatusTimeline) *StatusMeta { + var meta *StatusMeta + for _, value := range t.cache.Range(structr.Desc) { + meta = value + break + } + return meta +} + +// minStatus returns the oldest (i.e. lowest value ID) status in timeline. +func minStatus(t *StatusTimeline) *StatusMeta { + var meta *StatusMeta + for _, value := range t.cache.Range(structr.Asc) { + meta = value + break + } + return meta +} + +// minStatusID returns the oldest (i.e. lowest value ID) status in metas. +func minStatusID(metas []*StatusMeta) string { + var min string + min = metas[0].ID + for i := 1; i < len(metas); i++ { + if metas[i].ID < min { + min = metas[i].ID + } + } + return min +} + +// maxStatusID returns the newest (i.e. highest value ID) status in metas. +func maxStatusID(metas []*StatusMeta) string { + var max string + max = metas[0].ID + for i := 1; i < len(metas); i++ { + if metas[i].ID > max { + max = metas[i].ID + } + } + return max +} diff --git a/internal/cache/timeline/timeline.go b/internal/cache/timeline/timeline.go new file mode 100644 index 000000000..4f8797e82 --- /dev/null +++ b/internal/cache/timeline/timeline.go @@ -0,0 +1,59 @@ +// GoToSocial +// Copyright (C) GoToSocial Authors admin@gotosocial.org +// SPDX-License-Identifier: AGPL-3.0-or-later +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. 
+// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <http://www.gnu.org/licenses/>. + +package timeline + +import ( + "time" + + "codeberg.org/gruf/go-structr" + "github.com/superseriousbusiness/gotosocial/internal/id" + "github.com/superseriousbusiness/gotosocial/internal/paging" +) + +// plus1hULID returns a ULID for now+1h. +func plus1hULID() string { + t := time.Now().Add(time.Hour) + return id.NewULIDFromTime(t) +} + +// nextPageParams gets the next set of paging +// parameters to use based on the current set, +// and the next set of lo / hi values. This will +// correctly handle making sure that, depending +// on the paging order, the cursor value gets +// updated while maintaining the boundary value. +func nextPageParams( + page *paging.Page, + lastIdx string, + order paging.Order, +) { + if order.Ascending() { + page.Min.Value = lastIdx + } else /* i.e. descending */ { //nolint:revive + page.Max.Value = lastIdx + } +} + +// toDirection converts page order to timeline direction. +func toDirection(order paging.Order) structr.Direction { + if order.Ascending() { + return structr.Asc + } else /* i.e. descending */ { //nolint:revive + return structr.Desc + } +} diff --git a/internal/cache/wrappers.go b/internal/cache/wrappers.go index 9cb4fca98..34d7cb8db 100644 --- a/internal/cache/wrappers.go +++ b/internal/cache/wrappers.go @@ -27,19 +27,19 @@ import ( // SliceCache wraps a simple.Cache to provide simple loader-callback // functions for fetching + caching slices of objects (e.g. IDs). 
type SliceCache[T any] struct { - cache simple.Cache[string, []T] + simple.Cache[string, []T] } // Init initializes the cache with given length + capacity. func (c *SliceCache[T]) Init(len, cap int) { - c.cache = simple.Cache[string, []T]{} - c.cache.Init(len, cap) + c.Cache = simple.Cache[string, []T]{} + c.Cache.Init(len, cap) } // Load will attempt to load an existing slice from cache for key, else calling load function and caching the result. func (c *SliceCache[T]) Load(key string, load func() ([]T, error)) ([]T, error) { // Look for cached values. - data, ok := c.cache.Get(key) + data, ok := c.Cache.Get(key) if !ok { var err error @@ -51,7 +51,7 @@ func (c *SliceCache[T]) Load(key string, load func() ([]T, error)) ([]T, error) } // Store the data. - c.cache.Set(key, data) + c.Cache.Set(key, data) } // Return data clone for safety. @@ -60,27 +60,7 @@ func (c *SliceCache[T]) Load(key string, load func() ([]T, error)) ([]T, error) // Invalidate: see simple.Cache{}.InvalidateAll(). func (c *SliceCache[T]) Invalidate(keys ...string) { - _ = c.cache.InvalidateAll(keys...) -} - -// Trim: see simple.Cache{}.Trim(). -func (c *SliceCache[T]) Trim(perc float64) { - c.cache.Trim(perc) -} - -// Clear: see simple.Cache{}.Clear(). -func (c *SliceCache[T]) Clear() { - c.cache.Clear() -} - -// Len: see simple.Cache{}.Len(). -func (c *SliceCache[T]) Len() int { - return c.cache.Len() -} - -// Cap: see simple.Cache{}.Cap(). -func (c *SliceCache[T]) Cap() int { - return c.cache.Cap() + _ = c.Cache.InvalidateAll(keys...) } // StructCache wraps a structr.Cache{} to simple index caching @@ -89,17 +69,17 @@ func (c *SliceCache[T]) Cap() int { // name under the main database caches struct which would reduce // time required to access cached values). type StructCache[StructType any] struct { - cache structr.Cache[StructType] + structr.Cache[StructType] index map[string]*structr.Index } // Init initializes the cache with given structr.CacheConfig{}. 
func (c *StructCache[T]) Init(config structr.CacheConfig[T]) { c.index = make(map[string]*structr.Index, len(config.Indices)) - c.cache = structr.Cache[T]{} - c.cache.Init(config) + c.Cache = structr.Cache[T]{} + c.Cache.Init(config) for _, cfg := range config.Indices { - c.index[cfg.Fields] = c.cache.Index(cfg.Fields) + c.index[cfg.Fields] = c.Cache.Index(cfg.Fields) } } @@ -107,26 +87,21 @@ func (c *StructCache[T]) Init(config structr.CacheConfig[T]) { // Note: this also handles conversion of the untyped (any) keys to structr.Key{} via structr.Index{}. func (c *StructCache[T]) GetOne(index string, key ...any) (T, bool) { i := c.index[index] - return c.cache.GetOne(i, i.Key(key...)) + return c.Cache.GetOne(i, i.Key(key...)) } // Get calls structr.Cache{}.Get(), using a cached structr.Index{} by 'index' name. // Note: this also handles conversion of the untyped (any) keys to structr.Key{} via structr.Index{}. func (c *StructCache[T]) Get(index string, keys ...[]any) []T { i := c.index[index] - return c.cache.Get(i, i.Keys(keys...)...) -} - -// Put: see structr.Cache{}.Put(). -func (c *StructCache[T]) Put(values ...T) { - c.cache.Put(values...) + return c.Cache.Get(i, i.Keys(keys...)...) } // LoadOne calls structr.Cache{}.LoadOne(), using a cached structr.Index{} by 'index' name. // Note: this also handles conversion of the untyped (any) keys to structr.Key{} via structr.Index{}. func (c *StructCache[T]) LoadOne(index string, load func() (T, error), key ...any) (T, error) { i := c.index[index] - return c.cache.LoadOne(i, i.Key(key...), load) + return c.Cache.LoadOne(i, i.Key(key...), load) } // LoadIDs calls structr.Cache{}.Load(), using a cached structr.Index{} by 'index' name. Note: this also handles @@ -149,7 +124,7 @@ func (c *StructCache[T]) LoadIDs(index string, ids []string, load func([]string) } // Pass loader callback with wrapper onto main cache load function. 
- return c.cache.Load(i, keys, func(uncached []structr.Key) ([]T, error) { + return c.Cache.Load(i, keys, func(uncached []structr.Key) ([]T, error) { uncachedIDs := make([]string, len(uncached)) for i := range uncached { uncachedIDs[i] = uncached[i].Values()[0].(string) @@ -177,7 +152,7 @@ func (c *StructCache[T]) LoadIDs2Part(index string, id1 string, id2s []string, l } // Pass loader callback with wrapper onto main cache load function. - return c.cache.Load(i, keys, func(uncached []structr.Key) ([]T, error) { + return c.Cache.Load(i, keys, func(uncached []structr.Key) ([]T, error) { uncachedIDs := make([]string, len(uncached)) for i := range uncached { uncachedIDs[i] = uncached[i].Values()[1].(string) @@ -186,16 +161,11 @@ func (c *StructCache[T]) LoadIDs2Part(index string, id1 string, id2s []string, l }) } -// Store: see structr.Cache{}.Store(). -func (c *StructCache[T]) Store(value T, store func() error) error { - return c.cache.Store(value, store) -} - // Invalidate calls structr.Cache{}.Invalidate(), using a cached structr.Index{} by 'index' name. // Note: this also handles conversion of the untyped (any) keys to structr.Key{} via structr.Index{}. func (c *StructCache[T]) Invalidate(index string, key ...any) { i := c.index[index] - c.cache.Invalidate(i, i.Key(key...)) + c.Cache.Invalidate(i, i.Key(key...)) } // InvalidateIDs calls structr.Cache{}.Invalidate(), using a cached structr.Index{} by 'index' name. Note: this also @@ -218,25 +188,5 @@ func (c *StructCache[T]) InvalidateIDs(index string, ids []string) { } // Pass to main invalidate func. - c.cache.Invalidate(i, keys...) -} - -// Trim: see structr.Cache{}.Trim(). -func (c *StructCache[T]) Trim(perc float64) { - c.cache.Trim(perc) -} - -// Clear: see structr.Cache{}.Clear(). -func (c *StructCache[T]) Clear() { - c.cache.Clear() -} - -// Len: see structr.Cache{}.Len(). -func (c *StructCache[T]) Len() int { - return c.cache.Len() -} - -// Cap: see structr.Cache{}.Cap(). 
-func (c *StructCache[T]) Cap() int { - return c.cache.Cap() + c.Cache.Invalidate(i, keys...) } |
