summaryrefslogtreecommitdiff
path: root/internal/processing/workers
diff options
context:
space:
mode:
authorLibravatar tobi <31960611+tsmethurst@users.noreply.github.com>2023-08-09 19:14:33 +0200
committerLibravatar GitHub <noreply@github.com>2023-08-09 19:14:33 +0200
commit9770d54237bea828cab7e50aec7dff452c203138 (patch)
tree59c444a02e81925bab47d3656a489a8c7087d530 /internal/processing/workers
parent[bugfix] Fix incorrect per-loop variable capture (#2092) (diff)
downloadgotosocial-9770d54237bea828cab7e50aec7dff452c203138.tar.xz
[feature] List replies policy, refactor async workers (#2087)
* Add/update some DB functions. * move async workers into subprocessor * rename FromFederator -> FromFediAPI * update home timeline check to include check for current status first before moving to parent status * change streamMap to pointer to mollify linter * update followtoas func signature * fix merge * remove errant debug log * don't use separate errs.Combine() check to wrap errs * wrap parts of workers functionality in sub-structs * populate report using new db funcs * embed federator (tiny bit tidier) * flesh out error msg, add continue(!) * fix other error messages to be more specific * better, nicer * give parseURI util function a bit more util * missing headers * use pointers for subprocessors
Diffstat (limited to 'internal/processing/workers')
-rw-r--r--internal/processing/workers/federate.go892
-rw-r--r--internal/processing/workers/fromclientapi.go548
-rw-r--r--internal/processing/workers/fromclientapi_test.go589
-rw-r--r--internal/processing/workers/fromfediapi.go540
-rw-r--r--internal/processing/workers/fromfediapi_test.go565
-rw-r--r--internal/processing/workers/surface.go40
-rw-r--r--internal/processing/workers/surfaceemail.go160
-rw-r--r--internal/processing/workers/surfacenotify.go221
-rw-r--r--internal/processing/workers/surfacetimeline.go401
-rw-r--r--internal/processing/workers/wipestatus.go119
-rw-r--r--internal/processing/workers/workers.go92
-rw-r--r--internal/processing/workers/workers_test.go169
12 files changed, 4336 insertions, 0 deletions
diff --git a/internal/processing/workers/federate.go b/internal/processing/workers/federate.go
new file mode 100644
index 000000000..76bfc892e
--- /dev/null
+++ b/internal/processing/workers/federate.go
@@ -0,0 +1,892 @@
+// GoToSocial
+// Copyright (C) GoToSocial Authors admin@gotosocial.org
+// SPDX-License-Identifier: AGPL-3.0-or-later
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+package workers
+
+import (
+ "context"
+ "net/url"
+
+ "github.com/superseriousbusiness/activity/pub"
+ "github.com/superseriousbusiness/activity/streams"
+ "github.com/superseriousbusiness/gotosocial/internal/federation"
+ "github.com/superseriousbusiness/gotosocial/internal/gtserror"
+ "github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
+ "github.com/superseriousbusiness/gotosocial/internal/state"
+ "github.com/superseriousbusiness/gotosocial/internal/typeutils"
+)
+
+// federate wraps functions for federating
+// something out via ActivityPub in response
+// to message processing.
+type federate struct {
+ // Embed federator to give access
+ // to send and retrieve functions.
+ federation.Federator
+ state *state.State
+ tc typeutils.TypeConverter
+}
+
+// parseURI is a cheeky little
+// shortcut to wrap parsing errors.
+//
+// The returned err will be prepended
+// with the name of the function that
+// called this function, so it can be
+// returned without further wrapping.
+func parseURI(s string) (*url.URL, error) {
+ const (
+ // Provides enough calldepth to
+ // prepend the name of whatever
+ // function called *this* one,
+ // so that they don't have to
+ // wrap the error themselves.
+ calldepth = 3
+ errFmt = "error parsing uri %s: %w"
+ )
+
+ uri, err := url.Parse(s)
+ if err != nil {
+ return nil, gtserror.NewfAt(calldepth, errFmt, s, err)
+ }
+
+ return uri, err
+}
+
+func (f *federate) DeleteAccount(ctx context.Context, account *gtsmodel.Account) error {
+ // Do nothing if it's not our
+ // account that's been deleted.
+ if !account.IsLocal() {
+ return nil
+ }
+
+ // Parse relevant URI(s).
+ outboxIRI, err := parseURI(account.OutboxURI)
+ if err != nil {
+ return err
+ }
+
+ actorIRI, err := parseURI(account.URI)
+ if err != nil {
+ return err
+ }
+
+ followersIRI, err := parseURI(account.FollowersURI)
+ if err != nil {
+ return err
+ }
+
+ publicIRI, err := parseURI(pub.PublicActivityPubIRI)
+ if err != nil {
+ return err
+ }
+
+ // Create a new delete.
+ // todo: tc.AccountToASDelete
+ delete := streams.NewActivityStreamsDelete()
+
+ // Set the Actor for the delete; no matter
+ // who actually did the delete, we should
+ // use the account owner for this.
+ deleteActor := streams.NewActivityStreamsActorProperty()
+ deleteActor.AppendIRI(actorIRI)
+ delete.SetActivityStreamsActor(deleteActor)
+
+ // Set the account's IRI as the 'object' property.
+ deleteObject := streams.NewActivityStreamsObjectProperty()
+ deleteObject.AppendIRI(actorIRI)
+ delete.SetActivityStreamsObject(deleteObject)
+
+ // Address the delete To followers.
+ deleteTo := streams.NewActivityStreamsToProperty()
+ deleteTo.AppendIRI(followersIRI)
+ delete.SetActivityStreamsTo(deleteTo)
+
+ // Address the delete CC public.
+ deleteCC := streams.NewActivityStreamsCcProperty()
+ deleteCC.AppendIRI(publicIRI)
+ delete.SetActivityStreamsCc(deleteCC)
+
+ // Send the Delete via the Actor's outbox.
+ if _, err := f.FederatingActor().Send(
+ ctx, outboxIRI, delete,
+ ); err != nil {
+ return gtserror.Newf(
+ "error sending activity %T via outbox %s: %w",
+ delete, outboxIRI, err,
+ )
+ }
+
+ return nil
+}
+
+func (f *federate) CreateStatus(ctx context.Context, status *gtsmodel.Status) error {
+ // Do nothing if the status
+ // shouldn't be federated.
+ if !*status.Federated {
+ return nil
+ }
+
+ // Do nothing if this
+ // isn't our status.
+ if !*status.Local {
+ return nil
+ }
+
+ // Populate model.
+ if err := f.state.DB.PopulateStatus(ctx, status); err != nil {
+ return gtserror.Newf("error populating status: %w", err)
+ }
+
+ // Parse relevant URI(s).
+ outboxIRI, err := parseURI(status.Account.OutboxURI)
+ if err != nil {
+ return err
+ }
+
+ // Convert status to an ActivityStreams
+ // Note, wrapped in a Create activity.
+ asStatus, err := f.tc.StatusToAS(ctx, status)
+ if err != nil {
+ return gtserror.Newf("error converting status to AS: %w", err)
+ }
+
+ create, err := f.tc.WrapNoteInCreate(asStatus, false)
+ if err != nil {
+ return gtserror.Newf("error wrapping status in create: %w", err)
+ }
+
+ // Send the Create via the Actor's outbox.
+ if _, err := f.FederatingActor().Send(
+ ctx, outboxIRI, create,
+ ); err != nil {
+ return gtserror.Newf(
+ "error sending activity %T via outbox %s: %w",
+ create, outboxIRI, err,
+ )
+ }
+
+ return nil
+}
+
+func (f *federate) DeleteStatus(ctx context.Context, status *gtsmodel.Status) error {
+ // Do nothing if the status
+ // shouldn't be federated.
+ if !*status.Federated {
+ return nil
+ }
+
+ // Do nothing if this
+ // isn't our status.
+ if !*status.Local {
+ return nil
+ }
+
+ // Populate model.
+ if err := f.state.DB.PopulateStatus(ctx, status); err != nil {
+ return gtserror.Newf("error populating status: %w", err)
+ }
+
+ // Parse relevant URI(s).
+ outboxIRI, err := parseURI(status.Account.OutboxURI)
+ if err != nil {
+ return err
+ }
+
+ // Wrap the status URI in a Delete activity.
+ delete, err := f.tc.StatusToASDelete(ctx, status)
+ if err != nil {
+ return gtserror.Newf("error creating Delete: %w", err)
+ }
+
+ // Send the Delete via the Actor's outbox.
+ if _, err := f.FederatingActor().Send(
+ ctx, outboxIRI, delete,
+ ); err != nil {
+ return gtserror.Newf(
+ "error sending activity %T via outbox %s: %w",
+ delete, outboxIRI, err,
+ )
+ }
+
+ return nil
+}
+
+func (f *federate) Follow(ctx context.Context, follow *gtsmodel.Follow) error {
+ // Populate model.
+ if err := f.state.DB.PopulateFollow(ctx, follow); err != nil {
+ return gtserror.Newf("error populating follow: %w", err)
+ }
+
+ // Do nothing if both accounts are local.
+ if follow.Account.IsLocal() &&
+ follow.TargetAccount.IsLocal() {
+ return nil
+ }
+
+ // Parse relevant URI(s).
+ outboxIRI, err := parseURI(follow.Account.OutboxURI)
+ if err != nil {
+ return err
+ }
+
+ // Convert follow to ActivityStreams Follow.
+ asFollow, err := f.tc.FollowToAS(ctx, follow)
+ if err != nil {
+ return gtserror.Newf("error converting follow to AS: %s", err)
+ }
+
+ // Send the Follow via the Actor's outbox.
+ if _, err := f.FederatingActor().Send(
+ ctx, outboxIRI, asFollow,
+ ); err != nil {
+ return gtserror.Newf(
+ "error sending activity %T via outbox %s: %w",
+ asFollow, outboxIRI, err,
+ )
+ }
+
+ return nil
+}
+
+func (f *federate) UndoFollow(ctx context.Context, follow *gtsmodel.Follow) error {
+ // Populate model.
+ if err := f.state.DB.PopulateFollow(ctx, follow); err != nil {
+ return gtserror.Newf("error populating follow: %w", err)
+ }
+
+ // Do nothing if both accounts are local.
+ if follow.Account.IsLocal() &&
+ follow.TargetAccount.IsLocal() {
+ return nil
+ }
+
+ // Parse relevant URI(s).
+ outboxIRI, err := parseURI(follow.Account.OutboxURI)
+ if err != nil {
+ return err
+ }
+
+ targetAccountIRI, err := parseURI(follow.TargetAccount.URI)
+ if err != nil {
+ return err
+ }
+
+ // Recreate the ActivityStreams Follow.
+ asFollow, err := f.tc.FollowToAS(ctx, follow)
+ if err != nil {
+ return gtserror.Newf("error converting follow to AS: %w", err)
+ }
+
+ // Create a new Undo.
+ // todo: tc.FollowToASUndo
+ undo := streams.NewActivityStreamsUndo()
+
+ // Set the Actor for the Undo:
+ // same as the actor for the Follow.
+ undo.SetActivityStreamsActor(asFollow.GetActivityStreamsActor())
+
+ // Set recreated Follow as the 'object' property.
+ //
+ // For most AP implementations, it's not enough
+ // to just send the URI of the original Follow,
+ // we have to send the whole object again.
+ undoObject := streams.NewActivityStreamsObjectProperty()
+ undoObject.AppendActivityStreamsFollow(asFollow)
+ undo.SetActivityStreamsObject(undoObject)
+
+ // Address the Undo To the target account.
+ undoTo := streams.NewActivityStreamsToProperty()
+ undoTo.AppendIRI(targetAccountIRI)
+ undo.SetActivityStreamsTo(undoTo)
+
+ // Send the Undo via the Actor's outbox.
+ if _, err := f.FederatingActor().Send(
+ ctx, outboxIRI, undo,
+ ); err != nil {
+ return gtserror.Newf(
+ "error sending activity %T via outbox %s: %w",
+ undo, outboxIRI, err,
+ )
+ }
+
+ return nil
+}
+
+func (f *federate) UndoLike(ctx context.Context, fave *gtsmodel.StatusFave) error {
+ // Populate model.
+ if err := f.state.DB.PopulateStatusFave(ctx, fave); err != nil {
+ return gtserror.Newf("error populating fave: %w", err)
+ }
+
+ // Do nothing if both accounts are local.
+ if fave.Account.IsLocal() &&
+ fave.TargetAccount.IsLocal() {
+ return nil
+ }
+
+ // Parse relevant URI(s).
+ outboxIRI, err := parseURI(fave.Account.OutboxURI)
+ if err != nil {
+ return err
+ }
+
+ targetAccountIRI, err := parseURI(fave.TargetAccount.URI)
+ if err != nil {
+ return err
+ }
+
+ // Recreate the ActivityStreams Like.
+ like, err := f.tc.FaveToAS(ctx, fave)
+ if err != nil {
+ return gtserror.Newf("error converting fave to AS: %w", err)
+ }
+
+ // Create a new Undo.
+ // todo: tc.FaveToASUndo
+ undo := streams.NewActivityStreamsUndo()
+
+ // Set the Actor for the Undo:
+ // same as the actor for the Like.
+ undo.SetActivityStreamsActor(like.GetActivityStreamsActor())
+
+ // Set recreated Like as the 'object' property.
+ //
+ // For most AP implementations, it's not enough
+ // to just send the URI of the original Like,
+ // we have to send the whole object again.
+ undoObject := streams.NewActivityStreamsObjectProperty()
+ undoObject.AppendActivityStreamsLike(like)
+ undo.SetActivityStreamsObject(undoObject)
+
+ // Address the Undo To the target account.
+ undoTo := streams.NewActivityStreamsToProperty()
+ undoTo.AppendIRI(targetAccountIRI)
+ undo.SetActivityStreamsTo(undoTo)
+
+ // Send the Undo via the Actor's outbox.
+ if _, err := f.FederatingActor().Send(
+ ctx, outboxIRI, undo,
+ ); err != nil {
+ return gtserror.Newf(
+ "error sending activity %T via outbox %s: %w",
+ undo, outboxIRI, err,
+ )
+ }
+
+ return nil
+}
+
+func (f *federate) UndoAnnounce(ctx context.Context, boost *gtsmodel.Status) error {
+ // Populate model.
+ if err := f.state.DB.PopulateStatus(ctx, boost); err != nil {
+ return gtserror.Newf("error populating status: %w", err)
+ }
+
+ // Do nothing if boosting
+ // account isn't ours.
+ if !boost.Account.IsLocal() {
+ return nil
+ }
+
+ // Parse relevant URI(s).
+ outboxIRI, err := parseURI(boost.Account.OutboxURI)
+ if err != nil {
+ return err
+ }
+
+ // Recreate the ActivityStreams Announce.
+ asAnnounce, err := f.tc.BoostToAS(
+ ctx,
+ boost,
+ boost.Account,
+ boost.BoostOfAccount,
+ )
+ if err != nil {
+ return gtserror.Newf("error converting boost to AS: %w", err)
+ }
+
+ // Create a new Undo.
+ // todo: tc.AnnounceToASUndo
+ undo := streams.NewActivityStreamsUndo()
+
+ // Set the Actor for the Undo:
+ // same as the actor for the Announce.
+ undo.SetActivityStreamsActor(asAnnounce.GetActivityStreamsActor())
+
+ // Set recreated Announce as the 'object' property.
+ //
+ // For most AP implementations, it's not enough
+ // to just send the URI of the original Announce,
+ // we have to send the whole object again.
+ undoObject := streams.NewActivityStreamsObjectProperty()
+ undoObject.AppendActivityStreamsAnnounce(asAnnounce)
+ undo.SetActivityStreamsObject(undoObject)
+
+ // Address the Undo To the Announce To.
+ undo.SetActivityStreamsTo(asAnnounce.GetActivityStreamsTo())
+
+ // Address the Undo CC the Announce CC.
+ undo.SetActivityStreamsCc(asAnnounce.GetActivityStreamsCc())
+
+ // Send the Undo via the Actor's outbox.
+ if _, err := f.FederatingActor().Send(
+ ctx, outboxIRI, undo,
+ ); err != nil {
+ return gtserror.Newf(
+ "error sending activity %T via outbox %s: %w",
+ undo, outboxIRI, err,
+ )
+ }
+
+ return nil
+}
+
+func (f *federate) AcceptFollow(ctx context.Context, follow *gtsmodel.Follow) error {
+ // Populate model.
+ if err := f.state.DB.PopulateFollow(ctx, follow); err != nil {
+ return gtserror.Newf("error populating follow: %w", err)
+ }
+
+ // Bail if requesting account is ours:
+ // we've already accepted internally and
+ // shouldn't send an Accept to ourselves.
+ if follow.Account.IsLocal() {
+ return nil
+ }
+
+ // Bail if target account isn't ours:
+ // we can't Accept a follow on
+ // another instance's behalf.
+ if follow.TargetAccount.IsRemote() {
+ return nil
+ }
+
+ // Parse relevant URI(s).
+ outboxIRI, err := parseURI(follow.TargetAccount.OutboxURI)
+ if err != nil {
+ return err
+ }
+
+ acceptingAccountIRI, err := parseURI(follow.TargetAccount.URI)
+ if err != nil {
+ return err
+ }
+
+ requestingAccountIRI, err := parseURI(follow.Account.URI)
+ if err != nil {
+ return err
+ }
+
+ // Recreate the ActivityStreams Follow.
+ asFollow, err := f.tc.FollowToAS(ctx, follow)
+ if err != nil {
+ return gtserror.Newf("error converting follow to AS: %w", err)
+ }
+
+ // Create a new Accept.
+ // todo: tc.FollowToASAccept
+ accept := streams.NewActivityStreamsAccept()
+
+ // Set the requestee as Actor of the Accept.
+ acceptActorProp := streams.NewActivityStreamsActorProperty()
+ acceptActorProp.AppendIRI(acceptingAccountIRI)
+ accept.SetActivityStreamsActor(acceptActorProp)
+
+ // Set recreated Follow as the 'object' property.
+ //
+ // For most AP implementations, it's not enough
+ // to just send the URI of the original Follow,
+ // we have to send the whole object again.
+ acceptObject := streams.NewActivityStreamsObjectProperty()
+ acceptObject.AppendActivityStreamsFollow(asFollow)
+ accept.SetActivityStreamsObject(acceptObject)
+
+ // Address the Accept To the Follow requester.
+ acceptTo := streams.NewActivityStreamsToProperty()
+ acceptTo.AppendIRI(requestingAccountIRI)
+ accept.SetActivityStreamsTo(acceptTo)
+
+ // Send the Accept via the Actor's outbox.
+ if _, err := f.FederatingActor().Send(
+ ctx, outboxIRI, accept,
+ ); err != nil {
+ return gtserror.Newf(
+ "error sending activity %T via outbox %s: %w",
+ accept, outboxIRI, err,
+ )
+ }
+
+ return nil
+}
+
+func (f *federate) RejectFollow(ctx context.Context, follow *gtsmodel.Follow) error {
+ // Ensure follow populated before proceeding.
+ if err := f.state.DB.PopulateFollow(ctx, follow); err != nil {
+ return gtserror.Newf("error populating follow: %w", err)
+ }
+
+ // Bail if requesting account is ours:
+ // we've already rejected internally and
+ // shouldn't send an Reject to ourselves.
+ if follow.Account.IsLocal() {
+ return nil
+ }
+
+ // Bail if target account isn't ours:
+ // we can't Reject a follow on
+ // another instance's behalf.
+ if follow.TargetAccount.IsRemote() {
+ return nil
+ }
+
+ // Parse relevant URI(s).
+ outboxIRI, err := parseURI(follow.TargetAccount.OutboxURI)
+ if err != nil {
+ return err
+ }
+
+ rejectingAccountIRI, err := parseURI(follow.TargetAccount.URI)
+ if err != nil {
+ return err
+ }
+
+ requestingAccountIRI, err := parseURI(follow.Account.URI)
+ if err != nil {
+ return err
+ }
+
+ // Recreate the ActivityStreams Follow.
+ asFollow, err := f.tc.FollowToAS(ctx, follow)
+ if err != nil {
+ return gtserror.Newf("error converting follow to AS: %w", err)
+ }
+
+ // Create a new Reject.
+ // todo: tc.FollowRequestToASReject
+ reject := streams.NewActivityStreamsReject()
+
+ // Set the requestee as Actor of the Reject.
+ rejectActorProp := streams.NewActivityStreamsActorProperty()
+ rejectActorProp.AppendIRI(rejectingAccountIRI)
+ reject.SetActivityStreamsActor(rejectActorProp)
+
+ // Set recreated Follow as the 'object' property.
+ //
+ // For most AP implementations, it's not enough
+ // to just send the URI of the original Follow,
+ // we have to send the whole object again.
+ rejectObject := streams.NewActivityStreamsObjectProperty()
+ rejectObject.AppendActivityStreamsFollow(asFollow)
+ reject.SetActivityStreamsObject(rejectObject)
+
+ // Address the Reject To the Follow requester.
+ rejectTo := streams.NewActivityStreamsToProperty()
+ rejectTo.AppendIRI(requestingAccountIRI)
+ reject.SetActivityStreamsTo(rejectTo)
+
+ // Send the Reject via the Actor's outbox.
+ if _, err := f.FederatingActor().Send(
+ ctx, outboxIRI, reject,
+ ); err != nil {
+ return gtserror.Newf(
+ "error sending activity %T via outbox %s: %w",
+ reject, outboxIRI, err,
+ )
+ }
+
+ return nil
+}
+
+func (f *federate) Like(ctx context.Context, fave *gtsmodel.StatusFave) error {
+ // Populate model.
+ if err := f.state.DB.PopulateStatusFave(ctx, fave); err != nil {
+ return gtserror.Newf("error populating fave: %w", err)
+ }
+
+ // Do nothing if both accounts are local.
+ if fave.Account.IsLocal() &&
+ fave.TargetAccount.IsLocal() {
+ return nil
+ }
+
+ // Parse relevant URI(s).
+ outboxIRI, err := parseURI(fave.Account.OutboxURI)
+ if err != nil {
+ return err
+ }
+
+ // Create the ActivityStreams Like.
+ like, err := f.tc.FaveToAS(ctx, fave)
+ if err != nil {
+ return gtserror.Newf("error converting fave to AS Like: %w", err)
+ }
+
+ // Send the Like via the Actor's outbox.
+ if _, err := f.FederatingActor().Send(
+ ctx, outboxIRI, like,
+ ); err != nil {
+ return gtserror.Newf(
+ "error sending activity %T via outbox %s: %w",
+ like, outboxIRI, err,
+ )
+ }
+
+ return nil
+}
+
+func (f *federate) Announce(ctx context.Context, boost *gtsmodel.Status) error {
+ // Populate model.
+ if err := f.state.DB.PopulateStatus(ctx, boost); err != nil {
+ return gtserror.Newf("error populating status: %w", err)
+ }
+
+ // Do nothing if boosting
+ // account isn't ours.
+ if !boost.Account.IsLocal() {
+ return nil
+ }
+
+ // Parse relevant URI(s).
+ outboxIRI, err := parseURI(boost.Account.OutboxURI)
+ if err != nil {
+ return err
+ }
+
+ // Create the ActivityStreams Announce.
+ announce, err := f.tc.BoostToAS(
+ ctx,
+ boost,
+ boost.Account,
+ boost.BoostOfAccount,
+ )
+ if err != nil {
+ return gtserror.Newf("error converting boost to AS: %w", err)
+ }
+
+ // Send the Announce via the Actor's outbox.
+ if _, err := f.FederatingActor().Send(
+ ctx, outboxIRI, announce,
+ ); err != nil {
+ return gtserror.Newf(
+ "error sending activity %T via outbox %s: %w",
+ announce, outboxIRI, err,
+ )
+ }
+
+ return nil
+}
+
+func (f *federate) UpdateAccount(ctx context.Context, account *gtsmodel.Account) error {
+ // Populate model.
+ if err := f.state.DB.PopulateAccount(ctx, account); err != nil {
+ return gtserror.Newf("error populating account: %w", err)
+ }
+
+ // Parse relevant URI(s).
+ outboxIRI, err := parseURI(account.OutboxURI)
+ if err != nil {
+ return err
+ }
+
+ // Convert account to ActivityStreams Person.
+ person, err := f.tc.AccountToAS(ctx, account)
+ if err != nil {
+ return gtserror.Newf("error converting account to Person: %w", err)
+ }
+
+ // Use ActivityStreams Person as Object of Update.
+ update, err := f.tc.WrapPersonInUpdate(person, account)
+ if err != nil {
+ return gtserror.Newf("error wrapping Person in Update: %w", err)
+ }
+
+ // Send the Update via the Actor's outbox.
+ if _, err := f.FederatingActor().Send(
+ ctx, outboxIRI, update,
+ ); err != nil {
+ return gtserror.Newf(
+ "error sending activity %T via outbox %s: %w",
+ update, outboxIRI, err,
+ )
+ }
+
+ return nil
+}
+
+func (f *federate) Block(ctx context.Context, block *gtsmodel.Block) error {
+ // Populate model.
+ if err := f.state.DB.PopulateBlock(ctx, block); err != nil {
+ return gtserror.Newf("error populating block: %w", err)
+ }
+
+ // Do nothing if both accounts are local.
+ if block.Account.IsLocal() &&
+ block.TargetAccount.IsLocal() {
+ return nil
+ }
+
+ // Parse relevant URI(s).
+ outboxIRI, err := parseURI(block.Account.OutboxURI)
+ if err != nil {
+ return err
+ }
+
+ // Convert block to ActivityStreams Block.
+ asBlock, err := f.tc.BlockToAS(ctx, block)
+ if err != nil {
+ return gtserror.Newf("error converting block to AS: %w", err)
+ }
+
+ // Send the Block via the Actor's outbox.
+ if _, err := f.FederatingActor().Send(
+ ctx, outboxIRI, asBlock,
+ ); err != nil {
+ return gtserror.Newf(
+ "error sending activity %T via outbox %s: %w",
+ asBlock, outboxIRI, err,
+ )
+ }
+
+ return nil
+}
+
+func (f *federate) UndoBlock(ctx context.Context, block *gtsmodel.Block) error {
+ // Populate model.
+ if err := f.state.DB.PopulateBlock(ctx, block); err != nil {
+ return gtserror.Newf("error populating block: %w", err)
+ }
+
+ // Do nothing if both accounts are local.
+ if block.Account.IsLocal() &&
+ block.TargetAccount.IsLocal() {
+ return nil
+ }
+
+ // Parse relevant URI(s).
+ outboxIRI, err := parseURI(block.Account.OutboxURI)
+ if err != nil {
+ return err
+ }
+
+ targetAccountIRI, err := parseURI(block.TargetAccount.URI)
+ if err != nil {
+ return err
+ }
+
+ // Convert block to ActivityStreams Block.
+ asBlock, err := f.tc.BlockToAS(ctx, block)
+ if err != nil {
+ return gtserror.Newf("error converting block to AS: %w", err)
+ }
+
+ // Create a new Undo.
+ // todo: tc.BlockToASUndo
+ undo := streams.NewActivityStreamsUndo()
+
+ // Set the Actor for the Undo:
+ // same as the actor for the Block.
+ undo.SetActivityStreamsActor(asBlock.GetActivityStreamsActor())
+
+ // Set Block as the 'object' property.
+ //
+ // For most AP implementations, it's not enough
+ // to just send the URI of the original Block,
+ // we have to send the whole object again.
+ undoObject := streams.NewActivityStreamsObjectProperty()
+ undoObject.AppendActivityStreamsBlock(asBlock)
+ undo.SetActivityStreamsObject(undoObject)
+
+ // Address the Undo To the target account.
+ undoTo := streams.NewActivityStreamsToProperty()
+ undoTo.AppendIRI(targetAccountIRI)
+ undo.SetActivityStreamsTo(undoTo)
+
+ // Send the Undo via the Actor's outbox.
+ if _, err := f.FederatingActor().Send(
+ ctx, outboxIRI, undo,
+ ); err != nil {
+ return gtserror.Newf(
+ "error sending activity %T via outbox %s: %w",
+ undo, outboxIRI, err,
+ )
+ }
+
+ return nil
+}
+
+func (f *federate) Flag(ctx context.Context, report *gtsmodel.Report) error {
+ // Populate model.
+ if err := f.state.DB.PopulateReport(ctx, report); err != nil {
+ return gtserror.Newf("error populating report: %w", err)
+ }
+
+ // Do nothing if report target
+ // is not remote account.
+ if report.TargetAccount.IsLocal() {
+ return nil
+ }
+
+ // Get our instance account from the db:
+ // to anonymize the report, we'll deliver
+ // using the outbox of the instance account.
+ instanceAcct, err := f.state.DB.GetInstanceAccount(ctx, "")
+ if err != nil {
+ return gtserror.Newf("error getting instance account: %w", err)
+ }
+
+ // Parse relevant URI(s).
+ outboxIRI, err := parseURI(instanceAcct.OutboxURI)
+ if err != nil {
+ return err
+ }
+
+ targetAccountIRI, err := parseURI(report.TargetAccount.URI)
+ if err != nil {
+ return err
+ }
+
+ // Convert report to ActivityStreams Flag.
+ flag, err := f.tc.ReportToASFlag(ctx, report)
+ if err != nil {
+ return gtserror.Newf("error converting report to AS: %w", err)
+ }
+
+ // To is not set explicitly on Flags. Instead,
+ // address Flag BTo report target account URI.
+ // This ensures that our federating actor still
+ // knows where to send the report, but the BTo
+ // property will be stripped before sending.
+ //
+ // Happily, BTo does not prevent federating
+ // actor from using shared inbox to deliver.
+ bTo := streams.NewActivityStreamsBtoProperty()
+ bTo.AppendIRI(targetAccountIRI)
+ flag.SetActivityStreamsBto(bTo)
+
+ // Send the Flag via the Actor's outbox.
+ if _, err := f.FederatingActor().Send(
+ ctx, outboxIRI, flag,
+ ); err != nil {
+ return gtserror.Newf(
+ "error sending activity %T via outbox %s: %w",
+ flag, outboxIRI, err,
+ )
+ }
+
+ return nil
+}
diff --git a/internal/processing/workers/fromclientapi.go b/internal/processing/workers/fromclientapi.go
new file mode 100644
index 000000000..40efc20bb
--- /dev/null
+++ b/internal/processing/workers/fromclientapi.go
@@ -0,0 +1,548 @@
+// GoToSocial
+// Copyright (C) GoToSocial Authors admin@gotosocial.org
+// SPDX-License-Identifier: AGPL-3.0-or-later
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+package workers
+
+import (
+ "context"
+ "errors"
+
+ "codeberg.org/gruf/go-kv"
+ "codeberg.org/gruf/go-logger/v2/level"
+ "github.com/superseriousbusiness/gotosocial/internal/ap"
+ "github.com/superseriousbusiness/gotosocial/internal/db"
+ "github.com/superseriousbusiness/gotosocial/internal/gtserror"
+ "github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
+ "github.com/superseriousbusiness/gotosocial/internal/log"
+ "github.com/superseriousbusiness/gotosocial/internal/messages"
+ "github.com/superseriousbusiness/gotosocial/internal/processing/account"
+ "github.com/superseriousbusiness/gotosocial/internal/state"
+ "github.com/superseriousbusiness/gotosocial/internal/typeutils"
+)
+
+// clientAPI wraps processing functions
+// specifically for messages originating
+// from the client/REST API.
+type clientAPI struct {
+ state *state.State
+ tc typeutils.TypeConverter
+ surface *surface
+ federate *federate
+ wipeStatus wipeStatus
+ account *account.Processor
+}
+
+func (p *Processor) EnqueueClientAPI(ctx context.Context, msgs ...messages.FromClientAPI) {
+ log.Trace(ctx, "enqueuing")
+ _ = p.workers.ClientAPI.MustEnqueueCtx(ctx, func(ctx context.Context) {
+ for _, msg := range msgs {
+ log.Trace(ctx, "processing: %+v", msg)
+ if err := p.ProcessFromClientAPI(ctx, msg); err != nil {
+ log.Errorf(ctx, "error processing client API message: %v", err)
+ }
+ }
+ })
+}
+
+func (p *Processor) ProcessFromClientAPI(ctx context.Context, cMsg messages.FromClientAPI) error {
+ // Allocate new log fields slice
+ fields := make([]kv.Field, 3, 4)
+ fields[0] = kv.Field{"activityType", cMsg.APActivityType}
+ fields[1] = kv.Field{"objectType", cMsg.APObjectType}
+ fields[2] = kv.Field{"fromAccount", cMsg.OriginAccount.Username}
+
+ // Include GTSModel in logs if appropriate.
+ if cMsg.GTSModel != nil &&
+ log.Level() >= level.DEBUG {
+ fields = append(fields, kv.Field{
+ "model", cMsg.GTSModel,
+ })
+ }
+
+ l := log.WithContext(ctx).WithFields(fields...)
+ l.Info("processing from client API")
+
+ switch cMsg.APActivityType {
+
+ // CREATE SOMETHING
+ case ap.ActivityCreate:
+ switch cMsg.APObjectType {
+
+ // CREATE PROFILE/ACCOUNT
+ case ap.ObjectProfile, ap.ActorPerson:
+ return p.clientAPI.CreateAccount(ctx, cMsg)
+
+ // CREATE NOTE/STATUS
+ case ap.ObjectNote:
+ return p.clientAPI.CreateStatus(ctx, cMsg)
+
+ // CREATE FOLLOW (request)
+ case ap.ActivityFollow:
+ return p.clientAPI.CreateFollowReq(ctx, cMsg)
+
+ // CREATE LIKE/FAVE
+ case ap.ActivityLike:
+ return p.clientAPI.CreateLike(ctx, cMsg)
+
+ // CREATE ANNOUNCE/BOOST
+ case ap.ActivityAnnounce:
+ return p.clientAPI.CreateAnnounce(ctx, cMsg)
+
+ // CREATE BLOCK
+ case ap.ActivityBlock:
+ return p.clientAPI.CreateBlock(ctx, cMsg)
+ }
+
+ // UPDATE SOMETHING
+ case ap.ActivityUpdate:
+ switch cMsg.APObjectType {
+
+ // UPDATE PROFILE/ACCOUNT
+ case ap.ObjectProfile, ap.ActorPerson:
+ return p.clientAPI.UpdateAccount(ctx, cMsg)
+
+ // UPDATE A FLAG/REPORT (mark as resolved/closed)
+ case ap.ActivityFlag:
+ return p.clientAPI.UpdateReport(ctx, cMsg)
+ }
+
+ // ACCEPT SOMETHING
+ case ap.ActivityAccept:
+ switch cMsg.APObjectType { //nolint:gocritic
+
+ // ACCEPT FOLLOW (request)
+ case ap.ActivityFollow:
+ return p.clientAPI.AcceptFollow(ctx, cMsg)
+ }
+
+ // REJECT SOMETHING
+ case ap.ActivityReject:
+ switch cMsg.APObjectType { //nolint:gocritic
+
+ // REJECT FOLLOW (request)
+ case ap.ActivityFollow:
+ return p.clientAPI.RejectFollowRequest(ctx, cMsg)
+ }
+
+ // UNDO SOMETHING
+ case ap.ActivityUndo:
+ switch cMsg.APObjectType {
+
+ // UNDO FOLLOW (request)
+ case ap.ActivityFollow:
+ return p.clientAPI.UndoFollow(ctx, cMsg)
+
+ // UNDO BLOCK
+ case ap.ActivityBlock:
+ return p.clientAPI.UndoBlock(ctx, cMsg)
+
+ // UNDO LIKE/FAVE
+ case ap.ActivityLike:
+ return p.clientAPI.UndoFave(ctx, cMsg)
+
+ // UNDO ANNOUNCE/BOOST
+ case ap.ActivityAnnounce:
+ return p.clientAPI.UndoAnnounce(ctx, cMsg)
+ }
+
+ // DELETE SOMETHING
+ case ap.ActivityDelete:
+ switch cMsg.APObjectType {
+
+ // DELETE NOTE/STATUS
+ case ap.ObjectNote:
+ return p.clientAPI.DeleteStatus(ctx, cMsg)
+
+ // DELETE PROFILE/ACCOUNT
+ case ap.ObjectProfile, ap.ActorPerson:
+ return p.clientAPI.DeleteAccount(ctx, cMsg)
+ }
+
+ // FLAG/REPORT SOMETHING
+ case ap.ActivityFlag:
+ switch cMsg.APObjectType { //nolint:gocritic
+
+ // FLAG/REPORT A PROFILE
+ case ap.ObjectProfile:
+ return p.clientAPI.ReportAccount(ctx, cMsg)
+ }
+ }
+
+ return nil
+}
+
+func (p *clientAPI) CreateAccount(ctx context.Context, cMsg messages.FromClientAPI) error {
+ account, ok := cMsg.GTSModel.(*gtsmodel.Account)
+ if !ok {
+ return gtserror.Newf("%T not parseable as *gtsmodel.Account", cMsg.GTSModel)
+ }
+
+ // Send a confirmation email to the newly created account.
+ user, err := p.state.DB.GetUserByAccountID(ctx, account.ID)
+ if err != nil {
+ return gtserror.Newf("db error getting user for account id %s: %w", account.ID, err)
+ }
+
+ if err := p.surface.emailPleaseConfirm(ctx, user, account.Username); err != nil {
+ return gtserror.Newf("error emailing %s: %w", account.Username, err)
+ }
+
+ return nil
+}
+
+func (p *clientAPI) CreateStatus(ctx context.Context, cMsg messages.FromClientAPI) error {
+ status, ok := cMsg.GTSModel.(*gtsmodel.Status)
+ if !ok {
+ return gtserror.Newf("%T not parseable as *gtsmodel.Status", cMsg.GTSModel)
+ }
+
+ if err := p.surface.timelineAndNotifyStatus(ctx, status); err != nil {
+ return gtserror.Newf("error timelining status: %w", err)
+ }
+
+ if status.InReplyToID != "" {
+ // Interaction counts changed on the replied status;
+ // uncache the prepared version from all timelines.
+ p.surface.invalidateStatusFromTimelines(ctx, status.InReplyToID)
+ }
+
+ if err := p.federate.CreateStatus(ctx, status); err != nil {
+ return gtserror.Newf("error federating status: %w", err)
+ }
+
+ return nil
+}
+
+func (p *clientAPI) CreateFollowReq(ctx context.Context, cMsg messages.FromClientAPI) error {
+ followRequest, ok := cMsg.GTSModel.(*gtsmodel.FollowRequest)
+ if !ok {
+ return gtserror.Newf("%T not parseable as *gtsmodel.FollowRequest", cMsg.GTSModel)
+ }
+
+ if err := p.surface.notifyFollowRequest(ctx, followRequest); err != nil {
+ return gtserror.Newf("error notifying follow request: %w", err)
+ }
+
+ if err := p.federate.Follow(
+ ctx,
+ p.tc.FollowRequestToFollow(ctx, followRequest),
+ ); err != nil {
+ return gtserror.Newf("error federating follow: %w", err)
+ }
+
+ return nil
+}
+
+func (p *clientAPI) CreateLike(ctx context.Context, cMsg messages.FromClientAPI) error {
+ fave, ok := cMsg.GTSModel.(*gtsmodel.StatusFave)
+ if !ok {
+ return gtserror.Newf("%T not parseable as *gtsmodel.StatusFave", cMsg.GTSModel)
+ }
+
+ if err := p.surface.notifyFave(ctx, fave); err != nil {
+ return gtserror.Newf("error notifying fave: %w", err)
+ }
+
+ // Interaction counts changed on the faved status;
+ // uncache the prepared version from all timelines.
+ p.surface.invalidateStatusFromTimelines(ctx, fave.StatusID)
+
+ if err := p.federate.Like(ctx, fave); err != nil {
+ return gtserror.Newf("error federating like: %w", err)
+ }
+
+ return nil
+}
+
+func (p *clientAPI) CreateAnnounce(ctx context.Context, cMsg messages.FromClientAPI) error {
+ boost, ok := cMsg.GTSModel.(*gtsmodel.Status)
+ if !ok {
+ return gtserror.Newf("%T not parseable as *gtsmodel.Status", cMsg.GTSModel)
+ }
+
+ // Timeline and notify the boost wrapper status.
+ if err := p.surface.timelineAndNotifyStatus(ctx, boost); err != nil {
+ return gtserror.Newf("error timelining boost: %w", err)
+ }
+
+ // Notify the boost target account.
+ if err := p.surface.notifyAnnounce(ctx, boost); err != nil {
+ return gtserror.Newf("error notifying boost: %w", err)
+ }
+
+ // Interaction counts changed on the boosted status;
+ // uncache the prepared version from all timelines.
+ p.surface.invalidateStatusFromTimelines(ctx, boost.BoostOfID)
+
+ if err := p.federate.Announce(ctx, boost); err != nil {
+ return gtserror.Newf("error federating announce: %w", err)
+ }
+
+ return nil
+}
+
+func (p *clientAPI) CreateBlock(ctx context.Context, cMsg messages.FromClientAPI) error {
+ block, ok := cMsg.GTSModel.(*gtsmodel.Block)
+ if !ok {
+ return gtserror.Newf("%T not parseable as *gtsmodel.Block", cMsg.GTSModel)
+ }
+
+ // Remove blockee's statuses from blocker's timeline.
+ if err := p.state.Timelines.Home.WipeItemsFromAccountID(
+ ctx,
+ block.AccountID,
+ block.TargetAccountID,
+ ); err != nil {
+ return gtserror.Newf("error wiping timeline items for block: %w", err)
+ }
+
+ // Remove blocker's statuses from blockee's timeline.
+ if err := p.state.Timelines.Home.WipeItemsFromAccountID(
+ ctx,
+ block.TargetAccountID,
+ block.AccountID,
+ ); err != nil {
+ return gtserror.Newf("error wiping timeline items for block: %w", err)
+ }
+
+ // TODO: same with notifications?
+ // TODO: same with bookmarks?
+
+ if err := p.federate.Block(ctx, block); err != nil {
+ return gtserror.Newf("error federating block: %w", err)
+ }
+
+ return nil
+}
+
+func (p *clientAPI) UpdateAccount(ctx context.Context, cMsg messages.FromClientAPI) error {
+ account, ok := cMsg.GTSModel.(*gtsmodel.Account)
+ if !ok {
+ return gtserror.Newf("%T not parseable as *gtsmodel.Account", cMsg.GTSModel)
+ }
+
+ if err := p.federate.UpdateAccount(ctx, account); err != nil {
+ return gtserror.Newf("error federating account update: %w", err)
+ }
+
+ return nil
+}
+
+func (p *clientAPI) UpdateReport(ctx context.Context, cMsg messages.FromClientAPI) error {
+ report, ok := cMsg.GTSModel.(*gtsmodel.Report)
+ if !ok {
+ return gtserror.Newf("%T not parseable as *gtsmodel.Report", cMsg.GTSModel)
+ }
+
+ if report.Account.IsRemote() {
+ // Report creator is a remote account,
+ // we shouldn't try to email them!
+ return nil
+ }
+
+ if err := p.surface.emailReportClosed(ctx, report); err != nil {
+ return gtserror.Newf("error sending report closed email: %w", err)
+ }
+
+ return nil
+}
+
+func (p *clientAPI) AcceptFollow(ctx context.Context, cMsg messages.FromClientAPI) error {
+ follow, ok := cMsg.GTSModel.(*gtsmodel.Follow)
+ if !ok {
+ return gtserror.Newf("%T not parseable as *gtsmodel.Follow", cMsg.GTSModel)
+ }
+
+ if err := p.surface.notifyFollow(ctx, follow); err != nil {
+ return gtserror.Newf("error notifying follow: %w", err)
+ }
+
+ if err := p.federate.AcceptFollow(ctx, follow); err != nil {
+ return gtserror.Newf("error federating follow request accept: %w", err)
+ }
+
+ return nil
+}
+
+func (p *clientAPI) RejectFollowRequest(ctx context.Context, cMsg messages.FromClientAPI) error {
+ followReq, ok := cMsg.GTSModel.(*gtsmodel.FollowRequest)
+ if !ok {
+ return gtserror.Newf("%T not parseable as *gtsmodel.FollowRequest", cMsg.GTSModel)
+ }
+
+ if err := p.federate.RejectFollow(
+ ctx,
+ p.tc.FollowRequestToFollow(ctx, followReq),
+ ); err != nil {
+ return gtserror.Newf("error federating reject follow: %w", err)
+ }
+
+ return nil
+}
+
+func (p *clientAPI) UndoFollow(ctx context.Context, cMsg messages.FromClientAPI) error {
+ follow, ok := cMsg.GTSModel.(*gtsmodel.Follow)
+ if !ok {
+ return gtserror.Newf("%T not parseable as *gtsmodel.Follow", cMsg.GTSModel)
+ }
+
+ if err := p.federate.UndoFollow(ctx, follow); err != nil {
+ return gtserror.Newf("error federating undo follow: %w", err)
+ }
+
+ return nil
+}
+
+func (p *clientAPI) UndoBlock(ctx context.Context, cMsg messages.FromClientAPI) error {
+ block, ok := cMsg.GTSModel.(*gtsmodel.Block)
+ if !ok {
+ return gtserror.Newf("%T not parseable as *gtsmodel.Block", cMsg.GTSModel)
+ }
+
+ if err := p.federate.UndoBlock(ctx, block); err != nil {
+ return gtserror.Newf("error federating undo block: %w", err)
+ }
+
+ return nil
+}
+
+func (p *clientAPI) UndoFave(ctx context.Context, cMsg messages.FromClientAPI) error {
+ statusFave, ok := cMsg.GTSModel.(*gtsmodel.StatusFave)
+ if !ok {
+ return gtserror.Newf("%T not parseable as *gtsmodel.StatusFave", cMsg.GTSModel)
+ }
+
+ // Interaction counts changed on the faved status;
+ // uncache the prepared version from all timelines.
+ p.surface.invalidateStatusFromTimelines(ctx, statusFave.StatusID)
+
+ if err := p.federate.UndoLike(ctx, statusFave); err != nil {
+ return gtserror.Newf("error federating undo like: %w", err)
+ }
+
+ return nil
+}
+
+func (p *clientAPI) UndoAnnounce(ctx context.Context, cMsg messages.FromClientAPI) error {
+ status, ok := cMsg.GTSModel.(*gtsmodel.Status)
+ if !ok {
+ return gtserror.Newf("%T not parseable as *gtsmodel.Status", cMsg.GTSModel)
+ }
+
+ if err := p.state.DB.DeleteStatusByID(ctx, status.ID); err != nil {
+ return gtserror.Newf("db error deleting status: %w", err)
+ }
+
+ if err := p.surface.deleteStatusFromTimelines(ctx, status.ID); err != nil {
+ return gtserror.Newf("error removing status from timelines: %w", err)
+ }
+
+ // Interaction counts changed on the boosted status;
+ // uncache the prepared version from all timelines.
+ p.surface.invalidateStatusFromTimelines(ctx, status.BoostOfID)
+
+ if err := p.federate.UndoAnnounce(ctx, status); err != nil {
+ return gtserror.Newf("error federating undo announce: %w", err)
+ }
+
+ return nil
+}
+
+func (p *clientAPI) DeleteStatus(ctx context.Context, cMsg messages.FromClientAPI) error {
+ // Don't delete attachments, just unattach them:
+ // this request comes from the client API and the
+ // poster may want to use attachments again later.
+ const deleteAttachments = false
+
+ status, ok := cMsg.GTSModel.(*gtsmodel.Status)
+ if !ok {
+ return gtserror.Newf("%T not parseable as *gtsmodel.Status", cMsg.GTSModel)
+ }
+
+ // Try to populate status structs if possible,
+ // in order to more thoroughly remove them.
+ if err := p.state.DB.PopulateStatus(
+ ctx, status,
+ ); err != nil && !errors.Is(err, db.ErrNoEntries) {
+ return gtserror.Newf("db error populating status: %w", err)
+ }
+
+ if err := p.wipeStatus(ctx, status, deleteAttachments); err != nil {
+ return gtserror.Newf("error wiping status: %w", err)
+ }
+
+ if status.InReplyToID != "" {
+ // Interaction counts changed on the replied status;
+ // uncache the prepared version from all timelines.
+ p.surface.invalidateStatusFromTimelines(ctx, status.InReplyToID)
+ }
+
+ if err := p.federate.DeleteStatus(ctx, status); err != nil {
+ return gtserror.Newf("error federating status delete: %w", err)
+ }
+
+ return nil
+}
+
+func (p *clientAPI) DeleteAccount(ctx context.Context, cMsg messages.FromClientAPI) error {
+ // The originID of the delete, one of:
+ // - ID of a domain block, for which
+ // this account delete is a side effect.
+ // - ID of the deleted account itself (self delete).
+ // - ID of an admin account (account suspension).
+ var originID string
+
+ if domainBlock, ok := cMsg.GTSModel.(*gtsmodel.DomainBlock); ok {
+ // Origin is a domain block.
+ originID = domainBlock.ID
+ } else {
+ // Origin is whichever account
+ // originated this message.
+ originID = cMsg.OriginAccount.ID
+ }
+
+ if err := p.federate.DeleteAccount(ctx, cMsg.TargetAccount); err != nil {
+ return gtserror.Newf("error federating account delete: %w", err)
+ }
+
+ if err := p.account.Delete(ctx, cMsg.TargetAccount, originID); err != nil {
+ return gtserror.Newf("error deleting account: %w", err)
+ }
+
+ return nil
+}
+
+func (p *clientAPI) ReportAccount(ctx context.Context, cMsg messages.FromClientAPI) error {
+ report, ok := cMsg.GTSModel.(*gtsmodel.Report)
+ if !ok {
+ return gtserror.Newf("%T not parseable as *gtsmodel.Report", cMsg.GTSModel)
+ }
+
+ // Federate this report to the
+ // remote instance if desired.
+ if *report.Forwarded {
+ if err := p.federate.Flag(ctx, report); err != nil {
+ return gtserror.Newf("error federating report: %w", err)
+ }
+ }
+
+ if err := p.surface.emailReportOpened(ctx, report); err != nil {
+ return gtserror.Newf("error sending report opened email: %w", err)
+ }
+
+ return nil
+}
diff --git a/internal/processing/workers/fromclientapi_test.go b/internal/processing/workers/fromclientapi_test.go
new file mode 100644
index 000000000..6690a43db
--- /dev/null
+++ b/internal/processing/workers/fromclientapi_test.go
@@ -0,0 +1,589 @@
+// GoToSocial
+// Copyright (C) GoToSocial Authors admin@gotosocial.org
+// SPDX-License-Identifier: AGPL-3.0-or-later
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+package workers_test
+
+import (
+ "context"
+ "encoding/json"
+ "errors"
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/suite"
+ "github.com/superseriousbusiness/gotosocial/internal/ap"
+ "github.com/superseriousbusiness/gotosocial/internal/config"
+ "github.com/superseriousbusiness/gotosocial/internal/db"
+ "github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
+ "github.com/superseriousbusiness/gotosocial/internal/id"
+ "github.com/superseriousbusiness/gotosocial/internal/messages"
+ "github.com/superseriousbusiness/gotosocial/internal/stream"
+ "github.com/superseriousbusiness/gotosocial/internal/util"
+ "github.com/superseriousbusiness/gotosocial/testrig"
+)
+
+type FromClientAPITestSuite struct {
+ WorkersTestSuite
+}
+
+func (suite *FromClientAPITestSuite) newStatus(
+ ctx context.Context,
+ account *gtsmodel.Account,
+ visibility gtsmodel.Visibility,
+ replyToStatus *gtsmodel.Status,
+ boostOfStatus *gtsmodel.Status,
+) *gtsmodel.Status {
+ var (
+ protocol = config.GetProtocol()
+ host = config.GetHost()
+ statusID = id.NewULID()
+ )
+
+ // Make a new status from given account.
+ newStatus := &gtsmodel.Status{
+ ID: statusID,
+ URI: protocol + "://" + host + "/users/" + account.Username + "/statuses/" + statusID,
+ URL: protocol + "://" + host + "/@" + account.Username + "/statuses/" + statusID,
+ Content: "pee pee poo poo",
+ Local: util.Ptr(true),
+ AccountURI: account.URI,
+ AccountID: account.ID,
+ Visibility: visibility,
+ ActivityStreamsType: ap.ObjectNote,
+ Federated: util.Ptr(true),
+ Boostable: util.Ptr(true),
+ Replyable: util.Ptr(true),
+ Likeable: util.Ptr(true),
+ }
+
+ if replyToStatus != nil {
+ // Status is a reply.
+ newStatus.InReplyToAccountID = replyToStatus.AccountID
+ newStatus.InReplyToID = replyToStatus.ID
+ newStatus.InReplyToURI = replyToStatus.URI
+
+ // Mention the replied-to account.
+ mention := &gtsmodel.Mention{
+ ID: id.NewULID(),
+ StatusID: statusID,
+ OriginAccountID: account.ID,
+ OriginAccountURI: account.URI,
+ TargetAccountID: replyToStatus.AccountID,
+ }
+
+ if err := suite.db.PutMention(ctx, mention); err != nil {
+ suite.FailNow(err.Error())
+ }
+ newStatus.Mentions = []*gtsmodel.Mention{mention}
+ newStatus.MentionIDs = []string{mention.ID}
+ }
+
+ if boostOfStatus != nil {
+ // Status is a boost.
+
+ }
+
+ // Put the status in the db, to mimic what would
+ // have already happened earlier up the flow.
+ if err := suite.db.PutStatus(ctx, newStatus); err != nil {
+ suite.FailNow(err.Error())
+ }
+
+ return newStatus
+}
+
+func (suite *FromClientAPITestSuite) checkStreamed(
+ str *stream.Stream,
+ expectMessage bool,
+ expectPayload string,
+ expectEventType string,
+) {
+ var msg *stream.Message
+streamLoop:
+ for {
+ select {
+ case msg = <-str.Messages:
+ break streamLoop // Got it.
+ case <-time.After(5 * time.Second):
+ break streamLoop // Didn't get it.
+ }
+ }
+
+ if expectMessage && msg == nil {
+ suite.FailNow("expected a message but message was nil")
+ }
+
+ if !expectMessage && msg != nil {
+ suite.FailNow("expected no message but message was not nil")
+ }
+
+ if expectPayload != "" && msg.Payload != expectPayload {
+ suite.FailNow("", "expected payload %s but payload was: %s", expectPayload, msg.Payload)
+ }
+
+ if expectEventType != "" && msg.Event != expectEventType {
+ suite.FailNow("", "expected event type %s but event type was: %s", expectEventType, msg.Event)
+ }
+}
+
+func (suite *FromClientAPITestSuite) statusJSON(
+ ctx context.Context,
+ status *gtsmodel.Status,
+ requestingAccount *gtsmodel.Account,
+) string {
+ apiStatus, err := suite.typeconverter.StatusToAPIStatus(
+ ctx,
+ status,
+ requestingAccount,
+ )
+ if err != nil {
+ suite.FailNow(err.Error())
+ }
+
+ statusJSON, err := json.Marshal(apiStatus)
+ if err != nil {
+ suite.FailNow(err.Error())
+ }
+
+ return string(statusJSON)
+}
+
+func (suite *FromClientAPITestSuite) TestProcessCreateStatusWithNotification() {
+ var (
+ ctx = context.Background()
+ postingAccount = suite.testAccounts["admin_account"]
+ receivingAccount = suite.testAccounts["local_account_1"]
+ testList = suite.testLists["local_account_1_list_1"]
+ streams = suite.openStreams(ctx, receivingAccount, []string{testList.ID})
+ homeStream = streams[stream.TimelineHome]
+ listStream = streams[stream.TimelineList+":"+testList.ID]
+ notifStream = streams[stream.TimelineNotifications]
+
+ // Admin account posts a new top-level status.
+ status = suite.newStatus(
+ ctx,
+ postingAccount,
+ gtsmodel.VisibilityPublic,
+ nil,
+ nil,
+ )
+ statusJSON = suite.statusJSON(
+ ctx,
+ status,
+ receivingAccount,
+ )
+ )
+
+ // Update the follow from receiving account -> posting account so
+ // that receiving account wants notifs when posting account posts.
+ follow := new(gtsmodel.Follow)
+ *follow = *suite.testFollows["local_account_1_admin_account"]
+
+ follow.Notify = util.Ptr(true)
+ if err := suite.db.UpdateFollow(ctx, follow); err != nil {
+ suite.FailNow(err.Error())
+ }
+
+ // Process the new status.
+ if err := suite.processor.Workers().ProcessFromClientAPI(
+ ctx,
+ messages.FromClientAPI{
+ APObjectType: ap.ObjectNote,
+ APActivityType: ap.ActivityCreate,
+ GTSModel: status,
+ OriginAccount: postingAccount,
+ },
+ ); err != nil {
+ suite.FailNow(err.Error())
+ }
+
+ // Check message in home stream.
+ suite.checkStreamed(
+ homeStream,
+ true,
+ statusJSON,
+ stream.EventTypeUpdate,
+ )
+
+ // Check message in list stream.
+ suite.checkStreamed(
+ listStream,
+ true,
+ statusJSON,
+ stream.EventTypeUpdate,
+ )
+
+ // Wait for a notification to appear for the status.
+ var notif *gtsmodel.Notification
+ if !testrig.WaitFor(func() bool {
+ var err error
+ notif, err = suite.db.GetNotification(
+ ctx,
+ gtsmodel.NotificationStatus,
+ receivingAccount.ID,
+ postingAccount.ID,
+ status.ID,
+ )
+ return err == nil
+ }) {
+ suite.FailNow("timed out waiting for new status notification")
+ }
+
+ apiNotif, err := suite.typeconverter.NotificationToAPINotification(ctx, notif)
+ if err != nil {
+ suite.FailNow(err.Error())
+ }
+
+ notifJSON, err := json.Marshal(apiNotif)
+ if err != nil {
+ suite.FailNow(err.Error())
+ }
+
+ // Check message in notification stream.
+ suite.checkStreamed(
+ notifStream,
+ true,
+ string(notifJSON),
+ stream.EventTypeNotification,
+ )
+}
+
+func (suite *FromClientAPITestSuite) TestProcessCreateStatusReply() {
+ var (
+ ctx = context.Background()
+ postingAccount = suite.testAccounts["admin_account"]
+ receivingAccount = suite.testAccounts["local_account_1"]
+ testList = suite.testLists["local_account_1_list_1"]
+ streams = suite.openStreams(ctx, receivingAccount, []string{testList.ID})
+ homeStream = streams[stream.TimelineHome]
+ listStream = streams[stream.TimelineList+":"+testList.ID]
+
+ // Admin account posts a reply to turtle.
+ // Since turtle is followed by zork, and
+ // the default replies policy for this list
+ // is to show replies to followed accounts,
+ // post should also show in the list stream.
+ status = suite.newStatus(
+ ctx,
+ postingAccount,
+ gtsmodel.VisibilityPublic,
+ suite.testStatuses["local_account_2_status_1"],
+ nil,
+ )
+ statusJSON = suite.statusJSON(
+ ctx,
+ status,
+ receivingAccount,
+ )
+ )
+
+ // Process the new status.
+ if err := suite.processor.Workers().ProcessFromClientAPI(
+ ctx,
+ messages.FromClientAPI{
+ APObjectType: ap.ObjectNote,
+ APActivityType: ap.ActivityCreate,
+ GTSModel: status,
+ OriginAccount: postingAccount,
+ },
+ ); err != nil {
+ suite.FailNow(err.Error())
+ }
+
+ // Check message in home stream.
+ suite.checkStreamed(
+ homeStream,
+ true,
+ statusJSON,
+ stream.EventTypeUpdate,
+ )
+
+ // Check message in list stream.
+ suite.checkStreamed(
+ listStream,
+ true,
+ statusJSON,
+ stream.EventTypeUpdate,
+ )
+}
+
+func (suite *FromClientAPITestSuite) TestProcessCreateStatusListRepliesPolicyListOnlyOK() {
+ // We're modifying the test list so take a copy.
+ testList := new(gtsmodel.List)
+ *testList = *suite.testLists["local_account_1_list_1"]
+
+ var (
+ ctx = context.Background()
+ postingAccount = suite.testAccounts["admin_account"]
+ receivingAccount = suite.testAccounts["local_account_1"]
+ streams = suite.openStreams(ctx, receivingAccount, []string{testList.ID})
+ homeStream = streams[stream.TimelineHome]
+ listStream = streams[stream.TimelineList+":"+testList.ID]
+
+ // Admin account posts a reply to turtle.
+ status = suite.newStatus(
+ ctx,
+ postingAccount,
+ gtsmodel.VisibilityPublic,
+ suite.testStatuses["local_account_2_status_1"],
+ nil,
+ )
+ statusJSON = suite.statusJSON(
+ ctx,
+ status,
+ receivingAccount,
+ )
+ )
+
+ // Modify replies policy of test list to show replies
+ // only to other accounts in the same list. Since turtle
+ // and admin are in the same list, this means the reply
+ // should be shown in the list.
+ testList.RepliesPolicy = gtsmodel.RepliesPolicyList
+ if err := suite.db.UpdateList(ctx, testList, "replies_policy"); err != nil {
+ suite.FailNow(err.Error())
+ }
+
+ // Process the new status.
+ if err := suite.processor.Workers().ProcessFromClientAPI(
+ ctx,
+ messages.FromClientAPI{
+ APObjectType: ap.ObjectNote,
+ APActivityType: ap.ActivityCreate,
+ GTSModel: status,
+ OriginAccount: postingAccount,
+ },
+ ); err != nil {
+ suite.FailNow(err.Error())
+ }
+
+ // Check message in home stream.
+ suite.checkStreamed(
+ homeStream,
+ true,
+ statusJSON,
+ stream.EventTypeUpdate,
+ )
+
+ // Check message in list stream.
+ suite.checkStreamed(
+ listStream,
+ true,
+ statusJSON,
+ stream.EventTypeUpdate,
+ )
+}
+
+func (suite *FromClientAPITestSuite) TestProcessCreateStatusListRepliesPolicyListOnlyNo() {
+ // We're modifying the test list so take a copy.
+ testList := new(gtsmodel.List)
+ *testList = *suite.testLists["local_account_1_list_1"]
+
+ var (
+ ctx = context.Background()
+ postingAccount = suite.testAccounts["admin_account"]
+ receivingAccount = suite.testAccounts["local_account_1"]
+ streams = suite.openStreams(ctx, receivingAccount, []string{testList.ID})
+ homeStream = streams[stream.TimelineHome]
+ listStream = streams[stream.TimelineList+":"+testList.ID]
+
+ // Admin account posts a reply to turtle.
+ status = suite.newStatus(
+ ctx,
+ postingAccount,
+ gtsmodel.VisibilityPublic,
+ suite.testStatuses["local_account_2_status_1"],
+ nil,
+ )
+ statusJSON = suite.statusJSON(
+ ctx,
+ status,
+ receivingAccount,
+ )
+ )
+
+ // Modify replies policy of test list to show replies
+ // only to other accounts in the same list. We're
+ // about to remove turtle from the same list as admin,
+ // so the new post should not be streamed to the list.
+ testList.RepliesPolicy = gtsmodel.RepliesPolicyList
+ if err := suite.db.UpdateList(ctx, testList, "replies_policy"); err != nil {
+ suite.FailNow(err.Error())
+ }
+
+ // Remove turtle from the list.
+ if err := suite.db.DeleteListEntry(ctx, suite.testListEntries["local_account_1_list_1_entry_1"].ID); err != nil {
+ suite.FailNow(err.Error())
+ }
+
+ // Process the new status.
+ if err := suite.processor.Workers().ProcessFromClientAPI(
+ ctx,
+ messages.FromClientAPI{
+ APObjectType: ap.ObjectNote,
+ APActivityType: ap.ActivityCreate,
+ GTSModel: status,
+ OriginAccount: postingAccount,
+ },
+ ); err != nil {
+ suite.FailNow(err.Error())
+ }
+
+ // Check message in home stream.
+ suite.checkStreamed(
+ homeStream,
+ true,
+ statusJSON,
+ stream.EventTypeUpdate,
+ )
+
+ // Check message NOT in list stream.
+ suite.checkStreamed(
+ listStream,
+ false,
+ "",
+ "",
+ )
+}
+
+func (suite *FromClientAPITestSuite) TestProcessCreateStatusReplyListRepliesPolicyNone() {
+ // We're modifying the test list so take a copy.
+ testList := new(gtsmodel.List)
+ *testList = *suite.testLists["local_account_1_list_1"]
+
+ var (
+ ctx = context.Background()
+ postingAccount = suite.testAccounts["admin_account"]
+ receivingAccount = suite.testAccounts["local_account_1"]
+ streams = suite.openStreams(ctx, receivingAccount, []string{testList.ID})
+ homeStream = streams[stream.TimelineHome]
+ listStream = streams[stream.TimelineList+":"+testList.ID]
+
+ // Admin account posts a reply to turtle.
+ status = suite.newStatus(
+ ctx,
+ postingAccount,
+ gtsmodel.VisibilityPublic,
+ suite.testStatuses["local_account_2_status_1"],
+ nil,
+ )
+ statusJSON = suite.statusJSON(
+ ctx,
+ status,
+ receivingAccount,
+ )
+ )
+
+ // Modify replies policy of test list.
+ // Since we're modifying the list to not
+ // show any replies, the post should not
+ // be streamed to the list.
+ testList.RepliesPolicy = gtsmodel.RepliesPolicyNone
+ if err := suite.db.UpdateList(ctx, testList, "replies_policy"); err != nil {
+ suite.FailNow(err.Error())
+ }
+
+ // Process the new status.
+ if err := suite.processor.Workers().ProcessFromClientAPI(
+ ctx,
+ messages.FromClientAPI{
+ APObjectType: ap.ObjectNote,
+ APActivityType: ap.ActivityCreate,
+ GTSModel: status,
+ OriginAccount: postingAccount,
+ },
+ ); err != nil {
+ suite.FailNow(err.Error())
+ }
+
+ // Check message in home stream.
+ suite.checkStreamed(
+ homeStream,
+ true,
+ statusJSON,
+ stream.EventTypeUpdate,
+ )
+
+ // Check message NOT in list stream.
+ suite.checkStreamed(
+ listStream,
+ false,
+ "",
+ "",
+ )
+}
+
+func (suite *FromClientAPITestSuite) TestProcessStatusDelete() {
+ var (
+ ctx = context.Background()
+ deletingAccount = suite.testAccounts["local_account_1"]
+ receivingAccount = suite.testAccounts["local_account_2"]
+ deletedStatus = suite.testStatuses["local_account_1_status_1"]
+ boostOfDeletedStatus = suite.testStatuses["admin_account_status_4"]
+ streams = suite.openStreams(ctx, receivingAccount, nil)
+ homeStream = streams[stream.TimelineHome]
+ )
+
+ // Delete the status from the db first, to mimic what
+ // would have already happened earlier up the flow
+ if err := suite.db.DeleteStatusByID(ctx, deletedStatus.ID); err != nil {
+ suite.FailNow(err.Error())
+ }
+
+ // Process the status delete.
+ if err := suite.processor.Workers().ProcessFromClientAPI(
+ ctx,
+ messages.FromClientAPI{
+ APObjectType: ap.ObjectNote,
+ APActivityType: ap.ActivityDelete,
+ GTSModel: deletedStatus,
+ OriginAccount: deletingAccount,
+ },
+ ); err != nil {
+ suite.FailNow(err.Error())
+ }
+
+ // Stream should have the delete
+ // of admin's boost in it now.
+ suite.checkStreamed(
+ homeStream,
+ true,
+ boostOfDeletedStatus.ID,
+ stream.EventTypeDelete,
+ )
+
+ // Stream should also have the delete
+ // of the message itself in it.
+ suite.checkStreamed(
+ homeStream,
+ true,
+ deletedStatus.ID,
+ stream.EventTypeDelete,
+ )
+
+ // Boost should no longer be in the database.
+ if !testrig.WaitFor(func() bool {
+ _, err := suite.db.GetStatusByID(ctx, boostOfDeletedStatus.ID)
+ return errors.Is(err, db.ErrNoEntries)
+ }) {
+ suite.FailNow("timed out waiting for status delete")
+ }
+}
+
+func TestFromClientAPITestSuite(t *testing.T) {
+ suite.Run(t, &FromClientAPITestSuite{})
+}
diff --git a/internal/processing/workers/fromfediapi.go b/internal/processing/workers/fromfediapi.go
new file mode 100644
index 000000000..5fbb0066b
--- /dev/null
+++ b/internal/processing/workers/fromfediapi.go
@@ -0,0 +1,540 @@
+// GoToSocial
+// Copyright (C) GoToSocial Authors admin@gotosocial.org
+// SPDX-License-Identifier: AGPL-3.0-or-later
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+package workers
+
+import (
+ "context"
+ "net/url"
+
+ "codeberg.org/gruf/go-kv"
+ "codeberg.org/gruf/go-logger/v2/level"
+ "github.com/superseriousbusiness/gotosocial/internal/ap"
+ "github.com/superseriousbusiness/gotosocial/internal/gtserror"
+ "github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
+ "github.com/superseriousbusiness/gotosocial/internal/id"
+ "github.com/superseriousbusiness/gotosocial/internal/log"
+ "github.com/superseriousbusiness/gotosocial/internal/messages"
+ "github.com/superseriousbusiness/gotosocial/internal/processing/account"
+ "github.com/superseriousbusiness/gotosocial/internal/state"
+)
+
+// fediAPI wraps processing functions
+// specifically for messages originating
+// from the federation/ActivityPub API.
+type fediAPI struct {
+ state *state.State
+ surface *surface
+ federate *federate
+ wipeStatus wipeStatus
+ account *account.Processor
+}
+
+func (p *Processor) EnqueueFediAPI(ctx context.Context, msgs ...messages.FromFediAPI) {
+ log.Trace(ctx, "enqueuing")
+ _ = p.workers.Federator.MustEnqueueCtx(ctx, func(ctx context.Context) {
+ for _, msg := range msgs {
+ log.Trace(ctx, "processing: %+v", msg)
+ if err := p.ProcessFromFediAPI(ctx, msg); err != nil {
+ log.Errorf(ctx, "error processing fedi API message: %v", err)
+ }
+ }
+ })
+}
+
+func (p *Processor) ProcessFromFediAPI(ctx context.Context, fMsg messages.FromFediAPI) error {
+ // Allocate new log fields slice
+ fields := make([]kv.Field, 3, 5)
+ fields[0] = kv.Field{"activityType", fMsg.APActivityType}
+ fields[1] = kv.Field{"objectType", fMsg.APObjectType}
+ fields[2] = kv.Field{"toAccount", fMsg.ReceivingAccount.Username}
+
+ if fMsg.APIri != nil {
+ // An IRI was supplied, append to log
+ fields = append(fields, kv.Field{
+ "iri", fMsg.APIri,
+ })
+ }
+
+ // Include GTSModel in logs if appropriate.
+ if fMsg.GTSModel != nil &&
+ log.Level() >= level.DEBUG {
+ fields = append(fields, kv.Field{
+ "model", fMsg.GTSModel,
+ })
+ }
+
+ l := log.WithContext(ctx).WithFields(fields...)
+ l.Info("processing from fedi API")
+
+ switch fMsg.APActivityType {
+
+ // CREATE SOMETHING
+ case ap.ActivityCreate:
+ switch fMsg.APObjectType {
+
+ // CREATE NOTE/STATUS
+ case ap.ObjectNote:
+ return p.fediAPI.CreateStatus(ctx, fMsg)
+
+ // CREATE FOLLOW (request)
+ case ap.ActivityFollow:
+ return p.fediAPI.CreateFollowReq(ctx, fMsg)
+
+ // CREATE LIKE/FAVE
+ case ap.ActivityLike:
+ return p.fediAPI.CreateLike(ctx, fMsg)
+
+ // CREATE ANNOUNCE/BOOST
+ case ap.ActivityAnnounce:
+ return p.fediAPI.CreateAnnounce(ctx, fMsg)
+
+ // CREATE BLOCK
+ case ap.ActivityBlock:
+ return p.fediAPI.CreateBlock(ctx, fMsg)
+
+ // CREATE FLAG/REPORT
+ case ap.ActivityFlag:
+ return p.fediAPI.CreateFlag(ctx, fMsg)
+ }
+
+ // UPDATE SOMETHING
+ case ap.ActivityUpdate:
+ switch fMsg.APObjectType { //nolint:gocritic
+
+ // UPDATE PROFILE/ACCOUNT
+ case ap.ObjectProfile:
+ return p.fediAPI.UpdateAccount(ctx, fMsg)
+ }
+
+ // DELETE SOMETHING
+ case ap.ActivityDelete:
+ switch fMsg.APObjectType {
+
+ // DELETE NOTE/STATUS
+ case ap.ObjectNote:
+ return p.fediAPI.DeleteStatus(ctx, fMsg)
+
+ // DELETE PROFILE/ACCOUNT
+ case ap.ObjectProfile:
+ return p.fediAPI.DeleteAccount(ctx, fMsg)
+ }
+ }
+
+ return nil
+}
+
+func (p *fediAPI) CreateStatus(ctx context.Context, fMsg messages.FromFediAPI) error {
+ var (
+ status *gtsmodel.Status
+ err error
+
+ // Check the federatorMsg for either an already dereferenced
+ // and converted status pinned to the message, or a forwarded
+ // AP IRI that we still need to deref.
+ forwarded = (fMsg.GTSModel == nil)
+ )
+
+ if forwarded {
+ // Model was not set, deref with IRI.
+ // This will also cause the status to be inserted into the db.
+ status, err = p.statusFromAPIRI(ctx, fMsg)
+ } else {
+ // Model is set, ensure we have the most up-to-date model.
+ status, err = p.statusFromGTSModel(ctx, fMsg)
+ }
+
+ if err != nil {
+ return gtserror.Newf("error extracting status from federatorMsg: %w", err)
+ }
+
+ if status.Account == nil || status.Account.IsRemote() {
+ // Either no account attached yet, or a remote account.
+ // Both situations we need to parse account URI to fetch it.
+ accountURI, err := url.Parse(status.AccountURI)
+ if err != nil {
+ return err
+ }
+
+ // Ensure that account for this status has been deref'd.
+ status.Account, _, err = p.federate.GetAccountByURI(
+ ctx,
+ fMsg.ReceivingAccount.Username,
+ accountURI,
+ )
+ if err != nil {
+ return err
+ }
+ }
+
+ // Ensure status ancestors dereferenced. We need at least the
+ // immediate parent (if present) to ascertain timelineability.
+ if err := p.federate.DereferenceStatusAncestors(
+ ctx,
+ fMsg.ReceivingAccount.Username,
+ status,
+ ); err != nil {
+ return err
+ }
+
+ if status.InReplyToID != "" {
+ // Interaction counts changed on the replied status;
+ // uncache the prepared version from all timelines.
+ p.surface.invalidateStatusFromTimelines(ctx, status.InReplyToID)
+ }
+
+ if err := p.surface.timelineAndNotifyStatus(ctx, status); err != nil {
+ return gtserror.Newf("error timelining status: %w", err)
+ }
+
+ return nil
+}
+
+func (p *fediAPI) statusFromGTSModel(ctx context.Context, fMsg messages.FromFediAPI) (*gtsmodel.Status, error) {
+ // There should be a status pinned to the message:
+ // we've already checked to ensure this is not nil.
+ status, ok := fMsg.GTSModel.(*gtsmodel.Status)
+ if !ok {
+ err := gtserror.New("Note was not parseable as *gtsmodel.Status")
+ return nil, err
+ }
+
+ // AP statusable representation may have also
+ // been set on message (no problem if not).
+ statusable, _ := fMsg.APObjectModel.(ap.Statusable)
+
+ // Call refresh on status to update
+ // it (deref remote) if necessary.
+ var err error
+ status, _, err = p.federate.RefreshStatus(
+ ctx,
+ fMsg.ReceivingAccount.Username,
+ status,
+ statusable,
+ false, // Don't force refresh.
+ )
+ if err != nil {
+ return nil, gtserror.Newf("%w", err)
+ }
+
+ return status, nil
+}
+
+func (p *fediAPI) statusFromAPIRI(ctx context.Context, fMsg messages.FromFediAPI) (*gtsmodel.Status, error) {
+ // There should be a status IRI pinned to
+ // the federatorMsg for us to dereference.
+ if fMsg.APIri == nil {
+ err := gtserror.New(
+ "status was not pinned to federatorMsg, " +
+ "and neither was an IRI for us to dereference",
+ )
+ return nil, err
+ }
+
+ // Get the status + ensure we have
+ // the most up-to-date version.
+ status, _, err := p.federate.GetStatusByURI(
+ ctx,
+ fMsg.ReceivingAccount.Username,
+ fMsg.APIri,
+ )
+ if err != nil {
+ return nil, gtserror.Newf("%w", err)
+ }
+
+ return status, nil
+}
+
+func (p *fediAPI) CreateFollowReq(ctx context.Context, fMsg messages.FromFediAPI) error {
+ followRequest, ok := fMsg.GTSModel.(*gtsmodel.FollowRequest)
+ if !ok {
+ return gtserror.Newf("%T not parseable as *gtsmodel.FollowRequest", fMsg.GTSModel)
+ }
+
+ if *followRequest.TargetAccount.Locked {
+ // Account on our instance is locked:
+ // just notify the follow request.
+ if err := p.surface.notifyFollowRequest(ctx, followRequest); err != nil {
+ return gtserror.Newf("error notifying follow request: %w", err)
+ }
+
+ return nil
+ }
+
+ // Account on our instance is not locked:
+ // Automatically accept the follow request
+ // and notify about the new follower.
+ follow, err := p.state.DB.AcceptFollowRequest(
+ ctx,
+ followRequest.AccountID,
+ followRequest.TargetAccountID,
+ )
+ if err != nil {
+ return gtserror.Newf("error accepting follow request: %w", err)
+ }
+
+ if err := p.federate.AcceptFollow(ctx, follow); err != nil {
+ return gtserror.Newf("error federating accept follow request: %w", err)
+ }
+
+ if err := p.surface.notifyFollow(ctx, follow); err != nil {
+ return gtserror.Newf("error notifying follow: %w", err)
+ }
+
+ return nil
+}
+
+func (p *fediAPI) CreateLike(ctx context.Context, fMsg messages.FromFediAPI) error {
+ fave, ok := fMsg.GTSModel.(*gtsmodel.StatusFave)
+ if !ok {
+ return gtserror.Newf("%T not parseable as *gtsmodel.StatusFave", fMsg.GTSModel)
+ }
+
+ if err := p.surface.notifyFave(ctx, fave); err != nil {
+ return gtserror.Newf("error notifying fave: %w", err)
+ }
+
+ // Interaction counts changed on the faved status;
+ // uncache the prepared version from all timelines.
+ p.surface.invalidateStatusFromTimelines(ctx, fave.StatusID)
+
+ return nil
+}
+
+func (p *fediAPI) CreateAnnounce(ctx context.Context, fMsg messages.FromFediAPI) error {
+ status, ok := fMsg.GTSModel.(*gtsmodel.Status)
+ if !ok {
+ return gtserror.Newf("%T not parseable as *gtsmodel.Status", fMsg.GTSModel)
+ }
+
+ // Dereference status that this status boosts.
+ if err := p.federate.DereferenceAnnounce(
+ ctx,
+ status,
+ fMsg.ReceivingAccount.Username,
+ ); err != nil {
+ return gtserror.Newf("error dereferencing announce: %w", err)
+ }
+
+ // Generate an ID for the boost wrapper status.
+ statusID, err := id.NewULIDFromTime(status.CreatedAt)
+ if err != nil {
+ return gtserror.Newf("error generating id: %w", err)
+ }
+ status.ID = statusID
+
+ // Store the boost wrapper status.
+ if err := p.state.DB.PutStatus(ctx, status); err != nil {
+ return gtserror.Newf("db error inserting status: %w", err)
+ }
+
+ // Ensure boosted status ancestors dereferenced. We need at least
+ // the immediate parent (if present) to ascertain timelineability.
+ if err := p.federate.DereferenceStatusAncestors(ctx,
+ fMsg.ReceivingAccount.Username,
+ status.BoostOf,
+ ); err != nil {
+ return err
+ }
+
+ // Timeline and notify the announce.
+ if err := p.surface.timelineAndNotifyStatus(ctx, status); err != nil {
+ return gtserror.Newf("error timelining status: %w", err)
+ }
+
+ if err := p.surface.notifyAnnounce(ctx, status); err != nil {
+ return gtserror.Newf("error notifying status: %w", err)
+ }
+
+ // Interaction counts changed on the boosted status;
+ // uncache the prepared version from all timelines.
+ p.surface.invalidateStatusFromTimelines(ctx, status.ID)
+
+ return nil
+}
+
+func (p *fediAPI) CreateBlock(ctx context.Context, fMsg messages.FromFediAPI) error {
+ block, ok := fMsg.GTSModel.(*gtsmodel.Block)
+ if !ok {
+ return gtserror.Newf("%T not parseable as *gtsmodel.Block", fMsg.GTSModel)
+ }
+
+ // Remove each account's posts from the other's timelines.
+ //
+ // First home timelines.
+ if err := p.state.Timelines.Home.WipeItemsFromAccountID(
+ ctx,
+ block.AccountID,
+ block.TargetAccountID,
+ ); err != nil {
+ return gtserror.Newf("%w", err)
+ }
+
+ if err := p.state.Timelines.Home.WipeItemsFromAccountID(
+ ctx,
+ block.TargetAccountID,
+ block.AccountID,
+ ); err != nil {
+ return gtserror.Newf("%w", err)
+ }
+
+ // Now list timelines.
+ if err := p.state.Timelines.List.WipeItemsFromAccountID(
+ ctx,
+ block.AccountID,
+ block.TargetAccountID,
+ ); err != nil {
+ return gtserror.Newf("%w", err)
+ }
+
+ if err := p.state.Timelines.List.WipeItemsFromAccountID(
+ ctx,
+ block.TargetAccountID,
+ block.AccountID,
+ ); err != nil {
+ return gtserror.Newf("%w", err)
+ }
+
+ // Remove any follows that existed between blocker + blockee.
+ if err := p.state.DB.DeleteFollow(
+ ctx,
+ block.AccountID,
+ block.TargetAccountID,
+ ); err != nil {
+ return gtserror.Newf(
+ "db error deleting follow from %s targeting %s: %w",
+ block.AccountID, block.TargetAccountID, err,
+ )
+ }
+
+ if err := p.state.DB.DeleteFollow(
+ ctx,
+ block.TargetAccountID,
+ block.AccountID,
+ ); err != nil {
+ return gtserror.Newf(
+ "db error deleting follow from %s targeting %s: %w",
+ block.TargetAccountID, block.AccountID, err,
+ )
+ }
+
+ // Remove any follow requests that existed between blocker + blockee.
+ if err := p.state.DB.DeleteFollowRequest(
+ ctx,
+ block.AccountID,
+ block.TargetAccountID,
+ ); err != nil {
+ return gtserror.Newf(
+ "db error deleting follow request from %s targeting %s: %w",
+ block.AccountID, block.TargetAccountID, err,
+ )
+ }
+
+ if err := p.state.DB.DeleteFollowRequest(
+ ctx,
+ block.TargetAccountID,
+ block.AccountID,
+ ); err != nil {
+ return gtserror.Newf(
+ "db error deleting follow request from %s targeting %s: %w",
+ block.TargetAccountID, block.AccountID, err,
+ )
+ }
+
+ return nil
+}
+
+func (p *fediAPI) CreateFlag(ctx context.Context, fMsg messages.FromFediAPI) error {
+ incomingReport, ok := fMsg.GTSModel.(*gtsmodel.Report)
+ if !ok {
+ return gtserror.Newf("%T not parseable as *gtsmodel.Report", fMsg.GTSModel)
+ }
+
+ // TODO: handle additional side effects of flag creation:
+ // - notify admins by dm / notification
+
+ if err := p.surface.emailReportOpened(ctx, incomingReport); err != nil {
+ return gtserror.Newf("error sending report opened email: %w", err)
+ }
+
+ return nil
+}
+
+func (p *fediAPI) UpdateAccount(ctx context.Context, fMsg messages.FromFediAPI) error {
+ // Parse the old/existing account model.
+ account, ok := fMsg.GTSModel.(*gtsmodel.Account)
+ if !ok {
+ return gtserror.Newf("%T not parseable as *gtsmodel.Account", fMsg.GTSModel)
+ }
+
+ // Because this was an Update, the new Accountable should be set on the message.
+ apubAcc, ok := fMsg.APObjectModel.(ap.Accountable)
+ if !ok {
+ return gtserror.Newf("%T not parseable as ap.Accountable", fMsg.APObjectModel)
+ }
+
+ // Fetch up-to-date bio, avatar, header, etc.
+ _, _, err := p.federate.RefreshAccount(
+ ctx,
+ fMsg.ReceivingAccount.Username,
+ account,
+ apubAcc,
+ true, // Force refresh.
+ )
+ if err != nil {
+ return gtserror.Newf("error refreshing updated account: %w", err)
+ }
+
+ return nil
+}
+
+func (p *fediAPI) DeleteStatus(ctx context.Context, fMsg messages.FromFediAPI) error {
+ // Delete attachments from this status, since this request
+ // comes from the federating API, and there's no way the
+ // poster can do a delete + redraft for it on our instance.
+ const deleteAttachments = true
+
+ status, ok := fMsg.GTSModel.(*gtsmodel.Status)
+ if !ok {
+ return gtserror.Newf("%T not parseable as *gtsmodel.Status", fMsg.GTSModel)
+ }
+
+ if err := p.wipeStatus(ctx, status, deleteAttachments); err != nil {
+ return gtserror.Newf("error wiping status: %w", err)
+ }
+
+ if status.InReplyToID != "" {
+ // Interaction counts changed on the replied status;
+ // uncache the prepared version from all timelines.
+ p.surface.invalidateStatusFromTimelines(ctx, status.InReplyToID)
+ }
+
+ return nil
+}
+
+func (p *fediAPI) DeleteAccount(ctx context.Context, fMsg messages.FromFediAPI) error {
+ account, ok := fMsg.GTSModel.(*gtsmodel.Account)
+ if !ok {
+ return gtserror.Newf("%T not parseable as *gtsmodel.Account", fMsg.GTSModel)
+ }
+
+ if err := p.account.Delete(ctx, account, account.ID); err != nil {
+ return gtserror.Newf("error deleting account: %w", err)
+ }
+
+ return nil
+}
diff --git a/internal/processing/workers/fromfediapi_test.go b/internal/processing/workers/fromfediapi_test.go
new file mode 100644
index 000000000..f8e3941fc
--- /dev/null
+++ b/internal/processing/workers/fromfediapi_test.go
@@ -0,0 +1,565 @@
+// GoToSocial
+// Copyright (C) GoToSocial Authors admin@gotosocial.org
+// SPDX-License-Identifier: AGPL-3.0-or-later
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+package workers_test
+
+import (
+ "context"
+ "encoding/json"
+ "fmt"
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/suite"
+ "github.com/superseriousbusiness/gotosocial/internal/ap"
+ apimodel "github.com/superseriousbusiness/gotosocial/internal/api/model"
+ "github.com/superseriousbusiness/gotosocial/internal/db"
+ "github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
+ "github.com/superseriousbusiness/gotosocial/internal/id"
+ "github.com/superseriousbusiness/gotosocial/internal/messages"
+ "github.com/superseriousbusiness/gotosocial/internal/stream"
+ "github.com/superseriousbusiness/gotosocial/internal/util"
+ "github.com/superseriousbusiness/gotosocial/testrig"
+)
+
+type FromFediAPITestSuite struct {
+ WorkersTestSuite
+}
+
+// remote_account_1 boosts the first status of local_account_1
+func (suite *FromFediAPITestSuite) TestProcessFederationAnnounce() {
+ boostedStatus := suite.testStatuses["local_account_1_status_1"]
+ boostingAccount := suite.testAccounts["remote_account_1"]
+ announceStatus := &gtsmodel.Status{}
+ announceStatus.URI = "https://example.org/some-announce-uri"
+ announceStatus.BoostOf = &gtsmodel.Status{
+ URI: boostedStatus.URI,
+ }
+ announceStatus.CreatedAt = time.Now()
+ announceStatus.UpdatedAt = time.Now()
+ announceStatus.AccountID = boostingAccount.ID
+ announceStatus.AccountURI = boostingAccount.URI
+ announceStatus.Account = boostingAccount
+ announceStatus.Visibility = boostedStatus.Visibility
+
+ err := suite.processor.Workers().ProcessFromFediAPI(context.Background(), messages.FromFediAPI{
+ APObjectType: ap.ActivityAnnounce,
+ APActivityType: ap.ActivityCreate,
+ GTSModel: announceStatus,
+ ReceivingAccount: suite.testAccounts["local_account_1"],
+ })
+ suite.NoError(err)
+
+ // side effects should be triggered
+ // 1. status should have an ID, and be in the database
+ suite.NotEmpty(announceStatus.ID)
+ _, err = suite.db.GetStatusByID(context.Background(), announceStatus.ID)
+ suite.NoError(err)
+
+ // 2. a notification should exist for the announce
+ where := []db.Where{
+ {
+ Key: "status_id",
+ Value: announceStatus.ID,
+ },
+ }
+ notif := &gtsmodel.Notification{}
+ err = suite.db.GetWhere(context.Background(), where, notif)
+ suite.NoError(err)
+ suite.Equal(gtsmodel.NotificationReblog, notif.NotificationType)
+ suite.Equal(boostedStatus.AccountID, notif.TargetAccountID)
+ suite.Equal(announceStatus.AccountID, notif.OriginAccountID)
+ suite.Equal(announceStatus.ID, notif.StatusID)
+ suite.False(*notif.Read)
+}
+
+func (suite *FromFediAPITestSuite) TestProcessReplyMention() {
+ repliedAccount := suite.testAccounts["local_account_1"]
+ repliedStatus := suite.testStatuses["local_account_1_status_1"]
+ replyingAccount := suite.testAccounts["remote_account_1"]
+
+ replyingStatus := &gtsmodel.Status{
+ CreatedAt: time.Now(),
+ UpdatedAt: time.Now(),
+ URI: "http://fossbros-anonymous.io/users/foss_satan/statuses/106221634728637552",
+ URL: "http://fossbros-anonymous.io/@foss_satan/106221634728637552",
+ Content: `<p><span class="h-card"><a href="http://localhost:8080/@the_mighty_zork" class="u-url mention">@<span>the_mighty_zork</span></a></span> nice there it is:</p><p><a href="http://localhost:8080/users/the_mighty_zork/statuses/01F8MHAMCHF6Y650WCRSCP4WMY/activity" rel="nofollow noopener noreferrer" target="_blank"><span class="invisible">https://</span><span class="ellipsis">social.pixie.town/users/f0x/st</span><span class="invisible">atuses/106221628567855262/activity</span></a></p>`,
+ Mentions: []*gtsmodel.Mention{
+ {
+ TargetAccountURI: repliedAccount.URI,
+ NameString: "@the_mighty_zork@localhost:8080",
+ },
+ },
+ AccountID: replyingAccount.ID,
+ AccountURI: replyingAccount.URI,
+ InReplyToID: repliedStatus.ID,
+ InReplyToURI: repliedStatus.URI,
+ InReplyToAccountID: repliedAccount.ID,
+ Visibility: gtsmodel.VisibilityUnlocked,
+ ActivityStreamsType: ap.ObjectNote,
+ Federated: util.Ptr(true),
+ Boostable: util.Ptr(true),
+ Replyable: util.Ptr(true),
+ Likeable: util.Ptr(false),
+ }
+
+ wssStream, errWithCode := suite.processor.Stream().Open(context.Background(), repliedAccount, stream.TimelineHome)
+ suite.NoError(errWithCode)
+
+ // id the status based on the time it was created
+ statusID, err := id.NewULIDFromTime(replyingStatus.CreatedAt)
+ suite.NoError(err)
+ replyingStatus.ID = statusID
+
+ err = suite.db.PutStatus(context.Background(), replyingStatus)
+ suite.NoError(err)
+
+ err = suite.processor.Workers().ProcessFromFediAPI(context.Background(), messages.FromFediAPI{
+ APObjectType: ap.ObjectNote,
+ APActivityType: ap.ActivityCreate,
+ GTSModel: replyingStatus,
+ ReceivingAccount: suite.testAccounts["local_account_1"],
+ })
+ suite.NoError(err)
+
+ // side effects should be triggered
+ // 1. status should be in the database
+ suite.NotEmpty(replyingStatus.ID)
+ _, err = suite.db.GetStatusByID(context.Background(), replyingStatus.ID)
+ suite.NoError(err)
+
+ // 2. a notification should exist for the mention
+ var notif gtsmodel.Notification
+ err = suite.db.GetWhere(context.Background(), []db.Where{
+ {Key: "status_id", Value: replyingStatus.ID},
+ }, &notif)
+ suite.NoError(err)
+ suite.Equal(gtsmodel.NotificationMention, notif.NotificationType)
+ suite.Equal(replyingStatus.InReplyToAccountID, notif.TargetAccountID)
+ suite.Equal(replyingStatus.AccountID, notif.OriginAccountID)
+ suite.Equal(replyingStatus.ID, notif.StatusID)
+ suite.False(*notif.Read)
+
+ // the notification should be streamed
+ var msg *stream.Message
+ select {
+ case msg = <-wssStream.Messages:
+ // fine
+ case <-time.After(5 * time.Second):
+ suite.FailNow("no message from wssStream")
+ }
+
+ suite.Equal(stream.EventTypeNotification, msg.Event)
+ suite.NotEmpty(msg.Payload)
+ suite.EqualValues([]string{stream.TimelineHome}, msg.Stream)
+ notifStreamed := &apimodel.Notification{}
+ err = json.Unmarshal([]byte(msg.Payload), notifStreamed)
+ suite.NoError(err)
+ suite.Equal("mention", notifStreamed.Type)
+ suite.Equal(replyingAccount.ID, notifStreamed.Account.ID)
+}
+
+func (suite *FromFediAPITestSuite) TestProcessFave() {
+ favedAccount := suite.testAccounts["local_account_1"]
+ favedStatus := suite.testStatuses["local_account_1_status_1"]
+ favingAccount := suite.testAccounts["remote_account_1"]
+
+ wssStream, errWithCode := suite.processor.Stream().Open(context.Background(), favedAccount, stream.TimelineNotifications)
+ suite.NoError(errWithCode)
+
+ fave := &gtsmodel.StatusFave{
+ ID: "01FGKJPXFTVQPG9YSSZ95ADS7Q",
+ CreatedAt: time.Now(),
+ UpdatedAt: time.Now(),
+ AccountID: favingAccount.ID,
+ Account: favingAccount,
+ TargetAccountID: favedAccount.ID,
+ TargetAccount: favedAccount,
+ StatusID: favedStatus.ID,
+ Status: favedStatus,
+ URI: favingAccount.URI + "/faves/aaaaaaaaaaaa",
+ }
+
+ err := suite.db.Put(context.Background(), fave)
+ suite.NoError(err)
+
+ err = suite.processor.Workers().ProcessFromFediAPI(context.Background(), messages.FromFediAPI{
+ APObjectType: ap.ActivityLike,
+ APActivityType: ap.ActivityCreate,
+ GTSModel: fave,
+ ReceivingAccount: favedAccount,
+ })
+ suite.NoError(err)
+
+ // side effects should be triggered
+ // 1. a notification should exist for the fave
+ where := []db.Where{
+ {
+ Key: "status_id",
+ Value: favedStatus.ID,
+ },
+ {
+ Key: "origin_account_id",
+ Value: favingAccount.ID,
+ },
+ }
+
+ notif := &gtsmodel.Notification{}
+ err = suite.db.GetWhere(context.Background(), where, notif)
+ suite.NoError(err)
+ suite.Equal(gtsmodel.NotificationFave, notif.NotificationType)
+ suite.Equal(fave.TargetAccountID, notif.TargetAccountID)
+ suite.Equal(fave.AccountID, notif.OriginAccountID)
+ suite.Equal(fave.StatusID, notif.StatusID)
+ suite.False(*notif.Read)
+
+ // 2. a notification should be streamed
+ var msg *stream.Message
+ select {
+ case msg = <-wssStream.Messages:
+ // fine
+ case <-time.After(5 * time.Second):
+ suite.FailNow("no message from wssStream")
+ }
+ suite.Equal(stream.EventTypeNotification, msg.Event)
+ suite.NotEmpty(msg.Payload)
+ suite.EqualValues([]string{stream.TimelineNotifications}, msg.Stream)
+}
+
+// TestProcessFaveWithDifferentReceivingAccount ensures that when an account receives a fave that's for
+// another account in their AP inbox, a notification isn't streamed to the receiving account.
+//
+// This tests for an issue we were seeing where Misskey sends out faves to inboxes of people that don't own
+// the fave, but just follow the actor who received the fave.
+func (suite *FromFediAPITestSuite) TestProcessFaveWithDifferentReceivingAccount() {
+ receivingAccount := suite.testAccounts["local_account_2"]
+ favedAccount := suite.testAccounts["local_account_1"]
+ favedStatus := suite.testStatuses["local_account_1_status_1"]
+ favingAccount := suite.testAccounts["remote_account_1"]
+
+ wssStream, errWithCode := suite.processor.Stream().Open(context.Background(), receivingAccount, stream.TimelineHome)
+ suite.NoError(errWithCode)
+
+ fave := &gtsmodel.StatusFave{
+ ID: "01FGKJPXFTVQPG9YSSZ95ADS7Q",
+ CreatedAt: time.Now(),
+ UpdatedAt: time.Now(),
+ AccountID: favingAccount.ID,
+ Account: favingAccount,
+ TargetAccountID: favedAccount.ID,
+ TargetAccount: favedAccount,
+ StatusID: favedStatus.ID,
+ Status: favedStatus,
+ URI: favingAccount.URI + "/faves/aaaaaaaaaaaa",
+ }
+
+ err := suite.db.Put(context.Background(), fave)
+ suite.NoError(err)
+
+ err = suite.processor.Workers().ProcessFromFediAPI(context.Background(), messages.FromFediAPI{
+ APObjectType: ap.ActivityLike,
+ APActivityType: ap.ActivityCreate,
+ GTSModel: fave,
+ ReceivingAccount: receivingAccount,
+ })
+ suite.NoError(err)
+
+ // side effects should be triggered
+ // 1. a notification should exist for the fave
+ where := []db.Where{
+ {
+ Key: "status_id",
+ Value: favedStatus.ID,
+ },
+ {
+ Key: "origin_account_id",
+ Value: favingAccount.ID,
+ },
+ }
+
+ notif := &gtsmodel.Notification{}
+ err = suite.db.GetWhere(context.Background(), where, notif)
+ suite.NoError(err)
+ suite.Equal(gtsmodel.NotificationFave, notif.NotificationType)
+ suite.Equal(fave.TargetAccountID, notif.TargetAccountID)
+ suite.Equal(fave.AccountID, notif.OriginAccountID)
+ suite.Equal(fave.StatusID, notif.StatusID)
+ suite.False(*notif.Read)
+
+ // 2. no notification should be streamed to the account that received the fave message, because they weren't the target
+ suite.Empty(wssStream.Messages)
+}
+
+func (suite *FromFediAPITestSuite) TestProcessAccountDelete() {
+ ctx := context.Background()
+
+ deletedAccount := suite.testAccounts["remote_account_1"]
+ receivingAccount := suite.testAccounts["local_account_1"]
+
+ // before doing the delete....
+ // make local_account_1 and remote_account_1 into mufos
+ zorkFollowSatan := &gtsmodel.Follow{
+ ID: "01FGRY72ASHBSET64353DPHK9T",
+ CreatedAt: time.Now().Add(-1 * time.Hour),
+ UpdatedAt: time.Now().Add(-1 * time.Hour),
+ AccountID: deletedAccount.ID,
+ TargetAccountID: receivingAccount.ID,
+ ShowReblogs: util.Ptr(true),
+ URI: fmt.Sprintf("%s/follows/01FGRY72ASHBSET64353DPHK9T", deletedAccount.URI),
+ Notify: util.Ptr(false),
+ }
+ err := suite.db.Put(ctx, zorkFollowSatan)
+ suite.NoError(err)
+
+ satanFollowZork := &gtsmodel.Follow{
+ ID: "01FGRYAVAWWPP926J175QGM0WV",
+ CreatedAt: time.Now().Add(-1 * time.Hour),
+ UpdatedAt: time.Now().Add(-1 * time.Hour),
+ AccountID: receivingAccount.ID,
+ TargetAccountID: deletedAccount.ID,
+ ShowReblogs: util.Ptr(true),
+ URI: fmt.Sprintf("%s/follows/01FGRYAVAWWPP926J175QGM0WV", receivingAccount.URI),
+ Notify: util.Ptr(false),
+ }
+ err = suite.db.Put(ctx, satanFollowZork)
+ suite.NoError(err)
+
+ // now they are mufos!
+ err = suite.processor.Workers().ProcessFromFediAPI(ctx, messages.FromFediAPI{
+ APObjectType: ap.ObjectProfile,
+ APActivityType: ap.ActivityDelete,
+ GTSModel: deletedAccount,
+ ReceivingAccount: receivingAccount,
+ })
+ suite.NoError(err)
+
+ // local account 2 blocked foss_satan, that block should be gone now
+ testBlock := suite.testBlocks["local_account_2_block_remote_account_1"]
+ dbBlock := &gtsmodel.Block{}
+ err = suite.db.GetByID(ctx, testBlock.ID, dbBlock)
+ suite.ErrorIs(err, db.ErrNoEntries)
+
+ // the mufos should be gone now too
+ satanFollowsZork, err := suite.db.IsFollowing(ctx, deletedAccount.ID, receivingAccount.ID)
+ suite.NoError(err)
+ suite.False(satanFollowsZork)
+ zorkFollowsSatan, err := suite.db.IsFollowing(ctx, receivingAccount.ID, deletedAccount.ID)
+ suite.NoError(err)
+ suite.False(zorkFollowsSatan)
+
+ // no statuses from foss satan should be left in the database
+ if !testrig.WaitFor(func() bool {
+ s, err := suite.db.GetAccountStatuses(ctx, deletedAccount.ID, 0, false, false, "", "", false, false)
+ return s == nil && err == db.ErrNoEntries
+ }) {
+ suite.FailNow("timeout waiting for statuses to be deleted")
+ }
+
+ dbAccount, err := suite.db.GetAccountByID(ctx, deletedAccount.ID)
+ suite.NoError(err)
+
+ suite.Empty(dbAccount.Note)
+ suite.Empty(dbAccount.DisplayName)
+ suite.Empty(dbAccount.AvatarMediaAttachmentID)
+ suite.Empty(dbAccount.AvatarRemoteURL)
+ suite.Empty(dbAccount.HeaderMediaAttachmentID)
+ suite.Empty(dbAccount.HeaderRemoteURL)
+ suite.Empty(dbAccount.Reason)
+ suite.Empty(dbAccount.Fields)
+ suite.True(*dbAccount.HideCollections)
+ suite.False(*dbAccount.Discoverable)
+ suite.WithinDuration(time.Now(), dbAccount.SuspendedAt, 30*time.Second)
+ suite.Equal(dbAccount.ID, dbAccount.SuspensionOrigin)
+}
+
+func (suite *FromFediAPITestSuite) TestProcessFollowRequestLocked() {
+ ctx := context.Background()
+
+ originAccount := suite.testAccounts["remote_account_1"]
+
+ // target is a locked account
+ targetAccount := suite.testAccounts["local_account_2"]
+
+ wssStream, errWithCode := suite.processor.Stream().Open(context.Background(), targetAccount, stream.TimelineHome)
+ suite.NoError(errWithCode)
+
+ // put the follow request in the database as though it had passed through the federating db already
+ satanFollowRequestTurtle := &gtsmodel.FollowRequest{
+ ID: "01FGRYAVAWWPP926J175QGM0WV",
+ CreatedAt: time.Now(),
+ UpdatedAt: time.Now(),
+ AccountID: originAccount.ID,
+ Account: originAccount,
+ TargetAccountID: targetAccount.ID,
+ TargetAccount: targetAccount,
+ ShowReblogs: util.Ptr(true),
+ URI: fmt.Sprintf("%s/follows/01FGRYAVAWWPP926J175QGM0WV", originAccount.URI),
+ Notify: util.Ptr(false),
+ }
+
+ err := suite.db.Put(ctx, satanFollowRequestTurtle)
+ suite.NoError(err)
+
+ err = suite.processor.Workers().ProcessFromFediAPI(ctx, messages.FromFediAPI{
+ APObjectType: ap.ActivityFollow,
+ APActivityType: ap.ActivityCreate,
+ GTSModel: satanFollowRequestTurtle,
+ ReceivingAccount: targetAccount,
+ })
+ suite.NoError(err)
+
+ // a notification should be streamed
+ var msg *stream.Message
+ select {
+ case msg = <-wssStream.Messages:
+ // fine
+ case <-time.After(5 * time.Second):
+ suite.FailNow("no message from wssStream")
+ }
+ suite.Equal(stream.EventTypeNotification, msg.Event)
+ suite.NotEmpty(msg.Payload)
+ suite.EqualValues([]string{stream.TimelineHome}, msg.Stream)
+ notif := &apimodel.Notification{}
+ err = json.Unmarshal([]byte(msg.Payload), notif)
+ suite.NoError(err)
+ suite.Equal("follow_request", notif.Type)
+ suite.Equal(originAccount.ID, notif.Account.ID)
+
+ // no messages should have been sent out, since we didn't need to federate an accept
+ suite.Empty(suite.httpClient.SentMessages)
+}
+
+func (suite *FromFediAPITestSuite) TestProcessFollowRequestUnlocked() {
+ ctx := context.Background()
+
+ originAccount := suite.testAccounts["remote_account_1"]
+
+ // target is an unlocked account
+ targetAccount := suite.testAccounts["local_account_1"]
+
+ wssStream, errWithCode := suite.processor.Stream().Open(context.Background(), targetAccount, stream.TimelineHome)
+ suite.NoError(errWithCode)
+
+ // put the follow request in the database as though it had passed through the federating db already
+ satanFollowRequestTurtle := &gtsmodel.FollowRequest{
+ ID: "01FGRYAVAWWPP926J175QGM0WV",
+ CreatedAt: time.Now(),
+ UpdatedAt: time.Now(),
+ AccountID: originAccount.ID,
+ Account: originAccount,
+ TargetAccountID: targetAccount.ID,
+ TargetAccount: targetAccount,
+ ShowReblogs: util.Ptr(true),
+ URI: fmt.Sprintf("%s/follows/01FGRYAVAWWPP926J175QGM0WV", originAccount.URI),
+ Notify: util.Ptr(false),
+ }
+
+ err := suite.db.Put(ctx, satanFollowRequestTurtle)
+ suite.NoError(err)
+
+ err = suite.processor.Workers().ProcessFromFediAPI(ctx, messages.FromFediAPI{
+ APObjectType: ap.ActivityFollow,
+ APActivityType: ap.ActivityCreate,
+ GTSModel: satanFollowRequestTurtle,
+ ReceivingAccount: targetAccount,
+ })
+ suite.NoError(err)
+
+ // an accept message should be sent to satan's inbox
+ var sent [][]byte
+ if !testrig.WaitFor(func() bool {
+ sentI, ok := suite.httpClient.SentMessages.Load(*originAccount.SharedInboxURI)
+ if ok {
+ sent, ok = sentI.([][]byte)
+ if !ok {
+ panic("SentMessages entry was not []byte")
+ }
+ return true
+ }
+ return false
+ }) {
+ suite.FailNow("timed out waiting for message")
+ }
+
+ accept := &struct {
+ Actor string `json:"actor"`
+ ID string `json:"id"`
+ Object struct {
+ Actor string `json:"actor"`
+ ID string `json:"id"`
+ Object string `json:"object"`
+ To string `json:"to"`
+ Type string `json:"type"`
+ }
+ To string `json:"to"`
+ Type string `json:"type"`
+ }{}
+ err = json.Unmarshal(sent[0], accept)
+ suite.NoError(err)
+
+ suite.Equal(targetAccount.URI, accept.Actor)
+ suite.Equal(originAccount.URI, accept.Object.Actor)
+ suite.Equal(satanFollowRequestTurtle.URI, accept.Object.ID)
+ suite.Equal(targetAccount.URI, accept.Object.Object)
+ suite.Equal(targetAccount.URI, accept.Object.To)
+ suite.Equal("Follow", accept.Object.Type)
+ suite.Equal(originAccount.URI, accept.To)
+ suite.Equal("Accept", accept.Type)
+
+ // a notification should be streamed
+ var msg *stream.Message
+ select {
+ case msg = <-wssStream.Messages:
+ // fine
+ case <-time.After(5 * time.Second):
+ suite.FailNow("no message from wssStream")
+ }
+ suite.Equal(stream.EventTypeNotification, msg.Event)
+ suite.NotEmpty(msg.Payload)
+ suite.EqualValues([]string{stream.TimelineHome}, msg.Stream)
+ notif := &apimodel.Notification{}
+ err = json.Unmarshal([]byte(msg.Payload), notif)
+ suite.NoError(err)
+ suite.Equal("follow", notif.Type)
+ suite.Equal(originAccount.ID, notif.Account.ID)
+}
+
+// TestCreateStatusFromIRI checks if a forwarded status can be dereferenced by the processor.
+func (suite *FromFediAPITestSuite) TestCreateStatusFromIRI() {
+ ctx := context.Background()
+
+ receivingAccount := suite.testAccounts["local_account_1"]
+ statusCreator := suite.testAccounts["remote_account_2"]
+
+ err := suite.processor.Workers().ProcessFromFediAPI(ctx, messages.FromFediAPI{
+ APObjectType: ap.ObjectNote,
+ APActivityType: ap.ActivityCreate,
+ GTSModel: nil, // gtsmodel is nil because this is a forwarded status -- we want to dereference it using the iri
+ ReceivingAccount: receivingAccount,
+ APIri: testrig.URLMustParse("http://example.org/users/Some_User/statuses/afaba698-5740-4e32-a702-af61aa543bc1"),
+ })
+ suite.NoError(err)
+
+ // status should now be in the database, attributed to remote_account_2
+ s, err := suite.db.GetStatusByURI(context.Background(), "http://example.org/users/Some_User/statuses/afaba698-5740-4e32-a702-af61aa543bc1")
+ suite.NoError(err)
+ suite.Equal(statusCreator.URI, s.AccountURI)
+}
+
+func TestFromFederatorTestSuite(t *testing.T) {
+ suite.Run(t, &FromFediAPITestSuite{})
+}
diff --git a/internal/processing/workers/surface.go b/internal/processing/workers/surface.go
new file mode 100644
index 000000000..a3cf9a3e1
--- /dev/null
+++ b/internal/processing/workers/surface.go
@@ -0,0 +1,40 @@
+// GoToSocial
+// Copyright (C) GoToSocial Authors admin@gotosocial.org
+// SPDX-License-Identifier: AGPL-3.0-or-later
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+package workers
+
+import (
+ "github.com/superseriousbusiness/gotosocial/internal/email"
+ "github.com/superseriousbusiness/gotosocial/internal/processing/stream"
+ "github.com/superseriousbusiness/gotosocial/internal/state"
+ "github.com/superseriousbusiness/gotosocial/internal/typeutils"
+ "github.com/superseriousbusiness/gotosocial/internal/visibility"
+)
+
+// surface wraps functions for 'surfacing' the result
+// of processing a message, eg:
+// - timelining a status
+// - removing a status from timelines
+// - sending a notification to a user
+// - sending an email
+type surface struct {
+ state *state.State
+ tc typeutils.TypeConverter
+ stream *stream.Processor
+ filter *visibility.Filter
+ emailSender email.Sender
+}
diff --git a/internal/processing/workers/surfaceemail.go b/internal/processing/workers/surfaceemail.go
new file mode 100644
index 000000000..a6c97f48f
--- /dev/null
+++ b/internal/processing/workers/surfaceemail.go
@@ -0,0 +1,160 @@
+// GoToSocial
+// Copyright (C) GoToSocial Authors admin@gotosocial.org
+// SPDX-License-Identifier: AGPL-3.0-or-later
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+package workers
+
+import (
+ "context"
+ "errors"
+ "time"
+
+ "github.com/google/uuid"
+ "github.com/superseriousbusiness/gotosocial/internal/config"
+ "github.com/superseriousbusiness/gotosocial/internal/db"
+ "github.com/superseriousbusiness/gotosocial/internal/email"
+ "github.com/superseriousbusiness/gotosocial/internal/gtserror"
+ "github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
+ "github.com/superseriousbusiness/gotosocial/internal/uris"
+)
+
+func (s *surface) emailReportOpened(ctx context.Context, report *gtsmodel.Report) error {
+ instance, err := s.state.DB.GetInstance(ctx, config.GetHost())
+ if err != nil {
+ return gtserror.Newf("error getting instance: %w", err)
+ }
+
+ toAddresses, err := s.state.DB.GetInstanceModeratorAddresses(ctx)
+ if err != nil {
+ if errors.Is(err, db.ErrNoEntries) {
+ // No registered moderator addresses.
+ return nil
+ }
+ return gtserror.Newf("error getting instance moderator addresses: %w", err)
+ }
+
+ if err := s.state.DB.PopulateReport(ctx, report); err != nil {
+ return gtserror.Newf("error populating report: %w", err)
+ }
+
+ reportData := email.NewReportData{
+ InstanceURL: instance.URI,
+ InstanceName: instance.Title,
+ ReportURL: instance.URI + "/settings/admin/reports/" + report.ID,
+ ReportDomain: report.Account.Domain,
+ ReportTargetDomain: report.TargetAccount.Domain,
+ }
+
+ if err := s.emailSender.SendNewReportEmail(toAddresses, reportData); err != nil {
+ return gtserror.Newf("error emailing instance moderators: %w", err)
+ }
+
+ return nil
+}
+
+func (s *surface) emailReportClosed(ctx context.Context, report *gtsmodel.Report) error {
+ user, err := s.state.DB.GetUserByAccountID(ctx, report.Account.ID)
+ if err != nil {
+ return gtserror.Newf("db error getting user: %w", err)
+ }
+
+ if user.ConfirmedAt.IsZero() ||
+ !*user.Approved ||
+ *user.Disabled ||
+ user.Email == "" {
+ // Only email users who:
+ // - are confirmed
+ // - are approved
+ // - are not disabled
+ // - have an email address
+ return nil
+ }
+
+ instance, err := s.state.DB.GetInstance(ctx, config.GetHost())
+ if err != nil {
+ return gtserror.Newf("db error getting instance: %w", err)
+ }
+
+ if err := s.state.DB.PopulateReport(ctx, report); err != nil {
+ return gtserror.Newf("error populating report: %w", err)
+ }
+
+ reportClosedData := email.ReportClosedData{
+ Username: report.Account.Username,
+ InstanceURL: instance.URI,
+ InstanceName: instance.Title,
+ ReportTargetUsername: report.TargetAccount.Username,
+ ReportTargetDomain: report.TargetAccount.Domain,
+ ActionTakenComment: report.ActionTaken,
+ }
+
+ return s.emailSender.SendReportClosedEmail(user.Email, reportClosedData)
+}
+
+func (s *surface) emailPleaseConfirm(ctx context.Context, user *gtsmodel.User, username string) error {
+ if user.UnconfirmedEmail == "" ||
+ user.UnconfirmedEmail == user.Email {
+ // User has already confirmed this
+ // email address; nothing to do.
+ return nil
+ }
+
+ instance, err := s.state.DB.GetInstance(ctx, config.GetHost())
+ if err != nil {
+ return gtserror.Newf("db error getting instance: %w", err)
+ }
+
+ // We need a token and a link for the
+ // user to click on. We'll use a uuid
+ // as our token since it's secure enough
+ // for this purpose.
+ var (
+ confirmToken = uuid.NewString()
+ confirmLink = uris.GenerateURIForEmailConfirm(confirmToken)
+ )
+
+ // Assemble email contents and send the email.
+ if err := s.emailSender.SendConfirmEmail(
+ user.UnconfirmedEmail,
+ email.ConfirmData{
+ Username: username,
+ InstanceURL: instance.URI,
+ InstanceName: instance.Title,
+ ConfirmLink: confirmLink,
+ },
+ ); err != nil {
+ return err
+ }
+
+ // Email sent, update the user entry
+ // with the new confirmation token.
+ now := time.Now()
+ user.ConfirmationToken = confirmToken
+ user.ConfirmationSentAt = now
+ user.LastEmailedAt = now
+
+ if err := s.state.DB.UpdateUser(
+ ctx,
+ user,
+ "confirmation_token",
+ "confirmation_sent_at",
+ "last_emailed_at",
+ ); err != nil {
+ return gtserror.Newf("error updating user entry after email sent: %w", err)
+ }
+
+ return nil
+}
diff --git a/internal/processing/workers/surfacenotify.go b/internal/processing/workers/surfacenotify.go
new file mode 100644
index 000000000..00e1205e6
--- /dev/null
+++ b/internal/processing/workers/surfacenotify.go
@@ -0,0 +1,221 @@
+// GoToSocial
+// Copyright (C) GoToSocial Authors admin@gotosocial.org
+// SPDX-License-Identifier: AGPL-3.0-or-later
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+package workers
+
+import (
+ "context"
+ "errors"
+
+ "github.com/superseriousbusiness/gotosocial/internal/db"
+ "github.com/superseriousbusiness/gotosocial/internal/gtscontext"
+ "github.com/superseriousbusiness/gotosocial/internal/gtserror"
+ "github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
+ "github.com/superseriousbusiness/gotosocial/internal/id"
+)
+
+// notifyMentions notifies each targeted account in
+// the given mentions that they have a new mention.
+func (s *surface) notifyMentions(
+ ctx context.Context,
+ mentions []*gtsmodel.Mention,
+) error {
+ var errs = gtserror.NewMultiError(len(mentions))
+
+ for _, mention := range mentions {
+ if err := s.notify(
+ ctx,
+ gtsmodel.NotificationMention,
+ mention.TargetAccountID,
+ mention.OriginAccountID,
+ mention.StatusID,
+ ); err != nil {
+ errs.Append(err)
+ }
+ }
+
+ return errs.Combine()
+}
+
+// notifyFollowRequest notifies the target of the given
+// follow request that they have a new follow request.
+func (s *surface) notifyFollowRequest(
+ ctx context.Context,
+ followRequest *gtsmodel.FollowRequest,
+) error {
+ return s.notify(
+ ctx,
+ gtsmodel.NotificationFollowRequest,
+ followRequest.TargetAccountID,
+ followRequest.AccountID,
+ "",
+ )
+}
+
+// notifyFollow notifies the target of the given follow that
+// they have a new follow. It will also remove any previous
+// notification of a follow request, essentially replacing
+// that notification.
+func (s *surface) notifyFollow(
+ ctx context.Context,
+ follow *gtsmodel.Follow,
+) error {
+ // Check if previous follow req notif exists.
+ prevNotif, err := s.state.DB.GetNotification(
+ gtscontext.SetBarebones(ctx),
+ gtsmodel.NotificationFollowRequest,
+ follow.TargetAccountID,
+ follow.AccountID,
+ "",
+ )
+ if err != nil && !errors.Is(err, db.ErrNoEntries) {
+ return gtserror.Newf("db error checking for previous follow request notification: %w", err)
+ }
+
+ if prevNotif != nil {
+ // Previous notif existed, delete it.
+ if err := s.state.DB.DeleteNotificationByID(ctx, prevNotif.ID); err != nil {
+ return gtserror.Newf("db error removing previous follow request notification %s: %w", prevNotif.ID, err)
+ }
+ }
+
+ // Now notify the follow itself.
+ return s.notify(
+ ctx,
+ gtsmodel.NotificationFollow,
+ follow.TargetAccountID,
+ follow.AccountID,
+ "",
+ )
+}
+
+// notifyFave notifies the target of the given
+// fave that their status has been liked/faved.
+func (s *surface) notifyFave(
+ ctx context.Context,
+ fave *gtsmodel.StatusFave,
+) error {
+ if fave.TargetAccountID == fave.AccountID {
+ // Self-fave, nothing to do.
+ return nil
+ }
+
+ return s.notify(
+ ctx,
+ gtsmodel.NotificationFave,
+ fave.TargetAccountID,
+ fave.AccountID,
+ fave.StatusID,
+ )
+}
+
+// notifyAnnounce notifies the status boost target
+// account that their status has been boosted.
+func (s *surface) notifyAnnounce(
+ ctx context.Context,
+ status *gtsmodel.Status,
+) error {
+ if status.BoostOfID == "" {
+ // Not a boost, nothing to do.
+ return nil
+ }
+
+ if status.BoostOfAccountID == status.AccountID {
+ // Self-boost, nothing to do.
+ return nil
+ }
+
+ return s.notify(
+ ctx,
+ gtsmodel.NotificationReblog,
+ status.BoostOfAccountID,
+ status.AccountID,
+ status.ID,
+ )
+}
+
+// notify creates, inserts, and streams a new
+// notification to the target account if it
+// doesn't yet exist with the given parameters.
+//
+// It filters out non-local target accounts, so
+// it is safe to pass all sorts of notification
+// targets into this function without filtering
+// for non-local first.
+//
+// targetAccountID and originAccountID must be
+// set, but statusID can be an empty string.
+func (s *surface) notify(
+ ctx context.Context,
+ notificationType gtsmodel.NotificationType,
+ targetAccountID string,
+ originAccountID string,
+ statusID string,
+) error {
+ targetAccount, err := s.state.DB.GetAccountByID(ctx, targetAccountID)
+ if err != nil {
+ return gtserror.Newf("error getting target account %s: %w", targetAccountID, err)
+ }
+
+ if !targetAccount.IsLocal() {
+ // Nothing to do.
+ return nil
+ }
+
+ // Make sure a notification doesn't
+ // already exist with these params.
+ if _, err := s.state.DB.GetNotification(
+ gtscontext.SetBarebones(ctx),
+ notificationType,
+ targetAccountID,
+ originAccountID,
+ statusID,
+ ); err == nil {
+ // Notification exists;
+ // nothing to do.
+ return nil
+ } else if !errors.Is(err, db.ErrNoEntries) {
+ // Real error.
+ return gtserror.Newf("error checking existence of notification: %w", err)
+ }
+
+ // Notification doesn't yet exist, so
+ // we need to create + store one.
+ notif := &gtsmodel.Notification{
+ ID: id.NewULID(),
+ NotificationType: notificationType,
+ TargetAccountID: targetAccountID,
+ OriginAccountID: originAccountID,
+ StatusID: statusID,
+ }
+
+ if err := s.state.DB.PutNotification(ctx, notif); err != nil {
+ return gtserror.Newf("error putting notification in database: %w", err)
+ }
+
+ // Stream notification to the user.
+ apiNotif, err := s.tc.NotificationToAPINotification(ctx, notif)
+ if err != nil {
+ return gtserror.Newf("error converting notification to api representation: %w", err)
+ }
+
+ if err := s.stream.Notify(apiNotif, targetAccount); err != nil {
+ return gtserror.Newf("error streaming notification to account: %w", err)
+ }
+
+ return nil
+}
diff --git a/internal/processing/workers/surfacetimeline.go b/internal/processing/workers/surfacetimeline.go
new file mode 100644
index 000000000..827cbe2f8
--- /dev/null
+++ b/internal/processing/workers/surfacetimeline.go
@@ -0,0 +1,401 @@
+// GoToSocial
+// Copyright (C) GoToSocial Authors admin@gotosocial.org
+// SPDX-License-Identifier: AGPL-3.0-or-later
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+package workers
+
+import (
+ "context"
+ "errors"
+
+ "github.com/superseriousbusiness/gotosocial/internal/db"
+ "github.com/superseriousbusiness/gotosocial/internal/gtscontext"
+ "github.com/superseriousbusiness/gotosocial/internal/gtserror"
+ "github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
+ "github.com/superseriousbusiness/gotosocial/internal/log"
+ "github.com/superseriousbusiness/gotosocial/internal/stream"
+ "github.com/superseriousbusiness/gotosocial/internal/timeline"
+)
+
+// timelineAndNotifyStatus inserts the given status into the HOME
+// and LIST timelines of accounts that follow the status author.
+//
+// It will also handle notifications for any mentions attached to
+// the account, and notifications for any local accounts that want
+// to know when this account posts.
+func (s *surface) timelineAndNotifyStatus(ctx context.Context, status *gtsmodel.Status) error {
+ // Ensure status fully populated; including account, mentions, etc.
+ if err := s.state.DB.PopulateStatus(ctx, status); err != nil {
+ return gtserror.Newf("error populating status with id %s: %w", status.ID, err)
+ }
+
+ // Get all local followers of the account that posted the status.
+ follows, err := s.state.DB.GetAccountLocalFollowers(ctx, status.AccountID)
+ if err != nil {
+ return gtserror.Newf("error getting local followers of account %s: %w", status.AccountID, err)
+ }
+
+ // If the poster is also local, add a fake entry for them
+ // so they can see their own status in their timeline.
+ if status.Account.IsLocal() {
+ follows = append(follows, &gtsmodel.Follow{
+ AccountID: status.AccountID,
+ Account: status.Account,
+ Notify: func() *bool { b := false; return &b }(), // Account shouldn't notify itself.
+ ShowReblogs: func() *bool { b := true; return &b }(), // Account should show own reblogs.
+ })
+ }
+
+ // Timeline the status for each local follower of this account.
+ // This will also handle notifying any followers with notify
+ // set to true on their follow.
+ if err := s.timelineAndNotifyStatusForFollowers(ctx, status, follows); err != nil {
+ return gtserror.Newf("error timelining status %s for followers: %w", status.ID, err)
+ }
+
+ // Notify each local account that's mentioned by this status.
+ if err := s.notifyMentions(ctx, status.Mentions); err != nil {
+ return gtserror.Newf("error notifying status mentions for status %s: %w", status.ID, err)
+ }
+
+ return nil
+}
+
+// timelineAndNotifyStatusForFollowers iterates through the given
+// slice of followers of the account that posted the given status,
+// adding the status to list timelines + home timelines of each
+// follower, as appropriate, and notifying each follower of the
+// new status, if the status is eligible for notification.
+func (s *surface) timelineAndNotifyStatusForFollowers(
+ ctx context.Context,
+ status *gtsmodel.Status,
+ follows []*gtsmodel.Follow,
+) error {
+ var (
+ errs = new(gtserror.MultiError)
+ boost = status.BoostOfID != ""
+ reply = status.InReplyToURI != ""
+ )
+
+ for _, follow := range follows {
+ // Do an initial rough-grained check to see if the
+ // status is timelineable for this follower at all
+ // based on its visibility and who it replies to etc.
+ timelineable, err := s.filter.StatusHomeTimelineable(
+ ctx, follow.Account, status,
+ )
+ if err != nil {
+ errs.Appendf("error checking status %s hometimelineability: %w", status.ID, err)
+ continue
+ }
+
+ if !timelineable {
+ // Nothing to do.
+ continue
+ }
+
+ if boost && !*follow.ShowReblogs {
+ // Status is a boost, but the owner of
+ // this follow doesn't want to see boosts
+ // from this account. We can safely skip
+ // everything, then, because we also know
+ // that the follow owner won't want to be
+ // have the status put in any list timelines,
+ // or be notified about the status either.
+ continue
+ }
+
+ // Add status to any relevant lists
+ // for this follow, if applicable.
+ s.listTimelineStatusForFollow(
+ ctx,
+ status,
+ follow,
+ errs,
+ )
+
+ // Add status to home timeline for owner
+ // of this follow, if applicable.
+ homeTimelined, err := s.timelineStatus(
+ ctx,
+ s.state.Timelines.Home.IngestOne,
+ follow.AccountID, // home timelines are keyed by account ID
+ follow.Account,
+ status,
+ stream.TimelineHome,
+ )
+ if err != nil {
+ errs.Appendf("error home timelining status: %w", err)
+ continue
+ }
+
+ if !homeTimelined {
+ // If status wasn't added to home
+ // timeline, we shouldn't notify it.
+ continue
+ }
+
+ if !*follow.Notify {
+ // This follower doesn't have notifs
+ // set for this account's new posts.
+ continue
+ }
+
+ if boost || reply {
+ // Don't notify for boosts or replies.
+ continue
+ }
+
+ // If we reach here, we know:
+ //
+ // - This status is hometimelineable.
+ // - This status was added to the home timeline for this follower.
+ // - This follower wants to be notified when this account posts.
+ // - This is a top-level post (not a reply or boost).
+ //
+ // That means we can officially notify this one.
+ if err := s.notify(
+ ctx,
+ gtsmodel.NotificationStatus,
+ follow.AccountID,
+ status.AccountID,
+ status.ID,
+ ); err != nil {
+ errs.Appendf("error notifying account %s about new status: %w", follow.AccountID, err)
+ }
+ }
+
+ return errs.Combine()
+}
+
+// listTimelineStatusForFollow puts the given status
+// in any eligible lists owned by the given follower.
+func (s *surface) listTimelineStatusForFollow(
+ ctx context.Context,
+ status *gtsmodel.Status,
+ follow *gtsmodel.Follow,
+ errs *gtserror.MultiError,
+) {
+ // To put this status in appropriate list timelines,
+ // we need to get each listEntry that pertains to
+ // this follow. Then, we want to iterate through all
+ // those list entries, and add the status to the list
+ // that the entry belongs to if it meets criteria for
+ // inclusion in the list.
+
+ // Get every list entry that targets this follow's ID.
+ listEntries, err := s.state.DB.GetListEntriesForFollowID(
+ // We only need the list IDs.
+ gtscontext.SetBarebones(ctx),
+ follow.ID,
+ )
+ if err != nil && !errors.Is(err, db.ErrNoEntries) {
+ errs.Appendf("error getting list entries: %w", err)
+ return
+ }
+
+ // Check eligibility for each list entry (if any).
+ for _, listEntry := range listEntries {
+ eligible, err := s.listEligible(ctx, listEntry, status)
+ if err != nil {
+ errs.Appendf("error checking list eligibility: %w", err)
+ continue
+ }
+
+ if !eligible {
+ // Don't add this.
+ continue
+ }
+
+ // At this point we are certain this status
+ // should be included in the timeline of the
+ // list that this list entry belongs to.
+ if _, err := s.timelineStatus(
+ ctx,
+ s.state.Timelines.List.IngestOne,
+ listEntry.ListID, // list timelines are keyed by list ID
+ follow.Account,
+ status,
+ stream.TimelineList+":"+listEntry.ListID, // key streamType to this specific list
+ ); err != nil {
+ errs.Appendf("error adding status to timeline for list %s: %w", listEntry.ListID, err)
+ // implicit continue
+ }
+ }
+}
+
+// listEligible checks if the given status is eligible
+// for inclusion in the list that that the given listEntry
+// belongs to, based on the replies policy of the list.
+func (s *surface) listEligible(
+ ctx context.Context,
+ listEntry *gtsmodel.ListEntry,
+ status *gtsmodel.Status,
+) (bool, error) {
+ if status.InReplyToURI == "" {
+ // If status is not a reply,
+ // then it's all gravy baby.
+ return true, nil
+ }
+
+ if status.InReplyToID == "" {
+ // Status is a reply but we don't
+ // have the replied-to account!
+ return false, nil
+ }
+
+ // Status is a reply to a known account.
+ // We need to fetch the list that this
+ // entry belongs to, in order to check
+ // the list's replies policy.
+ list, err := s.state.DB.GetListByID(
+ ctx, listEntry.ListID,
+ )
+ if err != nil {
+ err := gtserror.Newf("db error getting list %s: %w", listEntry.ListID, err)
+ return false, err
+ }
+
+ switch list.RepliesPolicy {
+ case gtsmodel.RepliesPolicyNone:
+ // This list should not show
+ // replies at all, so skip it.
+ return false, nil
+
+ case gtsmodel.RepliesPolicyList:
+ // This list should show replies
+ // only to other people in the list.
+ //
+ // Check if replied-to account is
+ // also included in this list.
+ includes, err := s.state.DB.ListIncludesAccount(
+ ctx,
+ list.ID,
+ status.InReplyToAccountID,
+ )
+
+ if err != nil {
+ err := gtserror.Newf(
+ "db error checking if account %s in list %s: %w",
+ status.InReplyToAccountID, listEntry.ListID, err,
+ )
+ return false, err
+ }
+
+ return includes, nil
+
+ case gtsmodel.RepliesPolicyFollowed:
+ // This list should show replies
+ // only to people that the list
+ // owner also follows.
+ //
+ // Check if replied-to account is
+ // followed by list owner account.
+ follows, err := s.state.DB.IsFollowing(
+ ctx,
+ list.AccountID,
+ status.InReplyToAccountID,
+ )
+ if err != nil {
+ err := gtserror.Newf(
+ "db error checking if account %s is followed by %s: %w",
+ status.InReplyToAccountID, list.AccountID, err,
+ )
+ return false, err
+ }
+
+ return follows, nil
+
+ default:
+ // HUH??
+ err := gtserror.Newf(
+ "reply policy '%s' not recognized on list %s",
+ list.RepliesPolicy, list.ID,
+ )
+ return false, err
+ }
+}
+
+// timelineStatus uses the provided ingest function to put the given
+// status in a timeline with the given ID, if it's timelineable.
+//
+// If the status was inserted into the timeline, true will be returned
+// + it will also be streamed to the user using the given streamType.
+func (s *surface) timelineStatus(
+ ctx context.Context,
+ ingest func(context.Context, string, timeline.Timelineable) (bool, error),
+ timelineID string,
+ account *gtsmodel.Account,
+ status *gtsmodel.Status,
+ streamType string,
+) (bool, error) {
+ // Ingest status into given timeline using provided function.
+ if inserted, err := ingest(ctx, timelineID, status); err != nil {
+ err = gtserror.Newf("error ingesting status %s: %w", status.ID, err)
+ return false, err
+ } else if !inserted {
+ // Nothing more to do.
+ return false, nil
+ }
+
+ // The status was inserted so stream it to the user.
+ apiStatus, err := s.tc.StatusToAPIStatus(ctx, status, account)
+ if err != nil {
+ err = gtserror.Newf("error converting status %s to frontend representation: %w", status.ID, err)
+ return true, err
+ }
+
+ if err := s.stream.Update(apiStatus, account, []string{streamType}); err != nil {
+ err = gtserror.Newf("error streaming update for status %s: %w", status.ID, err)
+ return true, err
+ }
+
+ return true, nil
+}
+
+// deleteStatusFromTimelines completely removes the given status from all timelines.
+// It will also stream deletion of the status to all open streams.
+func (s *surface) deleteStatusFromTimelines(ctx context.Context, statusID string) error {
+ if err := s.state.Timelines.Home.WipeItemFromAllTimelines(ctx, statusID); err != nil {
+ return err
+ }
+
+ if err := s.state.Timelines.List.WipeItemFromAllTimelines(ctx, statusID); err != nil {
+ return err
+ }
+
+ return s.stream.Delete(statusID)
+}
+
+// invalidateStatusFromTimelines does cache invalidation on the given status by
+// unpreparing it from all timelines, forcing it to be prepared again (with updated
+// stats, boost counts, etc) next time it's fetched by the timeline owner. This goes
+// both for the status itself, and for any boosts of the status.
+func (s *surface) invalidateStatusFromTimelines(ctx context.Context, statusID string) {
+ if err := s.state.Timelines.Home.UnprepareItemFromAllTimelines(ctx, statusID); err != nil {
+ log.
+ WithContext(ctx).
+ WithField("statusID", statusID).
+ Errorf("error unpreparing status from home timelines: %v", err)
+ }
+
+ if err := s.state.Timelines.List.UnprepareItemFromAllTimelines(ctx, statusID); err != nil {
+ log.
+ WithContext(ctx).
+ WithField("statusID", statusID).
+ Errorf("error unpreparing status from list timelines: %v", err)
+ }
+}
diff --git a/internal/processing/workers/wipestatus.go b/internal/processing/workers/wipestatus.go
new file mode 100644
index 000000000..0891d9e24
--- /dev/null
+++ b/internal/processing/workers/wipestatus.go
@@ -0,0 +1,119 @@
+// GoToSocial
+// Copyright (C) GoToSocial Authors admin@gotosocial.org
+// SPDX-License-Identifier: AGPL-3.0-or-later
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+package workers
+
+import (
+ "context"
+
+ "github.com/superseriousbusiness/gotosocial/internal/gtscontext"
+ "github.com/superseriousbusiness/gotosocial/internal/gtserror"
+ "github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
+ "github.com/superseriousbusiness/gotosocial/internal/processing/media"
+ "github.com/superseriousbusiness/gotosocial/internal/state"
+)
+
+// wipeStatus encapsulates common logic used to totally delete a status
+// + all its attachments, notifications, boosts, and timeline entries.
+type wipeStatus func(context.Context, *gtsmodel.Status, bool) error
+
+// wipeStatusF returns a wipeStatus util function.
+func wipeStatusF(state *state.State, media *media.Processor, surface *surface) wipeStatus {
+ return func(
+ ctx context.Context,
+ statusToDelete *gtsmodel.Status,
+ deleteAttachments bool,
+ ) error {
+ errs := new(gtserror.MultiError)
+
+ // Either delete all attachments for this status,
+ // or simply unattach + clean them separately later.
+ //
+ // Reason to unattach rather than delete is that
+ // the poster might want to reattach them to another
+ // status immediately (in case of delete + redraft)
+ if deleteAttachments {
+ // todo:state.DB.DeleteAttachmentsForStatus
+ for _, a := range statusToDelete.AttachmentIDs {
+ if err := media.Delete(ctx, a); err != nil {
+ errs.Appendf("error deleting media: %w", err)
+ }
+ }
+ } else {
+ // todo:state.DB.UnattachAttachmentsForStatus
+ for _, a := range statusToDelete.AttachmentIDs {
+ if _, err := media.Unattach(ctx, statusToDelete.Account, a); err != nil {
+ errs.Appendf("error unattaching media: %w", err)
+ }
+ }
+ }
+
+ // delete all mention entries generated by this status
+ // todo:state.DB.DeleteMentionsForStatus
+ for _, id := range statusToDelete.MentionIDs {
+ if err := state.DB.DeleteMentionByID(ctx, id); err != nil {
+ errs.Appendf("error deleting status mention: %w", err)
+ }
+ }
+
+ // delete all notification entries generated by this status
+ if err := state.DB.DeleteNotificationsForStatus(ctx, statusToDelete.ID); err != nil {
+ errs.Appendf("error deleting status notifications: %w", err)
+ }
+
+ // delete all bookmarks that point to this status
+ if err := state.DB.DeleteStatusBookmarksForStatus(ctx, statusToDelete.ID); err != nil {
+ errs.Appendf("error deleting status bookmarks: %w", err)
+ }
+
+ // delete all faves of this status
+ if err := state.DB.DeleteStatusFavesForStatus(ctx, statusToDelete.ID); err != nil {
+ errs.Appendf("error deleting status faves: %w", err)
+ }
+
+ // delete all boosts for this status + remove them from timelines
+ boosts, err := state.DB.GetStatusBoosts(
+ // we MUST set a barebones context here,
+ // as depending on where it came from the
+ // original BoostOf may already be gone.
+ gtscontext.SetBarebones(ctx),
+ statusToDelete.ID)
+ if err != nil {
+ errs.Appendf("error fetching status boosts: %w", err)
+ }
+ for _, b := range boosts {
+ if err := surface.deleteStatusFromTimelines(ctx, b.ID); err != nil {
+ errs.Appendf("error deleting boost from timelines: %w", err)
+ }
+ if err := state.DB.DeleteStatusByID(ctx, b.ID); err != nil {
+ errs.Appendf("error deleting boost: %w", err)
+ }
+ }
+
+ // delete this status from any and all timelines
+ if err := surface.deleteStatusFromTimelines(ctx, statusToDelete.ID); err != nil {
+ errs.Appendf("error deleting status from timelines: %w", err)
+ }
+
+ // finally, delete the status itself
+ if err := state.DB.DeleteStatusByID(ctx, statusToDelete.ID); err != nil {
+ errs.Appendf("error deleting status: %w", err)
+ }
+
+ return errs.Combine()
+ }
+}
diff --git a/internal/processing/workers/workers.go b/internal/processing/workers/workers.go
new file mode 100644
index 000000000..24b18a405
--- /dev/null
+++ b/internal/processing/workers/workers.go
@@ -0,0 +1,92 @@
+// GoToSocial
+// Copyright (C) GoToSocial Authors admin@gotosocial.org
+// SPDX-License-Identifier: AGPL-3.0-or-later
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+package workers
+
+import (
+ "github.com/superseriousbusiness/gotosocial/internal/email"
+ "github.com/superseriousbusiness/gotosocial/internal/federation"
+ "github.com/superseriousbusiness/gotosocial/internal/processing/account"
+ "github.com/superseriousbusiness/gotosocial/internal/processing/media"
+ "github.com/superseriousbusiness/gotosocial/internal/processing/stream"
+ "github.com/superseriousbusiness/gotosocial/internal/state"
+ "github.com/superseriousbusiness/gotosocial/internal/typeutils"
+ "github.com/superseriousbusiness/gotosocial/internal/visibility"
+ "github.com/superseriousbusiness/gotosocial/internal/workers"
+)
+
+type Processor struct {
+ workers *workers.Workers
+ clientAPI *clientAPI
+ fediAPI *fediAPI
+}
+
+func New(
+ state *state.State,
+ federator federation.Federator,
+ tc typeutils.TypeConverter,
+ filter *visibility.Filter,
+ emailSender email.Sender,
+ account *account.Processor,
+ media *media.Processor,
+ stream *stream.Processor,
+) Processor {
+ // Init surface logic
+ // wrapper struct.
+ surface := &surface{
+ state: state,
+ tc: tc,
+ stream: stream,
+ filter: filter,
+ emailSender: emailSender,
+ }
+
+ // Init federate logic
+ // wrapper struct.
+ federate := &federate{
+ Federator: federator,
+ state: state,
+ tc: tc,
+ }
+
+ // Init shared logic wipe
+ // status util func.
+ wipeStatus := wipeStatusF(
+ state,
+ media,
+ surface,
+ )
+
+ return Processor{
+ workers: &state.Workers,
+ clientAPI: &clientAPI{
+ state: state,
+ tc: tc,
+ surface: surface,
+ federate: federate,
+ wipeStatus: wipeStatus,
+ account: account,
+ },
+ fediAPI: &fediAPI{
+ state: state,
+ surface: surface,
+ federate: federate,
+ wipeStatus: wipeStatus,
+ account: account,
+ },
+ }
+}
diff --git a/internal/processing/workers/workers_test.go b/internal/processing/workers/workers_test.go
new file mode 100644
index 000000000..2d5a7f5d3
--- /dev/null
+++ b/internal/processing/workers/workers_test.go
@@ -0,0 +1,169 @@
+// GoToSocial
+// Copyright (C) GoToSocial Authors admin@gotosocial.org
+// SPDX-License-Identifier: AGPL-3.0-or-later
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+package workers_test
+
+import (
+ "context"
+
+ "github.com/stretchr/testify/suite"
+ "github.com/superseriousbusiness/gotosocial/internal/db"
+ "github.com/superseriousbusiness/gotosocial/internal/email"
+ "github.com/superseriousbusiness/gotosocial/internal/federation"
+ "github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
+ "github.com/superseriousbusiness/gotosocial/internal/media"
+ "github.com/superseriousbusiness/gotosocial/internal/oauth"
+ "github.com/superseriousbusiness/gotosocial/internal/processing"
+ "github.com/superseriousbusiness/gotosocial/internal/state"
+ "github.com/superseriousbusiness/gotosocial/internal/storage"
+ "github.com/superseriousbusiness/gotosocial/internal/stream"
+ "github.com/superseriousbusiness/gotosocial/internal/transport"
+ "github.com/superseriousbusiness/gotosocial/internal/typeutils"
+ "github.com/superseriousbusiness/gotosocial/internal/visibility"
+ "github.com/superseriousbusiness/gotosocial/testrig"
+)
+
+type WorkersTestSuite struct {
+ // standard suite interfaces
+ suite.Suite
+ db db.DB
+ storage *storage.Driver
+ state state.State
+ mediaManager *media.Manager
+ typeconverter typeutils.TypeConverter
+ httpClient *testrig.MockHTTPClient
+ transportController transport.Controller
+ federator federation.Federator
+ oauthServer oauth.Server
+ emailSender email.Sender
+
+ // standard suite models
+ testTokens map[string]*gtsmodel.Token
+ testClients map[string]*gtsmodel.Client
+ testApplications map[string]*gtsmodel.Application
+ testUsers map[string]*gtsmodel.User
+ testAccounts map[string]*gtsmodel.Account
+ testFollows map[string]*gtsmodel.Follow
+ testAttachments map[string]*gtsmodel.MediaAttachment
+ testStatuses map[string]*gtsmodel.Status
+ testTags map[string]*gtsmodel.Tag
+ testMentions map[string]*gtsmodel.Mention
+ testAutheds map[string]*oauth.Auth
+ testBlocks map[string]*gtsmodel.Block
+ testActivities map[string]testrig.ActivityWithSignature
+ testLists map[string]*gtsmodel.List
+ testListEntries map[string]*gtsmodel.ListEntry
+
+ processor *processing.Processor
+}
+
+func (suite *WorkersTestSuite) SetupSuite() {
+ suite.testTokens = testrig.NewTestTokens()
+ suite.testClients = testrig.NewTestClients()
+ suite.testApplications = testrig.NewTestApplications()
+ suite.testUsers = testrig.NewTestUsers()
+ suite.testAccounts = testrig.NewTestAccounts()
+ suite.testFollows = testrig.NewTestFollows()
+ suite.testAttachments = testrig.NewTestAttachments()
+ suite.testStatuses = testrig.NewTestStatuses()
+ suite.testTags = testrig.NewTestTags()
+ suite.testMentions = testrig.NewTestMentions()
+ suite.testAutheds = map[string]*oauth.Auth{
+ "local_account_1": {
+ Application: suite.testApplications["local_account_1"],
+ User: suite.testUsers["local_account_1"],
+ Account: suite.testAccounts["local_account_1"],
+ },
+ }
+ suite.testBlocks = testrig.NewTestBlocks()
+ suite.testLists = testrig.NewTestLists()
+ suite.testListEntries = testrig.NewTestListEntries()
+}
+
+func (suite *WorkersTestSuite) SetupTest() {
+ suite.state.Caches.Init()
+ testrig.StartWorkers(&suite.state)
+
+ testrig.InitTestConfig()
+ testrig.InitTestLog()
+
+ suite.db = testrig.NewTestDB(&suite.state)
+ suite.state.DB = suite.db
+ suite.testActivities = testrig.NewTestActivities(suite.testAccounts)
+ suite.storage = testrig.NewInMemoryStorage()
+ suite.state.Storage = suite.storage
+ suite.typeconverter = testrig.NewTestTypeConverter(suite.db)
+
+ testrig.StartTimelines(
+ &suite.state,
+ visibility.NewFilter(&suite.state),
+ suite.typeconverter,
+ )
+
+ suite.httpClient = testrig.NewMockHTTPClient(nil, "../../../testrig/media")
+ suite.httpClient.TestRemotePeople = testrig.NewTestFediPeople()
+ suite.httpClient.TestRemoteStatuses = testrig.NewTestFediStatuses()
+
+ suite.transportController = testrig.NewTestTransportController(&suite.state, suite.httpClient)
+ suite.mediaManager = testrig.NewTestMediaManager(&suite.state)
+ suite.federator = testrig.NewTestFederator(&suite.state, suite.transportController, suite.mediaManager)
+ suite.oauthServer = testrig.NewTestOauthServer(suite.db)
+ suite.emailSender = testrig.NewEmailSender("../../../web/template/", nil)
+
+ suite.processor = processing.NewProcessor(suite.typeconverter, suite.federator, suite.oauthServer, suite.mediaManager, &suite.state, suite.emailSender)
+ suite.state.Workers.EnqueueClientAPI = suite.processor.Workers().EnqueueClientAPI
+ suite.state.Workers.EnqueueFediAPI = suite.processor.Workers().EnqueueFediAPI
+
+ testrig.StandardDBSetup(suite.db, suite.testAccounts)
+ testrig.StandardStorageSetup(suite.storage, "../../../testrig/media")
+}
+
+func (suite *WorkersTestSuite) TearDownTest() {
+ testrig.StandardDBTeardown(suite.db)
+ testrig.StandardStorageTeardown(suite.storage)
+ testrig.StopWorkers(&suite.state)
+}
+
+func (suite *WorkersTestSuite) openStreams(ctx context.Context, account *gtsmodel.Account, listIDs []string) map[string]*stream.Stream {
+ streams := make(map[string]*stream.Stream)
+
+ for _, streamType := range []string{
+ stream.TimelineHome,
+ stream.TimelinePublic,
+ stream.TimelineNotifications,
+ } {
+ stream, err := suite.processor.Stream().Open(ctx, account, streamType)
+ if err != nil {
+ suite.FailNow(err.Error())
+ }
+
+ streams[streamType] = stream
+ }
+
+ for _, listID := range listIDs {
+ streamType := stream.TimelineList + ":" + listID
+
+ stream, err := suite.processor.Stream().Open(ctx, account, streamType)
+ if err != nil {
+ suite.FailNow(err.Error())
+ }
+
+ streams[streamType] = stream
+ }
+
+ return streams
+}