summaryrefslogtreecommitdiff
path: root/internal/processing/workers
diff options
context:
space:
mode:
authorLibravatar Vyr Cossont <VyrCossont@users.noreply.github.com>2024-07-23 12:44:31 -0700
committerLibravatar GitHub <noreply@github.com>2024-07-23 20:44:31 +0100
commit8fdd358f4b4428b33df4afd672ed070032d46e48 (patch)
tree92ec4bcbda633878a468e396b968656dbf33ef59 /internal/processing/workers
parent[bugfix] media.Processor{}.GetFile() returning 404s on first call, correctly ... (diff)
downloadgotosocial-8fdd358f4b4428b33df4afd672ed070032d46e48.tar.xz
[feature] Conversations API (#3013)
* Implement conversations API * Sort and page conversations by last status ID * Appease linter * Fix deleting conversations and statuses * Refactor to make migrations automatic * Lint * Update tests post-merge * Fixes from live-fire testing * Linter caught a format problem * Refactor tests, fix cache * Negative test for non-DMs * Run conversations advanced migration on testrig startup as well as regular server startup * Document (lack of) side effects of API method for deleting a conversation * Make not-found check less nested for readability * Rename PutConversation to UpsertConversation * Use util.Ptr instead of IIFE * Reduce cache used by conversations * Remove unnecessary TableExpr/ColumnExpr * Use struct tags for both unique constraints on Conversation * Make it clear how paging with GetDirectStatusIDsBatch should be used * Let conversation paging skip conversations it can't render * Use Bun NewDropTable * Convert delete raw query to Bun * Convert update raw query to Bun * Convert latestConversationStatusesTempTable raw query partially to Bun * Convert conversationStatusesTempTable raw query partially to Bun * Rename field used to store result of MaxDirectStatusID * Move advanced migrations to their own tiny processor * Catch up util function name with main * Remove json.… wrappers * Remove redundant check * Combine error checks * Replace map with slice of structs * Address processor/type converter comments - Add context info for errors - Extract some common processor code into shared methods - Move conversation eligibility check ahead of populating conversation * Add error context when dropping temp tables
Diffstat (limited to 'internal/processing/workers')
-rw-r--r--internal/processing/workers/fromclientapi_test.go231
-rw-r--r--internal/processing/workers/surface.go12
-rw-r--r--internal/processing/workers/surfacenotify_test.go11
-rw-r--r--internal/processing/workers/surfacetimeline.go13
-rw-r--r--internal/processing/workers/util.go5
-rw-r--r--internal/processing/workers/workers.go13
-rw-r--r--internal/processing/workers/workers_test.go1
7 files changed, 269 insertions, 17 deletions
diff --git a/internal/processing/workers/fromclientapi_test.go b/internal/processing/workers/fromclientapi_test.go
index 49a68d27a..35c2c31b7 100644
--- a/internal/processing/workers/fromclientapi_test.go
+++ b/internal/processing/workers/fromclientapi_test.go
@@ -50,6 +50,8 @@ func (suite *FromClientAPITestSuite) newStatus(
visibility gtsmodel.Visibility,
replyToStatus *gtsmodel.Status,
boostOfStatus *gtsmodel.Status,
+ mentionedAccounts []*gtsmodel.Account,
+ createThread bool,
) *gtsmodel.Status {
var (
protocol = config.GetProtocol()
@@ -102,6 +104,39 @@ func (suite *FromClientAPITestSuite) newStatus(
newStatus.Visibility = boostOfStatus.Visibility
}
+ for _, mentionedAccount := range mentionedAccounts {
+ newMention := &gtsmodel.Mention{
+ ID: id.NewULID(),
+ StatusID: newStatus.ID,
+ Status: newStatus,
+ OriginAccountID: account.ID,
+ OriginAccountURI: account.URI,
+ OriginAccount: account,
+ TargetAccountID: mentionedAccount.ID,
+ TargetAccount: mentionedAccount,
+ Silent: util.Ptr(false),
+ }
+
+ newStatus.Mentions = append(newStatus.Mentions, newMention)
+ newStatus.MentionIDs = append(newStatus.MentionIDs, newMention.ID)
+
+ if err := state.DB.PutMention(ctx, newMention); err != nil {
+ suite.FailNow(err.Error())
+ }
+ }
+
+ if createThread {
+ newThread := &gtsmodel.Thread{
+ ID: id.NewULID(),
+ }
+
+ newStatus.ThreadID = newThread.ID
+
+ if err := state.DB.PutThread(ctx, newThread); err != nil {
+ suite.FailNow(err.Error())
+ }
+ }
+
// Put the status in the db, to mimic what would
// have already happened earlier up the flow.
if err := state.DB.PutStatus(ctx, newStatus); err != nil {
@@ -168,6 +203,31 @@ func (suite *FromClientAPITestSuite) statusJSON(
return string(statusJSON)
}
+func (suite *FromClientAPITestSuite) conversationJSON(
+ ctx context.Context,
+ typeConverter *typeutils.Converter,
+ conversation *gtsmodel.Conversation,
+ requestingAccount *gtsmodel.Account,
+) string {
+ apiConversation, err := typeConverter.ConversationToAPIConversation(
+ ctx,
+ conversation,
+ requestingAccount,
+ nil,
+ nil,
+ )
+ if err != nil {
+ suite.FailNow(err.Error())
+ }
+
+ conversationJSON, err := json.Marshal(apiConversation)
+ if err != nil {
+ suite.FailNow(err.Error())
+ }
+
+ return string(conversationJSON)
+}
+
func (suite *FromClientAPITestSuite) TestProcessCreateStatusWithNotification() {
testStructs := suite.SetupTestStructs()
defer suite.TearDownTestStructs(testStructs)
@@ -194,6 +254,8 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusWithNotification() {
gtsmodel.VisibilityPublic,
nil,
nil,
+ nil,
+ false,
)
)
@@ -303,6 +365,8 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusReply() {
gtsmodel.VisibilityPublic,
suite.testStatuses["local_account_2_status_1"],
nil,
+ nil,
+ false,
)
)
@@ -362,6 +426,8 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusReplyMuted() {
gtsmodel.VisibilityPublic,
suite.testStatuses["local_account_1_status_1"],
nil,
+ nil,
+ false,
)
threadMute = &gtsmodel.ThreadMute{
ID: "01HD3KRMBB1M85QRWHD912QWRE",
@@ -420,6 +486,8 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusBoostMuted() {
gtsmodel.VisibilityPublic,
nil,
suite.testStatuses["local_account_1_status_1"],
+ nil,
+ false,
)
threadMute = &gtsmodel.ThreadMute{
ID: "01HD3KRMBB1M85QRWHD912QWRE",
@@ -483,6 +551,8 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusListRepliesPolicyLis
gtsmodel.VisibilityPublic,
suite.testStatuses["local_account_2_status_1"],
nil,
+ nil,
+ false,
)
)
@@ -556,6 +626,8 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusListRepliesPolicyLis
gtsmodel.VisibilityPublic,
suite.testStatuses["local_account_2_status_1"],
nil,
+ nil,
+ false,
)
)
@@ -634,6 +706,8 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusReplyListRepliesPoli
gtsmodel.VisibilityPublic,
suite.testStatuses["local_account_2_status_1"],
nil,
+ nil,
+ false,
)
)
@@ -704,6 +778,8 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusBoost() {
gtsmodel.VisibilityPublic,
nil,
suite.testStatuses["local_account_2_status_1"],
+ nil,
+ false,
)
)
@@ -765,6 +841,8 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusBoostNoReblogs() {
gtsmodel.VisibilityPublic,
nil,
suite.testStatuses["local_account_2_status_1"],
+ nil,
+ false,
)
)
@@ -807,6 +885,159 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusBoostNoReblogs() {
)
}
+// A DM to a local user should create a conversation and accompanying notification.
+func (suite *FromClientAPITestSuite) TestProcessCreateStatusWhichBeginsConversation() {
+ testStructs := suite.SetupTestStructs()
+ defer suite.TearDownTestStructs(testStructs)
+
+ var (
+ ctx = context.Background()
+ postingAccount = suite.testAccounts["local_account_2"]
+ receivingAccount = suite.testAccounts["local_account_1"]
+ streams = suite.openStreams(ctx,
+ testStructs.Processor,
+ receivingAccount,
+ nil,
+ )
+ homeStream = streams[stream.TimelineHome]
+ directStream = streams[stream.TimelineDirect]
+
+ // turtle posts a new top-level DM mentioning zork.
+ status = suite.newStatus(
+ ctx,
+ testStructs.State,
+ postingAccount,
+ gtsmodel.VisibilityDirect,
+ nil,
+ nil,
+ []*gtsmodel.Account{receivingAccount},
+ true,
+ )
+ )
+
+ // Process the new status.
+ if err := testStructs.Processor.Workers().ProcessFromClientAPI(
+ ctx,
+ &messages.FromClientAPI{
+ APObjectType: ap.ObjectNote,
+ APActivityType: ap.ActivityCreate,
+ GTSModel: status,
+ Origin: postingAccount,
+ },
+ ); err != nil {
+ suite.FailNow(err.Error())
+ }
+
+ // Locate the conversation which should now exist for zork.
+ conversation, err := testStructs.State.DB.GetConversationByThreadAndAccountIDs(
+ ctx,
+ status.ThreadID,
+ receivingAccount.ID,
+ []string{postingAccount.ID},
+ )
+ if err != nil {
+ suite.FailNow(err.Error())
+ }
+
+ // Check status in home stream.
+ suite.checkStreamed(
+ homeStream,
+ true,
+ "",
+ stream.EventTypeUpdate,
+ )
+
+ // Check mention notification in home stream.
+ suite.checkStreamed(
+ homeStream,
+ true,
+ "",
+ stream.EventTypeNotification,
+ )
+
+ // Check conversation in direct stream.
+ conversationJSON := suite.conversationJSON(
+ ctx,
+ testStructs.TypeConverter,
+ conversation,
+ receivingAccount,
+ )
+ suite.checkStreamed(
+ directStream,
+ true,
+ conversationJSON,
+ stream.EventTypeConversation,
+ )
+}
+
+// A public message to a local user should not result in a conversation notification.
+func (suite *FromClientAPITestSuite) TestProcessCreateStatusWhichShouldNotCreateConversation() {
+ testStructs := suite.SetupTestStructs()
+ defer suite.TearDownTestStructs(testStructs)
+
+ var (
+ ctx = context.Background()
+ postingAccount = suite.testAccounts["local_account_2"]
+ receivingAccount = suite.testAccounts["local_account_1"]
+ streams = suite.openStreams(ctx,
+ testStructs.Processor,
+ receivingAccount,
+ nil,
+ )
+ homeStream = streams[stream.TimelineHome]
+ directStream = streams[stream.TimelineDirect]
+
+ // turtle posts a new top-level public message mentioning zork.
+ status = suite.newStatus(
+ ctx,
+ testStructs.State,
+ postingAccount,
+ gtsmodel.VisibilityPublic,
+ nil,
+ nil,
+ []*gtsmodel.Account{receivingAccount},
+ true,
+ )
+ )
+
+ // Process the new status.
+ if err := testStructs.Processor.Workers().ProcessFromClientAPI(
+ ctx,
+ &messages.FromClientAPI{
+ APObjectType: ap.ObjectNote,
+ APActivityType: ap.ActivityCreate,
+ GTSModel: status,
+ Origin: postingAccount,
+ },
+ ); err != nil {
+ suite.FailNow(err.Error())
+ }
+
+ // Check status in home stream.
+ suite.checkStreamed(
+ homeStream,
+ true,
+ "",
+ stream.EventTypeUpdate,
+ )
+
+ // Check mention notification in home stream.
+ suite.checkStreamed(
+ homeStream,
+ true,
+ "",
+ stream.EventTypeNotification,
+ )
+
+ // Check for absence of conversation notification in direct stream.
+ suite.checkStreamed(
+ directStream,
+ false,
+ "",
+ "",
+ )
+}
+
func (suite *FromClientAPITestSuite) TestProcessStatusDelete() {
testStructs := suite.SetupTestStructs()
defer suite.TearDownTestStructs(testStructs)
diff --git a/internal/processing/workers/surface.go b/internal/processing/workers/surface.go
index 5ec905ae8..1a7dbbfe5 100644
--- a/internal/processing/workers/surface.go
+++ b/internal/processing/workers/surface.go
@@ -20,6 +20,7 @@ package workers
import (
"github.com/superseriousbusiness/gotosocial/internal/email"
"github.com/superseriousbusiness/gotosocial/internal/filter/visibility"
+ "github.com/superseriousbusiness/gotosocial/internal/processing/conversations"
"github.com/superseriousbusiness/gotosocial/internal/processing/stream"
"github.com/superseriousbusiness/gotosocial/internal/state"
"github.com/superseriousbusiness/gotosocial/internal/typeutils"
@@ -32,9 +33,10 @@ import (
// - sending a notification to a user
// - sending an email
type Surface struct {
- State *state.State
- Converter *typeutils.Converter
- Stream *stream.Processor
- Filter *visibility.Filter
- EmailSender email.Sender
+ State *state.State
+ Converter *typeutils.Converter
+ Stream *stream.Processor
+ Filter *visibility.Filter
+ EmailSender email.Sender
+ Conversations *conversations.Processor
}
diff --git a/internal/processing/workers/surfacenotify_test.go b/internal/processing/workers/surfacenotify_test.go
index 18d0277ae..937ddeca2 100644
--- a/internal/processing/workers/surfacenotify_test.go
+++ b/internal/processing/workers/surfacenotify_test.go
@@ -39,11 +39,12 @@ func (suite *SurfaceNotifyTestSuite) TestSpamNotifs() {
defer suite.TearDownTestStructs(testStructs)
surface := &workers.Surface{
- State: testStructs.State,
- Converter: testStructs.TypeConverter,
- Stream: testStructs.Processor.Stream(),
- Filter: visibility.NewFilter(testStructs.State),
- EmailSender: testStructs.EmailSender,
+ State: testStructs.State,
+ Converter: testStructs.TypeConverter,
+ Stream: testStructs.Processor.Stream(),
+ Filter: visibility.NewFilter(testStructs.State),
+ EmailSender: testStructs.EmailSender,
+ Conversations: testStructs.Processor.Conversations(),
}
var (
diff --git a/internal/processing/workers/surfacetimeline.go b/internal/processing/workers/surfacetimeline.go
index 41d7f6f2a..8ac8293ed 100644
--- a/internal/processing/workers/surfacetimeline.go
+++ b/internal/processing/workers/surfacetimeline.go
@@ -36,8 +36,8 @@ import (
// and LIST timelines of accounts that follow the status author.
//
// It will also handle notifications for any mentions attached to
-// the account, and notifications for any local accounts that want
-// to know when this account posts.
+// the account, notifications for any local accounts that want
+// to know when this account posts, and conversations containing the status.
func (s *Surface) timelineAndNotifyStatus(ctx context.Context, status *gtsmodel.Status) error {
// Ensure status fully populated; including account, mentions, etc.
if err := s.State.DB.PopulateStatus(ctx, status); err != nil {
@@ -73,6 +73,15 @@ func (s *Surface) timelineAndNotifyStatus(ctx context.Context, status *gtsmodel.
return gtserror.Newf("error notifying status mentions for status %s: %w", status.ID, err)
}
+ // Update any conversations containing this status, and send conversation notifications.
+ notifications, err := s.Conversations.UpdateConversationsForStatus(ctx, status)
+ if err != nil {
+ return gtserror.Newf("error updating conversations for status %s: %w", status.ID, err)
+ }
+ for _, notification := range notifications {
+ s.Stream.Conversation(ctx, notification.AccountID, notification.Conversation)
+ }
+
return nil
}
diff --git a/internal/processing/workers/util.go b/internal/processing/workers/util.go
index 780e5ca14..915370976 100644
--- a/internal/processing/workers/util.go
+++ b/internal/processing/workers/util.go
@@ -137,6 +137,11 @@ func (u *utils) wipeStatus(
errs.Appendf("error deleting status from timelines: %w", err)
}
+ // delete this status from any conversations that it's part of
+ if err := u.state.DB.DeleteStatusFromConversations(ctx, statusToDelete.ID); err != nil {
+ errs.Appendf("error deleting status from conversations: %w", err)
+ }
+
// finally, delete the status itself
if err := u.state.DB.DeleteStatusByID(ctx, statusToDelete.ID); err != nil {
errs.Appendf("error deleting status: %w", err)
diff --git a/internal/processing/workers/workers.go b/internal/processing/workers/workers.go
index 6b4cc07a6..c7f67b025 100644
--- a/internal/processing/workers/workers.go
+++ b/internal/processing/workers/workers.go
@@ -22,6 +22,7 @@ import (
"github.com/superseriousbusiness/gotosocial/internal/federation"
"github.com/superseriousbusiness/gotosocial/internal/filter/visibility"
"github.com/superseriousbusiness/gotosocial/internal/processing/account"
+ "github.com/superseriousbusiness/gotosocial/internal/processing/conversations"
"github.com/superseriousbusiness/gotosocial/internal/processing/media"
"github.com/superseriousbusiness/gotosocial/internal/processing/stream"
"github.com/superseriousbusiness/gotosocial/internal/state"
@@ -44,6 +45,7 @@ func New(
account *account.Processor,
media *media.Processor,
stream *stream.Processor,
+ conversations *conversations.Processor,
) Processor {
// Init federate logic
// wrapper struct.
@@ -56,11 +58,12 @@ func New(
// Init surface logic
// wrapper struct.
surface := &Surface{
- State: state,
- Converter: converter,
- Stream: stream,
- Filter: filter,
- EmailSender: emailSender,
+ State: state,
+ Converter: converter,
+ Stream: stream,
+ Filter: filter,
+ EmailSender: emailSender,
+ Conversations: conversations,
}
// Init shared util funcs.
diff --git a/internal/processing/workers/workers_test.go b/internal/processing/workers/workers_test.go
index f66190d75..3093fd93a 100644
--- a/internal/processing/workers/workers_test.go
+++ b/internal/processing/workers/workers_test.go
@@ -108,6 +108,7 @@ func (suite *WorkersTestSuite) openStreams(ctx context.Context, processor *proce
stream.TimelineHome,
stream.TimelinePublic,
stream.TimelineNotifications,
+ stream.TimelineDirect,
} {
stream, err := processor.Stream().Open(ctx, account, streamType)
if err != nil {