diff options
author | 2023-08-09 19:14:33 +0200 | |
---|---|---|
committer | 2023-08-09 19:14:33 +0200 | |
commit | 9770d54237bea828cab7e50aec7dff452c203138 (patch) | |
tree | 59c444a02e81925bab47d3656a489a8c7087d530 /internal/processing/fromfederator.go | |
parent | [bugfix] Fix incorrect per-loop variable capture (#2092) (diff) | |
download | gotosocial-9770d54237bea828cab7e50aec7dff452c203138.tar.xz |
[feature] List replies policy, refactor async workers (#2087)
* Add/update some DB functions.
* move async workers into subprocessor
* rename FromFederator -> FromFediAPI
* update home timeline check to include check for current status first before moving to parent status
* change streamMap to pointer to mollify linter
* update followtoas func signature
* fix merge
* remove errant debug log
* don't use separate errs.Combine() check to wrap errs
* wrap parts of workers functionality in sub-structs
* populate report using new db funcs
* embed federator (tiny bit tidier)
* flesh out error msg, add continue(!)
* fix other error messages to be more specific
* better, nicer
* give parseURI util function a bit more util
* missing headers
* use pointers for subprocessors
Diffstat (limited to 'internal/processing/fromfederator.go')
-rw-r--r-- | internal/processing/fromfederator.go | 486 |
1 files changed, 0 insertions, 486 deletions
diff --git a/internal/processing/fromfederator.go b/internal/processing/fromfederator.go deleted file mode 100644 index 2790d31ee..000000000 --- a/internal/processing/fromfederator.go +++ /dev/null @@ -1,486 +0,0 @@ -// GoToSocial -// Copyright (C) GoToSocial Authors admin@gotosocial.org -// SPDX-License-Identifier: AGPL-3.0-or-later -// -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. -// -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see <http://www.gnu.org/licenses/>. - -package processing - -import ( - "context" - "errors" - "net/url" - - "codeberg.org/gruf/go-kv" - "codeberg.org/gruf/go-logger/v2/level" - "github.com/superseriousbusiness/gotosocial/internal/ap" - "github.com/superseriousbusiness/gotosocial/internal/gtserror" - "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" - "github.com/superseriousbusiness/gotosocial/internal/id" - "github.com/superseriousbusiness/gotosocial/internal/log" - "github.com/superseriousbusiness/gotosocial/internal/messages" -) - -// ProcessFromFederator reads the APActivityType and APObjectType of an incoming message from the federator, -// and directs the message into the appropriate side effect handler function, or simply does nothing if there's -// no handler function defined for the combination of Activity and Object. -func (p *Processor) ProcessFromFederator(ctx context.Context, federatorMsg messages.FromFederator) error { - // Allocate new log fields slice - fields := make([]kv.Field, 3, 5) - fields[0] = kv.Field{"activityType", federatorMsg.APActivityType} - fields[1] = kv.Field{"objectType", federatorMsg.APObjectType} - fields[2] = kv.Field{"toAccount", federatorMsg.ReceivingAccount.Username} - - if federatorMsg.APIri != nil { - // An IRI was supplied, append to log - fields = append(fields, kv.Field{ - "iri", federatorMsg.APIri, - }) - } - - if federatorMsg.GTSModel != nil && - log.Level() >= level.DEBUG { - // Append converted model to log - fields = append(fields, kv.Field{ - "model", federatorMsg.GTSModel, - }) - } - - // Log this federated message - l := log.WithContext(ctx).WithFields(fields...) - l.Info("processing from federator") - - switch federatorMsg.APActivityType { - case ap.ActivityCreate: - // CREATE SOMETHING - switch federatorMsg.APObjectType { - case ap.ObjectNote: - // CREATE A STATUS - return p.processCreateStatusFromFederator(ctx, federatorMsg) - case ap.ActivityLike: - // CREATE A FAVE - return p.processCreateFaveFromFederator(ctx, federatorMsg) - case ap.ActivityFollow: - // CREATE A FOLLOW REQUEST - return p.processCreateFollowRequestFromFederator(ctx, federatorMsg) - case ap.ActivityAnnounce: - // CREATE AN ANNOUNCE - return p.processCreateAnnounceFromFederator(ctx, federatorMsg) - case ap.ActivityBlock: - // CREATE A BLOCK - return p.processCreateBlockFromFederator(ctx, federatorMsg) - case ap.ActivityFlag: - // CREATE A FLAG / REPORT - return p.processCreateFlagFromFederator(ctx, federatorMsg) - } - case ap.ActivityUpdate: - // UPDATE SOMETHING - if federatorMsg.APObjectType == ap.ObjectProfile { - // UPDATE AN ACCOUNT - return p.processUpdateAccountFromFederator(ctx, federatorMsg) - } - case ap.ActivityDelete: - // DELETE SOMETHING - switch federatorMsg.APObjectType { - case ap.ObjectNote: - // DELETE A STATUS - return p.processDeleteStatusFromFederator(ctx, federatorMsg) - case ap.ObjectProfile: - // DELETE A PROFILE/ACCOUNT - return p.processDeleteAccountFromFederator(ctx, federatorMsg) - } - } - - // not a combination we can/need to process - return nil -} - -// processCreateStatusFromFederator handles Activity Create and Object Note. -func (p *Processor) processCreateStatusFromFederator(ctx context.Context, federatorMsg messages.FromFederator) error { - var ( - status *gtsmodel.Status - err error - - // Check the federatorMsg for either an already dereferenced - // and converted status pinned to the message, or a forwarded - // AP IRI that we still need to deref. - forwarded = (federatorMsg.GTSModel == nil) - ) - - if forwarded { - // Model was not set, deref with IRI. - // This will also cause the status to be inserted into the db. - status, err = p.statusFromAPIRI(ctx, federatorMsg) - } else { - // Model is set, ensure we have the most up-to-date model. - status, err = p.statusFromGTSModel(ctx, federatorMsg) - } - - if err != nil { - return gtserror.Newf("error extracting status from federatorMsg: %w", err) - } - - if status.Account == nil || status.Account.IsRemote() { - // Either no account attached yet, or a remote account. - // Both situations we need to parse account URI to fetch it. - accountURI, err := url.Parse(status.AccountURI) - if err != nil { - return err - } - - // Ensure that account for this status has been deref'd. - status.Account, _, err = p.federator.GetAccountByURI(ctx, - federatorMsg.ReceivingAccount.Username, - accountURI, - ) - if err != nil { - return err - } - } - - // Ensure status ancestors dereferenced. We need at least the - // immediate parent (if present) to ascertain timelineability. - if err := p.federator.DereferenceStatusAncestors(ctx, - federatorMsg.ReceivingAccount.Username, - status, - ); err != nil { - return err - } - - if status.InReplyToID != "" { - // Interaction counts changed on the replied status; - // uncache the prepared version from all timelines. - p.invalidateStatusFromTimelines(ctx, status.InReplyToID) - } - - if err := p.timelineAndNotifyStatus(ctx, status); err != nil { - return gtserror.Newf("error timelining status: %w", err) - } - - return nil -} - -func (p *Processor) statusFromGTSModel(ctx context.Context, federatorMsg messages.FromFederator) (*gtsmodel.Status, error) { - // There should be a status pinned to the federatorMsg - // (we've already checked to ensure this is not nil). - status, ok := federatorMsg.GTSModel.(*gtsmodel.Status) - if !ok { - err := gtserror.New("Note was not parseable as *gtsmodel.Status") - return nil, err - } - - // AP statusable representation may have also - // been set on message (no problem if not). - statusable, _ := federatorMsg.APObjectModel.(ap.Statusable) - - // Call refresh on status to update - // it (deref remote) if necessary. - var err error - status, _, err = p.federator.RefreshStatus( - ctx, - federatorMsg.ReceivingAccount.Username, - status, - statusable, - false, - ) - if err != nil { - return nil, gtserror.Newf("%w", err) - } - - return status, nil -} - -func (p *Processor) statusFromAPIRI(ctx context.Context, federatorMsg messages.FromFederator) (*gtsmodel.Status, error) { - // There should be a status IRI pinned to - // the federatorMsg for us to dereference. - if federatorMsg.APIri == nil { - err := gtserror.New("status was not pinned to federatorMsg, and neither was an IRI for us to dereference") - return nil, err - } - - // Get the status + ensure we have - // the most up-to-date version. - status, _, err := p.federator.GetStatusByURI( - ctx, - federatorMsg.ReceivingAccount.Username, - federatorMsg.APIri, - ) - if err != nil { - return nil, gtserror.Newf("%w", err) - } - - return status, nil -} - -// processCreateFaveFromFederator handles Activity Create with Object Like. -func (p *Processor) processCreateFaveFromFederator(ctx context.Context, federatorMsg messages.FromFederator) error { - statusFave, ok := federatorMsg.GTSModel.(*gtsmodel.StatusFave) - if !ok { - return gtserror.New("Like was not parseable as *gtsmodel.StatusFave") - } - - if err := p.notifyFave(ctx, statusFave); err != nil { - return gtserror.Newf("error notifying status fave: %w", err) - } - - // Interaction counts changed on the faved status; - // uncache the prepared version from all timelines. - p.invalidateStatusFromTimelines(ctx, statusFave.StatusID) - - return nil -} - -// processCreateFollowRequestFromFederator handles Activity Create and Object Follow -func (p *Processor) processCreateFollowRequestFromFederator(ctx context.Context, federatorMsg messages.FromFederator) error { - followRequest, ok := federatorMsg.GTSModel.(*gtsmodel.FollowRequest) - if !ok { - return errors.New("incomingFollowRequest was not parseable as *gtsmodel.FollowRequest") - } - - // make sure the account is pinned - if followRequest.Account == nil { - a, err := p.state.DB.GetAccountByID(ctx, followRequest.AccountID) - if err != nil { - return err - } - followRequest.Account = a - } - - // Get the remote account to make sure the avi and header are cached. - if followRequest.Account.Domain != "" { - remoteAccountID, err := url.Parse(followRequest.Account.URI) - if err != nil { - return err - } - - a, _, err := p.federator.GetAccountByURI(ctx, - federatorMsg.ReceivingAccount.Username, - remoteAccountID, - ) - if err != nil { - return err - } - - followRequest.Account = a - } - - if followRequest.TargetAccount == nil { - a, err := p.state.DB.GetAccountByID(ctx, followRequest.TargetAccountID) - if err != nil { - return err - } - followRequest.TargetAccount = a - } - - if *followRequest.TargetAccount.Locked { - // if the account is locked just notify the follow request and nothing else - return p.notifyFollowRequest(ctx, followRequest) - } - - // if the target account isn't locked, we should already accept the follow and notify about the new follower instead - follow, err := p.state.DB.AcceptFollowRequest(ctx, followRequest.AccountID, followRequest.TargetAccountID) - if err != nil { - return err - } - - if err := p.federateAcceptFollowRequest(ctx, follow); err != nil { - return err - } - - return p.notifyFollow(ctx, follow, followRequest.TargetAccount) -} - -// processCreateAnnounceFromFederator handles Activity Create with Object Announce. -func (p *Processor) processCreateAnnounceFromFederator(ctx context.Context, federatorMsg messages.FromFederator) error { - status, ok := federatorMsg.GTSModel.(*gtsmodel.Status) - if !ok { - return gtserror.New("Announce was not parseable as *gtsmodel.Status") - } - - // Dereference status that this status boosts. - if err := p.federator.DereferenceAnnounce(ctx, status, federatorMsg.ReceivingAccount.Username); err != nil { - return gtserror.Newf("error dereferencing announce: %w", err) - } - - // Generate an ID for the boost wrapper status. - statusID, err := id.NewULIDFromTime(status.CreatedAt) - if err != nil { - return gtserror.Newf("error generating id: %w", err) - } - status.ID = statusID - - // Store the boost wrapper status. - if err := p.state.DB.PutStatus(ctx, status); err != nil { - return gtserror.Newf("db error inserting status: %w", err) - } - - // Ensure boosted status ancestors dereferenced. We need at least - // the immediate parent (if present) to ascertain timelineability. - if err := p.federator.DereferenceStatusAncestors(ctx, - federatorMsg.ReceivingAccount.Username, - status.BoostOf, - ); err != nil { - return err - } - - // Timeline and notify the announce. - if err := p.timelineAndNotifyStatus(ctx, status); err != nil { - return gtserror.Newf("error timelining status: %w", err) - } - - if err := p.notifyAnnounce(ctx, status); err != nil { - return gtserror.Newf("error notifying status: %w", err) - } - - // Interaction counts changed on the boosted status; - // uncache the prepared version from all timelines. - p.invalidateStatusFromTimelines(ctx, status.ID) - - return nil -} - -// processCreateBlockFromFederator handles Activity Create and Object Block -func (p *Processor) processCreateBlockFromFederator(ctx context.Context, federatorMsg messages.FromFederator) error { - block, ok := federatorMsg.GTSModel.(*gtsmodel.Block) - if !ok { - return gtserror.New("block was not parseable as *gtsmodel.Block") - } - - // Remove each account's posts from the other's timelines. - // - // First home timelines. - if err := p.state.Timelines.Home.WipeItemsFromAccountID(ctx, block.AccountID, block.TargetAccountID); err != nil { - return gtserror.Newf("%w", err) - } - - if err := p.state.Timelines.Home.WipeItemsFromAccountID(ctx, block.TargetAccountID, block.AccountID); err != nil { - return gtserror.Newf("%w", err) - } - - // Now list timelines. - if err := p.state.Timelines.List.WipeItemsFromAccountID(ctx, block.AccountID, block.TargetAccountID); err != nil { - return gtserror.Newf("%w", err) - } - - if err := p.state.Timelines.List.WipeItemsFromAccountID(ctx, block.TargetAccountID, block.AccountID); err != nil { - return gtserror.Newf("%w", err) - } - - // Remove any follows that existed between blocker + blockee. - if err := p.state.DB.DeleteFollow(ctx, block.AccountID, block.TargetAccountID); err != nil { - return gtserror.Newf( - "db error deleting follow from %s targeting %s: %w", - block.AccountID, block.TargetAccountID, err, - ) - } - - if err := p.state.DB.DeleteFollow(ctx, block.TargetAccountID, block.AccountID); err != nil { - return gtserror.Newf( - "db error deleting follow from %s targeting %s: %w", - block.TargetAccountID, block.AccountID, err, - ) - } - - // Remove any follow requests that existed between blocker + blockee. - if err := p.state.DB.DeleteFollowRequest(ctx, block.AccountID, block.TargetAccountID); err != nil { - return gtserror.Newf( - "db error deleting follow request from %s targeting %s: %w", - block.AccountID, block.TargetAccountID, err, - ) - } - - if err := p.state.DB.DeleteFollowRequest(ctx, block.TargetAccountID, block.AccountID); err != nil { - return gtserror.Newf( - "db error deleting follow request from %s targeting %s: %w", - block.TargetAccountID, block.AccountID, err, - ) - } - - return nil -} - -func (p *Processor) processCreateFlagFromFederator(ctx context.Context, federatorMsg messages.FromFederator) error { - incomingReport, ok := federatorMsg.GTSModel.(*gtsmodel.Report) - if !ok { - return errors.New("flag was not parseable as *gtsmodel.Report") - } - - // TODO: handle additional side effects of flag creation: - // - notify admins by dm / notification - - return p.emailReport(ctx, incomingReport) -} - -// processUpdateAccountFromFederator handles Activity Update and Object Profile -func (p *Processor) processUpdateAccountFromFederator(ctx context.Context, federatorMsg messages.FromFederator) error { - // Parse the old/existing account model. - account, ok := federatorMsg.GTSModel.(*gtsmodel.Account) - if !ok { - return gtserror.New("account was not parseable as *gtsmodel.Account") - } - - // Because this was an Update, the new Accountable should be set on the message. - apubAcc, ok := federatorMsg.APObjectModel.(ap.Accountable) - if !ok { - return gtserror.New("Accountable was not parseable on update account message") - } - - // Fetch up-to-date bio, avatar, header, etc. - _, _, err := p.federator.RefreshAccount( - ctx, - federatorMsg.ReceivingAccount.Username, - account, - apubAcc, - true, // Force refresh. - ) - if err != nil { - return gtserror.Newf("error refreshing updated account: %w", err) - } - - return nil -} - -// processDeleteStatusFromFederator handles Activity Delete and Object Note -func (p *Processor) processDeleteStatusFromFederator(ctx context.Context, federatorMsg messages.FromFederator) error { - status, ok := federatorMsg.GTSModel.(*gtsmodel.Status) - if !ok { - return errors.New("Note was not parseable as *gtsmodel.Status") - } - - // Delete attachments from this status, since this request - // comes from the federating API, and there's no way the - // poster can do a delete + redraft for it on our instance. - deleteAttachments := true - if err := p.wipeStatus(ctx, status, deleteAttachments); err != nil { - return gtserror.Newf("error wiping status: %w", err) - } - - if status.InReplyToID != "" { - // Interaction counts changed on the replied status; - // uncache the prepared version from all timelines. - p.invalidateStatusFromTimelines(ctx, status.InReplyToID) - } - - return nil -} - -// processDeleteAccountFromFederator handles Activity Delete and Object Profile -func (p *Processor) processDeleteAccountFromFederator(ctx context.Context, federatorMsg messages.FromFederator) error { - account, ok := federatorMsg.GTSModel.(*gtsmodel.Account) - if !ok { - return errors.New("account delete was not parseable as *gtsmodel.Account") - } - - return p.account.Delete(ctx, account, account.ID) -} |