diff options
author | 2023-11-04 20:21:20 +0000 | |
---|---|---|
committer | 2023-11-04 20:21:20 +0000 | |
commit | 41435a6c4ee0a5b52528890edf3fbf5a9dc0a6c8 (patch) | |
tree | 987b5d7787b24f6f6e340bbcf7fd1b052fe40dfc /internal/federation/federatingdb/create.go | |
parent | [docs/bugfix] fix link to swagger yaml (#2333) (diff) | |
download | gotosocial-41435a6c4ee0a5b52528890edf3fbf5a9dc0a6c8.tar.xz |
[feature] support canceling scheduled tasks, some federation API performance improvements (#2329)
Diffstat (limited to 'internal/federation/federatingdb/create.go')
-rw-r--r-- | internal/federation/federatingdb/create.go | 173 |
1 files changed, 61 insertions, 112 deletions
diff --git a/internal/federation/federatingdb/create.go b/internal/federation/federatingdb/create.go index 14e846b15..0fb459190 100644 --- a/internal/federation/federatingdb/create.go +++ b/internal/federation/federatingdb/create.go @@ -24,7 +24,6 @@ import ( "strings" "codeberg.org/gruf/go-logger/v2/level" - "github.com/superseriousbusiness/activity/pub" "github.com/superseriousbusiness/activity/streams/vocab" "github.com/superseriousbusiness/gotosocial/internal/ap" "github.com/superseriousbusiness/gotosocial/internal/config" @@ -137,22 +136,37 @@ func (f *federatingDB) activityCreate( return gtserror.Newf("could not convert asType %T to ActivityStreamsCreate", asType) } - for _, object := range ap.ExtractObjects(create) { - // Try to get object as vocab.Type, - // else skip handling (likely) IRI. - objType := object.GetType() - if objType == nil { - continue - } + var errs gtserror.MultiError + + // Extract objects from create activity. + objects := ap.ExtractObjects(create) + + // Extract Statusables from objects slice (this must be + // done AFTER extracting options due to how AS typing works). + statusables, objects := ap.ExtractStatusables(objects) - if statusable, ok := ap.ToStatusable(objType); ok { - return f.createStatusable(ctx, statusable, receivingAccount, requestingAccount) + for _, statusable := range statusables { + // Check if this is a forwarded object, i.e. did + // the account making the request also create this? + forwarded := !isSender(statusable, requestingAccount) + + // Handle create event for this statusable. + if err := f.createStatusable(ctx, + receivingAccount, + requestingAccount, + statusable, + forwarded, + ); err != nil { + errs.Appendf("error creating statusable: %w", err) } + } - // TODO: handle CREATE of other types? + if len(objects) > 0 { + // Log any unhandled objects after filtering for debug purposes. + log.Debugf(ctx, "unhandled CREATE types: %v", typeNames(objects)) } - return nil + return errs.Combine() } // createStatusable handles a Create activity for a Statusable. @@ -161,88 +175,36 @@ func (f *federatingDB) activityCreate( // the processor for further asynchronous processing. func (f *federatingDB) createStatusable( ctx context.Context, + receiver *gtsmodel.Account, + requester *gtsmodel.Account, statusable ap.Statusable, - receivingAccount *gtsmodel.Account, - requestingAccount *gtsmodel.Account, + forwarded bool, ) error { - // Statusable must have an attributedTo. - attrToProp := statusable.GetActivityStreamsAttributedTo() - if attrToProp == nil { - return gtserror.Newf("statusable had no attributedTo") - } - - // Statusable must have an ID. - idProp := statusable.GetJSONLDId() - if idProp == nil || !idProp.IsIRI() { - return gtserror.Newf("statusable had no id, or id was not a URI") - } - - statusableURI := idProp.GetIRI() - - // Check if we have a forward. In other words, was the - // statusable posted to our inbox by at least one actor - // who actually created it, or are they forwarding it? - forward := true - for iter := attrToProp.Begin(); iter != attrToProp.End(); iter = iter.Next() { - actorURI, err := pub.ToId(iter) - if err != nil { - return gtserror.Newf("error extracting id from attributedTo entry: %w", err) - } - - if requestingAccount.URI == actorURI.String() { - // The actor who posted this statusable to our inbox is - // (one of) its creator(s), so this is not a forward. - forward = false - break - } - } - - // Check if we already have a status entry - // for this statusable, based on the ID/URI. - statusableURIStr := statusableURI.String() - status, err := f.state.DB.GetStatusByURI(ctx, statusableURIStr) - if err != nil && !errors.Is(err, db.ErrNoEntries) { - return gtserror.Newf("db error checking existence of status %s: %w", statusableURIStr, err) - } - - if status != nil { - // We already had this status in the db, no need for further action. - log.Trace(ctx, "status already exists: %s", statusableURIStr) - return nil - } - // If we do have a forward, we should ignore the content // and instead deref based on the URI of the statusable. // // In other words, don't automatically trust whoever sent // this status to us, but fetch the authentic article from // the server it originated from. - if forward { - // Pass the statusable URI (APIri) into the processor worker - // and do the rest of the processing asynchronously. + if forwarded { + // Pass the statusable URI (APIri) into the processor + // worker and do the rest of the processing asynchronously. f.state.Workers.EnqueueFediAPI(ctx, messages.FromFediAPI{ APObjectType: ap.ObjectNote, APActivityType: ap.ActivityCreate, - APIri: statusableURI, + APIri: ap.GetJSONLDId(statusable), APObjectModel: nil, GTSModel: nil, - ReceivingAccount: receivingAccount, + ReceivingAccount: receiver, }) return nil } - // This is a non-forwarded status we can trust the requester on, - // convert this provided statusable data to a useable gtsmodel status. - status, err = f.converter.ASStatusToStatus(ctx, statusable) - if err != nil { - return gtserror.Newf("error converting statusable to status: %w", err) - } - // Check whether we should accept this new status. accept, err := f.shouldAcceptStatusable(ctx, - receivingAccount, - requestingAccount, - status, + receiver, + requester, + statusable, ) if err != nil { return gtserror.Newf("error checking status acceptibility: %w", err) @@ -258,65 +220,52 @@ func (f *federatingDB) createStatusable( return nil } - // ID the new status based on the time it was created. - status.ID, err = id.NewULIDFromTime(status.CreatedAt) - if err != nil { - return err - } - - // Put this newly parsed status in the database. - if err := f.state.DB.PutStatus(ctx, status); err != nil { - if errors.Is(err, db.ErrAlreadyExists) { - // The status already exists in the database, which - // means we've already processed it and some race - // condition means we didn't catch it yet. We can - // just return nil here and be done with it. - return nil - } - return gtserror.Newf("db error inserting status: %w", err) - } - // Do the rest of the processing asynchronously. The processor // will handle inserting/updating + further dereferencing the status. f.state.Workers.EnqueueFediAPI(ctx, messages.FromFediAPI{ APObjectType: ap.ObjectNote, APActivityType: ap.ActivityCreate, APIri: nil, + GTSModel: nil, APObjectModel: statusable, - GTSModel: status, - ReceivingAccount: receivingAccount, + ReceivingAccount: receiver, }) return nil } -func (f *federatingDB) shouldAcceptStatusable(ctx context.Context, receiver *gtsmodel.Account, requester *gtsmodel.Account, status *gtsmodel.Status) (bool, error) { +func (f *federatingDB) shouldAcceptStatusable(ctx context.Context, receiver *gtsmodel.Account, requester *gtsmodel.Account, statusable ap.Statusable) (bool, error) { host := config.GetHost() accountDomain := config.GetAccountDomain() // Check whether status mentions the receiver, // this is the quickest check so perform it first. - // Prefer checking using mention Href, fall back to Name. - for _, mention := range status.Mentions { - targetURI := mention.TargetAccountURI - nameString := mention.NameString - - if targetURI != "" { - if targetURI == receiver.URI || targetURI == receiver.URL { - // Target URI or URL match; - // receiver is mentioned. + mentions, _ := ap.ExtractMentions(statusable) + for _, mention := range mentions { + + // Extract placeholder mention vars. + accURI := mention.TargetAccountURI + name := mention.NameString + + switch { + case accURI != "": + if accURI == receiver.URI || + accURI == receiver.URL { + // Mention target is receiver, + // they are mentioned in status. return true, nil } - } else if nameString != "" { - username, domain, err := util.ExtractNamestringParts(nameString) + + case accURI == "" && name != "": + // Only a name was provided, extract the user@domain parts. + user, domain, err := util.ExtractNamestringParts(name) if err != nil { - return false, gtserror.Newf("error checking if mentioned: %w", err) + return false, gtserror.Newf("error extracting mention name parts: %w", err) } - if (domain == host || domain == accountDomain) && - strings.EqualFold(username, receiver.Username) { - // Username + domain match; - // receiver is mentioned. + // Check if the name points to our receiving local user. + isLocal := (domain == host || domain == accountDomain) + if isLocal && strings.EqualFold(user, receiver.Username) { return true, nil } } |