diff options
| author | 2024-07-23 12:44:31 -0700 | |
|---|---|---|
| committer | 2024-07-23 20:44:31 +0100 | |
| commit | 8fdd358f4b4428b33df4afd672ed070032d46e48 (patch) | |
| tree | 92ec4bcbda633878a468e396b968656dbf33ef59 /internal/processing | |
| parent | [bugfix] media.Processor{}.GetFile() returning 404s on first call, correctly ... (diff) | |
| download | gotosocial-8fdd358f4b4428b33df4afd672ed070032d46e48.tar.xz | |
[feature] Conversations API (#3013)
* Implement conversations API
* Sort and page conversations by last status ID
* Appease linter
* Fix deleting conversations and statuses
* Refactor to make migrations automatic
* Lint
* Update tests post-merge
* Fixes from live-fire testing
* Linter caught a format problem
* Refactor tests, fix cache
* Negative test for non-DMs
* Run conversations advanced migration on testrig startup as well as regular server startup
* Document (lack of) side effects of API method for deleting a conversation
* Make not-found check less nested for readability
* Rename PutConversation to UpsertConversation
* Use util.Ptr instead of IIFE
* Reduce cache used by conversations
* Remove unnecessary TableExpr/ColumnExpr
* Use struct tags for both unique constraints on Conversation
* Make it clear how paging with GetDirectStatusIDsBatch should be used
* Let conversation paging skip conversations it can't render
* Use Bun NewDropTable
* Convert delete raw query to Bun
* Convert update raw query to Bun
* Convert latestConversationStatusesTempTable raw query partially to Bun
* Convert conversationStatusesTempTable raw query partially to Bun
* Rename field used to store result of MaxDirectStatusID
* Move advanced migrations to their own tiny processor
* Catch up util function name with main
* Remove json.… wrappers
* Remove redundant check
* Combine error checks
* Replace map with slice of structs
* Address processor/type converter comments
- Add context info for errors
- Extract some common processor code into shared methods
- Move conversation eligibility check ahead of populating conversation
* Add error context when dropping temp tables
Diffstat (limited to 'internal/processing')
23 files changed, 1528 insertions, 33 deletions
diff --git a/internal/processing/account/delete.go b/internal/processing/account/delete.go index 075e94544..702b46cda 100644 --- a/internal/processing/account/delete.go +++ b/internal/processing/account/delete.go @@ -460,6 +460,14 @@ func (p *Processor) deleteAccountPeripheral(ctx context.Context, account *gtsmod  	// TODO: add status mutes here when they're implemented. +	// Delete all conversations owned by given account. +	// Conversations in which it has only participated will be retained; +	// they can always be deleted by their owners. +	if err := p.state.DB.DeleteConversationsByOwnerAccountID(ctx, account.ID); // nocollapse +	err != nil && !errors.Is(err, db.ErrNoEntries) { +		return gtserror.Newf("error deleting conversations owned by account: %w", err) +	} +  	// Delete all poll votes owned by given account.  	if err := p.state.DB.DeletePollVotesByAccountID(ctx, account.ID); // nocollapse  	err != nil && !errors.Is(err, db.ErrNoEntries) { diff --git a/internal/processing/advancedmigrations/advancedmigrations.go b/internal/processing/advancedmigrations/advancedmigrations.go new file mode 100644 index 000000000..3f1876539 --- /dev/null +++ b/internal/processing/advancedmigrations/advancedmigrations.go @@ -0,0 +1,48 @@ +// GoToSocial +// Copyright (C) GoToSocial Authors admin@gotosocial.org +// SPDX-License-Identifier: AGPL-3.0-or-later +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program.  If not, see <http://www.gnu.org/licenses/>. + +package advancedmigrations + +import ( +	"context" +	"fmt" + +	"github.com/superseriousbusiness/gotosocial/internal/processing/conversations" +) + +// Processor holds references to any other processor that has migrations to run. +type Processor struct { +	conversations *conversations.Processor +} + +func New( +	conversations *conversations.Processor, +) Processor { +	return Processor{ +		conversations: conversations, +	} +} + +// Migrate runs all advanced migrations. +// Errors should be in the same format thrown by other server or testrig startup failures. +func (p *Processor) Migrate(ctx context.Context) error { +	if err := p.conversations.MigrateDMsToConversations(ctx); err != nil { +		return fmt.Errorf("error running conversations advanced migration: %w", err) +	} + +	return nil +} diff --git a/internal/processing/conversations/conversations.go b/internal/processing/conversations/conversations.go new file mode 100644 index 000000000..d95740605 --- /dev/null +++ b/internal/processing/conversations/conversations.go @@ -0,0 +1,126 @@ +// GoToSocial +// Copyright (C) GoToSocial Authors admin@gotosocial.org +// SPDX-License-Identifier: AGPL-3.0-or-later +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program.  If not, see <http://www.gnu.org/licenses/>. + +package conversations + +import ( +	"context" +	"errors" + +	"github.com/superseriousbusiness/gotosocial/internal/db" +	"github.com/superseriousbusiness/gotosocial/internal/filter/usermute" +	"github.com/superseriousbusiness/gotosocial/internal/filter/visibility" +	"github.com/superseriousbusiness/gotosocial/internal/gtscontext" +	"github.com/superseriousbusiness/gotosocial/internal/gtserror" +	"github.com/superseriousbusiness/gotosocial/internal/gtsmodel" +	"github.com/superseriousbusiness/gotosocial/internal/state" +	"github.com/superseriousbusiness/gotosocial/internal/typeutils" +) + +type Processor struct { +	state     *state.State +	converter *typeutils.Converter +	filter    *visibility.Filter +} + +func New( +	state *state.State, +	converter *typeutils.Converter, +	filter *visibility.Filter, +) Processor { +	return Processor{ +		state:     state, +		converter: converter, +		filter:    filter, +	} +} + +const conversationNotFoundHelpText = "conversation not found" + +// getConversationOwnedBy gets a conversation by ID and checks that it is owned by the given account. +func (p *Processor) getConversationOwnedBy( +	ctx context.Context, +	id string, +	requestingAccount *gtsmodel.Account, +) (*gtsmodel.Conversation, gtserror.WithCode) { +	// Get the conversation so that we can check its owning account ID. +	conversation, err := p.state.DB.GetConversationByID(ctx, id) +	if err != nil && !errors.Is(err, db.ErrNoEntries) { +		return nil, gtserror.NewErrorInternalError( +			gtserror.Newf( +				"DB error getting conversation %s for account %s: %w", +				id, +				requestingAccount.ID, +				err, +			), +		) +	} +	if conversation == nil { +		return nil, gtserror.NewErrorNotFound( +			gtserror.Newf( +				"conversation %s not found: %w", +				id, +				err, +			), +			conversationNotFoundHelpText, +		) +	} +	if conversation.AccountID != requestingAccount.ID { +		return nil, gtserror.NewErrorNotFound( +			gtserror.Newf( +				"conversation %s not owned by account %s: %w", +				id, +				requestingAccount.ID, +				err, +			), +			conversationNotFoundHelpText, +		) +	} + +	return conversation, nil +} + +// getFiltersAndMutes gets the given account's filters and compiled mute list. +func (p *Processor) getFiltersAndMutes( +	ctx context.Context, +	requestingAccount *gtsmodel.Account, +) ([]*gtsmodel.Filter, *usermute.CompiledUserMuteList, gtserror.WithCode) { +	filters, err := p.state.DB.GetFiltersForAccountID(ctx, requestingAccount.ID) +	if err != nil { +		return nil, nil, gtserror.NewErrorInternalError( +			gtserror.Newf( +				"DB error getting filters for account %s: %w", +				requestingAccount.ID, +				err, +			), +		) +	} + +	mutes, err := p.state.DB.GetAccountMutes(gtscontext.SetBarebones(ctx), requestingAccount.ID, nil) +	if err != nil { +		return nil, nil, gtserror.NewErrorInternalError( +			gtserror.Newf( +				"DB error getting mutes for account %s: %w", +				requestingAccount.ID, +				err, +			), +		) +	} +	compiledMutes := usermute.NewCompiledUserMuteList(mutes) + +	return filters, compiledMutes, nil +} diff --git a/internal/processing/conversations/conversations_test.go b/internal/processing/conversations/conversations_test.go new file mode 100644 index 000000000..cc7ec617e --- /dev/null +++ b/internal/processing/conversations/conversations_test.go @@ -0,0 +1,151 @@ +// GoToSocial +// Copyright (C) GoToSocial Authors admin@gotosocial.org +// SPDX-License-Identifier: AGPL-3.0-or-later +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program.  If not, see <http://www.gnu.org/licenses/>. + +package conversations_test + +import ( +	"context" +	"testing" +	"time" + +	"github.com/stretchr/testify/suite" +	"github.com/superseriousbusiness/gotosocial/internal/db" +	dbtest "github.com/superseriousbusiness/gotosocial/internal/db/test" +	"github.com/superseriousbusiness/gotosocial/internal/email" +	"github.com/superseriousbusiness/gotosocial/internal/federation" +	"github.com/superseriousbusiness/gotosocial/internal/filter/visibility" +	"github.com/superseriousbusiness/gotosocial/internal/gtsmodel" +	"github.com/superseriousbusiness/gotosocial/internal/log" +	"github.com/superseriousbusiness/gotosocial/internal/media" +	"github.com/superseriousbusiness/gotosocial/internal/messages" +	"github.com/superseriousbusiness/gotosocial/internal/processing/conversations" +	"github.com/superseriousbusiness/gotosocial/internal/state" +	"github.com/superseriousbusiness/gotosocial/internal/storage" +	"github.com/superseriousbusiness/gotosocial/internal/transport" +	"github.com/superseriousbusiness/gotosocial/internal/typeutils" +	"github.com/superseriousbusiness/gotosocial/testrig" +) + +type ConversationsTestSuite struct { +	// standard suite interfaces +	suite.Suite +	db                  db.DB +	tc                  *typeutils.Converter +	storage             *storage.Driver +	state               state.State +	mediaManager        *media.Manager +	transportController transport.Controller +	federator           *federation.Federator +	emailSender         email.Sender +	sentEmails          map[string]string +	filter              *visibility.Filter + +	// standard suite models +	testTokens       map[string]*gtsmodel.Token +	testClients      map[string]*gtsmodel.Client +	testApplications map[string]*gtsmodel.Application +	testUsers        map[string]*gtsmodel.User +	testAccounts     map[string]*gtsmodel.Account +	testFollows      map[string]*gtsmodel.Follow +	testAttachments  map[string]*gtsmodel.MediaAttachment +	testStatuses     map[string]*gtsmodel.Status + +	// module being tested +	conversationsProcessor conversations.Processor + +	// Owner of test conversations +	testAccount *gtsmodel.Account + +	// Mixin for conversation tests +	dbtest.ConversationFactory +} + +func (suite *ConversationsTestSuite) getClientMsg(timeout time.Duration) (*messages.FromClientAPI, bool) { +	ctx := context.Background() +	ctx, cncl := context.WithTimeout(ctx, timeout) +	defer cncl() +	return suite.state.Workers.Client.Queue.PopCtx(ctx) +} + +func (suite *ConversationsTestSuite) SetupSuite() { +	suite.testTokens = testrig.NewTestTokens() +	suite.testClients = testrig.NewTestClients() +	suite.testApplications = testrig.NewTestApplications() +	suite.testUsers = testrig.NewTestUsers() +	suite.testAccounts = testrig.NewTestAccounts() +	suite.testFollows = testrig.NewTestFollows() +	suite.testAttachments = testrig.NewTestAttachments() +	suite.testStatuses = testrig.NewTestStatuses() + +	suite.ConversationFactory.SetupSuite(suite) +} + +func (suite *ConversationsTestSuite) SetupTest() { +	suite.state.Caches.Init() +	testrig.StartNoopWorkers(&suite.state) + +	testrig.InitTestConfig() +	testrig.InitTestLog() + +	suite.db = testrig.NewTestDB(&suite.state) +	suite.state.DB = suite.db +	suite.tc = typeutils.NewConverter(&suite.state) +	suite.filter = visibility.NewFilter(&suite.state) + +	testrig.StartTimelines( +		&suite.state, +		suite.filter, +		suite.tc, +	) + +	suite.storage = testrig.NewInMemoryStorage() +	suite.state.Storage = suite.storage +	suite.mediaManager = testrig.NewTestMediaManager(&suite.state) + +	suite.transportController = testrig.NewTestTransportController(&suite.state, testrig.NewMockHTTPClient(nil, "../../../testrig/media")) +	suite.federator = testrig.NewTestFederator(&suite.state, suite.transportController, suite.mediaManager) +	suite.sentEmails = make(map[string]string) +	suite.emailSender = testrig.NewEmailSender("../../../web/template/", suite.sentEmails) + +	suite.conversationsProcessor = conversations.New(&suite.state, suite.tc, suite.filter) +	testrig.StandardDBSetup(suite.db, nil) +	testrig.StandardStorageSetup(suite.storage, "../../../testrig/media") + +	suite.ConversationFactory.SetupTest(suite.db) + +	suite.testAccount = suite.testAccounts["local_account_1"] +} + +func (suite *ConversationsTestSuite) TearDownTest() { +	conversationModels := []interface{}{ +		(*gtsmodel.Conversation)(nil), +		(*gtsmodel.ConversationToStatus)(nil), +	} +	for _, model := range conversationModels { +		if err := suite.db.DropTable(context.Background(), model); err != nil { +			log.Error(context.Background(), err) +		} +	} + +	testrig.StandardDBTeardown(suite.db) +	testrig.StandardStorageTeardown(suite.storage) +	testrig.StopWorkers(&suite.state) +} + +func TestConversationsTestSuite(t *testing.T) { +	suite.Run(t, new(ConversationsTestSuite)) +} diff --git a/internal/processing/conversations/delete.go b/internal/processing/conversations/delete.go new file mode 100644 index 000000000..5cbdd00a5 --- /dev/null +++ b/internal/processing/conversations/delete.go @@ -0,0 +1,45 @@ +// GoToSocial +// Copyright (C) GoToSocial Authors admin@gotosocial.org +// SPDX-License-Identifier: AGPL-3.0-or-later +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program.  If not, see <http://www.gnu.org/licenses/>. + +package conversations + +import ( +	"context" + +	"github.com/superseriousbusiness/gotosocial/internal/gtscontext" +	"github.com/superseriousbusiness/gotosocial/internal/gtserror" +	"github.com/superseriousbusiness/gotosocial/internal/gtsmodel" +) + +func (p *Processor) Delete( +	ctx context.Context, +	requestingAccount *gtsmodel.Account, +	id string, +) gtserror.WithCode { +	// Get the conversation so that we can check its owning account ID. +	conversation, errWithCode := p.getConversationOwnedBy(gtscontext.SetBarebones(ctx), id, requestingAccount) +	if errWithCode != nil { +		return errWithCode +	} + +	// Delete the conversation. +	if err := p.state.DB.DeleteConversationByID(ctx, conversation.ID); err != nil { +		return gtserror.NewErrorInternalError(err) +	} + +	return nil +} diff --git a/internal/processing/conversations/delete_test.go b/internal/processing/conversations/delete_test.go new file mode 100644 index 000000000..23b4f1c1a --- /dev/null +++ b/internal/processing/conversations/delete_test.go @@ -0,0 +1,27 @@ +// GoToSocial +// Copyright (C) GoToSocial Authors admin@gotosocial.org +// SPDX-License-Identifier: AGPL-3.0-or-later +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program.  If not, see <http://www.gnu.org/licenses/>. + +package conversations_test + +import "context" + +func (suite *ConversationsTestSuite) TestDelete() { +	conversation := suite.NewTestConversation(suite.testAccount, 0) + +	err := suite.conversationsProcessor.Delete(context.Background(), suite.testAccount, conversation.ID) +	suite.NoError(err) +} diff --git a/internal/processing/conversations/get.go b/internal/processing/conversations/get.go new file mode 100644 index 000000000..0c7832cae --- /dev/null +++ b/internal/processing/conversations/get.go @@ -0,0 +1,101 @@ +// GoToSocial +// Copyright (C) GoToSocial Authors admin@gotosocial.org +// SPDX-License-Identifier: AGPL-3.0-or-later +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program.  If not, see <http://www.gnu.org/licenses/>. + +package conversations + +import ( +	"context" +	"errors" + +	apimodel "github.com/superseriousbusiness/gotosocial/internal/api/model" +	"github.com/superseriousbusiness/gotosocial/internal/db" +	"github.com/superseriousbusiness/gotosocial/internal/gtserror" +	"github.com/superseriousbusiness/gotosocial/internal/gtsmodel" +	"github.com/superseriousbusiness/gotosocial/internal/log" +	"github.com/superseriousbusiness/gotosocial/internal/paging" +	"github.com/superseriousbusiness/gotosocial/internal/util" +) + +// GetAll returns conversations owned by the given account. +// The additional parameters can be used for paging. +func (p *Processor) GetAll( +	ctx context.Context, +	requestingAccount *gtsmodel.Account, +	page *paging.Page, +) (*apimodel.PageableResponse, gtserror.WithCode) { +	conversations, err := p.state.DB.GetConversationsByOwnerAccountID( +		ctx, +		requestingAccount.ID, +		page, +	) +	if err != nil && !errors.Is(err, db.ErrNoEntries) { +		return nil, gtserror.NewErrorInternalError( +			gtserror.Newf( +				"DB error getting conversations for account %s: %w", +				requestingAccount.ID, +				err, +			), +		) +	} + +	// Check for empty response. +	count := len(conversations) +	if len(conversations) == 0 { +		return util.EmptyPageableResponse(), nil +	} + +	// Get the lowest and highest last status ID values, used for paging. +	lo := conversations[count-1].LastStatusID +	hi := conversations[0].LastStatusID + +	items := make([]interface{}, 0, count) + +	filters, mutes, errWithCode := p.getFiltersAndMutes(ctx, requestingAccount) +	if errWithCode != nil { +		return nil, errWithCode +	} + +	for _, conversation := range conversations { +		// Convert conversation to frontend API model. +		apiConversation, err := p.converter.ConversationToAPIConversation( +			ctx, +			conversation, +			requestingAccount, +			filters, +			mutes, +		) +		if err != nil { +			log.Errorf( +				ctx, +				"error converting conversation %s to API representation: %v", +				conversation.ID, +				err, +			) +			continue +		} + +		// Append conversation to return items. +		items = append(items, apiConversation) +	} + +	return paging.PackageResponse(paging.ResponseParams{ +		Items: items, +		Path:  "/api/v1/conversations", +		Next:  page.Next(lo, hi), +		Prev:  page.Prev(lo, hi), +	}), nil +} diff --git a/internal/processing/conversations/get_test.go b/internal/processing/conversations/get_test.go new file mode 100644 index 000000000..7b3d60749 --- /dev/null +++ b/internal/processing/conversations/get_test.go @@ -0,0 +1,65 @@ +// GoToSocial +// Copyright (C) GoToSocial Authors admin@gotosocial.org +// SPDX-License-Identifier: AGPL-3.0-or-later +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program.  If not, see <http://www.gnu.org/licenses/>. + +package conversations_test + +import ( +	"context" +	"time" + +	apimodel "github.com/superseriousbusiness/gotosocial/internal/api/model" +) + +func (suite *ConversationsTestSuite) TestGetAll() { +	conversation := suite.NewTestConversation(suite.testAccount, 0) + +	resp, err := suite.conversationsProcessor.GetAll(context.Background(), suite.testAccount, nil) +	if suite.NoError(err) && suite.Len(resp.Items, 1) && suite.IsType((*apimodel.Conversation)(nil), resp.Items[0]) { +		apiConversation := resp.Items[0].(*apimodel.Conversation) +		suite.Equal(conversation.ID, apiConversation.ID) +		suite.True(apiConversation.Unread) +	} +} + +// Test that conversations with newer last status IDs are returned earlier. +func (suite *ConversationsTestSuite) TestGetAllOrder() { +	// Create a new conversation. +	conversation1 := suite.NewTestConversation(suite.testAccount, 0) + +	// Create another new conversation with a last status newer than conversation1's. +	conversation2 := suite.NewTestConversation(suite.testAccount, 1*time.Second) + +	// Add an even newer status than that to conversation1. +	conversation1Status2 := suite.NewTestStatus(suite.testAccount, conversation1.LastStatus.ThreadID, 2*time.Second, conversation1.LastStatus) +	conversation1.LastStatusID = conversation1Status2.ID +	if err := suite.db.UpsertConversation(context.Background(), conversation1, "last_status_id"); err != nil { +		suite.FailNow(err.Error()) +	} + +	resp, err := suite.conversationsProcessor.GetAll(context.Background(), suite.testAccount, nil) +	if suite.NoError(err) && suite.Len(resp.Items, 2) { +		// conversation1 should be the first conversation returned. +		apiConversation1 := resp.Items[0].(*apimodel.Conversation) +		suite.Equal(conversation1.ID, apiConversation1.ID) +		// It should have the newest status added to it. +		suite.Equal(conversation1.LastStatusID, conversation1Status2.ID) + +		// conversation2 should be the second conversation returned. +		apiConversation2 := resp.Items[1].(*apimodel.Conversation) +		suite.Equal(conversation2.ID, apiConversation2.ID) +	} +} diff --git a/internal/processing/conversations/migrate.go b/internal/processing/conversations/migrate.go new file mode 100644 index 000000000..959ffcca4 --- /dev/null +++ b/internal/processing/conversations/migrate.go @@ -0,0 +1,131 @@ +// GoToSocial +// Copyright (C) GoToSocial Authors admin@gotosocial.org +// SPDX-License-Identifier: AGPL-3.0-or-later +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program.  If not, see <http://www.gnu.org/licenses/>. + +package conversations + +import ( +	"context" +	"encoding/json" +	"errors" + +	"github.com/superseriousbusiness/gotosocial/internal/db" +	"github.com/superseriousbusiness/gotosocial/internal/gtserror" +	"github.com/superseriousbusiness/gotosocial/internal/gtsmodel" +	"github.com/superseriousbusiness/gotosocial/internal/id" +	"github.com/superseriousbusiness/gotosocial/internal/log" +	"github.com/superseriousbusiness/gotosocial/internal/util" +) + +const advancedMigrationID = "20240611190733_add_conversations" +const statusBatchSize = 100 + +type AdvancedMigrationState struct { +	MinID          string +	MaxIDInclusive string +} + +func (p *Processor) MigrateDMsToConversations(ctx context.Context) error { +	advancedMigration, err := p.state.DB.GetAdvancedMigration(ctx, advancedMigrationID) +	if err != nil && !errors.Is(err, db.ErrNoEntries) { +		return gtserror.Newf("couldn't get advanced migration with ID %s: %w", advancedMigrationID, err) +	} +	state := AdvancedMigrationState{} +	if advancedMigration != nil { +		// There was a previous migration. +		if *advancedMigration.Finished { +			// This migration has already been run to completion; we don't need to run it again. +			return nil +		} +		// Otherwise, pick up where we left off. +		if err := json.Unmarshal(advancedMigration.StateJSON, &state); err != nil { +			// This should never happen. +			return gtserror.Newf("couldn't deserialize advanced migration state from JSON: %w", err) +		} +	} else { +		// Start at the beginning. +		state.MinID = id.Lowest + +		// Find the max ID of all existing statuses. +		// This will be the last one we migrate; +		// newer ones will be handled by the normal conversation flow. +		state.MaxIDInclusive, err = p.state.DB.MaxDirectStatusID(ctx) +		if err != nil { +			return gtserror.Newf("couldn't get max DM status ID for migration: %w", err) +		} + +		// Save a new advanced migration record. +		advancedMigration = >smodel.AdvancedMigration{ +			ID:       advancedMigrationID, +			Finished: util.Ptr(false), +		} +		if advancedMigration.StateJSON, err = json.Marshal(state); err != nil { +			// This should never happen. +			return gtserror.Newf("couldn't serialize advanced migration state to JSON: %w", err) +		} +		if err := p.state.DB.PutAdvancedMigration(ctx, advancedMigration); err != nil { +			return gtserror.Newf("couldn't save state for advanced migration with ID %s: %w", advancedMigrationID, err) +		} +	} + +	log.Info(ctx, "migrating DMs to conversations…") + +	// In batches, get all statuses up to and including the max ID, +	// and update conversations for each in order. +	for { +		// Get status IDs for this batch. +		statusIDs, err := p.state.DB.GetDirectStatusIDsBatch(ctx, state.MinID, state.MaxIDInclusive, statusBatchSize) +		if err != nil { +			return gtserror.Newf("couldn't get DM status ID batch for migration: %w", err) +		} +		if len(statusIDs) == 0 { +			break +		} +		log.Infof(ctx, "migrating %d DMs starting after %s", len(statusIDs), state.MinID) + +		// Load the batch by IDs. +		statuses, err := p.state.DB.GetStatusesByIDs(ctx, statusIDs) +		if err != nil { +			return gtserror.Newf("couldn't get DM statuses for migration: %w", err) +		} + +		// Update conversations for each status. Don't generate notifications. +		for _, status := range statuses { +			if _, err := p.UpdateConversationsForStatus(ctx, status); err != nil { +				return gtserror.Newf("couldn't update conversations for status %s during migration: %w", status.ID, err) +			} +		} + +		// Save the migration state with the new min ID. +		state.MinID = statusIDs[len(statusIDs)-1] +		if advancedMigration.StateJSON, err = json.Marshal(state); err != nil { +			// This should never happen. +			return gtserror.Newf("couldn't serialize advanced migration state to JSON: %w", err) +		} +		if err := p.state.DB.PutAdvancedMigration(ctx, advancedMigration); err != nil { +			return gtserror.Newf("couldn't save state for advanced migration with ID %s: %w", advancedMigrationID, err) +		} +	} + +	// Mark the migration as finished. +	advancedMigration.Finished = util.Ptr(true) +	if err := p.state.DB.PutAdvancedMigration(ctx, advancedMigration); err != nil { +		return gtserror.Newf("couldn't save state for advanced migration with ID %s: %w", advancedMigrationID, err) +	} + +	log.Info(ctx, "finished migrating DMs to conversations.") +	return nil +} diff --git a/internal/processing/conversations/migrate_test.go b/internal/processing/conversations/migrate_test.go new file mode 100644 index 000000000..b625e59ba --- /dev/null +++ b/internal/processing/conversations/migrate_test.go @@ -0,0 +1,85 @@ +// GoToSocial +// Copyright (C) GoToSocial Authors admin@gotosocial.org +// SPDX-License-Identifier: AGPL-3.0-or-later +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program.  If not, see <http://www.gnu.org/licenses/>. + +package conversations_test + +import ( +	"context" + +	"github.com/superseriousbusiness/gotosocial/internal/db" +	"github.com/superseriousbusiness/gotosocial/internal/db/bundb" +	"github.com/superseriousbusiness/gotosocial/internal/gtsmodel" +) + +// Test that we can migrate DMs to conversations. +// This test assumes that we're using the standard test fixtures, which contain some conversation-eligible DMs. +func (suite *ConversationsTestSuite) TestMigrateDMsToConversations() { +	advancedMigrationID := "20240611190733_add_conversations" +	ctx := context.Background() +	rawDB := (suite.db).(*bundb.DBService).DB() + +	// Precondition: we shouldn't have any conversations yet. +	numConversations := 0 +	if err := rawDB.NewSelect(). +		Model((*gtsmodel.Conversation)(nil)). +		ColumnExpr("COUNT(*)"). +		Scan(ctx, &numConversations); // nocollapse +	err != nil { +		suite.FailNow(err.Error()) +	} +	suite.Zero(numConversations) + +	// Precondition: there is no record of the conversations advanced migration. +	_, err := suite.db.GetAdvancedMigration(ctx, advancedMigrationID) +	suite.ErrorIs(err, db.ErrNoEntries) + +	// Run the migration, which should not fail. +	if err := suite.conversationsProcessor.MigrateDMsToConversations(ctx); err != nil { +		suite.FailNow(err.Error()) +	} + +	// We should now have some conversations. +	if err := rawDB.NewSelect(). +		Model((*gtsmodel.Conversation)(nil)). +		ColumnExpr("COUNT(*)"). +		Scan(ctx, &numConversations); // nocollapse +	err != nil { +		suite.FailNow(err.Error()) +	} +	suite.NotZero(numConversations) + +	// The advanced migration should now be marked as finished. +	advancedMigration, err := suite.db.GetAdvancedMigration(ctx, advancedMigrationID) +	if err != nil { +		suite.FailNow(err.Error()) +	} +	if suite.NotNil(advancedMigration) && suite.NotNil(advancedMigration.Finished) { +		suite.True(*advancedMigration.Finished) +	} + +	// Run the migration again, which should not fail. +	if err := suite.conversationsProcessor.MigrateDMsToConversations(ctx); err != nil { +		suite.FailNow(err.Error()) +	} + +	// However, it shouldn't have done anything, so the advanced migration should not have been updated. +	advancedMigration2, err := suite.db.GetAdvancedMigration(ctx, advancedMigrationID) +	if err != nil { +		suite.FailNow(err.Error()) +	} +	suite.Equal(advancedMigration.UpdatedAt, advancedMigration2.UpdatedAt) +} diff --git a/internal/processing/conversations/read.go b/internal/processing/conversations/read.go new file mode 100644 index 000000000..512a004a3 --- /dev/null +++ b/internal/processing/conversations/read.go @@ -0,0 +1,65 @@ +// GoToSocial +// Copyright (C) GoToSocial Authors admin@gotosocial.org +// SPDX-License-Identifier: AGPL-3.0-or-later +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program.  If not, see <http://www.gnu.org/licenses/>. + +package conversations + +import ( +	"context" + +	apimodel "github.com/superseriousbusiness/gotosocial/internal/api/model" +	"github.com/superseriousbusiness/gotosocial/internal/gtserror" +	"github.com/superseriousbusiness/gotosocial/internal/gtsmodel" +	"github.com/superseriousbusiness/gotosocial/internal/util" +) + +func (p *Processor) Read( +	ctx context.Context, +	requestingAccount *gtsmodel.Account, +	id string, +) (*apimodel.Conversation, gtserror.WithCode) { +	// Get the conversation, including participating accounts and last status. +	conversation, errWithCode := p.getConversationOwnedBy(ctx, id, requestingAccount) +	if errWithCode != nil { +		return nil, errWithCode +	} + +	// Mark the conversation as read. +	conversation.Read = util.Ptr(true) +	if err := p.state.DB.UpsertConversation(ctx, conversation, "read"); err != nil { +		err = gtserror.Newf("DB error updating conversation %s: %w", id, err) +		return nil, gtserror.NewErrorInternalError(err) +	} + +	filters, mutes, errWithCode := p.getFiltersAndMutes(ctx, requestingAccount) +	if errWithCode != nil { +		return nil, errWithCode +	} + +	apiConversation, err := p.converter.ConversationToAPIConversation( +		ctx, +		conversation, +		requestingAccount, +		filters, +		mutes, +	) +	if err != nil { +		err = gtserror.Newf("error converting conversation %s to API representation: %w", id, err) +		return nil, gtserror.NewErrorInternalError(err) +	} + +	return apiConversation, nil +} diff --git a/internal/processing/conversations/read_test.go b/internal/processing/conversations/read_test.go new file mode 100644 index 000000000..ebd8f7fe5 --- /dev/null +++ b/internal/processing/conversations/read_test.go @@ -0,0 +1,34 @@ +// GoToSocial +// Copyright (C) GoToSocial Authors admin@gotosocial.org +// SPDX-License-Identifier: AGPL-3.0-or-later +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program.  If not, see <http://www.gnu.org/licenses/>. + +package conversations_test + +import ( +	"context" + +	"github.com/superseriousbusiness/gotosocial/internal/util" +) + +func (suite *ConversationsTestSuite) TestRead() { +	conversation := suite.NewTestConversation(suite.testAccount, 0) + +	suite.False(util.PtrOrValue(conversation.Read, false)) +	apiConversation, err := suite.conversationsProcessor.Read(context.Background(), suite.testAccount, conversation.ID) +	if suite.NoError(err) { +		suite.False(apiConversation.Unread) +	} +} diff --git a/internal/processing/conversations/update.go b/internal/processing/conversations/update.go new file mode 100644 index 000000000..7445994ae --- /dev/null +++ b/internal/processing/conversations/update.go @@ -0,0 +1,242 @@ +// GoToSocial +// Copyright (C) GoToSocial Authors admin@gotosocial.org +// SPDX-License-Identifier: AGPL-3.0-or-later +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program.  If not, see <http://www.gnu.org/licenses/>. + +package conversations + +import ( +	"context" +	"errors" + +	apimodel "github.com/superseriousbusiness/gotosocial/internal/api/model" +	"github.com/superseriousbusiness/gotosocial/internal/db" +	statusfilter "github.com/superseriousbusiness/gotosocial/internal/filter/status" +	"github.com/superseriousbusiness/gotosocial/internal/gtserror" +	"github.com/superseriousbusiness/gotosocial/internal/gtsmodel" +	"github.com/superseriousbusiness/gotosocial/internal/id" +	"github.com/superseriousbusiness/gotosocial/internal/log" +	"github.com/superseriousbusiness/gotosocial/internal/util" +) + +// ConversationNotification carries the arguments to processing/stream.Processor.Conversation. +type ConversationNotification struct { +	// AccountID of a local account to deliver the notification to. +	AccountID string +	// Conversation as the notification payload. +	Conversation *apimodel.Conversation +} + +// UpdateConversationsForStatus updates all conversations related to a status, +// and returns a map from local account IDs to conversation notifications that should be sent to them. +func (p *Processor) UpdateConversationsForStatus(ctx context.Context, status *gtsmodel.Status) ([]ConversationNotification, error) { +	if status.Visibility != gtsmodel.VisibilityDirect { +		// Only DMs are considered part of conversations. +		return nil, nil +	} +	if status.BoostOfID != "" { +		// Boosts can't be part of conversations. +		// FUTURE: This may change if we ever implement quote posts. +		return nil, nil +	} +	if status.ThreadID == "" { +		// If the status doesn't have a thread ID, it didn't mention a local account, +		// and thus can't be part of a conversation. +		return nil, nil +	} + +	// We need accounts to be populated for this. +	if err := p.state.DB.PopulateStatus(ctx, status); err != nil { +		return nil, gtserror.Newf("DB error populating status %s: %w", status.ID, err) +	} + +	// The account which authored the status plus all mentioned accounts. +	allParticipantsSet := make(map[string]*gtsmodel.Account, 1+len(status.Mentions)) +	allParticipantsSet[status.AccountID] = status.Account +	for _, mention := range status.Mentions { +		allParticipantsSet[mention.TargetAccountID] = mention.TargetAccount +	} + +	// Create or update conversations for and send notifications to each local participant. +	notifications := make([]ConversationNotification, 0, len(allParticipantsSet)) +	for _, participant := range allParticipantsSet { +		if participant.IsRemote() { +			continue +		} +		localAccount := participant + +		// If the status is not visible to this account, skip processing it for this account. +		visible, err := p.filter.StatusVisible(ctx, localAccount, status) +		if err != nil { +			log.Errorf( +				ctx, +				"error checking status %s visibility for account %s: %v", +				status.ID, +				localAccount.ID, +				err, +			) +			continue +		} else if !visible { +			continue +		} + +		// Is the status filtered or muted for this user? +		// Converting the status to an API status runs the filter/mute checks. +		filters, mutes, errWithCode := p.getFiltersAndMutes(ctx, localAccount) +		if errWithCode != nil { +			log.Error(ctx, errWithCode) +			continue +		} +		_, err = p.converter.StatusToAPIStatus( +			ctx, +			status, +			localAccount, +			statusfilter.FilterContextNotifications, +			filters, +			mutes, +		) +		if err != nil { +			// If the status matched a hide filter, skip processing it for this account. +			// If there was another kind of error, log that and skip it anyway. +			if !errors.Is(err, statusfilter.ErrHideStatus) { +				log.Errorf( +					ctx, +					"error checking status %s filtering/muting for account %s: %v", +					status.ID, +					localAccount.ID, +					err, +				) +			} +			continue +		} + +		// Collect other accounts participating in the conversation. +		otherAccounts := make([]*gtsmodel.Account, 0, len(allParticipantsSet)-1) +		otherAccountIDs := make([]string, 0, len(allParticipantsSet)-1) +		for accountID, account := range allParticipantsSet { +			if accountID != localAccount.ID { +				otherAccounts = append(otherAccounts, account) +				otherAccountIDs = append(otherAccountIDs, accountID) +			} +		} + +		// Check for a previously existing conversation, if there is one. +		conversation, err := p.state.DB.GetConversationByThreadAndAccountIDs( +			ctx, +			status.ThreadID, +			localAccount.ID, +			otherAccountIDs, +		) +		if err != nil && !errors.Is(err, db.ErrNoEntries) { +			log.Errorf( +				ctx, +				"error trying to find a previous conversation for status %s and account %s: %v", +				status.ID, +				localAccount.ID, +				err, +			) +			continue +		} + +		if conversation == nil { +			// Create a new conversation. +			conversation = >smodel.Conversation{ +				ID:               id.NewULID(), +				AccountID:        localAccount.ID, +				OtherAccountIDs:  otherAccountIDs, +				OtherAccounts:    otherAccounts, +				OtherAccountsKey: gtsmodel.ConversationOtherAccountsKey(otherAccountIDs), +				ThreadID:         status.ThreadID, +				Read:             util.Ptr(true), +			} +		} + +		// Assume that if the conversation owner posted the status, they've already read it. +		statusAuthoredByConversationOwner := status.AccountID == conversation.AccountID + +		// Update the conversation. +		// If there is no previous last status or this one is more recently created, set it as the last status. +		if conversation.LastStatus == nil || conversation.LastStatus.CreatedAt.Before(status.CreatedAt) { +			conversation.LastStatusID = status.ID +			conversation.LastStatus = status +		} +		// If the conversation is unread, leave it marked as unread. +		// If the conversation is read but this status might not have been, mark the conversation as unread. +		if !statusAuthoredByConversationOwner { +			conversation.Read = util.Ptr(false) +		} + +		// Create or update the conversation. +		err = p.state.DB.UpsertConversation(ctx, conversation) +		if err != nil { +			log.Errorf( +				ctx, +				"error creating or updating conversation %s for status %s and account %s: %v", +				conversation.ID, +				status.ID, +				localAccount.ID, +				err, +			) +			continue +		} + +		// Link the conversation to the status. +		if err := p.state.DB.LinkConversationToStatus(ctx, conversation.ID, status.ID); err != nil { +			log.Errorf( +				ctx, +				"error linking conversation %s to status %s: %v", +				conversation.ID, +				status.ID, +				err, +			) +			continue +		} + +		// Convert the conversation to API representation. +		apiConversation, err := p.converter.ConversationToAPIConversation( +			ctx, +			conversation, +			localAccount, +			filters, +			mutes, +		) +		if err != nil { +			// If the conversation's last status matched a hide filter, skip it. +			// If there was another kind of error, log that and skip it anyway. +			if !errors.Is(err, statusfilter.ErrHideStatus) { +				log.Errorf( +					ctx, +					"error converting conversation %s to API representation for account %s: %v", +					status.ID, +					localAccount.ID, +					err, +				) +			} +			continue +		} + +		// Generate a notification, +		// unless the status was authored by the user who would be notified, +		// in which case they already know. +		if status.AccountID != localAccount.ID { +			notifications = append(notifications, ConversationNotification{ +				AccountID:    localAccount.ID, +				Conversation: apiConversation, +			}) +		} +	} + +	return notifications, nil +} diff --git a/internal/processing/conversations/update_test.go b/internal/processing/conversations/update_test.go new file mode 100644 index 000000000..8ba2800fe --- /dev/null +++ b/internal/processing/conversations/update_test.go @@ -0,0 +1,54 @@ +// GoToSocial +// Copyright (C) GoToSocial Authors admin@gotosocial.org +// SPDX-License-Identifier: AGPL-3.0-or-later +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program.  If not, see <http://www.gnu.org/licenses/>. + +package conversations_test + +import ( +	"context" +) + +// Test that we can create conversations when a new status comes in. +func (suite *ConversationsTestSuite) TestUpdateConversationsForStatus() { +	ctx := context.Background() + +	// Precondition: the test user shouldn't have any conversations yet. +	conversations, err := suite.db.GetConversationsByOwnerAccountID(ctx, suite.testAccount.ID, nil) +	if err != nil { +		suite.FailNow(err.Error()) +	} +	suite.Empty(conversations) + +	// Create a status. +	threadID := suite.NewULID(0) +	status := suite.NewTestStatus(suite.testAccount, threadID, 0, nil) + +	// Update conversations for it. +	notifications, err := suite.conversationsProcessor.UpdateConversationsForStatus(ctx, status) +	if err != nil { +		suite.FailNow(err.Error()) +	} + +	// In this test, the user is DMing themself, and should not receive a notification from that. +	suite.Empty(notifications) + +	// The test user should have a conversation now. +	conversations, err = suite.db.GetConversationsByOwnerAccountID(ctx, suite.testAccount.ID, nil) +	if err != nil { +		suite.FailNow(err.Error()) +	} +	suite.NotEmpty(conversations) +} diff --git a/internal/processing/processor.go b/internal/processing/processor.go index fb6b05d80..a07df76e1 100644 --- a/internal/processing/processor.go +++ b/internal/processing/processor.go @@ -27,7 +27,9 @@ import (  	"github.com/superseriousbusiness/gotosocial/internal/oauth"  	"github.com/superseriousbusiness/gotosocial/internal/processing/account"  	"github.com/superseriousbusiness/gotosocial/internal/processing/admin" +	"github.com/superseriousbusiness/gotosocial/internal/processing/advancedmigrations"  	"github.com/superseriousbusiness/gotosocial/internal/processing/common" +	"github.com/superseriousbusiness/gotosocial/internal/processing/conversations"  	"github.com/superseriousbusiness/gotosocial/internal/processing/fedi"  	filtersv1 "github.com/superseriousbusiness/gotosocial/internal/processing/filters/v1"  	filtersv2 "github.com/superseriousbusiness/gotosocial/internal/processing/filters/v2" @@ -70,22 +72,24 @@ type Processor struct {  		SUB-PROCESSORS  	*/ -	account   account.Processor -	admin     admin.Processor -	fedi      fedi.Processor -	filtersv1 filtersv1.Processor -	filtersv2 filtersv2.Processor -	list      list.Processor -	markers   markers.Processor -	media     media.Processor -	polls     polls.Processor -	report    report.Processor -	search    search.Processor -	status    status.Processor -	stream    stream.Processor -	timeline  timeline.Processor -	user      user.Processor -	workers   workers.Processor +	account            account.Processor +	admin              admin.Processor +	advancedmigrations advancedmigrations.Processor +	conversations      conversations.Processor +	fedi               fedi.Processor +	filtersv1          filtersv1.Processor +	filtersv2          filtersv2.Processor +	list               list.Processor +	markers            markers.Processor +	media              media.Processor +	polls              polls.Processor +	report             report.Processor +	search             search.Processor +	status             status.Processor +	stream             stream.Processor +	timeline           timeline.Processor +	user               user.Processor +	workers            workers.Processor  }  func (p *Processor) Account() *account.Processor { @@ -96,6 +100,14 @@ func (p *Processor) Admin() *admin.Processor {  	return &p.admin  } +func (p *Processor) AdvancedMigrations() *advancedmigrations.Processor { +	return &p.advancedmigrations +} + +func (p *Processor) Conversations() *conversations.Processor { +	return &p.conversations +} +  func (p *Processor) Fedi() *fedi.Processor {  	return &p.fedi  } @@ -188,6 +200,7 @@ func NewProcessor(  	// processors + pin them to this struct.  	processor.account = account.New(&common, state, converter, mediaManager, federator, filter, parseMentionFunc)  	processor.admin = admin.New(&common, state, cleaner, federator, converter, mediaManager, federator.TransportController(), emailSender) +	processor.conversations = conversations.New(state, converter, filter)  	processor.fedi = fedi.New(state, &common, converter, federator, filter)  	processor.filtersv1 = filtersv1.New(state, converter, &processor.stream)  	processor.filtersv2 = filtersv2.New(state, converter, &processor.stream) @@ -200,6 +213,9 @@ func NewProcessor(  	processor.status = status.New(state, &common, &processor.polls, federator, converter, filter, parseMentionFunc)  	processor.user = user.New(state, converter, oauthServer, emailSender) +	// The advanced migrations processor sequences advanced migrations from all other processors. +	processor.advancedmigrations = advancedmigrations.New(&processor.conversations) +  	// Workers processor handles asynchronous  	// worker jobs; instantiate it separately  	// and pass subset of sub processors it needs. @@ -212,6 +228,7 @@ func NewProcessor(  		&processor.account,  		&processor.media,  		&processor.stream, +		&processor.conversations,  	)  	return processor diff --git a/internal/processing/stream/conversation.go b/internal/processing/stream/conversation.go new file mode 100644 index 000000000..a0236c459 --- /dev/null +++ b/internal/processing/stream/conversation.go @@ -0,0 +1,44 @@ +// GoToSocial +// Copyright (C) GoToSocial Authors admin@gotosocial.org +// SPDX-License-Identifier: AGPL-3.0-or-later +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program.  If not, see <http://www.gnu.org/licenses/>. + +package stream + +import ( +	"context" +	"encoding/json" + +	"codeberg.org/gruf/go-byteutil" +	apimodel "github.com/superseriousbusiness/gotosocial/internal/api/model" +	"github.com/superseriousbusiness/gotosocial/internal/log" +	"github.com/superseriousbusiness/gotosocial/internal/stream" +) + +// Conversation streams the given conversation to any open, appropriate streams belonging to the given account. +func (p *Processor) Conversation(ctx context.Context, accountID string, conversation *apimodel.Conversation) { +	b, err := json.Marshal(conversation) +	if err != nil { +		log.Errorf(ctx, "error marshaling json: %v", err) +		return +	} +	p.streams.Post(ctx, accountID, stream.Message{ +		Payload: byteutil.B2S(b), +		Event:   stream.EventTypeConversation, +		Stream: []string{ +			stream.TimelineDirect, +		}, +	}) +} diff --git a/internal/processing/workers/fromclientapi_test.go b/internal/processing/workers/fromclientapi_test.go index 49a68d27a..35c2c31b7 100644 --- a/internal/processing/workers/fromclientapi_test.go +++ b/internal/processing/workers/fromclientapi_test.go @@ -50,6 +50,8 @@ func (suite *FromClientAPITestSuite) newStatus(  	visibility gtsmodel.Visibility,  	replyToStatus *gtsmodel.Status,  	boostOfStatus *gtsmodel.Status, +	mentionedAccounts []*gtsmodel.Account, +	createThread bool,  ) *gtsmodel.Status {  	var (  		protocol = config.GetProtocol() @@ -102,6 +104,39 @@ func (suite *FromClientAPITestSuite) newStatus(  		newStatus.Visibility = boostOfStatus.Visibility  	} +	for _, mentionedAccount := range mentionedAccounts { +		newMention := >smodel.Mention{ +			ID:               id.NewULID(), +			StatusID:         newStatus.ID, +			Status:           newStatus, +			OriginAccountID:  account.ID, +			OriginAccountURI: account.URI, +			OriginAccount:    account, +			TargetAccountID:  mentionedAccount.ID, +			TargetAccount:    mentionedAccount, +			Silent:           util.Ptr(false), +		} + +		newStatus.Mentions = append(newStatus.Mentions, newMention) +		newStatus.MentionIDs = append(newStatus.MentionIDs, newMention.ID) + +		if err := state.DB.PutMention(ctx, newMention); err != nil { +			suite.FailNow(err.Error()) +		} +	} + +	if createThread { +		newThread := >smodel.Thread{ +			ID: id.NewULID(), +		} + +		newStatus.ThreadID = newThread.ID + +		if err := state.DB.PutThread(ctx, newThread); err != nil { +			suite.FailNow(err.Error()) +		} +	} +  	// Put the status in the db, to mimic what would  	// have already happened earlier up the flow.  	if err := state.DB.PutStatus(ctx, newStatus); err != nil { @@ -168,6 +203,31 @@ func (suite *FromClientAPITestSuite) statusJSON(  	return string(statusJSON)  } +func (suite *FromClientAPITestSuite) conversationJSON( +	ctx context.Context, +	typeConverter *typeutils.Converter, +	conversation *gtsmodel.Conversation, +	requestingAccount *gtsmodel.Account, +) string { +	apiConversation, err := typeConverter.ConversationToAPIConversation( +		ctx, +		conversation, +		requestingAccount, +		nil, +		nil, +	) +	if err != nil { +		suite.FailNow(err.Error()) +	} + +	conversationJSON, err := json.Marshal(apiConversation) +	if err != nil { +		suite.FailNow(err.Error()) +	} + +	return string(conversationJSON) +} +  func (suite *FromClientAPITestSuite) TestProcessCreateStatusWithNotification() {  	testStructs := suite.SetupTestStructs()  	defer suite.TearDownTestStructs(testStructs) @@ -194,6 +254,8 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusWithNotification() {  			gtsmodel.VisibilityPublic,  			nil,  			nil, +			nil, +			false,  		)  	) @@ -303,6 +365,8 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusReply() {  			gtsmodel.VisibilityPublic,  			suite.testStatuses["local_account_2_status_1"],  			nil, +			nil, +			false,  		)  	) @@ -362,6 +426,8 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusReplyMuted() {  			gtsmodel.VisibilityPublic,  			suite.testStatuses["local_account_1_status_1"],  			nil, +			nil, +			false,  		)  		threadMute = >smodel.ThreadMute{  			ID:        "01HD3KRMBB1M85QRWHD912QWRE", @@ -420,6 +486,8 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusBoostMuted() {  			gtsmodel.VisibilityPublic,  			nil,  			suite.testStatuses["local_account_1_status_1"], +			nil, +			false,  		)  		threadMute = >smodel.ThreadMute{  			ID:        "01HD3KRMBB1M85QRWHD912QWRE", @@ -483,6 +551,8 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusListRepliesPolicyLis  			gtsmodel.VisibilityPublic,  			suite.testStatuses["local_account_2_status_1"],  			nil, +			nil, +			false,  		)  	) @@ -556,6 +626,8 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusListRepliesPolicyLis  			gtsmodel.VisibilityPublic,  			suite.testStatuses["local_account_2_status_1"],  			nil, +			nil, +			false,  		)  	) @@ -634,6 +706,8 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusReplyListRepliesPoli  			gtsmodel.VisibilityPublic,  			suite.testStatuses["local_account_2_status_1"],  			nil, +			nil, +			false,  		)  	) @@ -704,6 +778,8 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusBoost() {  			gtsmodel.VisibilityPublic,  			nil,  			suite.testStatuses["local_account_2_status_1"], +			nil, +			false,  		)  	) @@ -765,6 +841,8 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusBoostNoReblogs() {  			gtsmodel.VisibilityPublic,  			nil,  			suite.testStatuses["local_account_2_status_1"], +			nil, +			false,  		)  	) @@ -807,6 +885,159 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusBoostNoReblogs() {  	)  } +// A DM to a local user should create a conversation and accompanying notification. +func (suite *FromClientAPITestSuite) TestProcessCreateStatusWhichBeginsConversation() { +	testStructs := suite.SetupTestStructs() +	defer suite.TearDownTestStructs(testStructs) + +	var ( +		ctx              = context.Background() +		postingAccount   = suite.testAccounts["local_account_2"] +		receivingAccount = suite.testAccounts["local_account_1"] +		streams          = suite.openStreams(ctx, +			testStructs.Processor, +			receivingAccount, +			nil, +		) +		homeStream   = streams[stream.TimelineHome] +		directStream = streams[stream.TimelineDirect] + +		// turtle posts a new top-level DM mentioning zork. +		status = suite.newStatus( +			ctx, +			testStructs.State, +			postingAccount, +			gtsmodel.VisibilityDirect, +			nil, +			nil, +			[]*gtsmodel.Account{receivingAccount}, +			true, +		) +	) + +	// Process the new status. +	if err := testStructs.Processor.Workers().ProcessFromClientAPI( +		ctx, +		&messages.FromClientAPI{ +			APObjectType:   ap.ObjectNote, +			APActivityType: ap.ActivityCreate, +			GTSModel:       status, +			Origin:         postingAccount, +		}, +	); err != nil { +		suite.FailNow(err.Error()) +	} + +	// Locate the conversation which should now exist for zork. +	conversation, err := testStructs.State.DB.GetConversationByThreadAndAccountIDs( +		ctx, +		status.ThreadID, +		receivingAccount.ID, +		[]string{postingAccount.ID}, +	) +	if err != nil { +		suite.FailNow(err.Error()) +	} + +	// Check status in home stream. +	suite.checkStreamed( +		homeStream, +		true, +		"", +		stream.EventTypeUpdate, +	) + +	// Check mention notification in home stream. +	suite.checkStreamed( +		homeStream, +		true, +		"", +		stream.EventTypeNotification, +	) + +	// Check conversation in direct stream. +	conversationJSON := suite.conversationJSON( +		ctx, +		testStructs.TypeConverter, +		conversation, +		receivingAccount, +	) +	suite.checkStreamed( +		directStream, +		true, +		conversationJSON, +		stream.EventTypeConversation, +	) +} + +// A public message to a local user should not result in a conversation notification. +func (suite *FromClientAPITestSuite) TestProcessCreateStatusWhichShouldNotCreateConversation() { +	testStructs := suite.SetupTestStructs() +	defer suite.TearDownTestStructs(testStructs) + +	var ( +		ctx              = context.Background() +		postingAccount   = suite.testAccounts["local_account_2"] +		receivingAccount = suite.testAccounts["local_account_1"] +		streams          = suite.openStreams(ctx, +			testStructs.Processor, +			receivingAccount, +			nil, +		) +		homeStream   = streams[stream.TimelineHome] +		directStream = streams[stream.TimelineDirect] + +		// turtle posts a new top-level public message mentioning zork. +		status = suite.newStatus( +			ctx, +			testStructs.State, +			postingAccount, +			gtsmodel.VisibilityPublic, +			nil, +			nil, +			[]*gtsmodel.Account{receivingAccount}, +			true, +		) +	) + +	// Process the new status. +	if err := testStructs.Processor.Workers().ProcessFromClientAPI( +		ctx, +		&messages.FromClientAPI{ +			APObjectType:   ap.ObjectNote, +			APActivityType: ap.ActivityCreate, +			GTSModel:       status, +			Origin:         postingAccount, +		}, +	); err != nil { +		suite.FailNow(err.Error()) +	} + +	// Check status in home stream. +	suite.checkStreamed( +		homeStream, +		true, +		"", +		stream.EventTypeUpdate, +	) + +	// Check mention notification in home stream. +	suite.checkStreamed( +		homeStream, +		true, +		"", +		stream.EventTypeNotification, +	) + +	// Check for absence of conversation notification in direct stream. +	suite.checkStreamed( +		directStream, +		false, +		"", +		"", +	) +} +  func (suite *FromClientAPITestSuite) TestProcessStatusDelete() {  	testStructs := suite.SetupTestStructs()  	defer suite.TearDownTestStructs(testStructs) diff --git a/internal/processing/workers/surface.go b/internal/processing/workers/surface.go index 5ec905ae8..1a7dbbfe5 100644 --- a/internal/processing/workers/surface.go +++ b/internal/processing/workers/surface.go @@ -20,6 +20,7 @@ package workers  import (  	"github.com/superseriousbusiness/gotosocial/internal/email"  	"github.com/superseriousbusiness/gotosocial/internal/filter/visibility" +	"github.com/superseriousbusiness/gotosocial/internal/processing/conversations"  	"github.com/superseriousbusiness/gotosocial/internal/processing/stream"  	"github.com/superseriousbusiness/gotosocial/internal/state"  	"github.com/superseriousbusiness/gotosocial/internal/typeutils" @@ -32,9 +33,10 @@ import (  //   - sending a notification to a user  //   - sending an email  type Surface struct { -	State       *state.State -	Converter   *typeutils.Converter -	Stream      *stream.Processor -	Filter      *visibility.Filter -	EmailSender email.Sender +	State         *state.State +	Converter     *typeutils.Converter +	Stream        *stream.Processor +	Filter        *visibility.Filter +	EmailSender   email.Sender +	Conversations *conversations.Processor  } diff --git a/internal/processing/workers/surfacenotify_test.go b/internal/processing/workers/surfacenotify_test.go index 18d0277ae..937ddeca2 100644 --- a/internal/processing/workers/surfacenotify_test.go +++ b/internal/processing/workers/surfacenotify_test.go @@ -39,11 +39,12 @@ func (suite *SurfaceNotifyTestSuite) TestSpamNotifs() {  	defer suite.TearDownTestStructs(testStructs)  	surface := &workers.Surface{ -		State:       testStructs.State, -		Converter:   testStructs.TypeConverter, -		Stream:      testStructs.Processor.Stream(), -		Filter:      visibility.NewFilter(testStructs.State), -		EmailSender: testStructs.EmailSender, +		State:         testStructs.State, +		Converter:     testStructs.TypeConverter, +		Stream:        testStructs.Processor.Stream(), +		Filter:        visibility.NewFilter(testStructs.State), +		EmailSender:   testStructs.EmailSender, +		Conversations: testStructs.Processor.Conversations(),  	}  	var ( diff --git a/internal/processing/workers/surfacetimeline.go b/internal/processing/workers/surfacetimeline.go index 41d7f6f2a..8ac8293ed 100644 --- a/internal/processing/workers/surfacetimeline.go +++ b/internal/processing/workers/surfacetimeline.go @@ -36,8 +36,8 @@ import (  // and LIST timelines of accounts that follow the status author.  //  // It will also handle notifications for any mentions attached to -// the account, and notifications for any local accounts that want -// to know when this account posts. +// the account, notifications for any local accounts that want +// to know when this account posts, and conversations containing the status.  func (s *Surface) timelineAndNotifyStatus(ctx context.Context, status *gtsmodel.Status) error {  	// Ensure status fully populated; including account, mentions, etc.  	if err := s.State.DB.PopulateStatus(ctx, status); err != nil { @@ -73,6 +73,15 @@ func (s *Surface) timelineAndNotifyStatus(ctx context.Context, status *gtsmodel.  		return gtserror.Newf("error notifying status mentions for status %s: %w", status.ID, err)  	} +	// Update any conversations containing this status, and send conversation notifications. +	notifications, err := s.Conversations.UpdateConversationsForStatus(ctx, status) +	if err != nil { +		return gtserror.Newf("error updating conversations for status %s: %w", status.ID, err) +	} +	for _, notification := range notifications { +		s.Stream.Conversation(ctx, notification.AccountID, notification.Conversation) +	} +  	return nil  } diff --git a/internal/processing/workers/util.go b/internal/processing/workers/util.go index 780e5ca14..915370976 100644 --- a/internal/processing/workers/util.go +++ b/internal/processing/workers/util.go @@ -137,6 +137,11 @@ func (u *utils) wipeStatus(  		errs.Appendf("error deleting status from timelines: %w", err)  	} +	// delete this status from any conversations that it's part of +	if err := u.state.DB.DeleteStatusFromConversations(ctx, statusToDelete.ID); err != nil { +		errs.Appendf("error deleting status from conversations: %w", err) +	} +  	// finally, delete the status itself  	if err := u.state.DB.DeleteStatusByID(ctx, statusToDelete.ID); err != nil {  		errs.Appendf("error deleting status: %w", err) diff --git a/internal/processing/workers/workers.go b/internal/processing/workers/workers.go index 6b4cc07a6..c7f67b025 100644 --- a/internal/processing/workers/workers.go +++ b/internal/processing/workers/workers.go @@ -22,6 +22,7 @@ import (  	"github.com/superseriousbusiness/gotosocial/internal/federation"  	"github.com/superseriousbusiness/gotosocial/internal/filter/visibility"  	"github.com/superseriousbusiness/gotosocial/internal/processing/account" +	"github.com/superseriousbusiness/gotosocial/internal/processing/conversations"  	"github.com/superseriousbusiness/gotosocial/internal/processing/media"  	"github.com/superseriousbusiness/gotosocial/internal/processing/stream"  	"github.com/superseriousbusiness/gotosocial/internal/state" @@ -44,6 +45,7 @@ func New(  	account *account.Processor,  	media *media.Processor,  	stream *stream.Processor, +	conversations *conversations.Processor,  ) Processor {  	// Init federate logic  	// wrapper struct. @@ -56,11 +58,12 @@ func New(  	// Init surface logic  	// wrapper struct.  	surface := &Surface{ -		State:       state, -		Converter:   converter, -		Stream:      stream, -		Filter:      filter, -		EmailSender: emailSender, +		State:         state, +		Converter:     converter, +		Stream:        stream, +		Filter:        filter, +		EmailSender:   emailSender, +		Conversations: conversations,  	}  	// Init shared util funcs. diff --git a/internal/processing/workers/workers_test.go b/internal/processing/workers/workers_test.go index f66190d75..3093fd93a 100644 --- a/internal/processing/workers/workers_test.go +++ b/internal/processing/workers/workers_test.go @@ -108,6 +108,7 @@ func (suite *WorkersTestSuite) openStreams(ctx context.Context, processor *proce  		stream.TimelineHome,  		stream.TimelinePublic,  		stream.TimelineNotifications, +		stream.TimelineDirect,  	} {  		stream, err := processor.Stream().Open(ctx, account, streamType)  		if err != nil {  | 
