diff options
author | 2023-11-04 20:21:20 +0000 | |
---|---|---|
committer | 2023-11-04 20:21:20 +0000 | |
commit | 41435a6c4ee0a5b52528890edf3fbf5a9dc0a6c8 (patch) | |
tree | 987b5d7787b24f6f6e340bbcf7fd1b052fe40dfc /internal/federation | |
parent | [docs/bugfix] fix link to swagger yaml (#2333) (diff) | |
download | gotosocial-41435a6c4ee0a5b52528890edf3fbf5a9dc0a6c8.tar.xz |
[feature] support canceling scheduled tasks, some federation API performance improvements (#2329)
Diffstat (limited to 'internal/federation')
-rw-r--r-- | internal/federation/dereferencing/account.go | 2 | ||||
-rw-r--r-- | internal/federation/dereferencing/status.go | 121 | ||||
-rw-r--r-- | internal/federation/dereferencing/thread.go | 47 | ||||
-rw-r--r-- | internal/federation/federatingdb/create.go | 173 | ||||
-rw-r--r-- | internal/federation/federatingdb/create_test.go | 17 | ||||
-rw-r--r-- | internal/federation/federatingdb/db.go | 6 | ||||
-rw-r--r-- | internal/federation/federatingdb/question.go | 32 | ||||
-rw-r--r-- | internal/federation/federatingdb/update.go | 45 | ||||
-rw-r--r-- | internal/federation/federatingdb/util.go | 165 | ||||
-rw-r--r-- | internal/federation/federatingprotocol.go | 3 |
10 files changed, 261 insertions, 350 deletions
diff --git a/internal/federation/dereferencing/account.go b/internal/federation/dereferencing/account.go index 58f07f9cd..a4e74de3c 100644 --- a/internal/federation/dereferencing/account.go +++ b/internal/federation/dereferencing/account.go @@ -945,7 +945,7 @@ func (d *Dereferencer) dereferenceAccountFeatured(ctx context.Context, requestUs // we still know it was *meant* to be pinned. statusURIs = append(statusURIs, statusURI) - status, _, err := d.getStatusByURI(ctx, requestUser, statusURI) + status, _, _, err := d.getStatusByURI(ctx, requestUser, statusURI) if err != nil { // We couldn't get the status, bummer. Just log + move on, we can try later. log.Errorf(ctx, "error getting status from featured collection %s: %v", statusURI, err) diff --git a/internal/federation/dereferencing/status.go b/internal/federation/dereferencing/status.go index 712692814..4dd6d3baf 100644 --- a/internal/federation/dereferencing/status.go +++ b/internal/federation/dereferencing/status.go @@ -22,9 +22,8 @@ import ( "errors" "io" "net/url" - "time" - "slices" + "time" "github.com/superseriousbusiness/gotosocial/internal/ap" "github.com/superseriousbusiness/gotosocial/internal/config" @@ -55,12 +54,12 @@ func statusUpToDate(status *gtsmodel.Status) bool { return false } -// GetStatusByURI will attempt to fetch a status by its URI, first checking the database. In the case of a newly-met remote model, or a remote model -// whose last_fetched date is beyond a certain interval, the status will be dereferenced. In the case of dereferencing, some low-priority status information -// may be enqueued for asynchronous fetching, e.g. dereferencing the remainder of the status thread. An ActivityPub object indicates the status was dereferenced. +// GetStatusByURI will attempt to fetch a status by its URI, first checking the database. In the case of a newly-met remote model, or a remote model whose 'last_fetched' date +// is beyond a certain interval, the status will be dereferenced. In the case of dereferencing, some low-priority status information may be enqueued for asynchronous fetching, +// e.g. dereferencing the status thread. Param 'syncParent' = true indicates to fetch status ancestors synchronously. An ActivityPub object indicates the status was dereferenced. func (d *Dereferencer) GetStatusByURI(ctx context.Context, requestUser string, uri *url.URL) (*gtsmodel.Status, ap.Statusable, error) { // Fetch and dereference status if necessary. - status, apubStatus, err := d.getStatusByURI(ctx, + status, statusable, isNew, err := d.getStatusByURI(ctx, requestUser, uri, ) @@ -68,18 +67,22 @@ func (d *Dereferencer) GetStatusByURI(ctx context.Context, requestUser string, u return nil, nil, err } - if apubStatus != nil { - // This status was updated, enqueue re-dereferencing the whole thread. - d.state.Workers.Federator.MustEnqueueCtx(ctx, func(ctx context.Context) { - d.dereferenceThread(ctx, requestUser, uri, status, apubStatus) - }) + if statusable != nil { + // Deref parents + children. + d.dereferenceThread(ctx, + requestUser, + uri, + status, + statusable, + isNew, + ) } - return status, apubStatus, nil + return status, statusable, nil } // getStatusByURI is a package internal form of .GetStatusByURI() that doesn't bother dereferencing the whole thread on update. -func (d *Dereferencer) getStatusByURI(ctx context.Context, requestUser string, uri *url.URL) (*gtsmodel.Status, ap.Statusable, error) { +func (d *Dereferencer) getStatusByURI(ctx context.Context, requestUser string, uri *url.URL) (*gtsmodel.Status, ap.Statusable, bool, error) { var ( status *gtsmodel.Status uriStr = uri.String() @@ -94,7 +97,7 @@ func (d *Dereferencer) getStatusByURI(ctx context.Context, requestUser string, u uriStr, ) if err != nil && !errors.Is(err, db.ErrNoEntries) { - return nil, nil, gtserror.Newf("error checking database for status %s by uri: %w", uriStr, err) + return nil, nil, false, gtserror.Newf("error checking database for status %s by uri: %w", uriStr, err) } if status == nil { @@ -104,14 +107,14 @@ func (d *Dereferencer) getStatusByURI(ctx context.Context, requestUser string, u uriStr, ) if err != nil && !errors.Is(err, db.ErrNoEntries) { - return nil, nil, gtserror.Newf("error checking database for status %s by url: %w", uriStr, err) + return nil, nil, false, gtserror.Newf("error checking database for status %s by url: %w", uriStr, err) } } if status == nil { // Ensure that this isn't a search for a local status. if uri.Host == config.GetHost() || uri.Host == config.GetAccountDomain() { - return nil, nil, gtserror.SetUnretrievable(err) // this will be db.ErrNoEntries + return nil, nil, false, gtserror.SetUnretrievable(err) // this will be db.ErrNoEntries } // Create and pass-through a new bare-bones model for deref. @@ -127,11 +130,11 @@ func (d *Dereferencer) getStatusByURI(ctx context.Context, requestUser string, u if err := d.state.DB.PopulateStatus(ctx, status); err != nil { log.Errorf(ctx, "error populating existing status: %v", err) } - return status, nil, nil + return status, nil, false, nil } // Try to update + deref existing status model. - latest, apubStatus, err := d.enrichStatusSafely(ctx, + latest, statusable, isNew, err := d.enrichStatusSafely(ctx, requestUser, uri, status, @@ -140,17 +143,22 @@ func (d *Dereferencer) getStatusByURI(ctx context.Context, requestUser string, u if err != nil { log.Errorf(ctx, "error enriching remote status: %v", err) - // Fallback to existing. - return status, nil, nil + // Fallback to existing status. + return status, nil, false, nil } - return latest, apubStatus, nil + return latest, statusable, isNew, nil } -// RefreshStatus updates the given status if remote and last_fetched is beyond fetch interval, or if force is set. An updated status model is returned, -// but in the case of dereferencing, some low-priority status information may be enqueued for asynchronous fetching, e.g. dereferencing the remainder of the -// status thread. An ActivityPub object indicates the status was dereferenced (i.e. updated). -func (d *Dereferencer) RefreshStatus(ctx context.Context, requestUser string, status *gtsmodel.Status, apubStatus ap.Statusable, force bool) (*gtsmodel.Status, ap.Statusable, error) { +// RefreshStatus is functionally equivalent to GetStatusByURI(), except that it requires a pre +// populated status model (with AT LEAST uri set), and ALL thread dereferencing is asynchronous. +func (d *Dereferencer) RefreshStatus( + ctx context.Context, + requestUser string, + status *gtsmodel.Status, + statusable ap.Statusable, + force bool, +) (*gtsmodel.Status, ap.Statusable, error) { // Check whether needs update. if !force && statusUpToDate(status) { return status, nil, nil @@ -162,28 +170,40 @@ func (d *Dereferencer) RefreshStatus(ctx context.Context, requestUser string, st return nil, nil, gtserror.Newf("invalid status uri %q: %w", status.URI, err) } - // Try to update + deref the passed status model. - latest, apubStatus, err := d.enrichStatusSafely(ctx, + // Try to update + dereference the passed status model. + latest, statusable, isNew, err := d.enrichStatusSafely(ctx, requestUser, uri, status, - apubStatus, + statusable, ) if err != nil { return nil, nil, err } - // This status was updated, enqueue re-dereferencing the whole thread. - d.state.Workers.Federator.MustEnqueueCtx(ctx, func(ctx context.Context) { - d.dereferenceThread(ctx, requestUser, uri, latest, apubStatus) - }) + if statusable != nil { + // Deref parents + children. + d.dereferenceThread(ctx, + requestUser, + uri, + status, + statusable, + isNew, + ) + } - return latest, apubStatus, nil + return latest, statusable, nil } -// RefreshStatusAsync enqueues the given status for an asychronous update fetching, if last_fetched is beyond fetch interval, or if force is set. -// This is a more optimized form of manually enqueueing .UpdateStatus() to the federation worker, since it only enqueues update if necessary. -func (d *Dereferencer) RefreshStatusAsync(ctx context.Context, requestUser string, status *gtsmodel.Status, apubStatus ap.Statusable, force bool) { +// RefreshStatusAsync is functionally equivalent to RefreshStatus(), except that ALL +// dereferencing is queued for asynchronous processing, (both thread AND status). +func (d *Dereferencer) RefreshStatusAsync( + ctx context.Context, + requestUser string, + status *gtsmodel.Status, + statusable ap.Statusable, + force bool, +) { // Check whether needs update. if !force && statusUpToDate(status) { return @@ -196,17 +216,25 @@ func (d *Dereferencer) RefreshStatusAsync(ctx context.Context, requestUser strin return } - // Enqueue a worker function to re-fetch this status async. + // Enqueue a worker function to re-fetch this status entirely async. d.state.Workers.Federator.MustEnqueueCtx(ctx, func(ctx context.Context) { - latest, apubStatus, err := d.enrichStatusSafely(ctx, requestUser, uri, status, apubStatus) + latest, statusable, _, err := d.enrichStatusSafely(ctx, + requestUser, + uri, + status, + statusable, + ) if err != nil { log.Errorf(ctx, "error enriching remote status: %v", err) return } - - if apubStatus != nil { - // This status was updated, re-dereference the whole thread. - d.dereferenceThread(ctx, requestUser, uri, latest, apubStatus) + if statusable != nil { + if err := d.DereferenceStatusAncestors(ctx, requestUser, latest); err != nil { + log.Error(ctx, err) + } + if err := d.DereferenceStatusDescendants(ctx, requestUser, uri, statusable); err != nil { + log.Error(ctx, err) + } } }) } @@ -220,7 +248,7 @@ func (d *Dereferencer) enrichStatusSafely( uri *url.URL, status *gtsmodel.Status, apubStatus ap.Statusable, -) (*gtsmodel.Status, ap.Statusable, error) { +) (*gtsmodel.Status, ap.Statusable, bool, error) { uriStr := status.URI if status.ID != "" { @@ -238,6 +266,9 @@ func (d *Dereferencer) enrichStatusSafely( unlock = doOnce(unlock) defer unlock() + // This is a NEW status (to us). + isNew := (status.ID == "") + // Perform status enrichment with passed vars. latest, apubStatus, err := d.enrichStatus(ctx, requestUser, @@ -261,6 +292,7 @@ func (d *Dereferencer) enrichStatusSafely( // otherwise this indicates WE // enriched the status. apubStatus = nil + isNew = false // DATA RACE! We likely lost out to another goroutine // in a call to db.Put(Status). Look again in DB by URI. @@ -270,7 +302,7 @@ func (d *Dereferencer) enrichStatusSafely( } } - return latest, apubStatus, err + return latest, apubStatus, isNew, err } // enrichStatus will enrich the given status, whether a new @@ -343,6 +375,7 @@ func (d *Dereferencer) enrichStatus( } // Carry-over values and set fetch time. + latestStatus.UpdatedAt = status.UpdatedAt latestStatus.FetchedAt = time.Now() latestStatus.Local = status.Local diff --git a/internal/federation/dereferencing/thread.go b/internal/federation/dereferencing/thread.go index 5753ce4dd..0ad8f09e4 100644 --- a/internal/federation/dereferencing/thread.go +++ b/internal/federation/dereferencing/thread.go @@ -38,15 +38,42 @@ import ( // ancesters we are willing to follow before returning error. const maxIter = 1000 -func (d *Dereferencer) dereferenceThread(ctx context.Context, username string, statusIRI *url.URL, status *gtsmodel.Status, statusable ap.Statusable) { - // Ensure that ancestors have been fully dereferenced - if err := d.DereferenceStatusAncestors(ctx, username, status); err != nil { - log.Error(ctx, err) - } +// dereferenceThread handles dereferencing status thread after +// fetch. Passing off appropriate parts to be enqueued for async +// processing, or handling some parts synchronously when required. +func (d *Dereferencer) dereferenceThread( + ctx context.Context, + requestUser string, + uri *url.URL, + status *gtsmodel.Status, + statusable ap.Statusable, + isNew bool, +) { + if isNew { + // This is a new status that we need the ancestors of in + // order to determine visibility. Perform the initial part + // of thread dereferencing, i.e. parents, synchronously. + err := d.DereferenceStatusAncestors(ctx, requestUser, status) + if err != nil { + log.Error(ctx, err) + } - // Ensure that descendants have been fully dereferenced - if err := d.DereferenceStatusDescendants(ctx, username, statusIRI, statusable); err != nil { - log.Error(ctx, err) + // Enqueue dereferencing remaining status thread, (children), asychronously . + d.state.Workers.Federator.MustEnqueueCtx(ctx, func(ctx context.Context) { + if err := d.DereferenceStatusDescendants(ctx, requestUser, uri, statusable); err != nil { + log.Error(ctx, err) + } + }) + } else { + // This is an existing status, dereference the WHOLE thread asynchronously. + d.state.Workers.Federator.MustEnqueueCtx(ctx, func(ctx context.Context) { + if err := d.DereferenceStatusAncestors(ctx, requestUser, status); err != nil { + log.Error(ctx, err) + } + if err := d.DereferenceStatusDescendants(ctx, requestUser, uri, statusable); err != nil { + log.Error(ctx, err) + } + }) } } @@ -157,7 +184,7 @@ func (d *Dereferencer) DereferenceStatusAncestors(ctx context.Context, username // - refetching recently fetched statuses (recursion!) // - remote domain is blocked (will return unretrievable) // - any http type error for a new status returns unretrievable - parent, _, err := d.getStatusByURI(ctx, username, inReplyToURI) + parent, _, _, err := d.getStatusByURI(ctx, username, inReplyToURI) if err == nil { // We successfully fetched the parent. // Update current status with new info. @@ -325,7 +352,7 @@ stackLoop: // - refetching recently fetched statuses (recursion!) // - remote domain is blocked (will return unretrievable) // - any http type error for a new status returns unretrievable - _, statusable, err := d.getStatusByURI(ctx, username, itemIRI) + _, statusable, _, err := d.getStatusByURI(ctx, username, itemIRI) if err != nil { if !gtserror.Unretrievable(err) { l.Errorf("error dereferencing remote status %s: %v", itemIRI, err) diff --git a/internal/federation/federatingdb/create.go b/internal/federation/federatingdb/create.go index 14e846b15..0fb459190 100644 --- a/internal/federation/federatingdb/create.go +++ b/internal/federation/federatingdb/create.go @@ -24,7 +24,6 @@ import ( "strings" "codeberg.org/gruf/go-logger/v2/level" - "github.com/superseriousbusiness/activity/pub" "github.com/superseriousbusiness/activity/streams/vocab" "github.com/superseriousbusiness/gotosocial/internal/ap" "github.com/superseriousbusiness/gotosocial/internal/config" @@ -137,22 +136,37 @@ func (f *federatingDB) activityCreate( return gtserror.Newf("could not convert asType %T to ActivityStreamsCreate", asType) } - for _, object := range ap.ExtractObjects(create) { - // Try to get object as vocab.Type, - // else skip handling (likely) IRI. - objType := object.GetType() - if objType == nil { - continue - } + var errs gtserror.MultiError + + // Extract objects from create activity. + objects := ap.ExtractObjects(create) + + // Extract Statusables from objects slice (this must be + // done AFTER extracting options due to how AS typing works). + statusables, objects := ap.ExtractStatusables(objects) - if statusable, ok := ap.ToStatusable(objType); ok { - return f.createStatusable(ctx, statusable, receivingAccount, requestingAccount) + for _, statusable := range statusables { + // Check if this is a forwarded object, i.e. did + // the account making the request also create this? + forwarded := !isSender(statusable, requestingAccount) + + // Handle create event for this statusable. + if err := f.createStatusable(ctx, + receivingAccount, + requestingAccount, + statusable, + forwarded, + ); err != nil { + errs.Appendf("error creating statusable: %w", err) } + } - // TODO: handle CREATE of other types? + if len(objects) > 0 { + // Log any unhandled objects after filtering for debug purposes. + log.Debugf(ctx, "unhandled CREATE types: %v", typeNames(objects)) } - return nil + return errs.Combine() } // createStatusable handles a Create activity for a Statusable. @@ -161,88 +175,36 @@ func (f *federatingDB) activityCreate( // the processor for further asynchronous processing. func (f *federatingDB) createStatusable( ctx context.Context, + receiver *gtsmodel.Account, + requester *gtsmodel.Account, statusable ap.Statusable, - receivingAccount *gtsmodel.Account, - requestingAccount *gtsmodel.Account, + forwarded bool, ) error { - // Statusable must have an attributedTo. - attrToProp := statusable.GetActivityStreamsAttributedTo() - if attrToProp == nil { - return gtserror.Newf("statusable had no attributedTo") - } - - // Statusable must have an ID. - idProp := statusable.GetJSONLDId() - if idProp == nil || !idProp.IsIRI() { - return gtserror.Newf("statusable had no id, or id was not a URI") - } - - statusableURI := idProp.GetIRI() - - // Check if we have a forward. In other words, was the - // statusable posted to our inbox by at least one actor - // who actually created it, or are they forwarding it? - forward := true - for iter := attrToProp.Begin(); iter != attrToProp.End(); iter = iter.Next() { - actorURI, err := pub.ToId(iter) - if err != nil { - return gtserror.Newf("error extracting id from attributedTo entry: %w", err) - } - - if requestingAccount.URI == actorURI.String() { - // The actor who posted this statusable to our inbox is - // (one of) its creator(s), so this is not a forward. - forward = false - break - } - } - - // Check if we already have a status entry - // for this statusable, based on the ID/URI. - statusableURIStr := statusableURI.String() - status, err := f.state.DB.GetStatusByURI(ctx, statusableURIStr) - if err != nil && !errors.Is(err, db.ErrNoEntries) { - return gtserror.Newf("db error checking existence of status %s: %w", statusableURIStr, err) - } - - if status != nil { - // We already had this status in the db, no need for further action. - log.Trace(ctx, "status already exists: %s", statusableURIStr) - return nil - } - // If we do have a forward, we should ignore the content // and instead deref based on the URI of the statusable. // // In other words, don't automatically trust whoever sent // this status to us, but fetch the authentic article from // the server it originated from. - if forward { - // Pass the statusable URI (APIri) into the processor worker - // and do the rest of the processing asynchronously. + if forwarded { + // Pass the statusable URI (APIri) into the processor + // worker and do the rest of the processing asynchronously. f.state.Workers.EnqueueFediAPI(ctx, messages.FromFediAPI{ APObjectType: ap.ObjectNote, APActivityType: ap.ActivityCreate, - APIri: statusableURI, + APIri: ap.GetJSONLDId(statusable), APObjectModel: nil, GTSModel: nil, - ReceivingAccount: receivingAccount, + ReceivingAccount: receiver, }) return nil } - // This is a non-forwarded status we can trust the requester on, - // convert this provided statusable data to a useable gtsmodel status. - status, err = f.converter.ASStatusToStatus(ctx, statusable) - if err != nil { - return gtserror.Newf("error converting statusable to status: %w", err) - } - // Check whether we should accept this new status. accept, err := f.shouldAcceptStatusable(ctx, - receivingAccount, - requestingAccount, - status, + receiver, + requester, + statusable, ) if err != nil { return gtserror.Newf("error checking status acceptibility: %w", err) @@ -258,65 +220,52 @@ func (f *federatingDB) createStatusable( return nil } - // ID the new status based on the time it was created. - status.ID, err = id.NewULIDFromTime(status.CreatedAt) - if err != nil { - return err - } - - // Put this newly parsed status in the database. - if err := f.state.DB.PutStatus(ctx, status); err != nil { - if errors.Is(err, db.ErrAlreadyExists) { - // The status already exists in the database, which - // means we've already processed it and some race - // condition means we didn't catch it yet. We can - // just return nil here and be done with it. - return nil - } - return gtserror.Newf("db error inserting status: %w", err) - } - // Do the rest of the processing asynchronously. The processor // will handle inserting/updating + further dereferencing the status. f.state.Workers.EnqueueFediAPI(ctx, messages.FromFediAPI{ APObjectType: ap.ObjectNote, APActivityType: ap.ActivityCreate, APIri: nil, + GTSModel: nil, APObjectModel: statusable, - GTSModel: status, - ReceivingAccount: receivingAccount, + ReceivingAccount: receiver, }) return nil } -func (f *federatingDB) shouldAcceptStatusable(ctx context.Context, receiver *gtsmodel.Account, requester *gtsmodel.Account, status *gtsmodel.Status) (bool, error) { +func (f *federatingDB) shouldAcceptStatusable(ctx context.Context, receiver *gtsmodel.Account, requester *gtsmodel.Account, statusable ap.Statusable) (bool, error) { host := config.GetHost() accountDomain := config.GetAccountDomain() // Check whether status mentions the receiver, // this is the quickest check so perform it first. - // Prefer checking using mention Href, fall back to Name. - for _, mention := range status.Mentions { - targetURI := mention.TargetAccountURI - nameString := mention.NameString - - if targetURI != "" { - if targetURI == receiver.URI || targetURI == receiver.URL { - // Target URI or URL match; - // receiver is mentioned. + mentions, _ := ap.ExtractMentions(statusable) + for _, mention := range mentions { + + // Extract placeholder mention vars. + accURI := mention.TargetAccountURI + name := mention.NameString + + switch { + case accURI != "": + if accURI == receiver.URI || + accURI == receiver.URL { + // Mention target is receiver, + // they are mentioned in status. return true, nil } - } else if nameString != "" { - username, domain, err := util.ExtractNamestringParts(nameString) + + case accURI == "" && name != "": + // Only a name was provided, extract the user@domain parts. + user, domain, err := util.ExtractNamestringParts(name) if err != nil { - return false, gtserror.Newf("error checking if mentioned: %w", err) + return false, gtserror.Newf("error extracting mention name parts: %w", err) } - if (domain == host || domain == accountDomain) && - strings.EqualFold(username, receiver.Username) { - // Username + domain match; - // receiver is mentioned. + // Check if the name points to our receiving local user. + isLocal := (domain == host || domain == accountDomain) + if isLocal && strings.EqualFold(user, receiver.Username) { return true, nil } } diff --git a/internal/federation/federatingdb/create_test.go b/internal/federation/federatingdb/create_test.go index 6c18f5bd0..a1f1a7e18 100644 --- a/internal/federation/federatingdb/create_test.go +++ b/internal/federation/federatingdb/create_test.go @@ -39,6 +39,8 @@ func (suite *CreateTestSuite) TestCreateNote() { ctx := createTestContext(receivingAccount, requestingAccount) create := suite.testActivities["dm_for_zork"].Activity + objProp := create.GetActivityStreamsObject() + note := objProp.At(0).GetType().(ap.Statusable) err := suite.federatingDB.Create(ctx, create) suite.NoError(err) @@ -47,18 +49,7 @@ func (suite *CreateTestSuite) TestCreateNote() { msg := <-suite.fromFederator suite.Equal(ap.ObjectNote, msg.APObjectType) suite.Equal(ap.ActivityCreate, msg.APActivityType) - - // shiny new status should be defined on the message - suite.NotNil(msg.GTSModel) - status := msg.GTSModel.(*gtsmodel.Status) - - // status should have some expected values - suite.Equal(requestingAccount.ID, status.AccountID) - suite.Equal("@the_mighty_zork@localhost:8080 hey zork here's a new private note for you", status.Content) - - // status should be in the database - _, err = suite.db.GetStatusByID(context.Background(), status.ID) - suite.NoError(err) + suite.Equal(note, msg.APObjectModel) } func (suite *CreateTestSuite) TestCreateNoteForward() { @@ -78,7 +69,7 @@ func (suite *CreateTestSuite) TestCreateNoteForward() { suite.Equal(ap.ActivityCreate, msg.APActivityType) // nothing should be set as the model since this is a forward - suite.Nil(msg.GTSModel) + suite.Nil(msg.APObjectModel) // but we should have a uri set suite.Equal("http://example.org/users/Some_User/statuses/afaba698-5740-4e32-a702-af61aa543bc1", msg.APIri.String()) diff --git a/internal/federation/federatingdb/db.go b/internal/federation/federatingdb/db.go index 8e98dc2f2..75ef3a2a7 100644 --- a/internal/federation/federatingdb/db.go +++ b/internal/federation/federatingdb/db.go @@ -24,6 +24,7 @@ import ( "github.com/superseriousbusiness/activity/streams/vocab" "github.com/superseriousbusiness/gotosocial/internal/state" "github.com/superseriousbusiness/gotosocial/internal/typeutils" + "github.com/superseriousbusiness/gotosocial/internal/visibility" ) // DB wraps the pub.Database interface with a couple of custom functions for GoToSocial. @@ -33,7 +34,6 @@ type DB interface { Accept(ctx context.Context, accept vocab.ActivityStreamsAccept) error Reject(ctx context.Context, reject vocab.ActivityStreamsReject) error Announce(ctx context.Context, announce vocab.ActivityStreamsAnnounce) error - Question(ctx context.Context, question vocab.ActivityStreamsQuestion) error } // FederatingDB uses the underlying DB interface to implement the go-fed pub.Database interface. @@ -41,13 +41,15 @@ type DB interface { type federatingDB struct { state *state.State converter *typeutils.Converter + filter *visibility.Filter } // New returns a DB interface using the given database and config -func New(state *state.State, converter *typeutils.Converter) DB { +func New(state *state.State, converter *typeutils.Converter, filter *visibility.Filter) DB { fdb := federatingDB{ state: state, converter: converter, + filter: filter, } return &fdb } diff --git a/internal/federation/federatingdb/question.go b/internal/federation/federatingdb/question.go deleted file mode 100644 index 85226d9ed..000000000 --- a/internal/federation/federatingdb/question.go +++ /dev/null @@ -1,32 +0,0 @@ -// GoToSocial -// Copyright (C) GoToSocial Authors admin@gotosocial.org -// SPDX-License-Identifier: AGPL-3.0-or-later -// -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. -// -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see <http://www.gnu.org/licenses/>. - -package federatingdb - -import ( - "context" - - "github.com/superseriousbusiness/activity/streams/vocab" -) - -func (f *federatingDB) Question(ctx context.Context, question vocab.ActivityStreamsQuestion) error { - receivingAccount, requestingAccount, internal := extractFromCtx(ctx) - if internal { - return nil // Already processed. - } - return f.createStatusable(ctx, question, receivingAccount, requestingAccount) -} diff --git a/internal/federation/federatingdb/update.go b/internal/federation/federatingdb/update.go index 5d3d4a0ff..26ea81f72 100644 --- a/internal/federation/federatingdb/update.go +++ b/internal/federation/federatingdb/update.go @@ -19,11 +19,13 @@ package federatingdb import ( "context" + "errors" "codeberg.org/gruf/go-logger/v2/level" "github.com/superseriousbusiness/activity/streams/vocab" "github.com/superseriousbusiness/gotosocial/internal/ap" "github.com/superseriousbusiness/gotosocial/internal/config" + "github.com/superseriousbusiness/gotosocial/internal/db" "github.com/superseriousbusiness/gotosocial/internal/gtserror" "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" "github.com/superseriousbusiness/gotosocial/internal/log" @@ -71,18 +73,21 @@ func (f *federatingDB) updateAccountable(ctx context.Context, receivingAcct *gts // Extract AP URI of the updated Accountable model. idProp := accountable.GetJSONLDId() if idProp == nil || !idProp.IsIRI() { - return gtserror.New("Accountable id prop was nil or not IRI") + return gtserror.New("invalid id prop") } - updatedAcctURI := idProp.GetIRI() - // Don't try to update local accounts, it will break things. - if updatedAcctURI.Host == config.GetHost() { + // Get the account URI string for checks + accountURI := idProp.GetIRI() + accountURIStr := accountURI.String() + + // Don't try to update local accounts. + if accountURI.Host == config.GetHost() { return nil } - // Ensure Accountable and requesting account are one and the same. - if updatedAcctURIStr := updatedAcctURI.String(); requestingAcct.URI != updatedAcctURIStr { - return gtserror.Newf("update for %s was requested by %s, this is not valid", updatedAcctURIStr, requestingAcct.URI) + // Check that update was by the account themselves. + if accountURIStr != requestingAcct.URI { + return gtserror.Newf("update for %s was not requested by owner", accountURIStr) } // Pass in to the processor the existing version of the requesting @@ -117,15 +122,31 @@ func (f *federatingDB) updateStatusable(ctx context.Context, receivingAcct *gtsm return nil } + // Check if this is a forwarded object, i.e. did + // the account making the request also create this? + forwarded := !isSender(statusable, requestingAcct) + // Get the status we have on file for this URI string. status, err := f.state.DB.GetStatusByURI(ctx, statusURIStr) - if err != nil { + if err != nil && !errors.Is(err, db.ErrNoEntries) { return gtserror.Newf("error fetching status from db: %w", err) } - // Check that update was by the status author. - if status.AccountID != requestingAcct.ID { - return gtserror.Newf("update for %s was not requested by author", statusURIStr) + if status == nil { + // We haven't seen this status before, be + // lenient and handle as a CREATE event. + return f.createStatusable(ctx, + receivingAcct, + requestingAcct, + statusable, + forwarded, + ) + } + + if forwarded { + // For forwarded updates, set a nil AS + // status to force refresh from remote. + statusable = nil } // Queue an UPDATE NOTE activity to our fedi API worker, @@ -134,7 +155,7 @@ func (f *federatingDB) updateStatusable(ctx context.Context, receivingAcct *gtsm APObjectType: ap.ObjectNote, APActivityType: ap.ActivityUpdate, GTSModel: status, // original status - APObjectModel: statusable, + APObjectModel: (ap.Statusable)(statusable), ReceivingAccount: receivingAcct, }) diff --git a/internal/federation/federatingdb/util.go b/internal/federation/federatingdb/util.go index d46451e21..dd7a2240e 100644 --- a/internal/federation/federatingdb/util.go +++ b/internal/federation/federatingdb/util.go @@ -20,7 +20,6 @@ package federatingdb import ( "context" "encoding/json" - "errors" "fmt" "net/url" @@ -37,6 +36,30 @@ import ( "github.com/superseriousbusiness/gotosocial/internal/uris" ) +func typeNames(objects []ap.TypeOrIRI) []string { + names := make([]string, len(objects)) + for i, object := range objects { + if object.IsIRI() { + names[i] = "IRI" + } else if t := object.GetType(); t != nil { + names[i] = t.GetTypeName() + } else { + names[i] = "nil" + } + } + return names +} + +// isSender returns whether an object with AttributedTo property comes from the given requesting account. +func isSender(with ap.WithAttributedTo, requester *gtsmodel.Account) bool { + for _, uri := range ap.GetAttributedTo(with) { + if uri.String() == requester.URI { + return true + } + } + return false +} + func sameActor(actor1 vocab.ActivityStreamsActorProperty, actor2 vocab.ActivityStreamsActorProperty) bool { if actor1 == nil || actor2 == nil { return false @@ -78,131 +101,31 @@ func (f *federatingDB) NewID(ctx context.Context, t vocab.Type) (idURL *url.URL, l.Debug("entering NewID") } - switch t.GetTypeName() { - case ap.ActivityFollow: - // FOLLOW - // ID might already be set on a follow we've created, so check it here and return it if it is - follow, ok := t.(vocab.ActivityStreamsFollow) - if !ok { - return nil, errors.New("newid: follow couldn't be parsed into vocab.ActivityStreamsFollow") - } - idProp := follow.GetJSONLDId() - if idProp != nil { - if idProp.IsIRI() { - return idProp.GetIRI(), nil - } - } - // it's not set so create one based on the actor set on the follow (ie., the followER not the followEE) - actorProp := follow.GetActivityStreamsActor() - if actorProp != nil { - for iter := actorProp.Begin(); iter != actorProp.End(); iter = iter.Next() { - // take the IRI of the first actor we can find (there should only be one) - if iter.IsIRI() { - // if there's an error here, just use the fallback behavior -- we don't need to return an error here - if actorAccount, err := f.state.DB.GetAccountByURI(ctx, iter.GetIRI().String()); err == nil { - newID, err := id.NewRandomULID() - if err != nil { - return nil, err - } - return url.Parse(uris.GenerateURIForFollow(actorAccount.Username, newID)) - } + // Most of our types set an ID already + // by this point, return this if found. + idProp := t.GetJSONLDId() + if idProp != nil && idProp.IsIRI() { + return idProp.GetIRI(), nil + } + + if t.GetTypeName() == ap.ActivityFollow { + follow, _ := t.(vocab.ActivityStreamsFollow) + + // If an actor URI has been set, create a new ID + // based on actor (i.e. followER not the followEE). + if uri := ap.GetActor(follow); len(uri) == 1 { + if actorAccount, err := f.state.DB.GetAccountByURI(ctx, uri[0].String()); err == nil { + newID, err := id.NewRandomULID() + if err != nil { + return nil, err } - } - } - case ap.ObjectNote: - // NOTE aka STATUS - // ID might already be set on a note we've created, so check it here and return it if it is - note, ok := t.(vocab.ActivityStreamsNote) - if !ok { - return nil, errors.New("newid: note couldn't be parsed into vocab.ActivityStreamsNote") - } - idProp := note.GetJSONLDId() - if idProp != nil { - if idProp.IsIRI() { - return idProp.GetIRI(), nil - } - } - case ap.ActivityLike: - // LIKE aka FAVE - // ID might already be set on a fave we've created, so check it here and return it if it is - fave, ok := t.(vocab.ActivityStreamsLike) - if !ok { - return nil, errors.New("newid: fave couldn't be parsed into vocab.ActivityStreamsLike") - } - idProp := fave.GetJSONLDId() - if idProp != nil { - if idProp.IsIRI() { - return idProp.GetIRI(), nil - } - } - case ap.ActivityCreate: - // CREATE - // ID might already be set on a Create, so check it here and return it if it is - create, ok := t.(vocab.ActivityStreamsCreate) - if !ok { - return nil, errors.New("newid: create couldn't be parsed into vocab.ActivityStreamsCreate") - } - idProp := create.GetJSONLDId() - if idProp != nil { - if idProp.IsIRI() { - return idProp.GetIRI(), nil - } - } - case ap.ActivityAnnounce: - // ANNOUNCE aka BOOST - // ID might already be set on an announce we've created, so check it here and return it if it is - announce, ok := t.(vocab.ActivityStreamsAnnounce) - if !ok { - return nil, errors.New("newid: announce couldn't be parsed into vocab.ActivityStreamsAnnounce") - } - idProp := announce.GetJSONLDId() - if idProp != nil { - if idProp.IsIRI() { - return idProp.GetIRI(), nil - } - } - case ap.ActivityUpdate: - // UPDATE - // ID might already be set on an update we've created, so check it here and return it if it is - update, ok := t.(vocab.ActivityStreamsUpdate) - if !ok { - return nil, errors.New("newid: update couldn't be parsed into vocab.ActivityStreamsUpdate") - } - idProp := update.GetJSONLDId() - if idProp != nil { - if idProp.IsIRI() { - return idProp.GetIRI(), nil - } - } - case ap.ActivityBlock: - // BLOCK - // ID might already be set on a block we've created, so check it here and return it if it is - block, ok := t.(vocab.ActivityStreamsBlock) - if !ok { - return nil, errors.New("newid: block couldn't be parsed into vocab.ActivityStreamsBlock") - } - idProp := block.GetJSONLDId() - if idProp != nil { - if idProp.IsIRI() { - return idProp.GetIRI(), nil - } - } - case ap.ActivityUndo: - // UNDO - // ID might already be set on an undo we've created, so check it here and return it if it is - undo, ok := t.(vocab.ActivityStreamsUndo) - if !ok { - return nil, errors.New("newid: undo couldn't be parsed into vocab.ActivityStreamsUndo") - } - idProp := undo.GetJSONLDId() - if idProp != nil { - if idProp.IsIRI() { - return idProp.GetIRI(), nil + return url.Parse(uris.GenerateURIForFollow(actorAccount.Username, newID)) } } } - // fallback default behavior: just return a random ULID after our protocol and host + // Default fallback behaviour: + // {proto}://{host}/{randomID} newID, err := id.NewRandomULID() if err != nil { return nil, err diff --git a/internal/federation/federatingprotocol.go b/internal/federation/federatingprotocol.go index 28dc145af..5a913dbbe 100644 --- a/internal/federation/federatingprotocol.go +++ b/internal/federation/federatingprotocol.go @@ -522,9 +522,6 @@ func (f *Federator) FederatingCallbacks(ctx context.Context) (wrapped pub.Federa func(ctx context.Context, announce vocab.ActivityStreamsAnnounce) error { return f.FederatingDB().Announce(ctx, announce) }, - func(ctx context.Context, question vocab.ActivityStreamsQuestion) error { - return f.FederatingDB().Question(ctx, question) - }, } return |