diff options
Diffstat (limited to 'internal/processing/conversations')
-rw-r--r-- | internal/processing/conversations/conversations.go | 126 | ||||
-rw-r--r-- | internal/processing/conversations/conversations_test.go | 151 | ||||
-rw-r--r-- | internal/processing/conversations/delete.go | 45 | ||||
-rw-r--r-- | internal/processing/conversations/delete_test.go | 27 | ||||
-rw-r--r-- | internal/processing/conversations/get.go | 101 | ||||
-rw-r--r-- | internal/processing/conversations/get_test.go | 65 | ||||
-rw-r--r-- | internal/processing/conversations/migrate.go | 131 | ||||
-rw-r--r-- | internal/processing/conversations/migrate_test.go | 85 | ||||
-rw-r--r-- | internal/processing/conversations/read.go | 65 | ||||
-rw-r--r-- | internal/processing/conversations/read_test.go | 34 | ||||
-rw-r--r-- | internal/processing/conversations/update.go | 242 | ||||
-rw-r--r-- | internal/processing/conversations/update_test.go | 54 |
12 files changed, 1126 insertions, 0 deletions
diff --git a/internal/processing/conversations/conversations.go b/internal/processing/conversations/conversations.go new file mode 100644 index 000000000..d95740605 --- /dev/null +++ b/internal/processing/conversations/conversations.go @@ -0,0 +1,126 @@ +// GoToSocial +// Copyright (C) GoToSocial Authors admin@gotosocial.org +// SPDX-License-Identifier: AGPL-3.0-or-later +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <http://www.gnu.org/licenses/>. + +package conversations + +import ( + "context" + "errors" + + "github.com/superseriousbusiness/gotosocial/internal/db" + "github.com/superseriousbusiness/gotosocial/internal/filter/usermute" + "github.com/superseriousbusiness/gotosocial/internal/filter/visibility" + "github.com/superseriousbusiness/gotosocial/internal/gtscontext" + "github.com/superseriousbusiness/gotosocial/internal/gtserror" + "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" + "github.com/superseriousbusiness/gotosocial/internal/state" + "github.com/superseriousbusiness/gotosocial/internal/typeutils" +) + +type Processor struct { + state *state.State + converter *typeutils.Converter + filter *visibility.Filter +} + +func New( + state *state.State, + converter *typeutils.Converter, + filter *visibility.Filter, +) Processor { + return Processor{ + state: state, + converter: converter, + filter: filter, + } +} + +const conversationNotFoundHelpText = "conversation not found" + +// getConversationOwnedBy gets a conversation by ID and checks that it is owned by the given account. +func (p *Processor) getConversationOwnedBy( + ctx context.Context, + id string, + requestingAccount *gtsmodel.Account, +) (*gtsmodel.Conversation, gtserror.WithCode) { + // Get the conversation so that we can check its owning account ID. + conversation, err := p.state.DB.GetConversationByID(ctx, id) + if err != nil && !errors.Is(err, db.ErrNoEntries) { + return nil, gtserror.NewErrorInternalError( + gtserror.Newf( + "DB error getting conversation %s for account %s: %w", + id, + requestingAccount.ID, + err, + ), + ) + } + if conversation == nil { + return nil, gtserror.NewErrorNotFound( + gtserror.Newf( + "conversation %s not found: %w", + id, + err, + ), + conversationNotFoundHelpText, + ) + } + if conversation.AccountID != requestingAccount.ID { + return nil, gtserror.NewErrorNotFound( + gtserror.Newf( + "conversation %s not owned by account %s: %w", + id, + requestingAccount.ID, + err, + ), + conversationNotFoundHelpText, + ) + } + + return conversation, nil +} + +// getFiltersAndMutes gets the given account's filters and compiled mute list. +func (p *Processor) getFiltersAndMutes( + ctx context.Context, + requestingAccount *gtsmodel.Account, +) ([]*gtsmodel.Filter, *usermute.CompiledUserMuteList, gtserror.WithCode) { + filters, err := p.state.DB.GetFiltersForAccountID(ctx, requestingAccount.ID) + if err != nil { + return nil, nil, gtserror.NewErrorInternalError( + gtserror.Newf( + "DB error getting filters for account %s: %w", + requestingAccount.ID, + err, + ), + ) + } + + mutes, err := p.state.DB.GetAccountMutes(gtscontext.SetBarebones(ctx), requestingAccount.ID, nil) + if err != nil { + return nil, nil, gtserror.NewErrorInternalError( + gtserror.Newf( + "DB error getting mutes for account %s: %w", + requestingAccount.ID, + err, + ), + ) + } + compiledMutes := usermute.NewCompiledUserMuteList(mutes) + + return filters, compiledMutes, nil +} diff --git a/internal/processing/conversations/conversations_test.go b/internal/processing/conversations/conversations_test.go new file mode 100644 index 000000000..cc7ec617e --- /dev/null +++ b/internal/processing/conversations/conversations_test.go @@ -0,0 +1,151 @@ +// GoToSocial +// Copyright (C) GoToSocial Authors admin@gotosocial.org +// SPDX-License-Identifier: AGPL-3.0-or-later +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <http://www.gnu.org/licenses/>. + +package conversations_test + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/suite" + "github.com/superseriousbusiness/gotosocial/internal/db" + dbtest "github.com/superseriousbusiness/gotosocial/internal/db/test" + "github.com/superseriousbusiness/gotosocial/internal/email" + "github.com/superseriousbusiness/gotosocial/internal/federation" + "github.com/superseriousbusiness/gotosocial/internal/filter/visibility" + "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" + "github.com/superseriousbusiness/gotosocial/internal/log" + "github.com/superseriousbusiness/gotosocial/internal/media" + "github.com/superseriousbusiness/gotosocial/internal/messages" + "github.com/superseriousbusiness/gotosocial/internal/processing/conversations" + "github.com/superseriousbusiness/gotosocial/internal/state" + "github.com/superseriousbusiness/gotosocial/internal/storage" + "github.com/superseriousbusiness/gotosocial/internal/transport" + "github.com/superseriousbusiness/gotosocial/internal/typeutils" + "github.com/superseriousbusiness/gotosocial/testrig" +) + +type ConversationsTestSuite struct { + // standard suite interfaces + suite.Suite + db db.DB + tc *typeutils.Converter + storage *storage.Driver + state state.State + mediaManager *media.Manager + transportController transport.Controller + federator *federation.Federator + emailSender email.Sender + sentEmails map[string]string + filter *visibility.Filter + + // standard suite models + testTokens map[string]*gtsmodel.Token + testClients map[string]*gtsmodel.Client + testApplications map[string]*gtsmodel.Application + testUsers map[string]*gtsmodel.User + testAccounts map[string]*gtsmodel.Account + testFollows map[string]*gtsmodel.Follow + testAttachments map[string]*gtsmodel.MediaAttachment + testStatuses map[string]*gtsmodel.Status + + // module being tested + conversationsProcessor conversations.Processor + + // Owner of test conversations + testAccount *gtsmodel.Account + + // Mixin for conversation tests + dbtest.ConversationFactory +} + +func (suite *ConversationsTestSuite) getClientMsg(timeout time.Duration) (*messages.FromClientAPI, bool) { + ctx := context.Background() + ctx, cncl := context.WithTimeout(ctx, timeout) + defer cncl() + return suite.state.Workers.Client.Queue.PopCtx(ctx) +} + +func (suite *ConversationsTestSuite) SetupSuite() { + suite.testTokens = testrig.NewTestTokens() + suite.testClients = testrig.NewTestClients() + suite.testApplications = testrig.NewTestApplications() + suite.testUsers = testrig.NewTestUsers() + suite.testAccounts = testrig.NewTestAccounts() + suite.testFollows = testrig.NewTestFollows() + suite.testAttachments = testrig.NewTestAttachments() + suite.testStatuses = testrig.NewTestStatuses() + + suite.ConversationFactory.SetupSuite(suite) +} + +func (suite *ConversationsTestSuite) SetupTest() { + suite.state.Caches.Init() + testrig.StartNoopWorkers(&suite.state) + + testrig.InitTestConfig() + testrig.InitTestLog() + + suite.db = testrig.NewTestDB(&suite.state) + suite.state.DB = suite.db + suite.tc = typeutils.NewConverter(&suite.state) + suite.filter = visibility.NewFilter(&suite.state) + + testrig.StartTimelines( + &suite.state, + suite.filter, + suite.tc, + ) + + suite.storage = testrig.NewInMemoryStorage() + suite.state.Storage = suite.storage + suite.mediaManager = testrig.NewTestMediaManager(&suite.state) + + suite.transportController = testrig.NewTestTransportController(&suite.state, testrig.NewMockHTTPClient(nil, "../../../testrig/media")) + suite.federator = testrig.NewTestFederator(&suite.state, suite.transportController, suite.mediaManager) + suite.sentEmails = make(map[string]string) + suite.emailSender = testrig.NewEmailSender("../../../web/template/", suite.sentEmails) + + suite.conversationsProcessor = conversations.New(&suite.state, suite.tc, suite.filter) + testrig.StandardDBSetup(suite.db, nil) + testrig.StandardStorageSetup(suite.storage, "../../../testrig/media") + + suite.ConversationFactory.SetupTest(suite.db) + + suite.testAccount = suite.testAccounts["local_account_1"] +} + +func (suite *ConversationsTestSuite) TearDownTest() { + conversationModels := []interface{}{ + (*gtsmodel.Conversation)(nil), + (*gtsmodel.ConversationToStatus)(nil), + } + for _, model := range conversationModels { + if err := suite.db.DropTable(context.Background(), model); err != nil { + log.Error(context.Background(), err) + } + } + + testrig.StandardDBTeardown(suite.db) + testrig.StandardStorageTeardown(suite.storage) + testrig.StopWorkers(&suite.state) +} + +func TestConversationsTestSuite(t *testing.T) { + suite.Run(t, new(ConversationsTestSuite)) +} diff --git a/internal/processing/conversations/delete.go b/internal/processing/conversations/delete.go new file mode 100644 index 000000000..5cbdd00a5 --- /dev/null +++ b/internal/processing/conversations/delete.go @@ -0,0 +1,45 @@ +// GoToSocial +// Copyright (C) GoToSocial Authors admin@gotosocial.org +// SPDX-License-Identifier: AGPL-3.0-or-later +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <http://www.gnu.org/licenses/>. + +package conversations + +import ( + "context" + + "github.com/superseriousbusiness/gotosocial/internal/gtscontext" + "github.com/superseriousbusiness/gotosocial/internal/gtserror" + "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" +) + +func (p *Processor) Delete( + ctx context.Context, + requestingAccount *gtsmodel.Account, + id string, +) gtserror.WithCode { + // Get the conversation so that we can check its owning account ID. + conversation, errWithCode := p.getConversationOwnedBy(gtscontext.SetBarebones(ctx), id, requestingAccount) + if errWithCode != nil { + return errWithCode + } + + // Delete the conversation. + if err := p.state.DB.DeleteConversationByID(ctx, conversation.ID); err != nil { + return gtserror.NewErrorInternalError(err) + } + + return nil +} diff --git a/internal/processing/conversations/delete_test.go b/internal/processing/conversations/delete_test.go new file mode 100644 index 000000000..23b4f1c1a --- /dev/null +++ b/internal/processing/conversations/delete_test.go @@ -0,0 +1,27 @@ +// GoToSocial +// Copyright (C) GoToSocial Authors admin@gotosocial.org +// SPDX-License-Identifier: AGPL-3.0-or-later +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <http://www.gnu.org/licenses/>. + +package conversations_test + +import "context" + +func (suite *ConversationsTestSuite) TestDelete() { + conversation := suite.NewTestConversation(suite.testAccount, 0) + + err := suite.conversationsProcessor.Delete(context.Background(), suite.testAccount, conversation.ID) + suite.NoError(err) +} diff --git a/internal/processing/conversations/get.go b/internal/processing/conversations/get.go new file mode 100644 index 000000000..0c7832cae --- /dev/null +++ b/internal/processing/conversations/get.go @@ -0,0 +1,101 @@ +// GoToSocial +// Copyright (C) GoToSocial Authors admin@gotosocial.org +// SPDX-License-Identifier: AGPL-3.0-or-later +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <http://www.gnu.org/licenses/>. + +package conversations + +import ( + "context" + "errors" + + apimodel "github.com/superseriousbusiness/gotosocial/internal/api/model" + "github.com/superseriousbusiness/gotosocial/internal/db" + "github.com/superseriousbusiness/gotosocial/internal/gtserror" + "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" + "github.com/superseriousbusiness/gotosocial/internal/log" + "github.com/superseriousbusiness/gotosocial/internal/paging" + "github.com/superseriousbusiness/gotosocial/internal/util" +) + +// GetAll returns conversations owned by the given account. +// The additional parameters can be used for paging. +func (p *Processor) GetAll( + ctx context.Context, + requestingAccount *gtsmodel.Account, + page *paging.Page, +) (*apimodel.PageableResponse, gtserror.WithCode) { + conversations, err := p.state.DB.GetConversationsByOwnerAccountID( + ctx, + requestingAccount.ID, + page, + ) + if err != nil && !errors.Is(err, db.ErrNoEntries) { + return nil, gtserror.NewErrorInternalError( + gtserror.Newf( + "DB error getting conversations for account %s: %w", + requestingAccount.ID, + err, + ), + ) + } + + // Check for empty response. + count := len(conversations) + if len(conversations) == 0 { + return util.EmptyPageableResponse(), nil + } + + // Get the lowest and highest last status ID values, used for paging. + lo := conversations[count-1].LastStatusID + hi := conversations[0].LastStatusID + + items := make([]interface{}, 0, count) + + filters, mutes, errWithCode := p.getFiltersAndMutes(ctx, requestingAccount) + if errWithCode != nil { + return nil, errWithCode + } + + for _, conversation := range conversations { + // Convert conversation to frontend API model. + apiConversation, err := p.converter.ConversationToAPIConversation( + ctx, + conversation, + requestingAccount, + filters, + mutes, + ) + if err != nil { + log.Errorf( + ctx, + "error converting conversation %s to API representation: %v", + conversation.ID, + err, + ) + continue + } + + // Append conversation to return items. + items = append(items, apiConversation) + } + + return paging.PackageResponse(paging.ResponseParams{ + Items: items, + Path: "/api/v1/conversations", + Next: page.Next(lo, hi), + Prev: page.Prev(lo, hi), + }), nil +} diff --git a/internal/processing/conversations/get_test.go b/internal/processing/conversations/get_test.go new file mode 100644 index 000000000..7b3d60749 --- /dev/null +++ b/internal/processing/conversations/get_test.go @@ -0,0 +1,65 @@ +// GoToSocial +// Copyright (C) GoToSocial Authors admin@gotosocial.org +// SPDX-License-Identifier: AGPL-3.0-or-later +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <http://www.gnu.org/licenses/>. + +package conversations_test + +import ( + "context" + "time" + + apimodel "github.com/superseriousbusiness/gotosocial/internal/api/model" +) + +func (suite *ConversationsTestSuite) TestGetAll() { + conversation := suite.NewTestConversation(suite.testAccount, 0) + + resp, err := suite.conversationsProcessor.GetAll(context.Background(), suite.testAccount, nil) + if suite.NoError(err) && suite.Len(resp.Items, 1) && suite.IsType((*apimodel.Conversation)(nil), resp.Items[0]) { + apiConversation := resp.Items[0].(*apimodel.Conversation) + suite.Equal(conversation.ID, apiConversation.ID) + suite.True(apiConversation.Unread) + } +} + +// Test that conversations with newer last status IDs are returned earlier. +func (suite *ConversationsTestSuite) TestGetAllOrder() { + // Create a new conversation. + conversation1 := suite.NewTestConversation(suite.testAccount, 0) + + // Create another new conversation with a last status newer than conversation1's. + conversation2 := suite.NewTestConversation(suite.testAccount, 1*time.Second) + + // Add an even newer status than that to conversation1. + conversation1Status2 := suite.NewTestStatus(suite.testAccount, conversation1.LastStatus.ThreadID, 2*time.Second, conversation1.LastStatus) + conversation1.LastStatusID = conversation1Status2.ID + if err := suite.db.UpsertConversation(context.Background(), conversation1, "last_status_id"); err != nil { + suite.FailNow(err.Error()) + } + + resp, err := suite.conversationsProcessor.GetAll(context.Background(), suite.testAccount, nil) + if suite.NoError(err) && suite.Len(resp.Items, 2) { + // conversation1 should be the first conversation returned. + apiConversation1 := resp.Items[0].(*apimodel.Conversation) + suite.Equal(conversation1.ID, apiConversation1.ID) + // It should have the newest status added to it. + suite.Equal(conversation1.LastStatusID, conversation1Status2.ID) + + // conversation2 should be the second conversation returned. + apiConversation2 := resp.Items[1].(*apimodel.Conversation) + suite.Equal(conversation2.ID, apiConversation2.ID) + } +} diff --git a/internal/processing/conversations/migrate.go b/internal/processing/conversations/migrate.go new file mode 100644 index 000000000..959ffcca4 --- /dev/null +++ b/internal/processing/conversations/migrate.go @@ -0,0 +1,131 @@ +// GoToSocial +// Copyright (C) GoToSocial Authors admin@gotosocial.org +// SPDX-License-Identifier: AGPL-3.0-or-later +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <http://www.gnu.org/licenses/>. + +package conversations + +import ( + "context" + "encoding/json" + "errors" + + "github.com/superseriousbusiness/gotosocial/internal/db" + "github.com/superseriousbusiness/gotosocial/internal/gtserror" + "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" + "github.com/superseriousbusiness/gotosocial/internal/id" + "github.com/superseriousbusiness/gotosocial/internal/log" + "github.com/superseriousbusiness/gotosocial/internal/util" +) + +const advancedMigrationID = "20240611190733_add_conversations" +const statusBatchSize = 100 + +type AdvancedMigrationState struct { + MinID string + MaxIDInclusive string +} + +func (p *Processor) MigrateDMsToConversations(ctx context.Context) error { + advancedMigration, err := p.state.DB.GetAdvancedMigration(ctx, advancedMigrationID) + if err != nil && !errors.Is(err, db.ErrNoEntries) { + return gtserror.Newf("couldn't get advanced migration with ID %s: %w", advancedMigrationID, err) + } + state := AdvancedMigrationState{} + if advancedMigration != nil { + // There was a previous migration. + if *advancedMigration.Finished { + // This migration has already been run to completion; we don't need to run it again. + return nil + } + // Otherwise, pick up where we left off. + if err := json.Unmarshal(advancedMigration.StateJSON, &state); err != nil { + // This should never happen. + return gtserror.Newf("couldn't deserialize advanced migration state from JSON: %w", err) + } + } else { + // Start at the beginning. + state.MinID = id.Lowest + + // Find the max ID of all existing statuses. + // This will be the last one we migrate; + // newer ones will be handled by the normal conversation flow. + state.MaxIDInclusive, err = p.state.DB.MaxDirectStatusID(ctx) + if err != nil { + return gtserror.Newf("couldn't get max DM status ID for migration: %w", err) + } + + // Save a new advanced migration record. + advancedMigration = >smodel.AdvancedMigration{ + ID: advancedMigrationID, + Finished: util.Ptr(false), + } + if advancedMigration.StateJSON, err = json.Marshal(state); err != nil { + // This should never happen. + return gtserror.Newf("couldn't serialize advanced migration state to JSON: %w", err) + } + if err := p.state.DB.PutAdvancedMigration(ctx, advancedMigration); err != nil { + return gtserror.Newf("couldn't save state for advanced migration with ID %s: %w", advancedMigrationID, err) + } + } + + log.Info(ctx, "migrating DMs to conversations…") + + // In batches, get all statuses up to and including the max ID, + // and update conversations for each in order. + for { + // Get status IDs for this batch. + statusIDs, err := p.state.DB.GetDirectStatusIDsBatch(ctx, state.MinID, state.MaxIDInclusive, statusBatchSize) + if err != nil { + return gtserror.Newf("couldn't get DM status ID batch for migration: %w", err) + } + if len(statusIDs) == 0 { + break + } + log.Infof(ctx, "migrating %d DMs starting after %s", len(statusIDs), state.MinID) + + // Load the batch by IDs. + statuses, err := p.state.DB.GetStatusesByIDs(ctx, statusIDs) + if err != nil { + return gtserror.Newf("couldn't get DM statuses for migration: %w", err) + } + + // Update conversations for each status. Don't generate notifications. + for _, status := range statuses { + if _, err := p.UpdateConversationsForStatus(ctx, status); err != nil { + return gtserror.Newf("couldn't update conversations for status %s during migration: %w", status.ID, err) + } + } + + // Save the migration state with the new min ID. + state.MinID = statusIDs[len(statusIDs)-1] + if advancedMigration.StateJSON, err = json.Marshal(state); err != nil { + // This should never happen. + return gtserror.Newf("couldn't serialize advanced migration state to JSON: %w", err) + } + if err := p.state.DB.PutAdvancedMigration(ctx, advancedMigration); err != nil { + return gtserror.Newf("couldn't save state for advanced migration with ID %s: %w", advancedMigrationID, err) + } + } + + // Mark the migration as finished. + advancedMigration.Finished = util.Ptr(true) + if err := p.state.DB.PutAdvancedMigration(ctx, advancedMigration); err != nil { + return gtserror.Newf("couldn't save state for advanced migration with ID %s: %w", advancedMigrationID, err) + } + + log.Info(ctx, "finished migrating DMs to conversations.") + return nil +} diff --git a/internal/processing/conversations/migrate_test.go b/internal/processing/conversations/migrate_test.go new file mode 100644 index 000000000..b625e59ba --- /dev/null +++ b/internal/processing/conversations/migrate_test.go @@ -0,0 +1,85 @@ +// GoToSocial +// Copyright (C) GoToSocial Authors admin@gotosocial.org +// SPDX-License-Identifier: AGPL-3.0-or-later +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <http://www.gnu.org/licenses/>. + +package conversations_test + +import ( + "context" + + "github.com/superseriousbusiness/gotosocial/internal/db" + "github.com/superseriousbusiness/gotosocial/internal/db/bundb" + "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" +) + +// Test that we can migrate DMs to conversations. +// This test assumes that we're using the standard test fixtures, which contain some conversation-eligible DMs. +func (suite *ConversationsTestSuite) TestMigrateDMsToConversations() { + advancedMigrationID := "20240611190733_add_conversations" + ctx := context.Background() + rawDB := (suite.db).(*bundb.DBService).DB() + + // Precondition: we shouldn't have any conversations yet. + numConversations := 0 + if err := rawDB.NewSelect(). + Model((*gtsmodel.Conversation)(nil)). + ColumnExpr("COUNT(*)"). + Scan(ctx, &numConversations); // nocollapse + err != nil { + suite.FailNow(err.Error()) + } + suite.Zero(numConversations) + + // Precondition: there is no record of the conversations advanced migration. + _, err := suite.db.GetAdvancedMigration(ctx, advancedMigrationID) + suite.ErrorIs(err, db.ErrNoEntries) + + // Run the migration, which should not fail. + if err := suite.conversationsProcessor.MigrateDMsToConversations(ctx); err != nil { + suite.FailNow(err.Error()) + } + + // We should now have some conversations. + if err := rawDB.NewSelect(). + Model((*gtsmodel.Conversation)(nil)). + ColumnExpr("COUNT(*)"). + Scan(ctx, &numConversations); // nocollapse + err != nil { + suite.FailNow(err.Error()) + } + suite.NotZero(numConversations) + + // The advanced migration should now be marked as finished. + advancedMigration, err := suite.db.GetAdvancedMigration(ctx, advancedMigrationID) + if err != nil { + suite.FailNow(err.Error()) + } + if suite.NotNil(advancedMigration) && suite.NotNil(advancedMigration.Finished) { + suite.True(*advancedMigration.Finished) + } + + // Run the migration again, which should not fail. + if err := suite.conversationsProcessor.MigrateDMsToConversations(ctx); err != nil { + suite.FailNow(err.Error()) + } + + // However, it shouldn't have done anything, so the advanced migration should not have been updated. + advancedMigration2, err := suite.db.GetAdvancedMigration(ctx, advancedMigrationID) + if err != nil { + suite.FailNow(err.Error()) + } + suite.Equal(advancedMigration.UpdatedAt, advancedMigration2.UpdatedAt) +} diff --git a/internal/processing/conversations/read.go b/internal/processing/conversations/read.go new file mode 100644 index 000000000..512a004a3 --- /dev/null +++ b/internal/processing/conversations/read.go @@ -0,0 +1,65 @@ +// GoToSocial +// Copyright (C) GoToSocial Authors admin@gotosocial.org +// SPDX-License-Identifier: AGPL-3.0-or-later +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <http://www.gnu.org/licenses/>. + +package conversations + +import ( + "context" + + apimodel "github.com/superseriousbusiness/gotosocial/internal/api/model" + "github.com/superseriousbusiness/gotosocial/internal/gtserror" + "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" + "github.com/superseriousbusiness/gotosocial/internal/util" +) + +func (p *Processor) Read( + ctx context.Context, + requestingAccount *gtsmodel.Account, + id string, +) (*apimodel.Conversation, gtserror.WithCode) { + // Get the conversation, including participating accounts and last status. + conversation, errWithCode := p.getConversationOwnedBy(ctx, id, requestingAccount) + if errWithCode != nil { + return nil, errWithCode + } + + // Mark the conversation as read. + conversation.Read = util.Ptr(true) + if err := p.state.DB.UpsertConversation(ctx, conversation, "read"); err != nil { + err = gtserror.Newf("DB error updating conversation %s: %w", id, err) + return nil, gtserror.NewErrorInternalError(err) + } + + filters, mutes, errWithCode := p.getFiltersAndMutes(ctx, requestingAccount) + if errWithCode != nil { + return nil, errWithCode + } + + apiConversation, err := p.converter.ConversationToAPIConversation( + ctx, + conversation, + requestingAccount, + filters, + mutes, + ) + if err != nil { + err = gtserror.Newf("error converting conversation %s to API representation: %w", id, err) + return nil, gtserror.NewErrorInternalError(err) + } + + return apiConversation, nil +} diff --git a/internal/processing/conversations/read_test.go b/internal/processing/conversations/read_test.go new file mode 100644 index 000000000..ebd8f7fe5 --- /dev/null +++ b/internal/processing/conversations/read_test.go @@ -0,0 +1,34 @@ +// GoToSocial +// Copyright (C) GoToSocial Authors admin@gotosocial.org +// SPDX-License-Identifier: AGPL-3.0-or-later +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <http://www.gnu.org/licenses/>. + +package conversations_test + +import ( + "context" + + "github.com/superseriousbusiness/gotosocial/internal/util" +) + +func (suite *ConversationsTestSuite) TestRead() { + conversation := suite.NewTestConversation(suite.testAccount, 0) + + suite.False(util.PtrOrValue(conversation.Read, false)) + apiConversation, err := suite.conversationsProcessor.Read(context.Background(), suite.testAccount, conversation.ID) + if suite.NoError(err) { + suite.False(apiConversation.Unread) + } +} diff --git a/internal/processing/conversations/update.go b/internal/processing/conversations/update.go new file mode 100644 index 000000000..7445994ae --- /dev/null +++ b/internal/processing/conversations/update.go @@ -0,0 +1,242 @@ +// GoToSocial +// Copyright (C) GoToSocial Authors admin@gotosocial.org +// SPDX-License-Identifier: AGPL-3.0-or-later +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <http://www.gnu.org/licenses/>. + +package conversations + +import ( + "context" + "errors" + + apimodel "github.com/superseriousbusiness/gotosocial/internal/api/model" + "github.com/superseriousbusiness/gotosocial/internal/db" + statusfilter "github.com/superseriousbusiness/gotosocial/internal/filter/status" + "github.com/superseriousbusiness/gotosocial/internal/gtserror" + "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" + "github.com/superseriousbusiness/gotosocial/internal/id" + "github.com/superseriousbusiness/gotosocial/internal/log" + "github.com/superseriousbusiness/gotosocial/internal/util" +) + +// ConversationNotification carries the arguments to processing/stream.Processor.Conversation. +type ConversationNotification struct { + // AccountID of a local account to deliver the notification to. + AccountID string + // Conversation as the notification payload. + Conversation *apimodel.Conversation +} + +// UpdateConversationsForStatus updates all conversations related to a status, +// and returns a map from local account IDs to conversation notifications that should be sent to them. +func (p *Processor) UpdateConversationsForStatus(ctx context.Context, status *gtsmodel.Status) ([]ConversationNotification, error) { + if status.Visibility != gtsmodel.VisibilityDirect { + // Only DMs are considered part of conversations. + return nil, nil + } + if status.BoostOfID != "" { + // Boosts can't be part of conversations. + // FUTURE: This may change if we ever implement quote posts. + return nil, nil + } + if status.ThreadID == "" { + // If the status doesn't have a thread ID, it didn't mention a local account, + // and thus can't be part of a conversation. + return nil, nil + } + + // We need accounts to be populated for this. + if err := p.state.DB.PopulateStatus(ctx, status); err != nil { + return nil, gtserror.Newf("DB error populating status %s: %w", status.ID, err) + } + + // The account which authored the status plus all mentioned accounts. + allParticipantsSet := make(map[string]*gtsmodel.Account, 1+len(status.Mentions)) + allParticipantsSet[status.AccountID] = status.Account + for _, mention := range status.Mentions { + allParticipantsSet[mention.TargetAccountID] = mention.TargetAccount + } + + // Create or update conversations for and send notifications to each local participant. + notifications := make([]ConversationNotification, 0, len(allParticipantsSet)) + for _, participant := range allParticipantsSet { + if participant.IsRemote() { + continue + } + localAccount := participant + + // If the status is not visible to this account, skip processing it for this account. + visible, err := p.filter.StatusVisible(ctx, localAccount, status) + if err != nil { + log.Errorf( + ctx, + "error checking status %s visibility for account %s: %v", + status.ID, + localAccount.ID, + err, + ) + continue + } else if !visible { + continue + } + + // Is the status filtered or muted for this user? + // Converting the status to an API status runs the filter/mute checks. + filters, mutes, errWithCode := p.getFiltersAndMutes(ctx, localAccount) + if errWithCode != nil { + log.Error(ctx, errWithCode) + continue + } + _, err = p.converter.StatusToAPIStatus( + ctx, + status, + localAccount, + statusfilter.FilterContextNotifications, + filters, + mutes, + ) + if err != nil { + // If the status matched a hide filter, skip processing it for this account. + // If there was another kind of error, log that and skip it anyway. + if !errors.Is(err, statusfilter.ErrHideStatus) { + log.Errorf( + ctx, + "error checking status %s filtering/muting for account %s: %v", + status.ID, + localAccount.ID, + err, + ) + } + continue + } + + // Collect other accounts participating in the conversation. + otherAccounts := make([]*gtsmodel.Account, 0, len(allParticipantsSet)-1) + otherAccountIDs := make([]string, 0, len(allParticipantsSet)-1) + for accountID, account := range allParticipantsSet { + if accountID != localAccount.ID { + otherAccounts = append(otherAccounts, account) + otherAccountIDs = append(otherAccountIDs, accountID) + } + } + + // Check for a previously existing conversation, if there is one. + conversation, err := p.state.DB.GetConversationByThreadAndAccountIDs( + ctx, + status.ThreadID, + localAccount.ID, + otherAccountIDs, + ) + if err != nil && !errors.Is(err, db.ErrNoEntries) { + log.Errorf( + ctx, + "error trying to find a previous conversation for status %s and account %s: %v", + status.ID, + localAccount.ID, + err, + ) + continue + } + + if conversation == nil { + // Create a new conversation. + conversation = >smodel.Conversation{ + ID: id.NewULID(), + AccountID: localAccount.ID, + OtherAccountIDs: otherAccountIDs, + OtherAccounts: otherAccounts, + OtherAccountsKey: gtsmodel.ConversationOtherAccountsKey(otherAccountIDs), + ThreadID: status.ThreadID, + Read: util.Ptr(true), + } + } + + // Assume that if the conversation owner posted the status, they've already read it. + statusAuthoredByConversationOwner := status.AccountID == conversation.AccountID + + // Update the conversation. + // If there is no previous last status or this one is more recently created, set it as the last status. + if conversation.LastStatus == nil || conversation.LastStatus.CreatedAt.Before(status.CreatedAt) { + conversation.LastStatusID = status.ID + conversation.LastStatus = status + } + // If the conversation is unread, leave it marked as unread. + // If the conversation is read but this status might not have been, mark the conversation as unread. + if !statusAuthoredByConversationOwner { + conversation.Read = util.Ptr(false) + } + + // Create or update the conversation. + err = p.state.DB.UpsertConversation(ctx, conversation) + if err != nil { + log.Errorf( + ctx, + "error creating or updating conversation %s for status %s and account %s: %v", + conversation.ID, + status.ID, + localAccount.ID, + err, + ) + continue + } + + // Link the conversation to the status. + if err := p.state.DB.LinkConversationToStatus(ctx, conversation.ID, status.ID); err != nil { + log.Errorf( + ctx, + "error linking conversation %s to status %s: %v", + conversation.ID, + status.ID, + err, + ) + continue + } + + // Convert the conversation to API representation. + apiConversation, err := p.converter.ConversationToAPIConversation( + ctx, + conversation, + localAccount, + filters, + mutes, + ) + if err != nil { + // If the conversation's last status matched a hide filter, skip it. + // If there was another kind of error, log that and skip it anyway. + if !errors.Is(err, statusfilter.ErrHideStatus) { + log.Errorf( + ctx, + "error converting conversation %s to API representation for account %s: %v", + status.ID, + localAccount.ID, + err, + ) + } + continue + } + + // Generate a notification, + // unless the status was authored by the user who would be notified, + // in which case they already know. + if status.AccountID != localAccount.ID { + notifications = append(notifications, ConversationNotification{ + AccountID: localAccount.ID, + Conversation: apiConversation, + }) + } + } + + return notifications, nil +} diff --git a/internal/processing/conversations/update_test.go b/internal/processing/conversations/update_test.go new file mode 100644 index 000000000..8ba2800fe --- /dev/null +++ b/internal/processing/conversations/update_test.go @@ -0,0 +1,54 @@ +// GoToSocial +// Copyright (C) GoToSocial Authors admin@gotosocial.org +// SPDX-License-Identifier: AGPL-3.0-or-later +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <http://www.gnu.org/licenses/>. + +package conversations_test + +import ( + "context" +) + +// Test that we can create conversations when a new status comes in. +func (suite *ConversationsTestSuite) TestUpdateConversationsForStatus() { + ctx := context.Background() + + // Precondition: the test user shouldn't have any conversations yet. + conversations, err := suite.db.GetConversationsByOwnerAccountID(ctx, suite.testAccount.ID, nil) + if err != nil { + suite.FailNow(err.Error()) + } + suite.Empty(conversations) + + // Create a status. + threadID := suite.NewULID(0) + status := suite.NewTestStatus(suite.testAccount, threadID, 0, nil) + + // Update conversations for it. + notifications, err := suite.conversationsProcessor.UpdateConversationsForStatus(ctx, status) + if err != nil { + suite.FailNow(err.Error()) + } + + // In this test, the user is DMing themself, and should not receive a notification from that. + suite.Empty(notifications) + + // The test user should have a conversation now. + conversations, err = suite.db.GetConversationsByOwnerAccountID(ctx, suite.testAccount.ID, nil) + if err != nil { + suite.FailNow(err.Error()) + } + suite.NotEmpty(conversations) +} |