diff options
author | 2024-07-23 12:44:31 -0700 | |
---|---|---|
committer | 2024-07-23 20:44:31 +0100 | |
commit | 8fdd358f4b4428b33df4afd672ed070032d46e48 (patch) | |
tree | 92ec4bcbda633878a468e396b968656dbf33ef59 /internal/processing | |
parent | [bugfix] media.Processor{}.GetFile() returning 404s on first call, correctly ... (diff) | |
download | gotosocial-8fdd358f4b4428b33df4afd672ed070032d46e48.tar.xz |
[feature] Conversations API (#3013)
* Implement conversations API
* Sort and page conversations by last status ID
* Appease linter
* Fix deleting conversations and statuses
* Refactor to make migrations automatic
* Lint
* Update tests post-merge
* Fixes from live-fire testing
* Linter caught a format problem
* Refactor tests, fix cache
* Negative test for non-DMs
* Run conversations advanced migration on testrig startup as well as regular server startup
* Document (lack of) side effects of API method for deleting a conversation
* Make not-found check less nested for readability
* Rename PutConversation to UpsertConversation
* Use util.Ptr instead of IIFE
* Reduce cache used by conversations
* Remove unnecessary TableExpr/ColumnExpr
* Use struct tags for both unique constraints on Conversation
* Make it clear how paging with GetDirectStatusIDsBatch should be used
* Let conversation paging skip conversations it can't render
* Use Bun NewDropTable
* Convert delete raw query to Bun
* Convert update raw query to Bun
* Convert latestConversationStatusesTempTable raw query partially to Bun
* Convert conversationStatusesTempTable raw query partially to Bun
* Rename field used to store result of MaxDirectStatusID
* Move advanced migrations to their own tiny processor
* Catch up util function name with main
* Remove json.… wrappers
* Remove redundant check
* Combine error checks
* Replace map with slice of structs
* Address processor/type converter comments
- Add context info for errors
- Extract some common processor code into shared methods
- Move conversation eligibility check ahead of populating conversation
* Add error context when dropping temp tables
Diffstat (limited to 'internal/processing')
23 files changed, 1528 insertions, 33 deletions
diff --git a/internal/processing/account/delete.go b/internal/processing/account/delete.go index 075e94544..702b46cda 100644 --- a/internal/processing/account/delete.go +++ b/internal/processing/account/delete.go @@ -460,6 +460,14 @@ func (p *Processor) deleteAccountPeripheral(ctx context.Context, account *gtsmod // TODO: add status mutes here when they're implemented. + // Delete all conversations owned by given account. + // Conversations in which it has only participated will be retained; + // they can always be deleted by their owners. + if err := p.state.DB.DeleteConversationsByOwnerAccountID(ctx, account.ID); // nocollapse + err != nil && !errors.Is(err, db.ErrNoEntries) { + return gtserror.Newf("error deleting conversations owned by account: %w", err) + } + // Delete all poll votes owned by given account. if err := p.state.DB.DeletePollVotesByAccountID(ctx, account.ID); // nocollapse err != nil && !errors.Is(err, db.ErrNoEntries) { diff --git a/internal/processing/advancedmigrations/advancedmigrations.go b/internal/processing/advancedmigrations/advancedmigrations.go new file mode 100644 index 000000000..3f1876539 --- /dev/null +++ b/internal/processing/advancedmigrations/advancedmigrations.go @@ -0,0 +1,48 @@ +// GoToSocial +// Copyright (C) GoToSocial Authors admin@gotosocial.org +// SPDX-License-Identifier: AGPL-3.0-or-later +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <http://www.gnu.org/licenses/>. + +package advancedmigrations + +import ( + "context" + "fmt" + + "github.com/superseriousbusiness/gotosocial/internal/processing/conversations" +) + +// Processor holds references to any other processor that has migrations to run. +type Processor struct { + conversations *conversations.Processor +} + +func New( + conversations *conversations.Processor, +) Processor { + return Processor{ + conversations: conversations, + } +} + +// Migrate runs all advanced migrations. +// Errors should be in the same format thrown by other server or testrig startup failures. +func (p *Processor) Migrate(ctx context.Context) error { + if err := p.conversations.MigrateDMsToConversations(ctx); err != nil { + return fmt.Errorf("error running conversations advanced migration: %w", err) + } + + return nil +} diff --git a/internal/processing/conversations/conversations.go b/internal/processing/conversations/conversations.go new file mode 100644 index 000000000..d95740605 --- /dev/null +++ b/internal/processing/conversations/conversations.go @@ -0,0 +1,126 @@ +// GoToSocial +// Copyright (C) GoToSocial Authors admin@gotosocial.org +// SPDX-License-Identifier: AGPL-3.0-or-later +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <http://www.gnu.org/licenses/>. + +package conversations + +import ( + "context" + "errors" + + "github.com/superseriousbusiness/gotosocial/internal/db" + "github.com/superseriousbusiness/gotosocial/internal/filter/usermute" + "github.com/superseriousbusiness/gotosocial/internal/filter/visibility" + "github.com/superseriousbusiness/gotosocial/internal/gtscontext" + "github.com/superseriousbusiness/gotosocial/internal/gtserror" + "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" + "github.com/superseriousbusiness/gotosocial/internal/state" + "github.com/superseriousbusiness/gotosocial/internal/typeutils" +) + +type Processor struct { + state *state.State + converter *typeutils.Converter + filter *visibility.Filter +} + +func New( + state *state.State, + converter *typeutils.Converter, + filter *visibility.Filter, +) Processor { + return Processor{ + state: state, + converter: converter, + filter: filter, + } +} + +const conversationNotFoundHelpText = "conversation not found" + +// getConversationOwnedBy gets a conversation by ID and checks that it is owned by the given account. +func (p *Processor) getConversationOwnedBy( + ctx context.Context, + id string, + requestingAccount *gtsmodel.Account, +) (*gtsmodel.Conversation, gtserror.WithCode) { + // Get the conversation so that we can check its owning account ID. + conversation, err := p.state.DB.GetConversationByID(ctx, id) + if err != nil && !errors.Is(err, db.ErrNoEntries) { + return nil, gtserror.NewErrorInternalError( + gtserror.Newf( + "DB error getting conversation %s for account %s: %w", + id, + requestingAccount.ID, + err, + ), + ) + } + if conversation == nil { + return nil, gtserror.NewErrorNotFound( + gtserror.Newf( + "conversation %s not found: %w", + id, + err, + ), + conversationNotFoundHelpText, + ) + } + if conversation.AccountID != requestingAccount.ID { + return nil, gtserror.NewErrorNotFound( + gtserror.Newf( + "conversation %s not owned by account %s: %w", + id, + requestingAccount.ID, + err, + ), + conversationNotFoundHelpText, + ) + } + + return conversation, nil +} + +// getFiltersAndMutes gets the given account's filters and compiled mute list. +func (p *Processor) getFiltersAndMutes( + ctx context.Context, + requestingAccount *gtsmodel.Account, +) ([]*gtsmodel.Filter, *usermute.CompiledUserMuteList, gtserror.WithCode) { + filters, err := p.state.DB.GetFiltersForAccountID(ctx, requestingAccount.ID) + if err != nil { + return nil, nil, gtserror.NewErrorInternalError( + gtserror.Newf( + "DB error getting filters for account %s: %w", + requestingAccount.ID, + err, + ), + ) + } + + mutes, err := p.state.DB.GetAccountMutes(gtscontext.SetBarebones(ctx), requestingAccount.ID, nil) + if err != nil { + return nil, nil, gtserror.NewErrorInternalError( + gtserror.Newf( + "DB error getting mutes for account %s: %w", + requestingAccount.ID, + err, + ), + ) + } + compiledMutes := usermute.NewCompiledUserMuteList(mutes) + + return filters, compiledMutes, nil +} diff --git a/internal/processing/conversations/conversations_test.go b/internal/processing/conversations/conversations_test.go new file mode 100644 index 000000000..cc7ec617e --- /dev/null +++ b/internal/processing/conversations/conversations_test.go @@ -0,0 +1,151 @@ +// GoToSocial +// Copyright (C) GoToSocial Authors admin@gotosocial.org +// SPDX-License-Identifier: AGPL-3.0-or-later +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <http://www.gnu.org/licenses/>. + +package conversations_test + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/suite" + "github.com/superseriousbusiness/gotosocial/internal/db" + dbtest "github.com/superseriousbusiness/gotosocial/internal/db/test" + "github.com/superseriousbusiness/gotosocial/internal/email" + "github.com/superseriousbusiness/gotosocial/internal/federation" + "github.com/superseriousbusiness/gotosocial/internal/filter/visibility" + "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" + "github.com/superseriousbusiness/gotosocial/internal/log" + "github.com/superseriousbusiness/gotosocial/internal/media" + "github.com/superseriousbusiness/gotosocial/internal/messages" + "github.com/superseriousbusiness/gotosocial/internal/processing/conversations" + "github.com/superseriousbusiness/gotosocial/internal/state" + "github.com/superseriousbusiness/gotosocial/internal/storage" + "github.com/superseriousbusiness/gotosocial/internal/transport" + "github.com/superseriousbusiness/gotosocial/internal/typeutils" + "github.com/superseriousbusiness/gotosocial/testrig" +) + +type ConversationsTestSuite struct { + // standard suite interfaces + suite.Suite + db db.DB + tc *typeutils.Converter + storage *storage.Driver + state state.State + mediaManager *media.Manager + transportController transport.Controller + federator *federation.Federator + emailSender email.Sender + sentEmails map[string]string + filter *visibility.Filter + + // standard suite models + testTokens map[string]*gtsmodel.Token + testClients map[string]*gtsmodel.Client + testApplications map[string]*gtsmodel.Application + testUsers map[string]*gtsmodel.User + testAccounts map[string]*gtsmodel.Account + testFollows map[string]*gtsmodel.Follow + testAttachments map[string]*gtsmodel.MediaAttachment + testStatuses map[string]*gtsmodel.Status + + // module being tested + conversationsProcessor conversations.Processor + + // Owner of test conversations + testAccount *gtsmodel.Account + + // Mixin for conversation tests + dbtest.ConversationFactory +} + +func (suite *ConversationsTestSuite) getClientMsg(timeout time.Duration) (*messages.FromClientAPI, bool) { + ctx := context.Background() + ctx, cncl := context.WithTimeout(ctx, timeout) + defer cncl() + return suite.state.Workers.Client.Queue.PopCtx(ctx) +} + +func (suite *ConversationsTestSuite) SetupSuite() { + suite.testTokens = testrig.NewTestTokens() + suite.testClients = testrig.NewTestClients() + suite.testApplications = testrig.NewTestApplications() + suite.testUsers = testrig.NewTestUsers() + suite.testAccounts = testrig.NewTestAccounts() + suite.testFollows = testrig.NewTestFollows() + suite.testAttachments = testrig.NewTestAttachments() + suite.testStatuses = testrig.NewTestStatuses() + + suite.ConversationFactory.SetupSuite(suite) +} + +func (suite *ConversationsTestSuite) SetupTest() { + suite.state.Caches.Init() + testrig.StartNoopWorkers(&suite.state) + + testrig.InitTestConfig() + testrig.InitTestLog() + + suite.db = testrig.NewTestDB(&suite.state) + suite.state.DB = suite.db + suite.tc = typeutils.NewConverter(&suite.state) + suite.filter = visibility.NewFilter(&suite.state) + + testrig.StartTimelines( + &suite.state, + suite.filter, + suite.tc, + ) + + suite.storage = testrig.NewInMemoryStorage() + suite.state.Storage = suite.storage + suite.mediaManager = testrig.NewTestMediaManager(&suite.state) + + suite.transportController = testrig.NewTestTransportController(&suite.state, testrig.NewMockHTTPClient(nil, "../../../testrig/media")) + suite.federator = testrig.NewTestFederator(&suite.state, suite.transportController, suite.mediaManager) + suite.sentEmails = make(map[string]string) + suite.emailSender = testrig.NewEmailSender("../../../web/template/", suite.sentEmails) + + suite.conversationsProcessor = conversations.New(&suite.state, suite.tc, suite.filter) + testrig.StandardDBSetup(suite.db, nil) + testrig.StandardStorageSetup(suite.storage, "../../../testrig/media") + + suite.ConversationFactory.SetupTest(suite.db) + + suite.testAccount = suite.testAccounts["local_account_1"] +} + +func (suite *ConversationsTestSuite) TearDownTest() { + conversationModels := []interface{}{ + (*gtsmodel.Conversation)(nil), + (*gtsmodel.ConversationToStatus)(nil), + } + for _, model := range conversationModels { + if err := suite.db.DropTable(context.Background(), model); err != nil { + log.Error(context.Background(), err) + } + } + + testrig.StandardDBTeardown(suite.db) + testrig.StandardStorageTeardown(suite.storage) + testrig.StopWorkers(&suite.state) +} + +func TestConversationsTestSuite(t *testing.T) { + suite.Run(t, new(ConversationsTestSuite)) +} diff --git a/internal/processing/conversations/delete.go b/internal/processing/conversations/delete.go new file mode 100644 index 000000000..5cbdd00a5 --- /dev/null +++ b/internal/processing/conversations/delete.go @@ -0,0 +1,45 @@ +// GoToSocial +// Copyright (C) GoToSocial Authors admin@gotosocial.org +// SPDX-License-Identifier: AGPL-3.0-or-later +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <http://www.gnu.org/licenses/>. + +package conversations + +import ( + "context" + + "github.com/superseriousbusiness/gotosocial/internal/gtscontext" + "github.com/superseriousbusiness/gotosocial/internal/gtserror" + "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" +) + +func (p *Processor) Delete( + ctx context.Context, + requestingAccount *gtsmodel.Account, + id string, +) gtserror.WithCode { + // Get the conversation so that we can check its owning account ID. + conversation, errWithCode := p.getConversationOwnedBy(gtscontext.SetBarebones(ctx), id, requestingAccount) + if errWithCode != nil { + return errWithCode + } + + // Delete the conversation. + if err := p.state.DB.DeleteConversationByID(ctx, conversation.ID); err != nil { + return gtserror.NewErrorInternalError(err) + } + + return nil +} diff --git a/internal/processing/conversations/delete_test.go b/internal/processing/conversations/delete_test.go new file mode 100644 index 000000000..23b4f1c1a --- /dev/null +++ b/internal/processing/conversations/delete_test.go @@ -0,0 +1,27 @@ +// GoToSocial +// Copyright (C) GoToSocial Authors admin@gotosocial.org +// SPDX-License-Identifier: AGPL-3.0-or-later +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <http://www.gnu.org/licenses/>. + +package conversations_test + +import "context" + +func (suite *ConversationsTestSuite) TestDelete() { + conversation := suite.NewTestConversation(suite.testAccount, 0) + + err := suite.conversationsProcessor.Delete(context.Background(), suite.testAccount, conversation.ID) + suite.NoError(err) +} diff --git a/internal/processing/conversations/get.go b/internal/processing/conversations/get.go new file mode 100644 index 000000000..0c7832cae --- /dev/null +++ b/internal/processing/conversations/get.go @@ -0,0 +1,101 @@ +// GoToSocial +// Copyright (C) GoToSocial Authors admin@gotosocial.org +// SPDX-License-Identifier: AGPL-3.0-or-later +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <http://www.gnu.org/licenses/>. + +package conversations + +import ( + "context" + "errors" + + apimodel "github.com/superseriousbusiness/gotosocial/internal/api/model" + "github.com/superseriousbusiness/gotosocial/internal/db" + "github.com/superseriousbusiness/gotosocial/internal/gtserror" + "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" + "github.com/superseriousbusiness/gotosocial/internal/log" + "github.com/superseriousbusiness/gotosocial/internal/paging" + "github.com/superseriousbusiness/gotosocial/internal/util" +) + +// GetAll returns conversations owned by the given account. +// The additional parameters can be used for paging. +func (p *Processor) GetAll( + ctx context.Context, + requestingAccount *gtsmodel.Account, + page *paging.Page, +) (*apimodel.PageableResponse, gtserror.WithCode) { + conversations, err := p.state.DB.GetConversationsByOwnerAccountID( + ctx, + requestingAccount.ID, + page, + ) + if err != nil && !errors.Is(err, db.ErrNoEntries) { + return nil, gtserror.NewErrorInternalError( + gtserror.Newf( + "DB error getting conversations for account %s: %w", + requestingAccount.ID, + err, + ), + ) + } + + // Check for empty response. + count := len(conversations) + if len(conversations) == 0 { + return util.EmptyPageableResponse(), nil + } + + // Get the lowest and highest last status ID values, used for paging. + lo := conversations[count-1].LastStatusID + hi := conversations[0].LastStatusID + + items := make([]interface{}, 0, count) + + filters, mutes, errWithCode := p.getFiltersAndMutes(ctx, requestingAccount) + if errWithCode != nil { + return nil, errWithCode + } + + for _, conversation := range conversations { + // Convert conversation to frontend API model. + apiConversation, err := p.converter.ConversationToAPIConversation( + ctx, + conversation, + requestingAccount, + filters, + mutes, + ) + if err != nil { + log.Errorf( + ctx, + "error converting conversation %s to API representation: %v", + conversation.ID, + err, + ) + continue + } + + // Append conversation to return items. + items = append(items, apiConversation) + } + + return paging.PackageResponse(paging.ResponseParams{ + Items: items, + Path: "/api/v1/conversations", + Next: page.Next(lo, hi), + Prev: page.Prev(lo, hi), + }), nil +} diff --git a/internal/processing/conversations/get_test.go b/internal/processing/conversations/get_test.go new file mode 100644 index 000000000..7b3d60749 --- /dev/null +++ b/internal/processing/conversations/get_test.go @@ -0,0 +1,65 @@ +// GoToSocial +// Copyright (C) GoToSocial Authors admin@gotosocial.org +// SPDX-License-Identifier: AGPL-3.0-or-later +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <http://www.gnu.org/licenses/>. + +package conversations_test + +import ( + "context" + "time" + + apimodel "github.com/superseriousbusiness/gotosocial/internal/api/model" +) + +func (suite *ConversationsTestSuite) TestGetAll() { + conversation := suite.NewTestConversation(suite.testAccount, 0) + + resp, err := suite.conversationsProcessor.GetAll(context.Background(), suite.testAccount, nil) + if suite.NoError(err) && suite.Len(resp.Items, 1) && suite.IsType((*apimodel.Conversation)(nil), resp.Items[0]) { + apiConversation := resp.Items[0].(*apimodel.Conversation) + suite.Equal(conversation.ID, apiConversation.ID) + suite.True(apiConversation.Unread) + } +} + +// Test that conversations with newer last status IDs are returned earlier. +func (suite *ConversationsTestSuite) TestGetAllOrder() { + // Create a new conversation. + conversation1 := suite.NewTestConversation(suite.testAccount, 0) + + // Create another new conversation with a last status newer than conversation1's. + conversation2 := suite.NewTestConversation(suite.testAccount, 1*time.Second) + + // Add an even newer status than that to conversation1. + conversation1Status2 := suite.NewTestStatus(suite.testAccount, conversation1.LastStatus.ThreadID, 2*time.Second, conversation1.LastStatus) + conversation1.LastStatusID = conversation1Status2.ID + if err := suite.db.UpsertConversation(context.Background(), conversation1, "last_status_id"); err != nil { + suite.FailNow(err.Error()) + } + + resp, err := suite.conversationsProcessor.GetAll(context.Background(), suite.testAccount, nil) + if suite.NoError(err) && suite.Len(resp.Items, 2) { + // conversation1 should be the first conversation returned. + apiConversation1 := resp.Items[0].(*apimodel.Conversation) + suite.Equal(conversation1.ID, apiConversation1.ID) + // It should have the newest status added to it. + suite.Equal(conversation1.LastStatusID, conversation1Status2.ID) + + // conversation2 should be the second conversation returned. + apiConversation2 := resp.Items[1].(*apimodel.Conversation) + suite.Equal(conversation2.ID, apiConversation2.ID) + } +} diff --git a/internal/processing/conversations/migrate.go b/internal/processing/conversations/migrate.go new file mode 100644 index 000000000..959ffcca4 --- /dev/null +++ b/internal/processing/conversations/migrate.go @@ -0,0 +1,131 @@ +// GoToSocial +// Copyright (C) GoToSocial Authors admin@gotosocial.org +// SPDX-License-Identifier: AGPL-3.0-or-later +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <http://www.gnu.org/licenses/>. + +package conversations + +import ( + "context" + "encoding/json" + "errors" + + "github.com/superseriousbusiness/gotosocial/internal/db" + "github.com/superseriousbusiness/gotosocial/internal/gtserror" + "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" + "github.com/superseriousbusiness/gotosocial/internal/id" + "github.com/superseriousbusiness/gotosocial/internal/log" + "github.com/superseriousbusiness/gotosocial/internal/util" +) + +const advancedMigrationID = "20240611190733_add_conversations" +const statusBatchSize = 100 + +type AdvancedMigrationState struct { + MinID string + MaxIDInclusive string +} + +func (p *Processor) MigrateDMsToConversations(ctx context.Context) error { + advancedMigration, err := p.state.DB.GetAdvancedMigration(ctx, advancedMigrationID) + if err != nil && !errors.Is(err, db.ErrNoEntries) { + return gtserror.Newf("couldn't get advanced migration with ID %s: %w", advancedMigrationID, err) + } + state := AdvancedMigrationState{} + if advancedMigration != nil { + // There was a previous migration. + if *advancedMigration.Finished { + // This migration has already been run to completion; we don't need to run it again. + return nil + } + // Otherwise, pick up where we left off. + if err := json.Unmarshal(advancedMigration.StateJSON, &state); err != nil { + // This should never happen. + return gtserror.Newf("couldn't deserialize advanced migration state from JSON: %w", err) + } + } else { + // Start at the beginning. + state.MinID = id.Lowest + + // Find the max ID of all existing statuses. + // This will be the last one we migrate; + // newer ones will be handled by the normal conversation flow. + state.MaxIDInclusive, err = p.state.DB.MaxDirectStatusID(ctx) + if err != nil { + return gtserror.Newf("couldn't get max DM status ID for migration: %w", err) + } + + // Save a new advanced migration record. + advancedMigration = >smodel.AdvancedMigration{ + ID: advancedMigrationID, + Finished: util.Ptr(false), + } + if advancedMigration.StateJSON, err = json.Marshal(state); err != nil { + // This should never happen. + return gtserror.Newf("couldn't serialize advanced migration state to JSON: %w", err) + } + if err := p.state.DB.PutAdvancedMigration(ctx, advancedMigration); err != nil { + return gtserror.Newf("couldn't save state for advanced migration with ID %s: %w", advancedMigrationID, err) + } + } + + log.Info(ctx, "migrating DMs to conversations…") + + // In batches, get all statuses up to and including the max ID, + // and update conversations for each in order. + for { + // Get status IDs for this batch. + statusIDs, err := p.state.DB.GetDirectStatusIDsBatch(ctx, state.MinID, state.MaxIDInclusive, statusBatchSize) + if err != nil { + return gtserror.Newf("couldn't get DM status ID batch for migration: %w", err) + } + if len(statusIDs) == 0 { + break + } + log.Infof(ctx, "migrating %d DMs starting after %s", len(statusIDs), state.MinID) + + // Load the batch by IDs. + statuses, err := p.state.DB.GetStatusesByIDs(ctx, statusIDs) + if err != nil { + return gtserror.Newf("couldn't get DM statuses for migration: %w", err) + } + + // Update conversations for each status. Don't generate notifications. + for _, status := range statuses { + if _, err := p.UpdateConversationsForStatus(ctx, status); err != nil { + return gtserror.Newf("couldn't update conversations for status %s during migration: %w", status.ID, err) + } + } + + // Save the migration state with the new min ID. + state.MinID = statusIDs[len(statusIDs)-1] + if advancedMigration.StateJSON, err = json.Marshal(state); err != nil { + // This should never happen. + return gtserror.Newf("couldn't serialize advanced migration state to JSON: %w", err) + } + if err := p.state.DB.PutAdvancedMigration(ctx, advancedMigration); err != nil { + return gtserror.Newf("couldn't save state for advanced migration with ID %s: %w", advancedMigrationID, err) + } + } + + // Mark the migration as finished. + advancedMigration.Finished = util.Ptr(true) + if err := p.state.DB.PutAdvancedMigration(ctx, advancedMigration); err != nil { + return gtserror.Newf("couldn't save state for advanced migration with ID %s: %w", advancedMigrationID, err) + } + + log.Info(ctx, "finished migrating DMs to conversations.") + return nil +} diff --git a/internal/processing/conversations/migrate_test.go b/internal/processing/conversations/migrate_test.go new file mode 100644 index 000000000..b625e59ba --- /dev/null +++ b/internal/processing/conversations/migrate_test.go @@ -0,0 +1,85 @@ +// GoToSocial +// Copyright (C) GoToSocial Authors admin@gotosocial.org +// SPDX-License-Identifier: AGPL-3.0-or-later +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <http://www.gnu.org/licenses/>. + +package conversations_test + +import ( + "context" + + "github.com/superseriousbusiness/gotosocial/internal/db" + "github.com/superseriousbusiness/gotosocial/internal/db/bundb" + "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" +) + +// Test that we can migrate DMs to conversations. +// This test assumes that we're using the standard test fixtures, which contain some conversation-eligible DMs. +func (suite *ConversationsTestSuite) TestMigrateDMsToConversations() { + advancedMigrationID := "20240611190733_add_conversations" + ctx := context.Background() + rawDB := (suite.db).(*bundb.DBService).DB() + + // Precondition: we shouldn't have any conversations yet. + numConversations := 0 + if err := rawDB.NewSelect(). + Model((*gtsmodel.Conversation)(nil)). + ColumnExpr("COUNT(*)"). + Scan(ctx, &numConversations); // nocollapse + err != nil { + suite.FailNow(err.Error()) + } + suite.Zero(numConversations) + + // Precondition: there is no record of the conversations advanced migration. + _, err := suite.db.GetAdvancedMigration(ctx, advancedMigrationID) + suite.ErrorIs(err, db.ErrNoEntries) + + // Run the migration, which should not fail. + if err := suite.conversationsProcessor.MigrateDMsToConversations(ctx); err != nil { + suite.FailNow(err.Error()) + } + + // We should now have some conversations. + if err := rawDB.NewSelect(). + Model((*gtsmodel.Conversation)(nil)). + ColumnExpr("COUNT(*)"). + Scan(ctx, &numConversations); // nocollapse + err != nil { + suite.FailNow(err.Error()) + } + suite.NotZero(numConversations) + + // The advanced migration should now be marked as finished. + advancedMigration, err := suite.db.GetAdvancedMigration(ctx, advancedMigrationID) + if err != nil { + suite.FailNow(err.Error()) + } + if suite.NotNil(advancedMigration) && suite.NotNil(advancedMigration.Finished) { + suite.True(*advancedMigration.Finished) + } + + // Run the migration again, which should not fail. + if err := suite.conversationsProcessor.MigrateDMsToConversations(ctx); err != nil { + suite.FailNow(err.Error()) + } + + // However, it shouldn't have done anything, so the advanced migration should not have been updated. + advancedMigration2, err := suite.db.GetAdvancedMigration(ctx, advancedMigrationID) + if err != nil { + suite.FailNow(err.Error()) + } + suite.Equal(advancedMigration.UpdatedAt, advancedMigration2.UpdatedAt) +} diff --git a/internal/processing/conversations/read.go b/internal/processing/conversations/read.go new file mode 100644 index 000000000..512a004a3 --- /dev/null +++ b/internal/processing/conversations/read.go @@ -0,0 +1,65 @@ +// GoToSocial +// Copyright (C) GoToSocial Authors admin@gotosocial.org +// SPDX-License-Identifier: AGPL-3.0-or-later +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <http://www.gnu.org/licenses/>. + +package conversations + +import ( + "context" + + apimodel "github.com/superseriousbusiness/gotosocial/internal/api/model" + "github.com/superseriousbusiness/gotosocial/internal/gtserror" + "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" + "github.com/superseriousbusiness/gotosocial/internal/util" +) + +func (p *Processor) Read( + ctx context.Context, + requestingAccount *gtsmodel.Account, + id string, +) (*apimodel.Conversation, gtserror.WithCode) { + // Get the conversation, including participating accounts and last status. + conversation, errWithCode := p.getConversationOwnedBy(ctx, id, requestingAccount) + if errWithCode != nil { + return nil, errWithCode + } + + // Mark the conversation as read. + conversation.Read = util.Ptr(true) + if err := p.state.DB.UpsertConversation(ctx, conversation, "read"); err != nil { + err = gtserror.Newf("DB error updating conversation %s: %w", id, err) + return nil, gtserror.NewErrorInternalError(err) + } + + filters, mutes, errWithCode := p.getFiltersAndMutes(ctx, requestingAccount) + if errWithCode != nil { + return nil, errWithCode + } + + apiConversation, err := p.converter.ConversationToAPIConversation( + ctx, + conversation, + requestingAccount, + filters, + mutes, + ) + if err != nil { + err = gtserror.Newf("error converting conversation %s to API representation: %w", id, err) + return nil, gtserror.NewErrorInternalError(err) + } + + return apiConversation, nil +} diff --git a/internal/processing/conversations/read_test.go b/internal/processing/conversations/read_test.go new file mode 100644 index 000000000..ebd8f7fe5 --- /dev/null +++ b/internal/processing/conversations/read_test.go @@ -0,0 +1,34 @@ +// GoToSocial +// Copyright (C) GoToSocial Authors admin@gotosocial.org +// SPDX-License-Identifier: AGPL-3.0-or-later +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <http://www.gnu.org/licenses/>. + +package conversations_test + +import ( + "context" + + "github.com/superseriousbusiness/gotosocial/internal/util" +) + +func (suite *ConversationsTestSuite) TestRead() { + conversation := suite.NewTestConversation(suite.testAccount, 0) + + suite.False(util.PtrOrValue(conversation.Read, false)) + apiConversation, err := suite.conversationsProcessor.Read(context.Background(), suite.testAccount, conversation.ID) + if suite.NoError(err) { + suite.False(apiConversation.Unread) + } +} diff --git a/internal/processing/conversations/update.go b/internal/processing/conversations/update.go new file mode 100644 index 000000000..7445994ae --- /dev/null +++ b/internal/processing/conversations/update.go @@ -0,0 +1,242 @@ +// GoToSocial +// Copyright (C) GoToSocial Authors admin@gotosocial.org +// SPDX-License-Identifier: AGPL-3.0-or-later +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <http://www.gnu.org/licenses/>. + +package conversations + +import ( + "context" + "errors" + + apimodel "github.com/superseriousbusiness/gotosocial/internal/api/model" + "github.com/superseriousbusiness/gotosocial/internal/db" + statusfilter "github.com/superseriousbusiness/gotosocial/internal/filter/status" + "github.com/superseriousbusiness/gotosocial/internal/gtserror" + "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" + "github.com/superseriousbusiness/gotosocial/internal/id" + "github.com/superseriousbusiness/gotosocial/internal/log" + "github.com/superseriousbusiness/gotosocial/internal/util" +) + +// ConversationNotification carries the arguments to processing/stream.Processor.Conversation. +type ConversationNotification struct { + // AccountID of a local account to deliver the notification to. + AccountID string + // Conversation as the notification payload. + Conversation *apimodel.Conversation +} + +// UpdateConversationsForStatus updates all conversations related to a status, +// and returns a map from local account IDs to conversation notifications that should be sent to them. +func (p *Processor) UpdateConversationsForStatus(ctx context.Context, status *gtsmodel.Status) ([]ConversationNotification, error) { + if status.Visibility != gtsmodel.VisibilityDirect { + // Only DMs are considered part of conversations. + return nil, nil + } + if status.BoostOfID != "" { + // Boosts can't be part of conversations. + // FUTURE: This may change if we ever implement quote posts. + return nil, nil + } + if status.ThreadID == "" { + // If the status doesn't have a thread ID, it didn't mention a local account, + // and thus can't be part of a conversation. + return nil, nil + } + + // We need accounts to be populated for this. + if err := p.state.DB.PopulateStatus(ctx, status); err != nil { + return nil, gtserror.Newf("DB error populating status %s: %w", status.ID, err) + } + + // The account which authored the status plus all mentioned accounts. + allParticipantsSet := make(map[string]*gtsmodel.Account, 1+len(status.Mentions)) + allParticipantsSet[status.AccountID] = status.Account + for _, mention := range status.Mentions { + allParticipantsSet[mention.TargetAccountID] = mention.TargetAccount + } + + // Create or update conversations for and send notifications to each local participant. + notifications := make([]ConversationNotification, 0, len(allParticipantsSet)) + for _, participant := range allParticipantsSet { + if participant.IsRemote() { + continue + } + localAccount := participant + + // If the status is not visible to this account, skip processing it for this account. + visible, err := p.filter.StatusVisible(ctx, localAccount, status) + if err != nil { + log.Errorf( + ctx, + "error checking status %s visibility for account %s: %v", + status.ID, + localAccount.ID, + err, + ) + continue + } else if !visible { + continue + } + + // Is the status filtered or muted for this user? + // Converting the status to an API status runs the filter/mute checks. + filters, mutes, errWithCode := p.getFiltersAndMutes(ctx, localAccount) + if errWithCode != nil { + log.Error(ctx, errWithCode) + continue + } + _, err = p.converter.StatusToAPIStatus( + ctx, + status, + localAccount, + statusfilter.FilterContextNotifications, + filters, + mutes, + ) + if err != nil { + // If the status matched a hide filter, skip processing it for this account. + // If there was another kind of error, log that and skip it anyway. + if !errors.Is(err, statusfilter.ErrHideStatus) { + log.Errorf( + ctx, + "error checking status %s filtering/muting for account %s: %v", + status.ID, + localAccount.ID, + err, + ) + } + continue + } + + // Collect other accounts participating in the conversation. + otherAccounts := make([]*gtsmodel.Account, 0, len(allParticipantsSet)-1) + otherAccountIDs := make([]string, 0, len(allParticipantsSet)-1) + for accountID, account := range allParticipantsSet { + if accountID != localAccount.ID { + otherAccounts = append(otherAccounts, account) + otherAccountIDs = append(otherAccountIDs, accountID) + } + } + + // Check for a previously existing conversation, if there is one. + conversation, err := p.state.DB.GetConversationByThreadAndAccountIDs( + ctx, + status.ThreadID, + localAccount.ID, + otherAccountIDs, + ) + if err != nil && !errors.Is(err, db.ErrNoEntries) { + log.Errorf( + ctx, + "error trying to find a previous conversation for status %s and account %s: %v", + status.ID, + localAccount.ID, + err, + ) + continue + } + + if conversation == nil { + // Create a new conversation. + conversation = >smodel.Conversation{ + ID: id.NewULID(), + AccountID: localAccount.ID, + OtherAccountIDs: otherAccountIDs, + OtherAccounts: otherAccounts, + OtherAccountsKey: gtsmodel.ConversationOtherAccountsKey(otherAccountIDs), + ThreadID: status.ThreadID, + Read: util.Ptr(true), + } + } + + // Assume that if the conversation owner posted the status, they've already read it. + statusAuthoredByConversationOwner := status.AccountID == conversation.AccountID + + // Update the conversation. + // If there is no previous last status or this one is more recently created, set it as the last status. + if conversation.LastStatus == nil || conversation.LastStatus.CreatedAt.Before(status.CreatedAt) { + conversation.LastStatusID = status.ID + conversation.LastStatus = status + } + // If the conversation is unread, leave it marked as unread. + // If the conversation is read but this status might not have been, mark the conversation as unread. + if !statusAuthoredByConversationOwner { + conversation.Read = util.Ptr(false) + } + + // Create or update the conversation. + err = p.state.DB.UpsertConversation(ctx, conversation) + if err != nil { + log.Errorf( + ctx, + "error creating or updating conversation %s for status %s and account %s: %v", + conversation.ID, + status.ID, + localAccount.ID, + err, + ) + continue + } + + // Link the conversation to the status. + if err := p.state.DB.LinkConversationToStatus(ctx, conversation.ID, status.ID); err != nil { + log.Errorf( + ctx, + "error linking conversation %s to status %s: %v", + conversation.ID, + status.ID, + err, + ) + continue + } + + // Convert the conversation to API representation. + apiConversation, err := p.converter.ConversationToAPIConversation( + ctx, + conversation, + localAccount, + filters, + mutes, + ) + if err != nil { + // If the conversation's last status matched a hide filter, skip it. + // If there was another kind of error, log that and skip it anyway. + if !errors.Is(err, statusfilter.ErrHideStatus) { + log.Errorf( + ctx, + "error converting conversation %s to API representation for account %s: %v", + status.ID, + localAccount.ID, + err, + ) + } + continue + } + + // Generate a notification, + // unless the status was authored by the user who would be notified, + // in which case they already know. + if status.AccountID != localAccount.ID { + notifications = append(notifications, ConversationNotification{ + AccountID: localAccount.ID, + Conversation: apiConversation, + }) + } + } + + return notifications, nil +} diff --git a/internal/processing/conversations/update_test.go b/internal/processing/conversations/update_test.go new file mode 100644 index 000000000..8ba2800fe --- /dev/null +++ b/internal/processing/conversations/update_test.go @@ -0,0 +1,54 @@ +// GoToSocial +// Copyright (C) GoToSocial Authors admin@gotosocial.org +// SPDX-License-Identifier: AGPL-3.0-or-later +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <http://www.gnu.org/licenses/>. + +package conversations_test + +import ( + "context" +) + +// Test that we can create conversations when a new status comes in. +func (suite *ConversationsTestSuite) TestUpdateConversationsForStatus() { + ctx := context.Background() + + // Precondition: the test user shouldn't have any conversations yet. + conversations, err := suite.db.GetConversationsByOwnerAccountID(ctx, suite.testAccount.ID, nil) + if err != nil { + suite.FailNow(err.Error()) + } + suite.Empty(conversations) + + // Create a status. + threadID := suite.NewULID(0) + status := suite.NewTestStatus(suite.testAccount, threadID, 0, nil) + + // Update conversations for it. + notifications, err := suite.conversationsProcessor.UpdateConversationsForStatus(ctx, status) + if err != nil { + suite.FailNow(err.Error()) + } + + // In this test, the user is DMing themself, and should not receive a notification from that. + suite.Empty(notifications) + + // The test user should have a conversation now. + conversations, err = suite.db.GetConversationsByOwnerAccountID(ctx, suite.testAccount.ID, nil) + if err != nil { + suite.FailNow(err.Error()) + } + suite.NotEmpty(conversations) +} diff --git a/internal/processing/processor.go b/internal/processing/processor.go index fb6b05d80..a07df76e1 100644 --- a/internal/processing/processor.go +++ b/internal/processing/processor.go @@ -27,7 +27,9 @@ import ( "github.com/superseriousbusiness/gotosocial/internal/oauth" "github.com/superseriousbusiness/gotosocial/internal/processing/account" "github.com/superseriousbusiness/gotosocial/internal/processing/admin" + "github.com/superseriousbusiness/gotosocial/internal/processing/advancedmigrations" "github.com/superseriousbusiness/gotosocial/internal/processing/common" + "github.com/superseriousbusiness/gotosocial/internal/processing/conversations" "github.com/superseriousbusiness/gotosocial/internal/processing/fedi" filtersv1 "github.com/superseriousbusiness/gotosocial/internal/processing/filters/v1" filtersv2 "github.com/superseriousbusiness/gotosocial/internal/processing/filters/v2" @@ -70,22 +72,24 @@ type Processor struct { SUB-PROCESSORS */ - account account.Processor - admin admin.Processor - fedi fedi.Processor - filtersv1 filtersv1.Processor - filtersv2 filtersv2.Processor - list list.Processor - markers markers.Processor - media media.Processor - polls polls.Processor - report report.Processor - search search.Processor - status status.Processor - stream stream.Processor - timeline timeline.Processor - user user.Processor - workers workers.Processor + account account.Processor + admin admin.Processor + advancedmigrations advancedmigrations.Processor + conversations conversations.Processor + fedi fedi.Processor + filtersv1 filtersv1.Processor + filtersv2 filtersv2.Processor + list list.Processor + markers markers.Processor + media media.Processor + polls polls.Processor + report report.Processor + search search.Processor + status status.Processor + stream stream.Processor + timeline timeline.Processor + user user.Processor + workers workers.Processor } func (p *Processor) Account() *account.Processor { @@ -96,6 +100,14 @@ func (p *Processor) Admin() *admin.Processor { return &p.admin } +func (p *Processor) AdvancedMigrations() *advancedmigrations.Processor { + return &p.advancedmigrations +} + +func (p *Processor) Conversations() *conversations.Processor { + return &p.conversations +} + func (p *Processor) Fedi() *fedi.Processor { return &p.fedi } @@ -188,6 +200,7 @@ func NewProcessor( // processors + pin them to this struct. processor.account = account.New(&common, state, converter, mediaManager, federator, filter, parseMentionFunc) processor.admin = admin.New(&common, state, cleaner, federator, converter, mediaManager, federator.TransportController(), emailSender) + processor.conversations = conversations.New(state, converter, filter) processor.fedi = fedi.New(state, &common, converter, federator, filter) processor.filtersv1 = filtersv1.New(state, converter, &processor.stream) processor.filtersv2 = filtersv2.New(state, converter, &processor.stream) @@ -200,6 +213,9 @@ func NewProcessor( processor.status = status.New(state, &common, &processor.polls, federator, converter, filter, parseMentionFunc) processor.user = user.New(state, converter, oauthServer, emailSender) + // The advanced migrations processor sequences advanced migrations from all other processors. + processor.advancedmigrations = advancedmigrations.New(&processor.conversations) + // Workers processor handles asynchronous // worker jobs; instantiate it separately // and pass subset of sub processors it needs. @@ -212,6 +228,7 @@ func NewProcessor( &processor.account, &processor.media, &processor.stream, + &processor.conversations, ) return processor diff --git a/internal/processing/stream/conversation.go b/internal/processing/stream/conversation.go new file mode 100644 index 000000000..a0236c459 --- /dev/null +++ b/internal/processing/stream/conversation.go @@ -0,0 +1,44 @@ +// GoToSocial +// Copyright (C) GoToSocial Authors admin@gotosocial.org +// SPDX-License-Identifier: AGPL-3.0-or-later +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <http://www.gnu.org/licenses/>. + +package stream + +import ( + "context" + "encoding/json" + + "codeberg.org/gruf/go-byteutil" + apimodel "github.com/superseriousbusiness/gotosocial/internal/api/model" + "github.com/superseriousbusiness/gotosocial/internal/log" + "github.com/superseriousbusiness/gotosocial/internal/stream" +) + +// Conversation streams the given conversation to any open, appropriate streams belonging to the given account. +func (p *Processor) Conversation(ctx context.Context, accountID string, conversation *apimodel.Conversation) { + b, err := json.Marshal(conversation) + if err != nil { + log.Errorf(ctx, "error marshaling json: %v", err) + return + } + p.streams.Post(ctx, accountID, stream.Message{ + Payload: byteutil.B2S(b), + Event: stream.EventTypeConversation, + Stream: []string{ + stream.TimelineDirect, + }, + }) +} diff --git a/internal/processing/workers/fromclientapi_test.go b/internal/processing/workers/fromclientapi_test.go index 49a68d27a..35c2c31b7 100644 --- a/internal/processing/workers/fromclientapi_test.go +++ b/internal/processing/workers/fromclientapi_test.go @@ -50,6 +50,8 @@ func (suite *FromClientAPITestSuite) newStatus( visibility gtsmodel.Visibility, replyToStatus *gtsmodel.Status, boostOfStatus *gtsmodel.Status, + mentionedAccounts []*gtsmodel.Account, + createThread bool, ) *gtsmodel.Status { var ( protocol = config.GetProtocol() @@ -102,6 +104,39 @@ func (suite *FromClientAPITestSuite) newStatus( newStatus.Visibility = boostOfStatus.Visibility } + for _, mentionedAccount := range mentionedAccounts { + newMention := >smodel.Mention{ + ID: id.NewULID(), + StatusID: newStatus.ID, + Status: newStatus, + OriginAccountID: account.ID, + OriginAccountURI: account.URI, + OriginAccount: account, + TargetAccountID: mentionedAccount.ID, + TargetAccount: mentionedAccount, + Silent: util.Ptr(false), + } + + newStatus.Mentions = append(newStatus.Mentions, newMention) + newStatus.MentionIDs = append(newStatus.MentionIDs, newMention.ID) + + if err := state.DB.PutMention(ctx, newMention); err != nil { + suite.FailNow(err.Error()) + } + } + + if createThread { + newThread := >smodel.Thread{ + ID: id.NewULID(), + } + + newStatus.ThreadID = newThread.ID + + if err := state.DB.PutThread(ctx, newThread); err != nil { + suite.FailNow(err.Error()) + } + } + // Put the status in the db, to mimic what would // have already happened earlier up the flow. if err := state.DB.PutStatus(ctx, newStatus); err != nil { @@ -168,6 +203,31 @@ func (suite *FromClientAPITestSuite) statusJSON( return string(statusJSON) } +func (suite *FromClientAPITestSuite) conversationJSON( + ctx context.Context, + typeConverter *typeutils.Converter, + conversation *gtsmodel.Conversation, + requestingAccount *gtsmodel.Account, +) string { + apiConversation, err := typeConverter.ConversationToAPIConversation( + ctx, + conversation, + requestingAccount, + nil, + nil, + ) + if err != nil { + suite.FailNow(err.Error()) + } + + conversationJSON, err := json.Marshal(apiConversation) + if err != nil { + suite.FailNow(err.Error()) + } + + return string(conversationJSON) +} + func (suite *FromClientAPITestSuite) TestProcessCreateStatusWithNotification() { testStructs := suite.SetupTestStructs() defer suite.TearDownTestStructs(testStructs) @@ -194,6 +254,8 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusWithNotification() { gtsmodel.VisibilityPublic, nil, nil, + nil, + false, ) ) @@ -303,6 +365,8 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusReply() { gtsmodel.VisibilityPublic, suite.testStatuses["local_account_2_status_1"], nil, + nil, + false, ) ) @@ -362,6 +426,8 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusReplyMuted() { gtsmodel.VisibilityPublic, suite.testStatuses["local_account_1_status_1"], nil, + nil, + false, ) threadMute = >smodel.ThreadMute{ ID: "01HD3KRMBB1M85QRWHD912QWRE", @@ -420,6 +486,8 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusBoostMuted() { gtsmodel.VisibilityPublic, nil, suite.testStatuses["local_account_1_status_1"], + nil, + false, ) threadMute = >smodel.ThreadMute{ ID: "01HD3KRMBB1M85QRWHD912QWRE", @@ -483,6 +551,8 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusListRepliesPolicyLis gtsmodel.VisibilityPublic, suite.testStatuses["local_account_2_status_1"], nil, + nil, + false, ) ) @@ -556,6 +626,8 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusListRepliesPolicyLis gtsmodel.VisibilityPublic, suite.testStatuses["local_account_2_status_1"], nil, + nil, + false, ) ) @@ -634,6 +706,8 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusReplyListRepliesPoli gtsmodel.VisibilityPublic, suite.testStatuses["local_account_2_status_1"], nil, + nil, + false, ) ) @@ -704,6 +778,8 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusBoost() { gtsmodel.VisibilityPublic, nil, suite.testStatuses["local_account_2_status_1"], + nil, + false, ) ) @@ -765,6 +841,8 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusBoostNoReblogs() { gtsmodel.VisibilityPublic, nil, suite.testStatuses["local_account_2_status_1"], + nil, + false, ) ) @@ -807,6 +885,159 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusBoostNoReblogs() { ) } +// A DM to a local user should create a conversation and accompanying notification. +func (suite *FromClientAPITestSuite) TestProcessCreateStatusWhichBeginsConversation() { + testStructs := suite.SetupTestStructs() + defer suite.TearDownTestStructs(testStructs) + + var ( + ctx = context.Background() + postingAccount = suite.testAccounts["local_account_2"] + receivingAccount = suite.testAccounts["local_account_1"] + streams = suite.openStreams(ctx, + testStructs.Processor, + receivingAccount, + nil, + ) + homeStream = streams[stream.TimelineHome] + directStream = streams[stream.TimelineDirect] + + // turtle posts a new top-level DM mentioning zork. + status = suite.newStatus( + ctx, + testStructs.State, + postingAccount, + gtsmodel.VisibilityDirect, + nil, + nil, + []*gtsmodel.Account{receivingAccount}, + true, + ) + ) + + // Process the new status. + if err := testStructs.Processor.Workers().ProcessFromClientAPI( + ctx, + &messages.FromClientAPI{ + APObjectType: ap.ObjectNote, + APActivityType: ap.ActivityCreate, + GTSModel: status, + Origin: postingAccount, + }, + ); err != nil { + suite.FailNow(err.Error()) + } + + // Locate the conversation which should now exist for zork. + conversation, err := testStructs.State.DB.GetConversationByThreadAndAccountIDs( + ctx, + status.ThreadID, + receivingAccount.ID, + []string{postingAccount.ID}, + ) + if err != nil { + suite.FailNow(err.Error()) + } + + // Check status in home stream. + suite.checkStreamed( + homeStream, + true, + "", + stream.EventTypeUpdate, + ) + + // Check mention notification in home stream. + suite.checkStreamed( + homeStream, + true, + "", + stream.EventTypeNotification, + ) + + // Check conversation in direct stream. + conversationJSON := suite.conversationJSON( + ctx, + testStructs.TypeConverter, + conversation, + receivingAccount, + ) + suite.checkStreamed( + directStream, + true, + conversationJSON, + stream.EventTypeConversation, + ) +} + +// A public message to a local user should not result in a conversation notification. +func (suite *FromClientAPITestSuite) TestProcessCreateStatusWhichShouldNotCreateConversation() { + testStructs := suite.SetupTestStructs() + defer suite.TearDownTestStructs(testStructs) + + var ( + ctx = context.Background() + postingAccount = suite.testAccounts["local_account_2"] + receivingAccount = suite.testAccounts["local_account_1"] + streams = suite.openStreams(ctx, + testStructs.Processor, + receivingAccount, + nil, + ) + homeStream = streams[stream.TimelineHome] + directStream = streams[stream.TimelineDirect] + + // turtle posts a new top-level public message mentioning zork. + status = suite.newStatus( + ctx, + testStructs.State, + postingAccount, + gtsmodel.VisibilityPublic, + nil, + nil, + []*gtsmodel.Account{receivingAccount}, + true, + ) + ) + + // Process the new status. + if err := testStructs.Processor.Workers().ProcessFromClientAPI( + ctx, + &messages.FromClientAPI{ + APObjectType: ap.ObjectNote, + APActivityType: ap.ActivityCreate, + GTSModel: status, + Origin: postingAccount, + }, + ); err != nil { + suite.FailNow(err.Error()) + } + + // Check status in home stream. + suite.checkStreamed( + homeStream, + true, + "", + stream.EventTypeUpdate, + ) + + // Check mention notification in home stream. + suite.checkStreamed( + homeStream, + true, + "", + stream.EventTypeNotification, + ) + + // Check for absence of conversation notification in direct stream. + suite.checkStreamed( + directStream, + false, + "", + "", + ) +} + func (suite *FromClientAPITestSuite) TestProcessStatusDelete() { testStructs := suite.SetupTestStructs() defer suite.TearDownTestStructs(testStructs) diff --git a/internal/processing/workers/surface.go b/internal/processing/workers/surface.go index 5ec905ae8..1a7dbbfe5 100644 --- a/internal/processing/workers/surface.go +++ b/internal/processing/workers/surface.go @@ -20,6 +20,7 @@ package workers import ( "github.com/superseriousbusiness/gotosocial/internal/email" "github.com/superseriousbusiness/gotosocial/internal/filter/visibility" + "github.com/superseriousbusiness/gotosocial/internal/processing/conversations" "github.com/superseriousbusiness/gotosocial/internal/processing/stream" "github.com/superseriousbusiness/gotosocial/internal/state" "github.com/superseriousbusiness/gotosocial/internal/typeutils" @@ -32,9 +33,10 @@ import ( // - sending a notification to a user // - sending an email type Surface struct { - State *state.State - Converter *typeutils.Converter - Stream *stream.Processor - Filter *visibility.Filter - EmailSender email.Sender + State *state.State + Converter *typeutils.Converter + Stream *stream.Processor + Filter *visibility.Filter + EmailSender email.Sender + Conversations *conversations.Processor } diff --git a/internal/processing/workers/surfacenotify_test.go b/internal/processing/workers/surfacenotify_test.go index 18d0277ae..937ddeca2 100644 --- a/internal/processing/workers/surfacenotify_test.go +++ b/internal/processing/workers/surfacenotify_test.go @@ -39,11 +39,12 @@ func (suite *SurfaceNotifyTestSuite) TestSpamNotifs() { defer suite.TearDownTestStructs(testStructs) surface := &workers.Surface{ - State: testStructs.State, - Converter: testStructs.TypeConverter, - Stream: testStructs.Processor.Stream(), - Filter: visibility.NewFilter(testStructs.State), - EmailSender: testStructs.EmailSender, + State: testStructs.State, + Converter: testStructs.TypeConverter, + Stream: testStructs.Processor.Stream(), + Filter: visibility.NewFilter(testStructs.State), + EmailSender: testStructs.EmailSender, + Conversations: testStructs.Processor.Conversations(), } var ( diff --git a/internal/processing/workers/surfacetimeline.go b/internal/processing/workers/surfacetimeline.go index 41d7f6f2a..8ac8293ed 100644 --- a/internal/processing/workers/surfacetimeline.go +++ b/internal/processing/workers/surfacetimeline.go @@ -36,8 +36,8 @@ import ( // and LIST timelines of accounts that follow the status author. // // It will also handle notifications for any mentions attached to -// the account, and notifications for any local accounts that want -// to know when this account posts. +// the account, notifications for any local accounts that want +// to know when this account posts, and conversations containing the status. func (s *Surface) timelineAndNotifyStatus(ctx context.Context, status *gtsmodel.Status) error { // Ensure status fully populated; including account, mentions, etc. if err := s.State.DB.PopulateStatus(ctx, status); err != nil { @@ -73,6 +73,15 @@ func (s *Surface) timelineAndNotifyStatus(ctx context.Context, status *gtsmodel. return gtserror.Newf("error notifying status mentions for status %s: %w", status.ID, err) } + // Update any conversations containing this status, and send conversation notifications. + notifications, err := s.Conversations.UpdateConversationsForStatus(ctx, status) + if err != nil { + return gtserror.Newf("error updating conversations for status %s: %w", status.ID, err) + } + for _, notification := range notifications { + s.Stream.Conversation(ctx, notification.AccountID, notification.Conversation) + } + return nil } diff --git a/internal/processing/workers/util.go b/internal/processing/workers/util.go index 780e5ca14..915370976 100644 --- a/internal/processing/workers/util.go +++ b/internal/processing/workers/util.go @@ -137,6 +137,11 @@ func (u *utils) wipeStatus( errs.Appendf("error deleting status from timelines: %w", err) } + // delete this status from any conversations that it's part of + if err := u.state.DB.DeleteStatusFromConversations(ctx, statusToDelete.ID); err != nil { + errs.Appendf("error deleting status from conversations: %w", err) + } + // finally, delete the status itself if err := u.state.DB.DeleteStatusByID(ctx, statusToDelete.ID); err != nil { errs.Appendf("error deleting status: %w", err) diff --git a/internal/processing/workers/workers.go b/internal/processing/workers/workers.go index 6b4cc07a6..c7f67b025 100644 --- a/internal/processing/workers/workers.go +++ b/internal/processing/workers/workers.go @@ -22,6 +22,7 @@ import ( "github.com/superseriousbusiness/gotosocial/internal/federation" "github.com/superseriousbusiness/gotosocial/internal/filter/visibility" "github.com/superseriousbusiness/gotosocial/internal/processing/account" + "github.com/superseriousbusiness/gotosocial/internal/processing/conversations" "github.com/superseriousbusiness/gotosocial/internal/processing/media" "github.com/superseriousbusiness/gotosocial/internal/processing/stream" "github.com/superseriousbusiness/gotosocial/internal/state" @@ -44,6 +45,7 @@ func New( account *account.Processor, media *media.Processor, stream *stream.Processor, + conversations *conversations.Processor, ) Processor { // Init federate logic // wrapper struct. @@ -56,11 +58,12 @@ func New( // Init surface logic // wrapper struct. surface := &Surface{ - State: state, - Converter: converter, - Stream: stream, - Filter: filter, - EmailSender: emailSender, + State: state, + Converter: converter, + Stream: stream, + Filter: filter, + EmailSender: emailSender, + Conversations: conversations, } // Init shared util funcs. diff --git a/internal/processing/workers/workers_test.go b/internal/processing/workers/workers_test.go index f66190d75..3093fd93a 100644 --- a/internal/processing/workers/workers_test.go +++ b/internal/processing/workers/workers_test.go @@ -108,6 +108,7 @@ func (suite *WorkersTestSuite) openStreams(ctx context.Context, processor *proce stream.TimelineHome, stream.TimelinePublic, stream.TimelineNotifications, + stream.TimelineDirect, } { stream, err := processor.Stream().Open(ctx, account, streamType) if err != nil { |