summaryrefslogtreecommitdiff
path: root/internal/processing
diff options
context:
space:
mode:
Diffstat (limited to 'internal/processing')
-rw-r--r--internal/processing/account/delete.go8
-rw-r--r--internal/processing/advancedmigrations/advancedmigrations.go48
-rw-r--r--internal/processing/conversations/conversations.go126
-rw-r--r--internal/processing/conversations/conversations_test.go151
-rw-r--r--internal/processing/conversations/delete.go45
-rw-r--r--internal/processing/conversations/delete_test.go27
-rw-r--r--internal/processing/conversations/get.go101
-rw-r--r--internal/processing/conversations/get_test.go65
-rw-r--r--internal/processing/conversations/migrate.go131
-rw-r--r--internal/processing/conversations/migrate_test.go85
-rw-r--r--internal/processing/conversations/read.go65
-rw-r--r--internal/processing/conversations/read_test.go34
-rw-r--r--internal/processing/conversations/update.go242
-rw-r--r--internal/processing/conversations/update_test.go54
-rw-r--r--internal/processing/processor.go49
-rw-r--r--internal/processing/stream/conversation.go44
-rw-r--r--internal/processing/workers/fromclientapi_test.go231
-rw-r--r--internal/processing/workers/surface.go12
-rw-r--r--internal/processing/workers/surfacenotify_test.go11
-rw-r--r--internal/processing/workers/surfacetimeline.go13
-rw-r--r--internal/processing/workers/util.go5
-rw-r--r--internal/processing/workers/workers.go13
-rw-r--r--internal/processing/workers/workers_test.go1
23 files changed, 1528 insertions, 33 deletions
diff --git a/internal/processing/account/delete.go b/internal/processing/account/delete.go
index 075e94544..702b46cda 100644
--- a/internal/processing/account/delete.go
+++ b/internal/processing/account/delete.go
@@ -460,6 +460,14 @@ func (p *Processor) deleteAccountPeripheral(ctx context.Context, account *gtsmod
// TODO: add status mutes here when they're implemented.
+ // Delete all conversations owned by given account.
+ // Conversations in which it has only participated will be retained;
+ // they can always be deleted by their owners.
+ if err := p.state.DB.DeleteConversationsByOwnerAccountID(ctx, account.ID); // nocollapse
+ err != nil && !errors.Is(err, db.ErrNoEntries) {
+ return gtserror.Newf("error deleting conversations owned by account: %w", err)
+ }
+
// Delete all poll votes owned by given account.
if err := p.state.DB.DeletePollVotesByAccountID(ctx, account.ID); // nocollapse
err != nil && !errors.Is(err, db.ErrNoEntries) {
diff --git a/internal/processing/advancedmigrations/advancedmigrations.go b/internal/processing/advancedmigrations/advancedmigrations.go
new file mode 100644
index 000000000..3f1876539
--- /dev/null
+++ b/internal/processing/advancedmigrations/advancedmigrations.go
@@ -0,0 +1,48 @@
+// GoToSocial
+// Copyright (C) GoToSocial Authors admin@gotosocial.org
+// SPDX-License-Identifier: AGPL-3.0-or-later
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+package advancedmigrations
+
+import (
+ "context"
+ "fmt"
+
+ "github.com/superseriousbusiness/gotosocial/internal/processing/conversations"
+)
+
+// Processor holds references to any other processor that has migrations to run.
+type Processor struct {
+ conversations *conversations.Processor
+}
+
+func New(
+ conversations *conversations.Processor,
+) Processor {
+ return Processor{
+ conversations: conversations,
+ }
+}
+
+// Migrate runs all advanced migrations.
+// Errors should be in the same format thrown by other server or testrig startup failures.
+func (p *Processor) Migrate(ctx context.Context) error {
+ if err := p.conversations.MigrateDMsToConversations(ctx); err != nil {
+ return fmt.Errorf("error running conversations advanced migration: %w", err)
+ }
+
+ return nil
+}
diff --git a/internal/processing/conversations/conversations.go b/internal/processing/conversations/conversations.go
new file mode 100644
index 000000000..d95740605
--- /dev/null
+++ b/internal/processing/conversations/conversations.go
@@ -0,0 +1,126 @@
+// GoToSocial
+// Copyright (C) GoToSocial Authors admin@gotosocial.org
+// SPDX-License-Identifier: AGPL-3.0-or-later
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+package conversations
+
+import (
+ "context"
+ "errors"
+
+ "github.com/superseriousbusiness/gotosocial/internal/db"
+ "github.com/superseriousbusiness/gotosocial/internal/filter/usermute"
+ "github.com/superseriousbusiness/gotosocial/internal/filter/visibility"
+ "github.com/superseriousbusiness/gotosocial/internal/gtscontext"
+ "github.com/superseriousbusiness/gotosocial/internal/gtserror"
+ "github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
+ "github.com/superseriousbusiness/gotosocial/internal/state"
+ "github.com/superseriousbusiness/gotosocial/internal/typeutils"
+)
+
+type Processor struct {
+ state *state.State
+ converter *typeutils.Converter
+ filter *visibility.Filter
+}
+
+func New(
+ state *state.State,
+ converter *typeutils.Converter,
+ filter *visibility.Filter,
+) Processor {
+ return Processor{
+ state: state,
+ converter: converter,
+ filter: filter,
+ }
+}
+
+const conversationNotFoundHelpText = "conversation not found"
+
+// getConversationOwnedBy gets a conversation by ID and checks that it is owned by the given account.
+func (p *Processor) getConversationOwnedBy(
+ ctx context.Context,
+ id string,
+ requestingAccount *gtsmodel.Account,
+) (*gtsmodel.Conversation, gtserror.WithCode) {
+ // Get the conversation so that we can check its owning account ID.
+ conversation, err := p.state.DB.GetConversationByID(ctx, id)
+ if err != nil && !errors.Is(err, db.ErrNoEntries) {
+ return nil, gtserror.NewErrorInternalError(
+ gtserror.Newf(
+ "DB error getting conversation %s for account %s: %w",
+ id,
+ requestingAccount.ID,
+ err,
+ ),
+ )
+ }
+ if conversation == nil {
+ return nil, gtserror.NewErrorNotFound(
+ gtserror.Newf(
+ "conversation %s not found: %w",
+ id,
+ err,
+ ),
+ conversationNotFoundHelpText,
+ )
+ }
+ if conversation.AccountID != requestingAccount.ID {
+ return nil, gtserror.NewErrorNotFound(
+ gtserror.Newf(
+ "conversation %s not owned by account %s: %w",
+ id,
+ requestingAccount.ID,
+ err,
+ ),
+ conversationNotFoundHelpText,
+ )
+ }
+
+ return conversation, nil
+}
+
+// getFiltersAndMutes gets the given account's filters and compiled mute list.
+func (p *Processor) getFiltersAndMutes(
+ ctx context.Context,
+ requestingAccount *gtsmodel.Account,
+) ([]*gtsmodel.Filter, *usermute.CompiledUserMuteList, gtserror.WithCode) {
+ filters, err := p.state.DB.GetFiltersForAccountID(ctx, requestingAccount.ID)
+ if err != nil {
+ return nil, nil, gtserror.NewErrorInternalError(
+ gtserror.Newf(
+ "DB error getting filters for account %s: %w",
+ requestingAccount.ID,
+ err,
+ ),
+ )
+ }
+
+ mutes, err := p.state.DB.GetAccountMutes(gtscontext.SetBarebones(ctx), requestingAccount.ID, nil)
+ if err != nil {
+ return nil, nil, gtserror.NewErrorInternalError(
+ gtserror.Newf(
+ "DB error getting mutes for account %s: %w",
+ requestingAccount.ID,
+ err,
+ ),
+ )
+ }
+ compiledMutes := usermute.NewCompiledUserMuteList(mutes)
+
+ return filters, compiledMutes, nil
+}
diff --git a/internal/processing/conversations/conversations_test.go b/internal/processing/conversations/conversations_test.go
new file mode 100644
index 000000000..cc7ec617e
--- /dev/null
+++ b/internal/processing/conversations/conversations_test.go
@@ -0,0 +1,151 @@
+// GoToSocial
+// Copyright (C) GoToSocial Authors admin@gotosocial.org
+// SPDX-License-Identifier: AGPL-3.0-or-later
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+package conversations_test
+
+import (
+ "context"
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/suite"
+ "github.com/superseriousbusiness/gotosocial/internal/db"
+ dbtest "github.com/superseriousbusiness/gotosocial/internal/db/test"
+ "github.com/superseriousbusiness/gotosocial/internal/email"
+ "github.com/superseriousbusiness/gotosocial/internal/federation"
+ "github.com/superseriousbusiness/gotosocial/internal/filter/visibility"
+ "github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
+ "github.com/superseriousbusiness/gotosocial/internal/log"
+ "github.com/superseriousbusiness/gotosocial/internal/media"
+ "github.com/superseriousbusiness/gotosocial/internal/messages"
+ "github.com/superseriousbusiness/gotosocial/internal/processing/conversations"
+ "github.com/superseriousbusiness/gotosocial/internal/state"
+ "github.com/superseriousbusiness/gotosocial/internal/storage"
+ "github.com/superseriousbusiness/gotosocial/internal/transport"
+ "github.com/superseriousbusiness/gotosocial/internal/typeutils"
+ "github.com/superseriousbusiness/gotosocial/testrig"
+)
+
+type ConversationsTestSuite struct {
+ // standard suite interfaces
+ suite.Suite
+ db db.DB
+ tc *typeutils.Converter
+ storage *storage.Driver
+ state state.State
+ mediaManager *media.Manager
+ transportController transport.Controller
+ federator *federation.Federator
+ emailSender email.Sender
+ sentEmails map[string]string
+ filter *visibility.Filter
+
+ // standard suite models
+ testTokens map[string]*gtsmodel.Token
+ testClients map[string]*gtsmodel.Client
+ testApplications map[string]*gtsmodel.Application
+ testUsers map[string]*gtsmodel.User
+ testAccounts map[string]*gtsmodel.Account
+ testFollows map[string]*gtsmodel.Follow
+ testAttachments map[string]*gtsmodel.MediaAttachment
+ testStatuses map[string]*gtsmodel.Status
+
+ // module being tested
+ conversationsProcessor conversations.Processor
+
+ // Owner of test conversations
+ testAccount *gtsmodel.Account
+
+ // Mixin for conversation tests
+ dbtest.ConversationFactory
+}
+
+func (suite *ConversationsTestSuite) getClientMsg(timeout time.Duration) (*messages.FromClientAPI, bool) {
+ ctx := context.Background()
+ ctx, cncl := context.WithTimeout(ctx, timeout)
+ defer cncl()
+ return suite.state.Workers.Client.Queue.PopCtx(ctx)
+}
+
+func (suite *ConversationsTestSuite) SetupSuite() {
+ suite.testTokens = testrig.NewTestTokens()
+ suite.testClients = testrig.NewTestClients()
+ suite.testApplications = testrig.NewTestApplications()
+ suite.testUsers = testrig.NewTestUsers()
+ suite.testAccounts = testrig.NewTestAccounts()
+ suite.testFollows = testrig.NewTestFollows()
+ suite.testAttachments = testrig.NewTestAttachments()
+ suite.testStatuses = testrig.NewTestStatuses()
+
+ suite.ConversationFactory.SetupSuite(suite)
+}
+
+func (suite *ConversationsTestSuite) SetupTest() {
+ suite.state.Caches.Init()
+ testrig.StartNoopWorkers(&suite.state)
+
+ testrig.InitTestConfig()
+ testrig.InitTestLog()
+
+ suite.db = testrig.NewTestDB(&suite.state)
+ suite.state.DB = suite.db
+ suite.tc = typeutils.NewConverter(&suite.state)
+ suite.filter = visibility.NewFilter(&suite.state)
+
+ testrig.StartTimelines(
+ &suite.state,
+ suite.filter,
+ suite.tc,
+ )
+
+ suite.storage = testrig.NewInMemoryStorage()
+ suite.state.Storage = suite.storage
+ suite.mediaManager = testrig.NewTestMediaManager(&suite.state)
+
+ suite.transportController = testrig.NewTestTransportController(&suite.state, testrig.NewMockHTTPClient(nil, "../../../testrig/media"))
+ suite.federator = testrig.NewTestFederator(&suite.state, suite.transportController, suite.mediaManager)
+ suite.sentEmails = make(map[string]string)
+ suite.emailSender = testrig.NewEmailSender("../../../web/template/", suite.sentEmails)
+
+ suite.conversationsProcessor = conversations.New(&suite.state, suite.tc, suite.filter)
+ testrig.StandardDBSetup(suite.db, nil)
+ testrig.StandardStorageSetup(suite.storage, "../../../testrig/media")
+
+ suite.ConversationFactory.SetupTest(suite.db)
+
+ suite.testAccount = suite.testAccounts["local_account_1"]
+}
+
+func (suite *ConversationsTestSuite) TearDownTest() {
+ conversationModels := []interface{}{
+ (*gtsmodel.Conversation)(nil),
+ (*gtsmodel.ConversationToStatus)(nil),
+ }
+ for _, model := range conversationModels {
+ if err := suite.db.DropTable(context.Background(), model); err != nil {
+ log.Error(context.Background(), err)
+ }
+ }
+
+ testrig.StandardDBTeardown(suite.db)
+ testrig.StandardStorageTeardown(suite.storage)
+ testrig.StopWorkers(&suite.state)
+}
+
+func TestConversationsTestSuite(t *testing.T) {
+ suite.Run(t, new(ConversationsTestSuite))
+}
diff --git a/internal/processing/conversations/delete.go b/internal/processing/conversations/delete.go
new file mode 100644
index 000000000..5cbdd00a5
--- /dev/null
+++ b/internal/processing/conversations/delete.go
@@ -0,0 +1,45 @@
+// GoToSocial
+// Copyright (C) GoToSocial Authors admin@gotosocial.org
+// SPDX-License-Identifier: AGPL-3.0-or-later
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+package conversations
+
+import (
+ "context"
+
+ "github.com/superseriousbusiness/gotosocial/internal/gtscontext"
+ "github.com/superseriousbusiness/gotosocial/internal/gtserror"
+ "github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
+)
+
+func (p *Processor) Delete(
+ ctx context.Context,
+ requestingAccount *gtsmodel.Account,
+ id string,
+) gtserror.WithCode {
+ // Get the conversation so that we can check its owning account ID.
+ conversation, errWithCode := p.getConversationOwnedBy(gtscontext.SetBarebones(ctx), id, requestingAccount)
+ if errWithCode != nil {
+ return errWithCode
+ }
+
+ // Delete the conversation.
+ if err := p.state.DB.DeleteConversationByID(ctx, conversation.ID); err != nil {
+ return gtserror.NewErrorInternalError(err)
+ }
+
+ return nil
+}
diff --git a/internal/processing/conversations/delete_test.go b/internal/processing/conversations/delete_test.go
new file mode 100644
index 000000000..23b4f1c1a
--- /dev/null
+++ b/internal/processing/conversations/delete_test.go
@@ -0,0 +1,27 @@
+// GoToSocial
+// Copyright (C) GoToSocial Authors admin@gotosocial.org
+// SPDX-License-Identifier: AGPL-3.0-or-later
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+package conversations_test
+
+import "context"
+
+func (suite *ConversationsTestSuite) TestDelete() {
+ conversation := suite.NewTestConversation(suite.testAccount, 0)
+
+ err := suite.conversationsProcessor.Delete(context.Background(), suite.testAccount, conversation.ID)
+ suite.NoError(err)
+}
diff --git a/internal/processing/conversations/get.go b/internal/processing/conversations/get.go
new file mode 100644
index 000000000..0c7832cae
--- /dev/null
+++ b/internal/processing/conversations/get.go
@@ -0,0 +1,101 @@
+// GoToSocial
+// Copyright (C) GoToSocial Authors admin@gotosocial.org
+// SPDX-License-Identifier: AGPL-3.0-or-later
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+package conversations
+
+import (
+ "context"
+ "errors"
+
+ apimodel "github.com/superseriousbusiness/gotosocial/internal/api/model"
+ "github.com/superseriousbusiness/gotosocial/internal/db"
+ "github.com/superseriousbusiness/gotosocial/internal/gtserror"
+ "github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
+ "github.com/superseriousbusiness/gotosocial/internal/log"
+ "github.com/superseriousbusiness/gotosocial/internal/paging"
+ "github.com/superseriousbusiness/gotosocial/internal/util"
+)
+
+// GetAll returns conversations owned by the given account.
+// The additional parameters can be used for paging.
+func (p *Processor) GetAll(
+ ctx context.Context,
+ requestingAccount *gtsmodel.Account,
+ page *paging.Page,
+) (*apimodel.PageableResponse, gtserror.WithCode) {
+ conversations, err := p.state.DB.GetConversationsByOwnerAccountID(
+ ctx,
+ requestingAccount.ID,
+ page,
+ )
+ if err != nil && !errors.Is(err, db.ErrNoEntries) {
+ return nil, gtserror.NewErrorInternalError(
+ gtserror.Newf(
+ "DB error getting conversations for account %s: %w",
+ requestingAccount.ID,
+ err,
+ ),
+ )
+ }
+
+ // Check for empty response.
+ count := len(conversations)
+ if len(conversations) == 0 {
+ return util.EmptyPageableResponse(), nil
+ }
+
+ // Get the lowest and highest last status ID values, used for paging.
+ lo := conversations[count-1].LastStatusID
+ hi := conversations[0].LastStatusID
+
+ items := make([]interface{}, 0, count)
+
+ filters, mutes, errWithCode := p.getFiltersAndMutes(ctx, requestingAccount)
+ if errWithCode != nil {
+ return nil, errWithCode
+ }
+
+ for _, conversation := range conversations {
+ // Convert conversation to frontend API model.
+ apiConversation, err := p.converter.ConversationToAPIConversation(
+ ctx,
+ conversation,
+ requestingAccount,
+ filters,
+ mutes,
+ )
+ if err != nil {
+ log.Errorf(
+ ctx,
+ "error converting conversation %s to API representation: %v",
+ conversation.ID,
+ err,
+ )
+ continue
+ }
+
+ // Append conversation to return items.
+ items = append(items, apiConversation)
+ }
+
+ return paging.PackageResponse(paging.ResponseParams{
+ Items: items,
+ Path: "/api/v1/conversations",
+ Next: page.Next(lo, hi),
+ Prev: page.Prev(lo, hi),
+ }), nil
+}
diff --git a/internal/processing/conversations/get_test.go b/internal/processing/conversations/get_test.go
new file mode 100644
index 000000000..7b3d60749
--- /dev/null
+++ b/internal/processing/conversations/get_test.go
@@ -0,0 +1,65 @@
+// GoToSocial
+// Copyright (C) GoToSocial Authors admin@gotosocial.org
+// SPDX-License-Identifier: AGPL-3.0-or-later
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+package conversations_test
+
+import (
+ "context"
+ "time"
+
+ apimodel "github.com/superseriousbusiness/gotosocial/internal/api/model"
+)
+
+func (suite *ConversationsTestSuite) TestGetAll() {
+ conversation := suite.NewTestConversation(suite.testAccount, 0)
+
+ resp, err := suite.conversationsProcessor.GetAll(context.Background(), suite.testAccount, nil)
+ if suite.NoError(err) && suite.Len(resp.Items, 1) && suite.IsType((*apimodel.Conversation)(nil), resp.Items[0]) {
+ apiConversation := resp.Items[0].(*apimodel.Conversation)
+ suite.Equal(conversation.ID, apiConversation.ID)
+ suite.True(apiConversation.Unread)
+ }
+}
+
+// Test that conversations with newer last status IDs are returned earlier.
+func (suite *ConversationsTestSuite) TestGetAllOrder() {
+ // Create a new conversation.
+ conversation1 := suite.NewTestConversation(suite.testAccount, 0)
+
+ // Create another new conversation with a last status newer than conversation1's.
+ conversation2 := suite.NewTestConversation(suite.testAccount, 1*time.Second)
+
+ // Add an even newer status than that to conversation1.
+ conversation1Status2 := suite.NewTestStatus(suite.testAccount, conversation1.LastStatus.ThreadID, 2*time.Second, conversation1.LastStatus)
+ conversation1.LastStatusID = conversation1Status2.ID
+ if err := suite.db.UpsertConversation(context.Background(), conversation1, "last_status_id"); err != nil {
+ suite.FailNow(err.Error())
+ }
+
+ resp, err := suite.conversationsProcessor.GetAll(context.Background(), suite.testAccount, nil)
+ if suite.NoError(err) && suite.Len(resp.Items, 2) {
+ // conversation1 should be the first conversation returned.
+ apiConversation1 := resp.Items[0].(*apimodel.Conversation)
+ suite.Equal(conversation1.ID, apiConversation1.ID)
+ // It should have the newest status added to it.
+ suite.Equal(conversation1.LastStatusID, conversation1Status2.ID)
+
+ // conversation2 should be the second conversation returned.
+ apiConversation2 := resp.Items[1].(*apimodel.Conversation)
+ suite.Equal(conversation2.ID, apiConversation2.ID)
+ }
+}
diff --git a/internal/processing/conversations/migrate.go b/internal/processing/conversations/migrate.go
new file mode 100644
index 000000000..959ffcca4
--- /dev/null
+++ b/internal/processing/conversations/migrate.go
@@ -0,0 +1,131 @@
+// GoToSocial
+// Copyright (C) GoToSocial Authors admin@gotosocial.org
+// SPDX-License-Identifier: AGPL-3.0-or-later
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+package conversations
+
+import (
+ "context"
+ "encoding/json"
+ "errors"
+
+ "github.com/superseriousbusiness/gotosocial/internal/db"
+ "github.com/superseriousbusiness/gotosocial/internal/gtserror"
+ "github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
+ "github.com/superseriousbusiness/gotosocial/internal/id"
+ "github.com/superseriousbusiness/gotosocial/internal/log"
+ "github.com/superseriousbusiness/gotosocial/internal/util"
+)
+
+const advancedMigrationID = "20240611190733_add_conversations"
+const statusBatchSize = 100
+
+type AdvancedMigrationState struct {
+ MinID string
+ MaxIDInclusive string
+}
+
+func (p *Processor) MigrateDMsToConversations(ctx context.Context) error {
+ advancedMigration, err := p.state.DB.GetAdvancedMigration(ctx, advancedMigrationID)
+ if err != nil && !errors.Is(err, db.ErrNoEntries) {
+ return gtserror.Newf("couldn't get advanced migration with ID %s: %w", advancedMigrationID, err)
+ }
+ state := AdvancedMigrationState{}
+ if advancedMigration != nil {
+ // There was a previous migration.
+ if *advancedMigration.Finished {
+ // This migration has already been run to completion; we don't need to run it again.
+ return nil
+ }
+ // Otherwise, pick up where we left off.
+ if err := json.Unmarshal(advancedMigration.StateJSON, &state); err != nil {
+ // This should never happen.
+ return gtserror.Newf("couldn't deserialize advanced migration state from JSON: %w", err)
+ }
+ } else {
+ // Start at the beginning.
+ state.MinID = id.Lowest
+
+ // Find the max ID of all existing statuses.
+ // This will be the last one we migrate;
+ // newer ones will be handled by the normal conversation flow.
+ state.MaxIDInclusive, err = p.state.DB.MaxDirectStatusID(ctx)
+ if err != nil {
+ return gtserror.Newf("couldn't get max DM status ID for migration: %w", err)
+ }
+
+ // Save a new advanced migration record.
+ advancedMigration = &gtsmodel.AdvancedMigration{
+ ID: advancedMigrationID,
+ Finished: util.Ptr(false),
+ }
+ if advancedMigration.StateJSON, err = json.Marshal(state); err != nil {
+ // This should never happen.
+ return gtserror.Newf("couldn't serialize advanced migration state to JSON: %w", err)
+ }
+ if err := p.state.DB.PutAdvancedMigration(ctx, advancedMigration); err != nil {
+ return gtserror.Newf("couldn't save state for advanced migration with ID %s: %w", advancedMigrationID, err)
+ }
+ }
+
+ log.Info(ctx, "migrating DMs to conversations…")
+
+ // In batches, get all statuses up to and including the max ID,
+ // and update conversations for each in order.
+ for {
+ // Get status IDs for this batch.
+ statusIDs, err := p.state.DB.GetDirectStatusIDsBatch(ctx, state.MinID, state.MaxIDInclusive, statusBatchSize)
+ if err != nil {
+ return gtserror.Newf("couldn't get DM status ID batch for migration: %w", err)
+ }
+ if len(statusIDs) == 0 {
+ break
+ }
+ log.Infof(ctx, "migrating %d DMs starting after %s", len(statusIDs), state.MinID)
+
+ // Load the batch by IDs.
+ statuses, err := p.state.DB.GetStatusesByIDs(ctx, statusIDs)
+ if err != nil {
+ return gtserror.Newf("couldn't get DM statuses for migration: %w", err)
+ }
+
+ // Update conversations for each status. Don't generate notifications.
+ for _, status := range statuses {
+ if _, err := p.UpdateConversationsForStatus(ctx, status); err != nil {
+ return gtserror.Newf("couldn't update conversations for status %s during migration: %w", status.ID, err)
+ }
+ }
+
+ // Save the migration state with the new min ID.
+ state.MinID = statusIDs[len(statusIDs)-1]
+ if advancedMigration.StateJSON, err = json.Marshal(state); err != nil {
+ // This should never happen.
+ return gtserror.Newf("couldn't serialize advanced migration state to JSON: %w", err)
+ }
+ if err := p.state.DB.PutAdvancedMigration(ctx, advancedMigration); err != nil {
+ return gtserror.Newf("couldn't save state for advanced migration with ID %s: %w", advancedMigrationID, err)
+ }
+ }
+
+ // Mark the migration as finished.
+ advancedMigration.Finished = util.Ptr(true)
+ if err := p.state.DB.PutAdvancedMigration(ctx, advancedMigration); err != nil {
+ return gtserror.Newf("couldn't save state for advanced migration with ID %s: %w", advancedMigrationID, err)
+ }
+
+ log.Info(ctx, "finished migrating DMs to conversations.")
+ return nil
+}
diff --git a/internal/processing/conversations/migrate_test.go b/internal/processing/conversations/migrate_test.go
new file mode 100644
index 000000000..b625e59ba
--- /dev/null
+++ b/internal/processing/conversations/migrate_test.go
@@ -0,0 +1,85 @@
+// GoToSocial
+// Copyright (C) GoToSocial Authors admin@gotosocial.org
+// SPDX-License-Identifier: AGPL-3.0-or-later
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+package conversations_test
+
+import (
+ "context"
+
+ "github.com/superseriousbusiness/gotosocial/internal/db"
+ "github.com/superseriousbusiness/gotosocial/internal/db/bundb"
+ "github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
+)
+
+// Test that we can migrate DMs to conversations.
+// This test assumes that we're using the standard test fixtures, which contain some conversation-eligible DMs.
+func (suite *ConversationsTestSuite) TestMigrateDMsToConversations() {
+ advancedMigrationID := "20240611190733_add_conversations"
+ ctx := context.Background()
+ rawDB := (suite.db).(*bundb.DBService).DB()
+
+ // Precondition: we shouldn't have any conversations yet.
+ numConversations := 0
+ if err := rawDB.NewSelect().
+ Model((*gtsmodel.Conversation)(nil)).
+ ColumnExpr("COUNT(*)").
+ Scan(ctx, &numConversations); // nocollapse
+ err != nil {
+ suite.FailNow(err.Error())
+ }
+ suite.Zero(numConversations)
+
+ // Precondition: there is no record of the conversations advanced migration.
+ _, err := suite.db.GetAdvancedMigration(ctx, advancedMigrationID)
+ suite.ErrorIs(err, db.ErrNoEntries)
+
+ // Run the migration, which should not fail.
+ if err := suite.conversationsProcessor.MigrateDMsToConversations(ctx); err != nil {
+ suite.FailNow(err.Error())
+ }
+
+ // We should now have some conversations.
+ if err := rawDB.NewSelect().
+ Model((*gtsmodel.Conversation)(nil)).
+ ColumnExpr("COUNT(*)").
+ Scan(ctx, &numConversations); // nocollapse
+ err != nil {
+ suite.FailNow(err.Error())
+ }
+ suite.NotZero(numConversations)
+
+ // The advanced migration should now be marked as finished.
+ advancedMigration, err := suite.db.GetAdvancedMigration(ctx, advancedMigrationID)
+ if err != nil {
+ suite.FailNow(err.Error())
+ }
+ if suite.NotNil(advancedMigration) && suite.NotNil(advancedMigration.Finished) {
+ suite.True(*advancedMigration.Finished)
+ }
+
+ // Run the migration again, which should not fail.
+ if err := suite.conversationsProcessor.MigrateDMsToConversations(ctx); err != nil {
+ suite.FailNow(err.Error())
+ }
+
+ // However, it shouldn't have done anything, so the advanced migration should not have been updated.
+ advancedMigration2, err := suite.db.GetAdvancedMigration(ctx, advancedMigrationID)
+ if err != nil {
+ suite.FailNow(err.Error())
+ }
+ suite.Equal(advancedMigration.UpdatedAt, advancedMigration2.UpdatedAt)
+}
diff --git a/internal/processing/conversations/read.go b/internal/processing/conversations/read.go
new file mode 100644
index 000000000..512a004a3
--- /dev/null
+++ b/internal/processing/conversations/read.go
@@ -0,0 +1,65 @@
+// GoToSocial
+// Copyright (C) GoToSocial Authors admin@gotosocial.org
+// SPDX-License-Identifier: AGPL-3.0-or-later
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+package conversations
+
+import (
+ "context"
+
+ apimodel "github.com/superseriousbusiness/gotosocial/internal/api/model"
+ "github.com/superseriousbusiness/gotosocial/internal/gtserror"
+ "github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
+ "github.com/superseriousbusiness/gotosocial/internal/util"
+)
+
+func (p *Processor) Read(
+ ctx context.Context,
+ requestingAccount *gtsmodel.Account,
+ id string,
+) (*apimodel.Conversation, gtserror.WithCode) {
+ // Get the conversation, including participating accounts and last status.
+ conversation, errWithCode := p.getConversationOwnedBy(ctx, id, requestingAccount)
+ if errWithCode != nil {
+ return nil, errWithCode
+ }
+
+ // Mark the conversation as read.
+ conversation.Read = util.Ptr(true)
+ if err := p.state.DB.UpsertConversation(ctx, conversation, "read"); err != nil {
+ err = gtserror.Newf("DB error updating conversation %s: %w", id, err)
+ return nil, gtserror.NewErrorInternalError(err)
+ }
+
+ filters, mutes, errWithCode := p.getFiltersAndMutes(ctx, requestingAccount)
+ if errWithCode != nil {
+ return nil, errWithCode
+ }
+
+ apiConversation, err := p.converter.ConversationToAPIConversation(
+ ctx,
+ conversation,
+ requestingAccount,
+ filters,
+ mutes,
+ )
+ if err != nil {
+ err = gtserror.Newf("error converting conversation %s to API representation: %w", id, err)
+ return nil, gtserror.NewErrorInternalError(err)
+ }
+
+ return apiConversation, nil
+}
diff --git a/internal/processing/conversations/read_test.go b/internal/processing/conversations/read_test.go
new file mode 100644
index 000000000..ebd8f7fe5
--- /dev/null
+++ b/internal/processing/conversations/read_test.go
@@ -0,0 +1,34 @@
+// GoToSocial
+// Copyright (C) GoToSocial Authors admin@gotosocial.org
+// SPDX-License-Identifier: AGPL-3.0-or-later
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+package conversations_test
+
+import (
+ "context"
+
+ "github.com/superseriousbusiness/gotosocial/internal/util"
+)
+
+func (suite *ConversationsTestSuite) TestRead() {
+ conversation := suite.NewTestConversation(suite.testAccount, 0)
+
+ suite.False(util.PtrOrValue(conversation.Read, false))
+ apiConversation, err := suite.conversationsProcessor.Read(context.Background(), suite.testAccount, conversation.ID)
+ if suite.NoError(err) {
+ suite.False(apiConversation.Unread)
+ }
+}
diff --git a/internal/processing/conversations/update.go b/internal/processing/conversations/update.go
new file mode 100644
index 000000000..7445994ae
--- /dev/null
+++ b/internal/processing/conversations/update.go
@@ -0,0 +1,242 @@
+// GoToSocial
+// Copyright (C) GoToSocial Authors admin@gotosocial.org
+// SPDX-License-Identifier: AGPL-3.0-or-later
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+package conversations
+
+import (
+ "context"
+ "errors"
+
+ apimodel "github.com/superseriousbusiness/gotosocial/internal/api/model"
+ "github.com/superseriousbusiness/gotosocial/internal/db"
+ statusfilter "github.com/superseriousbusiness/gotosocial/internal/filter/status"
+ "github.com/superseriousbusiness/gotosocial/internal/gtserror"
+ "github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
+ "github.com/superseriousbusiness/gotosocial/internal/id"
+ "github.com/superseriousbusiness/gotosocial/internal/log"
+ "github.com/superseriousbusiness/gotosocial/internal/util"
+)
+
+// ConversationNotification carries the arguments to processing/stream.Processor.Conversation.
+type ConversationNotification struct {
+ // AccountID of a local account to deliver the notification to.
+ AccountID string
+ // Conversation as the notification payload.
+ Conversation *apimodel.Conversation
+}
+
+// UpdateConversationsForStatus updates all conversations related to a status,
+// and returns a map from local account IDs to conversation notifications that should be sent to them.
+func (p *Processor) UpdateConversationsForStatus(ctx context.Context, status *gtsmodel.Status) ([]ConversationNotification, error) {
+ if status.Visibility != gtsmodel.VisibilityDirect {
+ // Only DMs are considered part of conversations.
+ return nil, nil
+ }
+ if status.BoostOfID != "" {
+ // Boosts can't be part of conversations.
+ // FUTURE: This may change if we ever implement quote posts.
+ return nil, nil
+ }
+ if status.ThreadID == "" {
+ // If the status doesn't have a thread ID, it didn't mention a local account,
+ // and thus can't be part of a conversation.
+ return nil, nil
+ }
+
+ // We need accounts to be populated for this.
+ if err := p.state.DB.PopulateStatus(ctx, status); err != nil {
+ return nil, gtserror.Newf("DB error populating status %s: %w", status.ID, err)
+ }
+
+ // The account which authored the status plus all mentioned accounts.
+ allParticipantsSet := make(map[string]*gtsmodel.Account, 1+len(status.Mentions))
+ allParticipantsSet[status.AccountID] = status.Account
+ for _, mention := range status.Mentions {
+ allParticipantsSet[mention.TargetAccountID] = mention.TargetAccount
+ }
+
+ // Create or update conversations for and send notifications to each local participant.
+ notifications := make([]ConversationNotification, 0, len(allParticipantsSet))
+ for _, participant := range allParticipantsSet {
+ if participant.IsRemote() {
+ continue
+ }
+ localAccount := participant
+
+ // If the status is not visible to this account, skip processing it for this account.
+ visible, err := p.filter.StatusVisible(ctx, localAccount, status)
+ if err != nil {
+ log.Errorf(
+ ctx,
+ "error checking status %s visibility for account %s: %v",
+ status.ID,
+ localAccount.ID,
+ err,
+ )
+ continue
+ } else if !visible {
+ continue
+ }
+
+ // Is the status filtered or muted for this user?
+ // Converting the status to an API status runs the filter/mute checks.
+ filters, mutes, errWithCode := p.getFiltersAndMutes(ctx, localAccount)
+ if errWithCode != nil {
+ log.Error(ctx, errWithCode)
+ continue
+ }
+ _, err = p.converter.StatusToAPIStatus(
+ ctx,
+ status,
+ localAccount,
+ statusfilter.FilterContextNotifications,
+ filters,
+ mutes,
+ )
+ if err != nil {
+ // If the status matched a hide filter, skip processing it for this account.
+ // If there was another kind of error, log that and skip it anyway.
+ if !errors.Is(err, statusfilter.ErrHideStatus) {
+ log.Errorf(
+ ctx,
+ "error checking status %s filtering/muting for account %s: %v",
+ status.ID,
+ localAccount.ID,
+ err,
+ )
+ }
+ continue
+ }
+
+ // Collect other accounts participating in the conversation.
+ otherAccounts := make([]*gtsmodel.Account, 0, len(allParticipantsSet)-1)
+ otherAccountIDs := make([]string, 0, len(allParticipantsSet)-1)
+ for accountID, account := range allParticipantsSet {
+ if accountID != localAccount.ID {
+ otherAccounts = append(otherAccounts, account)
+ otherAccountIDs = append(otherAccountIDs, accountID)
+ }
+ }
+
+ // Check for a previously existing conversation, if there is one.
+ conversation, err := p.state.DB.GetConversationByThreadAndAccountIDs(
+ ctx,
+ status.ThreadID,
+ localAccount.ID,
+ otherAccountIDs,
+ )
+ if err != nil && !errors.Is(err, db.ErrNoEntries) {
+ log.Errorf(
+ ctx,
+ "error trying to find a previous conversation for status %s and account %s: %v",
+ status.ID,
+ localAccount.ID,
+ err,
+ )
+ continue
+ }
+
+ if conversation == nil {
+ // Create a new conversation.
+ conversation = &gtsmodel.Conversation{
+ ID: id.NewULID(),
+ AccountID: localAccount.ID,
+ OtherAccountIDs: otherAccountIDs,
+ OtherAccounts: otherAccounts,
+ OtherAccountsKey: gtsmodel.ConversationOtherAccountsKey(otherAccountIDs),
+ ThreadID: status.ThreadID,
+ Read: util.Ptr(true),
+ }
+ }
+
+ // Assume that if the conversation owner posted the status, they've already read it.
+ statusAuthoredByConversationOwner := status.AccountID == conversation.AccountID
+
+ // Update the conversation.
+ // If there is no previous last status or this one is more recently created, set it as the last status.
+ if conversation.LastStatus == nil || conversation.LastStatus.CreatedAt.Before(status.CreatedAt) {
+ conversation.LastStatusID = status.ID
+ conversation.LastStatus = status
+ }
+ // If the conversation is unread, leave it marked as unread.
+ // If the conversation is read but this status might not have been, mark the conversation as unread.
+ if !statusAuthoredByConversationOwner {
+ conversation.Read = util.Ptr(false)
+ }
+
+ // Create or update the conversation.
+ err = p.state.DB.UpsertConversation(ctx, conversation)
+ if err != nil {
+ log.Errorf(
+ ctx,
+ "error creating or updating conversation %s for status %s and account %s: %v",
+ conversation.ID,
+ status.ID,
+ localAccount.ID,
+ err,
+ )
+ continue
+ }
+
+ // Link the conversation to the status.
+ if err := p.state.DB.LinkConversationToStatus(ctx, conversation.ID, status.ID); err != nil {
+ log.Errorf(
+ ctx,
+ "error linking conversation %s to status %s: %v",
+ conversation.ID,
+ status.ID,
+ err,
+ )
+ continue
+ }
+
+ // Convert the conversation to API representation.
+ apiConversation, err := p.converter.ConversationToAPIConversation(
+ ctx,
+ conversation,
+ localAccount,
+ filters,
+ mutes,
+ )
+ if err != nil {
+ // If the conversation's last status matched a hide filter, skip it.
+ // If there was another kind of error, log that and skip it anyway.
+ if !errors.Is(err, statusfilter.ErrHideStatus) {
+ log.Errorf(
+ ctx,
+ "error converting conversation %s to API representation for account %s: %v",
+ status.ID,
+ localAccount.ID,
+ err,
+ )
+ }
+ continue
+ }
+
+ // Generate a notification,
+ // unless the status was authored by the user who would be notified,
+ // in which case they already know.
+ if status.AccountID != localAccount.ID {
+ notifications = append(notifications, ConversationNotification{
+ AccountID: localAccount.ID,
+ Conversation: apiConversation,
+ })
+ }
+ }
+
+ return notifications, nil
+}
diff --git a/internal/processing/conversations/update_test.go b/internal/processing/conversations/update_test.go
new file mode 100644
index 000000000..8ba2800fe
--- /dev/null
+++ b/internal/processing/conversations/update_test.go
@@ -0,0 +1,54 @@
+// GoToSocial
+// Copyright (C) GoToSocial Authors admin@gotosocial.org
+// SPDX-License-Identifier: AGPL-3.0-or-later
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+package conversations_test
+
+import (
+ "context"
+)
+
+// Test that we can create conversations when a new status comes in.
+func (suite *ConversationsTestSuite) TestUpdateConversationsForStatus() {
+ ctx := context.Background()
+
+ // Precondition: the test user shouldn't have any conversations yet.
+ conversations, err := suite.db.GetConversationsByOwnerAccountID(ctx, suite.testAccount.ID, nil)
+ if err != nil {
+ suite.FailNow(err.Error())
+ }
+ suite.Empty(conversations)
+
+ // Create a status.
+ threadID := suite.NewULID(0)
+ status := suite.NewTestStatus(suite.testAccount, threadID, 0, nil)
+
+ // Update conversations for it.
+ notifications, err := suite.conversationsProcessor.UpdateConversationsForStatus(ctx, status)
+ if err != nil {
+ suite.FailNow(err.Error())
+ }
+
+ // In this test, the user is DMing themself, and should not receive a notification from that.
+ suite.Empty(notifications)
+
+ // The test user should have a conversation now.
+ conversations, err = suite.db.GetConversationsByOwnerAccountID(ctx, suite.testAccount.ID, nil)
+ if err != nil {
+ suite.FailNow(err.Error())
+ }
+ suite.NotEmpty(conversations)
+}
diff --git a/internal/processing/processor.go b/internal/processing/processor.go
index fb6b05d80..a07df76e1 100644
--- a/internal/processing/processor.go
+++ b/internal/processing/processor.go
@@ -27,7 +27,9 @@ import (
"github.com/superseriousbusiness/gotosocial/internal/oauth"
"github.com/superseriousbusiness/gotosocial/internal/processing/account"
"github.com/superseriousbusiness/gotosocial/internal/processing/admin"
+ "github.com/superseriousbusiness/gotosocial/internal/processing/advancedmigrations"
"github.com/superseriousbusiness/gotosocial/internal/processing/common"
+ "github.com/superseriousbusiness/gotosocial/internal/processing/conversations"
"github.com/superseriousbusiness/gotosocial/internal/processing/fedi"
filtersv1 "github.com/superseriousbusiness/gotosocial/internal/processing/filters/v1"
filtersv2 "github.com/superseriousbusiness/gotosocial/internal/processing/filters/v2"
@@ -70,22 +72,24 @@ type Processor struct {
SUB-PROCESSORS
*/
- account account.Processor
- admin admin.Processor
- fedi fedi.Processor
- filtersv1 filtersv1.Processor
- filtersv2 filtersv2.Processor
- list list.Processor
- markers markers.Processor
- media media.Processor
- polls polls.Processor
- report report.Processor
- search search.Processor
- status status.Processor
- stream stream.Processor
- timeline timeline.Processor
- user user.Processor
- workers workers.Processor
+ account account.Processor
+ admin admin.Processor
+ advancedmigrations advancedmigrations.Processor
+ conversations conversations.Processor
+ fedi fedi.Processor
+ filtersv1 filtersv1.Processor
+ filtersv2 filtersv2.Processor
+ list list.Processor
+ markers markers.Processor
+ media media.Processor
+ polls polls.Processor
+ report report.Processor
+ search search.Processor
+ status status.Processor
+ stream stream.Processor
+ timeline timeline.Processor
+ user user.Processor
+ workers workers.Processor
}
func (p *Processor) Account() *account.Processor {
@@ -96,6 +100,14 @@ func (p *Processor) Admin() *admin.Processor {
return &p.admin
}
+func (p *Processor) AdvancedMigrations() *advancedmigrations.Processor {
+ return &p.advancedmigrations
+}
+
+func (p *Processor) Conversations() *conversations.Processor {
+ return &p.conversations
+}
+
func (p *Processor) Fedi() *fedi.Processor {
return &p.fedi
}
@@ -188,6 +200,7 @@ func NewProcessor(
// processors + pin them to this struct.
processor.account = account.New(&common, state, converter, mediaManager, federator, filter, parseMentionFunc)
processor.admin = admin.New(&common, state, cleaner, federator, converter, mediaManager, federator.TransportController(), emailSender)
+ processor.conversations = conversations.New(state, converter, filter)
processor.fedi = fedi.New(state, &common, converter, federator, filter)
processor.filtersv1 = filtersv1.New(state, converter, &processor.stream)
processor.filtersv2 = filtersv2.New(state, converter, &processor.stream)
@@ -200,6 +213,9 @@ func NewProcessor(
processor.status = status.New(state, &common, &processor.polls, federator, converter, filter, parseMentionFunc)
processor.user = user.New(state, converter, oauthServer, emailSender)
+ // The advanced migrations processor sequences advanced migrations from all other processors.
+ processor.advancedmigrations = advancedmigrations.New(&processor.conversations)
+
// Workers processor handles asynchronous
// worker jobs; instantiate it separately
// and pass subset of sub processors it needs.
@@ -212,6 +228,7 @@ func NewProcessor(
&processor.account,
&processor.media,
&processor.stream,
+ &processor.conversations,
)
return processor
diff --git a/internal/processing/stream/conversation.go b/internal/processing/stream/conversation.go
new file mode 100644
index 000000000..a0236c459
--- /dev/null
+++ b/internal/processing/stream/conversation.go
@@ -0,0 +1,44 @@
+// GoToSocial
+// Copyright (C) GoToSocial Authors admin@gotosocial.org
+// SPDX-License-Identifier: AGPL-3.0-or-later
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+package stream
+
+import (
+ "context"
+ "encoding/json"
+
+ "codeberg.org/gruf/go-byteutil"
+ apimodel "github.com/superseriousbusiness/gotosocial/internal/api/model"
+ "github.com/superseriousbusiness/gotosocial/internal/log"
+ "github.com/superseriousbusiness/gotosocial/internal/stream"
+)
+
+// Conversation streams the given conversation to any open, appropriate streams belonging to the given account.
+func (p *Processor) Conversation(ctx context.Context, accountID string, conversation *apimodel.Conversation) {
+ b, err := json.Marshal(conversation)
+ if err != nil {
+ log.Errorf(ctx, "error marshaling json: %v", err)
+ return
+ }
+ p.streams.Post(ctx, accountID, stream.Message{
+ Payload: byteutil.B2S(b),
+ Event: stream.EventTypeConversation,
+ Stream: []string{
+ stream.TimelineDirect,
+ },
+ })
+}
diff --git a/internal/processing/workers/fromclientapi_test.go b/internal/processing/workers/fromclientapi_test.go
index 49a68d27a..35c2c31b7 100644
--- a/internal/processing/workers/fromclientapi_test.go
+++ b/internal/processing/workers/fromclientapi_test.go
@@ -50,6 +50,8 @@ func (suite *FromClientAPITestSuite) newStatus(
visibility gtsmodel.Visibility,
replyToStatus *gtsmodel.Status,
boostOfStatus *gtsmodel.Status,
+ mentionedAccounts []*gtsmodel.Account,
+ createThread bool,
) *gtsmodel.Status {
var (
protocol = config.GetProtocol()
@@ -102,6 +104,39 @@ func (suite *FromClientAPITestSuite) newStatus(
newStatus.Visibility = boostOfStatus.Visibility
}
+ for _, mentionedAccount := range mentionedAccounts {
+ newMention := &gtsmodel.Mention{
+ ID: id.NewULID(),
+ StatusID: newStatus.ID,
+ Status: newStatus,
+ OriginAccountID: account.ID,
+ OriginAccountURI: account.URI,
+ OriginAccount: account,
+ TargetAccountID: mentionedAccount.ID,
+ TargetAccount: mentionedAccount,
+ Silent: util.Ptr(false),
+ }
+
+ newStatus.Mentions = append(newStatus.Mentions, newMention)
+ newStatus.MentionIDs = append(newStatus.MentionIDs, newMention.ID)
+
+ if err := state.DB.PutMention(ctx, newMention); err != nil {
+ suite.FailNow(err.Error())
+ }
+ }
+
+ if createThread {
+ newThread := &gtsmodel.Thread{
+ ID: id.NewULID(),
+ }
+
+ newStatus.ThreadID = newThread.ID
+
+ if err := state.DB.PutThread(ctx, newThread); err != nil {
+ suite.FailNow(err.Error())
+ }
+ }
+
// Put the status in the db, to mimic what would
// have already happened earlier up the flow.
if err := state.DB.PutStatus(ctx, newStatus); err != nil {
@@ -168,6 +203,31 @@ func (suite *FromClientAPITestSuite) statusJSON(
return string(statusJSON)
}
+func (suite *FromClientAPITestSuite) conversationJSON(
+ ctx context.Context,
+ typeConverter *typeutils.Converter,
+ conversation *gtsmodel.Conversation,
+ requestingAccount *gtsmodel.Account,
+) string {
+ apiConversation, err := typeConverter.ConversationToAPIConversation(
+ ctx,
+ conversation,
+ requestingAccount,
+ nil,
+ nil,
+ )
+ if err != nil {
+ suite.FailNow(err.Error())
+ }
+
+ conversationJSON, err := json.Marshal(apiConversation)
+ if err != nil {
+ suite.FailNow(err.Error())
+ }
+
+ return string(conversationJSON)
+}
+
func (suite *FromClientAPITestSuite) TestProcessCreateStatusWithNotification() {
testStructs := suite.SetupTestStructs()
defer suite.TearDownTestStructs(testStructs)
@@ -194,6 +254,8 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusWithNotification() {
gtsmodel.VisibilityPublic,
nil,
nil,
+ nil,
+ false,
)
)
@@ -303,6 +365,8 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusReply() {
gtsmodel.VisibilityPublic,
suite.testStatuses["local_account_2_status_1"],
nil,
+ nil,
+ false,
)
)
@@ -362,6 +426,8 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusReplyMuted() {
gtsmodel.VisibilityPublic,
suite.testStatuses["local_account_1_status_1"],
nil,
+ nil,
+ false,
)
threadMute = &gtsmodel.ThreadMute{
ID: "01HD3KRMBB1M85QRWHD912QWRE",
@@ -420,6 +486,8 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusBoostMuted() {
gtsmodel.VisibilityPublic,
nil,
suite.testStatuses["local_account_1_status_1"],
+ nil,
+ false,
)
threadMute = &gtsmodel.ThreadMute{
ID: "01HD3KRMBB1M85QRWHD912QWRE",
@@ -483,6 +551,8 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusListRepliesPolicyLis
gtsmodel.VisibilityPublic,
suite.testStatuses["local_account_2_status_1"],
nil,
+ nil,
+ false,
)
)
@@ -556,6 +626,8 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusListRepliesPolicyLis
gtsmodel.VisibilityPublic,
suite.testStatuses["local_account_2_status_1"],
nil,
+ nil,
+ false,
)
)
@@ -634,6 +706,8 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusReplyListRepliesPoli
gtsmodel.VisibilityPublic,
suite.testStatuses["local_account_2_status_1"],
nil,
+ nil,
+ false,
)
)
@@ -704,6 +778,8 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusBoost() {
gtsmodel.VisibilityPublic,
nil,
suite.testStatuses["local_account_2_status_1"],
+ nil,
+ false,
)
)
@@ -765,6 +841,8 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusBoostNoReblogs() {
gtsmodel.VisibilityPublic,
nil,
suite.testStatuses["local_account_2_status_1"],
+ nil,
+ false,
)
)
@@ -807,6 +885,159 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusBoostNoReblogs() {
)
}
+// A DM to a local user should create a conversation and accompanying notification.
+func (suite *FromClientAPITestSuite) TestProcessCreateStatusWhichBeginsConversation() {
+ testStructs := suite.SetupTestStructs()
+ defer suite.TearDownTestStructs(testStructs)
+
+ var (
+ ctx = context.Background()
+ postingAccount = suite.testAccounts["local_account_2"]
+ receivingAccount = suite.testAccounts["local_account_1"]
+ streams = suite.openStreams(ctx,
+ testStructs.Processor,
+ receivingAccount,
+ nil,
+ )
+ homeStream = streams[stream.TimelineHome]
+ directStream = streams[stream.TimelineDirect]
+
+ // turtle posts a new top-level DM mentioning zork.
+ status = suite.newStatus(
+ ctx,
+ testStructs.State,
+ postingAccount,
+ gtsmodel.VisibilityDirect,
+ nil,
+ nil,
+ []*gtsmodel.Account{receivingAccount},
+ true,
+ )
+ )
+
+ // Process the new status.
+ if err := testStructs.Processor.Workers().ProcessFromClientAPI(
+ ctx,
+ &messages.FromClientAPI{
+ APObjectType: ap.ObjectNote,
+ APActivityType: ap.ActivityCreate,
+ GTSModel: status,
+ Origin: postingAccount,
+ },
+ ); err != nil {
+ suite.FailNow(err.Error())
+ }
+
+ // Locate the conversation which should now exist for zork.
+ conversation, err := testStructs.State.DB.GetConversationByThreadAndAccountIDs(
+ ctx,
+ status.ThreadID,
+ receivingAccount.ID,
+ []string{postingAccount.ID},
+ )
+ if err != nil {
+ suite.FailNow(err.Error())
+ }
+
+ // Check status in home stream.
+ suite.checkStreamed(
+ homeStream,
+ true,
+ "",
+ stream.EventTypeUpdate,
+ )
+
+ // Check mention notification in home stream.
+ suite.checkStreamed(
+ homeStream,
+ true,
+ "",
+ stream.EventTypeNotification,
+ )
+
+ // Check conversation in direct stream.
+ conversationJSON := suite.conversationJSON(
+ ctx,
+ testStructs.TypeConverter,
+ conversation,
+ receivingAccount,
+ )
+ suite.checkStreamed(
+ directStream,
+ true,
+ conversationJSON,
+ stream.EventTypeConversation,
+ )
+}
+
+// A public message to a local user should not result in a conversation notification.
+func (suite *FromClientAPITestSuite) TestProcessCreateStatusWhichShouldNotCreateConversation() {
+ testStructs := suite.SetupTestStructs()
+ defer suite.TearDownTestStructs(testStructs)
+
+ var (
+ ctx = context.Background()
+ postingAccount = suite.testAccounts["local_account_2"]
+ receivingAccount = suite.testAccounts["local_account_1"]
+ streams = suite.openStreams(ctx,
+ testStructs.Processor,
+ receivingAccount,
+ nil,
+ )
+ homeStream = streams[stream.TimelineHome]
+ directStream = streams[stream.TimelineDirect]
+
+ // turtle posts a new top-level public message mentioning zork.
+ status = suite.newStatus(
+ ctx,
+ testStructs.State,
+ postingAccount,
+ gtsmodel.VisibilityPublic,
+ nil,
+ nil,
+ []*gtsmodel.Account{receivingAccount},
+ true,
+ )
+ )
+
+ // Process the new status.
+ if err := testStructs.Processor.Workers().ProcessFromClientAPI(
+ ctx,
+ &messages.FromClientAPI{
+ APObjectType: ap.ObjectNote,
+ APActivityType: ap.ActivityCreate,
+ GTSModel: status,
+ Origin: postingAccount,
+ },
+ ); err != nil {
+ suite.FailNow(err.Error())
+ }
+
+ // Check status in home stream.
+ suite.checkStreamed(
+ homeStream,
+ true,
+ "",
+ stream.EventTypeUpdate,
+ )
+
+ // Check mention notification in home stream.
+ suite.checkStreamed(
+ homeStream,
+ true,
+ "",
+ stream.EventTypeNotification,
+ )
+
+ // Check for absence of conversation notification in direct stream.
+ suite.checkStreamed(
+ directStream,
+ false,
+ "",
+ "",
+ )
+}
+
func (suite *FromClientAPITestSuite) TestProcessStatusDelete() {
testStructs := suite.SetupTestStructs()
defer suite.TearDownTestStructs(testStructs)
diff --git a/internal/processing/workers/surface.go b/internal/processing/workers/surface.go
index 5ec905ae8..1a7dbbfe5 100644
--- a/internal/processing/workers/surface.go
+++ b/internal/processing/workers/surface.go
@@ -20,6 +20,7 @@ package workers
import (
"github.com/superseriousbusiness/gotosocial/internal/email"
"github.com/superseriousbusiness/gotosocial/internal/filter/visibility"
+ "github.com/superseriousbusiness/gotosocial/internal/processing/conversations"
"github.com/superseriousbusiness/gotosocial/internal/processing/stream"
"github.com/superseriousbusiness/gotosocial/internal/state"
"github.com/superseriousbusiness/gotosocial/internal/typeutils"
@@ -32,9 +33,10 @@ import (
// - sending a notification to a user
// - sending an email
type Surface struct {
- State *state.State
- Converter *typeutils.Converter
- Stream *stream.Processor
- Filter *visibility.Filter
- EmailSender email.Sender
+ State *state.State
+ Converter *typeutils.Converter
+ Stream *stream.Processor
+ Filter *visibility.Filter
+ EmailSender email.Sender
+ Conversations *conversations.Processor
}
diff --git a/internal/processing/workers/surfacenotify_test.go b/internal/processing/workers/surfacenotify_test.go
index 18d0277ae..937ddeca2 100644
--- a/internal/processing/workers/surfacenotify_test.go
+++ b/internal/processing/workers/surfacenotify_test.go
@@ -39,11 +39,12 @@ func (suite *SurfaceNotifyTestSuite) TestSpamNotifs() {
defer suite.TearDownTestStructs(testStructs)
surface := &workers.Surface{
- State: testStructs.State,
- Converter: testStructs.TypeConverter,
- Stream: testStructs.Processor.Stream(),
- Filter: visibility.NewFilter(testStructs.State),
- EmailSender: testStructs.EmailSender,
+ State: testStructs.State,
+ Converter: testStructs.TypeConverter,
+ Stream: testStructs.Processor.Stream(),
+ Filter: visibility.NewFilter(testStructs.State),
+ EmailSender: testStructs.EmailSender,
+ Conversations: testStructs.Processor.Conversations(),
}
var (
diff --git a/internal/processing/workers/surfacetimeline.go b/internal/processing/workers/surfacetimeline.go
index 41d7f6f2a..8ac8293ed 100644
--- a/internal/processing/workers/surfacetimeline.go
+++ b/internal/processing/workers/surfacetimeline.go
@@ -36,8 +36,8 @@ import (
// and LIST timelines of accounts that follow the status author.
//
// It will also handle notifications for any mentions attached to
-// the account, and notifications for any local accounts that want
-// to know when this account posts.
+// the account, notifications for any local accounts that want
+// to know when this account posts, and conversations containing the status.
func (s *Surface) timelineAndNotifyStatus(ctx context.Context, status *gtsmodel.Status) error {
// Ensure status fully populated; including account, mentions, etc.
if err := s.State.DB.PopulateStatus(ctx, status); err != nil {
@@ -73,6 +73,15 @@ func (s *Surface) timelineAndNotifyStatus(ctx context.Context, status *gtsmodel.
return gtserror.Newf("error notifying status mentions for status %s: %w", status.ID, err)
}
+ // Update any conversations containing this status, and send conversation notifications.
+ notifications, err := s.Conversations.UpdateConversationsForStatus(ctx, status)
+ if err != nil {
+ return gtserror.Newf("error updating conversations for status %s: %w", status.ID, err)
+ }
+ for _, notification := range notifications {
+ s.Stream.Conversation(ctx, notification.AccountID, notification.Conversation)
+ }
+
return nil
}
diff --git a/internal/processing/workers/util.go b/internal/processing/workers/util.go
index 780e5ca14..915370976 100644
--- a/internal/processing/workers/util.go
+++ b/internal/processing/workers/util.go
@@ -137,6 +137,11 @@ func (u *utils) wipeStatus(
errs.Appendf("error deleting status from timelines: %w", err)
}
+ // delete this status from any conversations that it's part of
+ if err := u.state.DB.DeleteStatusFromConversations(ctx, statusToDelete.ID); err != nil {
+ errs.Appendf("error deleting status from conversations: %w", err)
+ }
+
// finally, delete the status itself
if err := u.state.DB.DeleteStatusByID(ctx, statusToDelete.ID); err != nil {
errs.Appendf("error deleting status: %w", err)
diff --git a/internal/processing/workers/workers.go b/internal/processing/workers/workers.go
index 6b4cc07a6..c7f67b025 100644
--- a/internal/processing/workers/workers.go
+++ b/internal/processing/workers/workers.go
@@ -22,6 +22,7 @@ import (
"github.com/superseriousbusiness/gotosocial/internal/federation"
"github.com/superseriousbusiness/gotosocial/internal/filter/visibility"
"github.com/superseriousbusiness/gotosocial/internal/processing/account"
+ "github.com/superseriousbusiness/gotosocial/internal/processing/conversations"
"github.com/superseriousbusiness/gotosocial/internal/processing/media"
"github.com/superseriousbusiness/gotosocial/internal/processing/stream"
"github.com/superseriousbusiness/gotosocial/internal/state"
@@ -44,6 +45,7 @@ func New(
account *account.Processor,
media *media.Processor,
stream *stream.Processor,
+ conversations *conversations.Processor,
) Processor {
// Init federate logic
// wrapper struct.
@@ -56,11 +58,12 @@ func New(
// Init surface logic
// wrapper struct.
surface := &Surface{
- State: state,
- Converter: converter,
- Stream: stream,
- Filter: filter,
- EmailSender: emailSender,
+ State: state,
+ Converter: converter,
+ Stream: stream,
+ Filter: filter,
+ EmailSender: emailSender,
+ Conversations: conversations,
}
// Init shared util funcs.
diff --git a/internal/processing/workers/workers_test.go b/internal/processing/workers/workers_test.go
index f66190d75..3093fd93a 100644
--- a/internal/processing/workers/workers_test.go
+++ b/internal/processing/workers/workers_test.go
@@ -108,6 +108,7 @@ func (suite *WorkersTestSuite) openStreams(ctx context.Context, processor *proce
stream.TimelineHome,
stream.TimelinePublic,
stream.TimelineNotifications,
+ stream.TimelineDirect,
} {
stream, err := processor.Stream().Open(ctx, account, streamType)
if err != nil {