diff options
author | 2023-11-04 20:21:20 +0000 | |
---|---|---|
committer | 2023-11-04 20:21:20 +0000 | |
commit | 41435a6c4ee0a5b52528890edf3fbf5a9dc0a6c8 (patch) | |
tree | 987b5d7787b24f6f6e340bbcf7fd1b052fe40dfc /internal/federation/federatingdb | |
parent | [docs/bugfix] fix link to swagger yaml (#2333) (diff) | |
download | gotosocial-41435a6c4ee0a5b52528890edf3fbf5a9dc0a6c8.tar.xz |
[feature] support canceling scheduled tasks, some federation API performance improvements (#2329)
Diffstat (limited to 'internal/federation/federatingdb')
-rw-r--r-- | internal/federation/federatingdb/create.go | 173 | ||||
-rw-r--r-- | internal/federation/federatingdb/create_test.go | 17 | ||||
-rw-r--r-- | internal/federation/federatingdb/db.go | 6 | ||||
-rw-r--r-- | internal/federation/federatingdb/question.go | 32 | ||||
-rw-r--r-- | internal/federation/federatingdb/update.go | 45 | ||||
-rw-r--r-- | internal/federation/federatingdb/util.go | 165 |
6 files changed, 146 insertions, 292 deletions
diff --git a/internal/federation/federatingdb/create.go b/internal/federation/federatingdb/create.go index 14e846b15..0fb459190 100644 --- a/internal/federation/federatingdb/create.go +++ b/internal/federation/federatingdb/create.go @@ -24,7 +24,6 @@ import ( "strings" "codeberg.org/gruf/go-logger/v2/level" - "github.com/superseriousbusiness/activity/pub" "github.com/superseriousbusiness/activity/streams/vocab" "github.com/superseriousbusiness/gotosocial/internal/ap" "github.com/superseriousbusiness/gotosocial/internal/config" @@ -137,22 +136,37 @@ func (f *federatingDB) activityCreate( return gtserror.Newf("could not convert asType %T to ActivityStreamsCreate", asType) } - for _, object := range ap.ExtractObjects(create) { - // Try to get object as vocab.Type, - // else skip handling (likely) IRI. - objType := object.GetType() - if objType == nil { - continue - } + var errs gtserror.MultiError + + // Extract objects from create activity. + objects := ap.ExtractObjects(create) + + // Extract Statusables from objects slice (this must be + // done AFTER extracting options due to how AS typing works). + statusables, objects := ap.ExtractStatusables(objects) - if statusable, ok := ap.ToStatusable(objType); ok { - return f.createStatusable(ctx, statusable, receivingAccount, requestingAccount) + for _, statusable := range statusables { + // Check if this is a forwarded object, i.e. did + // the account making the request also create this? + forwarded := !isSender(statusable, requestingAccount) + + // Handle create event for this statusable. + if err := f.createStatusable(ctx, + receivingAccount, + requestingAccount, + statusable, + forwarded, + ); err != nil { + errs.Appendf("error creating statusable: %w", err) } + } - // TODO: handle CREATE of other types? + if len(objects) > 0 { + // Log any unhandled objects after filtering for debug purposes. + log.Debugf(ctx, "unhandled CREATE types: %v", typeNames(objects)) } - return nil + return errs.Combine() } // createStatusable handles a Create activity for a Statusable. @@ -161,88 +175,36 @@ func (f *federatingDB) activityCreate( // the processor for further asynchronous processing. func (f *federatingDB) createStatusable( ctx context.Context, + receiver *gtsmodel.Account, + requester *gtsmodel.Account, statusable ap.Statusable, - receivingAccount *gtsmodel.Account, - requestingAccount *gtsmodel.Account, + forwarded bool, ) error { - // Statusable must have an attributedTo. - attrToProp := statusable.GetActivityStreamsAttributedTo() - if attrToProp == nil { - return gtserror.Newf("statusable had no attributedTo") - } - - // Statusable must have an ID. - idProp := statusable.GetJSONLDId() - if idProp == nil || !idProp.IsIRI() { - return gtserror.Newf("statusable had no id, or id was not a URI") - } - - statusableURI := idProp.GetIRI() - - // Check if we have a forward. In other words, was the - // statusable posted to our inbox by at least one actor - // who actually created it, or are they forwarding it? - forward := true - for iter := attrToProp.Begin(); iter != attrToProp.End(); iter = iter.Next() { - actorURI, err := pub.ToId(iter) - if err != nil { - return gtserror.Newf("error extracting id from attributedTo entry: %w", err) - } - - if requestingAccount.URI == actorURI.String() { - // The actor who posted this statusable to our inbox is - // (one of) its creator(s), so this is not a forward. - forward = false - break - } - } - - // Check if we already have a status entry - // for this statusable, based on the ID/URI. - statusableURIStr := statusableURI.String() - status, err := f.state.DB.GetStatusByURI(ctx, statusableURIStr) - if err != nil && !errors.Is(err, db.ErrNoEntries) { - return gtserror.Newf("db error checking existence of status %s: %w", statusableURIStr, err) - } - - if status != nil { - // We already had this status in the db, no need for further action. - log.Trace(ctx, "status already exists: %s", statusableURIStr) - return nil - } - // If we do have a forward, we should ignore the content // and instead deref based on the URI of the statusable. // // In other words, don't automatically trust whoever sent // this status to us, but fetch the authentic article from // the server it originated from. - if forward { - // Pass the statusable URI (APIri) into the processor worker - // and do the rest of the processing asynchronously. + if forwarded { + // Pass the statusable URI (APIri) into the processor + // worker and do the rest of the processing asynchronously. f.state.Workers.EnqueueFediAPI(ctx, messages.FromFediAPI{ APObjectType: ap.ObjectNote, APActivityType: ap.ActivityCreate, - APIri: statusableURI, + APIri: ap.GetJSONLDId(statusable), APObjectModel: nil, GTSModel: nil, - ReceivingAccount: receivingAccount, + ReceivingAccount: receiver, }) return nil } - // This is a non-forwarded status we can trust the requester on, - // convert this provided statusable data to a useable gtsmodel status. - status, err = f.converter.ASStatusToStatus(ctx, statusable) - if err != nil { - return gtserror.Newf("error converting statusable to status: %w", err) - } - // Check whether we should accept this new status. accept, err := f.shouldAcceptStatusable(ctx, - receivingAccount, - requestingAccount, - status, + receiver, + requester, + statusable, ) if err != nil { return gtserror.Newf("error checking status acceptibility: %w", err) @@ -258,65 +220,52 @@ func (f *federatingDB) createStatusable( return nil } - // ID the new status based on the time it was created. - status.ID, err = id.NewULIDFromTime(status.CreatedAt) - if err != nil { - return err - } - - // Put this newly parsed status in the database. - if err := f.state.DB.PutStatus(ctx, status); err != nil { - if errors.Is(err, db.ErrAlreadyExists) { - // The status already exists in the database, which - // means we've already processed it and some race - // condition means we didn't catch it yet. We can - // just return nil here and be done with it. - return nil - } - return gtserror.Newf("db error inserting status: %w", err) - } - // Do the rest of the processing asynchronously. The processor // will handle inserting/updating + further dereferencing the status. f.state.Workers.EnqueueFediAPI(ctx, messages.FromFediAPI{ APObjectType: ap.ObjectNote, APActivityType: ap.ActivityCreate, APIri: nil, + GTSModel: nil, APObjectModel: statusable, - GTSModel: status, - ReceivingAccount: receivingAccount, + ReceivingAccount: receiver, }) return nil } -func (f *federatingDB) shouldAcceptStatusable(ctx context.Context, receiver *gtsmodel.Account, requester *gtsmodel.Account, status *gtsmodel.Status) (bool, error) { +func (f *federatingDB) shouldAcceptStatusable(ctx context.Context, receiver *gtsmodel.Account, requester *gtsmodel.Account, statusable ap.Statusable) (bool, error) { host := config.GetHost() accountDomain := config.GetAccountDomain() // Check whether status mentions the receiver, // this is the quickest check so perform it first. - // Prefer checking using mention Href, fall back to Name. - for _, mention := range status.Mentions { - targetURI := mention.TargetAccountURI - nameString := mention.NameString - - if targetURI != "" { - if targetURI == receiver.URI || targetURI == receiver.URL { - // Target URI or URL match; - // receiver is mentioned. + mentions, _ := ap.ExtractMentions(statusable) + for _, mention := range mentions { + + // Extract placeholder mention vars. + accURI := mention.TargetAccountURI + name := mention.NameString + + switch { + case accURI != "": + if accURI == receiver.URI || + accURI == receiver.URL { + // Mention target is receiver, + // they are mentioned in status. return true, nil } - } else if nameString != "" { - username, domain, err := util.ExtractNamestringParts(nameString) + + case accURI == "" && name != "": + // Only a name was provided, extract the user@domain parts. + user, domain, err := util.ExtractNamestringParts(name) if err != nil { - return false, gtserror.Newf("error checking if mentioned: %w", err) + return false, gtserror.Newf("error extracting mention name parts: %w", err) } - if (domain == host || domain == accountDomain) && - strings.EqualFold(username, receiver.Username) { - // Username + domain match; - // receiver is mentioned. + // Check if the name points to our receiving local user. + isLocal := (domain == host || domain == accountDomain) + if isLocal && strings.EqualFold(user, receiver.Username) { return true, nil } } diff --git a/internal/federation/federatingdb/create_test.go b/internal/federation/federatingdb/create_test.go index 6c18f5bd0..a1f1a7e18 100644 --- a/internal/federation/federatingdb/create_test.go +++ b/internal/federation/federatingdb/create_test.go @@ -39,6 +39,8 @@ func (suite *CreateTestSuite) TestCreateNote() { ctx := createTestContext(receivingAccount, requestingAccount) create := suite.testActivities["dm_for_zork"].Activity + objProp := create.GetActivityStreamsObject() + note := objProp.At(0).GetType().(ap.Statusable) err := suite.federatingDB.Create(ctx, create) suite.NoError(err) @@ -47,18 +49,7 @@ func (suite *CreateTestSuite) TestCreateNote() { msg := <-suite.fromFederator suite.Equal(ap.ObjectNote, msg.APObjectType) suite.Equal(ap.ActivityCreate, msg.APActivityType) - - // shiny new status should be defined on the message - suite.NotNil(msg.GTSModel) - status := msg.GTSModel.(*gtsmodel.Status) - - // status should have some expected values - suite.Equal(requestingAccount.ID, status.AccountID) - suite.Equal("@the_mighty_zork@localhost:8080 hey zork here's a new private note for you", status.Content) - - // status should be in the database - _, err = suite.db.GetStatusByID(context.Background(), status.ID) - suite.NoError(err) + suite.Equal(note, msg.APObjectModel) } func (suite *CreateTestSuite) TestCreateNoteForward() { @@ -78,7 +69,7 @@ func (suite *CreateTestSuite) TestCreateNoteForward() { suite.Equal(ap.ActivityCreate, msg.APActivityType) // nothing should be set as the model since this is a forward - suite.Nil(msg.GTSModel) + suite.Nil(msg.APObjectModel) // but we should have a uri set suite.Equal("http://example.org/users/Some_User/statuses/afaba698-5740-4e32-a702-af61aa543bc1", msg.APIri.String()) diff --git a/internal/federation/federatingdb/db.go b/internal/federation/federatingdb/db.go index 8e98dc2f2..75ef3a2a7 100644 --- a/internal/federation/federatingdb/db.go +++ b/internal/federation/federatingdb/db.go @@ -24,6 +24,7 @@ import ( "github.com/superseriousbusiness/activity/streams/vocab" "github.com/superseriousbusiness/gotosocial/internal/state" "github.com/superseriousbusiness/gotosocial/internal/typeutils" + "github.com/superseriousbusiness/gotosocial/internal/visibility" ) // DB wraps the pub.Database interface with a couple of custom functions for GoToSocial. @@ -33,7 +34,6 @@ type DB interface { Accept(ctx context.Context, accept vocab.ActivityStreamsAccept) error Reject(ctx context.Context, reject vocab.ActivityStreamsReject) error Announce(ctx context.Context, announce vocab.ActivityStreamsAnnounce) error - Question(ctx context.Context, question vocab.ActivityStreamsQuestion) error } // FederatingDB uses the underlying DB interface to implement the go-fed pub.Database interface. @@ -41,13 +41,15 @@ type DB interface { type federatingDB struct { state *state.State converter *typeutils.Converter + filter *visibility.Filter } // New returns a DB interface using the given database and config -func New(state *state.State, converter *typeutils.Converter) DB { +func New(state *state.State, converter *typeutils.Converter, filter *visibility.Filter) DB { fdb := federatingDB{ state: state, converter: converter, + filter: filter, } return &fdb } diff --git a/internal/federation/federatingdb/question.go b/internal/federation/federatingdb/question.go deleted file mode 100644 index 85226d9ed..000000000 --- a/internal/federation/federatingdb/question.go +++ /dev/null @@ -1,32 +0,0 @@ -// GoToSocial -// Copyright (C) GoToSocial Authors admin@gotosocial.org -// SPDX-License-Identifier: AGPL-3.0-or-later -// -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. -// -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see <http://www.gnu.org/licenses/>. - -package federatingdb - -import ( - "context" - - "github.com/superseriousbusiness/activity/streams/vocab" -) - -func (f *federatingDB) Question(ctx context.Context, question vocab.ActivityStreamsQuestion) error { - receivingAccount, requestingAccount, internal := extractFromCtx(ctx) - if internal { - return nil // Already processed. - } - return f.createStatusable(ctx, question, receivingAccount, requestingAccount) -} diff --git a/internal/federation/federatingdb/update.go b/internal/federation/federatingdb/update.go index 5d3d4a0ff..26ea81f72 100644 --- a/internal/federation/federatingdb/update.go +++ b/internal/federation/federatingdb/update.go @@ -19,11 +19,13 @@ package federatingdb import ( "context" + "errors" "codeberg.org/gruf/go-logger/v2/level" "github.com/superseriousbusiness/activity/streams/vocab" "github.com/superseriousbusiness/gotosocial/internal/ap" "github.com/superseriousbusiness/gotosocial/internal/config" + "github.com/superseriousbusiness/gotosocial/internal/db" "github.com/superseriousbusiness/gotosocial/internal/gtserror" "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" "github.com/superseriousbusiness/gotosocial/internal/log" @@ -71,18 +73,21 @@ func (f *federatingDB) updateAccountable(ctx context.Context, receivingAcct *gts // Extract AP URI of the updated Accountable model. idProp := accountable.GetJSONLDId() if idProp == nil || !idProp.IsIRI() { - return gtserror.New("Accountable id prop was nil or not IRI") + return gtserror.New("invalid id prop") } - updatedAcctURI := idProp.GetIRI() - // Don't try to update local accounts, it will break things. - if updatedAcctURI.Host == config.GetHost() { + // Get the account URI string for checks + accountURI := idProp.GetIRI() + accountURIStr := accountURI.String() + + // Don't try to update local accounts. + if accountURI.Host == config.GetHost() { return nil } - // Ensure Accountable and requesting account are one and the same. - if updatedAcctURIStr := updatedAcctURI.String(); requestingAcct.URI != updatedAcctURIStr { - return gtserror.Newf("update for %s was requested by %s, this is not valid", updatedAcctURIStr, requestingAcct.URI) + // Check that update was by the account themselves. + if accountURIStr != requestingAcct.URI { + return gtserror.Newf("update for %s was not requested by owner", accountURIStr) } // Pass in to the processor the existing version of the requesting @@ -117,15 +122,31 @@ func (f *federatingDB) updateStatusable(ctx context.Context, receivingAcct *gtsm return nil } + // Check if this is a forwarded object, i.e. did + // the account making the request also create this? + forwarded := !isSender(statusable, requestingAcct) + // Get the status we have on file for this URI string. status, err := f.state.DB.GetStatusByURI(ctx, statusURIStr) - if err != nil { + if err != nil && !errors.Is(err, db.ErrNoEntries) { return gtserror.Newf("error fetching status from db: %w", err) } - // Check that update was by the status author. - if status.AccountID != requestingAcct.ID { - return gtserror.Newf("update for %s was not requested by author", statusURIStr) + if status == nil { + // We haven't seen this status before, be + // lenient and handle as a CREATE event. + return f.createStatusable(ctx, + receivingAcct, + requestingAcct, + statusable, + forwarded, + ) + } + + if forwarded { + // For forwarded updates, set a nil AS + // status to force refresh from remote. + statusable = nil } // Queue an UPDATE NOTE activity to our fedi API worker, @@ -134,7 +155,7 @@ func (f *federatingDB) updateStatusable(ctx context.Context, receivingAcct *gtsm APObjectType: ap.ObjectNote, APActivityType: ap.ActivityUpdate, GTSModel: status, // original status - APObjectModel: statusable, + APObjectModel: (ap.Statusable)(statusable), ReceivingAccount: receivingAcct, }) diff --git a/internal/federation/federatingdb/util.go b/internal/federation/federatingdb/util.go index d46451e21..dd7a2240e 100644 --- a/internal/federation/federatingdb/util.go +++ b/internal/federation/federatingdb/util.go @@ -20,7 +20,6 @@ package federatingdb import ( "context" "encoding/json" - "errors" "fmt" "net/url" @@ -37,6 +36,30 @@ import ( "github.com/superseriousbusiness/gotosocial/internal/uris" ) +func typeNames(objects []ap.TypeOrIRI) []string { + names := make([]string, len(objects)) + for i, object := range objects { + if object.IsIRI() { + names[i] = "IRI" + } else if t := object.GetType(); t != nil { + names[i] = t.GetTypeName() + } else { + names[i] = "nil" + } + } + return names +} + +// isSender returns whether an object with AttributedTo property comes from the given requesting account. +func isSender(with ap.WithAttributedTo, requester *gtsmodel.Account) bool { + for _, uri := range ap.GetAttributedTo(with) { + if uri.String() == requester.URI { + return true + } + } + return false +} + func sameActor(actor1 vocab.ActivityStreamsActorProperty, actor2 vocab.ActivityStreamsActorProperty) bool { if actor1 == nil || actor2 == nil { return false @@ -78,131 +101,31 @@ func (f *federatingDB) NewID(ctx context.Context, t vocab.Type) (idURL *url.URL, l.Debug("entering NewID") } - switch t.GetTypeName() { - case ap.ActivityFollow: - // FOLLOW - // ID might already be set on a follow we've created, so check it here and return it if it is - follow, ok := t.(vocab.ActivityStreamsFollow) - if !ok { - return nil, errors.New("newid: follow couldn't be parsed into vocab.ActivityStreamsFollow") - } - idProp := follow.GetJSONLDId() - if idProp != nil { - if idProp.IsIRI() { - return idProp.GetIRI(), nil - } - } - // it's not set so create one based on the actor set on the follow (ie., the followER not the followEE) - actorProp := follow.GetActivityStreamsActor() - if actorProp != nil { - for iter := actorProp.Begin(); iter != actorProp.End(); iter = iter.Next() { - // take the IRI of the first actor we can find (there should only be one) - if iter.IsIRI() { - // if there's an error here, just use the fallback behavior -- we don't need to return an error here - if actorAccount, err := f.state.DB.GetAccountByURI(ctx, iter.GetIRI().String()); err == nil { - newID, err := id.NewRandomULID() - if err != nil { - return nil, err - } - return url.Parse(uris.GenerateURIForFollow(actorAccount.Username, newID)) - } + // Most of our types set an ID already + // by this point, return this if found. + idProp := t.GetJSONLDId() + if idProp != nil && idProp.IsIRI() { + return idProp.GetIRI(), nil + } + + if t.GetTypeName() == ap.ActivityFollow { + follow, _ := t.(vocab.ActivityStreamsFollow) + + // If an actor URI has been set, create a new ID + // based on actor (i.e. followER not the followEE). + if uri := ap.GetActor(follow); len(uri) == 1 { + if actorAccount, err := f.state.DB.GetAccountByURI(ctx, uri[0].String()); err == nil { + newID, err := id.NewRandomULID() + if err != nil { + return nil, err } - } - } - case ap.ObjectNote: - // NOTE aka STATUS - // ID might already be set on a note we've created, so check it here and return it if it is - note, ok := t.(vocab.ActivityStreamsNote) - if !ok { - return nil, errors.New("newid: note couldn't be parsed into vocab.ActivityStreamsNote") - } - idProp := note.GetJSONLDId() - if idProp != nil { - if idProp.IsIRI() { - return idProp.GetIRI(), nil - } - } - case ap.ActivityLike: - // LIKE aka FAVE - // ID might already be set on a fave we've created, so check it here and return it if it is - fave, ok := t.(vocab.ActivityStreamsLike) - if !ok { - return nil, errors.New("newid: fave couldn't be parsed into vocab.ActivityStreamsLike") - } - idProp := fave.GetJSONLDId() - if idProp != nil { - if idProp.IsIRI() { - return idProp.GetIRI(), nil - } - } - case ap.ActivityCreate: - // CREATE - // ID might already be set on a Create, so check it here and return it if it is - create, ok := t.(vocab.ActivityStreamsCreate) - if !ok { - return nil, errors.New("newid: create couldn't be parsed into vocab.ActivityStreamsCreate") - } - idProp := create.GetJSONLDId() - if idProp != nil { - if idProp.IsIRI() { - return idProp.GetIRI(), nil - } - } - case ap.ActivityAnnounce: - // ANNOUNCE aka BOOST - // ID might already be set on an announce we've created, so check it here and return it if it is - announce, ok := t.(vocab.ActivityStreamsAnnounce) - if !ok { - return nil, errors.New("newid: announce couldn't be parsed into vocab.ActivityStreamsAnnounce") - } - idProp := announce.GetJSONLDId() - if idProp != nil { - if idProp.IsIRI() { - return idProp.GetIRI(), nil - } - } - case ap.ActivityUpdate: - // UPDATE - // ID might already be set on an update we've created, so check it here and return it if it is - update, ok := t.(vocab.ActivityStreamsUpdate) - if !ok { - return nil, errors.New("newid: update couldn't be parsed into vocab.ActivityStreamsUpdate") - } - idProp := update.GetJSONLDId() - if idProp != nil { - if idProp.IsIRI() { - return idProp.GetIRI(), nil - } - } - case ap.ActivityBlock: - // BLOCK - // ID might already be set on a block we've created, so check it here and return it if it is - block, ok := t.(vocab.ActivityStreamsBlock) - if !ok { - return nil, errors.New("newid: block couldn't be parsed into vocab.ActivityStreamsBlock") - } - idProp := block.GetJSONLDId() - if idProp != nil { - if idProp.IsIRI() { - return idProp.GetIRI(), nil - } - } - case ap.ActivityUndo: - // UNDO - // ID might already be set on an undo we've created, so check it here and return it if it is - undo, ok := t.(vocab.ActivityStreamsUndo) - if !ok { - return nil, errors.New("newid: undo couldn't be parsed into vocab.ActivityStreamsUndo") - } - idProp := undo.GetJSONLDId() - if idProp != nil { - if idProp.IsIRI() { - return idProp.GetIRI(), nil + return url.Parse(uris.GenerateURIForFollow(actorAccount.Username, newID)) } } } - // fallback default behavior: just return a random ULID after our protocol and host + // Default fallback behaviour: + // {proto}://{host}/{randomID} newID, err := id.NewRandomULID() if err != nil { return nil, err |