diff options
Diffstat (limited to 'internal/processing/workers')
| -rw-r--r-- | internal/processing/workers/federate.go | 892 | ||||
| -rw-r--r-- | internal/processing/workers/fromclientapi.go | 548 | ||||
| -rw-r--r-- | internal/processing/workers/fromclientapi_test.go | 589 | ||||
| -rw-r--r-- | internal/processing/workers/fromfediapi.go | 540 | ||||
| -rw-r--r-- | internal/processing/workers/fromfediapi_test.go | 565 | ||||
| -rw-r--r-- | internal/processing/workers/surface.go | 40 | ||||
| -rw-r--r-- | internal/processing/workers/surfaceemail.go | 160 | ||||
| -rw-r--r-- | internal/processing/workers/surfacenotify.go | 221 | ||||
| -rw-r--r-- | internal/processing/workers/surfacetimeline.go | 401 | ||||
| -rw-r--r-- | internal/processing/workers/wipestatus.go | 119 | ||||
| -rw-r--r-- | internal/processing/workers/workers.go | 92 | ||||
| -rw-r--r-- | internal/processing/workers/workers_test.go | 169 | 
12 files changed, 4336 insertions, 0 deletions
diff --git a/internal/processing/workers/federate.go b/internal/processing/workers/federate.go new file mode 100644 index 000000000..76bfc892e --- /dev/null +++ b/internal/processing/workers/federate.go @@ -0,0 +1,892 @@ +// GoToSocial +// Copyright (C) GoToSocial Authors admin@gotosocial.org +// SPDX-License-Identifier: AGPL-3.0-or-later +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program.  If not, see <http://www.gnu.org/licenses/>. + +package workers + +import ( +	"context" +	"net/url" + +	"github.com/superseriousbusiness/activity/pub" +	"github.com/superseriousbusiness/activity/streams" +	"github.com/superseriousbusiness/gotosocial/internal/federation" +	"github.com/superseriousbusiness/gotosocial/internal/gtserror" +	"github.com/superseriousbusiness/gotosocial/internal/gtsmodel" +	"github.com/superseriousbusiness/gotosocial/internal/state" +	"github.com/superseriousbusiness/gotosocial/internal/typeutils" +) + +// federate wraps functions for federating +// something out via ActivityPub in response +// to message processing. +type federate struct { +	// Embed federator to give access +	// to send and retrieve functions. +	federation.Federator +	state *state.State +	tc    typeutils.TypeConverter +} + +// parseURI is a cheeky little +// shortcut to wrap parsing errors. +// +// The returned err will be prepended +// with the name of the function that +// called this function, so it can be +// returned without further wrapping. +func parseURI(s string) (*url.URL, error) { +	const ( +		// Provides enough calldepth to +		// prepend the name of whatever +		// function called *this* one, +		// so that they don't have to +		// wrap the error themselves. +		calldepth = 3 +		errFmt    = "error parsing uri %s: %w" +	) + +	uri, err := url.Parse(s) +	if err != nil { +		return nil, gtserror.NewfAt(calldepth, errFmt, s, err) +	} + +	return uri, err +} + +func (f *federate) DeleteAccount(ctx context.Context, account *gtsmodel.Account) error { +	// Do nothing if it's not our +	// account that's been deleted. +	if !account.IsLocal() { +		return nil +	} + +	// Parse relevant URI(s). +	outboxIRI, err := parseURI(account.OutboxURI) +	if err != nil { +		return err +	} + +	actorIRI, err := parseURI(account.URI) +	if err != nil { +		return err +	} + +	followersIRI, err := parseURI(account.FollowersURI) +	if err != nil { +		return err +	} + +	publicIRI, err := parseURI(pub.PublicActivityPubIRI) +	if err != nil { +		return err +	} + +	// Create a new delete. +	// todo: tc.AccountToASDelete +	delete := streams.NewActivityStreamsDelete() + +	// Set the Actor for the delete; no matter +	// who actually did the delete, we should +	// use the account owner for this. +	deleteActor := streams.NewActivityStreamsActorProperty() +	deleteActor.AppendIRI(actorIRI) +	delete.SetActivityStreamsActor(deleteActor) + +	// Set the account's IRI as the 'object' property. +	deleteObject := streams.NewActivityStreamsObjectProperty() +	deleteObject.AppendIRI(actorIRI) +	delete.SetActivityStreamsObject(deleteObject) + +	// Address the delete To followers. +	deleteTo := streams.NewActivityStreamsToProperty() +	deleteTo.AppendIRI(followersIRI) +	delete.SetActivityStreamsTo(deleteTo) + +	// Address the delete CC public. +	deleteCC := streams.NewActivityStreamsCcProperty() +	deleteCC.AppendIRI(publicIRI) +	delete.SetActivityStreamsCc(deleteCC) + +	// Send the Delete via the Actor's outbox. +	if _, err := f.FederatingActor().Send( +		ctx, outboxIRI, delete, +	); err != nil { +		return gtserror.Newf( +			"error sending activity %T via outbox %s: %w", +			delete, outboxIRI, err, +		) +	} + +	return nil +} + +func (f *federate) CreateStatus(ctx context.Context, status *gtsmodel.Status) error { +	// Do nothing if the status +	// shouldn't be federated. +	if !*status.Federated { +		return nil +	} + +	// Do nothing if this +	// isn't our status. +	if !*status.Local { +		return nil +	} + +	// Populate model. +	if err := f.state.DB.PopulateStatus(ctx, status); err != nil { +		return gtserror.Newf("error populating status: %w", err) +	} + +	// Parse relevant URI(s). +	outboxIRI, err := parseURI(status.Account.OutboxURI) +	if err != nil { +		return err +	} + +	// Convert status to an ActivityStreams +	// Note, wrapped in a Create activity. +	asStatus, err := f.tc.StatusToAS(ctx, status) +	if err != nil { +		return gtserror.Newf("error converting status to AS: %w", err) +	} + +	create, err := f.tc.WrapNoteInCreate(asStatus, false) +	if err != nil { +		return gtserror.Newf("error wrapping status in create: %w", err) +	} + +	// Send the Create via the Actor's outbox. +	if _, err := f.FederatingActor().Send( +		ctx, outboxIRI, create, +	); err != nil { +		return gtserror.Newf( +			"error sending activity %T via outbox %s: %w", +			create, outboxIRI, err, +		) +	} + +	return nil +} + +func (f *federate) DeleteStatus(ctx context.Context, status *gtsmodel.Status) error { +	// Do nothing if the status +	// shouldn't be federated. +	if !*status.Federated { +		return nil +	} + +	// Do nothing if this +	// isn't our status. +	if !*status.Local { +		return nil +	} + +	// Populate model. +	if err := f.state.DB.PopulateStatus(ctx, status); err != nil { +		return gtserror.Newf("error populating status: %w", err) +	} + +	// Parse relevant URI(s). +	outboxIRI, err := parseURI(status.Account.OutboxURI) +	if err != nil { +		return err +	} + +	// Wrap the status URI in a Delete activity. +	delete, err := f.tc.StatusToASDelete(ctx, status) +	if err != nil { +		return gtserror.Newf("error creating Delete: %w", err) +	} + +	// Send the Delete via the Actor's outbox. +	if _, err := f.FederatingActor().Send( +		ctx, outboxIRI, delete, +	); err != nil { +		return gtserror.Newf( +			"error sending activity %T via outbox %s: %w", +			delete, outboxIRI, err, +		) +	} + +	return nil +} + +func (f *federate) Follow(ctx context.Context, follow *gtsmodel.Follow) error { +	// Populate model. +	if err := f.state.DB.PopulateFollow(ctx, follow); err != nil { +		return gtserror.Newf("error populating follow: %w", err) +	} + +	// Do nothing if both accounts are local. +	if follow.Account.IsLocal() && +		follow.TargetAccount.IsLocal() { +		return nil +	} + +	// Parse relevant URI(s). +	outboxIRI, err := parseURI(follow.Account.OutboxURI) +	if err != nil { +		return err +	} + +	// Convert follow to ActivityStreams Follow. +	asFollow, err := f.tc.FollowToAS(ctx, follow) +	if err != nil { +		return gtserror.Newf("error converting follow to AS: %s", err) +	} + +	// Send the Follow via the Actor's outbox. +	if _, err := f.FederatingActor().Send( +		ctx, outboxIRI, asFollow, +	); err != nil { +		return gtserror.Newf( +			"error sending activity %T via outbox %s: %w", +			asFollow, outboxIRI, err, +		) +	} + +	return nil +} + +func (f *federate) UndoFollow(ctx context.Context, follow *gtsmodel.Follow) error { +	// Populate model. +	if err := f.state.DB.PopulateFollow(ctx, follow); err != nil { +		return gtserror.Newf("error populating follow: %w", err) +	} + +	// Do nothing if both accounts are local. +	if follow.Account.IsLocal() && +		follow.TargetAccount.IsLocal() { +		return nil +	} + +	// Parse relevant URI(s). +	outboxIRI, err := parseURI(follow.Account.OutboxURI) +	if err != nil { +		return err +	} + +	targetAccountIRI, err := parseURI(follow.TargetAccount.URI) +	if err != nil { +		return err +	} + +	// Recreate the ActivityStreams Follow. +	asFollow, err := f.tc.FollowToAS(ctx, follow) +	if err != nil { +		return gtserror.Newf("error converting follow to AS: %w", err) +	} + +	// Create a new Undo. +	// todo: tc.FollowToASUndo +	undo := streams.NewActivityStreamsUndo() + +	// Set the Actor for the Undo: +	// same as the actor for the Follow. +	undo.SetActivityStreamsActor(asFollow.GetActivityStreamsActor()) + +	// Set recreated Follow as the 'object' property. +	// +	// For most AP implementations, it's not enough +	// to just send the URI of the original Follow, +	// we have to send the whole object again. +	undoObject := streams.NewActivityStreamsObjectProperty() +	undoObject.AppendActivityStreamsFollow(asFollow) +	undo.SetActivityStreamsObject(undoObject) + +	// Address the Undo To the target account. +	undoTo := streams.NewActivityStreamsToProperty() +	undoTo.AppendIRI(targetAccountIRI) +	undo.SetActivityStreamsTo(undoTo) + +	// Send the Undo via the Actor's outbox. +	if _, err := f.FederatingActor().Send( +		ctx, outboxIRI, undo, +	); err != nil { +		return gtserror.Newf( +			"error sending activity %T via outbox %s: %w", +			undo, outboxIRI, err, +		) +	} + +	return nil +} + +func (f *federate) UndoLike(ctx context.Context, fave *gtsmodel.StatusFave) error { +	// Populate model. +	if err := f.state.DB.PopulateStatusFave(ctx, fave); err != nil { +		return gtserror.Newf("error populating fave: %w", err) +	} + +	// Do nothing if both accounts are local. +	if fave.Account.IsLocal() && +		fave.TargetAccount.IsLocal() { +		return nil +	} + +	// Parse relevant URI(s). +	outboxIRI, err := parseURI(fave.Account.OutboxURI) +	if err != nil { +		return err +	} + +	targetAccountIRI, err := parseURI(fave.TargetAccount.URI) +	if err != nil { +		return err +	} + +	// Recreate the ActivityStreams Like. +	like, err := f.tc.FaveToAS(ctx, fave) +	if err != nil { +		return gtserror.Newf("error converting fave to AS: %w", err) +	} + +	// Create a new Undo. +	// todo: tc.FaveToASUndo +	undo := streams.NewActivityStreamsUndo() + +	// Set the Actor for the Undo: +	// same as the actor for the Like. +	undo.SetActivityStreamsActor(like.GetActivityStreamsActor()) + +	// Set recreated Like as the 'object' property. +	// +	// For most AP implementations, it's not enough +	// to just send the URI of the original Like, +	// we have to send the whole object again. +	undoObject := streams.NewActivityStreamsObjectProperty() +	undoObject.AppendActivityStreamsLike(like) +	undo.SetActivityStreamsObject(undoObject) + +	// Address the Undo To the target account. +	undoTo := streams.NewActivityStreamsToProperty() +	undoTo.AppendIRI(targetAccountIRI) +	undo.SetActivityStreamsTo(undoTo) + +	// Send the Undo via the Actor's outbox. +	if _, err := f.FederatingActor().Send( +		ctx, outboxIRI, undo, +	); err != nil { +		return gtserror.Newf( +			"error sending activity %T via outbox %s: %w", +			undo, outboxIRI, err, +		) +	} + +	return nil +} + +func (f *federate) UndoAnnounce(ctx context.Context, boost *gtsmodel.Status) error { +	// Populate model. +	if err := f.state.DB.PopulateStatus(ctx, boost); err != nil { +		return gtserror.Newf("error populating status: %w", err) +	} + +	// Do nothing if boosting +	// account isn't ours. +	if !boost.Account.IsLocal() { +		return nil +	} + +	// Parse relevant URI(s). +	outboxIRI, err := parseURI(boost.Account.OutboxURI) +	if err != nil { +		return err +	} + +	// Recreate the ActivityStreams Announce. +	asAnnounce, err := f.tc.BoostToAS( +		ctx, +		boost, +		boost.Account, +		boost.BoostOfAccount, +	) +	if err != nil { +		return gtserror.Newf("error converting boost to AS: %w", err) +	} + +	// Create a new Undo. +	// todo: tc.AnnounceToASUndo +	undo := streams.NewActivityStreamsUndo() + +	// Set the Actor for the Undo: +	// same as the actor for the Announce. +	undo.SetActivityStreamsActor(asAnnounce.GetActivityStreamsActor()) + +	// Set recreated Announce as the 'object' property. +	// +	// For most AP implementations, it's not enough +	// to just send the URI of the original Announce, +	// we have to send the whole object again. +	undoObject := streams.NewActivityStreamsObjectProperty() +	undoObject.AppendActivityStreamsAnnounce(asAnnounce) +	undo.SetActivityStreamsObject(undoObject) + +	// Address the Undo To the Announce To. +	undo.SetActivityStreamsTo(asAnnounce.GetActivityStreamsTo()) + +	// Address the Undo CC the Announce CC. +	undo.SetActivityStreamsCc(asAnnounce.GetActivityStreamsCc()) + +	// Send the Undo via the Actor's outbox. +	if _, err := f.FederatingActor().Send( +		ctx, outboxIRI, undo, +	); err != nil { +		return gtserror.Newf( +			"error sending activity %T via outbox %s: %w", +			undo, outboxIRI, err, +		) +	} + +	return nil +} + +func (f *federate) AcceptFollow(ctx context.Context, follow *gtsmodel.Follow) error { +	// Populate model. +	if err := f.state.DB.PopulateFollow(ctx, follow); err != nil { +		return gtserror.Newf("error populating follow: %w", err) +	} + +	// Bail if requesting account is ours: +	// we've already accepted internally and +	// shouldn't send an Accept to ourselves. +	if follow.Account.IsLocal() { +		return nil +	} + +	// Bail if target account isn't ours: +	// we can't Accept a follow on +	// another instance's behalf. +	if follow.TargetAccount.IsRemote() { +		return nil +	} + +	// Parse relevant URI(s). +	outboxIRI, err := parseURI(follow.TargetAccount.OutboxURI) +	if err != nil { +		return err +	} + +	acceptingAccountIRI, err := parseURI(follow.TargetAccount.URI) +	if err != nil { +		return err +	} + +	requestingAccountIRI, err := parseURI(follow.Account.URI) +	if err != nil { +		return err +	} + +	// Recreate the ActivityStreams Follow. +	asFollow, err := f.tc.FollowToAS(ctx, follow) +	if err != nil { +		return gtserror.Newf("error converting follow to AS: %w", err) +	} + +	// Create a new Accept. +	// todo: tc.FollowToASAccept +	accept := streams.NewActivityStreamsAccept() + +	// Set the requestee as Actor of the Accept. +	acceptActorProp := streams.NewActivityStreamsActorProperty() +	acceptActorProp.AppendIRI(acceptingAccountIRI) +	accept.SetActivityStreamsActor(acceptActorProp) + +	// Set recreated Follow as the 'object' property. +	// +	// For most AP implementations, it's not enough +	// to just send the URI of the original Follow, +	// we have to send the whole object again. +	acceptObject := streams.NewActivityStreamsObjectProperty() +	acceptObject.AppendActivityStreamsFollow(asFollow) +	accept.SetActivityStreamsObject(acceptObject) + +	// Address the Accept To the Follow requester. +	acceptTo := streams.NewActivityStreamsToProperty() +	acceptTo.AppendIRI(requestingAccountIRI) +	accept.SetActivityStreamsTo(acceptTo) + +	// Send the Accept via the Actor's outbox. +	if _, err := f.FederatingActor().Send( +		ctx, outboxIRI, accept, +	); err != nil { +		return gtserror.Newf( +			"error sending activity %T via outbox %s: %w", +			accept, outboxIRI, err, +		) +	} + +	return nil +} + +func (f *federate) RejectFollow(ctx context.Context, follow *gtsmodel.Follow) error { +	// Ensure follow populated before proceeding. +	if err := f.state.DB.PopulateFollow(ctx, follow); err != nil { +		return gtserror.Newf("error populating follow: %w", err) +	} + +	// Bail if requesting account is ours: +	// we've already rejected internally and +	// shouldn't send an Reject to ourselves. +	if follow.Account.IsLocal() { +		return nil +	} + +	// Bail if target account isn't ours: +	// we can't Reject a follow on +	// another instance's behalf. +	if follow.TargetAccount.IsRemote() { +		return nil +	} + +	// Parse relevant URI(s). +	outboxIRI, err := parseURI(follow.TargetAccount.OutboxURI) +	if err != nil { +		return err +	} + +	rejectingAccountIRI, err := parseURI(follow.TargetAccount.URI) +	if err != nil { +		return err +	} + +	requestingAccountIRI, err := parseURI(follow.Account.URI) +	if err != nil { +		return err +	} + +	// Recreate the ActivityStreams Follow. +	asFollow, err := f.tc.FollowToAS(ctx, follow) +	if err != nil { +		return gtserror.Newf("error converting follow to AS: %w", err) +	} + +	// Create a new Reject. +	// todo: tc.FollowRequestToASReject +	reject := streams.NewActivityStreamsReject() + +	// Set the requestee as Actor of the Reject. +	rejectActorProp := streams.NewActivityStreamsActorProperty() +	rejectActorProp.AppendIRI(rejectingAccountIRI) +	reject.SetActivityStreamsActor(rejectActorProp) + +	// Set recreated Follow as the 'object' property. +	// +	// For most AP implementations, it's not enough +	// to just send the URI of the original Follow, +	// we have to send the whole object again. +	rejectObject := streams.NewActivityStreamsObjectProperty() +	rejectObject.AppendActivityStreamsFollow(asFollow) +	reject.SetActivityStreamsObject(rejectObject) + +	// Address the Reject To the Follow requester. +	rejectTo := streams.NewActivityStreamsToProperty() +	rejectTo.AppendIRI(requestingAccountIRI) +	reject.SetActivityStreamsTo(rejectTo) + +	// Send the Reject via the Actor's outbox. +	if _, err := f.FederatingActor().Send( +		ctx, outboxIRI, reject, +	); err != nil { +		return gtserror.Newf( +			"error sending activity %T via outbox %s: %w", +			reject, outboxIRI, err, +		) +	} + +	return nil +} + +func (f *federate) Like(ctx context.Context, fave *gtsmodel.StatusFave) error { +	// Populate model. +	if err := f.state.DB.PopulateStatusFave(ctx, fave); err != nil { +		return gtserror.Newf("error populating fave: %w", err) +	} + +	// Do nothing if both accounts are local. +	if fave.Account.IsLocal() && +		fave.TargetAccount.IsLocal() { +		return nil +	} + +	// Parse relevant URI(s). +	outboxIRI, err := parseURI(fave.Account.OutboxURI) +	if err != nil { +		return err +	} + +	// Create the ActivityStreams Like. +	like, err := f.tc.FaveToAS(ctx, fave) +	if err != nil { +		return gtserror.Newf("error converting fave to AS Like: %w", err) +	} + +	// Send the Like via the Actor's outbox. +	if _, err := f.FederatingActor().Send( +		ctx, outboxIRI, like, +	); err != nil { +		return gtserror.Newf( +			"error sending activity %T via outbox %s: %w", +			like, outboxIRI, err, +		) +	} + +	return nil +} + +func (f *federate) Announce(ctx context.Context, boost *gtsmodel.Status) error { +	// Populate model. +	if err := f.state.DB.PopulateStatus(ctx, boost); err != nil { +		return gtserror.Newf("error populating status: %w", err) +	} + +	// Do nothing if boosting +	// account isn't ours. +	if !boost.Account.IsLocal() { +		return nil +	} + +	// Parse relevant URI(s). +	outboxIRI, err := parseURI(boost.Account.OutboxURI) +	if err != nil { +		return err +	} + +	// Create the ActivityStreams Announce. +	announce, err := f.tc.BoostToAS( +		ctx, +		boost, +		boost.Account, +		boost.BoostOfAccount, +	) +	if err != nil { +		return gtserror.Newf("error converting boost to AS: %w", err) +	} + +	// Send the Announce via the Actor's outbox. +	if _, err := f.FederatingActor().Send( +		ctx, outboxIRI, announce, +	); err != nil { +		return gtserror.Newf( +			"error sending activity %T via outbox %s: %w", +			announce, outboxIRI, err, +		) +	} + +	return nil +} + +func (f *federate) UpdateAccount(ctx context.Context, account *gtsmodel.Account) error { +	// Populate model. +	if err := f.state.DB.PopulateAccount(ctx, account); err != nil { +		return gtserror.Newf("error populating account: %w", err) +	} + +	// Parse relevant URI(s). +	outboxIRI, err := parseURI(account.OutboxURI) +	if err != nil { +		return err +	} + +	// Convert account to ActivityStreams Person. +	person, err := f.tc.AccountToAS(ctx, account) +	if err != nil { +		return gtserror.Newf("error converting account to Person: %w", err) +	} + +	// Use ActivityStreams Person as Object of Update. +	update, err := f.tc.WrapPersonInUpdate(person, account) +	if err != nil { +		return gtserror.Newf("error wrapping Person in Update: %w", err) +	} + +	// Send the Update via the Actor's outbox. +	if _, err := f.FederatingActor().Send( +		ctx, outboxIRI, update, +	); err != nil { +		return gtserror.Newf( +			"error sending activity %T via outbox %s: %w", +			update, outboxIRI, err, +		) +	} + +	return nil +} + +func (f *federate) Block(ctx context.Context, block *gtsmodel.Block) error { +	// Populate model. +	if err := f.state.DB.PopulateBlock(ctx, block); err != nil { +		return gtserror.Newf("error populating block: %w", err) +	} + +	// Do nothing if both accounts are local. +	if block.Account.IsLocal() && +		block.TargetAccount.IsLocal() { +		return nil +	} + +	// Parse relevant URI(s). +	outboxIRI, err := parseURI(block.Account.OutboxURI) +	if err != nil { +		return err +	} + +	// Convert block to ActivityStreams Block. +	asBlock, err := f.tc.BlockToAS(ctx, block) +	if err != nil { +		return gtserror.Newf("error converting block to AS: %w", err) +	} + +	// Send the Block via the Actor's outbox. +	if _, err := f.FederatingActor().Send( +		ctx, outboxIRI, asBlock, +	); err != nil { +		return gtserror.Newf( +			"error sending activity %T via outbox %s: %w", +			asBlock, outboxIRI, err, +		) +	} + +	return nil +} + +func (f *federate) UndoBlock(ctx context.Context, block *gtsmodel.Block) error { +	// Populate model. +	if err := f.state.DB.PopulateBlock(ctx, block); err != nil { +		return gtserror.Newf("error populating block: %w", err) +	} + +	// Do nothing if both accounts are local. +	if block.Account.IsLocal() && +		block.TargetAccount.IsLocal() { +		return nil +	} + +	// Parse relevant URI(s). +	outboxIRI, err := parseURI(block.Account.OutboxURI) +	if err != nil { +		return err +	} + +	targetAccountIRI, err := parseURI(block.TargetAccount.URI) +	if err != nil { +		return err +	} + +	// Convert block to ActivityStreams Block. +	asBlock, err := f.tc.BlockToAS(ctx, block) +	if err != nil { +		return gtserror.Newf("error converting block to AS: %w", err) +	} + +	// Create a new Undo. +	// todo: tc.BlockToASUndo +	undo := streams.NewActivityStreamsUndo() + +	// Set the Actor for the Undo: +	// same as the actor for the Block. +	undo.SetActivityStreamsActor(asBlock.GetActivityStreamsActor()) + +	// Set Block as the 'object' property. +	// +	// For most AP implementations, it's not enough +	// to just send the URI of the original Block, +	// we have to send the whole object again. +	undoObject := streams.NewActivityStreamsObjectProperty() +	undoObject.AppendActivityStreamsBlock(asBlock) +	undo.SetActivityStreamsObject(undoObject) + +	// Address the Undo To the target account. +	undoTo := streams.NewActivityStreamsToProperty() +	undoTo.AppendIRI(targetAccountIRI) +	undo.SetActivityStreamsTo(undoTo) + +	// Send the Undo via the Actor's outbox. +	if _, err := f.FederatingActor().Send( +		ctx, outboxIRI, undo, +	); err != nil { +		return gtserror.Newf( +			"error sending activity %T via outbox %s: %w", +			undo, outboxIRI, err, +		) +	} + +	return nil +} + +func (f *federate) Flag(ctx context.Context, report *gtsmodel.Report) error { +	// Populate model. +	if err := f.state.DB.PopulateReport(ctx, report); err != nil { +		return gtserror.Newf("error populating report: %w", err) +	} + +	// Do nothing if report target +	// is not remote account. +	if report.TargetAccount.IsLocal() { +		return nil +	} + +	// Get our instance account from the db: +	// to anonymize the report, we'll deliver +	// using the outbox of the instance account. +	instanceAcct, err := f.state.DB.GetInstanceAccount(ctx, "") +	if err != nil { +		return gtserror.Newf("error getting instance account: %w", err) +	} + +	// Parse relevant URI(s). +	outboxIRI, err := parseURI(instanceAcct.OutboxURI) +	if err != nil { +		return err +	} + +	targetAccountIRI, err := parseURI(report.TargetAccount.URI) +	if err != nil { +		return err +	} + +	// Convert report to ActivityStreams Flag. +	flag, err := f.tc.ReportToASFlag(ctx, report) +	if err != nil { +		return gtserror.Newf("error converting report to AS: %w", err) +	} + +	// To is not set explicitly on Flags. Instead, +	// address Flag BTo report target account URI. +	// This ensures that our federating actor still +	// knows where to send the report, but the BTo +	// property will be stripped before sending. +	// +	// Happily, BTo does not prevent federating +	// actor from using shared inbox to deliver. +	bTo := streams.NewActivityStreamsBtoProperty() +	bTo.AppendIRI(targetAccountIRI) +	flag.SetActivityStreamsBto(bTo) + +	// Send the Flag via the Actor's outbox. +	if _, err := f.FederatingActor().Send( +		ctx, outboxIRI, flag, +	); err != nil { +		return gtserror.Newf( +			"error sending activity %T via outbox %s: %w", +			flag, outboxIRI, err, +		) +	} + +	return nil +} diff --git a/internal/processing/workers/fromclientapi.go b/internal/processing/workers/fromclientapi.go new file mode 100644 index 000000000..40efc20bb --- /dev/null +++ b/internal/processing/workers/fromclientapi.go @@ -0,0 +1,548 @@ +// GoToSocial +// Copyright (C) GoToSocial Authors admin@gotosocial.org +// SPDX-License-Identifier: AGPL-3.0-or-later +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program.  If not, see <http://www.gnu.org/licenses/>. + +package workers + +import ( +	"context" +	"errors" + +	"codeberg.org/gruf/go-kv" +	"codeberg.org/gruf/go-logger/v2/level" +	"github.com/superseriousbusiness/gotosocial/internal/ap" +	"github.com/superseriousbusiness/gotosocial/internal/db" +	"github.com/superseriousbusiness/gotosocial/internal/gtserror" +	"github.com/superseriousbusiness/gotosocial/internal/gtsmodel" +	"github.com/superseriousbusiness/gotosocial/internal/log" +	"github.com/superseriousbusiness/gotosocial/internal/messages" +	"github.com/superseriousbusiness/gotosocial/internal/processing/account" +	"github.com/superseriousbusiness/gotosocial/internal/state" +	"github.com/superseriousbusiness/gotosocial/internal/typeutils" +) + +// clientAPI wraps processing functions +// specifically for messages originating +// from the client/REST API. +type clientAPI struct { +	state      *state.State +	tc         typeutils.TypeConverter +	surface    *surface +	federate   *federate +	wipeStatus wipeStatus +	account    *account.Processor +} + +func (p *Processor) EnqueueClientAPI(ctx context.Context, msgs ...messages.FromClientAPI) { +	log.Trace(ctx, "enqueuing") +	_ = p.workers.ClientAPI.MustEnqueueCtx(ctx, func(ctx context.Context) { +		for _, msg := range msgs { +			log.Trace(ctx, "processing: %+v", msg) +			if err := p.ProcessFromClientAPI(ctx, msg); err != nil { +				log.Errorf(ctx, "error processing client API message: %v", err) +			} +		} +	}) +} + +func (p *Processor) ProcessFromClientAPI(ctx context.Context, cMsg messages.FromClientAPI) error { +	// Allocate new log fields slice +	fields := make([]kv.Field, 3, 4) +	fields[0] = kv.Field{"activityType", cMsg.APActivityType} +	fields[1] = kv.Field{"objectType", cMsg.APObjectType} +	fields[2] = kv.Field{"fromAccount", cMsg.OriginAccount.Username} + +	// Include GTSModel in logs if appropriate. +	if cMsg.GTSModel != nil && +		log.Level() >= level.DEBUG { +		fields = append(fields, kv.Field{ +			"model", cMsg.GTSModel, +		}) +	} + +	l := log.WithContext(ctx).WithFields(fields...) +	l.Info("processing from client API") + +	switch cMsg.APActivityType { + +	// CREATE SOMETHING +	case ap.ActivityCreate: +		switch cMsg.APObjectType { + +		// CREATE PROFILE/ACCOUNT +		case ap.ObjectProfile, ap.ActorPerson: +			return p.clientAPI.CreateAccount(ctx, cMsg) + +		// CREATE NOTE/STATUS +		case ap.ObjectNote: +			return p.clientAPI.CreateStatus(ctx, cMsg) + +		// CREATE FOLLOW (request) +		case ap.ActivityFollow: +			return p.clientAPI.CreateFollowReq(ctx, cMsg) + +		// CREATE LIKE/FAVE +		case ap.ActivityLike: +			return p.clientAPI.CreateLike(ctx, cMsg) + +		// CREATE ANNOUNCE/BOOST +		case ap.ActivityAnnounce: +			return p.clientAPI.CreateAnnounce(ctx, cMsg) + +		// CREATE BLOCK +		case ap.ActivityBlock: +			return p.clientAPI.CreateBlock(ctx, cMsg) +		} + +	// UPDATE SOMETHING +	case ap.ActivityUpdate: +		switch cMsg.APObjectType { + +		// UPDATE PROFILE/ACCOUNT +		case ap.ObjectProfile, ap.ActorPerson: +			return p.clientAPI.UpdateAccount(ctx, cMsg) + +		// UPDATE A FLAG/REPORT (mark as resolved/closed) +		case ap.ActivityFlag: +			return p.clientAPI.UpdateReport(ctx, cMsg) +		} + +	// ACCEPT SOMETHING +	case ap.ActivityAccept: +		switch cMsg.APObjectType { //nolint:gocritic + +		// ACCEPT FOLLOW (request) +		case ap.ActivityFollow: +			return p.clientAPI.AcceptFollow(ctx, cMsg) +		} + +	// REJECT SOMETHING +	case ap.ActivityReject: +		switch cMsg.APObjectType { //nolint:gocritic + +		// REJECT FOLLOW (request) +		case ap.ActivityFollow: +			return p.clientAPI.RejectFollowRequest(ctx, cMsg) +		} + +	// UNDO SOMETHING +	case ap.ActivityUndo: +		switch cMsg.APObjectType { + +		// UNDO FOLLOW (request) +		case ap.ActivityFollow: +			return p.clientAPI.UndoFollow(ctx, cMsg) + +		// UNDO BLOCK +		case ap.ActivityBlock: +			return p.clientAPI.UndoBlock(ctx, cMsg) + +		// UNDO LIKE/FAVE +		case ap.ActivityLike: +			return p.clientAPI.UndoFave(ctx, cMsg) + +		// UNDO ANNOUNCE/BOOST +		case ap.ActivityAnnounce: +			return p.clientAPI.UndoAnnounce(ctx, cMsg) +		} + +	// DELETE SOMETHING +	case ap.ActivityDelete: +		switch cMsg.APObjectType { + +		// DELETE NOTE/STATUS +		case ap.ObjectNote: +			return p.clientAPI.DeleteStatus(ctx, cMsg) + +		// DELETE PROFILE/ACCOUNT +		case ap.ObjectProfile, ap.ActorPerson: +			return p.clientAPI.DeleteAccount(ctx, cMsg) +		} + +	// FLAG/REPORT SOMETHING +	case ap.ActivityFlag: +		switch cMsg.APObjectType { //nolint:gocritic + +		// FLAG/REPORT A PROFILE +		case ap.ObjectProfile: +			return p.clientAPI.ReportAccount(ctx, cMsg) +		} +	} + +	return nil +} + +func (p *clientAPI) CreateAccount(ctx context.Context, cMsg messages.FromClientAPI) error { +	account, ok := cMsg.GTSModel.(*gtsmodel.Account) +	if !ok { +		return gtserror.Newf("%T not parseable as *gtsmodel.Account", cMsg.GTSModel) +	} + +	// Send a confirmation email to the newly created account. +	user, err := p.state.DB.GetUserByAccountID(ctx, account.ID) +	if err != nil { +		return gtserror.Newf("db error getting user for account id %s: %w", account.ID, err) +	} + +	if err := p.surface.emailPleaseConfirm(ctx, user, account.Username); err != nil { +		return gtserror.Newf("error emailing %s: %w", account.Username, err) +	} + +	return nil +} + +func (p *clientAPI) CreateStatus(ctx context.Context, cMsg messages.FromClientAPI) error { +	status, ok := cMsg.GTSModel.(*gtsmodel.Status) +	if !ok { +		return gtserror.Newf("%T not parseable as *gtsmodel.Status", cMsg.GTSModel) +	} + +	if err := p.surface.timelineAndNotifyStatus(ctx, status); err != nil { +		return gtserror.Newf("error timelining status: %w", err) +	} + +	if status.InReplyToID != "" { +		// Interaction counts changed on the replied status; +		// uncache the prepared version from all timelines. +		p.surface.invalidateStatusFromTimelines(ctx, status.InReplyToID) +	} + +	if err := p.federate.CreateStatus(ctx, status); err != nil { +		return gtserror.Newf("error federating status: %w", err) +	} + +	return nil +} + +func (p *clientAPI) CreateFollowReq(ctx context.Context, cMsg messages.FromClientAPI) error { +	followRequest, ok := cMsg.GTSModel.(*gtsmodel.FollowRequest) +	if !ok { +		return gtserror.Newf("%T not parseable as *gtsmodel.FollowRequest", cMsg.GTSModel) +	} + +	if err := p.surface.notifyFollowRequest(ctx, followRequest); err != nil { +		return gtserror.Newf("error notifying follow request: %w", err) +	} + +	if err := p.federate.Follow( +		ctx, +		p.tc.FollowRequestToFollow(ctx, followRequest), +	); err != nil { +		return gtserror.Newf("error federating follow: %w", err) +	} + +	return nil +} + +func (p *clientAPI) CreateLike(ctx context.Context, cMsg messages.FromClientAPI) error { +	fave, ok := cMsg.GTSModel.(*gtsmodel.StatusFave) +	if !ok { +		return gtserror.Newf("%T not parseable as *gtsmodel.StatusFave", cMsg.GTSModel) +	} + +	if err := p.surface.notifyFave(ctx, fave); err != nil { +		return gtserror.Newf("error notifying fave: %w", err) +	} + +	// Interaction counts changed on the faved status; +	// uncache the prepared version from all timelines. +	p.surface.invalidateStatusFromTimelines(ctx, fave.StatusID) + +	if err := p.federate.Like(ctx, fave); err != nil { +		return gtserror.Newf("error federating like: %w", err) +	} + +	return nil +} + +func (p *clientAPI) CreateAnnounce(ctx context.Context, cMsg messages.FromClientAPI) error { +	boost, ok := cMsg.GTSModel.(*gtsmodel.Status) +	if !ok { +		return gtserror.Newf("%T not parseable as *gtsmodel.Status", cMsg.GTSModel) +	} + +	// Timeline and notify the boost wrapper status. +	if err := p.surface.timelineAndNotifyStatus(ctx, boost); err != nil { +		return gtserror.Newf("error timelining boost: %w", err) +	} + +	// Notify the boost target account. +	if err := p.surface.notifyAnnounce(ctx, boost); err != nil { +		return gtserror.Newf("error notifying boost: %w", err) +	} + +	// Interaction counts changed on the boosted status; +	// uncache the prepared version from all timelines. +	p.surface.invalidateStatusFromTimelines(ctx, boost.BoostOfID) + +	if err := p.federate.Announce(ctx, boost); err != nil { +		return gtserror.Newf("error federating announce: %w", err) +	} + +	return nil +} + +func (p *clientAPI) CreateBlock(ctx context.Context, cMsg messages.FromClientAPI) error { +	block, ok := cMsg.GTSModel.(*gtsmodel.Block) +	if !ok { +		return gtserror.Newf("%T not parseable as *gtsmodel.Block", cMsg.GTSModel) +	} + +	// Remove blockee's statuses from blocker's timeline. +	if err := p.state.Timelines.Home.WipeItemsFromAccountID( +		ctx, +		block.AccountID, +		block.TargetAccountID, +	); err != nil { +		return gtserror.Newf("error wiping timeline items for block: %w", err) +	} + +	// Remove blocker's statuses from blockee's timeline. +	if err := p.state.Timelines.Home.WipeItemsFromAccountID( +		ctx, +		block.TargetAccountID, +		block.AccountID, +	); err != nil { +		return gtserror.Newf("error wiping timeline items for block: %w", err) +	} + +	// TODO: same with notifications? +	// TODO: same with bookmarks? + +	if err := p.federate.Block(ctx, block); err != nil { +		return gtserror.Newf("error federating block: %w", err) +	} + +	return nil +} + +func (p *clientAPI) UpdateAccount(ctx context.Context, cMsg messages.FromClientAPI) error { +	account, ok := cMsg.GTSModel.(*gtsmodel.Account) +	if !ok { +		return gtserror.Newf("%T not parseable as *gtsmodel.Account", cMsg.GTSModel) +	} + +	if err := p.federate.UpdateAccount(ctx, account); err != nil { +		return gtserror.Newf("error federating account update: %w", err) +	} + +	return nil +} + +func (p *clientAPI) UpdateReport(ctx context.Context, cMsg messages.FromClientAPI) error { +	report, ok := cMsg.GTSModel.(*gtsmodel.Report) +	if !ok { +		return gtserror.Newf("%T not parseable as *gtsmodel.Report", cMsg.GTSModel) +	} + +	if report.Account.IsRemote() { +		// Report creator is a remote account, +		// we shouldn't try to email them! +		return nil +	} + +	if err := p.surface.emailReportClosed(ctx, report); err != nil { +		return gtserror.Newf("error sending report closed email: %w", err) +	} + +	return nil +} + +func (p *clientAPI) AcceptFollow(ctx context.Context, cMsg messages.FromClientAPI) error { +	follow, ok := cMsg.GTSModel.(*gtsmodel.Follow) +	if !ok { +		return gtserror.Newf("%T not parseable as *gtsmodel.Follow", cMsg.GTSModel) +	} + +	if err := p.surface.notifyFollow(ctx, follow); err != nil { +		return gtserror.Newf("error notifying follow: %w", err) +	} + +	if err := p.federate.AcceptFollow(ctx, follow); err != nil { +		return gtserror.Newf("error federating follow request accept: %w", err) +	} + +	return nil +} + +func (p *clientAPI) RejectFollowRequest(ctx context.Context, cMsg messages.FromClientAPI) error { +	followReq, ok := cMsg.GTSModel.(*gtsmodel.FollowRequest) +	if !ok { +		return gtserror.Newf("%T not parseable as *gtsmodel.FollowRequest", cMsg.GTSModel) +	} + +	if err := p.federate.RejectFollow( +		ctx, +		p.tc.FollowRequestToFollow(ctx, followReq), +	); err != nil { +		return gtserror.Newf("error federating reject follow: %w", err) +	} + +	return nil +} + +func (p *clientAPI) UndoFollow(ctx context.Context, cMsg messages.FromClientAPI) error { +	follow, ok := cMsg.GTSModel.(*gtsmodel.Follow) +	if !ok { +		return gtserror.Newf("%T not parseable as *gtsmodel.Follow", cMsg.GTSModel) +	} + +	if err := p.federate.UndoFollow(ctx, follow); err != nil { +		return gtserror.Newf("error federating undo follow: %w", err) +	} + +	return nil +} + +func (p *clientAPI) UndoBlock(ctx context.Context, cMsg messages.FromClientAPI) error { +	block, ok := cMsg.GTSModel.(*gtsmodel.Block) +	if !ok { +		return gtserror.Newf("%T not parseable as *gtsmodel.Block", cMsg.GTSModel) +	} + +	if err := p.federate.UndoBlock(ctx, block); err != nil { +		return gtserror.Newf("error federating undo block: %w", err) +	} + +	return nil +} + +func (p *clientAPI) UndoFave(ctx context.Context, cMsg messages.FromClientAPI) error { +	statusFave, ok := cMsg.GTSModel.(*gtsmodel.StatusFave) +	if !ok { +		return gtserror.Newf("%T not parseable as *gtsmodel.StatusFave", cMsg.GTSModel) +	} + +	// Interaction counts changed on the faved status; +	// uncache the prepared version from all timelines. +	p.surface.invalidateStatusFromTimelines(ctx, statusFave.StatusID) + +	if err := p.federate.UndoLike(ctx, statusFave); err != nil { +		return gtserror.Newf("error federating undo like: %w", err) +	} + +	return nil +} + +func (p *clientAPI) UndoAnnounce(ctx context.Context, cMsg messages.FromClientAPI) error { +	status, ok := cMsg.GTSModel.(*gtsmodel.Status) +	if !ok { +		return gtserror.Newf("%T not parseable as *gtsmodel.Status", cMsg.GTSModel) +	} + +	if err := p.state.DB.DeleteStatusByID(ctx, status.ID); err != nil { +		return gtserror.Newf("db error deleting status: %w", err) +	} + +	if err := p.surface.deleteStatusFromTimelines(ctx, status.ID); err != nil { +		return gtserror.Newf("error removing status from timelines: %w", err) +	} + +	// Interaction counts changed on the boosted status; +	// uncache the prepared version from all timelines. +	p.surface.invalidateStatusFromTimelines(ctx, status.BoostOfID) + +	if err := p.federate.UndoAnnounce(ctx, status); err != nil { +		return gtserror.Newf("error federating undo announce: %w", err) +	} + +	return nil +} + +func (p *clientAPI) DeleteStatus(ctx context.Context, cMsg messages.FromClientAPI) error { +	// Don't delete attachments, just unattach them: +	// this request comes from the client API and the +	// poster may want to use attachments again later. +	const deleteAttachments = false + +	status, ok := cMsg.GTSModel.(*gtsmodel.Status) +	if !ok { +		return gtserror.Newf("%T not parseable as *gtsmodel.Status", cMsg.GTSModel) +	} + +	// Try to populate status structs if possible, +	// in order to more thoroughly remove them. +	if err := p.state.DB.PopulateStatus( +		ctx, status, +	); err != nil && !errors.Is(err, db.ErrNoEntries) { +		return gtserror.Newf("db error populating status: %w", err) +	} + +	if err := p.wipeStatus(ctx, status, deleteAttachments); err != nil { +		return gtserror.Newf("error wiping status: %w", err) +	} + +	if status.InReplyToID != "" { +		// Interaction counts changed on the replied status; +		// uncache the prepared version from all timelines. +		p.surface.invalidateStatusFromTimelines(ctx, status.InReplyToID) +	} + +	if err := p.federate.DeleteStatus(ctx, status); err != nil { +		return gtserror.Newf("error federating status delete: %w", err) +	} + +	return nil +} + +func (p *clientAPI) DeleteAccount(ctx context.Context, cMsg messages.FromClientAPI) error { +	// The originID of the delete, one of: +	//   - ID of a domain block, for which +	//     this account delete is a side effect. +	//   - ID of the deleted account itself (self delete). +	//   - ID of an admin account (account suspension). +	var originID string + +	if domainBlock, ok := cMsg.GTSModel.(*gtsmodel.DomainBlock); ok { +		// Origin is a domain block. +		originID = domainBlock.ID +	} else { +		// Origin is whichever account +		// originated this message. +		originID = cMsg.OriginAccount.ID +	} + +	if err := p.federate.DeleteAccount(ctx, cMsg.TargetAccount); err != nil { +		return gtserror.Newf("error federating account delete: %w", err) +	} + +	if err := p.account.Delete(ctx, cMsg.TargetAccount, originID); err != nil { +		return gtserror.Newf("error deleting account: %w", err) +	} + +	return nil +} + +func (p *clientAPI) ReportAccount(ctx context.Context, cMsg messages.FromClientAPI) error { +	report, ok := cMsg.GTSModel.(*gtsmodel.Report) +	if !ok { +		return gtserror.Newf("%T not parseable as *gtsmodel.Report", cMsg.GTSModel) +	} + +	// Federate this report to the +	// remote instance if desired. +	if *report.Forwarded { +		if err := p.federate.Flag(ctx, report); err != nil { +			return gtserror.Newf("error federating report: %w", err) +		} +	} + +	if err := p.surface.emailReportOpened(ctx, report); err != nil { +		return gtserror.Newf("error sending report opened email: %w", err) +	} + +	return nil +} diff --git a/internal/processing/workers/fromclientapi_test.go b/internal/processing/workers/fromclientapi_test.go new file mode 100644 index 000000000..6690a43db --- /dev/null +++ b/internal/processing/workers/fromclientapi_test.go @@ -0,0 +1,589 @@ +// GoToSocial +// Copyright (C) GoToSocial Authors admin@gotosocial.org +// SPDX-License-Identifier: AGPL-3.0-or-later +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program.  If not, see <http://www.gnu.org/licenses/>. + +package workers_test + +import ( +	"context" +	"encoding/json" +	"errors" +	"testing" +	"time" + +	"github.com/stretchr/testify/suite" +	"github.com/superseriousbusiness/gotosocial/internal/ap" +	"github.com/superseriousbusiness/gotosocial/internal/config" +	"github.com/superseriousbusiness/gotosocial/internal/db" +	"github.com/superseriousbusiness/gotosocial/internal/gtsmodel" +	"github.com/superseriousbusiness/gotosocial/internal/id" +	"github.com/superseriousbusiness/gotosocial/internal/messages" +	"github.com/superseriousbusiness/gotosocial/internal/stream" +	"github.com/superseriousbusiness/gotosocial/internal/util" +	"github.com/superseriousbusiness/gotosocial/testrig" +) + +type FromClientAPITestSuite struct { +	WorkersTestSuite +} + +func (suite *FromClientAPITestSuite) newStatus( +	ctx context.Context, +	account *gtsmodel.Account, +	visibility gtsmodel.Visibility, +	replyToStatus *gtsmodel.Status, +	boostOfStatus *gtsmodel.Status, +) *gtsmodel.Status { +	var ( +		protocol = config.GetProtocol() +		host     = config.GetHost() +		statusID = id.NewULID() +	) + +	// Make a new status from given account. +	newStatus := >smodel.Status{ +		ID:                  statusID, +		URI:                 protocol + "://" + host + "/users/" + account.Username + "/statuses/" + statusID, +		URL:                 protocol + "://" + host + "/@" + account.Username + "/statuses/" + statusID, +		Content:             "pee pee poo poo", +		Local:               util.Ptr(true), +		AccountURI:          account.URI, +		AccountID:           account.ID, +		Visibility:          visibility, +		ActivityStreamsType: ap.ObjectNote, +		Federated:           util.Ptr(true), +		Boostable:           util.Ptr(true), +		Replyable:           util.Ptr(true), +		Likeable:            util.Ptr(true), +	} + +	if replyToStatus != nil { +		// Status is a reply. +		newStatus.InReplyToAccountID = replyToStatus.AccountID +		newStatus.InReplyToID = replyToStatus.ID +		newStatus.InReplyToURI = replyToStatus.URI + +		// Mention the replied-to account. +		mention := >smodel.Mention{ +			ID:               id.NewULID(), +			StatusID:         statusID, +			OriginAccountID:  account.ID, +			OriginAccountURI: account.URI, +			TargetAccountID:  replyToStatus.AccountID, +		} + +		if err := suite.db.PutMention(ctx, mention); err != nil { +			suite.FailNow(err.Error()) +		} +		newStatus.Mentions = []*gtsmodel.Mention{mention} +		newStatus.MentionIDs = []string{mention.ID} +	} + +	if boostOfStatus != nil { +		// Status is a boost. + +	} + +	// Put the status in the db, to mimic what would +	// have already happened earlier up the flow. +	if err := suite.db.PutStatus(ctx, newStatus); err != nil { +		suite.FailNow(err.Error()) +	} + +	return newStatus +} + +func (suite *FromClientAPITestSuite) checkStreamed( +	str *stream.Stream, +	expectMessage bool, +	expectPayload string, +	expectEventType string, +) { +	var msg *stream.Message +streamLoop: +	for { +		select { +		case msg = <-str.Messages: +			break streamLoop // Got it. +		case <-time.After(5 * time.Second): +			break streamLoop // Didn't get it. +		} +	} + +	if expectMessage && msg == nil { +		suite.FailNow("expected a message but message was nil") +	} + +	if !expectMessage && msg != nil { +		suite.FailNow("expected no message but message was not nil") +	} + +	if expectPayload != "" && msg.Payload != expectPayload { +		suite.FailNow("", "expected payload %s but payload was: %s", expectPayload, msg.Payload) +	} + +	if expectEventType != "" && msg.Event != expectEventType { +		suite.FailNow("", "expected event type %s but event type was: %s", expectEventType, msg.Event) +	} +} + +func (suite *FromClientAPITestSuite) statusJSON( +	ctx context.Context, +	status *gtsmodel.Status, +	requestingAccount *gtsmodel.Account, +) string { +	apiStatus, err := suite.typeconverter.StatusToAPIStatus( +		ctx, +		status, +		requestingAccount, +	) +	if err != nil { +		suite.FailNow(err.Error()) +	} + +	statusJSON, err := json.Marshal(apiStatus) +	if err != nil { +		suite.FailNow(err.Error()) +	} + +	return string(statusJSON) +} + +func (suite *FromClientAPITestSuite) TestProcessCreateStatusWithNotification() { +	var ( +		ctx              = context.Background() +		postingAccount   = suite.testAccounts["admin_account"] +		receivingAccount = suite.testAccounts["local_account_1"] +		testList         = suite.testLists["local_account_1_list_1"] +		streams          = suite.openStreams(ctx, receivingAccount, []string{testList.ID}) +		homeStream       = streams[stream.TimelineHome] +		listStream       = streams[stream.TimelineList+":"+testList.ID] +		notifStream      = streams[stream.TimelineNotifications] + +		// Admin account posts a new top-level status. +		status = suite.newStatus( +			ctx, +			postingAccount, +			gtsmodel.VisibilityPublic, +			nil, +			nil, +		) +		statusJSON = suite.statusJSON( +			ctx, +			status, +			receivingAccount, +		) +	) + +	// Update the follow from receiving account -> posting account so +	// that receiving account wants notifs when posting account posts. +	follow := new(gtsmodel.Follow) +	*follow = *suite.testFollows["local_account_1_admin_account"] + +	follow.Notify = util.Ptr(true) +	if err := suite.db.UpdateFollow(ctx, follow); err != nil { +		suite.FailNow(err.Error()) +	} + +	// Process the new status. +	if err := suite.processor.Workers().ProcessFromClientAPI( +		ctx, +		messages.FromClientAPI{ +			APObjectType:   ap.ObjectNote, +			APActivityType: ap.ActivityCreate, +			GTSModel:       status, +			OriginAccount:  postingAccount, +		}, +	); err != nil { +		suite.FailNow(err.Error()) +	} + +	// Check message in home stream. +	suite.checkStreamed( +		homeStream, +		true, +		statusJSON, +		stream.EventTypeUpdate, +	) + +	// Check message in list stream. +	suite.checkStreamed( +		listStream, +		true, +		statusJSON, +		stream.EventTypeUpdate, +	) + +	// Wait for a notification to appear for the status. +	var notif *gtsmodel.Notification +	if !testrig.WaitFor(func() bool { +		var err error +		notif, err = suite.db.GetNotification( +			ctx, +			gtsmodel.NotificationStatus, +			receivingAccount.ID, +			postingAccount.ID, +			status.ID, +		) +		return err == nil +	}) { +		suite.FailNow("timed out waiting for new status notification") +	} + +	apiNotif, err := suite.typeconverter.NotificationToAPINotification(ctx, notif) +	if err != nil { +		suite.FailNow(err.Error()) +	} + +	notifJSON, err := json.Marshal(apiNotif) +	if err != nil { +		suite.FailNow(err.Error()) +	} + +	// Check message in notification stream. +	suite.checkStreamed( +		notifStream, +		true, +		string(notifJSON), +		stream.EventTypeNotification, +	) +} + +func (suite *FromClientAPITestSuite) TestProcessCreateStatusReply() { +	var ( +		ctx              = context.Background() +		postingAccount   = suite.testAccounts["admin_account"] +		receivingAccount = suite.testAccounts["local_account_1"] +		testList         = suite.testLists["local_account_1_list_1"] +		streams          = suite.openStreams(ctx, receivingAccount, []string{testList.ID}) +		homeStream       = streams[stream.TimelineHome] +		listStream       = streams[stream.TimelineList+":"+testList.ID] + +		// Admin account posts a reply to turtle. +		// Since turtle is followed by zork, and +		// the default replies policy for this list +		// is to show replies to followed accounts, +		// post should also show in the list stream. +		status = suite.newStatus( +			ctx, +			postingAccount, +			gtsmodel.VisibilityPublic, +			suite.testStatuses["local_account_2_status_1"], +			nil, +		) +		statusJSON = suite.statusJSON( +			ctx, +			status, +			receivingAccount, +		) +	) + +	// Process the new status. +	if err := suite.processor.Workers().ProcessFromClientAPI( +		ctx, +		messages.FromClientAPI{ +			APObjectType:   ap.ObjectNote, +			APActivityType: ap.ActivityCreate, +			GTSModel:       status, +			OriginAccount:  postingAccount, +		}, +	); err != nil { +		suite.FailNow(err.Error()) +	} + +	// Check message in home stream. +	suite.checkStreamed( +		homeStream, +		true, +		statusJSON, +		stream.EventTypeUpdate, +	) + +	// Check message in list stream. +	suite.checkStreamed( +		listStream, +		true, +		statusJSON, +		stream.EventTypeUpdate, +	) +} + +func (suite *FromClientAPITestSuite) TestProcessCreateStatusListRepliesPolicyListOnlyOK() { +	// We're modifying the test list so take a copy. +	testList := new(gtsmodel.List) +	*testList = *suite.testLists["local_account_1_list_1"] + +	var ( +		ctx              = context.Background() +		postingAccount   = suite.testAccounts["admin_account"] +		receivingAccount = suite.testAccounts["local_account_1"] +		streams          = suite.openStreams(ctx, receivingAccount, []string{testList.ID}) +		homeStream       = streams[stream.TimelineHome] +		listStream       = streams[stream.TimelineList+":"+testList.ID] + +		// Admin account posts a reply to turtle. +		status = suite.newStatus( +			ctx, +			postingAccount, +			gtsmodel.VisibilityPublic, +			suite.testStatuses["local_account_2_status_1"], +			nil, +		) +		statusJSON = suite.statusJSON( +			ctx, +			status, +			receivingAccount, +		) +	) + +	// Modify replies policy of test list to show replies +	// only to other accounts in the same list. Since turtle +	// and admin are in the same list, this means the reply +	// should be shown in the list. +	testList.RepliesPolicy = gtsmodel.RepliesPolicyList +	if err := suite.db.UpdateList(ctx, testList, "replies_policy"); err != nil { +		suite.FailNow(err.Error()) +	} + +	// Process the new status. +	if err := suite.processor.Workers().ProcessFromClientAPI( +		ctx, +		messages.FromClientAPI{ +			APObjectType:   ap.ObjectNote, +			APActivityType: ap.ActivityCreate, +			GTSModel:       status, +			OriginAccount:  postingAccount, +		}, +	); err != nil { +		suite.FailNow(err.Error()) +	} + +	// Check message in home stream. +	suite.checkStreamed( +		homeStream, +		true, +		statusJSON, +		stream.EventTypeUpdate, +	) + +	// Check message in list stream. +	suite.checkStreamed( +		listStream, +		true, +		statusJSON, +		stream.EventTypeUpdate, +	) +} + +func (suite *FromClientAPITestSuite) TestProcessCreateStatusListRepliesPolicyListOnlyNo() { +	// We're modifying the test list so take a copy. +	testList := new(gtsmodel.List) +	*testList = *suite.testLists["local_account_1_list_1"] + +	var ( +		ctx              = context.Background() +		postingAccount   = suite.testAccounts["admin_account"] +		receivingAccount = suite.testAccounts["local_account_1"] +		streams          = suite.openStreams(ctx, receivingAccount, []string{testList.ID}) +		homeStream       = streams[stream.TimelineHome] +		listStream       = streams[stream.TimelineList+":"+testList.ID] + +		// Admin account posts a reply to turtle. +		status = suite.newStatus( +			ctx, +			postingAccount, +			gtsmodel.VisibilityPublic, +			suite.testStatuses["local_account_2_status_1"], +			nil, +		) +		statusJSON = suite.statusJSON( +			ctx, +			status, +			receivingAccount, +		) +	) + +	// Modify replies policy of test list to show replies +	// only to other accounts in the same list. We're +	// about to remove turtle from the same list as admin, +	// so the new post should not be streamed to the list. +	testList.RepliesPolicy = gtsmodel.RepliesPolicyList +	if err := suite.db.UpdateList(ctx, testList, "replies_policy"); err != nil { +		suite.FailNow(err.Error()) +	} + +	// Remove turtle from the list. +	if err := suite.db.DeleteListEntry(ctx, suite.testListEntries["local_account_1_list_1_entry_1"].ID); err != nil { +		suite.FailNow(err.Error()) +	} + +	// Process the new status. +	if err := suite.processor.Workers().ProcessFromClientAPI( +		ctx, +		messages.FromClientAPI{ +			APObjectType:   ap.ObjectNote, +			APActivityType: ap.ActivityCreate, +			GTSModel:       status, +			OriginAccount:  postingAccount, +		}, +	); err != nil { +		suite.FailNow(err.Error()) +	} + +	// Check message in home stream. +	suite.checkStreamed( +		homeStream, +		true, +		statusJSON, +		stream.EventTypeUpdate, +	) + +	// Check message NOT in list stream. +	suite.checkStreamed( +		listStream, +		false, +		"", +		"", +	) +} + +func (suite *FromClientAPITestSuite) TestProcessCreateStatusReplyListRepliesPolicyNone() { +	// We're modifying the test list so take a copy. +	testList := new(gtsmodel.List) +	*testList = *suite.testLists["local_account_1_list_1"] + +	var ( +		ctx              = context.Background() +		postingAccount   = suite.testAccounts["admin_account"] +		receivingAccount = suite.testAccounts["local_account_1"] +		streams          = suite.openStreams(ctx, receivingAccount, []string{testList.ID}) +		homeStream       = streams[stream.TimelineHome] +		listStream       = streams[stream.TimelineList+":"+testList.ID] + +		// Admin account posts a reply to turtle. +		status = suite.newStatus( +			ctx, +			postingAccount, +			gtsmodel.VisibilityPublic, +			suite.testStatuses["local_account_2_status_1"], +			nil, +		) +		statusJSON = suite.statusJSON( +			ctx, +			status, +			receivingAccount, +		) +	) + +	// Modify replies policy of test list. +	// Since we're modifying the list to not +	// show any replies, the post should not +	// be streamed to the list. +	testList.RepliesPolicy = gtsmodel.RepliesPolicyNone +	if err := suite.db.UpdateList(ctx, testList, "replies_policy"); err != nil { +		suite.FailNow(err.Error()) +	} + +	// Process the new status. +	if err := suite.processor.Workers().ProcessFromClientAPI( +		ctx, +		messages.FromClientAPI{ +			APObjectType:   ap.ObjectNote, +			APActivityType: ap.ActivityCreate, +			GTSModel:       status, +			OriginAccount:  postingAccount, +		}, +	); err != nil { +		suite.FailNow(err.Error()) +	} + +	// Check message in home stream. +	suite.checkStreamed( +		homeStream, +		true, +		statusJSON, +		stream.EventTypeUpdate, +	) + +	// Check message NOT in list stream. +	suite.checkStreamed( +		listStream, +		false, +		"", +		"", +	) +} + +func (suite *FromClientAPITestSuite) TestProcessStatusDelete() { +	var ( +		ctx                  = context.Background() +		deletingAccount      = suite.testAccounts["local_account_1"] +		receivingAccount     = suite.testAccounts["local_account_2"] +		deletedStatus        = suite.testStatuses["local_account_1_status_1"] +		boostOfDeletedStatus = suite.testStatuses["admin_account_status_4"] +		streams              = suite.openStreams(ctx, receivingAccount, nil) +		homeStream           = streams[stream.TimelineHome] +	) + +	// Delete the status from the db first, to mimic what +	// would have already happened earlier up the flow +	if err := suite.db.DeleteStatusByID(ctx, deletedStatus.ID); err != nil { +		suite.FailNow(err.Error()) +	} + +	// Process the status delete. +	if err := suite.processor.Workers().ProcessFromClientAPI( +		ctx, +		messages.FromClientAPI{ +			APObjectType:   ap.ObjectNote, +			APActivityType: ap.ActivityDelete, +			GTSModel:       deletedStatus, +			OriginAccount:  deletingAccount, +		}, +	); err != nil { +		suite.FailNow(err.Error()) +	} + +	// Stream should have the delete +	// of admin's boost in it now. +	suite.checkStreamed( +		homeStream, +		true, +		boostOfDeletedStatus.ID, +		stream.EventTypeDelete, +	) + +	// Stream should also have the delete +	// of the message itself in it. +	suite.checkStreamed( +		homeStream, +		true, +		deletedStatus.ID, +		stream.EventTypeDelete, +	) + +	// Boost should no longer be in the database. +	if !testrig.WaitFor(func() bool { +		_, err := suite.db.GetStatusByID(ctx, boostOfDeletedStatus.ID) +		return errors.Is(err, db.ErrNoEntries) +	}) { +		suite.FailNow("timed out waiting for status delete") +	} +} + +func TestFromClientAPITestSuite(t *testing.T) { +	suite.Run(t, &FromClientAPITestSuite{}) +} diff --git a/internal/processing/workers/fromfediapi.go b/internal/processing/workers/fromfediapi.go new file mode 100644 index 000000000..5fbb0066b --- /dev/null +++ b/internal/processing/workers/fromfediapi.go @@ -0,0 +1,540 @@ +// GoToSocial +// Copyright (C) GoToSocial Authors admin@gotosocial.org +// SPDX-License-Identifier: AGPL-3.0-or-later +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program.  If not, see <http://www.gnu.org/licenses/>. + +package workers + +import ( +	"context" +	"net/url" + +	"codeberg.org/gruf/go-kv" +	"codeberg.org/gruf/go-logger/v2/level" +	"github.com/superseriousbusiness/gotosocial/internal/ap" +	"github.com/superseriousbusiness/gotosocial/internal/gtserror" +	"github.com/superseriousbusiness/gotosocial/internal/gtsmodel" +	"github.com/superseriousbusiness/gotosocial/internal/id" +	"github.com/superseriousbusiness/gotosocial/internal/log" +	"github.com/superseriousbusiness/gotosocial/internal/messages" +	"github.com/superseriousbusiness/gotosocial/internal/processing/account" +	"github.com/superseriousbusiness/gotosocial/internal/state" +) + +// fediAPI wraps processing functions +// specifically for messages originating +// from the federation/ActivityPub API. +type fediAPI struct { +	state      *state.State +	surface    *surface +	federate   *federate +	wipeStatus wipeStatus +	account    *account.Processor +} + +func (p *Processor) EnqueueFediAPI(ctx context.Context, msgs ...messages.FromFediAPI) { +	log.Trace(ctx, "enqueuing") +	_ = p.workers.Federator.MustEnqueueCtx(ctx, func(ctx context.Context) { +		for _, msg := range msgs { +			log.Trace(ctx, "processing: %+v", msg) +			if err := p.ProcessFromFediAPI(ctx, msg); err != nil { +				log.Errorf(ctx, "error processing fedi API message: %v", err) +			} +		} +	}) +} + +func (p *Processor) ProcessFromFediAPI(ctx context.Context, fMsg messages.FromFediAPI) error { +	// Allocate new log fields slice +	fields := make([]kv.Field, 3, 5) +	fields[0] = kv.Field{"activityType", fMsg.APActivityType} +	fields[1] = kv.Field{"objectType", fMsg.APObjectType} +	fields[2] = kv.Field{"toAccount", fMsg.ReceivingAccount.Username} + +	if fMsg.APIri != nil { +		// An IRI was supplied, append to log +		fields = append(fields, kv.Field{ +			"iri", fMsg.APIri, +		}) +	} + +	// Include GTSModel in logs if appropriate. +	if fMsg.GTSModel != nil && +		log.Level() >= level.DEBUG { +		fields = append(fields, kv.Field{ +			"model", fMsg.GTSModel, +		}) +	} + +	l := log.WithContext(ctx).WithFields(fields...) +	l.Info("processing from fedi API") + +	switch fMsg.APActivityType { + +	// CREATE SOMETHING +	case ap.ActivityCreate: +		switch fMsg.APObjectType { + +		// CREATE NOTE/STATUS +		case ap.ObjectNote: +			return p.fediAPI.CreateStatus(ctx, fMsg) + +		// CREATE FOLLOW (request) +		case ap.ActivityFollow: +			return p.fediAPI.CreateFollowReq(ctx, fMsg) + +		// CREATE LIKE/FAVE +		case ap.ActivityLike: +			return p.fediAPI.CreateLike(ctx, fMsg) + +		// CREATE ANNOUNCE/BOOST +		case ap.ActivityAnnounce: +			return p.fediAPI.CreateAnnounce(ctx, fMsg) + +		// CREATE BLOCK +		case ap.ActivityBlock: +			return p.fediAPI.CreateBlock(ctx, fMsg) + +		// CREATE FLAG/REPORT +		case ap.ActivityFlag: +			return p.fediAPI.CreateFlag(ctx, fMsg) +		} + +	// UPDATE SOMETHING +	case ap.ActivityUpdate: +		switch fMsg.APObjectType { //nolint:gocritic + +		// UPDATE PROFILE/ACCOUNT +		case ap.ObjectProfile: +			return p.fediAPI.UpdateAccount(ctx, fMsg) +		} + +	// DELETE SOMETHING +	case ap.ActivityDelete: +		switch fMsg.APObjectType { + +		// DELETE NOTE/STATUS +		case ap.ObjectNote: +			return p.fediAPI.DeleteStatus(ctx, fMsg) + +		// DELETE PROFILE/ACCOUNT +		case ap.ObjectProfile: +			return p.fediAPI.DeleteAccount(ctx, fMsg) +		} +	} + +	return nil +} + +func (p *fediAPI) CreateStatus(ctx context.Context, fMsg messages.FromFediAPI) error { +	var ( +		status *gtsmodel.Status +		err    error + +		// Check the federatorMsg for either an already dereferenced +		// and converted status pinned to the message, or a forwarded +		// AP IRI that we still need to deref. +		forwarded = (fMsg.GTSModel == nil) +	) + +	if forwarded { +		// Model was not set, deref with IRI. +		// This will also cause the status to be inserted into the db. +		status, err = p.statusFromAPIRI(ctx, fMsg) +	} else { +		// Model is set, ensure we have the most up-to-date model. +		status, err = p.statusFromGTSModel(ctx, fMsg) +	} + +	if err != nil { +		return gtserror.Newf("error extracting status from federatorMsg: %w", err) +	} + +	if status.Account == nil || status.Account.IsRemote() { +		// Either no account attached yet, or a remote account. +		// Both situations we need to parse account URI to fetch it. +		accountURI, err := url.Parse(status.AccountURI) +		if err != nil { +			return err +		} + +		// Ensure that account for this status has been deref'd. +		status.Account, _, err = p.federate.GetAccountByURI( +			ctx, +			fMsg.ReceivingAccount.Username, +			accountURI, +		) +		if err != nil { +			return err +		} +	} + +	// Ensure status ancestors dereferenced. We need at least the +	// immediate parent (if present) to ascertain timelineability. +	if err := p.federate.DereferenceStatusAncestors( +		ctx, +		fMsg.ReceivingAccount.Username, +		status, +	); err != nil { +		return err +	} + +	if status.InReplyToID != "" { +		// Interaction counts changed on the replied status; +		// uncache the prepared version from all timelines. +		p.surface.invalidateStatusFromTimelines(ctx, status.InReplyToID) +	} + +	if err := p.surface.timelineAndNotifyStatus(ctx, status); err != nil { +		return gtserror.Newf("error timelining status: %w", err) +	} + +	return nil +} + +func (p *fediAPI) statusFromGTSModel(ctx context.Context, fMsg messages.FromFediAPI) (*gtsmodel.Status, error) { +	// There should be a status pinned to the message: +	// we've already checked to ensure this is not nil. +	status, ok := fMsg.GTSModel.(*gtsmodel.Status) +	if !ok { +		err := gtserror.New("Note was not parseable as *gtsmodel.Status") +		return nil, err +	} + +	// AP statusable representation may have also +	// been set on message (no problem if not). +	statusable, _ := fMsg.APObjectModel.(ap.Statusable) + +	// Call refresh on status to update +	// it (deref remote) if necessary. +	var err error +	status, _, err = p.federate.RefreshStatus( +		ctx, +		fMsg.ReceivingAccount.Username, +		status, +		statusable, +		false, // Don't force refresh. +	) +	if err != nil { +		return nil, gtserror.Newf("%w", err) +	} + +	return status, nil +} + +func (p *fediAPI) statusFromAPIRI(ctx context.Context, fMsg messages.FromFediAPI) (*gtsmodel.Status, error) { +	// There should be a status IRI pinned to +	// the federatorMsg for us to dereference. +	if fMsg.APIri == nil { +		err := gtserror.New( +			"status was not pinned to federatorMsg, " + +				"and neither was an IRI for us to dereference", +		) +		return nil, err +	} + +	// Get the status + ensure we have +	// the most up-to-date version. +	status, _, err := p.federate.GetStatusByURI( +		ctx, +		fMsg.ReceivingAccount.Username, +		fMsg.APIri, +	) +	if err != nil { +		return nil, gtserror.Newf("%w", err) +	} + +	return status, nil +} + +func (p *fediAPI) CreateFollowReq(ctx context.Context, fMsg messages.FromFediAPI) error { +	followRequest, ok := fMsg.GTSModel.(*gtsmodel.FollowRequest) +	if !ok { +		return gtserror.Newf("%T not parseable as *gtsmodel.FollowRequest", fMsg.GTSModel) +	} + +	if *followRequest.TargetAccount.Locked { +		// Account on our instance is locked: +		// just notify the follow request. +		if err := p.surface.notifyFollowRequest(ctx, followRequest); err != nil { +			return gtserror.Newf("error notifying follow request: %w", err) +		} + +		return nil +	} + +	// Account on our instance is not locked: +	// Automatically accept the follow request +	// and notify about the new follower. +	follow, err := p.state.DB.AcceptFollowRequest( +		ctx, +		followRequest.AccountID, +		followRequest.TargetAccountID, +	) +	if err != nil { +		return gtserror.Newf("error accepting follow request: %w", err) +	} + +	if err := p.federate.AcceptFollow(ctx, follow); err != nil { +		return gtserror.Newf("error federating accept follow request: %w", err) +	} + +	if err := p.surface.notifyFollow(ctx, follow); err != nil { +		return gtserror.Newf("error notifying follow: %w", err) +	} + +	return nil +} + +func (p *fediAPI) CreateLike(ctx context.Context, fMsg messages.FromFediAPI) error { +	fave, ok := fMsg.GTSModel.(*gtsmodel.StatusFave) +	if !ok { +		return gtserror.Newf("%T not parseable as *gtsmodel.StatusFave", fMsg.GTSModel) +	} + +	if err := p.surface.notifyFave(ctx, fave); err != nil { +		return gtserror.Newf("error notifying fave: %w", err) +	} + +	// Interaction counts changed on the faved status; +	// uncache the prepared version from all timelines. +	p.surface.invalidateStatusFromTimelines(ctx, fave.StatusID) + +	return nil +} + +func (p *fediAPI) CreateAnnounce(ctx context.Context, fMsg messages.FromFediAPI) error { +	status, ok := fMsg.GTSModel.(*gtsmodel.Status) +	if !ok { +		return gtserror.Newf("%T not parseable as *gtsmodel.Status", fMsg.GTSModel) +	} + +	// Dereference status that this status boosts. +	if err := p.federate.DereferenceAnnounce( +		ctx, +		status, +		fMsg.ReceivingAccount.Username, +	); err != nil { +		return gtserror.Newf("error dereferencing announce: %w", err) +	} + +	// Generate an ID for the boost wrapper status. +	statusID, err := id.NewULIDFromTime(status.CreatedAt) +	if err != nil { +		return gtserror.Newf("error generating id: %w", err) +	} +	status.ID = statusID + +	// Store the boost wrapper status. +	if err := p.state.DB.PutStatus(ctx, status); err != nil { +		return gtserror.Newf("db error inserting status: %w", err) +	} + +	// Ensure boosted status ancestors dereferenced. We need at least +	// the immediate parent (if present) to ascertain timelineability. +	if err := p.federate.DereferenceStatusAncestors(ctx, +		fMsg.ReceivingAccount.Username, +		status.BoostOf, +	); err != nil { +		return err +	} + +	// Timeline and notify the announce. +	if err := p.surface.timelineAndNotifyStatus(ctx, status); err != nil { +		return gtserror.Newf("error timelining status: %w", err) +	} + +	if err := p.surface.notifyAnnounce(ctx, status); err != nil { +		return gtserror.Newf("error notifying status: %w", err) +	} + +	// Interaction counts changed on the boosted status; +	// uncache the prepared version from all timelines. +	p.surface.invalidateStatusFromTimelines(ctx, status.ID) + +	return nil +} + +func (p *fediAPI) CreateBlock(ctx context.Context, fMsg messages.FromFediAPI) error { +	block, ok := fMsg.GTSModel.(*gtsmodel.Block) +	if !ok { +		return gtserror.Newf("%T not parseable as *gtsmodel.Block", fMsg.GTSModel) +	} + +	// Remove each account's posts from the other's timelines. +	// +	// First home timelines. +	if err := p.state.Timelines.Home.WipeItemsFromAccountID( +		ctx, +		block.AccountID, +		block.TargetAccountID, +	); err != nil { +		return gtserror.Newf("%w", err) +	} + +	if err := p.state.Timelines.Home.WipeItemsFromAccountID( +		ctx, +		block.TargetAccountID, +		block.AccountID, +	); err != nil { +		return gtserror.Newf("%w", err) +	} + +	// Now list timelines. +	if err := p.state.Timelines.List.WipeItemsFromAccountID( +		ctx, +		block.AccountID, +		block.TargetAccountID, +	); err != nil { +		return gtserror.Newf("%w", err) +	} + +	if err := p.state.Timelines.List.WipeItemsFromAccountID( +		ctx, +		block.TargetAccountID, +		block.AccountID, +	); err != nil { +		return gtserror.Newf("%w", err) +	} + +	// Remove any follows that existed between blocker + blockee. +	if err := p.state.DB.DeleteFollow( +		ctx, +		block.AccountID, +		block.TargetAccountID, +	); err != nil { +		return gtserror.Newf( +			"db error deleting follow from %s targeting %s: %w", +			block.AccountID, block.TargetAccountID, err, +		) +	} + +	if err := p.state.DB.DeleteFollow( +		ctx, +		block.TargetAccountID, +		block.AccountID, +	); err != nil { +		return gtserror.Newf( +			"db error deleting follow from %s targeting %s: %w", +			block.TargetAccountID, block.AccountID, err, +		) +	} + +	// Remove any follow requests that existed between blocker + blockee. +	if err := p.state.DB.DeleteFollowRequest( +		ctx, +		block.AccountID, +		block.TargetAccountID, +	); err != nil { +		return gtserror.Newf( +			"db error deleting follow request from %s targeting %s: %w", +			block.AccountID, block.TargetAccountID, err, +		) +	} + +	if err := p.state.DB.DeleteFollowRequest( +		ctx, +		block.TargetAccountID, +		block.AccountID, +	); err != nil { +		return gtserror.Newf( +			"db error deleting follow request from %s targeting %s: %w", +			block.TargetAccountID, block.AccountID, err, +		) +	} + +	return nil +} + +func (p *fediAPI) CreateFlag(ctx context.Context, fMsg messages.FromFediAPI) error { +	incomingReport, ok := fMsg.GTSModel.(*gtsmodel.Report) +	if !ok { +		return gtserror.Newf("%T not parseable as *gtsmodel.Report", fMsg.GTSModel) +	} + +	// TODO: handle additional side effects of flag creation: +	// - notify admins by dm / notification + +	if err := p.surface.emailReportOpened(ctx, incomingReport); err != nil { +		return gtserror.Newf("error sending report opened email: %w", err) +	} + +	return nil +} + +func (p *fediAPI) UpdateAccount(ctx context.Context, fMsg messages.FromFediAPI) error { +	// Parse the old/existing account model. +	account, ok := fMsg.GTSModel.(*gtsmodel.Account) +	if !ok { +		return gtserror.Newf("%T not parseable as *gtsmodel.Account", fMsg.GTSModel) +	} + +	// Because this was an Update, the new Accountable should be set on the message. +	apubAcc, ok := fMsg.APObjectModel.(ap.Accountable) +	if !ok { +		return gtserror.Newf("%T not parseable as ap.Accountable", fMsg.APObjectModel) +	} + +	// Fetch up-to-date bio, avatar, header, etc. +	_, _, err := p.federate.RefreshAccount( +		ctx, +		fMsg.ReceivingAccount.Username, +		account, +		apubAcc, +		true, // Force refresh. +	) +	if err != nil { +		return gtserror.Newf("error refreshing updated account: %w", err) +	} + +	return nil +} + +func (p *fediAPI) DeleteStatus(ctx context.Context, fMsg messages.FromFediAPI) error { +	// Delete attachments from this status, since this request +	// comes from the federating API, and there's no way the +	// poster can do a delete + redraft for it on our instance. +	const deleteAttachments = true + +	status, ok := fMsg.GTSModel.(*gtsmodel.Status) +	if !ok { +		return gtserror.Newf("%T not parseable as *gtsmodel.Status", fMsg.GTSModel) +	} + +	if err := p.wipeStatus(ctx, status, deleteAttachments); err != nil { +		return gtserror.Newf("error wiping status: %w", err) +	} + +	if status.InReplyToID != "" { +		// Interaction counts changed on the replied status; +		// uncache the prepared version from all timelines. +		p.surface.invalidateStatusFromTimelines(ctx, status.InReplyToID) +	} + +	return nil +} + +func (p *fediAPI) DeleteAccount(ctx context.Context, fMsg messages.FromFediAPI) error { +	account, ok := fMsg.GTSModel.(*gtsmodel.Account) +	if !ok { +		return gtserror.Newf("%T not parseable as *gtsmodel.Account", fMsg.GTSModel) +	} + +	if err := p.account.Delete(ctx, account, account.ID); err != nil { +		return gtserror.Newf("error deleting account: %w", err) +	} + +	return nil +} diff --git a/internal/processing/workers/fromfediapi_test.go b/internal/processing/workers/fromfediapi_test.go new file mode 100644 index 000000000..f8e3941fc --- /dev/null +++ b/internal/processing/workers/fromfediapi_test.go @@ -0,0 +1,565 @@ +// GoToSocial +// Copyright (C) GoToSocial Authors admin@gotosocial.org +// SPDX-License-Identifier: AGPL-3.0-or-later +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program.  If not, see <http://www.gnu.org/licenses/>. + +package workers_test + +import ( +	"context" +	"encoding/json" +	"fmt" +	"testing" +	"time" + +	"github.com/stretchr/testify/suite" +	"github.com/superseriousbusiness/gotosocial/internal/ap" +	apimodel "github.com/superseriousbusiness/gotosocial/internal/api/model" +	"github.com/superseriousbusiness/gotosocial/internal/db" +	"github.com/superseriousbusiness/gotosocial/internal/gtsmodel" +	"github.com/superseriousbusiness/gotosocial/internal/id" +	"github.com/superseriousbusiness/gotosocial/internal/messages" +	"github.com/superseriousbusiness/gotosocial/internal/stream" +	"github.com/superseriousbusiness/gotosocial/internal/util" +	"github.com/superseriousbusiness/gotosocial/testrig" +) + +type FromFediAPITestSuite struct { +	WorkersTestSuite +} + +// remote_account_1 boosts the first status of local_account_1 +func (suite *FromFediAPITestSuite) TestProcessFederationAnnounce() { +	boostedStatus := suite.testStatuses["local_account_1_status_1"] +	boostingAccount := suite.testAccounts["remote_account_1"] +	announceStatus := >smodel.Status{} +	announceStatus.URI = "https://example.org/some-announce-uri" +	announceStatus.BoostOf = >smodel.Status{ +		URI: boostedStatus.URI, +	} +	announceStatus.CreatedAt = time.Now() +	announceStatus.UpdatedAt = time.Now() +	announceStatus.AccountID = boostingAccount.ID +	announceStatus.AccountURI = boostingAccount.URI +	announceStatus.Account = boostingAccount +	announceStatus.Visibility = boostedStatus.Visibility + +	err := suite.processor.Workers().ProcessFromFediAPI(context.Background(), messages.FromFediAPI{ +		APObjectType:     ap.ActivityAnnounce, +		APActivityType:   ap.ActivityCreate, +		GTSModel:         announceStatus, +		ReceivingAccount: suite.testAccounts["local_account_1"], +	}) +	suite.NoError(err) + +	// side effects should be triggered +	// 1. status should have an ID, and be in the database +	suite.NotEmpty(announceStatus.ID) +	_, err = suite.db.GetStatusByID(context.Background(), announceStatus.ID) +	suite.NoError(err) + +	// 2. a notification should exist for the announce +	where := []db.Where{ +		{ +			Key:   "status_id", +			Value: announceStatus.ID, +		}, +	} +	notif := >smodel.Notification{} +	err = suite.db.GetWhere(context.Background(), where, notif) +	suite.NoError(err) +	suite.Equal(gtsmodel.NotificationReblog, notif.NotificationType) +	suite.Equal(boostedStatus.AccountID, notif.TargetAccountID) +	suite.Equal(announceStatus.AccountID, notif.OriginAccountID) +	suite.Equal(announceStatus.ID, notif.StatusID) +	suite.False(*notif.Read) +} + +func (suite *FromFediAPITestSuite) TestProcessReplyMention() { +	repliedAccount := suite.testAccounts["local_account_1"] +	repliedStatus := suite.testStatuses["local_account_1_status_1"] +	replyingAccount := suite.testAccounts["remote_account_1"] + +	replyingStatus := >smodel.Status{ +		CreatedAt: time.Now(), +		UpdatedAt: time.Now(), +		URI:       "http://fossbros-anonymous.io/users/foss_satan/statuses/106221634728637552", +		URL:       "http://fossbros-anonymous.io/@foss_satan/106221634728637552", +		Content:   `<p><span class="h-card"><a href="http://localhost:8080/@the_mighty_zork" class="u-url mention">@<span>the_mighty_zork</span></a></span> nice there it is:</p><p><a href="http://localhost:8080/users/the_mighty_zork/statuses/01F8MHAMCHF6Y650WCRSCP4WMY/activity" rel="nofollow noopener noreferrer" target="_blank"><span class="invisible">https://</span><span class="ellipsis">social.pixie.town/users/f0x/st</span><span class="invisible">atuses/106221628567855262/activity</span></a></p>`, +		Mentions: []*gtsmodel.Mention{ +			{ +				TargetAccountURI: repliedAccount.URI, +				NameString:       "@the_mighty_zork@localhost:8080", +			}, +		}, +		AccountID:           replyingAccount.ID, +		AccountURI:          replyingAccount.URI, +		InReplyToID:         repliedStatus.ID, +		InReplyToURI:        repliedStatus.URI, +		InReplyToAccountID:  repliedAccount.ID, +		Visibility:          gtsmodel.VisibilityUnlocked, +		ActivityStreamsType: ap.ObjectNote, +		Federated:           util.Ptr(true), +		Boostable:           util.Ptr(true), +		Replyable:           util.Ptr(true), +		Likeable:            util.Ptr(false), +	} + +	wssStream, errWithCode := suite.processor.Stream().Open(context.Background(), repliedAccount, stream.TimelineHome) +	suite.NoError(errWithCode) + +	// id the status based on the time it was created +	statusID, err := id.NewULIDFromTime(replyingStatus.CreatedAt) +	suite.NoError(err) +	replyingStatus.ID = statusID + +	err = suite.db.PutStatus(context.Background(), replyingStatus) +	suite.NoError(err) + +	err = suite.processor.Workers().ProcessFromFediAPI(context.Background(), messages.FromFediAPI{ +		APObjectType:     ap.ObjectNote, +		APActivityType:   ap.ActivityCreate, +		GTSModel:         replyingStatus, +		ReceivingAccount: suite.testAccounts["local_account_1"], +	}) +	suite.NoError(err) + +	// side effects should be triggered +	// 1. status should be in the database +	suite.NotEmpty(replyingStatus.ID) +	_, err = suite.db.GetStatusByID(context.Background(), replyingStatus.ID) +	suite.NoError(err) + +	// 2. a notification should exist for the mention +	var notif gtsmodel.Notification +	err = suite.db.GetWhere(context.Background(), []db.Where{ +		{Key: "status_id", Value: replyingStatus.ID}, +	}, ¬if) +	suite.NoError(err) +	suite.Equal(gtsmodel.NotificationMention, notif.NotificationType) +	suite.Equal(replyingStatus.InReplyToAccountID, notif.TargetAccountID) +	suite.Equal(replyingStatus.AccountID, notif.OriginAccountID) +	suite.Equal(replyingStatus.ID, notif.StatusID) +	suite.False(*notif.Read) + +	// the notification should be streamed +	var msg *stream.Message +	select { +	case msg = <-wssStream.Messages: +		// fine +	case <-time.After(5 * time.Second): +		suite.FailNow("no message from wssStream") +	} + +	suite.Equal(stream.EventTypeNotification, msg.Event) +	suite.NotEmpty(msg.Payload) +	suite.EqualValues([]string{stream.TimelineHome}, msg.Stream) +	notifStreamed := &apimodel.Notification{} +	err = json.Unmarshal([]byte(msg.Payload), notifStreamed) +	suite.NoError(err) +	suite.Equal("mention", notifStreamed.Type) +	suite.Equal(replyingAccount.ID, notifStreamed.Account.ID) +} + +func (suite *FromFediAPITestSuite) TestProcessFave() { +	favedAccount := suite.testAccounts["local_account_1"] +	favedStatus := suite.testStatuses["local_account_1_status_1"] +	favingAccount := suite.testAccounts["remote_account_1"] + +	wssStream, errWithCode := suite.processor.Stream().Open(context.Background(), favedAccount, stream.TimelineNotifications) +	suite.NoError(errWithCode) + +	fave := >smodel.StatusFave{ +		ID:              "01FGKJPXFTVQPG9YSSZ95ADS7Q", +		CreatedAt:       time.Now(), +		UpdatedAt:       time.Now(), +		AccountID:       favingAccount.ID, +		Account:         favingAccount, +		TargetAccountID: favedAccount.ID, +		TargetAccount:   favedAccount, +		StatusID:        favedStatus.ID, +		Status:          favedStatus, +		URI:             favingAccount.URI + "/faves/aaaaaaaaaaaa", +	} + +	err := suite.db.Put(context.Background(), fave) +	suite.NoError(err) + +	err = suite.processor.Workers().ProcessFromFediAPI(context.Background(), messages.FromFediAPI{ +		APObjectType:     ap.ActivityLike, +		APActivityType:   ap.ActivityCreate, +		GTSModel:         fave, +		ReceivingAccount: favedAccount, +	}) +	suite.NoError(err) + +	// side effects should be triggered +	// 1. a notification should exist for the fave +	where := []db.Where{ +		{ +			Key:   "status_id", +			Value: favedStatus.ID, +		}, +		{ +			Key:   "origin_account_id", +			Value: favingAccount.ID, +		}, +	} + +	notif := >smodel.Notification{} +	err = suite.db.GetWhere(context.Background(), where, notif) +	suite.NoError(err) +	suite.Equal(gtsmodel.NotificationFave, notif.NotificationType) +	suite.Equal(fave.TargetAccountID, notif.TargetAccountID) +	suite.Equal(fave.AccountID, notif.OriginAccountID) +	suite.Equal(fave.StatusID, notif.StatusID) +	suite.False(*notif.Read) + +	// 2. a notification should be streamed +	var msg *stream.Message +	select { +	case msg = <-wssStream.Messages: +		// fine +	case <-time.After(5 * time.Second): +		suite.FailNow("no message from wssStream") +	} +	suite.Equal(stream.EventTypeNotification, msg.Event) +	suite.NotEmpty(msg.Payload) +	suite.EqualValues([]string{stream.TimelineNotifications}, msg.Stream) +} + +// TestProcessFaveWithDifferentReceivingAccount ensures that when an account receives a fave that's for +// another account in their AP inbox, a notification isn't streamed to the receiving account. +// +// This tests for an issue we were seeing where Misskey sends out faves to inboxes of people that don't own +// the fave, but just follow the actor who received the fave. +func (suite *FromFediAPITestSuite) TestProcessFaveWithDifferentReceivingAccount() { +	receivingAccount := suite.testAccounts["local_account_2"] +	favedAccount := suite.testAccounts["local_account_1"] +	favedStatus := suite.testStatuses["local_account_1_status_1"] +	favingAccount := suite.testAccounts["remote_account_1"] + +	wssStream, errWithCode := suite.processor.Stream().Open(context.Background(), receivingAccount, stream.TimelineHome) +	suite.NoError(errWithCode) + +	fave := >smodel.StatusFave{ +		ID:              "01FGKJPXFTVQPG9YSSZ95ADS7Q", +		CreatedAt:       time.Now(), +		UpdatedAt:       time.Now(), +		AccountID:       favingAccount.ID, +		Account:         favingAccount, +		TargetAccountID: favedAccount.ID, +		TargetAccount:   favedAccount, +		StatusID:        favedStatus.ID, +		Status:          favedStatus, +		URI:             favingAccount.URI + "/faves/aaaaaaaaaaaa", +	} + +	err := suite.db.Put(context.Background(), fave) +	suite.NoError(err) + +	err = suite.processor.Workers().ProcessFromFediAPI(context.Background(), messages.FromFediAPI{ +		APObjectType:     ap.ActivityLike, +		APActivityType:   ap.ActivityCreate, +		GTSModel:         fave, +		ReceivingAccount: receivingAccount, +	}) +	suite.NoError(err) + +	// side effects should be triggered +	// 1. a notification should exist for the fave +	where := []db.Where{ +		{ +			Key:   "status_id", +			Value: favedStatus.ID, +		}, +		{ +			Key:   "origin_account_id", +			Value: favingAccount.ID, +		}, +	} + +	notif := >smodel.Notification{} +	err = suite.db.GetWhere(context.Background(), where, notif) +	suite.NoError(err) +	suite.Equal(gtsmodel.NotificationFave, notif.NotificationType) +	suite.Equal(fave.TargetAccountID, notif.TargetAccountID) +	suite.Equal(fave.AccountID, notif.OriginAccountID) +	suite.Equal(fave.StatusID, notif.StatusID) +	suite.False(*notif.Read) + +	// 2. no notification should be streamed to the account that received the fave message, because they weren't the target +	suite.Empty(wssStream.Messages) +} + +func (suite *FromFediAPITestSuite) TestProcessAccountDelete() { +	ctx := context.Background() + +	deletedAccount := suite.testAccounts["remote_account_1"] +	receivingAccount := suite.testAccounts["local_account_1"] + +	// before doing the delete.... +	// make local_account_1 and remote_account_1 into mufos +	zorkFollowSatan := >smodel.Follow{ +		ID:              "01FGRY72ASHBSET64353DPHK9T", +		CreatedAt:       time.Now().Add(-1 * time.Hour), +		UpdatedAt:       time.Now().Add(-1 * time.Hour), +		AccountID:       deletedAccount.ID, +		TargetAccountID: receivingAccount.ID, +		ShowReblogs:     util.Ptr(true), +		URI:             fmt.Sprintf("%s/follows/01FGRY72ASHBSET64353DPHK9T", deletedAccount.URI), +		Notify:          util.Ptr(false), +	} +	err := suite.db.Put(ctx, zorkFollowSatan) +	suite.NoError(err) + +	satanFollowZork := >smodel.Follow{ +		ID:              "01FGRYAVAWWPP926J175QGM0WV", +		CreatedAt:       time.Now().Add(-1 * time.Hour), +		UpdatedAt:       time.Now().Add(-1 * time.Hour), +		AccountID:       receivingAccount.ID, +		TargetAccountID: deletedAccount.ID, +		ShowReblogs:     util.Ptr(true), +		URI:             fmt.Sprintf("%s/follows/01FGRYAVAWWPP926J175QGM0WV", receivingAccount.URI), +		Notify:          util.Ptr(false), +	} +	err = suite.db.Put(ctx, satanFollowZork) +	suite.NoError(err) + +	// now they are mufos! +	err = suite.processor.Workers().ProcessFromFediAPI(ctx, messages.FromFediAPI{ +		APObjectType:     ap.ObjectProfile, +		APActivityType:   ap.ActivityDelete, +		GTSModel:         deletedAccount, +		ReceivingAccount: receivingAccount, +	}) +	suite.NoError(err) + +	// local account 2 blocked foss_satan, that block should be gone now +	testBlock := suite.testBlocks["local_account_2_block_remote_account_1"] +	dbBlock := >smodel.Block{} +	err = suite.db.GetByID(ctx, testBlock.ID, dbBlock) +	suite.ErrorIs(err, db.ErrNoEntries) + +	// the mufos should be gone now too +	satanFollowsZork, err := suite.db.IsFollowing(ctx, deletedAccount.ID, receivingAccount.ID) +	suite.NoError(err) +	suite.False(satanFollowsZork) +	zorkFollowsSatan, err := suite.db.IsFollowing(ctx, receivingAccount.ID, deletedAccount.ID) +	suite.NoError(err) +	suite.False(zorkFollowsSatan) + +	// no statuses from foss satan should be left in the database +	if !testrig.WaitFor(func() bool { +		s, err := suite.db.GetAccountStatuses(ctx, deletedAccount.ID, 0, false, false, "", "", false, false) +		return s == nil && err == db.ErrNoEntries +	}) { +		suite.FailNow("timeout waiting for statuses to be deleted") +	} + +	dbAccount, err := suite.db.GetAccountByID(ctx, deletedAccount.ID) +	suite.NoError(err) + +	suite.Empty(dbAccount.Note) +	suite.Empty(dbAccount.DisplayName) +	suite.Empty(dbAccount.AvatarMediaAttachmentID) +	suite.Empty(dbAccount.AvatarRemoteURL) +	suite.Empty(dbAccount.HeaderMediaAttachmentID) +	suite.Empty(dbAccount.HeaderRemoteURL) +	suite.Empty(dbAccount.Reason) +	suite.Empty(dbAccount.Fields) +	suite.True(*dbAccount.HideCollections) +	suite.False(*dbAccount.Discoverable) +	suite.WithinDuration(time.Now(), dbAccount.SuspendedAt, 30*time.Second) +	suite.Equal(dbAccount.ID, dbAccount.SuspensionOrigin) +} + +func (suite *FromFediAPITestSuite) TestProcessFollowRequestLocked() { +	ctx := context.Background() + +	originAccount := suite.testAccounts["remote_account_1"] + +	// target is a locked account +	targetAccount := suite.testAccounts["local_account_2"] + +	wssStream, errWithCode := suite.processor.Stream().Open(context.Background(), targetAccount, stream.TimelineHome) +	suite.NoError(errWithCode) + +	// put the follow request in the database as though it had passed through the federating db already +	satanFollowRequestTurtle := >smodel.FollowRequest{ +		ID:              "01FGRYAVAWWPP926J175QGM0WV", +		CreatedAt:       time.Now(), +		UpdatedAt:       time.Now(), +		AccountID:       originAccount.ID, +		Account:         originAccount, +		TargetAccountID: targetAccount.ID, +		TargetAccount:   targetAccount, +		ShowReblogs:     util.Ptr(true), +		URI:             fmt.Sprintf("%s/follows/01FGRYAVAWWPP926J175QGM0WV", originAccount.URI), +		Notify:          util.Ptr(false), +	} + +	err := suite.db.Put(ctx, satanFollowRequestTurtle) +	suite.NoError(err) + +	err = suite.processor.Workers().ProcessFromFediAPI(ctx, messages.FromFediAPI{ +		APObjectType:     ap.ActivityFollow, +		APActivityType:   ap.ActivityCreate, +		GTSModel:         satanFollowRequestTurtle, +		ReceivingAccount: targetAccount, +	}) +	suite.NoError(err) + +	// a notification should be streamed +	var msg *stream.Message +	select { +	case msg = <-wssStream.Messages: +		// fine +	case <-time.After(5 * time.Second): +		suite.FailNow("no message from wssStream") +	} +	suite.Equal(stream.EventTypeNotification, msg.Event) +	suite.NotEmpty(msg.Payload) +	suite.EqualValues([]string{stream.TimelineHome}, msg.Stream) +	notif := &apimodel.Notification{} +	err = json.Unmarshal([]byte(msg.Payload), notif) +	suite.NoError(err) +	suite.Equal("follow_request", notif.Type) +	suite.Equal(originAccount.ID, notif.Account.ID) + +	// no messages should have been sent out, since we didn't need to federate an accept +	suite.Empty(suite.httpClient.SentMessages) +} + +func (suite *FromFediAPITestSuite) TestProcessFollowRequestUnlocked() { +	ctx := context.Background() + +	originAccount := suite.testAccounts["remote_account_1"] + +	// target is an unlocked account +	targetAccount := suite.testAccounts["local_account_1"] + +	wssStream, errWithCode := suite.processor.Stream().Open(context.Background(), targetAccount, stream.TimelineHome) +	suite.NoError(errWithCode) + +	// put the follow request in the database as though it had passed through the federating db already +	satanFollowRequestTurtle := >smodel.FollowRequest{ +		ID:              "01FGRYAVAWWPP926J175QGM0WV", +		CreatedAt:       time.Now(), +		UpdatedAt:       time.Now(), +		AccountID:       originAccount.ID, +		Account:         originAccount, +		TargetAccountID: targetAccount.ID, +		TargetAccount:   targetAccount, +		ShowReblogs:     util.Ptr(true), +		URI:             fmt.Sprintf("%s/follows/01FGRYAVAWWPP926J175QGM0WV", originAccount.URI), +		Notify:          util.Ptr(false), +	} + +	err := suite.db.Put(ctx, satanFollowRequestTurtle) +	suite.NoError(err) + +	err = suite.processor.Workers().ProcessFromFediAPI(ctx, messages.FromFediAPI{ +		APObjectType:     ap.ActivityFollow, +		APActivityType:   ap.ActivityCreate, +		GTSModel:         satanFollowRequestTurtle, +		ReceivingAccount: targetAccount, +	}) +	suite.NoError(err) + +	// an accept message should be sent to satan's inbox +	var sent [][]byte +	if !testrig.WaitFor(func() bool { +		sentI, ok := suite.httpClient.SentMessages.Load(*originAccount.SharedInboxURI) +		if ok { +			sent, ok = sentI.([][]byte) +			if !ok { +				panic("SentMessages entry was not []byte") +			} +			return true +		} +		return false +	}) { +		suite.FailNow("timed out waiting for message") +	} + +	accept := &struct { +		Actor  string `json:"actor"` +		ID     string `json:"id"` +		Object struct { +			Actor  string `json:"actor"` +			ID     string `json:"id"` +			Object string `json:"object"` +			To     string `json:"to"` +			Type   string `json:"type"` +		} +		To   string `json:"to"` +		Type string `json:"type"` +	}{} +	err = json.Unmarshal(sent[0], accept) +	suite.NoError(err) + +	suite.Equal(targetAccount.URI, accept.Actor) +	suite.Equal(originAccount.URI, accept.Object.Actor) +	suite.Equal(satanFollowRequestTurtle.URI, accept.Object.ID) +	suite.Equal(targetAccount.URI, accept.Object.Object) +	suite.Equal(targetAccount.URI, accept.Object.To) +	suite.Equal("Follow", accept.Object.Type) +	suite.Equal(originAccount.URI, accept.To) +	suite.Equal("Accept", accept.Type) + +	// a notification should be streamed +	var msg *stream.Message +	select { +	case msg = <-wssStream.Messages: +		// fine +	case <-time.After(5 * time.Second): +		suite.FailNow("no message from wssStream") +	} +	suite.Equal(stream.EventTypeNotification, msg.Event) +	suite.NotEmpty(msg.Payload) +	suite.EqualValues([]string{stream.TimelineHome}, msg.Stream) +	notif := &apimodel.Notification{} +	err = json.Unmarshal([]byte(msg.Payload), notif) +	suite.NoError(err) +	suite.Equal("follow", notif.Type) +	suite.Equal(originAccount.ID, notif.Account.ID) +} + +// TestCreateStatusFromIRI checks if a forwarded status can be dereferenced by the processor. +func (suite *FromFediAPITestSuite) TestCreateStatusFromIRI() { +	ctx := context.Background() + +	receivingAccount := suite.testAccounts["local_account_1"] +	statusCreator := suite.testAccounts["remote_account_2"] + +	err := suite.processor.Workers().ProcessFromFediAPI(ctx, messages.FromFediAPI{ +		APObjectType:     ap.ObjectNote, +		APActivityType:   ap.ActivityCreate, +		GTSModel:         nil, // gtsmodel is nil because this is a forwarded status -- we want to dereference it using the iri +		ReceivingAccount: receivingAccount, +		APIri:            testrig.URLMustParse("http://example.org/users/Some_User/statuses/afaba698-5740-4e32-a702-af61aa543bc1"), +	}) +	suite.NoError(err) + +	// status should now be in the database, attributed to remote_account_2 +	s, err := suite.db.GetStatusByURI(context.Background(), "http://example.org/users/Some_User/statuses/afaba698-5740-4e32-a702-af61aa543bc1") +	suite.NoError(err) +	suite.Equal(statusCreator.URI, s.AccountURI) +} + +func TestFromFederatorTestSuite(t *testing.T) { +	suite.Run(t, &FromFediAPITestSuite{}) +} diff --git a/internal/processing/workers/surface.go b/internal/processing/workers/surface.go new file mode 100644 index 000000000..a3cf9a3e1 --- /dev/null +++ b/internal/processing/workers/surface.go @@ -0,0 +1,40 @@ +// GoToSocial +// Copyright (C) GoToSocial Authors admin@gotosocial.org +// SPDX-License-Identifier: AGPL-3.0-or-later +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program.  If not, see <http://www.gnu.org/licenses/>. + +package workers + +import ( +	"github.com/superseriousbusiness/gotosocial/internal/email" +	"github.com/superseriousbusiness/gotosocial/internal/processing/stream" +	"github.com/superseriousbusiness/gotosocial/internal/state" +	"github.com/superseriousbusiness/gotosocial/internal/typeutils" +	"github.com/superseriousbusiness/gotosocial/internal/visibility" +) + +// surface wraps functions for 'surfacing' the result +// of processing a message, eg: +//   - timelining a status +//   - removing a status from timelines +//   - sending a notification to a user +//   - sending an email +type surface struct { +	state       *state.State +	tc          typeutils.TypeConverter +	stream      *stream.Processor +	filter      *visibility.Filter +	emailSender email.Sender +} diff --git a/internal/processing/workers/surfaceemail.go b/internal/processing/workers/surfaceemail.go new file mode 100644 index 000000000..a6c97f48f --- /dev/null +++ b/internal/processing/workers/surfaceemail.go @@ -0,0 +1,160 @@ +// GoToSocial +// Copyright (C) GoToSocial Authors admin@gotosocial.org +// SPDX-License-Identifier: AGPL-3.0-or-later +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program.  If not, see <http://www.gnu.org/licenses/>. + +package workers + +import ( +	"context" +	"errors" +	"time" + +	"github.com/google/uuid" +	"github.com/superseriousbusiness/gotosocial/internal/config" +	"github.com/superseriousbusiness/gotosocial/internal/db" +	"github.com/superseriousbusiness/gotosocial/internal/email" +	"github.com/superseriousbusiness/gotosocial/internal/gtserror" +	"github.com/superseriousbusiness/gotosocial/internal/gtsmodel" +	"github.com/superseriousbusiness/gotosocial/internal/uris" +) + +func (s *surface) emailReportOpened(ctx context.Context, report *gtsmodel.Report) error { +	instance, err := s.state.DB.GetInstance(ctx, config.GetHost()) +	if err != nil { +		return gtserror.Newf("error getting instance: %w", err) +	} + +	toAddresses, err := s.state.DB.GetInstanceModeratorAddresses(ctx) +	if err != nil { +		if errors.Is(err, db.ErrNoEntries) { +			// No registered moderator addresses. +			return nil +		} +		return gtserror.Newf("error getting instance moderator addresses: %w", err) +	} + +	if err := s.state.DB.PopulateReport(ctx, report); err != nil { +		return gtserror.Newf("error populating report: %w", err) +	} + +	reportData := email.NewReportData{ +		InstanceURL:        instance.URI, +		InstanceName:       instance.Title, +		ReportURL:          instance.URI + "/settings/admin/reports/" + report.ID, +		ReportDomain:       report.Account.Domain, +		ReportTargetDomain: report.TargetAccount.Domain, +	} + +	if err := s.emailSender.SendNewReportEmail(toAddresses, reportData); err != nil { +		return gtserror.Newf("error emailing instance moderators: %w", err) +	} + +	return nil +} + +func (s *surface) emailReportClosed(ctx context.Context, report *gtsmodel.Report) error { +	user, err := s.state.DB.GetUserByAccountID(ctx, report.Account.ID) +	if err != nil { +		return gtserror.Newf("db error getting user: %w", err) +	} + +	if user.ConfirmedAt.IsZero() || +		!*user.Approved || +		*user.Disabled || +		user.Email == "" { +		// Only email users who: +		// - are confirmed +		// - are approved +		// - are not disabled +		// - have an email address +		return nil +	} + +	instance, err := s.state.DB.GetInstance(ctx, config.GetHost()) +	if err != nil { +		return gtserror.Newf("db error getting instance: %w", err) +	} + +	if err := s.state.DB.PopulateReport(ctx, report); err != nil { +		return gtserror.Newf("error populating report: %w", err) +	} + +	reportClosedData := email.ReportClosedData{ +		Username:             report.Account.Username, +		InstanceURL:          instance.URI, +		InstanceName:         instance.Title, +		ReportTargetUsername: report.TargetAccount.Username, +		ReportTargetDomain:   report.TargetAccount.Domain, +		ActionTakenComment:   report.ActionTaken, +	} + +	return s.emailSender.SendReportClosedEmail(user.Email, reportClosedData) +} + +func (s *surface) emailPleaseConfirm(ctx context.Context, user *gtsmodel.User, username string) error { +	if user.UnconfirmedEmail == "" || +		user.UnconfirmedEmail == user.Email { +		// User has already confirmed this +		// email address; nothing to do. +		return nil +	} + +	instance, err := s.state.DB.GetInstance(ctx, config.GetHost()) +	if err != nil { +		return gtserror.Newf("db error getting instance: %w", err) +	} + +	// We need a token and a link for the +	// user to click on. We'll use a uuid +	// as our token since it's secure enough +	// for this purpose. +	var ( +		confirmToken = uuid.NewString() +		confirmLink  = uris.GenerateURIForEmailConfirm(confirmToken) +	) + +	// Assemble email contents and send the email. +	if err := s.emailSender.SendConfirmEmail( +		user.UnconfirmedEmail, +		email.ConfirmData{ +			Username:     username, +			InstanceURL:  instance.URI, +			InstanceName: instance.Title, +			ConfirmLink:  confirmLink, +		}, +	); err != nil { +		return err +	} + +	// Email sent, update the user entry +	// with the new confirmation token. +	now := time.Now() +	user.ConfirmationToken = confirmToken +	user.ConfirmationSentAt = now +	user.LastEmailedAt = now + +	if err := s.state.DB.UpdateUser( +		ctx, +		user, +		"confirmation_token", +		"confirmation_sent_at", +		"last_emailed_at", +	); err != nil { +		return gtserror.Newf("error updating user entry after email sent: %w", err) +	} + +	return nil +} diff --git a/internal/processing/workers/surfacenotify.go b/internal/processing/workers/surfacenotify.go new file mode 100644 index 000000000..00e1205e6 --- /dev/null +++ b/internal/processing/workers/surfacenotify.go @@ -0,0 +1,221 @@ +// GoToSocial +// Copyright (C) GoToSocial Authors admin@gotosocial.org +// SPDX-License-Identifier: AGPL-3.0-or-later +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program.  If not, see <http://www.gnu.org/licenses/>. + +package workers + +import ( +	"context" +	"errors" + +	"github.com/superseriousbusiness/gotosocial/internal/db" +	"github.com/superseriousbusiness/gotosocial/internal/gtscontext" +	"github.com/superseriousbusiness/gotosocial/internal/gtserror" +	"github.com/superseriousbusiness/gotosocial/internal/gtsmodel" +	"github.com/superseriousbusiness/gotosocial/internal/id" +) + +// notifyMentions notifies each targeted account in +// the given mentions that they have a new mention. +func (s *surface) notifyMentions( +	ctx context.Context, +	mentions []*gtsmodel.Mention, +) error { +	var errs = gtserror.NewMultiError(len(mentions)) + +	for _, mention := range mentions { +		if err := s.notify( +			ctx, +			gtsmodel.NotificationMention, +			mention.TargetAccountID, +			mention.OriginAccountID, +			mention.StatusID, +		); err != nil { +			errs.Append(err) +		} +	} + +	return errs.Combine() +} + +// notifyFollowRequest notifies the target of the given +// follow request that they have a new follow request. +func (s *surface) notifyFollowRequest( +	ctx context.Context, +	followRequest *gtsmodel.FollowRequest, +) error { +	return s.notify( +		ctx, +		gtsmodel.NotificationFollowRequest, +		followRequest.TargetAccountID, +		followRequest.AccountID, +		"", +	) +} + +// notifyFollow notifies the target of the given follow that +// they have a new follow. It will also remove any previous +// notification of a follow request, essentially replacing +// that notification. +func (s *surface) notifyFollow( +	ctx context.Context, +	follow *gtsmodel.Follow, +) error { +	// Check if previous follow req notif exists. +	prevNotif, err := s.state.DB.GetNotification( +		gtscontext.SetBarebones(ctx), +		gtsmodel.NotificationFollowRequest, +		follow.TargetAccountID, +		follow.AccountID, +		"", +	) +	if err != nil && !errors.Is(err, db.ErrNoEntries) { +		return gtserror.Newf("db error checking for previous follow request notification: %w", err) +	} + +	if prevNotif != nil { +		// Previous notif existed, delete it. +		if err := s.state.DB.DeleteNotificationByID(ctx, prevNotif.ID); err != nil { +			return gtserror.Newf("db error removing previous follow request notification %s: %w", prevNotif.ID, err) +		} +	} + +	// Now notify the follow itself. +	return s.notify( +		ctx, +		gtsmodel.NotificationFollow, +		follow.TargetAccountID, +		follow.AccountID, +		"", +	) +} + +// notifyFave notifies the target of the given +// fave that their status has been liked/faved. +func (s *surface) notifyFave( +	ctx context.Context, +	fave *gtsmodel.StatusFave, +) error { +	if fave.TargetAccountID == fave.AccountID { +		// Self-fave, nothing to do. +		return nil +	} + +	return s.notify( +		ctx, +		gtsmodel.NotificationFave, +		fave.TargetAccountID, +		fave.AccountID, +		fave.StatusID, +	) +} + +// notifyAnnounce notifies the status boost target +// account that their status has been boosted. +func (s *surface) notifyAnnounce( +	ctx context.Context, +	status *gtsmodel.Status, +) error { +	if status.BoostOfID == "" { +		// Not a boost, nothing to do. +		return nil +	} + +	if status.BoostOfAccountID == status.AccountID { +		// Self-boost, nothing to do. +		return nil +	} + +	return s.notify( +		ctx, +		gtsmodel.NotificationReblog, +		status.BoostOfAccountID, +		status.AccountID, +		status.ID, +	) +} + +// notify creates, inserts, and streams a new +// notification to the target account if it +// doesn't yet exist with the given parameters. +// +// It filters out non-local target accounts, so +// it is safe to pass all sorts of notification +// targets into this function without filtering +// for non-local first. +// +// targetAccountID and originAccountID must be +// set, but statusID can be an empty string. +func (s *surface) notify( +	ctx context.Context, +	notificationType gtsmodel.NotificationType, +	targetAccountID string, +	originAccountID string, +	statusID string, +) error { +	targetAccount, err := s.state.DB.GetAccountByID(ctx, targetAccountID) +	if err != nil { +		return gtserror.Newf("error getting target account %s: %w", targetAccountID, err) +	} + +	if !targetAccount.IsLocal() { +		// Nothing to do. +		return nil +	} + +	// Make sure a notification doesn't +	// already exist with these params. +	if _, err := s.state.DB.GetNotification( +		gtscontext.SetBarebones(ctx), +		notificationType, +		targetAccountID, +		originAccountID, +		statusID, +	); err == nil { +		// Notification exists; +		// nothing to do. +		return nil +	} else if !errors.Is(err, db.ErrNoEntries) { +		// Real error. +		return gtserror.Newf("error checking existence of notification: %w", err) +	} + +	// Notification doesn't yet exist, so +	// we need to create + store one. +	notif := >smodel.Notification{ +		ID:               id.NewULID(), +		NotificationType: notificationType, +		TargetAccountID:  targetAccountID, +		OriginAccountID:  originAccountID, +		StatusID:         statusID, +	} + +	if err := s.state.DB.PutNotification(ctx, notif); err != nil { +		return gtserror.Newf("error putting notification in database: %w", err) +	} + +	// Stream notification to the user. +	apiNotif, err := s.tc.NotificationToAPINotification(ctx, notif) +	if err != nil { +		return gtserror.Newf("error converting notification to api representation: %w", err) +	} + +	if err := s.stream.Notify(apiNotif, targetAccount); err != nil { +		return gtserror.Newf("error streaming notification to account: %w", err) +	} + +	return nil +} diff --git a/internal/processing/workers/surfacetimeline.go b/internal/processing/workers/surfacetimeline.go new file mode 100644 index 000000000..827cbe2f8 --- /dev/null +++ b/internal/processing/workers/surfacetimeline.go @@ -0,0 +1,401 @@ +// GoToSocial +// Copyright (C) GoToSocial Authors admin@gotosocial.org +// SPDX-License-Identifier: AGPL-3.0-or-later +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program.  If not, see <http://www.gnu.org/licenses/>. + +package workers + +import ( +	"context" +	"errors" + +	"github.com/superseriousbusiness/gotosocial/internal/db" +	"github.com/superseriousbusiness/gotosocial/internal/gtscontext" +	"github.com/superseriousbusiness/gotosocial/internal/gtserror" +	"github.com/superseriousbusiness/gotosocial/internal/gtsmodel" +	"github.com/superseriousbusiness/gotosocial/internal/log" +	"github.com/superseriousbusiness/gotosocial/internal/stream" +	"github.com/superseriousbusiness/gotosocial/internal/timeline" +) + +// timelineAndNotifyStatus inserts the given status into the HOME +// and LIST timelines of accounts that follow the status author. +// +// It will also handle notifications for any mentions attached to +// the account, and notifications for any local accounts that want +// to know when this account posts. +func (s *surface) timelineAndNotifyStatus(ctx context.Context, status *gtsmodel.Status) error { +	// Ensure status fully populated; including account, mentions, etc. +	if err := s.state.DB.PopulateStatus(ctx, status); err != nil { +		return gtserror.Newf("error populating status with id %s: %w", status.ID, err) +	} + +	// Get all local followers of the account that posted the status. +	follows, err := s.state.DB.GetAccountLocalFollowers(ctx, status.AccountID) +	if err != nil { +		return gtserror.Newf("error getting local followers of account %s: %w", status.AccountID, err) +	} + +	// If the poster is also local, add a fake entry for them +	// so they can see their own status in their timeline. +	if status.Account.IsLocal() { +		follows = append(follows, >smodel.Follow{ +			AccountID:   status.AccountID, +			Account:     status.Account, +			Notify:      func() *bool { b := false; return &b }(), // Account shouldn't notify itself. +			ShowReblogs: func() *bool { b := true; return &b }(),  // Account should show own reblogs. +		}) +	} + +	// Timeline the status for each local follower of this account. +	// This will also handle notifying any followers with notify +	// set to true on their follow. +	if err := s.timelineAndNotifyStatusForFollowers(ctx, status, follows); err != nil { +		return gtserror.Newf("error timelining status %s for followers: %w", status.ID, err) +	} + +	// Notify each local account that's mentioned by this status. +	if err := s.notifyMentions(ctx, status.Mentions); err != nil { +		return gtserror.Newf("error notifying status mentions for status %s: %w", status.ID, err) +	} + +	return nil +} + +// timelineAndNotifyStatusForFollowers iterates through the given +// slice of followers of the account that posted the given status, +// adding the status to list timelines + home timelines of each +// follower, as appropriate, and notifying each follower of the +// new status, if the status is eligible for notification. +func (s *surface) timelineAndNotifyStatusForFollowers( +	ctx context.Context, +	status *gtsmodel.Status, +	follows []*gtsmodel.Follow, +) error { +	var ( +		errs  = new(gtserror.MultiError) +		boost = status.BoostOfID != "" +		reply = status.InReplyToURI != "" +	) + +	for _, follow := range follows { +		// Do an initial rough-grained check to see if the +		// status is timelineable for this follower at all +		// based on its visibility and who it replies to etc. +		timelineable, err := s.filter.StatusHomeTimelineable( +			ctx, follow.Account, status, +		) +		if err != nil { +			errs.Appendf("error checking status %s hometimelineability: %w", status.ID, err) +			continue +		} + +		if !timelineable { +			// Nothing to do. +			continue +		} + +		if boost && !*follow.ShowReblogs { +			// Status is a boost, but the owner of +			// this follow doesn't want to see boosts +			// from this account. We can safely skip +			// everything, then, because we also know +			// that the follow owner won't want to be +			// have the status put in any list timelines, +			// or be notified about the status either. +			continue +		} + +		// Add status to any relevant lists +		// for this follow, if applicable. +		s.listTimelineStatusForFollow( +			ctx, +			status, +			follow, +			errs, +		) + +		// Add status to home timeline for owner +		// of this follow, if applicable. +		homeTimelined, err := s.timelineStatus( +			ctx, +			s.state.Timelines.Home.IngestOne, +			follow.AccountID, // home timelines are keyed by account ID +			follow.Account, +			status, +			stream.TimelineHome, +		) +		if err != nil { +			errs.Appendf("error home timelining status: %w", err) +			continue +		} + +		if !homeTimelined { +			// If status wasn't added to home +			// timeline, we shouldn't notify it. +			continue +		} + +		if !*follow.Notify { +			// This follower doesn't have notifs +			// set for this account's new posts. +			continue +		} + +		if boost || reply { +			// Don't notify for boosts or replies. +			continue +		} + +		// If we reach here, we know: +		// +		//   - This status is hometimelineable. +		//   - This status was added to the home timeline for this follower. +		//   - This follower wants to be notified when this account posts. +		//   - This is a top-level post (not a reply or boost). +		// +		// That means we can officially notify this one. +		if err := s.notify( +			ctx, +			gtsmodel.NotificationStatus, +			follow.AccountID, +			status.AccountID, +			status.ID, +		); err != nil { +			errs.Appendf("error notifying account %s about new status: %w", follow.AccountID, err) +		} +	} + +	return errs.Combine() +} + +// listTimelineStatusForFollow puts the given status +// in any eligible lists owned by the given follower. +func (s *surface) listTimelineStatusForFollow( +	ctx context.Context, +	status *gtsmodel.Status, +	follow *gtsmodel.Follow, +	errs *gtserror.MultiError, +) { +	// To put this status in appropriate list timelines, +	// we need to get each listEntry that pertains to +	// this follow. Then, we want to iterate through all +	// those list entries, and add the status to the list +	// that the entry belongs to if it meets criteria for +	// inclusion in the list. + +	// Get every list entry that targets this follow's ID. +	listEntries, err := s.state.DB.GetListEntriesForFollowID( +		// We only need the list IDs. +		gtscontext.SetBarebones(ctx), +		follow.ID, +	) +	if err != nil && !errors.Is(err, db.ErrNoEntries) { +		errs.Appendf("error getting list entries: %w", err) +		return +	} + +	// Check eligibility for each list entry (if any). +	for _, listEntry := range listEntries { +		eligible, err := s.listEligible(ctx, listEntry, status) +		if err != nil { +			errs.Appendf("error checking list eligibility: %w", err) +			continue +		} + +		if !eligible { +			// Don't add this. +			continue +		} + +		// At this point we are certain this status +		// should be included in the timeline of the +		// list that this list entry belongs to. +		if _, err := s.timelineStatus( +			ctx, +			s.state.Timelines.List.IngestOne, +			listEntry.ListID, // list timelines are keyed by list ID +			follow.Account, +			status, +			stream.TimelineList+":"+listEntry.ListID, // key streamType to this specific list +		); err != nil { +			errs.Appendf("error adding status to timeline for list %s: %w", listEntry.ListID, err) +			// implicit continue +		} +	} +} + +// listEligible checks if the given status is eligible +// for inclusion in the list that that the given listEntry +// belongs to, based on the replies policy of the list. +func (s *surface) listEligible( +	ctx context.Context, +	listEntry *gtsmodel.ListEntry, +	status *gtsmodel.Status, +) (bool, error) { +	if status.InReplyToURI == "" { +		// If status is not a reply, +		// then it's all gravy baby. +		return true, nil +	} + +	if status.InReplyToID == "" { +		// Status is a reply but we don't +		// have the replied-to account! +		return false, nil +	} + +	// Status is a reply to a known account. +	// We need to fetch the list that this +	// entry belongs to, in order to check +	// the list's replies policy. +	list, err := s.state.DB.GetListByID( +		ctx, listEntry.ListID, +	) +	if err != nil { +		err := gtserror.Newf("db error getting list %s: %w", listEntry.ListID, err) +		return false, err +	} + +	switch list.RepliesPolicy { +	case gtsmodel.RepliesPolicyNone: +		// This list should not show +		// replies at all, so skip it. +		return false, nil + +	case gtsmodel.RepliesPolicyList: +		// This list should show replies +		// only to other people in the list. +		// +		// Check if replied-to account is +		// also included in this list. +		includes, err := s.state.DB.ListIncludesAccount( +			ctx, +			list.ID, +			status.InReplyToAccountID, +		) + +		if err != nil { +			err := gtserror.Newf( +				"db error checking if account %s in list %s: %w", +				status.InReplyToAccountID, listEntry.ListID, err, +			) +			return false, err +		} + +		return includes, nil + +	case gtsmodel.RepliesPolicyFollowed: +		// This list should show replies +		// only to people that the list +		// owner also follows. +		// +		// Check if replied-to account is +		// followed by list owner account. +		follows, err := s.state.DB.IsFollowing( +			ctx, +			list.AccountID, +			status.InReplyToAccountID, +		) +		if err != nil { +			err := gtserror.Newf( +				"db error checking if account %s is followed by %s: %w", +				status.InReplyToAccountID, list.AccountID, err, +			) +			return false, err +		} + +		return follows, nil + +	default: +		// HUH?? +		err := gtserror.Newf( +			"reply policy '%s' not recognized on list %s", +			list.RepliesPolicy, list.ID, +		) +		return false, err +	} +} + +// timelineStatus uses the provided ingest function to put the given +// status in a timeline with the given ID, if it's timelineable. +// +// If the status was inserted into the timeline, true will be returned +// + it will also be streamed to the user using the given streamType. +func (s *surface) timelineStatus( +	ctx context.Context, +	ingest func(context.Context, string, timeline.Timelineable) (bool, error), +	timelineID string, +	account *gtsmodel.Account, +	status *gtsmodel.Status, +	streamType string, +) (bool, error) { +	// Ingest status into given timeline using provided function. +	if inserted, err := ingest(ctx, timelineID, status); err != nil { +		err = gtserror.Newf("error ingesting status %s: %w", status.ID, err) +		return false, err +	} else if !inserted { +		// Nothing more to do. +		return false, nil +	} + +	// The status was inserted so stream it to the user. +	apiStatus, err := s.tc.StatusToAPIStatus(ctx, status, account) +	if err != nil { +		err = gtserror.Newf("error converting status %s to frontend representation: %w", status.ID, err) +		return true, err +	} + +	if err := s.stream.Update(apiStatus, account, []string{streamType}); err != nil { +		err = gtserror.Newf("error streaming update for status %s: %w", status.ID, err) +		return true, err +	} + +	return true, nil +} + +// deleteStatusFromTimelines completely removes the given status from all timelines. +// It will also stream deletion of the status to all open streams. +func (s *surface) deleteStatusFromTimelines(ctx context.Context, statusID string) error { +	if err := s.state.Timelines.Home.WipeItemFromAllTimelines(ctx, statusID); err != nil { +		return err +	} + +	if err := s.state.Timelines.List.WipeItemFromAllTimelines(ctx, statusID); err != nil { +		return err +	} + +	return s.stream.Delete(statusID) +} + +// invalidateStatusFromTimelines does cache invalidation on the given status by +// unpreparing it from all timelines, forcing it to be prepared again (with updated +// stats, boost counts, etc) next time it's fetched by the timeline owner. This goes +// both for the status itself, and for any boosts of the status. +func (s *surface) invalidateStatusFromTimelines(ctx context.Context, statusID string) { +	if err := s.state.Timelines.Home.UnprepareItemFromAllTimelines(ctx, statusID); err != nil { +		log. +			WithContext(ctx). +			WithField("statusID", statusID). +			Errorf("error unpreparing status from home timelines: %v", err) +	} + +	if err := s.state.Timelines.List.UnprepareItemFromAllTimelines(ctx, statusID); err != nil { +		log. +			WithContext(ctx). +			WithField("statusID", statusID). +			Errorf("error unpreparing status from list timelines: %v", err) +	} +} diff --git a/internal/processing/workers/wipestatus.go b/internal/processing/workers/wipestatus.go new file mode 100644 index 000000000..0891d9e24 --- /dev/null +++ b/internal/processing/workers/wipestatus.go @@ -0,0 +1,119 @@ +// GoToSocial +// Copyright (C) GoToSocial Authors admin@gotosocial.org +// SPDX-License-Identifier: AGPL-3.0-or-later +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program.  If not, see <http://www.gnu.org/licenses/>. + +package workers + +import ( +	"context" + +	"github.com/superseriousbusiness/gotosocial/internal/gtscontext" +	"github.com/superseriousbusiness/gotosocial/internal/gtserror" +	"github.com/superseriousbusiness/gotosocial/internal/gtsmodel" +	"github.com/superseriousbusiness/gotosocial/internal/processing/media" +	"github.com/superseriousbusiness/gotosocial/internal/state" +) + +// wipeStatus encapsulates common logic used to totally delete a status +// + all its attachments, notifications, boosts, and timeline entries. +type wipeStatus func(context.Context, *gtsmodel.Status, bool) error + +// wipeStatusF returns a wipeStatus util function. +func wipeStatusF(state *state.State, media *media.Processor, surface *surface) wipeStatus { +	return func( +		ctx context.Context, +		statusToDelete *gtsmodel.Status, +		deleteAttachments bool, +	) error { +		errs := new(gtserror.MultiError) + +		// Either delete all attachments for this status, +		// or simply unattach + clean them separately later. +		// +		// Reason to unattach rather than delete is that +		// the poster might want to reattach them to another +		// status immediately (in case of delete + redraft) +		if deleteAttachments { +			// todo:state.DB.DeleteAttachmentsForStatus +			for _, a := range statusToDelete.AttachmentIDs { +				if err := media.Delete(ctx, a); err != nil { +					errs.Appendf("error deleting media: %w", err) +				} +			} +		} else { +			// todo:state.DB.UnattachAttachmentsForStatus +			for _, a := range statusToDelete.AttachmentIDs { +				if _, err := media.Unattach(ctx, statusToDelete.Account, a); err != nil { +					errs.Appendf("error unattaching media: %w", err) +				} +			} +		} + +		// delete all mention entries generated by this status +		// todo:state.DB.DeleteMentionsForStatus +		for _, id := range statusToDelete.MentionIDs { +			if err := state.DB.DeleteMentionByID(ctx, id); err != nil { +				errs.Appendf("error deleting status mention: %w", err) +			} +		} + +		// delete all notification entries generated by this status +		if err := state.DB.DeleteNotificationsForStatus(ctx, statusToDelete.ID); err != nil { +			errs.Appendf("error deleting status notifications: %w", err) +		} + +		// delete all bookmarks that point to this status +		if err := state.DB.DeleteStatusBookmarksForStatus(ctx, statusToDelete.ID); err != nil { +			errs.Appendf("error deleting status bookmarks: %w", err) +		} + +		// delete all faves of this status +		if err := state.DB.DeleteStatusFavesForStatus(ctx, statusToDelete.ID); err != nil { +			errs.Appendf("error deleting status faves: %w", err) +		} + +		// delete all boosts for this status + remove them from timelines +		boosts, err := state.DB.GetStatusBoosts( +			// we MUST set a barebones context here, +			// as depending on where it came from the +			// original BoostOf may already be gone. +			gtscontext.SetBarebones(ctx), +			statusToDelete.ID) +		if err != nil { +			errs.Appendf("error fetching status boosts: %w", err) +		} +		for _, b := range boosts { +			if err := surface.deleteStatusFromTimelines(ctx, b.ID); err != nil { +				errs.Appendf("error deleting boost from timelines: %w", err) +			} +			if err := state.DB.DeleteStatusByID(ctx, b.ID); err != nil { +				errs.Appendf("error deleting boost: %w", err) +			} +		} + +		// delete this status from any and all timelines +		if err := surface.deleteStatusFromTimelines(ctx, statusToDelete.ID); err != nil { +			errs.Appendf("error deleting status from timelines: %w", err) +		} + +		// finally, delete the status itself +		if err := state.DB.DeleteStatusByID(ctx, statusToDelete.ID); err != nil { +			errs.Appendf("error deleting status: %w", err) +		} + +		return errs.Combine() +	} +} diff --git a/internal/processing/workers/workers.go b/internal/processing/workers/workers.go new file mode 100644 index 000000000..24b18a405 --- /dev/null +++ b/internal/processing/workers/workers.go @@ -0,0 +1,92 @@ +// GoToSocial +// Copyright (C) GoToSocial Authors admin@gotosocial.org +// SPDX-License-Identifier: AGPL-3.0-or-later +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program.  If not, see <http://www.gnu.org/licenses/>. + +package workers + +import ( +	"github.com/superseriousbusiness/gotosocial/internal/email" +	"github.com/superseriousbusiness/gotosocial/internal/federation" +	"github.com/superseriousbusiness/gotosocial/internal/processing/account" +	"github.com/superseriousbusiness/gotosocial/internal/processing/media" +	"github.com/superseriousbusiness/gotosocial/internal/processing/stream" +	"github.com/superseriousbusiness/gotosocial/internal/state" +	"github.com/superseriousbusiness/gotosocial/internal/typeutils" +	"github.com/superseriousbusiness/gotosocial/internal/visibility" +	"github.com/superseriousbusiness/gotosocial/internal/workers" +) + +type Processor struct { +	workers   *workers.Workers +	clientAPI *clientAPI +	fediAPI   *fediAPI +} + +func New( +	state *state.State, +	federator federation.Federator, +	tc typeutils.TypeConverter, +	filter *visibility.Filter, +	emailSender email.Sender, +	account *account.Processor, +	media *media.Processor, +	stream *stream.Processor, +) Processor { +	// Init surface logic +	// wrapper struct. +	surface := &surface{ +		state:       state, +		tc:          tc, +		stream:      stream, +		filter:      filter, +		emailSender: emailSender, +	} + +	// Init federate logic +	// wrapper struct. +	federate := &federate{ +		Federator: federator, +		state:     state, +		tc:        tc, +	} + +	// Init shared logic wipe +	// status util func. +	wipeStatus := wipeStatusF( +		state, +		media, +		surface, +	) + +	return Processor{ +		workers: &state.Workers, +		clientAPI: &clientAPI{ +			state:      state, +			tc:         tc, +			surface:    surface, +			federate:   federate, +			wipeStatus: wipeStatus, +			account:    account, +		}, +		fediAPI: &fediAPI{ +			state:      state, +			surface:    surface, +			federate:   federate, +			wipeStatus: wipeStatus, +			account:    account, +		}, +	} +} diff --git a/internal/processing/workers/workers_test.go b/internal/processing/workers/workers_test.go new file mode 100644 index 000000000..2d5a7f5d3 --- /dev/null +++ b/internal/processing/workers/workers_test.go @@ -0,0 +1,169 @@ +// GoToSocial +// Copyright (C) GoToSocial Authors admin@gotosocial.org +// SPDX-License-Identifier: AGPL-3.0-or-later +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program.  If not, see <http://www.gnu.org/licenses/>. + +package workers_test + +import ( +	"context" + +	"github.com/stretchr/testify/suite" +	"github.com/superseriousbusiness/gotosocial/internal/db" +	"github.com/superseriousbusiness/gotosocial/internal/email" +	"github.com/superseriousbusiness/gotosocial/internal/federation" +	"github.com/superseriousbusiness/gotosocial/internal/gtsmodel" +	"github.com/superseriousbusiness/gotosocial/internal/media" +	"github.com/superseriousbusiness/gotosocial/internal/oauth" +	"github.com/superseriousbusiness/gotosocial/internal/processing" +	"github.com/superseriousbusiness/gotosocial/internal/state" +	"github.com/superseriousbusiness/gotosocial/internal/storage" +	"github.com/superseriousbusiness/gotosocial/internal/stream" +	"github.com/superseriousbusiness/gotosocial/internal/transport" +	"github.com/superseriousbusiness/gotosocial/internal/typeutils" +	"github.com/superseriousbusiness/gotosocial/internal/visibility" +	"github.com/superseriousbusiness/gotosocial/testrig" +) + +type WorkersTestSuite struct { +	// standard suite interfaces +	suite.Suite +	db                  db.DB +	storage             *storage.Driver +	state               state.State +	mediaManager        *media.Manager +	typeconverter       typeutils.TypeConverter +	httpClient          *testrig.MockHTTPClient +	transportController transport.Controller +	federator           federation.Federator +	oauthServer         oauth.Server +	emailSender         email.Sender + +	// standard suite models +	testTokens       map[string]*gtsmodel.Token +	testClients      map[string]*gtsmodel.Client +	testApplications map[string]*gtsmodel.Application +	testUsers        map[string]*gtsmodel.User +	testAccounts     map[string]*gtsmodel.Account +	testFollows      map[string]*gtsmodel.Follow +	testAttachments  map[string]*gtsmodel.MediaAttachment +	testStatuses     map[string]*gtsmodel.Status +	testTags         map[string]*gtsmodel.Tag +	testMentions     map[string]*gtsmodel.Mention +	testAutheds      map[string]*oauth.Auth +	testBlocks       map[string]*gtsmodel.Block +	testActivities   map[string]testrig.ActivityWithSignature +	testLists        map[string]*gtsmodel.List +	testListEntries  map[string]*gtsmodel.ListEntry + +	processor *processing.Processor +} + +func (suite *WorkersTestSuite) SetupSuite() { +	suite.testTokens = testrig.NewTestTokens() +	suite.testClients = testrig.NewTestClients() +	suite.testApplications = testrig.NewTestApplications() +	suite.testUsers = testrig.NewTestUsers() +	suite.testAccounts = testrig.NewTestAccounts() +	suite.testFollows = testrig.NewTestFollows() +	suite.testAttachments = testrig.NewTestAttachments() +	suite.testStatuses = testrig.NewTestStatuses() +	suite.testTags = testrig.NewTestTags() +	suite.testMentions = testrig.NewTestMentions() +	suite.testAutheds = map[string]*oauth.Auth{ +		"local_account_1": { +			Application: suite.testApplications["local_account_1"], +			User:        suite.testUsers["local_account_1"], +			Account:     suite.testAccounts["local_account_1"], +		}, +	} +	suite.testBlocks = testrig.NewTestBlocks() +	suite.testLists = testrig.NewTestLists() +	suite.testListEntries = testrig.NewTestListEntries() +} + +func (suite *WorkersTestSuite) SetupTest() { +	suite.state.Caches.Init() +	testrig.StartWorkers(&suite.state) + +	testrig.InitTestConfig() +	testrig.InitTestLog() + +	suite.db = testrig.NewTestDB(&suite.state) +	suite.state.DB = suite.db +	suite.testActivities = testrig.NewTestActivities(suite.testAccounts) +	suite.storage = testrig.NewInMemoryStorage() +	suite.state.Storage = suite.storage +	suite.typeconverter = testrig.NewTestTypeConverter(suite.db) + +	testrig.StartTimelines( +		&suite.state, +		visibility.NewFilter(&suite.state), +		suite.typeconverter, +	) + +	suite.httpClient = testrig.NewMockHTTPClient(nil, "../../../testrig/media") +	suite.httpClient.TestRemotePeople = testrig.NewTestFediPeople() +	suite.httpClient.TestRemoteStatuses = testrig.NewTestFediStatuses() + +	suite.transportController = testrig.NewTestTransportController(&suite.state, suite.httpClient) +	suite.mediaManager = testrig.NewTestMediaManager(&suite.state) +	suite.federator = testrig.NewTestFederator(&suite.state, suite.transportController, suite.mediaManager) +	suite.oauthServer = testrig.NewTestOauthServer(suite.db) +	suite.emailSender = testrig.NewEmailSender("../../../web/template/", nil) + +	suite.processor = processing.NewProcessor(suite.typeconverter, suite.federator, suite.oauthServer, suite.mediaManager, &suite.state, suite.emailSender) +	suite.state.Workers.EnqueueClientAPI = suite.processor.Workers().EnqueueClientAPI +	suite.state.Workers.EnqueueFediAPI = suite.processor.Workers().EnqueueFediAPI + +	testrig.StandardDBSetup(suite.db, suite.testAccounts) +	testrig.StandardStorageSetup(suite.storage, "../../../testrig/media") +} + +func (suite *WorkersTestSuite) TearDownTest() { +	testrig.StandardDBTeardown(suite.db) +	testrig.StandardStorageTeardown(suite.storage) +	testrig.StopWorkers(&suite.state) +} + +func (suite *WorkersTestSuite) openStreams(ctx context.Context, account *gtsmodel.Account, listIDs []string) map[string]*stream.Stream { +	streams := make(map[string]*stream.Stream) + +	for _, streamType := range []string{ +		stream.TimelineHome, +		stream.TimelinePublic, +		stream.TimelineNotifications, +	} { +		stream, err := suite.processor.Stream().Open(ctx, account, streamType) +		if err != nil { +			suite.FailNow(err.Error()) +		} + +		streams[streamType] = stream +	} + +	for _, listID := range listIDs { +		streamType := stream.TimelineList + ":" + listID + +		stream, err := suite.processor.Stream().Open(ctx, account, streamType) +		if err != nil { +			suite.FailNow(err.Error()) +		} + +		streams[streamType] = stream +	} + +	return streams +}  | 
