diff options
Diffstat (limited to 'internal/timeline/manager.go')
-rw-r--r-- | internal/timeline/manager.go | 232 |
1 files changed, 84 insertions, 148 deletions
diff --git a/internal/timeline/manager.go b/internal/timeline/manager.go index c3b6b8b0f..f34cee787 100644 --- a/internal/timeline/manager.go +++ b/internal/timeline/manager.go @@ -20,14 +20,18 @@ package timeline import ( "context" "fmt" - "strings" "sync" "time" - "codeberg.org/gruf/go-kv" + "github.com/superseriousbusiness/gotosocial/internal/gtserror" "github.com/superseriousbusiness/gotosocial/internal/log" ) +const ( + pruneLengthIndexed = 400 + pruneLengthPrepared = 50 +) + // Manager abstracts functions for creating timelines for multiple accounts, and adding, removing, and fetching entries from those timelines. // // By the time a timelineable hits the manager interface, it should already have been filtered and it should be established that the item indeed @@ -41,38 +45,37 @@ import ( // Prepared items consist of the item's database ID, the time it was created, AND the apimodel representation of that item, for quick serialization. // Prepared items of course take up more memory than indexed items, so they should be regularly pruned if they're not being actively served. type Manager interface { - // Ingest takes one item and indexes it into the timeline for the given account ID. - // - // It should already be established before calling this function that the item actually belongs in the timeline! - // - // The returned bool indicates whether the item was actually put in the timeline. This could be false in cases where - // the item is a boosted status, but a boost of the original status or the status itself already exists recently in the timeline. - Ingest(ctx context.Context, item Timelineable, timelineAccountID string) (bool, error) - // IngestAndPrepare takes one timelineable and indexes it into the timeline for the given account ID, and then immediately prepares it for serving. + // IngestOne takes one timelineable and indexes it into the timeline for the given account ID, and then immediately prepares it for serving. // This is useful in cases where we know the item will need to be shown at the top of a user's timeline immediately (eg., a new status is created). // // It should already be established before calling this function that the item actually belongs in the timeline! // // The returned bool indicates whether the item was actually put in the timeline. This could be false in cases where // a status is a boost, but a boost of the original status or the status itself already exists recently in the timeline. - IngestAndPrepare(ctx context.Context, item Timelineable, timelineAccountID string) (bool, error) + IngestOne(ctx context.Context, accountID string, item Timelineable) (bool, error) + // GetTimeline returns limit n amount of prepared entries from the timeline of the given account ID, in descending chronological order. - // If maxID is provided, it will return prepared entries from that maxID onwards, inclusive. GetTimeline(ctx context.Context, accountID string, maxID string, sinceID string, minID string, limit int, local bool) ([]Preparable, error) - // GetIndexedLength returns the amount of items that have been *indexed* for the given account ID. - GetIndexedLength(ctx context.Context, timelineAccountID string) int + + // GetIndexedLength returns the amount of items that have been indexed for the given account ID. + GetIndexedLength(ctx context.Context, accountID string) int + // GetOldestIndexedID returns the id ID for the oldest item that we have indexed for the given account. - GetOldestIndexedID(ctx context.Context, timelineAccountID string) (string, error) - // PrepareXFromTop prepares limit n amount of items, based on their indexed representations, from the top of the index. - PrepareXFromTop(ctx context.Context, timelineAccountID string, limit int) error + // Will be an empty string if nothing is (yet) indexed. + GetOldestIndexedID(ctx context.Context, accountID string) string + // Remove removes one item from the timeline of the given timelineAccountID - Remove(ctx context.Context, timelineAccountID string, itemID string) (int, error) + Remove(ctx context.Context, accountID string, itemID string) (int, error) + // WipeItemFromAllTimelines removes one item from the index and prepared items of all timelines WipeItemFromAllTimelines(ctx context.Context, itemID string) error + // WipeStatusesFromAccountID removes all items by the given accountID from the timelineAccountID's timelines. WipeItemsFromAccountID(ctx context.Context, timelineAccountID string, accountID string) error + // Start starts hourly cleanup jobs for this timeline manager. Start() error + // Stop stops the timeline manager (currently a stub, doesn't do anything). Stop() error } @@ -97,31 +100,41 @@ type manager struct { } func (m *manager) Start() error { - // range through all timelines in the sync map once per hour + prune as necessary + // Start a background goroutine which iterates + // through all stored timelines once per hour, + // and cleans up old entries if that timeline + // hasn't been accessed in the last hour. go func() { for now := range time.NewTicker(1 * time.Hour).C { - m.accountTimelines.Range(func(key any, value any) bool { - timelineAccountID, ok := key.(string) + // Define the range function inside here, + // so that we can use the 'now' returned + // by the ticker, instead of having to call + // time.Now() multiple times. + // + // Unless it panics, this function always + // returns 'true', to continue the Range + // call through the sync.Map. + f := func(_ any, v any) bool { + timeline, ok := v.(Timeline) if !ok { - panic("couldn't parse timeline manager sync map key as string, this should never happen so panic") + log.Panic(nil, "couldn't parse timeline manager sync map value as Timeline, this should never happen so panic") } - t, ok := value.(Timeline) - if !ok { - panic("couldn't parse timeline manager sync map value as Timeline, this should never happen so panic") + if now.Sub(timeline.LastGot()) < 1*time.Hour { + // Timeline has been fetched in the + // last hour, move on to the next one. + return true } - anHourAgo := now.Add(-1 * time.Hour) - if lastGot := t.LastGot(); lastGot.Before(anHourAgo) { - amountPruned := t.Prune(defaultDesiredPreparedItemsLength, defaultDesiredIndexedItemsLength) - log.WithFields(kv.Fields{ - {"timelineAccountID", timelineAccountID}, - {"amountPruned", amountPruned}, - }...).Info("pruned indexed and prepared items from timeline") + if amountPruned := timeline.Prune(pruneLengthPrepared, pruneLengthIndexed); amountPruned > 0 { + log.WithField("accountID", timeline.AccountID()).Infof("pruned %d indexed and prepared items from timeline", amountPruned) } return true - }) + } + + // Execute the function for each timeline. + m.accountTimelines.Range(f) } }() @@ -132,146 +145,69 @@ func (m *manager) Stop() error { return nil } -func (m *manager) Ingest(ctx context.Context, item Timelineable, timelineAccountID string) (bool, error) { - l := log.WithContext(ctx). - WithFields(kv.Fields{ - {"timelineAccountID", timelineAccountID}, - {"itemID", item.GetID()}, - }...) - - t, err := m.getOrCreateTimeline(ctx, timelineAccountID) - if err != nil { - return false, err - } - - l.Trace("ingesting item") - return t.IndexOne(ctx, item.GetID(), item.GetBoostOfID(), item.GetAccountID(), item.GetBoostOfAccountID()) +func (m *manager) IngestOne(ctx context.Context, accountID string, item Timelineable) (bool, error) { + return m.getOrCreateTimeline(ctx, accountID).IndexAndPrepareOne( + ctx, + item.GetID(), + item.GetBoostOfID(), + item.GetAccountID(), + item.GetBoostOfAccountID(), + ) } -func (m *manager) IngestAndPrepare(ctx context.Context, item Timelineable, timelineAccountID string) (bool, error) { - l := log.WithContext(ctx). - WithFields(kv.Fields{ - {"timelineAccountID", timelineAccountID}, - {"itemID", item.GetID()}, - }...) - - t, err := m.getOrCreateTimeline(ctx, timelineAccountID) - if err != nil { - return false, err - } - - l.Trace("ingesting item") - return t.IndexAndPrepareOne(ctx, item.GetID(), item.GetBoostOfID(), item.GetAccountID(), item.GetBoostOfAccountID()) +func (m *manager) Remove(ctx context.Context, accountID string, itemID string) (int, error) { + return m.getOrCreateTimeline(ctx, accountID).Remove(ctx, itemID) } -func (m *manager) Remove(ctx context.Context, timelineAccountID string, itemID string) (int, error) { - l := log.WithContext(ctx). - WithFields(kv.Fields{ - {"timelineAccountID", timelineAccountID}, - {"itemID", itemID}, - }...) - - t, err := m.getOrCreateTimeline(ctx, timelineAccountID) - if err != nil { - return 0, err - } - - l.Trace("removing item") - return t.Remove(ctx, itemID) -} - -func (m *manager) GetTimeline(ctx context.Context, timelineAccountID string, maxID string, sinceID string, minID string, limit int, local bool) ([]Preparable, error) { - l := log.WithContext(ctx). - WithFields(kv.Fields{{"timelineAccountID", timelineAccountID}}...) - - t, err := m.getOrCreateTimeline(ctx, timelineAccountID) - if err != nil { - return nil, err - } - - items, err := t.Get(ctx, limit, maxID, sinceID, minID, true) - if err != nil { - l.Errorf("error getting statuses: %s", err) - } - return items, nil +func (m *manager) GetTimeline(ctx context.Context, accountID string, maxID string, sinceID string, minID string, limit int, local bool) ([]Preparable, error) { + return m.getOrCreateTimeline(ctx, accountID).Get(ctx, limit, maxID, sinceID, minID, true) } -func (m *manager) GetIndexedLength(ctx context.Context, timelineAccountID string) int { - t, err := m.getOrCreateTimeline(ctx, timelineAccountID) - if err != nil { - return 0 - } - - return t.ItemIndexLength(ctx) +func (m *manager) GetIndexedLength(ctx context.Context, accountID string) int { + return m.getOrCreateTimeline(ctx, accountID).Len() } -func (m *manager) GetOldestIndexedID(ctx context.Context, timelineAccountID string) (string, error) { - t, err := m.getOrCreateTimeline(ctx, timelineAccountID) - if err != nil { - return "", err - } - - return t.OldestIndexedItemID(ctx) -} - -func (m *manager) PrepareXFromTop(ctx context.Context, timelineAccountID string, limit int) error { - t, err := m.getOrCreateTimeline(ctx, timelineAccountID) - if err != nil { - return err - } - - return t.PrepareFromTop(ctx, limit) +func (m *manager) GetOldestIndexedID(ctx context.Context, accountID string) string { + return m.getOrCreateTimeline(ctx, accountID).OldestIndexedItemID() } func (m *manager) WipeItemFromAllTimelines(ctx context.Context, statusID string) error { - errors := []string{} - m.accountTimelines.Range(func(k interface{}, i interface{}) bool { - t, ok := i.(Timeline) - if !ok { - panic("couldn't parse entry as Timeline, this should never happen so panic") - } + errors := gtserror.MultiError{} - if _, err := t.Remove(ctx, statusID); err != nil { - errors = append(errors, err.Error()) + m.accountTimelines.Range(func(_ any, v any) bool { + if _, err := v.(Timeline).Remove(ctx, statusID); err != nil { + errors.Append(err) } - return true + return true // always continue range }) - var err error if len(errors) > 0 { - err = fmt.Errorf("one or more errors removing status %s from all timelines: %s", statusID, strings.Join(errors, ";")) + return fmt.Errorf("WipeItemFromAllTimelines: one or more errors wiping status %s: %w", statusID, errors.Combine()) } - return err + return nil } func (m *manager) WipeItemsFromAccountID(ctx context.Context, timelineAccountID string, accountID string) error { - t, err := m.getOrCreateTimeline(ctx, timelineAccountID) - if err != nil { - return err - } - - _, err = t.RemoveAllBy(ctx, accountID) + _, err := m.getOrCreateTimeline(ctx, timelineAccountID).RemoveAllByOrBoosting(ctx, accountID) return err } -func (m *manager) getOrCreateTimeline(ctx context.Context, timelineAccountID string) (Timeline, error) { - var t Timeline - i, ok := m.accountTimelines.Load(timelineAccountID) - if !ok { - var err error - t, err = NewTimeline(ctx, timelineAccountID, m.grabFunction, m.filterFunction, m.prepareFunction, m.skipInsertFunction) - if err != nil { - return nil, err - } - m.accountTimelines.Store(timelineAccountID, t) - } else { - t, ok = i.(Timeline) - if !ok { - panic("couldn't parse entry as Timeline, this should never happen so panic") - } +// getOrCreateTimeline returns a timeline for the given +// accountID. If a timeline does not yet exist in the +// manager's sync.Map, it will be created and stored. +func (m *manager) getOrCreateTimeline(ctx context.Context, accountID string) Timeline { + i, ok := m.accountTimelines.Load(accountID) + if ok { + // Timeline already existed in sync.Map. + return i.(Timeline) //nolint:forcetypeassert } - return t, nil + // Timeline did not yet exist in sync.Map. + // Create + store it. + timeline := NewTimeline(ctx, accountID, m.grabFunction, m.filterFunction, m.prepareFunction, m.skipInsertFunction) + m.accountTimelines.Store(accountID, timeline) + + return timeline } |