summaryrefslogtreecommitdiff
path: root/internal/cache
diff options
context:
space:
mode:
authorLibravatar kim <89579420+NyaaaWhatsUpDoc@users.noreply.github.com>2025-04-26 09:56:15 +0000
committerLibravatar GitHub <noreply@github.com>2025-04-26 09:56:15 +0000
commit6a6a4993338262f87df34c9be051bfaac75c1829 (patch)
treebfbda090dc4b25efdd34145c016d7cc7b9c14d6e /internal/cache
parent[chore] Move deps to code.superseriousbusiness.org (#4054) (diff)
downloadgotosocial-6a6a4993338262f87df34c9be051bfaac75c1829.tar.xz
[performance] rewrite timelines to rely on new timeline cache type (#3941)
* start work rewriting timeline cache type * further work rewriting timeline caching * more work integration new timeline code * remove old code * add local timeline, fix up merge conflicts * remove old use of go-bytes * implement new timeline code into more areas of codebase, pull in latest go-mangler, go-mutexes, go-structr * remove old timeline package, add local timeline cache * remove references to old timeline types that needed starting up in tests * start adding page validation * fix test-identified timeline cache package issues * fix up more tests, fix missing required changes, etc * add exclusion for test.out in gitignore * clarify some things better in code comments * tweak cache size limits * fix list timeline cache fetching * further list timeline fixes * linter, ssssssssshhhhhhhhhhhh please * fix linter hints * reslice the output if it's beyond length of 'lim' * remove old timeline initialization code, bump go-structr to v0.9.4 * continued from previous commit * improved code comments * don't allow multiple entries for BoostOfID values to prevent repeated boosts of same boosts * finish writing more code comments * some variable renaming, for ease of following * change the way we update lo,hi paging values during timeline load * improved code comments for updated / returned lo , hi paging values * finish writing code comments for the StatusTimeline{} type itself * fill in more code comments * update go-structr version to latest with changed timeline unique indexing logic * have a local and public timeline *per user* * rewrite calls to public / local timeline calls * remove the zero length check, as lo, hi values might still be set * simplify timeline cache loading, fix lo/hi returns, fix timeline invalidation side-effects missing for some federated actions * swap the lo, hi values :facepalm: * add (now) missing slice reverse of tag timeline statuses when paging ASC * remove local / public caches (is out of scope for this work), share more timeline code 
* remove unnecessary change * again, remove more unused code * remove unused function to appease the linter * move boost checking to prepare function * fix use of timeline.lastOrder, fix incorrect range functions used * remove comments for repeat code * remove the boost logic from prepare function * do a maximum of 5 loads, not 10 * add repeat boost filtering logic, update go-structr, general improvements * more code comments * add important note * fix timeline tests now that timelines are returned in page order * remove unused field * add StatusTimeline{} tests * add more status timeline tests * start adding preloading support * ensure repeat boosts are marked in preloaded entries * share a bunch of the database load code in timeline cache, don't clear timelines on relationship change * add logic to allow dynamic clear / preloading of timelines * comment-out unused functions, but leave in place as we might end-up using them * fix timeline preload state check * much improved status timeline code comments * more code comments, don't bother inserting statuses if timeline not preloaded * shift around some logic to make sure things aren't accidentally left set * finish writing code comments * remove trim-after-insert behaviour * fix-up some comments referring to old logic * remove unsetting of lo, hi * fix preload repeatBoost checking logic * don't return on status filter errors, these are usually transient * better concurrency safety in Clear() and Done() * fix test broken due to addition of preloader * fix repeatBoost logic that doesn't account for already-hidden repeatBoosts * ensure edit submodels are dropped on cache insertion * update code-comment to expand CAS accronym * use a plus1hULID() instead of 24h * remove unused functions * add note that public / local timeline requester can be nil * fix incorrect visibility filtering of tag timeline statuses * ensure we filter home timeline statuses on local only * some small re-orderings to confirm query params in 
correct places * fix the local only home timeline filter func
Diffstat (limited to 'internal/cache')
-rw-r--r--internal/cache/cache.go9
-rw-r--r--internal/cache/timeline.go51
-rw-r--r--internal/cache/timeline/preload.go152
-rw-r--r--internal/cache/timeline/status.go842
-rw-r--r--internal/cache/timeline/status_map.go198
-rw-r--r--internal/cache/timeline/status_test.go361
-rw-r--r--internal/cache/timeline/timeline.go59
-rw-r--r--internal/cache/wrappers.go84
8 files changed, 1688 insertions, 68 deletions
diff --git a/internal/cache/cache.go b/internal/cache/cache.go
index 7844c03f8..e3fd0d1fe 100644
--- a/internal/cache/cache.go
+++ b/internal/cache/cache.go
@@ -46,6 +46,9 @@ type Caches struct {
// `[status.ID][status.UpdatedAt.Unix()]`
StatusesFilterableFields *ttl.Cache[string, []string]
+ // Timelines provides access to the in-memory
+ // status timeline caches (home and list).
+ Timelines TimelineCaches
+
// Visibility provides access to the item visibility
// cache. (used by the visibility filter).
Visibility VisibilityCache
@@ -87,12 +90,14 @@ func (c *Caches) Init() {
c.initFollowRequest()
c.initFollowRequestIDs()
c.initFollowingTagIDs()
+ c.initHomeTimelines()
c.initInReplyToIDs()
c.initInstance()
c.initInteractionRequest()
c.initList()
c.initListIDs()
c.initListedIDs()
+ c.initListTimelines()
c.initMarker()
c.initMedia()
c.initMention()
@@ -109,6 +114,7 @@ func (c *Caches) Init() {
c.initStatusEdit()
c.initStatusFave()
c.initStatusFaveIDs()
+ c.initStatusesFilterableFields()
c.initTag()
c.initThreadMute()
c.initToken()
@@ -120,7 +126,6 @@ func (c *Caches) Init() {
c.initWebPushSubscription()
c.initWebPushSubscriptionIDs()
c.initVisibility()
- c.initStatusesFilterableFields()
}
// Start will start any caches that require a background
@@ -207,6 +212,8 @@ func (c *Caches) Sweep(threshold float64) {
c.DB.User.Trim(threshold)
c.DB.UserMute.Trim(threshold)
c.DB.UserMuteIDs.Trim(threshold)
+ c.Timelines.Home.Trim()
+ c.Timelines.List.Trim()
c.Visibility.Trim(threshold)
}
diff --git a/internal/cache/timeline.go b/internal/cache/timeline.go
new file mode 100644
index 000000000..3e6a68558
--- /dev/null
+++ b/internal/cache/timeline.go
@@ -0,0 +1,51 @@
+// GoToSocial
+// Copyright (C) GoToSocial Authors admin@gotosocial.org
+// SPDX-License-Identifier: AGPL-3.0-or-later
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+package cache
+
+import (
+ "github.com/superseriousbusiness/gotosocial/internal/cache/timeline"
+ "github.com/superseriousbusiness/gotosocial/internal/log"
+)
+
+type TimelineCaches struct {
+ // Home provides a concurrency-safe map of status timeline
+ // caches for home timelines, keyed by home's account ID.
+ Home timeline.StatusTimelines
+
+ // List provides a concurrency-safe map of status
+ // timeline caches for lists, keyed by list ID.
+ List timeline.StatusTimelines
+}
+
+func (c *Caches) initHomeTimelines() {
+ // TODO: configurable
+ cap := 800
+
+ log.Infof(nil, "cache size = %d", cap)
+
+ c.Timelines.Home.Init(cap)
+}
+
+func (c *Caches) initListTimelines() {
+ // TODO: configurable
+ cap := 800
+
+ log.Infof(nil, "cache size = %d", cap)
+
+ c.Timelines.List.Init(cap)
+}
diff --git a/internal/cache/timeline/preload.go b/internal/cache/timeline/preload.go
new file mode 100644
index 000000000..b941a8b0c
--- /dev/null
+++ b/internal/cache/timeline/preload.go
@@ -0,0 +1,152 @@
+// GoToSocial
+// Copyright (C) GoToSocial Authors admin@gotosocial.org
+// SPDX-License-Identifier: AGPL-3.0-or-later
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+package timeline
+
+import (
+ "sync"
+ "sync/atomic"
+
+ "github.com/superseriousbusiness/gotosocial/internal/log"
+)
+
+// preloader provides a means of synchronising the
+// initial fill, or "preload", of a timeline cache.
+// it has 4 possible states in the atomic pointer:
+// - preloading = &(interface{}(*sync.WaitGroup))
+// - preloaded = &(interface{}(nil))
+// - needs preload = &(interface{}(false))
+// - brand-new = nil (functionally same as 'needs preload')
+type preloader struct{ p atomic.Pointer[any] }
+
+// Check will return the current preload state,
+// waiting if a preload is currently in progress.
+func (p *preloader) Check() bool {
+ for {
+ // Get state ptr.
+ ptr := p.p.Load()
+
+ // Check if requires preloading.
+ if ptr == nil || *ptr == false {
+ return false
+ }
+
+ // Check for a preload currently in progress.
+ if wg, _ := (*ptr).(*sync.WaitGroup); wg != nil {
+ wg.Wait()
+ continue
+ }
+
+ // Anything else
+ // means success.
+ return true
+ }
+}
+
+// CheckPreload will safely check the preload state,
+// and if needed call the provided function. if a
+// preload is in progress, it will wait until complete.
+func (p *preloader) CheckPreload(preload func(*any)) {
+ for {
+ // Get state ptr.
+ ptr := p.p.Load()
+
+ if ptr == nil || *ptr == false {
+ // Needs preloading, start it.
+ ok := p.start(ptr, preload)
+
+ if !ok {
+ // Failed to acquire start,
+ // other thread beat us to it.
+ continue
+ }
+
+ // Success!
+ return
+ }
+
+ // Check for a preload currently in progress.
+ if wg, _ := (*ptr).(*sync.WaitGroup); wg != nil {
+ wg.Wait()
+ continue
+ }
+
+ // Anything else
+ // means success.
+ return
+ }
+}
+
+// start attempts to start the given preload function, by performing
+// a compare and swap operation with 'old'. return is success.
+func (p *preloader) start(old *any, preload func(*any)) bool {
+
+ // Optimistically setup a
+ // new waitgroup to set as
+ // the preload waiter.
+ var wg sync.WaitGroup
+ wg.Add(1)
+ defer wg.Done()
+
+ // Wrap waitgroup in
+ // 'any' for pointer.
+ new := any(&wg)
+ ptr := &new
+
+ // Attempt CAS operation to claim start.
+ started := p.p.CompareAndSwap(old, ptr)
+ if !started {
+ return false
+ }
+
+ // Start.
+ preload(ptr)
+ return true
+}
+
+// Done marks state as preloaded,
+// i.e. no more preload required.
+func (p *preloader) Done(ptr *any) {
+ if !p.p.CompareAndSwap(ptr, new(any)) {
+ log.Errorf(nil, "BUG: invalid preloader state: %#v", (*p.p.Load()))
+ }
+}
+
+// Clear will clear the state, marking a "preload" as required.
+// i.e. next call to CheckPreload() will call provided preload func.
+func (p *preloader) Clear() {
+ b := false
+ a := any(b)
+ for {
+ // Load current ptr.
+ ptr := p.p.Load()
+ if ptr == nil {
+ return // was brand-new
+ }
+
+ // Check for a preload currently in progress.
+ if wg, _ := (*ptr).(*sync.WaitGroup); wg != nil {
+ wg.Wait()
+ continue
+ }
+
+ // Try mark as needing preload.
+ if p.p.CompareAndSwap(ptr, &a) {
+ return
+ }
+ }
+}
diff --git a/internal/cache/timeline/status.go b/internal/cache/timeline/status.go
new file mode 100644
index 000000000..071fc5a36
--- /dev/null
+++ b/internal/cache/timeline/status.go
@@ -0,0 +1,842 @@
+// GoToSocial
+// Copyright (C) GoToSocial Authors admin@gotosocial.org
+// SPDX-License-Identifier: AGPL-3.0-or-later
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+package timeline
+
+import (
+ "context"
+ "slices"
+
+ "codeberg.org/gruf/go-structr"
+
+ apimodel "github.com/superseriousbusiness/gotosocial/internal/api/model"
+ "github.com/superseriousbusiness/gotosocial/internal/gtserror"
+ "github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
+ "github.com/superseriousbusiness/gotosocial/internal/log"
+ "github.com/superseriousbusiness/gotosocial/internal/paging"
+ "github.com/superseriousbusiness/gotosocial/internal/util"
+ "github.com/superseriousbusiness/gotosocial/internal/util/xslices"
+)
+
+// repeatBoostDepth determines the minimum count
+// of statuses after which repeat boosts, or boosts
+// of the original, may appear. This may not end
+// up *exact*, as small races between insert and the
+// repeatBoost calculation may allow 1 or so extra
+// to sneak in ahead of time. but it mostly works!
+const repeatBoostDepth = 40
+
+// StatusMeta contains minimum viable metadata
+// about a Status in order to cache a timeline.
+type StatusMeta struct {
+ ID string
+ AccountID string
+ BoostOfID string
+ BoostOfAccountID string
+
+ // is an internal flag that may be set on
+ // a StatusMeta object that will prevent
+ // preparation of its apimodel.Status, due
+ // to it being a recently repeated boost.
+ repeatBoost bool
+
+ // prepared contains prepared frontend API
+ // model for the referenced status. This may
+ // or may-not be nil depending on whether the
+ // status has been "unprepared" since the last
+ // call to "prepare" the frontend model.
+ prepared *apimodel.Status
+
+ // loaded is a temporary field that may be
+ // set for a newly loaded timeline status
+ // so that statuses don't need to be loaded
+ // from the database twice in succession.
+ //
+ // i.e. this will only be set if the status
+ // was newly inserted into the timeline cache.
+ // for existing cache items this will be nil.
+ loaded *gtsmodel.Status
+}
+
+// StatusTimeline provides a concurrency-safe sliding-window
+// cache of the freshest statuses in a timeline. Internally,
+// only StatusMeta{} objects themselves are stored, loading
+// the actual statuses when necessary, but caching prepared
+// frontend API models where possible.
+//
+// Notes on design:
+//
+// Previously, and initially when designing this newer type,
+// we had status timeline caches that would dynamically fill
+// themselves with statuses on call to Load() with statuses
+// at *any* location in the timeline, while simultaneously
+// accepting new input of statuses from the background workers.
+// This unfortunately can lead to situations where posts need
+// to be fetched from the database, but the cache isn't aware
+// they exist and instead returns an incomplete selection.
+// This problem is best outlined by the following simple example:
+//
+// "what if my timeline cache contains posts 0-to-6 and 8-to-12,
+// and i make a request for posts between 4-and-10 with no limit,
+// how is it to know that it's missing post 7?"
+//
+// The solution is to unfortunately remove a lot of the caching
+// of "older areas" of the timeline, and instead just have it
+// be a sliding window of the freshest posts of that timeline.
+// It gets preloaded initially on start / first-call, and kept
+// up-to-date with new posts by streamed inserts from background
+// workers. Any requests for posts outside this we know therefore
+// must hit the database, (which we then *don't* cache).
+type StatusTimeline struct {
+
+ // underlying timeline cache of *StatusMeta{},
+ // primary-keyed by ID, with extra indices below.
+ cache structr.Timeline[*StatusMeta, string]
+
+ // preloader synchronizes preload
+ // state of the timeline cache.
+ preloader preloader
+
+ // fast-access cache indices.
+ idx_ID *structr.Index //nolint:revive
+ idx_AccountID *structr.Index //nolint:revive
+ idx_BoostOfID *structr.Index //nolint:revive
+ idx_BoostOfAccountID *structr.Index //nolint:revive
+
+ // cutoff and maximum item lengths.
+ // the timeline is trimmed back to
+ // cutoff on each call to Trim(),
+ // and maximum len triggers a Trim().
+ //
+ // the timeline itself does not
+ // limit items due to complexities
+ // it would introduce, so we apply
+ // a 'cut-off' at regular intervals.
+ cut, max int
+}
+
+// Init will initialize the timeline for usage,
+// by preparing internal indices etc. This also
+// sets the given max capacity for Trim() operations.
+func (t *StatusTimeline) Init(cap int) {
+ t.cache.Init(structr.TimelineConfig[*StatusMeta, string]{
+
+ // Timeline item primary key field.
+ PKey: structr.IndexConfig{Fields: "ID"},
+
+ // Additional indexed fields.
+ Indices: []structr.IndexConfig{
+ {Fields: "AccountID", Multiple: true},
+ {Fields: "BoostOfAccountID", Multiple: true},
+ {Fields: "BoostOfID", Multiple: true},
+ },
+
+ // Timeline item copy function.
+ Copy: func(s *StatusMeta) *StatusMeta {
+ var prepared *apimodel.Status
+ if s.prepared != nil {
+ prepared = new(apimodel.Status)
+ *prepared = *s.prepared
+ }
+ return &StatusMeta{
+ ID: s.ID,
+ AccountID: s.AccountID,
+ BoostOfID: s.BoostOfID,
+ BoostOfAccountID: s.BoostOfAccountID,
+ repeatBoost: s.repeatBoost,
+ loaded: nil, // NEVER stored
+ prepared: prepared,
+ }
+ },
+ })
+
+ // Get fast index lookup ptrs.
+ t.idx_ID = t.cache.Index("ID")
+ t.idx_AccountID = t.cache.Index("AccountID")
+ t.idx_BoostOfID = t.cache.Index("BoostOfID")
+ t.idx_BoostOfAccountID = t.cache.Index("BoostOfAccountID")
+
+ // Set maximum capacity and
+ // cutoff threshold we trim to.
+ t.cut = int(0.60 * float64(cap))
+ t.max = cap
+}
+
+// Preload will fill the StatusTimeline{} cache with
+// the latest sliding window of status metadata for the
+// timeline type returned by database 'loadPage' function.
+//
+// This function is concurrency-safe and repeated calls to
+// it when already preloaded will be no-ops. To trigger a
+// preload as being required, call .Clear().
+func (t *StatusTimeline) Preload(
+
+ // loadPage should load the timeline of given page for cache hydration.
+ loadPage func(page *paging.Page) (statuses []*gtsmodel.Status, err error),
+
+ // filter can be used to perform filtering of returned
+ // statuses BEFORE insert into cache. i.e. this will effect
+ // what actually gets stored in the timeline cache.
+ filter func(each *gtsmodel.Status) (delete bool),
+) (
+ n int,
+ err error,
+) {
+ t.preloader.CheckPreload(func(ptr *any) {
+ n, err = t.preload(loadPage, filter)
+ if err != nil {
+ return
+ }
+
+ // Mark as preloaded.
+ t.preloader.Done(ptr)
+ })
+ return
+}
+
+// preload contains the core logic of
+// Preload(), without t.preloader checks.
+func (t *StatusTimeline) preload(
+
+ // loadPage should load the timeline of given page for cache hydration.
+ loadPage func(page *paging.Page) (statuses []*gtsmodel.Status, err error),
+
+ // filter can be used to perform filtering of returned
+ // statuses BEFORE insert into cache. i.e. this will effect
+ // what actually gets stored in the timeline cache.
+ filter func(each *gtsmodel.Status) (delete bool),
+) (int, error) {
+ if loadPage == nil {
+ panic("nil load page func")
+ }
+
+ // Clear timeline
+ // before preload.
+ t.cache.Clear()
+
+ // Our starting, page at the top
+ // of the possible timeline.
+ page := new(paging.Page)
+ order := paging.OrderDescending
+ page.Max.Order = order
+ page.Max.Value = plus1hULID()
+ page.Min.Order = order
+ page.Min.Value = ""
+ page.Limit = 100
+
+ // Prepare a slice for gathering status meta.
+ metas := make([]*StatusMeta, 0, page.Limit)
+
+ var n int
+ for n < t.cut {
+ // Load page of timeline statuses.
+ statuses, err := loadPage(page)
+ if err != nil {
+ return n, gtserror.Newf("error loading statuses: %w", err)
+ }
+
+ // No more statuses from
+ // load function = at end.
+ if len(statuses) == 0 {
+ break
+ }
+
+ // Update our next page cursor from statuses.
+ page.Max.Value = statuses[len(statuses)-1].ID
+
+ // Perform any filtering on newly loaded statuses.
+ statuses = doStatusFilter(statuses, filter)
+
+ // After filtering no more
+ // statuses remain, retry.
+ if len(statuses) == 0 {
+ continue
+ }
+
+ // Convert statuses to meta and insert.
+ metas = toStatusMeta(metas[:0], statuses)
+ n = t.cache.Insert(metas...)
+ }
+
+ // This is a potentially 100-1000s size map,
+ // but still easily manageable memory-wise.
+ recentBoosts := make(map[string]int, t.cut)
+
+ // Iterate timeline ascending (i.e. oldest -> newest), marking
+ // entry IDs and marking down if boosts have been seen recently.
+ for idx, value := range t.cache.RangeUnsafe(structr.Asc) {
+
+ // Store current ID in map.
+ recentBoosts[value.ID] = idx
+
+ // If it's a boost, check if the original,
+ // or a boost of it has been seen recently.
+ if id := value.BoostOfID; id != "" {
+
+ // Check if seen recently.
+ last, ok := recentBoosts[id]
+ repeat := ok && (idx-last) < 40
+ value.repeatBoost = repeat
+
+ // Update last-seen idx.
+ recentBoosts[id] = idx
+ }
+ }
+
+ return n, nil
+}
+
+// Load will load given page of timeline statuses. First it
+// will prioritize fetching statuses from the sliding window
+// that is the timeline cache of latest statuses, else it will
+// fall back to loading from the database using callback funcs.
+// The returned string values are the low / high status ID
+// paging values, used in calculating next / prev page links.
+func (t *StatusTimeline) Load(
+ ctx context.Context,
+ page *paging.Page,
+
+ // loadPage should load the timeline of given page for cache hydration.
+ loadPage func(page *paging.Page) (statuses []*gtsmodel.Status, err error),
+
+ // loadIDs should load status models with given IDs, this is used
+ // to load status models of already cached entries in the timeline.
+ loadIDs func(ids []string) (statuses []*gtsmodel.Status, err error),
+
+ // filter performs filtering of returned statuses.
+ filter func(each *gtsmodel.Status) (delete bool),
+
+ // prepareAPI should prepare internal status model to frontend API model.
+ prepareAPI func(status *gtsmodel.Status) (apiStatus *apimodel.Status, err error),
+) (
+ []*apimodel.Status,
+ string, // lo
+ string, // hi
+ error,
+) {
+ var err error
+
+ // Get paging details.
+ lo := page.Min.Value
+ hi := page.Max.Value
+ limit := page.Limit
+ order := page.Order()
+ dir := toDirection(order)
+
+ // Use a copy of current page so
+ // we can repeatedly update it.
+ nextPg := new(paging.Page)
+ *nextPg = *page
+ nextPg.Min.Value = lo
+ nextPg.Max.Value = hi
+
+ // Interstitial meta objects.
+ var metas []*StatusMeta
+
+ // Returned frontend API statuses.
+ var apiStatuses []*apimodel.Status
+
+ // TODO: we can remove this nil
+ // check when we've updated all
+ // our timeline endpoints to have
+ // streamed timeline caches.
+ if t != nil {
+
+ // Ensure timeline has been preloaded.
+ _, err = t.Preload(loadPage, filter)
+ if err != nil {
+ return nil, "", "", err
+ }
+
+ // First we attempt to load status
+ // metadata entries from the timeline
+ // cache, up to given limit.
+ metas = t.cache.Select(
+ util.PtrIf(lo),
+ util.PtrIf(hi),
+ util.PtrIf(limit),
+ dir,
+ )
+
+ if len(metas) > 0 {
+ // Before we can do any filtering, we need
+ // to load status models for cached entries.
+ err = loadStatuses(metas, loadIDs)
+ if err != nil {
+ return nil, "", "", gtserror.Newf("error loading statuses: %w", err)
+ }
+
+ // Set returned lo, hi values.
+ lo = metas[len(metas)-1].ID
+ hi = metas[0].ID
+
+ // Allocate slice of expected required API models.
+ apiStatuses = make([]*apimodel.Status, 0, len(metas))
+
+ // Prepare frontend API models for
+ // the cached statuses. For now this
+ // also does its own extra filtering.
+ apiStatuses = prepareStatuses(ctx,
+ metas,
+ prepareAPI,
+ apiStatuses,
+ limit,
+ )
+ }
+ }
+
+ // If no cached timeline statuses
+ // were found for page, we need to
+ // call through to the database.
+ if len(apiStatuses) == 0 {
+
+ // Pass through to main timeline db load function.
+ apiStatuses, lo, hi, err = loadStatusTimeline(ctx,
+ nextPg,
+ metas,
+ apiStatuses,
+ loadPage,
+ filter,
+ prepareAPI,
+ )
+ if err != nil {
+ return nil, "", "", err
+ }
+ }
+
+ if order.Ascending() {
+ // The caller always expects the statuses
+ // to be returned in DESC order, but we
+ // build the status slice in paging order.
+ // If paging ASC, we need to reverse the
+ // returned statuses and paging values.
+ slices.Reverse(apiStatuses)
+ lo, hi = hi, lo
+ }
+
+ return apiStatuses, lo, hi, nil
+}
+
+// loadStatusTimeline encapsulates the logic of iteratively
+// attempting to load a status timeline page from the database,
+// that is in the form of given callback functions. these will
+// then be prepared to frontend API models for return.
+//
+// in time it may make sense to move this logic
+// into the StatusTimeline{}.Load() function.
+func loadStatusTimeline(
+ ctx context.Context,
+ nextPg *paging.Page,
+ metas []*StatusMeta,
+ apiStatuses []*apimodel.Status,
+ loadPage func(page *paging.Page) (statuses []*gtsmodel.Status, err error),
+ filter func(each *gtsmodel.Status) (delete bool),
+ prepareAPI func(status *gtsmodel.Status) (apiStatus *apimodel.Status, err error),
+) (
+ []*apimodel.Status,
+ string, // lo
+ string, // hi
+ error,
+) {
+ if loadPage == nil {
+ panic("nil load page func")
+ }
+
+ // Lowest and highest ID
+ // vals of loaded statuses.
+ var lo, hi string
+
+ // Extract paging params.
+ order := nextPg.Order()
+ limit := nextPg.Limit
+
+ // Load a little more than
+ // limit to reduce db calls.
+ nextPg.Limit += 10
+
+ // Ensure we have a slice of meta objects to
+ // use in later preparation of the API models.
+ metas = xslices.GrowJust(metas[:0], nextPg.Limit)
+
+ // Ensure we have a slice of required frontend API models.
+ apiStatuses = xslices.GrowJust(apiStatuses[:0], nextPg.Limit)
+
+ // Perform maximum of 5 load
+ // attempts fetching statuses.
+ for i := 0; i < 5; i++ {
+
+ // Load next timeline statuses.
+ statuses, err := loadPage(nextPg)
+ if err != nil {
+ return nil, "", "", gtserror.Newf("error loading timeline: %w", err)
+ }
+
+ // No more statuses from
+ // load function = at end.
+ if len(statuses) == 0 {
+ break
+ }
+
+ if hi == "" {
+ // Set hi returned paging
+ // value if not already set.
+ hi = statuses[0].ID
+ }
+
+ // Update nextPg cursor parameter for next database query.
+ nextPageParams(nextPg, statuses[len(statuses)-1].ID, order)
+
+ // Perform any filtering on newly loaded statuses.
+ statuses = doStatusFilter(statuses, filter)
+
+ // After filtering no more
+ // statuses remain, retry.
+ if len(statuses) == 0 {
+ continue
+ }
+
+ // Convert to our interstitial meta type.
+ metas = toStatusMeta(metas[:0], statuses)
+
+ // Prepare frontend API models for
+ // the loaded statuses. For now this
+ // also does its own extra filtering.
+ apiStatuses = prepareStatuses(ctx,
+ metas,
+ prepareAPI,
+ apiStatuses,
+ limit,
+ )
+
+ // If we have anything, return
+ // here. Even if below limit.
+ if len(apiStatuses) > 0 {
+
+ // Set returned lo status paging value.
+ lo = apiStatuses[len(apiStatuses)-1].ID
+ break
+ }
+ }
+
+ return apiStatuses, lo, hi, nil
+}
+
+// InsertOne allows you to insert a single status into the timeline, with optional prepared API model.
+// The return value indicates whether status should be skipped from streams, e.g. if already boosted recently.
+func (t *StatusTimeline) InsertOne(status *gtsmodel.Status, prepared *apimodel.Status) (skip bool) {
+
+ // If timeline not preloaded, i.e.
+ // no-one using it, don't insert.
+ if !t.preloader.Check() {
+ return false
+ }
+
+ if status.BoostOfID != "" {
+ // Check through top $repeatBoostDepth number of items.
+ for i, value := range t.cache.RangeUnsafe(structr.Desc) {
+ if i >= repeatBoostDepth {
+ break
+ }
+
+ // We don't care about values that have
+ // already been hidden as repeat boosts.
+ if value.repeatBoost {
+ continue
+ }
+
+ // If inserted status has already been boosted, or original was posted
+ // within last $repeatBoostDepth, we indicate it as a repeated boost.
+ if value.ID == status.BoostOfID || value.BoostOfID == status.BoostOfID {
+ skip = true
+ break
+ }
+ }
+ }
+
+ // Insert new timeline status.
+ t.cache.Insert(&StatusMeta{
+ ID: status.ID,
+ AccountID: status.AccountID,
+ BoostOfID: status.BoostOfID,
+ BoostOfAccountID: status.BoostOfAccountID,
+ repeatBoost: skip,
+ loaded: nil,
+ prepared: prepared,
+ })
+
+ return
+}
+
+// RemoveByStatusIDs removes all cached timeline entries pertaining to
+// status ID, including those that may be a boost of the given status.
+func (t *StatusTimeline) RemoveByStatusIDs(statusIDs ...string) {
+ keys := make([]structr.Key, len(statusIDs))
+
+ // Nil check indices outside loops.
+ if t.idx_ID == nil ||
+ t.idx_BoostOfID == nil {
+ panic("indices are nil")
+ }
+
+ // Convert statusIDs to index keys.
+ for i, id := range statusIDs {
+ keys[i] = t.idx_ID.Key(id)
+ }
+
+ // Invalidate all cached entries with IDs.
+ t.cache.Invalidate(t.idx_ID, keys...)
+
+ // Convert statusIDs to index keys.
+ for i, id := range statusIDs {
+ keys[i] = t.idx_BoostOfID.Key(id)
+ }
+
+ // Invalidate all cached entries as boost of IDs.
+ t.cache.Invalidate(t.idx_BoostOfID, keys...)
+}
+
+// RemoveByAccountIDs removes all cached timeline entries authored by
+// account ID, including those that may be boosted by account ID.
+func (t *StatusTimeline) RemoveByAccountIDs(accountIDs ...string) {
+ keys := make([]structr.Key, len(accountIDs))
+
+ // Nil check indices outside loops.
+ if t.idx_AccountID == nil ||
+ t.idx_BoostOfAccountID == nil {
+ panic("indices are nil")
+ }
+
+ // Convert accountIDs to index keys.
+ for i, id := range accountIDs {
+ keys[i] = t.idx_AccountID.Key(id)
+ }
+
+ // Invalidate all cached entries as by IDs.
+ t.cache.Invalidate(t.idx_AccountID, keys...)
+
+ // Convert accountIDs to index keys.
+ for i, id := range accountIDs {
+ keys[i] = t.idx_BoostOfAccountID.Key(id)
+ }
+
+ // Invalidate all cached entries as boosted by IDs.
+ t.cache.Invalidate(t.idx_BoostOfAccountID, keys...)
+}
+
+// UnprepareByStatusIDs removes cached frontend API models for all cached
+// timeline entries pertaining to status ID, including boosts of given status.
+func (t *StatusTimeline) UnprepareByStatusIDs(statusIDs ...string) {
+ keys := make([]structr.Key, len(statusIDs))
+
+ // Nil check indices outside loops.
+ if t.idx_ID == nil ||
+ t.idx_BoostOfID == nil {
+ panic("indices are nil")
+ }
+
+ // Convert statusIDs to index keys.
+ for i, id := range statusIDs {
+ keys[i] = t.idx_ID.Key(id)
+ }
+
+ // Unprepare all statuses stored under StatusMeta.ID.
+ for meta := range t.cache.RangeKeysUnsafe(t.idx_ID, keys...) {
+ meta.prepared = nil
+ }
+
+ // Convert statusIDs to index keys.
+ for i, id := range statusIDs {
+ keys[i] = t.idx_BoostOfID.Key(id)
+ }
+
+ // Unprepare all statuses stored under StatusMeta.BoostOfID.
+ for meta := range t.cache.RangeKeysUnsafe(t.idx_BoostOfID, keys...) {
+ meta.prepared = nil
+ }
+}
+
+// UnprepareByAccountIDs removes cached frontend API models for all cached
+// timeline entries authored by account ID, including boosts by account ID.
+func (t *StatusTimeline) UnprepareByAccountIDs(accountIDs ...string) {
+ keys := make([]structr.Key, len(accountIDs))
+
+ // Nil check indices outside loops.
+ if t.idx_AccountID == nil ||
+ t.idx_BoostOfAccountID == nil {
+ panic("indices are nil")
+ }
+
+ // Convert accountIDs to index keys.
+ for i, id := range accountIDs {
+ keys[i] = t.idx_AccountID.Key(id)
+ }
+
+ // Unprepare all statuses stored under StatusMeta.AccountID.
+ for meta := range t.cache.RangeKeysUnsafe(t.idx_AccountID, keys...) {
+ meta.prepared = nil
+ }
+
+ // Convert accountIDs to index keys.
+ for i, id := range accountIDs {
+ keys[i] = t.idx_BoostOfAccountID.Key(id)
+ }
+
+ // Unprepare all statuses stored under StatusMeta.BoostOfAccountID.
+ for meta := range t.cache.RangeKeysUnsafe(t.idx_BoostOfAccountID, keys...) {
+ meta.prepared = nil
+ }
+}
+
// UnprepareAll removes cached frontend API
// models for all cached timeline entries.
// The underlying entries themselves (and
// their loaded status models) are kept.
func (t *StatusTimeline) UnprepareAll() {
	for _, value := range t.cache.RangeUnsafe(structr.Asc) {
		value.prepared = nil
	}
}
+
// Trim will ensure that receiving timeline is less than or
// equal in length to the timeline's preconfigured cutoff
// length (t.cut — see tests, which assert post-trim Len()
// equals t.cut). This will always trim from the bottom-up
// (ascending, i.e. oldest end) to prioritize streamed inserts.
func (t *StatusTimeline) Trim() { t.cache.Trim(t.cut, structr.Asc) }
+
// Clear will mark the entire timeline as requiring preload,
// which will trigger a clear and reload of the entire thing.
// Note: only the preloader is touched here; cached entries
// are not dropped immediately by this call.
func (t *StatusTimeline) Clear() { t.preloader.Clear() }
+
+// prepareStatuses takes a slice of cached (or, freshly loaded!) StatusMeta{}
+// models, and use given function to return prepared frontend API models.
+func prepareStatuses(
+ ctx context.Context,
+ meta []*StatusMeta,
+ prepareAPI func(*gtsmodel.Status) (*apimodel.Status, error),
+ apiStatuses []*apimodel.Status,
+ limit int,
+) []*apimodel.Status {
+ switch { //nolint:gocritic
+ case prepareAPI == nil:
+ panic("nil prepare fn")
+ }
+
+ // Iterate the given StatusMeta objects for pre-prepared
+ // frontend models, otherwise attempting to prepare them.
+ for _, meta := range meta {
+
+ // Check if we have prepared enough
+ // API statuses for caller to return.
+ if len(apiStatuses) >= limit {
+ break
+ }
+
+ if meta.loaded == nil {
+ // We failed loading this
+ // status, skip preparing.
+ continue
+ }
+
+ if meta.repeatBoost {
+ // This is a repeat boost in
+ // short timespan, skip it.
+ continue
+ }
+
+ if meta.prepared == nil {
+ var err error
+
+ // Prepare the provided status to frontend.
+ meta.prepared, err = prepareAPI(meta.loaded)
+ if err != nil {
+ log.Errorf(ctx, "error preparing status %s: %v", meta.loaded.URI, err)
+ continue
+ }
+ }
+
+ // Append to return slice.
+ if meta.prepared != nil {
+ apiStatuses = append(apiStatuses, meta.prepared)
+ }
+ }
+
+ return apiStatuses
+}
+
+// loadStatuses loads statuses using provided callback
+// for the statuses in meta slice that aren't loaded.
+// the amount very much depends on whether meta objects
+// are yet-to-be-cached (i.e. newly loaded, with status),
+// or are from the timeline cache (unloaded status).
+func loadStatuses(
+ metas []*StatusMeta,
+ loadIDs func([]string) ([]*gtsmodel.Status, error),
+) error {
+
+ // Determine which of our passed status
+ // meta objects still need statuses loading.
+ toLoadIDs := make([]string, len(metas))
+ loadedMap := make(map[string]*StatusMeta, len(metas))
+ for i, meta := range metas {
+ if meta.loaded == nil {
+ toLoadIDs[i] = meta.ID
+ loadedMap[meta.ID] = meta
+ }
+ }
+
+ // Load statuses with given IDs.
+ loaded, err := loadIDs(toLoadIDs)
+ if err != nil {
+ return gtserror.Newf("error loading statuses: %w", err)
+ }
+
+ // Update returned StatusMeta objects
+ // with newly loaded statuses by IDs.
+ for i := range loaded {
+ status := loaded[i]
+ meta := loadedMap[status.ID]
+ meta.loaded = status
+ }
+
+ return nil
+}
+
+// toStatusMeta converts a slice of database model statuses
+// into our cache wrapper type, a slice of []StatusMeta{}.
+func toStatusMeta(in []*StatusMeta, statuses []*gtsmodel.Status) []*StatusMeta {
+ return xslices.Gather(in, statuses, func(s *gtsmodel.Status) *StatusMeta {
+ return &StatusMeta{
+ ID: s.ID,
+ AccountID: s.AccountID,
+ BoostOfID: s.BoostOfID,
+ BoostOfAccountID: s.BoostOfAccountID,
+ loaded: s,
+ prepared: nil,
+ }
+ })
+}
+
// doStatusFilter performs the given filter function on the provided statuses,
// returning the slice with all entries for which the filter returned true
// removed (in place). A nil filter returns the input untouched.
func doStatusFilter(statuses []*gtsmodel.Status, filter func(*gtsmodel.Status) bool) []*gtsmodel.Status {

	// Check for provided
	// filter function.
	if filter == nil {
		return statuses
	}

	// Filter the provided input statuses.
	return slices.DeleteFunc(statuses, filter)
}
diff --git a/internal/cache/timeline/status_map.go b/internal/cache/timeline/status_map.go
new file mode 100644
index 000000000..e402883af
--- /dev/null
+++ b/internal/cache/timeline/status_map.go
@@ -0,0 +1,198 @@
+// GoToSocial
+// Copyright (C) GoToSocial Authors admin@gotosocial.org
+// SPDX-License-Identifier: AGPL-3.0-or-later
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+package timeline
+
+import (
+ "maps"
+ "sync/atomic"
+)
+
// StatusTimelines is a concurrency safe map of StatusTimeline{}
// objects, optimizing *very heavily* for reads over writes.
type StatusTimelines struct {
	// ptr points to an effectively-immutable map; readers load it
	// lock-free, writers replace it wholesale via copy-on-write CAS
	// (see MustGet() / Delete()).
	ptr atomic.Pointer[map[string]*StatusTimeline] // ronly except by CAS

	// cap is the capacity each newly created StatusTimeline{} is
	// initialized with; set by Init() — not synchronized, so it is
	// presumably set once before concurrent use (TODO: confirm).
	cap int
}
+
// Init stores the given argument(s) such that any created StatusTimeline{}
// objects by MustGet() will initialize them with the given arguments.
// NOTE: the stored capacity is read without synchronization by MustGet(),
// so Init() should be called before any concurrent use of the map.
func (t *StatusTimelines) Init(cap int) { t.cap = cap }
+
// MustGet will attempt to fetch StatusTimeline{} stored under key, else creating one.
// New timelines are initialized with the capacity given to Init(). Insertion uses a
// copy-on-write clone of the map plus compare-and-swap of the map pointer, looping
// on contention, so concurrent readers never take a lock.
func (t *StatusTimelines) MustGet(key string) *StatusTimeline {
	var tt *StatusTimeline

	for {
		// Load current ptr.
		cur := t.ptr.Load()

		// Get timeline map to work on.
		var m map[string]*StatusTimeline

		if cur != nil {
			// Look for existing
			// timeline in cache.
			tt = (*cur)[key]
			if tt != nil {
				return tt
			}

			// Get clone of current
			// before modifications.
			m = maps.Clone(*cur)
		} else {
			// Allocate new timeline map for below.
			m = make(map[string]*StatusTimeline)
		}

		if tt == nil {
			// Allocate new timeline.
			// (kept across CAS retries,
			// avoiding re-allocation).
			tt = new(StatusTimeline)
			tt.Init(t.cap)
		}

		// Store timeline
		// in new map.
		m[key] = tt

		// Attempt to update the map ptr.
		if !t.ptr.CompareAndSwap(cur, &m) {

			// We failed the
			// CAS, reloop.
			continue
		}

		// Successfully inserted
		// new timeline model.
		return tt
	}
}
+
// Delete will delete the stored StatusTimeline{} under key, if any.
// Like MustGet(), this mutates via a copy-on-write clone of the map
// plus compare-and-swap of the map pointer, looping on contention.
func (t *StatusTimelines) Delete(key string) {
	for {
		// Load current ptr.
		cur := t.ptr.Load()

		// Check for empty map / not in map.
		if cur == nil || (*cur)[key] == nil {
			return
		}

		// Get clone of current
		// before modifications.
		m := maps.Clone(*cur)

		// Delete ID.
		delete(m, key)

		// Attempt to update the map ptr.
		if !t.ptr.CompareAndSwap(cur, &m) {

			// We failed the
			// CAS, reloop.
			continue
		}

		// Successfully
		// deleted ID.
		return
	}
}
+
+// RemoveByStatusIDs calls RemoveByStatusIDs() for each of the stored StatusTimeline{}s.
+func (t *StatusTimelines) RemoveByStatusIDs(statusIDs ...string) {
+ if p := t.ptr.Load(); p != nil {
+ for _, tt := range *p {
+ tt.RemoveByStatusIDs(statusIDs...)
+ }
+ }
+}
+
+// RemoveByAccountIDs calls RemoveByAccountIDs() for each of the stored StatusTimeline{}s.
+func (t *StatusTimelines) RemoveByAccountIDs(accountIDs ...string) {
+ if p := t.ptr.Load(); p != nil {
+ for _, tt := range *p {
+ tt.RemoveByAccountIDs(accountIDs...)
+ }
+ }
+}
+
+// UnprepareByStatusIDs calls UnprepareByStatusIDs() for each of the stored StatusTimeline{}s.
+func (t *StatusTimelines) UnprepareByStatusIDs(statusIDs ...string) {
+ if p := t.ptr.Load(); p != nil {
+ for _, tt := range *p {
+ tt.UnprepareByStatusIDs(statusIDs...)
+ }
+ }
+}
+
+// UnprepareByAccountIDs calls UnprepareByAccountIDs() for each of the stored StatusTimeline{}s.
+func (t *StatusTimelines) UnprepareByAccountIDs(accountIDs ...string) {
+ if p := t.ptr.Load(); p != nil {
+ for _, tt := range *p {
+ tt.UnprepareByAccountIDs(accountIDs...)
+ }
+ }
+}
+
+// Unprepare attempts to call UnprepareAll() for StatusTimeline{} under key.
+func (t *StatusTimelines) Unprepare(key string) {
+ if p := t.ptr.Load(); p != nil {
+ if tt := (*p)[key]; tt != nil {
+ tt.UnprepareAll()
+ }
+ }
+}
+
+// UnprepareAll calls UnprepareAll() for each of the stored StatusTimeline{}s.
+func (t *StatusTimelines) UnprepareAll() {
+ if p := t.ptr.Load(); p != nil {
+ for _, tt := range *p {
+ tt.UnprepareAll()
+ }
+ }
+}
+
+// Trim calls Trim() for each of the stored StatusTimeline{}s.
+func (t *StatusTimelines) Trim() {
+ if p := t.ptr.Load(); p != nil {
+ for _, tt := range *p {
+ tt.Trim()
+ }
+ }
+}
+
+// Clear attempts to call Clear() for StatusTimeline{} under key.
+func (t *StatusTimelines) Clear(key string) {
+ if p := t.ptr.Load(); p != nil {
+ if tt := (*p)[key]; tt != nil {
+ tt.Clear()
+ }
+ }
+}
+
+// ClearAll calls Clear() for each of the stored StatusTimeline{}s.
+func (t *StatusTimelines) ClearAll() {
+ if p := t.ptr.Load(); p != nil {
+ for _, tt := range *p {
+ tt.Clear()
+ }
+ }
+}
diff --git a/internal/cache/timeline/status_test.go b/internal/cache/timeline/status_test.go
new file mode 100644
index 000000000..3e53d8256
--- /dev/null
+++ b/internal/cache/timeline/status_test.go
@@ -0,0 +1,361 @@
+// GoToSocial
+// Copyright (C) GoToSocial Authors admin@gotosocial.org
+// SPDX-License-Identifier: AGPL-3.0-or-later
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+package timeline
+
+import (
+ "slices"
+ "testing"
+
+ "codeberg.org/gruf/go-structr"
+ "github.com/stretchr/testify/assert"
+ apimodel "github.com/superseriousbusiness/gotosocial/internal/api/model"
+ "github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
+)
+
// testStatusMeta is a small fixed set of StatusMeta fixtures, with
// ULID-style ID / AccountID / BoostOfID / BoostOfAccountID values,
// shared as input data by the timeline tests below. Note: tests clone
// the slice but share the *StatusMeta pointers themselves.
var testStatusMeta = []*StatusMeta{
	{
		ID:               "06B19VYTHEG01F3YW13RQE0QM8",
		AccountID:        "06B1A61MZEBBVDSNPRJAA8F2C4",
		BoostOfID:        "06B1A5KQWGQ1ABM3FA7TDX1PK8",
		BoostOfAccountID: "06B1A6707818050PCK8SJAEC6G",
	},
	{
		ID:               "06B19VYTJFT0KDWT5C1CPY0XNC",
		AccountID:        "06B1A61MZN3ZQPZVNGEFBNYBJW",
		BoostOfID:        "06B1A5KQWSGFN4NNRV34KV5S9R",
		BoostOfAccountID: "06B1A6707HY8RAXG7JPCWR7XD4",
	},
	{
		ID:               "06B19VYTJ6WZQPRVNJHPEZH04W",
		AccountID:        "06B1A61MZY7E0YB6G01VJX8ERR",
		BoostOfID:        "06B1A5KQX5NPGSYGH8NC7HR1GR",
		BoostOfAccountID: "06B1A6707XCSAF0MVCGGYF9160",
	},
	{
		ID:               "06B19VYTJPKGG8JYCR1ENAV7KC",
		AccountID:        "06B1A61N07K1GC35PJ3CZ4M020",
		BoostOfID:        "06B1A5KQXG6ZCWE1R7C7KR7RYW",
		BoostOfAccountID: "06B1A67084W6SB6P6HJB7K5DSG",
	},
	{
		ID:               "06B19VYTHRR8S35QXC5A6VE2YW",
		AccountID:        "06B1A61N0P1TGQDVKANNG4AKP4",
		BoostOfID:        "06B1A5KQY3K839Z6S5HHAJKSWW",
		BoostOfAccountID: "06B1A6708SPJC3X3ZG3SGG8BN8",
	},
}
+
+func TestStatusTimelineUnprepare(t *testing.T) {
+ var tt StatusTimeline
+ tt.Init(1000)
+
+ // Clone the input test status data.
+ data := slices.Clone(testStatusMeta)
+
+ // Bodge some 'prepared'
+ // models on test data.
+ for _, meta := range data {
+ meta.prepared = &apimodel.Status{}
+ }
+
+ // Insert test data into timeline.
+ _ = tt.cache.Insert(data...)
+
+ for _, meta := range data {
+ // Unprepare this status with ID.
+ tt.UnprepareByStatusIDs(meta.ID)
+
+ // Check the item is unprepared.
+ value := getStatusByID(&tt, meta.ID)
+ assert.Nil(t, value.prepared)
+ }
+
+ // Clear and reinsert.
+ tt.cache.Clear()
+ tt.cache.Insert(data...)
+
+ for _, meta := range data {
+ // Unprepare this status with boost ID.
+ tt.UnprepareByStatusIDs(meta.BoostOfID)
+
+ // Check the item is unprepared.
+ value := getStatusByID(&tt, meta.ID)
+ assert.Nil(t, value.prepared)
+ }
+
+ // Clear and reinsert.
+ tt.cache.Clear()
+ tt.cache.Insert(data...)
+
+ for _, meta := range data {
+ // Unprepare this status with account ID.
+ tt.UnprepareByAccountIDs(meta.AccountID)
+
+ // Check the item is unprepared.
+ value := getStatusByID(&tt, meta.ID)
+ assert.Nil(t, value.prepared)
+ }
+
+ // Clear and reinsert.
+ tt.cache.Clear()
+ tt.cache.Insert(data...)
+
+ for _, meta := range data {
+ // Unprepare this status with boost account ID.
+ tt.UnprepareByAccountIDs(meta.BoostOfAccountID)
+
+ // Check the item is unprepared.
+ value := getStatusByID(&tt, meta.ID)
+ assert.Nil(t, value.prepared)
+ }
+}
+
+func TestStatusTimelineRemove(t *testing.T) {
+ var tt StatusTimeline
+ tt.Init(1000)
+
+ // Clone the input test status data.
+ data := slices.Clone(testStatusMeta)
+
+ // Insert test data into timeline.
+ _ = tt.cache.Insert(data...)
+
+ for _, meta := range data {
+ // Remove this status with ID.
+ tt.RemoveByStatusIDs(meta.ID)
+
+ // Check the item is now gone.
+ value := getStatusByID(&tt, meta.ID)
+ assert.Nil(t, value)
+ }
+
+ // Clear and reinsert.
+ tt.cache.Clear()
+ tt.cache.Insert(data...)
+
+ for _, meta := range data {
+ // Remove this status with boost ID.
+ tt.RemoveByStatusIDs(meta.BoostOfID)
+
+ // Check the item is now gone.
+ value := getStatusByID(&tt, meta.ID)
+ assert.Nil(t, value)
+ }
+
+ // Clear and reinsert.
+ tt.cache.Clear()
+ tt.cache.Insert(data...)
+
+ for _, meta := range data {
+ // Remove this status with account ID.
+ tt.RemoveByAccountIDs(meta.AccountID)
+
+ // Check the item is now gone.
+ value := getStatusByID(&tt, meta.ID)
+ assert.Nil(t, value)
+ }
+
+ // Clear and reinsert.
+ tt.cache.Clear()
+ tt.cache.Insert(data...)
+
+ for _, meta := range data {
+ // Remove this status with boost account ID.
+ tt.RemoveByAccountIDs(meta.BoostOfAccountID)
+
+ // Check the item is now gone.
+ value := getStatusByID(&tt, meta.ID)
+ assert.Nil(t, value)
+ }
+}
+
// TestStatusTimelineInserts checks insert counts, min/max ordering
// of inserted entries, and the 'repeatBoost' flagging behavior for
// boosts of recently-seen statuses.
func TestStatusTimelineInserts(t *testing.T) {
	var tt StatusTimeline
	tt.Init(1000)

	// Clone the input test status data.
	data := slices.Clone(testStatusMeta)

	// Insert test data into timeline,
	// all entries should be accepted.
	l := tt.cache.Insert(data...)
	assert.Equal(t, len(data), l)

	// Ensure 'min' value status
	// in the timeline is expected.
	minID := minStatusID(data)
	assert.Equal(t, minID, minStatus(&tt).ID)

	// Ensure 'max' value status
	// in the timeline is expected.
	maxID := maxStatusID(data)
	assert.Equal(t, maxID, maxStatus(&tt).ID)

	// Manually mark timeline as 'preloaded'.
	tt.preloader.CheckPreload(tt.preloader.Done)

	// Specifically craft a boost of latest (i.e. max) status in timeline.
	boost := &gtsmodel.Status{ID: "06B1A00PQWDZZH9WK9P5VND35C", BoostOfID: maxID}

	// Insert boost into the timeline checking for 'repeatBoost'
	// notifier: boosting the most recent status counts as a repeat.
	repeatBoost := tt.InsertOne(boost, nil)
	assert.True(t, repeatBoost)

	// This should be the new 'max'
	// and have 'repeatBoost' set.
	newMax := maxStatus(&tt)
	assert.Equal(t, boost.ID, newMax.ID)
	assert.True(t, newMax.repeatBoost)

	// Specifically craft 2 boosts of some unseen status in the timeline.
	boost1 := &gtsmodel.Status{ID: "06B1A121YEX02S0AY48X93JMDW", BoostOfID: "unseen"}
	boost2 := &gtsmodel.Status{ID: "06B1A12TG2NTJC9P270EQXS08M", BoostOfID: "unseen"}

	// Insert boosts into the timeline, ensuring
	// first is not 'repeat', but second one is.
	repeatBoost1 := tt.InsertOne(boost1, nil)
	repeatBoost2 := tt.InsertOne(boost2, nil)
	assert.False(t, repeatBoost1)
	assert.True(t, repeatBoost2)
}
+
// TestStatusTimelineTrim checks that Trim() reduces the timeline to
// the configured cutoff length (t.cut), always removing the oldest
// (i.e. lowest-ID) entries first, and is a no-op at target length.
func TestStatusTimelineTrim(t *testing.T) {
	var tt StatusTimeline
	tt.Init(1000)

	// Clone the input test status data.
	data := slices.Clone(testStatusMeta)

	// Insert test data into timeline.
	_ = tt.cache.Insert(data...)

	// From here it'll be easier to have DESC sorted
	// test data for reslicing and checking against.
	// (comparator returns +k when a < b, so newest-
	// first, i.e. highest ID at index 0).
	slices.SortFunc(data, func(a, b *StatusMeta) int {
		const k = +1
		switch {
		case a.ID < b.ID:
			return +k
		case b.ID < a.ID:
			return -k
		default:
			return 0
		}
	})

	// Set manual cutoff for trim,
	// one less than current length.
	tt.cut = len(data) - 1

	// Perform trim.
	tt.Trim()

	// The post trim length should be tt.cut
	assert.Equal(t, tt.cut, tt.cache.Len())

	// It specifically should have removed
	// the oldest (i.e. min) status element.
	// (with DESC data, the min is the last).
	minID := data[len(data)-1].ID
	assert.NotEqual(t, minID, minStatus(&tt).ID)
	assert.False(t, containsStatusID(&tt, minID))

	// Drop trimmed status.
	data = data[:len(data)-1]

	// Set smaller cutoff for trim, two
	// less than remaining data length.
	tt.cut = len(data) - 2

	// Perform trim.
	tt.Trim()

	// The post trim length should be tt.cut
	assert.Equal(t, tt.cut, tt.cache.Len())

	// It specifically should have removed
	// the oldest 2 (i.e. min) status elements.
	minID1 := data[len(data)-1].ID
	minID2 := data[len(data)-2].ID
	assert.NotEqual(t, minID1, minStatus(&tt).ID)
	assert.NotEqual(t, minID2, minStatus(&tt).ID)
	assert.False(t, containsStatusID(&tt, minID1))
	assert.False(t, containsStatusID(&tt, minID2))

	// Trim at desired length
	// should cause no change.
	before := tt.cache.Len()
	tt.Trim()
	assert.Equal(t, before, tt.cache.Len())
}
+
+// containsStatusID returns whether timeline contains a status with ID.
+func containsStatusID(t *StatusTimeline, id string) bool {
+ return getStatusByID(t, id) != nil
+}
+
+// getStatusByID attempts to fetch status with given ID from timeline.
+func getStatusByID(t *StatusTimeline, id string) *StatusMeta {
+ for _, value := range t.cache.Range(structr.Desc) {
+ if value.ID == id {
+ return value
+ }
+ }
+ return nil
+}
+
+// maxStatus returns the newest (i.e. highest value ID) status in timeline.
+func maxStatus(t *StatusTimeline) *StatusMeta {
+ var meta *StatusMeta
+ for _, value := range t.cache.Range(structr.Desc) {
+ meta = value
+ break
+ }
+ return meta
+}
+
+// minStatus returns the oldest (i.e. lowest value ID) status in timeline.
+func minStatus(t *StatusTimeline) *StatusMeta {
+ var meta *StatusMeta
+ for _, value := range t.cache.Range(structr.Asc) {
+ meta = value
+ break
+ }
+ return meta
+}
+
+// minStatusID returns the oldest (i.e. lowest value ID) status in metas.
+func minStatusID(metas []*StatusMeta) string {
+ var min string
+ min = metas[0].ID
+ for i := 1; i < len(metas); i++ {
+ if metas[i].ID < min {
+ min = metas[i].ID
+ }
+ }
+ return min
+}
+
+// maxStatusID returns the newest (i.e. highest value ID) status in metas.
+func maxStatusID(metas []*StatusMeta) string {
+ var max string
+ max = metas[0].ID
+ for i := 1; i < len(metas); i++ {
+ if metas[i].ID > max {
+ max = metas[i].ID
+ }
+ }
+ return max
+}
diff --git a/internal/cache/timeline/timeline.go b/internal/cache/timeline/timeline.go
new file mode 100644
index 000000000..4f8797e82
--- /dev/null
+++ b/internal/cache/timeline/timeline.go
@@ -0,0 +1,59 @@
+// GoToSocial
+// Copyright (C) GoToSocial Authors admin@gotosocial.org
+// SPDX-License-Identifier: AGPL-3.0-or-later
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+package timeline
+
+import (
+ "time"
+
+ "codeberg.org/gruf/go-structr"
+ "github.com/superseriousbusiness/gotosocial/internal/id"
+ "github.com/superseriousbusiness/gotosocial/internal/paging"
+)
+
+// plus1hULID returns a ULID for now+1h.
+func plus1hULID() string {
+ t := time.Now().Add(time.Hour)
+ return id.NewULIDFromTime(t)
+}
+
// nextPageParams updates the given paging parameters in place to
// request the page following the given last-seen index value. This
// correctly handles making sure that, depending on the paging order,
// the cursor value (min when ascending, max when descending) gets
// updated while the opposing boundary value is left untouched.
func nextPageParams(
	page *paging.Page,
	lastIdx string,
	order paging.Order,
) {
	if order.Ascending() {
		page.Min.Value = lastIdx
	} else /* i.e. descending */ { //nolint:revive
		page.Max.Value = lastIdx
	}
}
+
+// toDirection converts page order to timeline direction.
+func toDirection(order paging.Order) structr.Direction {
+ if order.Ascending() {
+ return structr.Asc
+ } else /* i.e. descending */ { //nolint:revive
+ return structr.Desc
+ }
+}
diff --git a/internal/cache/wrappers.go b/internal/cache/wrappers.go
index 9cb4fca98..34d7cb8db 100644
--- a/internal/cache/wrappers.go
+++ b/internal/cache/wrappers.go
@@ -27,19 +27,19 @@ import (
// SliceCache wraps a simple.Cache to provide simple loader-callback
// functions for fetching + caching slices of objects (e.g. IDs).
type SliceCache[T any] struct {
- cache simple.Cache[string, []T]
+ simple.Cache[string, []T]
}
// Init initializes the cache with given length + capacity.
func (c *SliceCache[T]) Init(len, cap int) {
- c.cache = simple.Cache[string, []T]{}
- c.cache.Init(len, cap)
+ c.Cache = simple.Cache[string, []T]{}
+ c.Cache.Init(len, cap)
}
// Load will attempt to load an existing slice from cache for key, else calling load function and caching the result.
func (c *SliceCache[T]) Load(key string, load func() ([]T, error)) ([]T, error) {
// Look for cached values.
- data, ok := c.cache.Get(key)
+ data, ok := c.Cache.Get(key)
if !ok {
var err error
@@ -51,7 +51,7 @@ func (c *SliceCache[T]) Load(key string, load func() ([]T, error)) ([]T, error)
}
// Store the data.
- c.cache.Set(key, data)
+ c.Cache.Set(key, data)
}
// Return data clone for safety.
@@ -60,27 +60,7 @@ func (c *SliceCache[T]) Load(key string, load func() ([]T, error)) ([]T, error)
// Invalidate: see simple.Cache{}.InvalidateAll().
func (c *SliceCache[T]) Invalidate(keys ...string) {
- _ = c.cache.InvalidateAll(keys...)
-}
-
-// Trim: see simple.Cache{}.Trim().
-func (c *SliceCache[T]) Trim(perc float64) {
- c.cache.Trim(perc)
-}
-
-// Clear: see simple.Cache{}.Clear().
-func (c *SliceCache[T]) Clear() {
- c.cache.Clear()
-}
-
-// Len: see simple.Cache{}.Len().
-func (c *SliceCache[T]) Len() int {
- return c.cache.Len()
-}
-
-// Cap: see simple.Cache{}.Cap().
-func (c *SliceCache[T]) Cap() int {
- return c.cache.Cap()
+ _ = c.Cache.InvalidateAll(keys...)
}
// StructCache wraps a structr.Cache{} to simple index caching
@@ -89,17 +69,17 @@ func (c *SliceCache[T]) Cap() int {
// name under the main database caches struct which would reduce
// time required to access cached values).
type StructCache[StructType any] struct {
- cache structr.Cache[StructType]
+ structr.Cache[StructType]
index map[string]*structr.Index
}
// Init initializes the cache with given structr.CacheConfig{}.
func (c *StructCache[T]) Init(config structr.CacheConfig[T]) {
c.index = make(map[string]*structr.Index, len(config.Indices))
- c.cache = structr.Cache[T]{}
- c.cache.Init(config)
+ c.Cache = structr.Cache[T]{}
+ c.Cache.Init(config)
for _, cfg := range config.Indices {
- c.index[cfg.Fields] = c.cache.Index(cfg.Fields)
+ c.index[cfg.Fields] = c.Cache.Index(cfg.Fields)
}
}
@@ -107,26 +87,21 @@ func (c *StructCache[T]) Init(config structr.CacheConfig[T]) {
// Note: this also handles conversion of the untyped (any) keys to structr.Key{} via structr.Index{}.
func (c *StructCache[T]) GetOne(index string, key ...any) (T, bool) {
i := c.index[index]
- return c.cache.GetOne(i, i.Key(key...))
+ return c.Cache.GetOne(i, i.Key(key...))
}
// Get calls structr.Cache{}.Get(), using a cached structr.Index{} by 'index' name.
// Note: this also handles conversion of the untyped (any) keys to structr.Key{} via structr.Index{}.
func (c *StructCache[T]) Get(index string, keys ...[]any) []T {
i := c.index[index]
- return c.cache.Get(i, i.Keys(keys...)...)
-}
-
-// Put: see structr.Cache{}.Put().
-func (c *StructCache[T]) Put(values ...T) {
- c.cache.Put(values...)
+ return c.Cache.Get(i, i.Keys(keys...)...)
}
// LoadOne calls structr.Cache{}.LoadOne(), using a cached structr.Index{} by 'index' name.
// Note: this also handles conversion of the untyped (any) keys to structr.Key{} via structr.Index{}.
func (c *StructCache[T]) LoadOne(index string, load func() (T, error), key ...any) (T, error) {
i := c.index[index]
- return c.cache.LoadOne(i, i.Key(key...), load)
+ return c.Cache.LoadOne(i, i.Key(key...), load)
}
// LoadIDs calls structr.Cache{}.Load(), using a cached structr.Index{} by 'index' name. Note: this also handles
@@ -149,7 +124,7 @@ func (c *StructCache[T]) LoadIDs(index string, ids []string, load func([]string)
}
// Pass loader callback with wrapper onto main cache load function.
- return c.cache.Load(i, keys, func(uncached []structr.Key) ([]T, error) {
+ return c.Cache.Load(i, keys, func(uncached []structr.Key) ([]T, error) {
uncachedIDs := make([]string, len(uncached))
for i := range uncached {
uncachedIDs[i] = uncached[i].Values()[0].(string)
@@ -177,7 +152,7 @@ func (c *StructCache[T]) LoadIDs2Part(index string, id1 string, id2s []string, l
}
// Pass loader callback with wrapper onto main cache load function.
- return c.cache.Load(i, keys, func(uncached []structr.Key) ([]T, error) {
+ return c.Cache.Load(i, keys, func(uncached []structr.Key) ([]T, error) {
uncachedIDs := make([]string, len(uncached))
for i := range uncached {
uncachedIDs[i] = uncached[i].Values()[1].(string)
@@ -186,16 +161,11 @@ func (c *StructCache[T]) LoadIDs2Part(index string, id1 string, id2s []string, l
})
}
-// Store: see structr.Cache{}.Store().
-func (c *StructCache[T]) Store(value T, store func() error) error {
- return c.cache.Store(value, store)
-}
-
// Invalidate calls structr.Cache{}.Invalidate(), using a cached structr.Index{} by 'index' name.
// Note: this also handles conversion of the untyped (any) keys to structr.Key{} via structr.Index{}.
func (c *StructCache[T]) Invalidate(index string, key ...any) {
i := c.index[index]
- c.cache.Invalidate(i, i.Key(key...))
+ c.Cache.Invalidate(i, i.Key(key...))
}
// InvalidateIDs calls structr.Cache{}.Invalidate(), using a cached structr.Index{} by 'index' name. Note: this also
@@ -218,25 +188,5 @@ func (c *StructCache[T]) InvalidateIDs(index string, ids []string) {
}
// Pass to main invalidate func.
- c.cache.Invalidate(i, keys...)
-}
-
-// Trim: see structr.Cache{}.Trim().
-func (c *StructCache[T]) Trim(perc float64) {
- c.cache.Trim(perc)
-}
-
-// Clear: see structr.Cache{}.Clear().
-func (c *StructCache[T]) Clear() {
- c.cache.Clear()
-}
-
-// Len: see structr.Cache{}.Len().
-func (c *StructCache[T]) Len() int {
- return c.cache.Len()
-}
-
-// Cap: see structr.Cache{}.Cap().
-func (c *StructCache[T]) Cap() int {
- return c.cache.Cap()
+ c.Cache.Invalidate(i, keys...)
}