diff options
author | 2023-08-09 19:14:33 +0200 | |
---|---|---|
committer | 2023-08-09 19:14:33 +0200 | |
commit | 9770d54237bea828cab7e50aec7dff452c203138 (patch) | |
tree | 59c444a02e81925bab47d3656a489a8c7087d530 /internal/processing/workers | |
parent | [bugfix] Fix incorrect per-loop variable capture (#2092) (diff) | |
download | gotosocial-9770d54237bea828cab7e50aec7dff452c203138.tar.xz |
[feature] List replies policy, refactor async workers (#2087)
* Add/update some DB functions.
* move async workers into subprocessor
* rename FromFederator -> FromFediAPI
* update home timeline check to include check for current status first before moving to parent status
* change streamMap to pointer to mollify linter
* update followtoas func signature
* fix merge
* remove errant debug log
* don't use separate errs.Combine() check to wrap errs
* wrap parts of workers functionality in sub-structs
* populate report using new db funcs
* embed federator (tiny bit tidier)
* flesh out error msg, add continue(!)
* fix other error messages to be more specific
* better, nicer
* give parseURI util function a bit more util
* missing headers
* use pointers for subprocessors
Diffstat (limited to 'internal/processing/workers')
-rw-r--r-- | internal/processing/workers/federate.go | 892 | ||||
-rw-r--r-- | internal/processing/workers/fromclientapi.go | 548 | ||||
-rw-r--r-- | internal/processing/workers/fromclientapi_test.go | 589 | ||||
-rw-r--r-- | internal/processing/workers/fromfediapi.go | 540 | ||||
-rw-r--r-- | internal/processing/workers/fromfediapi_test.go | 565 | ||||
-rw-r--r-- | internal/processing/workers/surface.go | 40 | ||||
-rw-r--r-- | internal/processing/workers/surfaceemail.go | 160 | ||||
-rw-r--r-- | internal/processing/workers/surfacenotify.go | 221 | ||||
-rw-r--r-- | internal/processing/workers/surfacetimeline.go | 401 | ||||
-rw-r--r-- | internal/processing/workers/wipestatus.go | 119 | ||||
-rw-r--r-- | internal/processing/workers/workers.go | 92 | ||||
-rw-r--r-- | internal/processing/workers/workers_test.go | 169 |
12 files changed, 4336 insertions, 0 deletions
diff --git a/internal/processing/workers/federate.go b/internal/processing/workers/federate.go new file mode 100644 index 000000000..76bfc892e --- /dev/null +++ b/internal/processing/workers/federate.go @@ -0,0 +1,892 @@ +// GoToSocial +// Copyright (C) GoToSocial Authors admin@gotosocial.org +// SPDX-License-Identifier: AGPL-3.0-or-later +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <http://www.gnu.org/licenses/>. + +package workers + +import ( + "context" + "net/url" + + "github.com/superseriousbusiness/activity/pub" + "github.com/superseriousbusiness/activity/streams" + "github.com/superseriousbusiness/gotosocial/internal/federation" + "github.com/superseriousbusiness/gotosocial/internal/gtserror" + "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" + "github.com/superseriousbusiness/gotosocial/internal/state" + "github.com/superseriousbusiness/gotosocial/internal/typeutils" +) + +// federate wraps functions for federating +// something out via ActivityPub in response +// to message processing. +type federate struct { + // Embed federator to give access + // to send and retrieve functions. + federation.Federator + state *state.State + tc typeutils.TypeConverter +} + +// parseURI is a cheeky little +// shortcut to wrap parsing errors. +// +// The returned err will be prepended +// with the name of the function that +// called this function, so it can be +// returned without further wrapping. +func parseURI(s string) (*url.URL, error) { + const ( + // Provides enough calldepth to + // prepend the name of whatever + // function called *this* one, + // so that they don't have to + // wrap the error themselves. + calldepth = 3 + errFmt = "error parsing uri %s: %w" + ) + + uri, err := url.Parse(s) + if err != nil { + return nil, gtserror.NewfAt(calldepth, errFmt, s, err) + } + + return uri, err +} + +func (f *federate) DeleteAccount(ctx context.Context, account *gtsmodel.Account) error { + // Do nothing if it's not our + // account that's been deleted. + if !account.IsLocal() { + return nil + } + + // Parse relevant URI(s). + outboxIRI, err := parseURI(account.OutboxURI) + if err != nil { + return err + } + + actorIRI, err := parseURI(account.URI) + if err != nil { + return err + } + + followersIRI, err := parseURI(account.FollowersURI) + if err != nil { + return err + } + + publicIRI, err := parseURI(pub.PublicActivityPubIRI) + if err != nil { + return err + } + + // Create a new delete. + // todo: tc.AccountToASDelete + delete := streams.NewActivityStreamsDelete() + + // Set the Actor for the delete; no matter + // who actually did the delete, we should + // use the account owner for this. + deleteActor := streams.NewActivityStreamsActorProperty() + deleteActor.AppendIRI(actorIRI) + delete.SetActivityStreamsActor(deleteActor) + + // Set the account's IRI as the 'object' property. + deleteObject := streams.NewActivityStreamsObjectProperty() + deleteObject.AppendIRI(actorIRI) + delete.SetActivityStreamsObject(deleteObject) + + // Address the delete To followers. + deleteTo := streams.NewActivityStreamsToProperty() + deleteTo.AppendIRI(followersIRI) + delete.SetActivityStreamsTo(deleteTo) + + // Address the delete CC public. + deleteCC := streams.NewActivityStreamsCcProperty() + deleteCC.AppendIRI(publicIRI) + delete.SetActivityStreamsCc(deleteCC) + + // Send the Delete via the Actor's outbox. + if _, err := f.FederatingActor().Send( + ctx, outboxIRI, delete, + ); err != nil { + return gtserror.Newf( + "error sending activity %T via outbox %s: %w", + delete, outboxIRI, err, + ) + } + + return nil +} + +func (f *federate) CreateStatus(ctx context.Context, status *gtsmodel.Status) error { + // Do nothing if the status + // shouldn't be federated. + if !*status.Federated { + return nil + } + + // Do nothing if this + // isn't our status. + if !*status.Local { + return nil + } + + // Populate model. + if err := f.state.DB.PopulateStatus(ctx, status); err != nil { + return gtserror.Newf("error populating status: %w", err) + } + + // Parse relevant URI(s). + outboxIRI, err := parseURI(status.Account.OutboxURI) + if err != nil { + return err + } + + // Convert status to an ActivityStreams + // Note, wrapped in a Create activity. + asStatus, err := f.tc.StatusToAS(ctx, status) + if err != nil { + return gtserror.Newf("error converting status to AS: %w", err) + } + + create, err := f.tc.WrapNoteInCreate(asStatus, false) + if err != nil { + return gtserror.Newf("error wrapping status in create: %w", err) + } + + // Send the Create via the Actor's outbox. + if _, err := f.FederatingActor().Send( + ctx, outboxIRI, create, + ); err != nil { + return gtserror.Newf( + "error sending activity %T via outbox %s: %w", + create, outboxIRI, err, + ) + } + + return nil +} + +func (f *federate) DeleteStatus(ctx context.Context, status *gtsmodel.Status) error { + // Do nothing if the status + // shouldn't be federated. + if !*status.Federated { + return nil + } + + // Do nothing if this + // isn't our status. + if !*status.Local { + return nil + } + + // Populate model. + if err := f.state.DB.PopulateStatus(ctx, status); err != nil { + return gtserror.Newf("error populating status: %w", err) + } + + // Parse relevant URI(s). + outboxIRI, err := parseURI(status.Account.OutboxURI) + if err != nil { + return err + } + + // Wrap the status URI in a Delete activity. + delete, err := f.tc.StatusToASDelete(ctx, status) + if err != nil { + return gtserror.Newf("error creating Delete: %w", err) + } + + // Send the Delete via the Actor's outbox. + if _, err := f.FederatingActor().Send( + ctx, outboxIRI, delete, + ); err != nil { + return gtserror.Newf( + "error sending activity %T via outbox %s: %w", + delete, outboxIRI, err, + ) + } + + return nil +} + +func (f *federate) Follow(ctx context.Context, follow *gtsmodel.Follow) error { + // Populate model. + if err := f.state.DB.PopulateFollow(ctx, follow); err != nil { + return gtserror.Newf("error populating follow: %w", err) + } + + // Do nothing if both accounts are local. + if follow.Account.IsLocal() && + follow.TargetAccount.IsLocal() { + return nil + } + + // Parse relevant URI(s). + outboxIRI, err := parseURI(follow.Account.OutboxURI) + if err != nil { + return err + } + + // Convert follow to ActivityStreams Follow. + asFollow, err := f.tc.FollowToAS(ctx, follow) + if err != nil { + return gtserror.Newf("error converting follow to AS: %s", err) + } + + // Send the Follow via the Actor's outbox. + if _, err := f.FederatingActor().Send( + ctx, outboxIRI, asFollow, + ); err != nil { + return gtserror.Newf( + "error sending activity %T via outbox %s: %w", + asFollow, outboxIRI, err, + ) + } + + return nil +} + +func (f *federate) UndoFollow(ctx context.Context, follow *gtsmodel.Follow) error { + // Populate model. + if err := f.state.DB.PopulateFollow(ctx, follow); err != nil { + return gtserror.Newf("error populating follow: %w", err) + } + + // Do nothing if both accounts are local. + if follow.Account.IsLocal() && + follow.TargetAccount.IsLocal() { + return nil + } + + // Parse relevant URI(s). + outboxIRI, err := parseURI(follow.Account.OutboxURI) + if err != nil { + return err + } + + targetAccountIRI, err := parseURI(follow.TargetAccount.URI) + if err != nil { + return err + } + + // Recreate the ActivityStreams Follow. + asFollow, err := f.tc.FollowToAS(ctx, follow) + if err != nil { + return gtserror.Newf("error converting follow to AS: %w", err) + } + + // Create a new Undo. + // todo: tc.FollowToASUndo + undo := streams.NewActivityStreamsUndo() + + // Set the Actor for the Undo: + // same as the actor for the Follow. + undo.SetActivityStreamsActor(asFollow.GetActivityStreamsActor()) + + // Set recreated Follow as the 'object' property. + // + // For most AP implementations, it's not enough + // to just send the URI of the original Follow, + // we have to send the whole object again. + undoObject := streams.NewActivityStreamsObjectProperty() + undoObject.AppendActivityStreamsFollow(asFollow) + undo.SetActivityStreamsObject(undoObject) + + // Address the Undo To the target account. + undoTo := streams.NewActivityStreamsToProperty() + undoTo.AppendIRI(targetAccountIRI) + undo.SetActivityStreamsTo(undoTo) + + // Send the Undo via the Actor's outbox. + if _, err := f.FederatingActor().Send( + ctx, outboxIRI, undo, + ); err != nil { + return gtserror.Newf( + "error sending activity %T via outbox %s: %w", + undo, outboxIRI, err, + ) + } + + return nil +} + +func (f *federate) UndoLike(ctx context.Context, fave *gtsmodel.StatusFave) error { + // Populate model. + if err := f.state.DB.PopulateStatusFave(ctx, fave); err != nil { + return gtserror.Newf("error populating fave: %w", err) + } + + // Do nothing if both accounts are local. + if fave.Account.IsLocal() && + fave.TargetAccount.IsLocal() { + return nil + } + + // Parse relevant URI(s). + outboxIRI, err := parseURI(fave.Account.OutboxURI) + if err != nil { + return err + } + + targetAccountIRI, err := parseURI(fave.TargetAccount.URI) + if err != nil { + return err + } + + // Recreate the ActivityStreams Like. + like, err := f.tc.FaveToAS(ctx, fave) + if err != nil { + return gtserror.Newf("error converting fave to AS: %w", err) + } + + // Create a new Undo. + // todo: tc.FaveToASUndo + undo := streams.NewActivityStreamsUndo() + + // Set the Actor for the Undo: + // same as the actor for the Like. + undo.SetActivityStreamsActor(like.GetActivityStreamsActor()) + + // Set recreated Like as the 'object' property. + // + // For most AP implementations, it's not enough + // to just send the URI of the original Like, + // we have to send the whole object again. + undoObject := streams.NewActivityStreamsObjectProperty() + undoObject.AppendActivityStreamsLike(like) + undo.SetActivityStreamsObject(undoObject) + + // Address the Undo To the target account. + undoTo := streams.NewActivityStreamsToProperty() + undoTo.AppendIRI(targetAccountIRI) + undo.SetActivityStreamsTo(undoTo) + + // Send the Undo via the Actor's outbox. + if _, err := f.FederatingActor().Send( + ctx, outboxIRI, undo, + ); err != nil { + return gtserror.Newf( + "error sending activity %T via outbox %s: %w", + undo, outboxIRI, err, + ) + } + + return nil +} + +func (f *federate) UndoAnnounce(ctx context.Context, boost *gtsmodel.Status) error { + // Populate model. + if err := f.state.DB.PopulateStatus(ctx, boost); err != nil { + return gtserror.Newf("error populating status: %w", err) + } + + // Do nothing if boosting + // account isn't ours. + if !boost.Account.IsLocal() { + return nil + } + + // Parse relevant URI(s). + outboxIRI, err := parseURI(boost.Account.OutboxURI) + if err != nil { + return err + } + + // Recreate the ActivityStreams Announce. + asAnnounce, err := f.tc.BoostToAS( + ctx, + boost, + boost.Account, + boost.BoostOfAccount, + ) + if err != nil { + return gtserror.Newf("error converting boost to AS: %w", err) + } + + // Create a new Undo. + // todo: tc.AnnounceToASUndo + undo := streams.NewActivityStreamsUndo() + + // Set the Actor for the Undo: + // same as the actor for the Announce. + undo.SetActivityStreamsActor(asAnnounce.GetActivityStreamsActor()) + + // Set recreated Announce as the 'object' property. + // + // For most AP implementations, it's not enough + // to just send the URI of the original Announce, + // we have to send the whole object again. + undoObject := streams.NewActivityStreamsObjectProperty() + undoObject.AppendActivityStreamsAnnounce(asAnnounce) + undo.SetActivityStreamsObject(undoObject) + + // Address the Undo To the Announce To. + undo.SetActivityStreamsTo(asAnnounce.GetActivityStreamsTo()) + + // Address the Undo CC the Announce CC. + undo.SetActivityStreamsCc(asAnnounce.GetActivityStreamsCc()) + + // Send the Undo via the Actor's outbox. + if _, err := f.FederatingActor().Send( + ctx, outboxIRI, undo, + ); err != nil { + return gtserror.Newf( + "error sending activity %T via outbox %s: %w", + undo, outboxIRI, err, + ) + } + + return nil +} + +func (f *federate) AcceptFollow(ctx context.Context, follow *gtsmodel.Follow) error { + // Populate model. + if err := f.state.DB.PopulateFollow(ctx, follow); err != nil { + return gtserror.Newf("error populating follow: %w", err) + } + + // Bail if requesting account is ours: + // we've already accepted internally and + // shouldn't send an Accept to ourselves. + if follow.Account.IsLocal() { + return nil + } + + // Bail if target account isn't ours: + // we can't Accept a follow on + // another instance's behalf. + if follow.TargetAccount.IsRemote() { + return nil + } + + // Parse relevant URI(s). + outboxIRI, err := parseURI(follow.TargetAccount.OutboxURI) + if err != nil { + return err + } + + acceptingAccountIRI, err := parseURI(follow.TargetAccount.URI) + if err != nil { + return err + } + + requestingAccountIRI, err := parseURI(follow.Account.URI) + if err != nil { + return err + } + + // Recreate the ActivityStreams Follow. + asFollow, err := f.tc.FollowToAS(ctx, follow) + if err != nil { + return gtserror.Newf("error converting follow to AS: %w", err) + } + + // Create a new Accept. + // todo: tc.FollowToASAccept + accept := streams.NewActivityStreamsAccept() + + // Set the requestee as Actor of the Accept. + acceptActorProp := streams.NewActivityStreamsActorProperty() + acceptActorProp.AppendIRI(acceptingAccountIRI) + accept.SetActivityStreamsActor(acceptActorProp) + + // Set recreated Follow as the 'object' property. + // + // For most AP implementations, it's not enough + // to just send the URI of the original Follow, + // we have to send the whole object again. + acceptObject := streams.NewActivityStreamsObjectProperty() + acceptObject.AppendActivityStreamsFollow(asFollow) + accept.SetActivityStreamsObject(acceptObject) + + // Address the Accept To the Follow requester. + acceptTo := streams.NewActivityStreamsToProperty() + acceptTo.AppendIRI(requestingAccountIRI) + accept.SetActivityStreamsTo(acceptTo) + + // Send the Accept via the Actor's outbox. + if _, err := f.FederatingActor().Send( + ctx, outboxIRI, accept, + ); err != nil { + return gtserror.Newf( + "error sending activity %T via outbox %s: %w", + accept, outboxIRI, err, + ) + } + + return nil +} + +func (f *federate) RejectFollow(ctx context.Context, follow *gtsmodel.Follow) error { + // Ensure follow populated before proceeding. + if err := f.state.DB.PopulateFollow(ctx, follow); err != nil { + return gtserror.Newf("error populating follow: %w", err) + } + + // Bail if requesting account is ours: + // we've already rejected internally and + // shouldn't send an Reject to ourselves. + if follow.Account.IsLocal() { + return nil + } + + // Bail if target account isn't ours: + // we can't Reject a follow on + // another instance's behalf. + if follow.TargetAccount.IsRemote() { + return nil + } + + // Parse relevant URI(s). + outboxIRI, err := parseURI(follow.TargetAccount.OutboxURI) + if err != nil { + return err + } + + rejectingAccountIRI, err := parseURI(follow.TargetAccount.URI) + if err != nil { + return err + } + + requestingAccountIRI, err := parseURI(follow.Account.URI) + if err != nil { + return err + } + + // Recreate the ActivityStreams Follow. + asFollow, err := f.tc.FollowToAS(ctx, follow) + if err != nil { + return gtserror.Newf("error converting follow to AS: %w", err) + } + + // Create a new Reject. + // todo: tc.FollowRequestToASReject + reject := streams.NewActivityStreamsReject() + + // Set the requestee as Actor of the Reject. + rejectActorProp := streams.NewActivityStreamsActorProperty() + rejectActorProp.AppendIRI(rejectingAccountIRI) + reject.SetActivityStreamsActor(rejectActorProp) + + // Set recreated Follow as the 'object' property. + // + // For most AP implementations, it's not enough + // to just send the URI of the original Follow, + // we have to send the whole object again. + rejectObject := streams.NewActivityStreamsObjectProperty() + rejectObject.AppendActivityStreamsFollow(asFollow) + reject.SetActivityStreamsObject(rejectObject) + + // Address the Reject To the Follow requester. + rejectTo := streams.NewActivityStreamsToProperty() + rejectTo.AppendIRI(requestingAccountIRI) + reject.SetActivityStreamsTo(rejectTo) + + // Send the Reject via the Actor's outbox. + if _, err := f.FederatingActor().Send( + ctx, outboxIRI, reject, + ); err != nil { + return gtserror.Newf( + "error sending activity %T via outbox %s: %w", + reject, outboxIRI, err, + ) + } + + return nil +} + +func (f *federate) Like(ctx context.Context, fave *gtsmodel.StatusFave) error { + // Populate model. + if err := f.state.DB.PopulateStatusFave(ctx, fave); err != nil { + return gtserror.Newf("error populating fave: %w", err) + } + + // Do nothing if both accounts are local. + if fave.Account.IsLocal() && + fave.TargetAccount.IsLocal() { + return nil + } + + // Parse relevant URI(s). + outboxIRI, err := parseURI(fave.Account.OutboxURI) + if err != nil { + return err + } + + // Create the ActivityStreams Like. + like, err := f.tc.FaveToAS(ctx, fave) + if err != nil { + return gtserror.Newf("error converting fave to AS Like: %w", err) + } + + // Send the Like via the Actor's outbox. + if _, err := f.FederatingActor().Send( + ctx, outboxIRI, like, + ); err != nil { + return gtserror.Newf( + "error sending activity %T via outbox %s: %w", + like, outboxIRI, err, + ) + } + + return nil +} + +func (f *federate) Announce(ctx context.Context, boost *gtsmodel.Status) error { + // Populate model. + if err := f.state.DB.PopulateStatus(ctx, boost); err != nil { + return gtserror.Newf("error populating status: %w", err) + } + + // Do nothing if boosting + // account isn't ours. + if !boost.Account.IsLocal() { + return nil + } + + // Parse relevant URI(s). + outboxIRI, err := parseURI(boost.Account.OutboxURI) + if err != nil { + return err + } + + // Create the ActivityStreams Announce. + announce, err := f.tc.BoostToAS( + ctx, + boost, + boost.Account, + boost.BoostOfAccount, + ) + if err != nil { + return gtserror.Newf("error converting boost to AS: %w", err) + } + + // Send the Announce via the Actor's outbox. + if _, err := f.FederatingActor().Send( + ctx, outboxIRI, announce, + ); err != nil { + return gtserror.Newf( + "error sending activity %T via outbox %s: %w", + announce, outboxIRI, err, + ) + } + + return nil +} + +func (f *federate) UpdateAccount(ctx context.Context, account *gtsmodel.Account) error { + // Populate model. + if err := f.state.DB.PopulateAccount(ctx, account); err != nil { + return gtserror.Newf("error populating account: %w", err) + } + + // Parse relevant URI(s). + outboxIRI, err := parseURI(account.OutboxURI) + if err != nil { + return err + } + + // Convert account to ActivityStreams Person. + person, err := f.tc.AccountToAS(ctx, account) + if err != nil { + return gtserror.Newf("error converting account to Person: %w", err) + } + + // Use ActivityStreams Person as Object of Update. + update, err := f.tc.WrapPersonInUpdate(person, account) + if err != nil { + return gtserror.Newf("error wrapping Person in Update: %w", err) + } + + // Send the Update via the Actor's outbox. + if _, err := f.FederatingActor().Send( + ctx, outboxIRI, update, + ); err != nil { + return gtserror.Newf( + "error sending activity %T via outbox %s: %w", + update, outboxIRI, err, + ) + } + + return nil +} + +func (f *federate) Block(ctx context.Context, block *gtsmodel.Block) error { + // Populate model. + if err := f.state.DB.PopulateBlock(ctx, block); err != nil { + return gtserror.Newf("error populating block: %w", err) + } + + // Do nothing if both accounts are local. + if block.Account.IsLocal() && + block.TargetAccount.IsLocal() { + return nil + } + + // Parse relevant URI(s). + outboxIRI, err := parseURI(block.Account.OutboxURI) + if err != nil { + return err + } + + // Convert block to ActivityStreams Block. + asBlock, err := f.tc.BlockToAS(ctx, block) + if err != nil { + return gtserror.Newf("error converting block to AS: %w", err) + } + + // Send the Block via the Actor's outbox. + if _, err := f.FederatingActor().Send( + ctx, outboxIRI, asBlock, + ); err != nil { + return gtserror.Newf( + "error sending activity %T via outbox %s: %w", + asBlock, outboxIRI, err, + ) + } + + return nil +} + +func (f *federate) UndoBlock(ctx context.Context, block *gtsmodel.Block) error { + // Populate model. + if err := f.state.DB.PopulateBlock(ctx, block); err != nil { + return gtserror.Newf("error populating block: %w", err) + } + + // Do nothing if both accounts are local. + if block.Account.IsLocal() && + block.TargetAccount.IsLocal() { + return nil + } + + // Parse relevant URI(s). + outboxIRI, err := parseURI(block.Account.OutboxURI) + if err != nil { + return err + } + + targetAccountIRI, err := parseURI(block.TargetAccount.URI) + if err != nil { + return err + } + + // Convert block to ActivityStreams Block. + asBlock, err := f.tc.BlockToAS(ctx, block) + if err != nil { + return gtserror.Newf("error converting block to AS: %w", err) + } + + // Create a new Undo. + // todo: tc.BlockToASUndo + undo := streams.NewActivityStreamsUndo() + + // Set the Actor for the Undo: + // same as the actor for the Block. + undo.SetActivityStreamsActor(asBlock.GetActivityStreamsActor()) + + // Set Block as the 'object' property. + // + // For most AP implementations, it's not enough + // to just send the URI of the original Block, + // we have to send the whole object again. + undoObject := streams.NewActivityStreamsObjectProperty() + undoObject.AppendActivityStreamsBlock(asBlock) + undo.SetActivityStreamsObject(undoObject) + + // Address the Undo To the target account. + undoTo := streams.NewActivityStreamsToProperty() + undoTo.AppendIRI(targetAccountIRI) + undo.SetActivityStreamsTo(undoTo) + + // Send the Undo via the Actor's outbox. + if _, err := f.FederatingActor().Send( + ctx, outboxIRI, undo, + ); err != nil { + return gtserror.Newf( + "error sending activity %T via outbox %s: %w", + undo, outboxIRI, err, + ) + } + + return nil +} + +func (f *federate) Flag(ctx context.Context, report *gtsmodel.Report) error { + // Populate model. + if err := f.state.DB.PopulateReport(ctx, report); err != nil { + return gtserror.Newf("error populating report: %w", err) + } + + // Do nothing if report target + // is not remote account. + if report.TargetAccount.IsLocal() { + return nil + } + + // Get our instance account from the db: + // to anonymize the report, we'll deliver + // using the outbox of the instance account. + instanceAcct, err := f.state.DB.GetInstanceAccount(ctx, "") + if err != nil { + return gtserror.Newf("error getting instance account: %w", err) + } + + // Parse relevant URI(s). + outboxIRI, err := parseURI(instanceAcct.OutboxURI) + if err != nil { + return err + } + + targetAccountIRI, err := parseURI(report.TargetAccount.URI) + if err != nil { + return err + } + + // Convert report to ActivityStreams Flag. + flag, err := f.tc.ReportToASFlag(ctx, report) + if err != nil { + return gtserror.Newf("error converting report to AS: %w", err) + } + + // To is not set explicitly on Flags. Instead, + // address Flag BTo report target account URI. + // This ensures that our federating actor still + // knows where to send the report, but the BTo + // property will be stripped before sending. + // + // Happily, BTo does not prevent federating + // actor from using shared inbox to deliver. + bTo := streams.NewActivityStreamsBtoProperty() + bTo.AppendIRI(targetAccountIRI) + flag.SetActivityStreamsBto(bTo) + + // Send the Flag via the Actor's outbox. + if _, err := f.FederatingActor().Send( + ctx, outboxIRI, flag, + ); err != nil { + return gtserror.Newf( + "error sending activity %T via outbox %s: %w", + flag, outboxIRI, err, + ) + } + + return nil +} diff --git a/internal/processing/workers/fromclientapi.go b/internal/processing/workers/fromclientapi.go new file mode 100644 index 000000000..40efc20bb --- /dev/null +++ b/internal/processing/workers/fromclientapi.go @@ -0,0 +1,548 @@ +// GoToSocial +// Copyright (C) GoToSocial Authors admin@gotosocial.org +// SPDX-License-Identifier: AGPL-3.0-or-later +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <http://www.gnu.org/licenses/>. + +package workers + +import ( + "context" + "errors" + + "codeberg.org/gruf/go-kv" + "codeberg.org/gruf/go-logger/v2/level" + "github.com/superseriousbusiness/gotosocial/internal/ap" + "github.com/superseriousbusiness/gotosocial/internal/db" + "github.com/superseriousbusiness/gotosocial/internal/gtserror" + "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" + "github.com/superseriousbusiness/gotosocial/internal/log" + "github.com/superseriousbusiness/gotosocial/internal/messages" + "github.com/superseriousbusiness/gotosocial/internal/processing/account" + "github.com/superseriousbusiness/gotosocial/internal/state" + "github.com/superseriousbusiness/gotosocial/internal/typeutils" +) + +// clientAPI wraps processing functions +// specifically for messages originating +// from the client/REST API. +type clientAPI struct { + state *state.State + tc typeutils.TypeConverter + surface *surface + federate *federate + wipeStatus wipeStatus + account *account.Processor +} + +func (p *Processor) EnqueueClientAPI(ctx context.Context, msgs ...messages.FromClientAPI) { + log.Trace(ctx, "enqueuing") + _ = p.workers.ClientAPI.MustEnqueueCtx(ctx, func(ctx context.Context) { + for _, msg := range msgs { + log.Trace(ctx, "processing: %+v", msg) + if err := p.ProcessFromClientAPI(ctx, msg); err != nil { + log.Errorf(ctx, "error processing client API message: %v", err) + } + } + }) +} + +func (p *Processor) ProcessFromClientAPI(ctx context.Context, cMsg messages.FromClientAPI) error { + // Allocate new log fields slice + fields := make([]kv.Field, 3, 4) + fields[0] = kv.Field{"activityType", cMsg.APActivityType} + fields[1] = kv.Field{"objectType", cMsg.APObjectType} + fields[2] = kv.Field{"fromAccount", cMsg.OriginAccount.Username} + + // Include GTSModel in logs if appropriate. + if cMsg.GTSModel != nil && + log.Level() >= level.DEBUG { + fields = append(fields, kv.Field{ + "model", cMsg.GTSModel, + }) + } + + l := log.WithContext(ctx).WithFields(fields...) + l.Info("processing from client API") + + switch cMsg.APActivityType { + + // CREATE SOMETHING + case ap.ActivityCreate: + switch cMsg.APObjectType { + + // CREATE PROFILE/ACCOUNT + case ap.ObjectProfile, ap.ActorPerson: + return p.clientAPI.CreateAccount(ctx, cMsg) + + // CREATE NOTE/STATUS + case ap.ObjectNote: + return p.clientAPI.CreateStatus(ctx, cMsg) + + // CREATE FOLLOW (request) + case ap.ActivityFollow: + return p.clientAPI.CreateFollowReq(ctx, cMsg) + + // CREATE LIKE/FAVE + case ap.ActivityLike: + return p.clientAPI.CreateLike(ctx, cMsg) + + // CREATE ANNOUNCE/BOOST + case ap.ActivityAnnounce: + return p.clientAPI.CreateAnnounce(ctx, cMsg) + + // CREATE BLOCK + case ap.ActivityBlock: + return p.clientAPI.CreateBlock(ctx, cMsg) + } + + // UPDATE SOMETHING + case ap.ActivityUpdate: + switch cMsg.APObjectType { + + // UPDATE PROFILE/ACCOUNT + case ap.ObjectProfile, ap.ActorPerson: + return p.clientAPI.UpdateAccount(ctx, cMsg) + + // UPDATE A FLAG/REPORT (mark as resolved/closed) + case ap.ActivityFlag: + return p.clientAPI.UpdateReport(ctx, cMsg) + } + + // ACCEPT SOMETHING + case ap.ActivityAccept: + switch cMsg.APObjectType { //nolint:gocritic + + // ACCEPT FOLLOW (request) + case ap.ActivityFollow: + return p.clientAPI.AcceptFollow(ctx, cMsg) + } + + // REJECT SOMETHING + case ap.ActivityReject: + switch cMsg.APObjectType { //nolint:gocritic + + // REJECT FOLLOW (request) + case ap.ActivityFollow: + return p.clientAPI.RejectFollowRequest(ctx, cMsg) + } + + // UNDO SOMETHING + case ap.ActivityUndo: + switch cMsg.APObjectType { + + // UNDO FOLLOW (request) + case ap.ActivityFollow: + return p.clientAPI.UndoFollow(ctx, cMsg) + + // UNDO BLOCK + case ap.ActivityBlock: + return p.clientAPI.UndoBlock(ctx, cMsg) + + // UNDO LIKE/FAVE + case ap.ActivityLike: + return p.clientAPI.UndoFave(ctx, cMsg) + + // UNDO ANNOUNCE/BOOST + case ap.ActivityAnnounce: + return p.clientAPI.UndoAnnounce(ctx, cMsg) + } + + // DELETE SOMETHING + case ap.ActivityDelete: + switch cMsg.APObjectType { + + // DELETE NOTE/STATUS + case ap.ObjectNote: + return p.clientAPI.DeleteStatus(ctx, cMsg) + + // DELETE PROFILE/ACCOUNT + case ap.ObjectProfile, ap.ActorPerson: + return p.clientAPI.DeleteAccount(ctx, cMsg) + } + + // FLAG/REPORT SOMETHING + case ap.ActivityFlag: + switch cMsg.APObjectType { //nolint:gocritic + + // FLAG/REPORT A PROFILE + case ap.ObjectProfile: + return p.clientAPI.ReportAccount(ctx, cMsg) + } + } + + return nil +} + +func (p *clientAPI) CreateAccount(ctx context.Context, cMsg messages.FromClientAPI) error { + account, ok := cMsg.GTSModel.(*gtsmodel.Account) + if !ok { + return gtserror.Newf("%T not parseable as *gtsmodel.Account", cMsg.GTSModel) + } + + // Send a confirmation email to the newly created account. + user, err := p.state.DB.GetUserByAccountID(ctx, account.ID) + if err != nil { + return gtserror.Newf("db error getting user for account id %s: %w", account.ID, err) + } + + if err := p.surface.emailPleaseConfirm(ctx, user, account.Username); err != nil { + return gtserror.Newf("error emailing %s: %w", account.Username, err) + } + + return nil +} + +func (p *clientAPI) CreateStatus(ctx context.Context, cMsg messages.FromClientAPI) error { + status, ok := cMsg.GTSModel.(*gtsmodel.Status) + if !ok { + return gtserror.Newf("%T not parseable as *gtsmodel.Status", cMsg.GTSModel) + } + + if err := p.surface.timelineAndNotifyStatus(ctx, status); err != nil { + return gtserror.Newf("error timelining status: %w", err) + } + + if status.InReplyToID != "" { + // Interaction counts changed on the replied status; + // uncache the prepared version from all timelines. + p.surface.invalidateStatusFromTimelines(ctx, status.InReplyToID) + } + + if err := p.federate.CreateStatus(ctx, status); err != nil { + return gtserror.Newf("error federating status: %w", err) + } + + return nil +} + +func (p *clientAPI) CreateFollowReq(ctx context.Context, cMsg messages.FromClientAPI) error { + followRequest, ok := cMsg.GTSModel.(*gtsmodel.FollowRequest) + if !ok { + return gtserror.Newf("%T not parseable as *gtsmodel.FollowRequest", cMsg.GTSModel) + } + + if err := p.surface.notifyFollowRequest(ctx, followRequest); err != nil { + return gtserror.Newf("error notifying follow request: %w", err) + } + + if err := p.federate.Follow( + ctx, + p.tc.FollowRequestToFollow(ctx, followRequest), + ); err != nil { + return gtserror.Newf("error federating follow: %w", err) + } + + return nil +} + +func (p *clientAPI) CreateLike(ctx context.Context, cMsg messages.FromClientAPI) error { + fave, ok := cMsg.GTSModel.(*gtsmodel.StatusFave) + if !ok { + return gtserror.Newf("%T not parseable as *gtsmodel.StatusFave", cMsg.GTSModel) + } + + if err := p.surface.notifyFave(ctx, fave); err != nil { + return gtserror.Newf("error notifying fave: %w", err) + } + + // Interaction counts changed on the faved status; + // uncache the prepared version from all timelines. + p.surface.invalidateStatusFromTimelines(ctx, fave.StatusID) + + if err := p.federate.Like(ctx, fave); err != nil { + return gtserror.Newf("error federating like: %w", err) + } + + return nil +} + +func (p *clientAPI) CreateAnnounce(ctx context.Context, cMsg messages.FromClientAPI) error { + boost, ok := cMsg.GTSModel.(*gtsmodel.Status) + if !ok { + return gtserror.Newf("%T not parseable as *gtsmodel.Status", cMsg.GTSModel) + } + + // Timeline and notify the boost wrapper status. + if err := p.surface.timelineAndNotifyStatus(ctx, boost); err != nil { + return gtserror.Newf("error timelining boost: %w", err) + } + + // Notify the boost target account. + if err := p.surface.notifyAnnounce(ctx, boost); err != nil { + return gtserror.Newf("error notifying boost: %w", err) + } + + // Interaction counts changed on the boosted status; + // uncache the prepared version from all timelines. + p.surface.invalidateStatusFromTimelines(ctx, boost.BoostOfID) + + if err := p.federate.Announce(ctx, boost); err != nil { + return gtserror.Newf("error federating announce: %w", err) + } + + return nil +} + +func (p *clientAPI) CreateBlock(ctx context.Context, cMsg messages.FromClientAPI) error { + block, ok := cMsg.GTSModel.(*gtsmodel.Block) + if !ok { + return gtserror.Newf("%T not parseable as *gtsmodel.Block", cMsg.GTSModel) + } + + // Remove blockee's statuses from blocker's timeline. + if err := p.state.Timelines.Home.WipeItemsFromAccountID( + ctx, + block.AccountID, + block.TargetAccountID, + ); err != nil { + return gtserror.Newf("error wiping timeline items for block: %w", err) + } + + // Remove blocker's statuses from blockee's timeline. + if err := p.state.Timelines.Home.WipeItemsFromAccountID( + ctx, + block.TargetAccountID, + block.AccountID, + ); err != nil { + return gtserror.Newf("error wiping timeline items for block: %w", err) + } + + // TODO: same with notifications? + // TODO: same with bookmarks? + + if err := p.federate.Block(ctx, block); err != nil { + return gtserror.Newf("error federating block: %w", err) + } + + return nil +} + +func (p *clientAPI) UpdateAccount(ctx context.Context, cMsg messages.FromClientAPI) error { + account, ok := cMsg.GTSModel.(*gtsmodel.Account) + if !ok { + return gtserror.Newf("%T not parseable as *gtsmodel.Account", cMsg.GTSModel) + } + + if err := p.federate.UpdateAccount(ctx, account); err != nil { + return gtserror.Newf("error federating account update: %w", err) + } + + return nil +} + +func (p *clientAPI) UpdateReport(ctx context.Context, cMsg messages.FromClientAPI) error { + report, ok := cMsg.GTSModel.(*gtsmodel.Report) + if !ok { + return gtserror.Newf("%T not parseable as *gtsmodel.Report", cMsg.GTSModel) + } + + if report.Account.IsRemote() { + // Report creator is a remote account, + // we shouldn't try to email them! + return nil + } + + if err := p.surface.emailReportClosed(ctx, report); err != nil { + return gtserror.Newf("error sending report closed email: %w", err) + } + + return nil +} + +func (p *clientAPI) AcceptFollow(ctx context.Context, cMsg messages.FromClientAPI) error { + follow, ok := cMsg.GTSModel.(*gtsmodel.Follow) + if !ok { + return gtserror.Newf("%T not parseable as *gtsmodel.Follow", cMsg.GTSModel) + } + + if err := p.surface.notifyFollow(ctx, follow); err != nil { + return gtserror.Newf("error notifying follow: %w", err) + } + + if err := p.federate.AcceptFollow(ctx, follow); err != nil { + return gtserror.Newf("error federating follow request accept: %w", err) + } + + return nil +} + +func (p *clientAPI) RejectFollowRequest(ctx context.Context, cMsg messages.FromClientAPI) error { + followReq, ok := cMsg.GTSModel.(*gtsmodel.FollowRequest) + if !ok { + return gtserror.Newf("%T not parseable as *gtsmodel.FollowRequest", cMsg.GTSModel) + } + + if err := p.federate.RejectFollow( + ctx, + p.tc.FollowRequestToFollow(ctx, followReq), + ); err != nil { + return gtserror.Newf("error federating reject follow: %w", err) + } + + return nil +} + +func (p *clientAPI) UndoFollow(ctx context.Context, cMsg messages.FromClientAPI) error { + follow, ok := cMsg.GTSModel.(*gtsmodel.Follow) + if !ok { + return gtserror.Newf("%T not parseable as *gtsmodel.Follow", cMsg.GTSModel) + } + + if err := p.federate.UndoFollow(ctx, follow); err != nil { + return gtserror.Newf("error federating undo follow: %w", err) + } + + return nil +} + +func (p *clientAPI) UndoBlock(ctx context.Context, cMsg messages.FromClientAPI) error { + block, ok := cMsg.GTSModel.(*gtsmodel.Block) + if !ok { + return gtserror.Newf("%T not parseable as *gtsmodel.Block", cMsg.GTSModel) + } + + if err := p.federate.UndoBlock(ctx, block); err != nil { + return gtserror.Newf("error federating undo block: %w", err) + } + + return nil +} + +func (p *clientAPI) UndoFave(ctx context.Context, cMsg messages.FromClientAPI) error { + statusFave, ok := cMsg.GTSModel.(*gtsmodel.StatusFave) + if !ok { + return gtserror.Newf("%T not parseable as *gtsmodel.StatusFave", cMsg.GTSModel) + } + + // Interaction counts changed on the faved status; + // uncache the prepared version from all timelines. + p.surface.invalidateStatusFromTimelines(ctx, statusFave.StatusID) + + if err := p.federate.UndoLike(ctx, statusFave); err != nil { + return gtserror.Newf("error federating undo like: %w", err) + } + + return nil +} + +func (p *clientAPI) UndoAnnounce(ctx context.Context, cMsg messages.FromClientAPI) error { + status, ok := cMsg.GTSModel.(*gtsmodel.Status) + if !ok { + return gtserror.Newf("%T not parseable as *gtsmodel.Status", cMsg.GTSModel) + } + + if err := p.state.DB.DeleteStatusByID(ctx, status.ID); err != nil { + return gtserror.Newf("db error deleting status: %w", err) + } + + if err := p.surface.deleteStatusFromTimelines(ctx, status.ID); err != nil { + return gtserror.Newf("error removing status from timelines: %w", err) + } + + // Interaction counts changed on the boosted status; + // uncache the prepared version from all timelines. + p.surface.invalidateStatusFromTimelines(ctx, status.BoostOfID) + + if err := p.federate.UndoAnnounce(ctx, status); err != nil { + return gtserror.Newf("error federating undo announce: %w", err) + } + + return nil +} + +func (p *clientAPI) DeleteStatus(ctx context.Context, cMsg messages.FromClientAPI) error { + // Don't delete attachments, just unattach them: + // this request comes from the client API and the + // poster may want to use attachments again later. + const deleteAttachments = false + + status, ok := cMsg.GTSModel.(*gtsmodel.Status) + if !ok { + return gtserror.Newf("%T not parseable as *gtsmodel.Status", cMsg.GTSModel) + } + + // Try to populate status structs if possible, + // in order to more thoroughly remove them. + if err := p.state.DB.PopulateStatus( + ctx, status, + ); err != nil && !errors.Is(err, db.ErrNoEntries) { + return gtserror.Newf("db error populating status: %w", err) + } + + if err := p.wipeStatus(ctx, status, deleteAttachments); err != nil { + return gtserror.Newf("error wiping status: %w", err) + } + + if status.InReplyToID != "" { + // Interaction counts changed on the replied status; + // uncache the prepared version from all timelines. + p.surface.invalidateStatusFromTimelines(ctx, status.InReplyToID) + } + + if err := p.federate.DeleteStatus(ctx, status); err != nil { + return gtserror.Newf("error federating status delete: %w", err) + } + + return nil +} + +func (p *clientAPI) DeleteAccount(ctx context.Context, cMsg messages.FromClientAPI) error { + // The originID of the delete, one of: + // - ID of a domain block, for which + // this account delete is a side effect. + // - ID of the deleted account itself (self delete). + // - ID of an admin account (account suspension). + var originID string + + if domainBlock, ok := cMsg.GTSModel.(*gtsmodel.DomainBlock); ok { + // Origin is a domain block. + originID = domainBlock.ID + } else { + // Origin is whichever account + // originated this message. + originID = cMsg.OriginAccount.ID + } + + if err := p.federate.DeleteAccount(ctx, cMsg.TargetAccount); err != nil { + return gtserror.Newf("error federating account delete: %w", err) + } + + if err := p.account.Delete(ctx, cMsg.TargetAccount, originID); err != nil { + return gtserror.Newf("error deleting account: %w", err) + } + + return nil +} + +func (p *clientAPI) ReportAccount(ctx context.Context, cMsg messages.FromClientAPI) error { + report, ok := cMsg.GTSModel.(*gtsmodel.Report) + if !ok { + return gtserror.Newf("%T not parseable as *gtsmodel.Report", cMsg.GTSModel) + } + + // Federate this report to the + // remote instance if desired. + if *report.Forwarded { + if err := p.federate.Flag(ctx, report); err != nil { + return gtserror.Newf("error federating report: %w", err) + } + } + + if err := p.surface.emailReportOpened(ctx, report); err != nil { + return gtserror.Newf("error sending report opened email: %w", err) + } + + return nil +} diff --git a/internal/processing/workers/fromclientapi_test.go b/internal/processing/workers/fromclientapi_test.go new file mode 100644 index 000000000..6690a43db --- /dev/null +++ b/internal/processing/workers/fromclientapi_test.go @@ -0,0 +1,589 @@ +// GoToSocial +// Copyright (C) GoToSocial Authors admin@gotosocial.org +// SPDX-License-Identifier: AGPL-3.0-or-later +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <http://www.gnu.org/licenses/>. + +package workers_test + +import ( + "context" + "encoding/json" + "errors" + "testing" + "time" + + "github.com/stretchr/testify/suite" + "github.com/superseriousbusiness/gotosocial/internal/ap" + "github.com/superseriousbusiness/gotosocial/internal/config" + "github.com/superseriousbusiness/gotosocial/internal/db" + "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" + "github.com/superseriousbusiness/gotosocial/internal/id" + "github.com/superseriousbusiness/gotosocial/internal/messages" + "github.com/superseriousbusiness/gotosocial/internal/stream" + "github.com/superseriousbusiness/gotosocial/internal/util" + "github.com/superseriousbusiness/gotosocial/testrig" +) + +type FromClientAPITestSuite struct { + WorkersTestSuite +} + +func (suite *FromClientAPITestSuite) newStatus( + ctx context.Context, + account *gtsmodel.Account, + visibility gtsmodel.Visibility, + replyToStatus *gtsmodel.Status, + boostOfStatus *gtsmodel.Status, +) *gtsmodel.Status { + var ( + protocol = config.GetProtocol() + host = config.GetHost() + statusID = id.NewULID() + ) + + // Make a new status from given account. + newStatus := >smodel.Status{ + ID: statusID, + URI: protocol + "://" + host + "/users/" + account.Username + "/statuses/" + statusID, + URL: protocol + "://" + host + "/@" + account.Username + "/statuses/" + statusID, + Content: "pee pee poo poo", + Local: util.Ptr(true), + AccountURI: account.URI, + AccountID: account.ID, + Visibility: visibility, + ActivityStreamsType: ap.ObjectNote, + Federated: util.Ptr(true), + Boostable: util.Ptr(true), + Replyable: util.Ptr(true), + Likeable: util.Ptr(true), + } + + if replyToStatus != nil { + // Status is a reply. + newStatus.InReplyToAccountID = replyToStatus.AccountID + newStatus.InReplyToID = replyToStatus.ID + newStatus.InReplyToURI = replyToStatus.URI + + // Mention the replied-to account. + mention := >smodel.Mention{ + ID: id.NewULID(), + StatusID: statusID, + OriginAccountID: account.ID, + OriginAccountURI: account.URI, + TargetAccountID: replyToStatus.AccountID, + } + + if err := suite.db.PutMention(ctx, mention); err != nil { + suite.FailNow(err.Error()) + } + newStatus.Mentions = []*gtsmodel.Mention{mention} + newStatus.MentionIDs = []string{mention.ID} + } + + if boostOfStatus != nil { + // Status is a boost. + + } + + // Put the status in the db, to mimic what would + // have already happened earlier up the flow. + if err := suite.db.PutStatus(ctx, newStatus); err != nil { + suite.FailNow(err.Error()) + } + + return newStatus +} + +func (suite *FromClientAPITestSuite) checkStreamed( + str *stream.Stream, + expectMessage bool, + expectPayload string, + expectEventType string, +) { + var msg *stream.Message +streamLoop: + for { + select { + case msg = <-str.Messages: + break streamLoop // Got it. + case <-time.After(5 * time.Second): + break streamLoop // Didn't get it. + } + } + + if expectMessage && msg == nil { + suite.FailNow("expected a message but message was nil") + } + + if !expectMessage && msg != nil { + suite.FailNow("expected no message but message was not nil") + } + + if expectPayload != "" && msg.Payload != expectPayload { + suite.FailNow("", "expected payload %s but payload was: %s", expectPayload, msg.Payload) + } + + if expectEventType != "" && msg.Event != expectEventType { + suite.FailNow("", "expected event type %s but event type was: %s", expectEventType, msg.Event) + } +} + +func (suite *FromClientAPITestSuite) statusJSON( + ctx context.Context, + status *gtsmodel.Status, + requestingAccount *gtsmodel.Account, +) string { + apiStatus, err := suite.typeconverter.StatusToAPIStatus( + ctx, + status, + requestingAccount, + ) + if err != nil { + suite.FailNow(err.Error()) + } + + statusJSON, err := json.Marshal(apiStatus) + if err != nil { + suite.FailNow(err.Error()) + } + + return string(statusJSON) +} + +func (suite *FromClientAPITestSuite) TestProcessCreateStatusWithNotification() { + var ( + ctx = context.Background() + postingAccount = suite.testAccounts["admin_account"] + receivingAccount = suite.testAccounts["local_account_1"] + testList = suite.testLists["local_account_1_list_1"] + streams = suite.openStreams(ctx, receivingAccount, []string{testList.ID}) + homeStream = streams[stream.TimelineHome] + listStream = streams[stream.TimelineList+":"+testList.ID] + notifStream = streams[stream.TimelineNotifications] + + // Admin account posts a new top-level status. + status = suite.newStatus( + ctx, + postingAccount, + gtsmodel.VisibilityPublic, + nil, + nil, + ) + statusJSON = suite.statusJSON( + ctx, + status, + receivingAccount, + ) + ) + + // Update the follow from receiving account -> posting account so + // that receiving account wants notifs when posting account posts. + follow := new(gtsmodel.Follow) + *follow = *suite.testFollows["local_account_1_admin_account"] + + follow.Notify = util.Ptr(true) + if err := suite.db.UpdateFollow(ctx, follow); err != nil { + suite.FailNow(err.Error()) + } + + // Process the new status. + if err := suite.processor.Workers().ProcessFromClientAPI( + ctx, + messages.FromClientAPI{ + APObjectType: ap.ObjectNote, + APActivityType: ap.ActivityCreate, + GTSModel: status, + OriginAccount: postingAccount, + }, + ); err != nil { + suite.FailNow(err.Error()) + } + + // Check message in home stream. + suite.checkStreamed( + homeStream, + true, + statusJSON, + stream.EventTypeUpdate, + ) + + // Check message in list stream. + suite.checkStreamed( + listStream, + true, + statusJSON, + stream.EventTypeUpdate, + ) + + // Wait for a notification to appear for the status. + var notif *gtsmodel.Notification + if !testrig.WaitFor(func() bool { + var err error + notif, err = suite.db.GetNotification( + ctx, + gtsmodel.NotificationStatus, + receivingAccount.ID, + postingAccount.ID, + status.ID, + ) + return err == nil + }) { + suite.FailNow("timed out waiting for new status notification") + } + + apiNotif, err := suite.typeconverter.NotificationToAPINotification(ctx, notif) + if err != nil { + suite.FailNow(err.Error()) + } + + notifJSON, err := json.Marshal(apiNotif) + if err != nil { + suite.FailNow(err.Error()) + } + + // Check message in notification stream. + suite.checkStreamed( + notifStream, + true, + string(notifJSON), + stream.EventTypeNotification, + ) +} + +func (suite *FromClientAPITestSuite) TestProcessCreateStatusReply() { + var ( + ctx = context.Background() + postingAccount = suite.testAccounts["admin_account"] + receivingAccount = suite.testAccounts["local_account_1"] + testList = suite.testLists["local_account_1_list_1"] + streams = suite.openStreams(ctx, receivingAccount, []string{testList.ID}) + homeStream = streams[stream.TimelineHome] + listStream = streams[stream.TimelineList+":"+testList.ID] + + // Admin account posts a reply to turtle. + // Since turtle is followed by zork, and + // the default replies policy for this list + // is to show replies to followed accounts, + // post should also show in the list stream. + status = suite.newStatus( + ctx, + postingAccount, + gtsmodel.VisibilityPublic, + suite.testStatuses["local_account_2_status_1"], + nil, + ) + statusJSON = suite.statusJSON( + ctx, + status, + receivingAccount, + ) + ) + + // Process the new status. + if err := suite.processor.Workers().ProcessFromClientAPI( + ctx, + messages.FromClientAPI{ + APObjectType: ap.ObjectNote, + APActivityType: ap.ActivityCreate, + GTSModel: status, + OriginAccount: postingAccount, + }, + ); err != nil { + suite.FailNow(err.Error()) + } + + // Check message in home stream. + suite.checkStreamed( + homeStream, + true, + statusJSON, + stream.EventTypeUpdate, + ) + + // Check message in list stream. + suite.checkStreamed( + listStream, + true, + statusJSON, + stream.EventTypeUpdate, + ) +} + +func (suite *FromClientAPITestSuite) TestProcessCreateStatusListRepliesPolicyListOnlyOK() { + // We're modifying the test list so take a copy. + testList := new(gtsmodel.List) + *testList = *suite.testLists["local_account_1_list_1"] + + var ( + ctx = context.Background() + postingAccount = suite.testAccounts["admin_account"] + receivingAccount = suite.testAccounts["local_account_1"] + streams = suite.openStreams(ctx, receivingAccount, []string{testList.ID}) + homeStream = streams[stream.TimelineHome] + listStream = streams[stream.TimelineList+":"+testList.ID] + + // Admin account posts a reply to turtle. + status = suite.newStatus( + ctx, + postingAccount, + gtsmodel.VisibilityPublic, + suite.testStatuses["local_account_2_status_1"], + nil, + ) + statusJSON = suite.statusJSON( + ctx, + status, + receivingAccount, + ) + ) + + // Modify replies policy of test list to show replies + // only to other accounts in the same list. Since turtle + // and admin are in the same list, this means the reply + // should be shown in the list. + testList.RepliesPolicy = gtsmodel.RepliesPolicyList + if err := suite.db.UpdateList(ctx, testList, "replies_policy"); err != nil { + suite.FailNow(err.Error()) + } + + // Process the new status. + if err := suite.processor.Workers().ProcessFromClientAPI( + ctx, + messages.FromClientAPI{ + APObjectType: ap.ObjectNote, + APActivityType: ap.ActivityCreate, + GTSModel: status, + OriginAccount: postingAccount, + }, + ); err != nil { + suite.FailNow(err.Error()) + } + + // Check message in home stream. + suite.checkStreamed( + homeStream, + true, + statusJSON, + stream.EventTypeUpdate, + ) + + // Check message in list stream. + suite.checkStreamed( + listStream, + true, + statusJSON, + stream.EventTypeUpdate, + ) +} + +func (suite *FromClientAPITestSuite) TestProcessCreateStatusListRepliesPolicyListOnlyNo() { + // We're modifying the test list so take a copy. + testList := new(gtsmodel.List) + *testList = *suite.testLists["local_account_1_list_1"] + + var ( + ctx = context.Background() + postingAccount = suite.testAccounts["admin_account"] + receivingAccount = suite.testAccounts["local_account_1"] + streams = suite.openStreams(ctx, receivingAccount, []string{testList.ID}) + homeStream = streams[stream.TimelineHome] + listStream = streams[stream.TimelineList+":"+testList.ID] + + // Admin account posts a reply to turtle. + status = suite.newStatus( + ctx, + postingAccount, + gtsmodel.VisibilityPublic, + suite.testStatuses["local_account_2_status_1"], + nil, + ) + statusJSON = suite.statusJSON( + ctx, + status, + receivingAccount, + ) + ) + + // Modify replies policy of test list to show replies + // only to other accounts in the same list. We're + // about to remove turtle from the same list as admin, + // so the new post should not be streamed to the list. + testList.RepliesPolicy = gtsmodel.RepliesPolicyList + if err := suite.db.UpdateList(ctx, testList, "replies_policy"); err != nil { + suite.FailNow(err.Error()) + } + + // Remove turtle from the list. + if err := suite.db.DeleteListEntry(ctx, suite.testListEntries["local_account_1_list_1_entry_1"].ID); err != nil { + suite.FailNow(err.Error()) + } + + // Process the new status. + if err := suite.processor.Workers().ProcessFromClientAPI( + ctx, + messages.FromClientAPI{ + APObjectType: ap.ObjectNote, + APActivityType: ap.ActivityCreate, + GTSModel: status, + OriginAccount: postingAccount, + }, + ); err != nil { + suite.FailNow(err.Error()) + } + + // Check message in home stream. + suite.checkStreamed( + homeStream, + true, + statusJSON, + stream.EventTypeUpdate, + ) + + // Check message NOT in list stream. + suite.checkStreamed( + listStream, + false, + "", + "", + ) +} + +func (suite *FromClientAPITestSuite) TestProcessCreateStatusReplyListRepliesPolicyNone() { + // We're modifying the test list so take a copy. + testList := new(gtsmodel.List) + *testList = *suite.testLists["local_account_1_list_1"] + + var ( + ctx = context.Background() + postingAccount = suite.testAccounts["admin_account"] + receivingAccount = suite.testAccounts["local_account_1"] + streams = suite.openStreams(ctx, receivingAccount, []string{testList.ID}) + homeStream = streams[stream.TimelineHome] + listStream = streams[stream.TimelineList+":"+testList.ID] + + // Admin account posts a reply to turtle. + status = suite.newStatus( + ctx, + postingAccount, + gtsmodel.VisibilityPublic, + suite.testStatuses["local_account_2_status_1"], + nil, + ) + statusJSON = suite.statusJSON( + ctx, + status, + receivingAccount, + ) + ) + + // Modify replies policy of test list. + // Since we're modifying the list to not + // show any replies, the post should not + // be streamed to the list. + testList.RepliesPolicy = gtsmodel.RepliesPolicyNone + if err := suite.db.UpdateList(ctx, testList, "replies_policy"); err != nil { + suite.FailNow(err.Error()) + } + + // Process the new status. + if err := suite.processor.Workers().ProcessFromClientAPI( + ctx, + messages.FromClientAPI{ + APObjectType: ap.ObjectNote, + APActivityType: ap.ActivityCreate, + GTSModel: status, + OriginAccount: postingAccount, + }, + ); err != nil { + suite.FailNow(err.Error()) + } + + // Check message in home stream. + suite.checkStreamed( + homeStream, + true, + statusJSON, + stream.EventTypeUpdate, + ) + + // Check message NOT in list stream. + suite.checkStreamed( + listStream, + false, + "", + "", + ) +} + +func (suite *FromClientAPITestSuite) TestProcessStatusDelete() { + var ( + ctx = context.Background() + deletingAccount = suite.testAccounts["local_account_1"] + receivingAccount = suite.testAccounts["local_account_2"] + deletedStatus = suite.testStatuses["local_account_1_status_1"] + boostOfDeletedStatus = suite.testStatuses["admin_account_status_4"] + streams = suite.openStreams(ctx, receivingAccount, nil) + homeStream = streams[stream.TimelineHome] + ) + + // Delete the status from the db first, to mimic what + // would have already happened earlier up the flow + if err := suite.db.DeleteStatusByID(ctx, deletedStatus.ID); err != nil { + suite.FailNow(err.Error()) + } + + // Process the status delete. + if err := suite.processor.Workers().ProcessFromClientAPI( + ctx, + messages.FromClientAPI{ + APObjectType: ap.ObjectNote, + APActivityType: ap.ActivityDelete, + GTSModel: deletedStatus, + OriginAccount: deletingAccount, + }, + ); err != nil { + suite.FailNow(err.Error()) + } + + // Stream should have the delete + // of admin's boost in it now. + suite.checkStreamed( + homeStream, + true, + boostOfDeletedStatus.ID, + stream.EventTypeDelete, + ) + + // Stream should also have the delete + // of the message itself in it. + suite.checkStreamed( + homeStream, + true, + deletedStatus.ID, + stream.EventTypeDelete, + ) + + // Boost should no longer be in the database. + if !testrig.WaitFor(func() bool { + _, err := suite.db.GetStatusByID(ctx, boostOfDeletedStatus.ID) + return errors.Is(err, db.ErrNoEntries) + }) { + suite.FailNow("timed out waiting for status delete") + } +} + +func TestFromClientAPITestSuite(t *testing.T) { + suite.Run(t, &FromClientAPITestSuite{}) +} diff --git a/internal/processing/workers/fromfediapi.go b/internal/processing/workers/fromfediapi.go new file mode 100644 index 000000000..5fbb0066b --- /dev/null +++ b/internal/processing/workers/fromfediapi.go @@ -0,0 +1,540 @@ +// GoToSocial +// Copyright (C) GoToSocial Authors admin@gotosocial.org +// SPDX-License-Identifier: AGPL-3.0-or-later +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <http://www.gnu.org/licenses/>. + +package workers + +import ( + "context" + "net/url" + + "codeberg.org/gruf/go-kv" + "codeberg.org/gruf/go-logger/v2/level" + "github.com/superseriousbusiness/gotosocial/internal/ap" + "github.com/superseriousbusiness/gotosocial/internal/gtserror" + "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" + "github.com/superseriousbusiness/gotosocial/internal/id" + "github.com/superseriousbusiness/gotosocial/internal/log" + "github.com/superseriousbusiness/gotosocial/internal/messages" + "github.com/superseriousbusiness/gotosocial/internal/processing/account" + "github.com/superseriousbusiness/gotosocial/internal/state" +) + +// fediAPI wraps processing functions +// specifically for messages originating +// from the federation/ActivityPub API. +type fediAPI struct { + state *state.State + surface *surface + federate *federate + wipeStatus wipeStatus + account *account.Processor +} + +func (p *Processor) EnqueueFediAPI(ctx context.Context, msgs ...messages.FromFediAPI) { + log.Trace(ctx, "enqueuing") + _ = p.workers.Federator.MustEnqueueCtx(ctx, func(ctx context.Context) { + for _, msg := range msgs { + log.Trace(ctx, "processing: %+v", msg) + if err := p.ProcessFromFediAPI(ctx, msg); err != nil { + log.Errorf(ctx, "error processing fedi API message: %v", err) + } + } + }) +} + +func (p *Processor) ProcessFromFediAPI(ctx context.Context, fMsg messages.FromFediAPI) error { + // Allocate new log fields slice + fields := make([]kv.Field, 3, 5) + fields[0] = kv.Field{"activityType", fMsg.APActivityType} + fields[1] = kv.Field{"objectType", fMsg.APObjectType} + fields[2] = kv.Field{"toAccount", fMsg.ReceivingAccount.Username} + + if fMsg.APIri != nil { + // An IRI was supplied, append to log + fields = append(fields, kv.Field{ + "iri", fMsg.APIri, + }) + } + + // Include GTSModel in logs if appropriate. + if fMsg.GTSModel != nil && + log.Level() >= level.DEBUG { + fields = append(fields, kv.Field{ + "model", fMsg.GTSModel, + }) + } + + l := log.WithContext(ctx).WithFields(fields...) + l.Info("processing from fedi API") + + switch fMsg.APActivityType { + + // CREATE SOMETHING + case ap.ActivityCreate: + switch fMsg.APObjectType { + + // CREATE NOTE/STATUS + case ap.ObjectNote: + return p.fediAPI.CreateStatus(ctx, fMsg) + + // CREATE FOLLOW (request) + case ap.ActivityFollow: + return p.fediAPI.CreateFollowReq(ctx, fMsg) + + // CREATE LIKE/FAVE + case ap.ActivityLike: + return p.fediAPI.CreateLike(ctx, fMsg) + + // CREATE ANNOUNCE/BOOST + case ap.ActivityAnnounce: + return p.fediAPI.CreateAnnounce(ctx, fMsg) + + // CREATE BLOCK + case ap.ActivityBlock: + return p.fediAPI.CreateBlock(ctx, fMsg) + + // CREATE FLAG/REPORT + case ap.ActivityFlag: + return p.fediAPI.CreateFlag(ctx, fMsg) + } + + // UPDATE SOMETHING + case ap.ActivityUpdate: + switch fMsg.APObjectType { //nolint:gocritic + + // UPDATE PROFILE/ACCOUNT + case ap.ObjectProfile: + return p.fediAPI.UpdateAccount(ctx, fMsg) + } + + // DELETE SOMETHING + case ap.ActivityDelete: + switch fMsg.APObjectType { + + // DELETE NOTE/STATUS + case ap.ObjectNote: + return p.fediAPI.DeleteStatus(ctx, fMsg) + + // DELETE PROFILE/ACCOUNT + case ap.ObjectProfile: + return p.fediAPI.DeleteAccount(ctx, fMsg) + } + } + + return nil +} + +func (p *fediAPI) CreateStatus(ctx context.Context, fMsg messages.FromFediAPI) error { + var ( + status *gtsmodel.Status + err error + + // Check the federatorMsg for either an already dereferenced + // and converted status pinned to the message, or a forwarded + // AP IRI that we still need to deref. + forwarded = (fMsg.GTSModel == nil) + ) + + if forwarded { + // Model was not set, deref with IRI. + // This will also cause the status to be inserted into the db. + status, err = p.statusFromAPIRI(ctx, fMsg) + } else { + // Model is set, ensure we have the most up-to-date model. + status, err = p.statusFromGTSModel(ctx, fMsg) + } + + if err != nil { + return gtserror.Newf("error extracting status from federatorMsg: %w", err) + } + + if status.Account == nil || status.Account.IsRemote() { + // Either no account attached yet, or a remote account. + // Both situations we need to parse account URI to fetch it. + accountURI, err := url.Parse(status.AccountURI) + if err != nil { + return err + } + + // Ensure that account for this status has been deref'd. + status.Account, _, err = p.federate.GetAccountByURI( + ctx, + fMsg.ReceivingAccount.Username, + accountURI, + ) + if err != nil { + return err + } + } + + // Ensure status ancestors dereferenced. We need at least the + // immediate parent (if present) to ascertain timelineability. + if err := p.federate.DereferenceStatusAncestors( + ctx, + fMsg.ReceivingAccount.Username, + status, + ); err != nil { + return err + } + + if status.InReplyToID != "" { + // Interaction counts changed on the replied status; + // uncache the prepared version from all timelines. + p.surface.invalidateStatusFromTimelines(ctx, status.InReplyToID) + } + + if err := p.surface.timelineAndNotifyStatus(ctx, status); err != nil { + return gtserror.Newf("error timelining status: %w", err) + } + + return nil +} + +func (p *fediAPI) statusFromGTSModel(ctx context.Context, fMsg messages.FromFediAPI) (*gtsmodel.Status, error) { + // There should be a status pinned to the message: + // we've already checked to ensure this is not nil. + status, ok := fMsg.GTSModel.(*gtsmodel.Status) + if !ok { + err := gtserror.New("Note was not parseable as *gtsmodel.Status") + return nil, err + } + + // AP statusable representation may have also + // been set on message (no problem if not). + statusable, _ := fMsg.APObjectModel.(ap.Statusable) + + // Call refresh on status to update + // it (deref remote) if necessary. + var err error + status, _, err = p.federate.RefreshStatus( + ctx, + fMsg.ReceivingAccount.Username, + status, + statusable, + false, // Don't force refresh. + ) + if err != nil { + return nil, gtserror.Newf("%w", err) + } + + return status, nil +} + +func (p *fediAPI) statusFromAPIRI(ctx context.Context, fMsg messages.FromFediAPI) (*gtsmodel.Status, error) { + // There should be a status IRI pinned to + // the federatorMsg for us to dereference. + if fMsg.APIri == nil { + err := gtserror.New( + "status was not pinned to federatorMsg, " + + "and neither was an IRI for us to dereference", + ) + return nil, err + } + + // Get the status + ensure we have + // the most up-to-date version. + status, _, err := p.federate.GetStatusByURI( + ctx, + fMsg.ReceivingAccount.Username, + fMsg.APIri, + ) + if err != nil { + return nil, gtserror.Newf("%w", err) + } + + return status, nil +} + +func (p *fediAPI) CreateFollowReq(ctx context.Context, fMsg messages.FromFediAPI) error { + followRequest, ok := fMsg.GTSModel.(*gtsmodel.FollowRequest) + if !ok { + return gtserror.Newf("%T not parseable as *gtsmodel.FollowRequest", fMsg.GTSModel) + } + + if *followRequest.TargetAccount.Locked { + // Account on our instance is locked: + // just notify the follow request. + if err := p.surface.notifyFollowRequest(ctx, followRequest); err != nil { + return gtserror.Newf("error notifying follow request: %w", err) + } + + return nil + } + + // Account on our instance is not locked: + // Automatically accept the follow request + // and notify about the new follower. + follow, err := p.state.DB.AcceptFollowRequest( + ctx, + followRequest.AccountID, + followRequest.TargetAccountID, + ) + if err != nil { + return gtserror.Newf("error accepting follow request: %w", err) + } + + if err := p.federate.AcceptFollow(ctx, follow); err != nil { + return gtserror.Newf("error federating accept follow request: %w", err) + } + + if err := p.surface.notifyFollow(ctx, follow); err != nil { + return gtserror.Newf("error notifying follow: %w", err) + } + + return nil +} + +func (p *fediAPI) CreateLike(ctx context.Context, fMsg messages.FromFediAPI) error { + fave, ok := fMsg.GTSModel.(*gtsmodel.StatusFave) + if !ok { + return gtserror.Newf("%T not parseable as *gtsmodel.StatusFave", fMsg.GTSModel) + } + + if err := p.surface.notifyFave(ctx, fave); err != nil { + return gtserror.Newf("error notifying fave: %w", err) + } + + // Interaction counts changed on the faved status; + // uncache the prepared version from all timelines. + p.surface.invalidateStatusFromTimelines(ctx, fave.StatusID) + + return nil +} + +func (p *fediAPI) CreateAnnounce(ctx context.Context, fMsg messages.FromFediAPI) error { + status, ok := fMsg.GTSModel.(*gtsmodel.Status) + if !ok { + return gtserror.Newf("%T not parseable as *gtsmodel.Status", fMsg.GTSModel) + } + + // Dereference status that this status boosts. + if err := p.federate.DereferenceAnnounce( + ctx, + status, + fMsg.ReceivingAccount.Username, + ); err != nil { + return gtserror.Newf("error dereferencing announce: %w", err) + } + + // Generate an ID for the boost wrapper status. + statusID, err := id.NewULIDFromTime(status.CreatedAt) + if err != nil { + return gtserror.Newf("error generating id: %w", err) + } + status.ID = statusID + + // Store the boost wrapper status. + if err := p.state.DB.PutStatus(ctx, status); err != nil { + return gtserror.Newf("db error inserting status: %w", err) + } + + // Ensure boosted status ancestors dereferenced. We need at least + // the immediate parent (if present) to ascertain timelineability. + if err := p.federate.DereferenceStatusAncestors(ctx, + fMsg.ReceivingAccount.Username, + status.BoostOf, + ); err != nil { + return err + } + + // Timeline and notify the announce. + if err := p.surface.timelineAndNotifyStatus(ctx, status); err != nil { + return gtserror.Newf("error timelining status: %w", err) + } + + if err := p.surface.notifyAnnounce(ctx, status); err != nil { + return gtserror.Newf("error notifying status: %w", err) + } + + // Interaction counts changed on the boosted status; + // uncache the prepared version from all timelines. + p.surface.invalidateStatusFromTimelines(ctx, status.ID) + + return nil +} + +func (p *fediAPI) CreateBlock(ctx context.Context, fMsg messages.FromFediAPI) error { + block, ok := fMsg.GTSModel.(*gtsmodel.Block) + if !ok { + return gtserror.Newf("%T not parseable as *gtsmodel.Block", fMsg.GTSModel) + } + + // Remove each account's posts from the other's timelines. + // + // First home timelines. + if err := p.state.Timelines.Home.WipeItemsFromAccountID( + ctx, + block.AccountID, + block.TargetAccountID, + ); err != nil { + return gtserror.Newf("%w", err) + } + + if err := p.state.Timelines.Home.WipeItemsFromAccountID( + ctx, + block.TargetAccountID, + block.AccountID, + ); err != nil { + return gtserror.Newf("%w", err) + } + + // Now list timelines. + if err := p.state.Timelines.List.WipeItemsFromAccountID( + ctx, + block.AccountID, + block.TargetAccountID, + ); err != nil { + return gtserror.Newf("%w", err) + } + + if err := p.state.Timelines.List.WipeItemsFromAccountID( + ctx, + block.TargetAccountID, + block.AccountID, + ); err != nil { + return gtserror.Newf("%w", err) + } + + // Remove any follows that existed between blocker + blockee. + if err := p.state.DB.DeleteFollow( + ctx, + block.AccountID, + block.TargetAccountID, + ); err != nil { + return gtserror.Newf( + "db error deleting follow from %s targeting %s: %w", + block.AccountID, block.TargetAccountID, err, + ) + } + + if err := p.state.DB.DeleteFollow( + ctx, + block.TargetAccountID, + block.AccountID, + ); err != nil { + return gtserror.Newf( + "db error deleting follow from %s targeting %s: %w", + block.TargetAccountID, block.AccountID, err, + ) + } + + // Remove any follow requests that existed between blocker + blockee. + if err := p.state.DB.DeleteFollowRequest( + ctx, + block.AccountID, + block.TargetAccountID, + ); err != nil { + return gtserror.Newf( + "db error deleting follow request from %s targeting %s: %w", + block.AccountID, block.TargetAccountID, err, + ) + } + + if err := p.state.DB.DeleteFollowRequest( + ctx, + block.TargetAccountID, + block.AccountID, + ); err != nil { + return gtserror.Newf( + "db error deleting follow request from %s targeting %s: %w", + block.TargetAccountID, block.AccountID, err, + ) + } + + return nil +} + +func (p *fediAPI) CreateFlag(ctx context.Context, fMsg messages.FromFediAPI) error { + incomingReport, ok := fMsg.GTSModel.(*gtsmodel.Report) + if !ok { + return gtserror.Newf("%T not parseable as *gtsmodel.Report", fMsg.GTSModel) + } + + // TODO: handle additional side effects of flag creation: + // - notify admins by dm / notification + + if err := p.surface.emailReportOpened(ctx, incomingReport); err != nil { + return gtserror.Newf("error sending report opened email: %w", err) + } + + return nil +} + +func (p *fediAPI) UpdateAccount(ctx context.Context, fMsg messages.FromFediAPI) error { + // Parse the old/existing account model. + account, ok := fMsg.GTSModel.(*gtsmodel.Account) + if !ok { + return gtserror.Newf("%T not parseable as *gtsmodel.Account", fMsg.GTSModel) + } + + // Because this was an Update, the new Accountable should be set on the message. + apubAcc, ok := fMsg.APObjectModel.(ap.Accountable) + if !ok { + return gtserror.Newf("%T not parseable as ap.Accountable", fMsg.APObjectModel) + } + + // Fetch up-to-date bio, avatar, header, etc. + _, _, err := p.federate.RefreshAccount( + ctx, + fMsg.ReceivingAccount.Username, + account, + apubAcc, + true, // Force refresh. + ) + if err != nil { + return gtserror.Newf("error refreshing updated account: %w", err) + } + + return nil +} + +func (p *fediAPI) DeleteStatus(ctx context.Context, fMsg messages.FromFediAPI) error { + // Delete attachments from this status, since this request + // comes from the federating API, and there's no way the + // poster can do a delete + redraft for it on our instance. + const deleteAttachments = true + + status, ok := fMsg.GTSModel.(*gtsmodel.Status) + if !ok { + return gtserror.Newf("%T not parseable as *gtsmodel.Status", fMsg.GTSModel) + } + + if err := p.wipeStatus(ctx, status, deleteAttachments); err != nil { + return gtserror.Newf("error wiping status: %w", err) + } + + if status.InReplyToID != "" { + // Interaction counts changed on the replied status; + // uncache the prepared version from all timelines. + p.surface.invalidateStatusFromTimelines(ctx, status.InReplyToID) + } + + return nil +} + +func (p *fediAPI) DeleteAccount(ctx context.Context, fMsg messages.FromFediAPI) error { + account, ok := fMsg.GTSModel.(*gtsmodel.Account) + if !ok { + return gtserror.Newf("%T not parseable as *gtsmodel.Account", fMsg.GTSModel) + } + + if err := p.account.Delete(ctx, account, account.ID); err != nil { + return gtserror.Newf("error deleting account: %w", err) + } + + return nil +} diff --git a/internal/processing/workers/fromfediapi_test.go b/internal/processing/workers/fromfediapi_test.go new file mode 100644 index 000000000..f8e3941fc --- /dev/null +++ b/internal/processing/workers/fromfediapi_test.go @@ -0,0 +1,565 @@ +// GoToSocial +// Copyright (C) GoToSocial Authors admin@gotosocial.org +// SPDX-License-Identifier: AGPL-3.0-or-later +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <http://www.gnu.org/licenses/>. + +package workers_test + +import ( + "context" + "encoding/json" + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/suite" + "github.com/superseriousbusiness/gotosocial/internal/ap" + apimodel "github.com/superseriousbusiness/gotosocial/internal/api/model" + "github.com/superseriousbusiness/gotosocial/internal/db" + "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" + "github.com/superseriousbusiness/gotosocial/internal/id" + "github.com/superseriousbusiness/gotosocial/internal/messages" + "github.com/superseriousbusiness/gotosocial/internal/stream" + "github.com/superseriousbusiness/gotosocial/internal/util" + "github.com/superseriousbusiness/gotosocial/testrig" +) + +type FromFediAPITestSuite struct { + WorkersTestSuite +} + +// remote_account_1 boosts the first status of local_account_1 +func (suite *FromFediAPITestSuite) TestProcessFederationAnnounce() { + boostedStatus := suite.testStatuses["local_account_1_status_1"] + boostingAccount := suite.testAccounts["remote_account_1"] + announceStatus := >smodel.Status{} + announceStatus.URI = "https://example.org/some-announce-uri" + announceStatus.BoostOf = >smodel.Status{ + URI: boostedStatus.URI, + } + announceStatus.CreatedAt = time.Now() + announceStatus.UpdatedAt = time.Now() + announceStatus.AccountID = boostingAccount.ID + announceStatus.AccountURI = boostingAccount.URI + announceStatus.Account = boostingAccount + announceStatus.Visibility = boostedStatus.Visibility + + err := suite.processor.Workers().ProcessFromFediAPI(context.Background(), messages.FromFediAPI{ + APObjectType: ap.ActivityAnnounce, + APActivityType: ap.ActivityCreate, + GTSModel: announceStatus, + ReceivingAccount: suite.testAccounts["local_account_1"], + }) + suite.NoError(err) + + // side effects should be triggered + // 1. status should have an ID, and be in the database + suite.NotEmpty(announceStatus.ID) + _, err = suite.db.GetStatusByID(context.Background(), announceStatus.ID) + suite.NoError(err) + + // 2. a notification should exist for the announce + where := []db.Where{ + { + Key: "status_id", + Value: announceStatus.ID, + }, + } + notif := >smodel.Notification{} + err = suite.db.GetWhere(context.Background(), where, notif) + suite.NoError(err) + suite.Equal(gtsmodel.NotificationReblog, notif.NotificationType) + suite.Equal(boostedStatus.AccountID, notif.TargetAccountID) + suite.Equal(announceStatus.AccountID, notif.OriginAccountID) + suite.Equal(announceStatus.ID, notif.StatusID) + suite.False(*notif.Read) +} + +func (suite *FromFediAPITestSuite) TestProcessReplyMention() { + repliedAccount := suite.testAccounts["local_account_1"] + repliedStatus := suite.testStatuses["local_account_1_status_1"] + replyingAccount := suite.testAccounts["remote_account_1"] + + replyingStatus := >smodel.Status{ + CreatedAt: time.Now(), + UpdatedAt: time.Now(), + URI: "http://fossbros-anonymous.io/users/foss_satan/statuses/106221634728637552", + URL: "http://fossbros-anonymous.io/@foss_satan/106221634728637552", + Content: `<p><span class="h-card"><a href="http://localhost:8080/@the_mighty_zork" class="u-url mention">@<span>the_mighty_zork</span></a></span> nice there it is:</p><p><a href="http://localhost:8080/users/the_mighty_zork/statuses/01F8MHAMCHF6Y650WCRSCP4WMY/activity" rel="nofollow noopener noreferrer" target="_blank"><span class="invisible">https://</span><span class="ellipsis">social.pixie.town/users/f0x/st</span><span class="invisible">atuses/106221628567855262/activity</span></a></p>`, + Mentions: []*gtsmodel.Mention{ + { + TargetAccountURI: repliedAccount.URI, + NameString: "@the_mighty_zork@localhost:8080", + }, + }, + AccountID: replyingAccount.ID, + AccountURI: replyingAccount.URI, + InReplyToID: repliedStatus.ID, + InReplyToURI: repliedStatus.URI, + InReplyToAccountID: repliedAccount.ID, + Visibility: gtsmodel.VisibilityUnlocked, + ActivityStreamsType: ap.ObjectNote, + Federated: util.Ptr(true), + Boostable: util.Ptr(true), + Replyable: util.Ptr(true), + Likeable: util.Ptr(false), + } + + wssStream, errWithCode := suite.processor.Stream().Open(context.Background(), repliedAccount, stream.TimelineHome) + suite.NoError(errWithCode) + + // id the status based on the time it was created + statusID, err := id.NewULIDFromTime(replyingStatus.CreatedAt) + suite.NoError(err) + replyingStatus.ID = statusID + + err = suite.db.PutStatus(context.Background(), replyingStatus) + suite.NoError(err) + + err = suite.processor.Workers().ProcessFromFediAPI(context.Background(), messages.FromFediAPI{ + APObjectType: ap.ObjectNote, + APActivityType: ap.ActivityCreate, + GTSModel: replyingStatus, + ReceivingAccount: suite.testAccounts["local_account_1"], + }) + suite.NoError(err) + + // side effects should be triggered + // 1. status should be in the database + suite.NotEmpty(replyingStatus.ID) + _, err = suite.db.GetStatusByID(context.Background(), replyingStatus.ID) + suite.NoError(err) + + // 2. a notification should exist for the mention + var notif gtsmodel.Notification + err = suite.db.GetWhere(context.Background(), []db.Where{ + {Key: "status_id", Value: replyingStatus.ID}, + }, ¬if) + suite.NoError(err) + suite.Equal(gtsmodel.NotificationMention, notif.NotificationType) + suite.Equal(replyingStatus.InReplyToAccountID, notif.TargetAccountID) + suite.Equal(replyingStatus.AccountID, notif.OriginAccountID) + suite.Equal(replyingStatus.ID, notif.StatusID) + suite.False(*notif.Read) + + // the notification should be streamed + var msg *stream.Message + select { + case msg = <-wssStream.Messages: + // fine + case <-time.After(5 * time.Second): + suite.FailNow("no message from wssStream") + } + + suite.Equal(stream.EventTypeNotification, msg.Event) + suite.NotEmpty(msg.Payload) + suite.EqualValues([]string{stream.TimelineHome}, msg.Stream) + notifStreamed := &apimodel.Notification{} + err = json.Unmarshal([]byte(msg.Payload), notifStreamed) + suite.NoError(err) + suite.Equal("mention", notifStreamed.Type) + suite.Equal(replyingAccount.ID, notifStreamed.Account.ID) +} + +func (suite *FromFediAPITestSuite) TestProcessFave() { + favedAccount := suite.testAccounts["local_account_1"] + favedStatus := suite.testStatuses["local_account_1_status_1"] + favingAccount := suite.testAccounts["remote_account_1"] + + wssStream, errWithCode := suite.processor.Stream().Open(context.Background(), favedAccount, stream.TimelineNotifications) + suite.NoError(errWithCode) + + fave := >smodel.StatusFave{ + ID: "01FGKJPXFTVQPG9YSSZ95ADS7Q", + CreatedAt: time.Now(), + UpdatedAt: time.Now(), + AccountID: favingAccount.ID, + Account: favingAccount, + TargetAccountID: favedAccount.ID, + TargetAccount: favedAccount, + StatusID: favedStatus.ID, + Status: favedStatus, + URI: favingAccount.URI + "/faves/aaaaaaaaaaaa", + } + + err := suite.db.Put(context.Background(), fave) + suite.NoError(err) + + err = suite.processor.Workers().ProcessFromFediAPI(context.Background(), messages.FromFediAPI{ + APObjectType: ap.ActivityLike, + APActivityType: ap.ActivityCreate, + GTSModel: fave, + ReceivingAccount: favedAccount, + }) + suite.NoError(err) + + // side effects should be triggered + // 1. a notification should exist for the fave + where := []db.Where{ + { + Key: "status_id", + Value: favedStatus.ID, + }, + { + Key: "origin_account_id", + Value: favingAccount.ID, + }, + } + + notif := >smodel.Notification{} + err = suite.db.GetWhere(context.Background(), where, notif) + suite.NoError(err) + suite.Equal(gtsmodel.NotificationFave, notif.NotificationType) + suite.Equal(fave.TargetAccountID, notif.TargetAccountID) + suite.Equal(fave.AccountID, notif.OriginAccountID) + suite.Equal(fave.StatusID, notif.StatusID) + suite.False(*notif.Read) + + // 2. a notification should be streamed + var msg *stream.Message + select { + case msg = <-wssStream.Messages: + // fine + case <-time.After(5 * time.Second): + suite.FailNow("no message from wssStream") + } + suite.Equal(stream.EventTypeNotification, msg.Event) + suite.NotEmpty(msg.Payload) + suite.EqualValues([]string{stream.TimelineNotifications}, msg.Stream) +} + +// TestProcessFaveWithDifferentReceivingAccount ensures that when an account receives a fave that's for +// another account in their AP inbox, a notification isn't streamed to the receiving account. +// +// This tests for an issue we were seeing where Misskey sends out faves to inboxes of people that don't own +// the fave, but just follow the actor who received the fave. +func (suite *FromFediAPITestSuite) TestProcessFaveWithDifferentReceivingAccount() { + receivingAccount := suite.testAccounts["local_account_2"] + favedAccount := suite.testAccounts["local_account_1"] + favedStatus := suite.testStatuses["local_account_1_status_1"] + favingAccount := suite.testAccounts["remote_account_1"] + + wssStream, errWithCode := suite.processor.Stream().Open(context.Background(), receivingAccount, stream.TimelineHome) + suite.NoError(errWithCode) + + fave := >smodel.StatusFave{ + ID: "01FGKJPXFTVQPG9YSSZ95ADS7Q", + CreatedAt: time.Now(), + UpdatedAt: time.Now(), + AccountID: favingAccount.ID, + Account: favingAccount, + TargetAccountID: favedAccount.ID, + TargetAccount: favedAccount, + StatusID: favedStatus.ID, + Status: favedStatus, + URI: favingAccount.URI + "/faves/aaaaaaaaaaaa", + } + + err := suite.db.Put(context.Background(), fave) + suite.NoError(err) + + err = suite.processor.Workers().ProcessFromFediAPI(context.Background(), messages.FromFediAPI{ + APObjectType: ap.ActivityLike, + APActivityType: ap.ActivityCreate, + GTSModel: fave, + ReceivingAccount: receivingAccount, + }) + suite.NoError(err) + + // side effects should be triggered + // 1. a notification should exist for the fave + where := []db.Where{ + { + Key: "status_id", + Value: favedStatus.ID, + }, + { + Key: "origin_account_id", + Value: favingAccount.ID, + }, + } + + notif := >smodel.Notification{} + err = suite.db.GetWhere(context.Background(), where, notif) + suite.NoError(err) + suite.Equal(gtsmodel.NotificationFave, notif.NotificationType) + suite.Equal(fave.TargetAccountID, notif.TargetAccountID) + suite.Equal(fave.AccountID, notif.OriginAccountID) + suite.Equal(fave.StatusID, notif.StatusID) + suite.False(*notif.Read) + + // 2. no notification should be streamed to the account that received the fave message, because they weren't the target + suite.Empty(wssStream.Messages) +} + +func (suite *FromFediAPITestSuite) TestProcessAccountDelete() { + ctx := context.Background() + + deletedAccount := suite.testAccounts["remote_account_1"] + receivingAccount := suite.testAccounts["local_account_1"] + + // before doing the delete.... + // make local_account_1 and remote_account_1 into mufos + zorkFollowSatan := >smodel.Follow{ + ID: "01FGRY72ASHBSET64353DPHK9T", + CreatedAt: time.Now().Add(-1 * time.Hour), + UpdatedAt: time.Now().Add(-1 * time.Hour), + AccountID: deletedAccount.ID, + TargetAccountID: receivingAccount.ID, + ShowReblogs: util.Ptr(true), + URI: fmt.Sprintf("%s/follows/01FGRY72ASHBSET64353DPHK9T", deletedAccount.URI), + Notify: util.Ptr(false), + } + err := suite.db.Put(ctx, zorkFollowSatan) + suite.NoError(err) + + satanFollowZork := >smodel.Follow{ + ID: "01FGRYAVAWWPP926J175QGM0WV", + CreatedAt: time.Now().Add(-1 * time.Hour), + UpdatedAt: time.Now().Add(-1 * time.Hour), + AccountID: receivingAccount.ID, + TargetAccountID: deletedAccount.ID, + ShowReblogs: util.Ptr(true), + URI: fmt.Sprintf("%s/follows/01FGRYAVAWWPP926J175QGM0WV", receivingAccount.URI), + Notify: util.Ptr(false), + } + err = suite.db.Put(ctx, satanFollowZork) + suite.NoError(err) + + // now they are mufos! + err = suite.processor.Workers().ProcessFromFediAPI(ctx, messages.FromFediAPI{ + APObjectType: ap.ObjectProfile, + APActivityType: ap.ActivityDelete, + GTSModel: deletedAccount, + ReceivingAccount: receivingAccount, + }) + suite.NoError(err) + + // local account 2 blocked foss_satan, that block should be gone now + testBlock := suite.testBlocks["local_account_2_block_remote_account_1"] + dbBlock := >smodel.Block{} + err = suite.db.GetByID(ctx, testBlock.ID, dbBlock) + suite.ErrorIs(err, db.ErrNoEntries) + + // the mufos should be gone now too + satanFollowsZork, err := suite.db.IsFollowing(ctx, deletedAccount.ID, receivingAccount.ID) + suite.NoError(err) + suite.False(satanFollowsZork) + zorkFollowsSatan, err := suite.db.IsFollowing(ctx, receivingAccount.ID, deletedAccount.ID) + suite.NoError(err) + suite.False(zorkFollowsSatan) + + // no statuses from foss satan should be left in the database + if !testrig.WaitFor(func() bool { + s, err := suite.db.GetAccountStatuses(ctx, deletedAccount.ID, 0, false, false, "", "", false, false) + return s == nil && err == db.ErrNoEntries + }) { + suite.FailNow("timeout waiting for statuses to be deleted") + } + + dbAccount, err := suite.db.GetAccountByID(ctx, deletedAccount.ID) + suite.NoError(err) + + suite.Empty(dbAccount.Note) + suite.Empty(dbAccount.DisplayName) + suite.Empty(dbAccount.AvatarMediaAttachmentID) + suite.Empty(dbAccount.AvatarRemoteURL) + suite.Empty(dbAccount.HeaderMediaAttachmentID) + suite.Empty(dbAccount.HeaderRemoteURL) + suite.Empty(dbAccount.Reason) + suite.Empty(dbAccount.Fields) + suite.True(*dbAccount.HideCollections) + suite.False(*dbAccount.Discoverable) + suite.WithinDuration(time.Now(), dbAccount.SuspendedAt, 30*time.Second) + suite.Equal(dbAccount.ID, dbAccount.SuspensionOrigin) +} + +func (suite *FromFediAPITestSuite) TestProcessFollowRequestLocked() { + ctx := context.Background() + + originAccount := suite.testAccounts["remote_account_1"] + + // target is a locked account + targetAccount := suite.testAccounts["local_account_2"] + + wssStream, errWithCode := suite.processor.Stream().Open(context.Background(), targetAccount, stream.TimelineHome) + suite.NoError(errWithCode) + + // put the follow request in the database as though it had passed through the federating db already + satanFollowRequestTurtle := >smodel.FollowRequest{ + ID: "01FGRYAVAWWPP926J175QGM0WV", + CreatedAt: time.Now(), + UpdatedAt: time.Now(), + AccountID: originAccount.ID, + Account: originAccount, + TargetAccountID: targetAccount.ID, + TargetAccount: targetAccount, + ShowReblogs: util.Ptr(true), + URI: fmt.Sprintf("%s/follows/01FGRYAVAWWPP926J175QGM0WV", originAccount.URI), + Notify: util.Ptr(false), + } + + err := suite.db.Put(ctx, satanFollowRequestTurtle) + suite.NoError(err) + + err = suite.processor.Workers().ProcessFromFediAPI(ctx, messages.FromFediAPI{ + APObjectType: ap.ActivityFollow, + APActivityType: ap.ActivityCreate, + GTSModel: satanFollowRequestTurtle, + ReceivingAccount: targetAccount, + }) + suite.NoError(err) + + // a notification should be streamed + var msg *stream.Message + select { + case msg = <-wssStream.Messages: + // fine + case <-time.After(5 * time.Second): + suite.FailNow("no message from wssStream") + } + suite.Equal(stream.EventTypeNotification, msg.Event) + suite.NotEmpty(msg.Payload) + suite.EqualValues([]string{stream.TimelineHome}, msg.Stream) + notif := &apimodel.Notification{} + err = json.Unmarshal([]byte(msg.Payload), notif) + suite.NoError(err) + suite.Equal("follow_request", notif.Type) + suite.Equal(originAccount.ID, notif.Account.ID) + + // no messages should have been sent out, since we didn't need to federate an accept + suite.Empty(suite.httpClient.SentMessages) +} + +func (suite *FromFediAPITestSuite) TestProcessFollowRequestUnlocked() { + ctx := context.Background() + + originAccount := suite.testAccounts["remote_account_1"] + + // target is an unlocked account + targetAccount := suite.testAccounts["local_account_1"] + + wssStream, errWithCode := suite.processor.Stream().Open(context.Background(), targetAccount, stream.TimelineHome) + suite.NoError(errWithCode) + + // put the follow request in the database as though it had passed through the federating db already + satanFollowRequestTurtle := >smodel.FollowRequest{ + ID: "01FGRYAVAWWPP926J175QGM0WV", + CreatedAt: time.Now(), + UpdatedAt: time.Now(), + AccountID: originAccount.ID, + Account: originAccount, + TargetAccountID: targetAccount.ID, + TargetAccount: targetAccount, + ShowReblogs: util.Ptr(true), + URI: fmt.Sprintf("%s/follows/01FGRYAVAWWPP926J175QGM0WV", originAccount.URI), + Notify: util.Ptr(false), + } + + err := suite.db.Put(ctx, satanFollowRequestTurtle) + suite.NoError(err) + + err = suite.processor.Workers().ProcessFromFediAPI(ctx, messages.FromFediAPI{ + APObjectType: ap.ActivityFollow, + APActivityType: ap.ActivityCreate, + GTSModel: satanFollowRequestTurtle, + ReceivingAccount: targetAccount, + }) + suite.NoError(err) + + // an accept message should be sent to satan's inbox + var sent [][]byte + if !testrig.WaitFor(func() bool { + sentI, ok := suite.httpClient.SentMessages.Load(*originAccount.SharedInboxURI) + if ok { + sent, ok = sentI.([][]byte) + if !ok { + panic("SentMessages entry was not []byte") + } + return true + } + return false + }) { + suite.FailNow("timed out waiting for message") + } + + accept := &struct { + Actor string `json:"actor"` + ID string `json:"id"` + Object struct { + Actor string `json:"actor"` + ID string `json:"id"` + Object string `json:"object"` + To string `json:"to"` + Type string `json:"type"` + } + To string `json:"to"` + Type string `json:"type"` + }{} + err = json.Unmarshal(sent[0], accept) + suite.NoError(err) + + suite.Equal(targetAccount.URI, accept.Actor) + suite.Equal(originAccount.URI, accept.Object.Actor) + suite.Equal(satanFollowRequestTurtle.URI, accept.Object.ID) + suite.Equal(targetAccount.URI, accept.Object.Object) + suite.Equal(targetAccount.URI, accept.Object.To) + suite.Equal("Follow", accept.Object.Type) + suite.Equal(originAccount.URI, accept.To) + suite.Equal("Accept", accept.Type) + + // a notification should be streamed + var msg *stream.Message + select { + case msg = <-wssStream.Messages: + // fine + case <-time.After(5 * time.Second): + suite.FailNow("no message from wssStream") + } + suite.Equal(stream.EventTypeNotification, msg.Event) + suite.NotEmpty(msg.Payload) + suite.EqualValues([]string{stream.TimelineHome}, msg.Stream) + notif := &apimodel.Notification{} + err = json.Unmarshal([]byte(msg.Payload), notif) + suite.NoError(err) + suite.Equal("follow", notif.Type) + suite.Equal(originAccount.ID, notif.Account.ID) +} + +// TestCreateStatusFromIRI checks if a forwarded status can be dereferenced by the processor. +func (suite *FromFediAPITestSuite) TestCreateStatusFromIRI() { + ctx := context.Background() + + receivingAccount := suite.testAccounts["local_account_1"] + statusCreator := suite.testAccounts["remote_account_2"] + + err := suite.processor.Workers().ProcessFromFediAPI(ctx, messages.FromFediAPI{ + APObjectType: ap.ObjectNote, + APActivityType: ap.ActivityCreate, + GTSModel: nil, // gtsmodel is nil because this is a forwarded status -- we want to dereference it using the iri + ReceivingAccount: receivingAccount, + APIri: testrig.URLMustParse("http://example.org/users/Some_User/statuses/afaba698-5740-4e32-a702-af61aa543bc1"), + }) + suite.NoError(err) + + // status should now be in the database, attributed to remote_account_2 + s, err := suite.db.GetStatusByURI(context.Background(), "http://example.org/users/Some_User/statuses/afaba698-5740-4e32-a702-af61aa543bc1") + suite.NoError(err) + suite.Equal(statusCreator.URI, s.AccountURI) +} + +func TestFromFederatorTestSuite(t *testing.T) { + suite.Run(t, &FromFediAPITestSuite{}) +} diff --git a/internal/processing/workers/surface.go b/internal/processing/workers/surface.go new file mode 100644 index 000000000..a3cf9a3e1 --- /dev/null +++ b/internal/processing/workers/surface.go @@ -0,0 +1,40 @@ +// GoToSocial +// Copyright (C) GoToSocial Authors admin@gotosocial.org +// SPDX-License-Identifier: AGPL-3.0-or-later +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <http://www.gnu.org/licenses/>. + +package workers + +import ( + "github.com/superseriousbusiness/gotosocial/internal/email" + "github.com/superseriousbusiness/gotosocial/internal/processing/stream" + "github.com/superseriousbusiness/gotosocial/internal/state" + "github.com/superseriousbusiness/gotosocial/internal/typeutils" + "github.com/superseriousbusiness/gotosocial/internal/visibility" +) + +// surface wraps functions for 'surfacing' the result +// of processing a message, eg: +// - timelining a status +// - removing a status from timelines +// - sending a notification to a user +// - sending an email +type surface struct { + state *state.State + tc typeutils.TypeConverter + stream *stream.Processor + filter *visibility.Filter + emailSender email.Sender +} diff --git a/internal/processing/workers/surfaceemail.go b/internal/processing/workers/surfaceemail.go new file mode 100644 index 000000000..a6c97f48f --- /dev/null +++ b/internal/processing/workers/surfaceemail.go @@ -0,0 +1,160 @@ +// GoToSocial +// Copyright (C) GoToSocial Authors admin@gotosocial.org +// SPDX-License-Identifier: AGPL-3.0-or-later +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <http://www.gnu.org/licenses/>. + +package workers + +import ( + "context" + "errors" + "time" + + "github.com/google/uuid" + "github.com/superseriousbusiness/gotosocial/internal/config" + "github.com/superseriousbusiness/gotosocial/internal/db" + "github.com/superseriousbusiness/gotosocial/internal/email" + "github.com/superseriousbusiness/gotosocial/internal/gtserror" + "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" + "github.com/superseriousbusiness/gotosocial/internal/uris" +) + +func (s *surface) emailReportOpened(ctx context.Context, report *gtsmodel.Report) error { + instance, err := s.state.DB.GetInstance(ctx, config.GetHost()) + if err != nil { + return gtserror.Newf("error getting instance: %w", err) + } + + toAddresses, err := s.state.DB.GetInstanceModeratorAddresses(ctx) + if err != nil { + if errors.Is(err, db.ErrNoEntries) { + // No registered moderator addresses. + return nil + } + return gtserror.Newf("error getting instance moderator addresses: %w", err) + } + + if err := s.state.DB.PopulateReport(ctx, report); err != nil { + return gtserror.Newf("error populating report: %w", err) + } + + reportData := email.NewReportData{ + InstanceURL: instance.URI, + InstanceName: instance.Title, + ReportURL: instance.URI + "/settings/admin/reports/" + report.ID, + ReportDomain: report.Account.Domain, + ReportTargetDomain: report.TargetAccount.Domain, + } + + if err := s.emailSender.SendNewReportEmail(toAddresses, reportData); err != nil { + return gtserror.Newf("error emailing instance moderators: %w", err) + } + + return nil +} + +func (s *surface) emailReportClosed(ctx context.Context, report *gtsmodel.Report) error { + user, err := s.state.DB.GetUserByAccountID(ctx, report.Account.ID) + if err != nil { + return gtserror.Newf("db error getting user: %w", err) + } + + if user.ConfirmedAt.IsZero() || + !*user.Approved || + *user.Disabled || + user.Email == "" { + // Only email users who: + // - are confirmed + // - are approved + // - are not disabled + // - have an email address + return nil + } + + instance, err := s.state.DB.GetInstance(ctx, config.GetHost()) + if err != nil { + return gtserror.Newf("db error getting instance: %w", err) + } + + if err := s.state.DB.PopulateReport(ctx, report); err != nil { + return gtserror.Newf("error populating report: %w", err) + } + + reportClosedData := email.ReportClosedData{ + Username: report.Account.Username, + InstanceURL: instance.URI, + InstanceName: instance.Title, + ReportTargetUsername: report.TargetAccount.Username, + ReportTargetDomain: report.TargetAccount.Domain, + ActionTakenComment: report.ActionTaken, + } + + return s.emailSender.SendReportClosedEmail(user.Email, reportClosedData) +} + +func (s *surface) emailPleaseConfirm(ctx context.Context, user *gtsmodel.User, username string) error { + if user.UnconfirmedEmail == "" || + user.UnconfirmedEmail == user.Email { + // User has already confirmed this + // email address; nothing to do. + return nil + } + + instance, err := s.state.DB.GetInstance(ctx, config.GetHost()) + if err != nil { + return gtserror.Newf("db error getting instance: %w", err) + } + + // We need a token and a link for the + // user to click on. We'll use a uuid + // as our token since it's secure enough + // for this purpose. + var ( + confirmToken = uuid.NewString() + confirmLink = uris.GenerateURIForEmailConfirm(confirmToken) + ) + + // Assemble email contents and send the email. + if err := s.emailSender.SendConfirmEmail( + user.UnconfirmedEmail, + email.ConfirmData{ + Username: username, + InstanceURL: instance.URI, + InstanceName: instance.Title, + ConfirmLink: confirmLink, + }, + ); err != nil { + return err + } + + // Email sent, update the user entry + // with the new confirmation token. + now := time.Now() + user.ConfirmationToken = confirmToken + user.ConfirmationSentAt = now + user.LastEmailedAt = now + + if err := s.state.DB.UpdateUser( + ctx, + user, + "confirmation_token", + "confirmation_sent_at", + "last_emailed_at", + ); err != nil { + return gtserror.Newf("error updating user entry after email sent: %w", err) + } + + return nil +} diff --git a/internal/processing/workers/surfacenotify.go b/internal/processing/workers/surfacenotify.go new file mode 100644 index 000000000..00e1205e6 --- /dev/null +++ b/internal/processing/workers/surfacenotify.go @@ -0,0 +1,221 @@ +// GoToSocial +// Copyright (C) GoToSocial Authors admin@gotosocial.org +// SPDX-License-Identifier: AGPL-3.0-or-later +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <http://www.gnu.org/licenses/>. + +package workers + +import ( + "context" + "errors" + + "github.com/superseriousbusiness/gotosocial/internal/db" + "github.com/superseriousbusiness/gotosocial/internal/gtscontext" + "github.com/superseriousbusiness/gotosocial/internal/gtserror" + "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" + "github.com/superseriousbusiness/gotosocial/internal/id" +) + +// notifyMentions notifies each targeted account in +// the given mentions that they have a new mention. +func (s *surface) notifyMentions( + ctx context.Context, + mentions []*gtsmodel.Mention, +) error { + var errs = gtserror.NewMultiError(len(mentions)) + + for _, mention := range mentions { + if err := s.notify( + ctx, + gtsmodel.NotificationMention, + mention.TargetAccountID, + mention.OriginAccountID, + mention.StatusID, + ); err != nil { + errs.Append(err) + } + } + + return errs.Combine() +} + +// notifyFollowRequest notifies the target of the given +// follow request that they have a new follow request. +func (s *surface) notifyFollowRequest( + ctx context.Context, + followRequest *gtsmodel.FollowRequest, +) error { + return s.notify( + ctx, + gtsmodel.NotificationFollowRequest, + followRequest.TargetAccountID, + followRequest.AccountID, + "", + ) +} + +// notifyFollow notifies the target of the given follow that +// they have a new follow. It will also remove any previous +// notification of a follow request, essentially replacing +// that notification. +func (s *surface) notifyFollow( + ctx context.Context, + follow *gtsmodel.Follow, +) error { + // Check if previous follow req notif exists. + prevNotif, err := s.state.DB.GetNotification( + gtscontext.SetBarebones(ctx), + gtsmodel.NotificationFollowRequest, + follow.TargetAccountID, + follow.AccountID, + "", + ) + if err != nil && !errors.Is(err, db.ErrNoEntries) { + return gtserror.Newf("db error checking for previous follow request notification: %w", err) + } + + if prevNotif != nil { + // Previous notif existed, delete it. + if err := s.state.DB.DeleteNotificationByID(ctx, prevNotif.ID); err != nil { + return gtserror.Newf("db error removing previous follow request notification %s: %w", prevNotif.ID, err) + } + } + + // Now notify the follow itself. + return s.notify( + ctx, + gtsmodel.NotificationFollow, + follow.TargetAccountID, + follow.AccountID, + "", + ) +} + +// notifyFave notifies the target of the given +// fave that their status has been liked/faved. +func (s *surface) notifyFave( + ctx context.Context, + fave *gtsmodel.StatusFave, +) error { + if fave.TargetAccountID == fave.AccountID { + // Self-fave, nothing to do. + return nil + } + + return s.notify( + ctx, + gtsmodel.NotificationFave, + fave.TargetAccountID, + fave.AccountID, + fave.StatusID, + ) +} + +// notifyAnnounce notifies the status boost target +// account that their status has been boosted. +func (s *surface) notifyAnnounce( + ctx context.Context, + status *gtsmodel.Status, +) error { + if status.BoostOfID == "" { + // Not a boost, nothing to do. + return nil + } + + if status.BoostOfAccountID == status.AccountID { + // Self-boost, nothing to do. + return nil + } + + return s.notify( + ctx, + gtsmodel.NotificationReblog, + status.BoostOfAccountID, + status.AccountID, + status.ID, + ) +} + +// notify creates, inserts, and streams a new +// notification to the target account if it +// doesn't yet exist with the given parameters. +// +// It filters out non-local target accounts, so +// it is safe to pass all sorts of notification +// targets into this function without filtering +// for non-local first. +// +// targetAccountID and originAccountID must be +// set, but statusID can be an empty string. +func (s *surface) notify( + ctx context.Context, + notificationType gtsmodel.NotificationType, + targetAccountID string, + originAccountID string, + statusID string, +) error { + targetAccount, err := s.state.DB.GetAccountByID(ctx, targetAccountID) + if err != nil { + return gtserror.Newf("error getting target account %s: %w", targetAccountID, err) + } + + if !targetAccount.IsLocal() { + // Nothing to do. + return nil + } + + // Make sure a notification doesn't + // already exist with these params. + if _, err := s.state.DB.GetNotification( + gtscontext.SetBarebones(ctx), + notificationType, + targetAccountID, + originAccountID, + statusID, + ); err == nil { + // Notification exists; + // nothing to do. + return nil + } else if !errors.Is(err, db.ErrNoEntries) { + // Real error. + return gtserror.Newf("error checking existence of notification: %w", err) + } + + // Notification doesn't yet exist, so + // we need to create + store one. + notif := >smodel.Notification{ + ID: id.NewULID(), + NotificationType: notificationType, + TargetAccountID: targetAccountID, + OriginAccountID: originAccountID, + StatusID: statusID, + } + + if err := s.state.DB.PutNotification(ctx, notif); err != nil { + return gtserror.Newf("error putting notification in database: %w", err) + } + + // Stream notification to the user. + apiNotif, err := s.tc.NotificationToAPINotification(ctx, notif) + if err != nil { + return gtserror.Newf("error converting notification to api representation: %w", err) + } + + if err := s.stream.Notify(apiNotif, targetAccount); err != nil { + return gtserror.Newf("error streaming notification to account: %w", err) + } + + return nil +} diff --git a/internal/processing/workers/surfacetimeline.go b/internal/processing/workers/surfacetimeline.go new file mode 100644 index 000000000..827cbe2f8 --- /dev/null +++ b/internal/processing/workers/surfacetimeline.go @@ -0,0 +1,401 @@ +// GoToSocial +// Copyright (C) GoToSocial Authors admin@gotosocial.org +// SPDX-License-Identifier: AGPL-3.0-or-later +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <http://www.gnu.org/licenses/>. + +package workers + +import ( + "context" + "errors" + + "github.com/superseriousbusiness/gotosocial/internal/db" + "github.com/superseriousbusiness/gotosocial/internal/gtscontext" + "github.com/superseriousbusiness/gotosocial/internal/gtserror" + "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" + "github.com/superseriousbusiness/gotosocial/internal/log" + "github.com/superseriousbusiness/gotosocial/internal/stream" + "github.com/superseriousbusiness/gotosocial/internal/timeline" +) + +// timelineAndNotifyStatus inserts the given status into the HOME +// and LIST timelines of accounts that follow the status author. +// +// It will also handle notifications for any mentions attached to +// the account, and notifications for any local accounts that want +// to know when this account posts. +func (s *surface) timelineAndNotifyStatus(ctx context.Context, status *gtsmodel.Status) error { + // Ensure status fully populated; including account, mentions, etc. + if err := s.state.DB.PopulateStatus(ctx, status); err != nil { + return gtserror.Newf("error populating status with id %s: %w", status.ID, err) + } + + // Get all local followers of the account that posted the status. + follows, err := s.state.DB.GetAccountLocalFollowers(ctx, status.AccountID) + if err != nil { + return gtserror.Newf("error getting local followers of account %s: %w", status.AccountID, err) + } + + // If the poster is also local, add a fake entry for them + // so they can see their own status in their timeline. + if status.Account.IsLocal() { + follows = append(follows, >smodel.Follow{ + AccountID: status.AccountID, + Account: status.Account, + Notify: func() *bool { b := false; return &b }(), // Account shouldn't notify itself. + ShowReblogs: func() *bool { b := true; return &b }(), // Account should show own reblogs. + }) + } + + // Timeline the status for each local follower of this account. + // This will also handle notifying any followers with notify + // set to true on their follow. + if err := s.timelineAndNotifyStatusForFollowers(ctx, status, follows); err != nil { + return gtserror.Newf("error timelining status %s for followers: %w", status.ID, err) + } + + // Notify each local account that's mentioned by this status. + if err := s.notifyMentions(ctx, status.Mentions); err != nil { + return gtserror.Newf("error notifying status mentions for status %s: %w", status.ID, err) + } + + return nil +} + +// timelineAndNotifyStatusForFollowers iterates through the given +// slice of followers of the account that posted the given status, +// adding the status to list timelines + home timelines of each +// follower, as appropriate, and notifying each follower of the +// new status, if the status is eligible for notification. +func (s *surface) timelineAndNotifyStatusForFollowers( + ctx context.Context, + status *gtsmodel.Status, + follows []*gtsmodel.Follow, +) error { + var ( + errs = new(gtserror.MultiError) + boost = status.BoostOfID != "" + reply = status.InReplyToURI != "" + ) + + for _, follow := range follows { + // Do an initial rough-grained check to see if the + // status is timelineable for this follower at all + // based on its visibility and who it replies to etc. + timelineable, err := s.filter.StatusHomeTimelineable( + ctx, follow.Account, status, + ) + if err != nil { + errs.Appendf("error checking status %s hometimelineability: %w", status.ID, err) + continue + } + + if !timelineable { + // Nothing to do. + continue + } + + if boost && !*follow.ShowReblogs { + // Status is a boost, but the owner of + // this follow doesn't want to see boosts + // from this account. We can safely skip + // everything, then, because we also know + // that the follow owner won't want to be + // have the status put in any list timelines, + // or be notified about the status either. + continue + } + + // Add status to any relevant lists + // for this follow, if applicable. + s.listTimelineStatusForFollow( + ctx, + status, + follow, + errs, + ) + + // Add status to home timeline for owner + // of this follow, if applicable. + homeTimelined, err := s.timelineStatus( + ctx, + s.state.Timelines.Home.IngestOne, + follow.AccountID, // home timelines are keyed by account ID + follow.Account, + status, + stream.TimelineHome, + ) + if err != nil { + errs.Appendf("error home timelining status: %w", err) + continue + } + + if !homeTimelined { + // If status wasn't added to home + // timeline, we shouldn't notify it. + continue + } + + if !*follow.Notify { + // This follower doesn't have notifs + // set for this account's new posts. + continue + } + + if boost || reply { + // Don't notify for boosts or replies. + continue + } + + // If we reach here, we know: + // + // - This status is hometimelineable. + // - This status was added to the home timeline for this follower. + // - This follower wants to be notified when this account posts. + // - This is a top-level post (not a reply or boost). + // + // That means we can officially notify this one. + if err := s.notify( + ctx, + gtsmodel.NotificationStatus, + follow.AccountID, + status.AccountID, + status.ID, + ); err != nil { + errs.Appendf("error notifying account %s about new status: %w", follow.AccountID, err) + } + } + + return errs.Combine() +} + +// listTimelineStatusForFollow puts the given status +// in any eligible lists owned by the given follower. +func (s *surface) listTimelineStatusForFollow( + ctx context.Context, + status *gtsmodel.Status, + follow *gtsmodel.Follow, + errs *gtserror.MultiError, +) { + // To put this status in appropriate list timelines, + // we need to get each listEntry that pertains to + // this follow. Then, we want to iterate through all + // those list entries, and add the status to the list + // that the entry belongs to if it meets criteria for + // inclusion in the list. + + // Get every list entry that targets this follow's ID. + listEntries, err := s.state.DB.GetListEntriesForFollowID( + // We only need the list IDs. + gtscontext.SetBarebones(ctx), + follow.ID, + ) + if err != nil && !errors.Is(err, db.ErrNoEntries) { + errs.Appendf("error getting list entries: %w", err) + return + } + + // Check eligibility for each list entry (if any). + for _, listEntry := range listEntries { + eligible, err := s.listEligible(ctx, listEntry, status) + if err != nil { + errs.Appendf("error checking list eligibility: %w", err) + continue + } + + if !eligible { + // Don't add this. + continue + } + + // At this point we are certain this status + // should be included in the timeline of the + // list that this list entry belongs to. + if _, err := s.timelineStatus( + ctx, + s.state.Timelines.List.IngestOne, + listEntry.ListID, // list timelines are keyed by list ID + follow.Account, + status, + stream.TimelineList+":"+listEntry.ListID, // key streamType to this specific list + ); err != nil { + errs.Appendf("error adding status to timeline for list %s: %w", listEntry.ListID, err) + // implicit continue + } + } +} + +// listEligible checks if the given status is eligible +// for inclusion in the list that that the given listEntry +// belongs to, based on the replies policy of the list. +func (s *surface) listEligible( + ctx context.Context, + listEntry *gtsmodel.ListEntry, + status *gtsmodel.Status, +) (bool, error) { + if status.InReplyToURI == "" { + // If status is not a reply, + // then it's all gravy baby. + return true, nil + } + + if status.InReplyToID == "" { + // Status is a reply but we don't + // have the replied-to account! + return false, nil + } + + // Status is a reply to a known account. + // We need to fetch the list that this + // entry belongs to, in order to check + // the list's replies policy. + list, err := s.state.DB.GetListByID( + ctx, listEntry.ListID, + ) + if err != nil { + err := gtserror.Newf("db error getting list %s: %w", listEntry.ListID, err) + return false, err + } + + switch list.RepliesPolicy { + case gtsmodel.RepliesPolicyNone: + // This list should not show + // replies at all, so skip it. + return false, nil + + case gtsmodel.RepliesPolicyList: + // This list should show replies + // only to other people in the list. + // + // Check if replied-to account is + // also included in this list. + includes, err := s.state.DB.ListIncludesAccount( + ctx, + list.ID, + status.InReplyToAccountID, + ) + + if err != nil { + err := gtserror.Newf( + "db error checking if account %s in list %s: %w", + status.InReplyToAccountID, listEntry.ListID, err, + ) + return false, err + } + + return includes, nil + + case gtsmodel.RepliesPolicyFollowed: + // This list should show replies + // only to people that the list + // owner also follows. + // + // Check if replied-to account is + // followed by list owner account. + follows, err := s.state.DB.IsFollowing( + ctx, + list.AccountID, + status.InReplyToAccountID, + ) + if err != nil { + err := gtserror.Newf( + "db error checking if account %s is followed by %s: %w", + status.InReplyToAccountID, list.AccountID, err, + ) + return false, err + } + + return follows, nil + + default: + // HUH?? + err := gtserror.Newf( + "reply policy '%s' not recognized on list %s", + list.RepliesPolicy, list.ID, + ) + return false, err + } +} + +// timelineStatus uses the provided ingest function to put the given +// status in a timeline with the given ID, if it's timelineable. +// +// If the status was inserted into the timeline, true will be returned +// + it will also be streamed to the user using the given streamType. +func (s *surface) timelineStatus( + ctx context.Context, + ingest func(context.Context, string, timeline.Timelineable) (bool, error), + timelineID string, + account *gtsmodel.Account, + status *gtsmodel.Status, + streamType string, +) (bool, error) { + // Ingest status into given timeline using provided function. + if inserted, err := ingest(ctx, timelineID, status); err != nil { + err = gtserror.Newf("error ingesting status %s: %w", status.ID, err) + return false, err + } else if !inserted { + // Nothing more to do. + return false, nil + } + + // The status was inserted so stream it to the user. + apiStatus, err := s.tc.StatusToAPIStatus(ctx, status, account) + if err != nil { + err = gtserror.Newf("error converting status %s to frontend representation: %w", status.ID, err) + return true, err + } + + if err := s.stream.Update(apiStatus, account, []string{streamType}); err != nil { + err = gtserror.Newf("error streaming update for status %s: %w", status.ID, err) + return true, err + } + + return true, nil +} + +// deleteStatusFromTimelines completely removes the given status from all timelines. +// It will also stream deletion of the status to all open streams. +func (s *surface) deleteStatusFromTimelines(ctx context.Context, statusID string) error { + if err := s.state.Timelines.Home.WipeItemFromAllTimelines(ctx, statusID); err != nil { + return err + } + + if err := s.state.Timelines.List.WipeItemFromAllTimelines(ctx, statusID); err != nil { + return err + } + + return s.stream.Delete(statusID) +} + +// invalidateStatusFromTimelines does cache invalidation on the given status by +// unpreparing it from all timelines, forcing it to be prepared again (with updated +// stats, boost counts, etc) next time it's fetched by the timeline owner. This goes +// both for the status itself, and for any boosts of the status. +func (s *surface) invalidateStatusFromTimelines(ctx context.Context, statusID string) { + if err := s.state.Timelines.Home.UnprepareItemFromAllTimelines(ctx, statusID); err != nil { + log. + WithContext(ctx). + WithField("statusID", statusID). + Errorf("error unpreparing status from home timelines: %v", err) + } + + if err := s.state.Timelines.List.UnprepareItemFromAllTimelines(ctx, statusID); err != nil { + log. + WithContext(ctx). + WithField("statusID", statusID). + Errorf("error unpreparing status from list timelines: %v", err) + } +} diff --git a/internal/processing/workers/wipestatus.go b/internal/processing/workers/wipestatus.go new file mode 100644 index 000000000..0891d9e24 --- /dev/null +++ b/internal/processing/workers/wipestatus.go @@ -0,0 +1,119 @@ +// GoToSocial +// Copyright (C) GoToSocial Authors admin@gotosocial.org +// SPDX-License-Identifier: AGPL-3.0-or-later +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <http://www.gnu.org/licenses/>. + +package workers + +import ( + "context" + + "github.com/superseriousbusiness/gotosocial/internal/gtscontext" + "github.com/superseriousbusiness/gotosocial/internal/gtserror" + "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" + "github.com/superseriousbusiness/gotosocial/internal/processing/media" + "github.com/superseriousbusiness/gotosocial/internal/state" +) + +// wipeStatus encapsulates common logic used to totally delete a status +// + all its attachments, notifications, boosts, and timeline entries. +type wipeStatus func(context.Context, *gtsmodel.Status, bool) error + +// wipeStatusF returns a wipeStatus util function. +func wipeStatusF(state *state.State, media *media.Processor, surface *surface) wipeStatus { + return func( + ctx context.Context, + statusToDelete *gtsmodel.Status, + deleteAttachments bool, + ) error { + errs := new(gtserror.MultiError) + + // Either delete all attachments for this status, + // or simply unattach + clean them separately later. + // + // Reason to unattach rather than delete is that + // the poster might want to reattach them to another + // status immediately (in case of delete + redraft) + if deleteAttachments { + // todo:state.DB.DeleteAttachmentsForStatus + for _, a := range statusToDelete.AttachmentIDs { + if err := media.Delete(ctx, a); err != nil { + errs.Appendf("error deleting media: %w", err) + } + } + } else { + // todo:state.DB.UnattachAttachmentsForStatus + for _, a := range statusToDelete.AttachmentIDs { + if _, err := media.Unattach(ctx, statusToDelete.Account, a); err != nil { + errs.Appendf("error unattaching media: %w", err) + } + } + } + + // delete all mention entries generated by this status + // todo:state.DB.DeleteMentionsForStatus + for _, id := range statusToDelete.MentionIDs { + if err := state.DB.DeleteMentionByID(ctx, id); err != nil { + errs.Appendf("error deleting status mention: %w", err) + } + } + + // delete all notification entries generated by this status + if err := state.DB.DeleteNotificationsForStatus(ctx, statusToDelete.ID); err != nil { + errs.Appendf("error deleting status notifications: %w", err) + } + + // delete all bookmarks that point to this status + if err := state.DB.DeleteStatusBookmarksForStatus(ctx, statusToDelete.ID); err != nil { + errs.Appendf("error deleting status bookmarks: %w", err) + } + + // delete all faves of this status + if err := state.DB.DeleteStatusFavesForStatus(ctx, statusToDelete.ID); err != nil { + errs.Appendf("error deleting status faves: %w", err) + } + + // delete all boosts for this status + remove them from timelines + boosts, err := state.DB.GetStatusBoosts( + // we MUST set a barebones context here, + // as depending on where it came from the + // original BoostOf may already be gone. + gtscontext.SetBarebones(ctx), + statusToDelete.ID) + if err != nil { + errs.Appendf("error fetching status boosts: %w", err) + } + for _, b := range boosts { + if err := surface.deleteStatusFromTimelines(ctx, b.ID); err != nil { + errs.Appendf("error deleting boost from timelines: %w", err) + } + if err := state.DB.DeleteStatusByID(ctx, b.ID); err != nil { + errs.Appendf("error deleting boost: %w", err) + } + } + + // delete this status from any and all timelines + if err := surface.deleteStatusFromTimelines(ctx, statusToDelete.ID); err != nil { + errs.Appendf("error deleting status from timelines: %w", err) + } + + // finally, delete the status itself + if err := state.DB.DeleteStatusByID(ctx, statusToDelete.ID); err != nil { + errs.Appendf("error deleting status: %w", err) + } + + return errs.Combine() + } +} diff --git a/internal/processing/workers/workers.go b/internal/processing/workers/workers.go new file mode 100644 index 000000000..24b18a405 --- /dev/null +++ b/internal/processing/workers/workers.go @@ -0,0 +1,92 @@ +// GoToSocial +// Copyright (C) GoToSocial Authors admin@gotosocial.org +// SPDX-License-Identifier: AGPL-3.0-or-later +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <http://www.gnu.org/licenses/>. + +package workers + +import ( + "github.com/superseriousbusiness/gotosocial/internal/email" + "github.com/superseriousbusiness/gotosocial/internal/federation" + "github.com/superseriousbusiness/gotosocial/internal/processing/account" + "github.com/superseriousbusiness/gotosocial/internal/processing/media" + "github.com/superseriousbusiness/gotosocial/internal/processing/stream" + "github.com/superseriousbusiness/gotosocial/internal/state" + "github.com/superseriousbusiness/gotosocial/internal/typeutils" + "github.com/superseriousbusiness/gotosocial/internal/visibility" + "github.com/superseriousbusiness/gotosocial/internal/workers" +) + +type Processor struct { + workers *workers.Workers + clientAPI *clientAPI + fediAPI *fediAPI +} + +func New( + state *state.State, + federator federation.Federator, + tc typeutils.TypeConverter, + filter *visibility.Filter, + emailSender email.Sender, + account *account.Processor, + media *media.Processor, + stream *stream.Processor, +) Processor { + // Init surface logic + // wrapper struct. + surface := &surface{ + state: state, + tc: tc, + stream: stream, + filter: filter, + emailSender: emailSender, + } + + // Init federate logic + // wrapper struct. + federate := &federate{ + Federator: federator, + state: state, + tc: tc, + } + + // Init shared logic wipe + // status util func. + wipeStatus := wipeStatusF( + state, + media, + surface, + ) + + return Processor{ + workers: &state.Workers, + clientAPI: &clientAPI{ + state: state, + tc: tc, + surface: surface, + federate: federate, + wipeStatus: wipeStatus, + account: account, + }, + fediAPI: &fediAPI{ + state: state, + surface: surface, + federate: federate, + wipeStatus: wipeStatus, + account: account, + }, + } +} diff --git a/internal/processing/workers/workers_test.go b/internal/processing/workers/workers_test.go new file mode 100644 index 000000000..2d5a7f5d3 --- /dev/null +++ b/internal/processing/workers/workers_test.go @@ -0,0 +1,169 @@ +// GoToSocial +// Copyright (C) GoToSocial Authors admin@gotosocial.org +// SPDX-License-Identifier: AGPL-3.0-or-later +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <http://www.gnu.org/licenses/>. + +package workers_test + +import ( + "context" + + "github.com/stretchr/testify/suite" + "github.com/superseriousbusiness/gotosocial/internal/db" + "github.com/superseriousbusiness/gotosocial/internal/email" + "github.com/superseriousbusiness/gotosocial/internal/federation" + "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" + "github.com/superseriousbusiness/gotosocial/internal/media" + "github.com/superseriousbusiness/gotosocial/internal/oauth" + "github.com/superseriousbusiness/gotosocial/internal/processing" + "github.com/superseriousbusiness/gotosocial/internal/state" + "github.com/superseriousbusiness/gotosocial/internal/storage" + "github.com/superseriousbusiness/gotosocial/internal/stream" + "github.com/superseriousbusiness/gotosocial/internal/transport" + "github.com/superseriousbusiness/gotosocial/internal/typeutils" + "github.com/superseriousbusiness/gotosocial/internal/visibility" + "github.com/superseriousbusiness/gotosocial/testrig" +) + +type WorkersTestSuite struct { + // standard suite interfaces + suite.Suite + db db.DB + storage *storage.Driver + state state.State + mediaManager *media.Manager + typeconverter typeutils.TypeConverter + httpClient *testrig.MockHTTPClient + transportController transport.Controller + federator federation.Federator + oauthServer oauth.Server + emailSender email.Sender + + // standard suite models + testTokens map[string]*gtsmodel.Token + testClients map[string]*gtsmodel.Client + testApplications map[string]*gtsmodel.Application + testUsers map[string]*gtsmodel.User + testAccounts map[string]*gtsmodel.Account + testFollows map[string]*gtsmodel.Follow + testAttachments map[string]*gtsmodel.MediaAttachment + testStatuses map[string]*gtsmodel.Status + testTags map[string]*gtsmodel.Tag + testMentions map[string]*gtsmodel.Mention + testAutheds map[string]*oauth.Auth + testBlocks map[string]*gtsmodel.Block + testActivities map[string]testrig.ActivityWithSignature + testLists map[string]*gtsmodel.List + testListEntries map[string]*gtsmodel.ListEntry + + processor *processing.Processor +} + +func (suite *WorkersTestSuite) SetupSuite() { + suite.testTokens = testrig.NewTestTokens() + suite.testClients = testrig.NewTestClients() + suite.testApplications = testrig.NewTestApplications() + suite.testUsers = testrig.NewTestUsers() + suite.testAccounts = testrig.NewTestAccounts() + suite.testFollows = testrig.NewTestFollows() + suite.testAttachments = testrig.NewTestAttachments() + suite.testStatuses = testrig.NewTestStatuses() + suite.testTags = testrig.NewTestTags() + suite.testMentions = testrig.NewTestMentions() + suite.testAutheds = map[string]*oauth.Auth{ + "local_account_1": { + Application: suite.testApplications["local_account_1"], + User: suite.testUsers["local_account_1"], + Account: suite.testAccounts["local_account_1"], + }, + } + suite.testBlocks = testrig.NewTestBlocks() + suite.testLists = testrig.NewTestLists() + suite.testListEntries = testrig.NewTestListEntries() +} + +func (suite *WorkersTestSuite) SetupTest() { + suite.state.Caches.Init() + testrig.StartWorkers(&suite.state) + + testrig.InitTestConfig() + testrig.InitTestLog() + + suite.db = testrig.NewTestDB(&suite.state) + suite.state.DB = suite.db + suite.testActivities = testrig.NewTestActivities(suite.testAccounts) + suite.storage = testrig.NewInMemoryStorage() + suite.state.Storage = suite.storage + suite.typeconverter = testrig.NewTestTypeConverter(suite.db) + + testrig.StartTimelines( + &suite.state, + visibility.NewFilter(&suite.state), + suite.typeconverter, + ) + + suite.httpClient = testrig.NewMockHTTPClient(nil, "../../../testrig/media") + suite.httpClient.TestRemotePeople = testrig.NewTestFediPeople() + suite.httpClient.TestRemoteStatuses = testrig.NewTestFediStatuses() + + suite.transportController = testrig.NewTestTransportController(&suite.state, suite.httpClient) + suite.mediaManager = testrig.NewTestMediaManager(&suite.state) + suite.federator = testrig.NewTestFederator(&suite.state, suite.transportController, suite.mediaManager) + suite.oauthServer = testrig.NewTestOauthServer(suite.db) + suite.emailSender = testrig.NewEmailSender("../../../web/template/", nil) + + suite.processor = processing.NewProcessor(suite.typeconverter, suite.federator, suite.oauthServer, suite.mediaManager, &suite.state, suite.emailSender) + suite.state.Workers.EnqueueClientAPI = suite.processor.Workers().EnqueueClientAPI + suite.state.Workers.EnqueueFediAPI = suite.processor.Workers().EnqueueFediAPI + + testrig.StandardDBSetup(suite.db, suite.testAccounts) + testrig.StandardStorageSetup(suite.storage, "../../../testrig/media") +} + +func (suite *WorkersTestSuite) TearDownTest() { + testrig.StandardDBTeardown(suite.db) + testrig.StandardStorageTeardown(suite.storage) + testrig.StopWorkers(&suite.state) +} + +func (suite *WorkersTestSuite) openStreams(ctx context.Context, account *gtsmodel.Account, listIDs []string) map[string]*stream.Stream { + streams := make(map[string]*stream.Stream) + + for _, streamType := range []string{ + stream.TimelineHome, + stream.TimelinePublic, + stream.TimelineNotifications, + } { + stream, err := suite.processor.Stream().Open(ctx, account, streamType) + if err != nil { + suite.FailNow(err.Error()) + } + + streams[streamType] = stream + } + + for _, listID := range listIDs { + streamType := stream.TimelineList + ":" + listID + + stream, err := suite.processor.Stream().Open(ctx, account, streamType) + if err != nil { + suite.FailNow(err.Error()) + } + + streams[streamType] = stream + } + + return streams +} |