From 8fdd358f4b4428b33df4afd672ed070032d46e48 Mon Sep 17 00:00:00 2001 From: Vyr Cossont Date: Tue, 23 Jul 2024 12:44:31 -0700 Subject: [feature] Conversations API (#3013) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Implement conversations API * Sort and page conversations by last status ID * Appease linter * Fix deleting conversations and statuses * Refactor to make migrations automatic * Lint * Update tests post-merge * Fixes from live-fire testing * Linter caught a format problem * Refactor tests, fix cache * Negative test for non-DMs * Run conversations advanced migration on testrig startup as well as regular server startup * Document (lack of) side effects of API method for deleting a conversation * Make not-found check less nested for readability * Rename PutConversation to UpsertConversation * Use util.Ptr instead of IIFE * Reduce cache used by conversations * Remove unnecessary TableExpr/ColumnExpr * Use struct tags for both unique constraints on Conversation * Make it clear how paging with GetDirectStatusIDsBatch should be used * Let conversation paging skip conversations it can't render * Use Bun NewDropTable * Convert delete raw query to Bun * Convert update raw query to Bun * Convert latestConversationStatusesTempTable raw query partially to Bun * Convert conversationStatusesTempTable raw query partially to Bun * Rename field used to store result of MaxDirectStatusID * Move advanced migrations to their own tiny processor * Catch up util function name with main * Remove json.… wrappers * Remove redundant check * Combine error checks * Replace map with slice of structs * Address processor/type converter comments - Add context info for errors - Extract some common processor code into shared methods - Move conversation eligibility check ahead of populating conversation * Add error context when dropping temp tables --- internal/cache/cache.go | 2 ++ internal/cache/db.go | 52 ++++++++++++++++++++++++++++++++++++++++++++ internal/cache/invalidate.go | 5 +++++ internal/cache/size.go | 15 +++++++++++++ internal/cache/wrappers.go | 28 ++++++++++++++++++++++++ 5 files changed, 102 insertions(+) (limited to 'internal/cache') diff --git a/internal/cache/cache.go b/internal/cache/cache.go index 5a8a92ca3..8b0c04ea4 100644 --- a/internal/cache/cache.go +++ b/internal/cache/cache.go @@ -60,6 +60,8 @@ func (c *Caches) Init() { c.initBlockIDs() c.initBoostOfIDs() c.initClient() + c.initConversation() + c.initConversationLastStatusIDs() c.initDomainAllow() c.initDomainBlock() c.initEmoji() diff --git a/internal/cache/db.go b/internal/cache/db.go index 50acf00d1..4c063b06d 100644 --- a/internal/cache/db.go +++ b/internal/cache/db.go @@ -56,6 +56,12 @@ type GTSCaches struct { // Client provides access to the gtsmodel Client database cache. Client StructCache[*gtsmodel.Client] + // Conversation provides access to the gtsmodel Conversation database cache. + Conversation StructCache[*gtsmodel.Conversation] + + // ConversationLastStatusIDs provides access to the conversation last status IDs database cache. + ConversationLastStatusIDs SliceCache[string] + // DomainAllow provides access to the domain allow database cache. DomainAllow *domain.Cache @@ -426,6 +432,52 @@ func (c *Caches) initClient() { }) } +func (c *Caches) initConversation() { + cap := calculateResultCacheMax( + sizeofConversation(), // model in-mem size. + config.GetCacheConversationMemRatio(), + ) + + log.Infof(nil, "cache size = %d", cap) + + copyF := func(c1 *gtsmodel.Conversation) *gtsmodel.Conversation { + c2 := new(gtsmodel.Conversation) + *c2 = *c1 + + // Don't include ptr fields that + // will be populated separately. + // See internal/db/bundb/conversation.go. + c2.Account = nil + c2.OtherAccounts = nil + c2.LastStatus = nil + + return c2 + } + + c.GTS.Conversation.Init(structr.CacheConfig[*gtsmodel.Conversation]{ + Indices: []structr.IndexConfig{ + {Fields: "ID"}, + {Fields: "ThreadID,AccountID,OtherAccountsKey"}, + {Fields: "AccountID,LastStatusID"}, + {Fields: "AccountID", Multiple: true}, + }, + MaxSize: cap, + IgnoreErr: ignoreErrors, + Copy: copyF, + Invalidate: c.OnInvalidateConversation, + }) +} + +func (c *Caches) initConversationLastStatusIDs() { + cap := calculateSliceCacheMax( + config.GetCacheConversationLastStatusIDsMemRatio(), + ) + + log.Infof(nil, "cache size = %d", cap) + + c.GTS.ConversationLastStatusIDs.Init(0, cap) +} + func (c *Caches) initDomainAllow() { c.GTS.DomainAllow = new(domain.Cache) } diff --git a/internal/cache/invalidate.go b/internal/cache/invalidate.go index 088e7f91f..987a6eb64 100644 --- a/internal/cache/invalidate.go +++ b/internal/cache/invalidate.go @@ -83,6 +83,11 @@ func (c *Caches) OnInvalidateClient(client *gtsmodel.Client) { c.GTS.Token.Invalidate("ClientID", client.ID) } +func (c *Caches) OnInvalidateConversation(conversation *gtsmodel.Conversation) { + // Invalidate owning account's conversation list. + c.GTS.ConversationLastStatusIDs.Invalidate(conversation.AccountID) +} + func (c *Caches) OnInvalidateEmojiCategory(category *gtsmodel.EmojiCategory) { // Invalidate any emoji in this category. c.GTS.Emoji.Invalidate("CategoryID", category.ID) diff --git a/internal/cache/size.go b/internal/cache/size.go index 4ec30fbb7..4c474fa28 100644 --- a/internal/cache/size.go +++ b/internal/cache/size.go @@ -19,6 +19,7 @@ package cache import ( "crypto/rsa" + "strings" "time" "unsafe" @@ -320,6 +321,20 @@ func sizeofClient() uintptr { })) } +func sizeofConversation() uintptr { + return uintptr(size.Of(>smodel.Conversation{ + ID: exampleID, + CreatedAt: exampleTime, + UpdatedAt: exampleTime, + AccountID: exampleID, + OtherAccountIDs: []string{exampleID, exampleID, exampleID}, + OtherAccountsKey: strings.Join([]string{exampleID, exampleID, exampleID}, ","), + ThreadID: exampleID, + LastStatusID: exampleID, + Read: util.Ptr(true), + })) +} + func sizeofEmoji() uintptr { return uintptr(size.Of(>smodel.Emoji{ ID: exampleID, diff --git a/internal/cache/wrappers.go b/internal/cache/wrappers.go index edeea9bcd..9cb4fca98 100644 --- a/internal/cache/wrappers.go +++ b/internal/cache/wrappers.go @@ -158,6 +158,34 @@ func (c *StructCache[T]) LoadIDs(index string, ids []string, load func([]string) }) } +// LoadIDs2Part works as LoadIDs, except using a two-part key, +// where the first part is an ID shared by all the objects, +// and the second part is a list of per-object IDs. +func (c *StructCache[T]) LoadIDs2Part(index string, id1 string, id2s []string, load func(string, []string) ([]T, error)) ([]T, error) { + i := c.index[index] + if i == nil { + // we only perform this check here as + // we're going to use the index before + // passing it to cache in main .Load(). + panic("missing index for cache type") + } + + // Generate cache keys for two-part IDs. + keys := make([]structr.Key, len(id2s)) + for x, id2 := range id2s { + keys[x] = i.Key(id1, id2) + } + + // Pass loader callback with wrapper onto main cache load function. + return c.cache.Load(i, keys, func(uncached []structr.Key) ([]T, error) { + uncachedIDs := make([]string, len(uncached)) + for i := range uncached { + uncachedIDs[i] = uncached[i].Values()[1].(string) + } + return load(id1, uncachedIDs) + }) +} + // Store: see structr.Cache{}.Store(). func (c *StructCache[T]) Store(value T, store func() error) error { return c.cache.Store(value, store) -- cgit v1.2.3