summaryrefslogtreecommitdiff
path: root/internal/processing
diff options
context:
space:
mode:
Diffstat (limited to 'internal/processing')
-rw-r--r--internal/processing/fromfederator.go389
-rw-r--r--internal/processing/fromfederator_test.go23
-rw-r--r--internal/processing/processor_test.go28
3 files changed, 276 insertions, 164 deletions
diff --git a/internal/processing/fromfederator.go b/internal/processing/fromfederator.go
index 1ef29264e..449cc6f08 100644
--- a/internal/processing/fromfederator.go
+++ b/internal/processing/fromfederator.go
@@ -31,203 +31,264 @@ import (
"github.com/superseriousbusiness/gotosocial/internal/messages"
)
+// ProcessFromFederator reads the APActivityType and APObjectType of an incoming message from the federator,
+// and directs the message into the appropriate side effect handler function, or simply does nothing if there's
+// no handler function defined for the combination of Activity and Object.
func (p *processor) ProcessFromFederator(ctx context.Context, federatorMsg messages.FromFederator) error {
l := p.log.WithFields(logrus.Fields{
- "func": "processFromFederator",
- "federatorMsg": fmt.Sprintf("%+v", federatorMsg),
+ "func": "processFromFederator",
+ "APActivityType": federatorMsg.APActivityType,
+ "APObjectType": federatorMsg.APObjectType,
})
-
- l.Trace("entering function PROCESS FROM FEDERATOR")
+ l.Trace("processing message from federator")
switch federatorMsg.APActivityType {
case ap.ActivityCreate:
- // CREATE
+ // CREATE SOMETHING
switch federatorMsg.APObjectType {
case ap.ObjectNote:
// CREATE A STATUS
- incomingStatus, ok := federatorMsg.GTSModel.(*gtsmodel.Status)
- if !ok {
- return errors.New("note was not parseable as *gtsmodel.Status")
- }
-
- status, err := p.federator.EnrichRemoteStatus(ctx, federatorMsg.ReceivingAccount.Username, incomingStatus, true)
- if err != nil {
- return err
- }
-
- if err := p.timelineStatus(ctx, status); err != nil {
- return err
- }
-
- if err := p.notifyStatus(ctx, status); err != nil {
- return err
- }
- case ap.ObjectProfile:
- // CREATE AN ACCOUNT
- // nothing to do here
+ return p.processCreateStatusFromFederator(ctx, federatorMsg)
case ap.ActivityLike:
// CREATE A FAVE
- incomingFave, ok := federatorMsg.GTSModel.(*gtsmodel.StatusFave)
- if !ok {
- return errors.New("like was not parseable as *gtsmodel.StatusFave")
- }
-
- if err := p.notifyFave(ctx, incomingFave); err != nil {
- return err
- }
+ return p.processCreateFaveFromFederator(ctx, federatorMsg)
case ap.ActivityFollow:
// CREATE A FOLLOW REQUEST
- followRequest, ok := federatorMsg.GTSModel.(*gtsmodel.FollowRequest)
- if !ok {
- return errors.New("incomingFollowRequest was not parseable as *gtsmodel.FollowRequest")
- }
-
- if followRequest.TargetAccount == nil {
- a, err := p.db.GetAccountByID(ctx, followRequest.TargetAccountID)
- if err != nil {
- return err
- }
- followRequest.TargetAccount = a
- }
- targetAccount := followRequest.TargetAccount
-
- if targetAccount.Locked {
- // if the account is locked just notify the follow request and nothing else
- return p.notifyFollowRequest(ctx, followRequest)
- }
-
- if followRequest.Account == nil {
- a, err := p.db.GetAccountByID(ctx, followRequest.AccountID)
- if err != nil {
- return err
- }
- followRequest.Account = a
- }
- originAccount := followRequest.Account
-
- // if the target account isn't locked, we should already accept the follow and notify about the new follower instead
- follow, err := p.db.AcceptFollowRequest(ctx, followRequest.AccountID, followRequest.TargetAccountID)
- if err != nil {
- return err
- }
-
- if err := p.federateAcceptFollowRequest(ctx, follow, originAccount, targetAccount); err != nil {
- return err
- }
-
- return p.notifyFollow(ctx, follow, targetAccount)
+ return p.processCreateFollowRequestFromFederator(ctx, federatorMsg)
case ap.ActivityAnnounce:
// CREATE AN ANNOUNCE
- incomingAnnounce, ok := federatorMsg.GTSModel.(*gtsmodel.Status)
- if !ok {
- return errors.New("announce was not parseable as *gtsmodel.Status")
- }
-
- if err := p.federator.DereferenceAnnounce(ctx, incomingAnnounce, federatorMsg.ReceivingAccount.Username); err != nil {
- return fmt.Errorf("error dereferencing announce from federator: %s", err)
- }
-
- incomingAnnounceID, err := id.NewULIDFromTime(incomingAnnounce.CreatedAt)
- if err != nil {
- return err
- }
- incomingAnnounce.ID = incomingAnnounceID
-
- if err := p.db.PutStatus(ctx, incomingAnnounce); err != nil {
- return fmt.Errorf("error adding dereferenced announce to the db: %s", err)
- }
-
- if err := p.timelineStatus(ctx, incomingAnnounce); err != nil {
- return err
- }
-
- if err := p.notifyAnnounce(ctx, incomingAnnounce); err != nil {
- return err
- }
+ return p.processCreateAnnounceFromFederator(ctx, federatorMsg)
case ap.ActivityBlock:
// CREATE A BLOCK
- block, ok := federatorMsg.GTSModel.(*gtsmodel.Block)
- if !ok {
- return errors.New("block was not parseable as *gtsmodel.Block")
- }
-
- // remove any of the blocking account's statuses from the blocked account's timeline, and vice versa
- if err := p.timelineManager.WipeStatusesFromAccountID(ctx, block.AccountID, block.TargetAccountID); err != nil {
- return err
- }
- if err := p.timelineManager.WipeStatusesFromAccountID(ctx, block.TargetAccountID, block.AccountID); err != nil {
- return err
- }
- // TODO: same with notifications
- // TODO: same with bookmarks
+ return p.processCreateBlockFromFederator(ctx, federatorMsg)
}
case ap.ActivityUpdate:
- // UPDATE
+ // UPDATE SOMETHING
switch federatorMsg.APObjectType {
case ap.ObjectProfile:
// UPDATE AN ACCOUNT
- incomingAccount, ok := federatorMsg.GTSModel.(*gtsmodel.Account)
- if !ok {
- return errors.New("profile was not parseable as *gtsmodel.Account")
- }
-
- if _, err := p.federator.EnrichRemoteAccount(ctx, federatorMsg.ReceivingAccount.Username, incomingAccount); err != nil {
- return fmt.Errorf("error enriching updated account from federator: %s", err)
- }
+ return p.processUpdateAccountFromFederator(ctx, federatorMsg)
}
case ap.ActivityDelete:
- // DELETE
+ // DELETE SOMETHING
switch federatorMsg.APObjectType {
case ap.ObjectNote:
// DELETE A STATUS
- // TODO: handle side effects of status deletion here:
- // 1. delete all media associated with status
- // 2. delete boosts of status
- // 3. etc etc etc
- statusToDelete, ok := federatorMsg.GTSModel.(*gtsmodel.Status)
- if !ok {
- return errors.New("note was not parseable as *gtsmodel.Status")
- }
-
- // delete all attachments for this status
- for _, a := range statusToDelete.AttachmentIDs {
- if err := p.mediaProcessor.Delete(ctx, a); err != nil {
- return err
- }
- }
-
- // delete all mentions for this status
- for _, m := range statusToDelete.MentionIDs {
- if err := p.db.DeleteByID(ctx, m, &gtsmodel.Mention{}); err != nil {
- return err
- }
- }
-
- // delete all notifications for this status
- if err := p.db.DeleteWhere(ctx, []db.Where{{Key: "status_id", Value: statusToDelete.ID}}, &[]*gtsmodel.Notification{}); err != nil {
- return err
- }
-
- // remove this status from any and all timelines
- return p.deleteStatusFromTimelines(ctx, statusToDelete)
+ return p.processDeleteStatusFromFederator(ctx, federatorMsg)
case ap.ObjectProfile:
// DELETE A PROFILE/ACCOUNT
- // handle side effects of account deletion here: delete all objects, statuses, media etc associated with account
- account, ok := federatorMsg.GTSModel.(*gtsmodel.Account)
- if !ok {
- return errors.New("account delete was not parseable as *gtsmodel.Account")
- }
+ return p.processDeleteAccountFromFederator(ctx, federatorMsg)
+ }
+ }
- return p.accountProcessor.Delete(ctx, account, account.ID)
+ // not a combination we can/need to process
+ return nil
+}
+
+// processCreateStatusFromFederator handles Activity Create and Object Note
+func (p *processor) processCreateStatusFromFederator(ctx context.Context, federatorMsg messages.FromFederator) error {
+ // check for either an IRI that we still need to dereference, OR an already dereferenced
+ // and converted status pinned to the message.
+ var status *gtsmodel.Status
+
+ if federatorMsg.GTSModel != nil {
+ // there's a gts model already pinned to the message, it should be a status
+ var ok bool
+ if status, ok = federatorMsg.GTSModel.(*gtsmodel.Status); !ok {
+ return errors.New("ProcessFromFederator: note was not parseable as *gtsmodel.Status")
}
- case ap.ActivityAccept:
- // ACCEPT
- switch federatorMsg.APObjectType {
- case ap.ActivityFollow:
- // ACCEPT A FOLLOW
- // nothing to do here
+
+ var err error
+ status, err = p.federator.EnrichRemoteStatus(ctx, federatorMsg.ReceivingAccount.Username, status, true)
+ if err != nil {
+ return err
+ }
+ } else {
+ // no model pinned, we need to dereference based on the IRI
+ if federatorMsg.APIri == nil {
+ return errors.New("ProcessFromFederator: status was not pinned to federatorMsg, and neither was an IRI for us to dereference")
}
+ var err error
+ status, _, _, err = p.federator.GetRemoteStatus(ctx, federatorMsg.ReceivingAccount.Username, federatorMsg.APIri, false, false)
+ if err != nil {
+ return err
+ }
+ }
+
+ if err := p.timelineStatus(ctx, status); err != nil {
+ return err
+ }
+
+ if err := p.notifyStatus(ctx, status); err != nil {
+ return err
}
return nil
}
+
+// processCreateFaveFromFederator handles Activity Create and Object Like
+func (p *processor) processCreateFaveFromFederator(ctx context.Context, federatorMsg messages.FromFederator) error {
+ incomingFave, ok := federatorMsg.GTSModel.(*gtsmodel.StatusFave)
+ if !ok {
+ return errors.New("like was not parseable as *gtsmodel.StatusFave")
+ }
+
+ if err := p.notifyFave(ctx, incomingFave); err != nil {
+ return err
+ }
+
+ return nil
+}
+
+// processCreateFollowRequestFromFederator handles Activity Create and Object Follow
+func (p *processor) processCreateFollowRequestFromFederator(ctx context.Context, federatorMsg messages.FromFederator) error {
+ followRequest, ok := federatorMsg.GTSModel.(*gtsmodel.FollowRequest)
+ if !ok {
+ return errors.New("incomingFollowRequest was not parseable as *gtsmodel.FollowRequest")
+ }
+
+ if followRequest.TargetAccount == nil {
+ a, err := p.db.GetAccountByID(ctx, followRequest.TargetAccountID)
+ if err != nil {
+ return err
+ }
+ followRequest.TargetAccount = a
+ }
+ targetAccount := followRequest.TargetAccount
+
+ if targetAccount.Locked {
+ // if the account is locked just notify the follow request and nothing else
+ return p.notifyFollowRequest(ctx, followRequest)
+ }
+
+ if followRequest.Account == nil {
+ a, err := p.db.GetAccountByID(ctx, followRequest.AccountID)
+ if err != nil {
+ return err
+ }
+ followRequest.Account = a
+ }
+ originAccount := followRequest.Account
+
+ // if the target account isn't locked, we should already accept the follow and notify about the new follower instead
+ follow, err := p.db.AcceptFollowRequest(ctx, followRequest.AccountID, followRequest.TargetAccountID)
+ if err != nil {
+ return err
+ }
+
+ if err := p.federateAcceptFollowRequest(ctx, follow, originAccount, targetAccount); err != nil {
+ return err
+ }
+
+ return p.notifyFollow(ctx, follow, targetAccount)
+}
+
+// processCreateAnnounceFromFederator handles Activity Create and Object Announce
+func (p *processor) processCreateAnnounceFromFederator(ctx context.Context, federatorMsg messages.FromFederator) error {
+ incomingAnnounce, ok := federatorMsg.GTSModel.(*gtsmodel.Status)
+ if !ok {
+ return errors.New("announce was not parseable as *gtsmodel.Status")
+ }
+
+ if err := p.federator.DereferenceAnnounce(ctx, incomingAnnounce, federatorMsg.ReceivingAccount.Username); err != nil {
+ return fmt.Errorf("error dereferencing announce from federator: %s", err)
+ }
+
+ incomingAnnounceID, err := id.NewULIDFromTime(incomingAnnounce.CreatedAt)
+ if err != nil {
+ return err
+ }
+ incomingAnnounce.ID = incomingAnnounceID
+
+ if err := p.db.PutStatus(ctx, incomingAnnounce); err != nil {
+ return fmt.Errorf("error adding dereferenced announce to the db: %s", err)
+ }
+
+ if err := p.timelineStatus(ctx, incomingAnnounce); err != nil {
+ return err
+ }
+
+ if err := p.notifyAnnounce(ctx, incomingAnnounce); err != nil {
+ return err
+ }
+
+ return nil
+}
+
+// processCreateBlockFromFederator handles Activity Create and Object Block
+func (p *processor) processCreateBlockFromFederator(ctx context.Context, federatorMsg messages.FromFederator) error {
+ block, ok := federatorMsg.GTSModel.(*gtsmodel.Block)
+ if !ok {
+ return errors.New("block was not parseable as *gtsmodel.Block")
+ }
+
+ // remove any of the blocking account's statuses from the blocked account's timeline, and vice versa
+ if err := p.timelineManager.WipeStatusesFromAccountID(ctx, block.AccountID, block.TargetAccountID); err != nil {
+ return err
+ }
+ if err := p.timelineManager.WipeStatusesFromAccountID(ctx, block.TargetAccountID, block.AccountID); err != nil {
+ return err
+ }
+ // TODO: same with notifications
+ // TODO: same with bookmarks
+
+ return nil
+}
+
+// processUpdateAccountFromFederator handles Activity Update and Object Profile
+func (p *processor) processUpdateAccountFromFederator(ctx context.Context, federatorMsg messages.FromFederator) error {
+ incomingAccount, ok := federatorMsg.GTSModel.(*gtsmodel.Account)
+ if !ok {
+ return errors.New("profile was not parseable as *gtsmodel.Account")
+ }
+
+ if _, err := p.federator.EnrichRemoteAccount(ctx, federatorMsg.ReceivingAccount.Username, incomingAccount); err != nil {
+ return fmt.Errorf("error enriching updated account from federator: %s", err)
+ }
+
+ return nil
+}
+
+// processDeleteStatusFromFederator handles Activity Delete and Object Note
+func (p *processor) processDeleteStatusFromFederator(ctx context.Context, federatorMsg messages.FromFederator) error {
+ // TODO: handle side effects of status deletion here:
+ // 1. delete all media associated with status
+ // 2. delete boosts of status
+ // 3. etc etc etc
+ statusToDelete, ok := federatorMsg.GTSModel.(*gtsmodel.Status)
+ if !ok {
+ return errors.New("note was not parseable as *gtsmodel.Status")
+ }
+
+ // delete all attachments for this status
+ for _, a := range statusToDelete.AttachmentIDs {
+ if err := p.mediaProcessor.Delete(ctx, a); err != nil {
+ return err
+ }
+ }
+
+ // delete all mentions for this status
+ for _, m := range statusToDelete.MentionIDs {
+ if err := p.db.DeleteByID(ctx, m, &gtsmodel.Mention{}); err != nil {
+ return err
+ }
+ }
+
+ // delete all notifications for this status
+ if err := p.db.DeleteWhere(ctx, []db.Where{{Key: "status_id", Value: statusToDelete.ID}}, &[]*gtsmodel.Notification{}); err != nil {
+ return err
+ }
+
+ // remove this status from any and all timelines
+ return p.deleteStatusFromTimelines(ctx, statusToDelete)
+}
+
+// processDeleteAccountFromFederator handles Activity Delete and Object Profile
+func (p *processor) processDeleteAccountFromFederator(ctx context.Context, federatorMsg messages.FromFederator) error {
+ account, ok := federatorMsg.GTSModel.(*gtsmodel.Account)
+ if !ok {
+ return errors.New("account delete was not parseable as *gtsmodel.Account")
+ }
+
+ return p.accountProcessor.Delete(ctx, account, account.ID)
+}
diff --git a/internal/processing/fromfederator_test.go b/internal/processing/fromfederator_test.go
index 4f100d4cb..09519d1d3 100644
--- a/internal/processing/fromfederator_test.go
+++ b/internal/processing/fromfederator_test.go
@@ -32,6 +32,7 @@ import (
"github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
"github.com/superseriousbusiness/gotosocial/internal/id"
"github.com/superseriousbusiness/gotosocial/internal/messages"
+ "github.com/superseriousbusiness/gotosocial/testrig"
)
type FromFederatorTestSuite struct {
@@ -486,6 +487,28 @@ func (suite *FromFederatorTestSuite) TestProcessFollowRequestUnlocked() {
suite.Equal("Accept", accept.Type)
}
+// TestCreateStatusFromIRI checks if a forwarded status can be dereferenced by the processor.
+func (suite *FromFederatorTestSuite) TestCreateStatusFromIRI() {
+ ctx := context.Background()
+
+ receivingAccount := suite.testAccounts["local_account_1"]
+ statusCreator := suite.testAccounts["remote_account_2"]
+
+ err := suite.processor.ProcessFromFederator(ctx, messages.FromFederator{
+ APObjectType: ap.ObjectNote,
+ APActivityType: ap.ActivityCreate,
+ GTSModel: nil, // gtsmodel is nil because this is a forwarded status -- we want to dereference it using the iri
+ ReceivingAccount: receivingAccount,
+ APIri: testrig.URLMustParse("http://example.org/users/some_user/statuses/afaba698-5740-4e32-a702-af61aa543bc1"),
+ })
+ suite.NoError(err)
+
+ // status should now be in the database, attributed to remote_account_2
+ s, err := suite.db.GetStatusByURI(context.Background(), "http://example.org/users/some_user/statuses/afaba698-5740-4e32-a702-af61aa543bc1")
+ suite.NoError(err)
+ suite.Equal(statusCreator.URI, s.AccountURI)
+}
+
func TestFromFederatorTestSuite(t *testing.T) {
suite.Run(t, &FromFederatorTestSuite{})
}
diff --git a/internal/processing/processor_test.go b/internal/processing/processor_test.go
index 1c4dfb32f..7a1db442e 100644
--- a/internal/processing/processor_test.go
+++ b/internal/processing/processor_test.go
@@ -69,6 +69,7 @@ type ProcessingStandardTestSuite struct {
testMentions map[string]*gtsmodel.Mention
testAutheds map[string]*oauth.Auth
testBlocks map[string]*gtsmodel.Block
+ testActivities map[string]testrig.ActivityWithSignature
sentHTTPRequests map[string][]byte
@@ -92,6 +93,7 @@ func (suite *ProcessingStandardTestSuite) SetupSuite() {
Account: suite.testAccounts["local_account_1"],
},
}
+ suite.testActivities = testrig.NewTestActivities(suite.testAccounts)
suite.testBlocks = testrig.NewTestBlocks()
}
@@ -149,6 +151,32 @@ func (suite *ProcessingStandardTestSuite) SetupTest() {
return response, nil
}
+ if req.URL.String() == "http://example.org/users/some_user/statuses/afaba698-5740-4e32-a702-af61aa543bc1" {
+ // the request is for the forwarded message
+ message := suite.testActivities["forwarded_message"].Activity.GetActivityStreamsObject().At(0).GetActivityStreamsNote()
+ messageI, err := streams.Serialize(message)
+ if err != nil {
+ panic(err)
+ }
+ messageJson, err := json.Marshal(messageI)
+ if err != nil {
+ panic(err)
+ }
+ responseType := "application/activity+json"
+
+ reader := bytes.NewReader(messageJson)
+ readCloser := io.NopCloser(reader)
+ response := &http.Response{
+ StatusCode: 200,
+ Body: readCloser,
+ ContentLength: int64(len(messageJson)),
+ Header: http.Header{
+ "content-type": {responseType},
+ },
+ }
+ return response, nil
+ }
+
r := ioutil.NopCloser(bytes.NewReader([]byte{}))
return &http.Response{
StatusCode: 200,