summaryrefslogtreecommitdiff
path: root/internal/federation
diff options
context:
space:
mode:
authorLibravatar kim <89579420+NyaaaWhatsUpDoc@users.noreply.github.com>2023-11-04 20:21:20 +0000
committerLibravatar GitHub <noreply@github.com>2023-11-04 20:21:20 +0000
commit41435a6c4ee0a5b52528890edf3fbf5a9dc0a6c8 (patch)
tree987b5d7787b24f6f6e340bbcf7fd1b052fe40dfc /internal/federation
parent[docs/bugfix] fix link to swagger yaml (#2333) (diff)
downloadgotosocial-41435a6c4ee0a5b52528890edf3fbf5a9dc0a6c8.tar.xz
[feature] support canceling scheduled tasks, some federation API performance improvements (#2329)
Diffstat (limited to 'internal/federation')
-rw-r--r--internal/federation/dereferencing/account.go2
-rw-r--r--internal/federation/dereferencing/status.go121
-rw-r--r--internal/federation/dereferencing/thread.go47
-rw-r--r--internal/federation/federatingdb/create.go173
-rw-r--r--internal/federation/federatingdb/create_test.go17
-rw-r--r--internal/federation/federatingdb/db.go6
-rw-r--r--internal/federation/federatingdb/question.go32
-rw-r--r--internal/federation/federatingdb/update.go45
-rw-r--r--internal/federation/federatingdb/util.go165
-rw-r--r--internal/federation/federatingprotocol.go3
10 files changed, 261 insertions, 350 deletions
diff --git a/internal/federation/dereferencing/account.go b/internal/federation/dereferencing/account.go
index 58f07f9cd..a4e74de3c 100644
--- a/internal/federation/dereferencing/account.go
+++ b/internal/federation/dereferencing/account.go
@@ -945,7 +945,7 @@ func (d *Dereferencer) dereferenceAccountFeatured(ctx context.Context, requestUs
// we still know it was *meant* to be pinned.
statusURIs = append(statusURIs, statusURI)
- status, _, err := d.getStatusByURI(ctx, requestUser, statusURI)
+ status, _, _, err := d.getStatusByURI(ctx, requestUser, statusURI)
if err != nil {
// We couldn't get the status, bummer. Just log + move on, we can try later.
log.Errorf(ctx, "error getting status from featured collection %s: %v", statusURI, err)
diff --git a/internal/federation/dereferencing/status.go b/internal/federation/dereferencing/status.go
index 712692814..4dd6d3baf 100644
--- a/internal/federation/dereferencing/status.go
+++ b/internal/federation/dereferencing/status.go
@@ -22,9 +22,8 @@ import (
"errors"
"io"
"net/url"
- "time"
-
"slices"
+ "time"
"github.com/superseriousbusiness/gotosocial/internal/ap"
"github.com/superseriousbusiness/gotosocial/internal/config"
@@ -55,12 +54,12 @@ func statusUpToDate(status *gtsmodel.Status) bool {
return false
}
-// GetStatusByURI will attempt to fetch a status by its URI, first checking the database. In the case of a newly-met remote model, or a remote model
-// whose last_fetched date is beyond a certain interval, the status will be dereferenced. In the case of dereferencing, some low-priority status information
-// may be enqueued for asynchronous fetching, e.g. dereferencing the remainder of the status thread. An ActivityPub object indicates the status was dereferenced.
+// GetStatusByURI will attempt to fetch a status by its URI, first checking the database. In the case of a newly-met remote model, or a remote model whose 'last_fetched' date
+// is beyond a certain interval, the status will be dereferenced. In the case of dereferencing, some low-priority status information may be enqueued for asynchronous fetching,
+// e.g. dereferencing the status thread. Param 'syncParent' = true indicates to fetch status ancestors synchronously. An ActivityPub object indicates the status was dereferenced.
func (d *Dereferencer) GetStatusByURI(ctx context.Context, requestUser string, uri *url.URL) (*gtsmodel.Status, ap.Statusable, error) {
// Fetch and dereference status if necessary.
- status, apubStatus, err := d.getStatusByURI(ctx,
+ status, statusable, isNew, err := d.getStatusByURI(ctx,
requestUser,
uri,
)
@@ -68,18 +67,22 @@ func (d *Dereferencer) GetStatusByURI(ctx context.Context, requestUser string, u
return nil, nil, err
}
- if apubStatus != nil {
- // This status was updated, enqueue re-dereferencing the whole thread.
- d.state.Workers.Federator.MustEnqueueCtx(ctx, func(ctx context.Context) {
- d.dereferenceThread(ctx, requestUser, uri, status, apubStatus)
- })
+ if statusable != nil {
+ // Deref parents + children.
+ d.dereferenceThread(ctx,
+ requestUser,
+ uri,
+ status,
+ statusable,
+ isNew,
+ )
}
- return status, apubStatus, nil
+ return status, statusable, nil
}
// getStatusByURI is a package internal form of .GetStatusByURI() that doesn't bother dereferencing the whole thread on update.
-func (d *Dereferencer) getStatusByURI(ctx context.Context, requestUser string, uri *url.URL) (*gtsmodel.Status, ap.Statusable, error) {
+func (d *Dereferencer) getStatusByURI(ctx context.Context, requestUser string, uri *url.URL) (*gtsmodel.Status, ap.Statusable, bool, error) {
var (
status *gtsmodel.Status
uriStr = uri.String()
@@ -94,7 +97,7 @@ func (d *Dereferencer) getStatusByURI(ctx context.Context, requestUser string, u
uriStr,
)
if err != nil && !errors.Is(err, db.ErrNoEntries) {
- return nil, nil, gtserror.Newf("error checking database for status %s by uri: %w", uriStr, err)
+ return nil, nil, false, gtserror.Newf("error checking database for status %s by uri: %w", uriStr, err)
}
if status == nil {
@@ -104,14 +107,14 @@ func (d *Dereferencer) getStatusByURI(ctx context.Context, requestUser string, u
uriStr,
)
if err != nil && !errors.Is(err, db.ErrNoEntries) {
- return nil, nil, gtserror.Newf("error checking database for status %s by url: %w", uriStr, err)
+ return nil, nil, false, gtserror.Newf("error checking database for status %s by url: %w", uriStr, err)
}
}
if status == nil {
// Ensure that this isn't a search for a local status.
if uri.Host == config.GetHost() || uri.Host == config.GetAccountDomain() {
- return nil, nil, gtserror.SetUnretrievable(err) // this will be db.ErrNoEntries
+ return nil, nil, false, gtserror.SetUnretrievable(err) // this will be db.ErrNoEntries
}
// Create and pass-through a new bare-bones model for deref.
@@ -127,11 +130,11 @@ func (d *Dereferencer) getStatusByURI(ctx context.Context, requestUser string, u
if err := d.state.DB.PopulateStatus(ctx, status); err != nil {
log.Errorf(ctx, "error populating existing status: %v", err)
}
- return status, nil, nil
+ return status, nil, false, nil
}
// Try to update + deref existing status model.
- latest, apubStatus, err := d.enrichStatusSafely(ctx,
+ latest, statusable, isNew, err := d.enrichStatusSafely(ctx,
requestUser,
uri,
status,
@@ -140,17 +143,22 @@ func (d *Dereferencer) getStatusByURI(ctx context.Context, requestUser string, u
if err != nil {
log.Errorf(ctx, "error enriching remote status: %v", err)
- // Fallback to existing.
- return status, nil, nil
+ // Fallback to existing status.
+ return status, nil, false, nil
}
- return latest, apubStatus, nil
+ return latest, statusable, isNew, nil
}
-// RefreshStatus updates the given status if remote and last_fetched is beyond fetch interval, or if force is set. An updated status model is returned,
-// but in the case of dereferencing, some low-priority status information may be enqueued for asynchronous fetching, e.g. dereferencing the remainder of the
-// status thread. An ActivityPub object indicates the status was dereferenced (i.e. updated).
-func (d *Dereferencer) RefreshStatus(ctx context.Context, requestUser string, status *gtsmodel.Status, apubStatus ap.Statusable, force bool) (*gtsmodel.Status, ap.Statusable, error) {
+// RefreshStatus is functionally equivalent to GetStatusByURI(), except that it requires a pre
+// populated status model (with AT LEAST uri set), and ALL thread dereferencing is asynchronous.
+func (d *Dereferencer) RefreshStatus(
+ ctx context.Context,
+ requestUser string,
+ status *gtsmodel.Status,
+ statusable ap.Statusable,
+ force bool,
+) (*gtsmodel.Status, ap.Statusable, error) {
// Check whether needs update.
if !force && statusUpToDate(status) {
return status, nil, nil
@@ -162,28 +170,40 @@ func (d *Dereferencer) RefreshStatus(ctx context.Context, requestUser string, st
return nil, nil, gtserror.Newf("invalid status uri %q: %w", status.URI, err)
}
- // Try to update + deref the passed status model.
- latest, apubStatus, err := d.enrichStatusSafely(ctx,
+ // Try to update + dereference the passed status model.
+ latest, statusable, isNew, err := d.enrichStatusSafely(ctx,
requestUser,
uri,
status,
- apubStatus,
+ statusable,
)
if err != nil {
return nil, nil, err
}
- // This status was updated, enqueue re-dereferencing the whole thread.
- d.state.Workers.Federator.MustEnqueueCtx(ctx, func(ctx context.Context) {
- d.dereferenceThread(ctx, requestUser, uri, latest, apubStatus)
- })
+ if statusable != nil {
+ // Deref parents + children.
+ d.dereferenceThread(ctx,
+ requestUser,
+ uri,
+ status,
+ statusable,
+ isNew,
+ )
+ }
- return latest, apubStatus, nil
+ return latest, statusable, nil
}
-// RefreshStatusAsync enqueues the given status for an asychronous update fetching, if last_fetched is beyond fetch interval, or if force is set.
-// This is a more optimized form of manually enqueueing .UpdateStatus() to the federation worker, since it only enqueues update if necessary.
-func (d *Dereferencer) RefreshStatusAsync(ctx context.Context, requestUser string, status *gtsmodel.Status, apubStatus ap.Statusable, force bool) {
+// RefreshStatusAsync is functionally equivalent to RefreshStatus(), except that ALL
+// dereferencing is queued for asynchronous processing, (both thread AND status).
+func (d *Dereferencer) RefreshStatusAsync(
+ ctx context.Context,
+ requestUser string,
+ status *gtsmodel.Status,
+ statusable ap.Statusable,
+ force bool,
+) {
// Check whether needs update.
if !force && statusUpToDate(status) {
return
@@ -196,17 +216,25 @@ func (d *Dereferencer) RefreshStatusAsync(ctx context.Context, requestUser strin
return
}
- // Enqueue a worker function to re-fetch this status async.
+ // Enqueue a worker function to re-fetch this status entirely async.
d.state.Workers.Federator.MustEnqueueCtx(ctx, func(ctx context.Context) {
- latest, apubStatus, err := d.enrichStatusSafely(ctx, requestUser, uri, status, apubStatus)
+ latest, statusable, _, err := d.enrichStatusSafely(ctx,
+ requestUser,
+ uri,
+ status,
+ statusable,
+ )
if err != nil {
log.Errorf(ctx, "error enriching remote status: %v", err)
return
}
-
- if apubStatus != nil {
- // This status was updated, re-dereference the whole thread.
- d.dereferenceThread(ctx, requestUser, uri, latest, apubStatus)
+ if statusable != nil {
+ if err := d.DereferenceStatusAncestors(ctx, requestUser, latest); err != nil {
+ log.Error(ctx, err)
+ }
+ if err := d.DereferenceStatusDescendants(ctx, requestUser, uri, statusable); err != nil {
+ log.Error(ctx, err)
+ }
}
})
}
@@ -220,7 +248,7 @@ func (d *Dereferencer) enrichStatusSafely(
uri *url.URL,
status *gtsmodel.Status,
apubStatus ap.Statusable,
-) (*gtsmodel.Status, ap.Statusable, error) {
+) (*gtsmodel.Status, ap.Statusable, bool, error) {
uriStr := status.URI
if status.ID != "" {
@@ -238,6 +266,9 @@ func (d *Dereferencer) enrichStatusSafely(
unlock = doOnce(unlock)
defer unlock()
+ // This is a NEW status (to us).
+ isNew := (status.ID == "")
+
// Perform status enrichment with passed vars.
latest, apubStatus, err := d.enrichStatus(ctx,
requestUser,
@@ -261,6 +292,7 @@ func (d *Dereferencer) enrichStatusSafely(
// otherwise this indicates WE
// enriched the status.
apubStatus = nil
+ isNew = false
// DATA RACE! We likely lost out to another goroutine
// in a call to db.Put(Status). Look again in DB by URI.
@@ -270,7 +302,7 @@ func (d *Dereferencer) enrichStatusSafely(
}
}
- return latest, apubStatus, err
+ return latest, apubStatus, isNew, err
}
// enrichStatus will enrich the given status, whether a new
@@ -343,6 +375,7 @@ func (d *Dereferencer) enrichStatus(
}
// Carry-over values and set fetch time.
+ latestStatus.UpdatedAt = status.UpdatedAt
latestStatus.FetchedAt = time.Now()
latestStatus.Local = status.Local
diff --git a/internal/federation/dereferencing/thread.go b/internal/federation/dereferencing/thread.go
index 5753ce4dd..0ad8f09e4 100644
--- a/internal/federation/dereferencing/thread.go
+++ b/internal/federation/dereferencing/thread.go
@@ -38,15 +38,42 @@ import (
// ancesters we are willing to follow before returning error.
const maxIter = 1000
-func (d *Dereferencer) dereferenceThread(ctx context.Context, username string, statusIRI *url.URL, status *gtsmodel.Status, statusable ap.Statusable) {
- // Ensure that ancestors have been fully dereferenced
- if err := d.DereferenceStatusAncestors(ctx, username, status); err != nil {
- log.Error(ctx, err)
- }
+// dereferenceThread handles dereferencing status thread after
+// fetch. Passing off appropriate parts to be enqueued for async
+// processing, or handling some parts synchronously when required.
+func (d *Dereferencer) dereferenceThread(
+ ctx context.Context,
+ requestUser string,
+ uri *url.URL,
+ status *gtsmodel.Status,
+ statusable ap.Statusable,
+ isNew bool,
+) {
+ if isNew {
+ // This is a new status that we need the ancestors of in
+ // order to determine visibility. Perform the initial part
+ // of thread dereferencing, i.e. parents, synchronously.
+ err := d.DereferenceStatusAncestors(ctx, requestUser, status)
+ if err != nil {
+ log.Error(ctx, err)
+ }
- // Ensure that descendants have been fully dereferenced
- if err := d.DereferenceStatusDescendants(ctx, username, statusIRI, statusable); err != nil {
- log.Error(ctx, err)
+ // Enqueue dereferencing remaining status thread, (children), asychronously .
+ d.state.Workers.Federator.MustEnqueueCtx(ctx, func(ctx context.Context) {
+ if err := d.DereferenceStatusDescendants(ctx, requestUser, uri, statusable); err != nil {
+ log.Error(ctx, err)
+ }
+ })
+ } else {
+ // This is an existing status, dereference the WHOLE thread asynchronously.
+ d.state.Workers.Federator.MustEnqueueCtx(ctx, func(ctx context.Context) {
+ if err := d.DereferenceStatusAncestors(ctx, requestUser, status); err != nil {
+ log.Error(ctx, err)
+ }
+ if err := d.DereferenceStatusDescendants(ctx, requestUser, uri, statusable); err != nil {
+ log.Error(ctx, err)
+ }
+ })
}
}
@@ -157,7 +184,7 @@ func (d *Dereferencer) DereferenceStatusAncestors(ctx context.Context, username
// - refetching recently fetched statuses (recursion!)
// - remote domain is blocked (will return unretrievable)
// - any http type error for a new status returns unretrievable
- parent, _, err := d.getStatusByURI(ctx, username, inReplyToURI)
+ parent, _, _, err := d.getStatusByURI(ctx, username, inReplyToURI)
if err == nil {
// We successfully fetched the parent.
// Update current status with new info.
@@ -325,7 +352,7 @@ stackLoop:
// - refetching recently fetched statuses (recursion!)
// - remote domain is blocked (will return unretrievable)
// - any http type error for a new status returns unretrievable
- _, statusable, err := d.getStatusByURI(ctx, username, itemIRI)
+ _, statusable, _, err := d.getStatusByURI(ctx, username, itemIRI)
if err != nil {
if !gtserror.Unretrievable(err) {
l.Errorf("error dereferencing remote status %s: %v", itemIRI, err)
diff --git a/internal/federation/federatingdb/create.go b/internal/federation/federatingdb/create.go
index 14e846b15..0fb459190 100644
--- a/internal/federation/federatingdb/create.go
+++ b/internal/federation/federatingdb/create.go
@@ -24,7 +24,6 @@ import (
"strings"
"codeberg.org/gruf/go-logger/v2/level"
- "github.com/superseriousbusiness/activity/pub"
"github.com/superseriousbusiness/activity/streams/vocab"
"github.com/superseriousbusiness/gotosocial/internal/ap"
"github.com/superseriousbusiness/gotosocial/internal/config"
@@ -137,22 +136,37 @@ func (f *federatingDB) activityCreate(
return gtserror.Newf("could not convert asType %T to ActivityStreamsCreate", asType)
}
- for _, object := range ap.ExtractObjects(create) {
- // Try to get object as vocab.Type,
- // else skip handling (likely) IRI.
- objType := object.GetType()
- if objType == nil {
- continue
- }
+ var errs gtserror.MultiError
+
+ // Extract objects from create activity.
+ objects := ap.ExtractObjects(create)
+
+ // Extract Statusables from objects slice (this must be
+ // done AFTER extracting options due to how AS typing works).
+ statusables, objects := ap.ExtractStatusables(objects)
- if statusable, ok := ap.ToStatusable(objType); ok {
- return f.createStatusable(ctx, statusable, receivingAccount, requestingAccount)
+ for _, statusable := range statusables {
+ // Check if this is a forwarded object, i.e. did
+ // the account making the request also create this?
+ forwarded := !isSender(statusable, requestingAccount)
+
+ // Handle create event for this statusable.
+ if err := f.createStatusable(ctx,
+ receivingAccount,
+ requestingAccount,
+ statusable,
+ forwarded,
+ ); err != nil {
+ errs.Appendf("error creating statusable: %w", err)
}
+ }
- // TODO: handle CREATE of other types?
+ if len(objects) > 0 {
+ // Log any unhandled objects after filtering for debug purposes.
+ log.Debugf(ctx, "unhandled CREATE types: %v", typeNames(objects))
}
- return nil
+ return errs.Combine()
}
// createStatusable handles a Create activity for a Statusable.
@@ -161,88 +175,36 @@ func (f *federatingDB) activityCreate(
// the processor for further asynchronous processing.
func (f *federatingDB) createStatusable(
ctx context.Context,
+ receiver *gtsmodel.Account,
+ requester *gtsmodel.Account,
statusable ap.Statusable,
- receivingAccount *gtsmodel.Account,
- requestingAccount *gtsmodel.Account,
+ forwarded bool,
) error {
- // Statusable must have an attributedTo.
- attrToProp := statusable.GetActivityStreamsAttributedTo()
- if attrToProp == nil {
- return gtserror.Newf("statusable had no attributedTo")
- }
-
- // Statusable must have an ID.
- idProp := statusable.GetJSONLDId()
- if idProp == nil || !idProp.IsIRI() {
- return gtserror.Newf("statusable had no id, or id was not a URI")
- }
-
- statusableURI := idProp.GetIRI()
-
- // Check if we have a forward. In other words, was the
- // statusable posted to our inbox by at least one actor
- // who actually created it, or are they forwarding it?
- forward := true
- for iter := attrToProp.Begin(); iter != attrToProp.End(); iter = iter.Next() {
- actorURI, err := pub.ToId(iter)
- if err != nil {
- return gtserror.Newf("error extracting id from attributedTo entry: %w", err)
- }
-
- if requestingAccount.URI == actorURI.String() {
- // The actor who posted this statusable to our inbox is
- // (one of) its creator(s), so this is not a forward.
- forward = false
- break
- }
- }
-
- // Check if we already have a status entry
- // for this statusable, based on the ID/URI.
- statusableURIStr := statusableURI.String()
- status, err := f.state.DB.GetStatusByURI(ctx, statusableURIStr)
- if err != nil && !errors.Is(err, db.ErrNoEntries) {
- return gtserror.Newf("db error checking existence of status %s: %w", statusableURIStr, err)
- }
-
- if status != nil {
- // We already had this status in the db, no need for further action.
- log.Trace(ctx, "status already exists: %s", statusableURIStr)
- return nil
- }
-
// If we do have a forward, we should ignore the content
// and instead deref based on the URI of the statusable.
//
// In other words, don't automatically trust whoever sent
// this status to us, but fetch the authentic article from
// the server it originated from.
- if forward {
- // Pass the statusable URI (APIri) into the processor worker
- // and do the rest of the processing asynchronously.
+ if forwarded {
+ // Pass the statusable URI (APIri) into the processor
+ // worker and do the rest of the processing asynchronously.
f.state.Workers.EnqueueFediAPI(ctx, messages.FromFediAPI{
APObjectType: ap.ObjectNote,
APActivityType: ap.ActivityCreate,
- APIri: statusableURI,
+ APIri: ap.GetJSONLDId(statusable),
APObjectModel: nil,
GTSModel: nil,
- ReceivingAccount: receivingAccount,
+ ReceivingAccount: receiver,
})
return nil
}
- // This is a non-forwarded status we can trust the requester on,
- // convert this provided statusable data to a useable gtsmodel status.
- status, err = f.converter.ASStatusToStatus(ctx, statusable)
- if err != nil {
- return gtserror.Newf("error converting statusable to status: %w", err)
- }
-
// Check whether we should accept this new status.
accept, err := f.shouldAcceptStatusable(ctx,
- receivingAccount,
- requestingAccount,
- status,
+ receiver,
+ requester,
+ statusable,
)
if err != nil {
return gtserror.Newf("error checking status acceptibility: %w", err)
@@ -258,65 +220,52 @@ func (f *federatingDB) createStatusable(
return nil
}
- // ID the new status based on the time it was created.
- status.ID, err = id.NewULIDFromTime(status.CreatedAt)
- if err != nil {
- return err
- }
-
- // Put this newly parsed status in the database.
- if err := f.state.DB.PutStatus(ctx, status); err != nil {
- if errors.Is(err, db.ErrAlreadyExists) {
- // The status already exists in the database, which
- // means we've already processed it and some race
- // condition means we didn't catch it yet. We can
- // just return nil here and be done with it.
- return nil
- }
- return gtserror.Newf("db error inserting status: %w", err)
- }
-
// Do the rest of the processing asynchronously. The processor
// will handle inserting/updating + further dereferencing the status.
f.state.Workers.EnqueueFediAPI(ctx, messages.FromFediAPI{
APObjectType: ap.ObjectNote,
APActivityType: ap.ActivityCreate,
APIri: nil,
+ GTSModel: nil,
APObjectModel: statusable,
- GTSModel: status,
- ReceivingAccount: receivingAccount,
+ ReceivingAccount: receiver,
})
return nil
}
-func (f *federatingDB) shouldAcceptStatusable(ctx context.Context, receiver *gtsmodel.Account, requester *gtsmodel.Account, status *gtsmodel.Status) (bool, error) {
+func (f *federatingDB) shouldAcceptStatusable(ctx context.Context, receiver *gtsmodel.Account, requester *gtsmodel.Account, statusable ap.Statusable) (bool, error) {
host := config.GetHost()
accountDomain := config.GetAccountDomain()
// Check whether status mentions the receiver,
// this is the quickest check so perform it first.
- // Prefer checking using mention Href, fall back to Name.
- for _, mention := range status.Mentions {
- targetURI := mention.TargetAccountURI
- nameString := mention.NameString
-
- if targetURI != "" {
- if targetURI == receiver.URI || targetURI == receiver.URL {
- // Target URI or URL match;
- // receiver is mentioned.
+ mentions, _ := ap.ExtractMentions(statusable)
+ for _, mention := range mentions {
+
+ // Extract placeholder mention vars.
+ accURI := mention.TargetAccountURI
+ name := mention.NameString
+
+ switch {
+ case accURI != "":
+ if accURI == receiver.URI ||
+ accURI == receiver.URL {
+ // Mention target is receiver,
+ // they are mentioned in status.
return true, nil
}
- } else if nameString != "" {
- username, domain, err := util.ExtractNamestringParts(nameString)
+
+ case accURI == "" && name != "":
+ // Only a name was provided, extract the user@domain parts.
+ user, domain, err := util.ExtractNamestringParts(name)
if err != nil {
- return false, gtserror.Newf("error checking if mentioned: %w", err)
+ return false, gtserror.Newf("error extracting mention name parts: %w", err)
}
- if (domain == host || domain == accountDomain) &&
- strings.EqualFold(username, receiver.Username) {
- // Username + domain match;
- // receiver is mentioned.
+ // Check if the name points to our receiving local user.
+ isLocal := (domain == host || domain == accountDomain)
+ if isLocal && strings.EqualFold(user, receiver.Username) {
return true, nil
}
}
diff --git a/internal/federation/federatingdb/create_test.go b/internal/federation/federatingdb/create_test.go
index 6c18f5bd0..a1f1a7e18 100644
--- a/internal/federation/federatingdb/create_test.go
+++ b/internal/federation/federatingdb/create_test.go
@@ -39,6 +39,8 @@ func (suite *CreateTestSuite) TestCreateNote() {
ctx := createTestContext(receivingAccount, requestingAccount)
create := suite.testActivities["dm_for_zork"].Activity
+ objProp := create.GetActivityStreamsObject()
+ note := objProp.At(0).GetType().(ap.Statusable)
err := suite.federatingDB.Create(ctx, create)
suite.NoError(err)
@@ -47,18 +49,7 @@ func (suite *CreateTestSuite) TestCreateNote() {
msg := <-suite.fromFederator
suite.Equal(ap.ObjectNote, msg.APObjectType)
suite.Equal(ap.ActivityCreate, msg.APActivityType)
-
- // shiny new status should be defined on the message
- suite.NotNil(msg.GTSModel)
- status := msg.GTSModel.(*gtsmodel.Status)
-
- // status should have some expected values
- suite.Equal(requestingAccount.ID, status.AccountID)
- suite.Equal("@the_mighty_zork@localhost:8080 hey zork here's a new private note for you", status.Content)
-
- // status should be in the database
- _, err = suite.db.GetStatusByID(context.Background(), status.ID)
- suite.NoError(err)
+ suite.Equal(note, msg.APObjectModel)
}
func (suite *CreateTestSuite) TestCreateNoteForward() {
@@ -78,7 +69,7 @@ func (suite *CreateTestSuite) TestCreateNoteForward() {
suite.Equal(ap.ActivityCreate, msg.APActivityType)
// nothing should be set as the model since this is a forward
- suite.Nil(msg.GTSModel)
+ suite.Nil(msg.APObjectModel)
// but we should have a uri set
suite.Equal("http://example.org/users/Some_User/statuses/afaba698-5740-4e32-a702-af61aa543bc1", msg.APIri.String())
diff --git a/internal/federation/federatingdb/db.go b/internal/federation/federatingdb/db.go
index 8e98dc2f2..75ef3a2a7 100644
--- a/internal/federation/federatingdb/db.go
+++ b/internal/federation/federatingdb/db.go
@@ -24,6 +24,7 @@ import (
"github.com/superseriousbusiness/activity/streams/vocab"
"github.com/superseriousbusiness/gotosocial/internal/state"
"github.com/superseriousbusiness/gotosocial/internal/typeutils"
+ "github.com/superseriousbusiness/gotosocial/internal/visibility"
)
// DB wraps the pub.Database interface with a couple of custom functions for GoToSocial.
@@ -33,7 +34,6 @@ type DB interface {
Accept(ctx context.Context, accept vocab.ActivityStreamsAccept) error
Reject(ctx context.Context, reject vocab.ActivityStreamsReject) error
Announce(ctx context.Context, announce vocab.ActivityStreamsAnnounce) error
- Question(ctx context.Context, question vocab.ActivityStreamsQuestion) error
}
// FederatingDB uses the underlying DB interface to implement the go-fed pub.Database interface.
@@ -41,13 +41,15 @@ type DB interface {
type federatingDB struct {
state *state.State
converter *typeutils.Converter
+ filter *visibility.Filter
}
// New returns a DB interface using the given database and config
-func New(state *state.State, converter *typeutils.Converter) DB {
+func New(state *state.State, converter *typeutils.Converter, filter *visibility.Filter) DB {
fdb := federatingDB{
state: state,
converter: converter,
+ filter: filter,
}
return &fdb
}
diff --git a/internal/federation/federatingdb/question.go b/internal/federation/federatingdb/question.go
deleted file mode 100644
index 85226d9ed..000000000
--- a/internal/federation/federatingdb/question.go
+++ /dev/null
@@ -1,32 +0,0 @@
-// GoToSocial
-// Copyright (C) GoToSocial Authors admin@gotosocial.org
-// SPDX-License-Identifier: AGPL-3.0-or-later
-//
-// This program is free software: you can redistribute it and/or modify
-// it under the terms of the GNU Affero General Public License as published by
-// the Free Software Foundation, either version 3 of the License, or
-// (at your option) any later version.
-//
-// This program is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU Affero General Public License for more details.
-//
-// You should have received a copy of the GNU Affero General Public License
-// along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-package federatingdb
-
-import (
- "context"
-
- "github.com/superseriousbusiness/activity/streams/vocab"
-)
-
-func (f *federatingDB) Question(ctx context.Context, question vocab.ActivityStreamsQuestion) error {
- receivingAccount, requestingAccount, internal := extractFromCtx(ctx)
- if internal {
- return nil // Already processed.
- }
- return f.createStatusable(ctx, question, receivingAccount, requestingAccount)
-}
diff --git a/internal/federation/federatingdb/update.go b/internal/federation/federatingdb/update.go
index 5d3d4a0ff..26ea81f72 100644
--- a/internal/federation/federatingdb/update.go
+++ b/internal/federation/federatingdb/update.go
@@ -19,11 +19,13 @@ package federatingdb
import (
"context"
+ "errors"
"codeberg.org/gruf/go-logger/v2/level"
"github.com/superseriousbusiness/activity/streams/vocab"
"github.com/superseriousbusiness/gotosocial/internal/ap"
"github.com/superseriousbusiness/gotosocial/internal/config"
+ "github.com/superseriousbusiness/gotosocial/internal/db"
"github.com/superseriousbusiness/gotosocial/internal/gtserror"
"github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
"github.com/superseriousbusiness/gotosocial/internal/log"
@@ -71,18 +73,21 @@ func (f *federatingDB) updateAccountable(ctx context.Context, receivingAcct *gts
// Extract AP URI of the updated Accountable model.
idProp := accountable.GetJSONLDId()
if idProp == nil || !idProp.IsIRI() {
- return gtserror.New("Accountable id prop was nil or not IRI")
+ return gtserror.New("invalid id prop")
}
- updatedAcctURI := idProp.GetIRI()
- // Don't try to update local accounts, it will break things.
- if updatedAcctURI.Host == config.GetHost() {
+ // Get the account URI string for checks
+ accountURI := idProp.GetIRI()
+ accountURIStr := accountURI.String()
+
+ // Don't try to update local accounts.
+ if accountURI.Host == config.GetHost() {
return nil
}
- // Ensure Accountable and requesting account are one and the same.
- if updatedAcctURIStr := updatedAcctURI.String(); requestingAcct.URI != updatedAcctURIStr {
- return gtserror.Newf("update for %s was requested by %s, this is not valid", updatedAcctURIStr, requestingAcct.URI)
+ // Check that update was by the account themselves.
+ if accountURIStr != requestingAcct.URI {
+ return gtserror.Newf("update for %s was not requested by owner", accountURIStr)
}
// Pass in to the processor the existing version of the requesting
@@ -117,15 +122,31 @@ func (f *federatingDB) updateStatusable(ctx context.Context, receivingAcct *gtsm
return nil
}
+ // Check if this is a forwarded object, i.e. did
+ // the account making the request also create this?
+ forwarded := !isSender(statusable, requestingAcct)
+
// Get the status we have on file for this URI string.
status, err := f.state.DB.GetStatusByURI(ctx, statusURIStr)
- if err != nil {
+ if err != nil && !errors.Is(err, db.ErrNoEntries) {
return gtserror.Newf("error fetching status from db: %w", err)
}
- // Check that update was by the status author.
- if status.AccountID != requestingAcct.ID {
- return gtserror.Newf("update for %s was not requested by author", statusURIStr)
+ if status == nil {
+ // We haven't seen this status before, be
+ // lenient and handle as a CREATE event.
+ return f.createStatusable(ctx,
+ receivingAcct,
+ requestingAcct,
+ statusable,
+ forwarded,
+ )
+ }
+
+ if forwarded {
+ // For forwarded updates, set a nil AS
+ // status to force refresh from remote.
+ statusable = nil
}
// Queue an UPDATE NOTE activity to our fedi API worker,
@@ -134,7 +155,7 @@ func (f *federatingDB) updateStatusable(ctx context.Context, receivingAcct *gtsm
APObjectType: ap.ObjectNote,
APActivityType: ap.ActivityUpdate,
GTSModel: status, // original status
- APObjectModel: statusable,
+ APObjectModel: (ap.Statusable)(statusable),
ReceivingAccount: receivingAcct,
})
diff --git a/internal/federation/federatingdb/util.go b/internal/federation/federatingdb/util.go
index d46451e21..dd7a2240e 100644
--- a/internal/federation/federatingdb/util.go
+++ b/internal/federation/federatingdb/util.go
@@ -20,7 +20,6 @@ package federatingdb
import (
"context"
"encoding/json"
- "errors"
"fmt"
"net/url"
@@ -37,6 +36,30 @@ import (
"github.com/superseriousbusiness/gotosocial/internal/uris"
)
+func typeNames(objects []ap.TypeOrIRI) []string {
+ names := make([]string, len(objects))
+ for i, object := range objects {
+ if object.IsIRI() {
+ names[i] = "IRI"
+ } else if t := object.GetType(); t != nil {
+ names[i] = t.GetTypeName()
+ } else {
+ names[i] = "nil"
+ }
+ }
+ return names
+}
+
+// isSender returns whether an object with AttributedTo property comes from the given requesting account.
+func isSender(with ap.WithAttributedTo, requester *gtsmodel.Account) bool {
+ for _, uri := range ap.GetAttributedTo(with) {
+ if uri.String() == requester.URI {
+ return true
+ }
+ }
+ return false
+}
+
func sameActor(actor1 vocab.ActivityStreamsActorProperty, actor2 vocab.ActivityStreamsActorProperty) bool {
if actor1 == nil || actor2 == nil {
return false
@@ -78,131 +101,31 @@ func (f *federatingDB) NewID(ctx context.Context, t vocab.Type) (idURL *url.URL,
l.Debug("entering NewID")
}
- switch t.GetTypeName() {
- case ap.ActivityFollow:
- // FOLLOW
- // ID might already be set on a follow we've created, so check it here and return it if it is
- follow, ok := t.(vocab.ActivityStreamsFollow)
- if !ok {
- return nil, errors.New("newid: follow couldn't be parsed into vocab.ActivityStreamsFollow")
- }
- idProp := follow.GetJSONLDId()
- if idProp != nil {
- if idProp.IsIRI() {
- return idProp.GetIRI(), nil
- }
- }
- // it's not set so create one based on the actor set on the follow (ie., the followER not the followEE)
- actorProp := follow.GetActivityStreamsActor()
- if actorProp != nil {
- for iter := actorProp.Begin(); iter != actorProp.End(); iter = iter.Next() {
- // take the IRI of the first actor we can find (there should only be one)
- if iter.IsIRI() {
- // if there's an error here, just use the fallback behavior -- we don't need to return an error here
- if actorAccount, err := f.state.DB.GetAccountByURI(ctx, iter.GetIRI().String()); err == nil {
- newID, err := id.NewRandomULID()
- if err != nil {
- return nil, err
- }
- return url.Parse(uris.GenerateURIForFollow(actorAccount.Username, newID))
- }
+ // Most of our types set an ID already
+ // by this point, return this if found.
+ idProp := t.GetJSONLDId()
+ if idProp != nil && idProp.IsIRI() {
+ return idProp.GetIRI(), nil
+ }
+
+ if t.GetTypeName() == ap.ActivityFollow {
+ follow, _ := t.(vocab.ActivityStreamsFollow)
+
+ // If an actor URI has been set, create a new ID
+ // based on actor (i.e. followER not the followEE).
+ if uri := ap.GetActor(follow); len(uri) == 1 {
+ if actorAccount, err := f.state.DB.GetAccountByURI(ctx, uri[0].String()); err == nil {
+ newID, err := id.NewRandomULID()
+ if err != nil {
+ return nil, err
}
- }
- }
- case ap.ObjectNote:
- // NOTE aka STATUS
- // ID might already be set on a note we've created, so check it here and return it if it is
- note, ok := t.(vocab.ActivityStreamsNote)
- if !ok {
- return nil, errors.New("newid: note couldn't be parsed into vocab.ActivityStreamsNote")
- }
- idProp := note.GetJSONLDId()
- if idProp != nil {
- if idProp.IsIRI() {
- return idProp.GetIRI(), nil
- }
- }
- case ap.ActivityLike:
- // LIKE aka FAVE
- // ID might already be set on a fave we've created, so check it here and return it if it is
- fave, ok := t.(vocab.ActivityStreamsLike)
- if !ok {
- return nil, errors.New("newid: fave couldn't be parsed into vocab.ActivityStreamsLike")
- }
- idProp := fave.GetJSONLDId()
- if idProp != nil {
- if idProp.IsIRI() {
- return idProp.GetIRI(), nil
- }
- }
- case ap.ActivityCreate:
- // CREATE
- // ID might already be set on a Create, so check it here and return it if it is
- create, ok := t.(vocab.ActivityStreamsCreate)
- if !ok {
- return nil, errors.New("newid: create couldn't be parsed into vocab.ActivityStreamsCreate")
- }
- idProp := create.GetJSONLDId()
- if idProp != nil {
- if idProp.IsIRI() {
- return idProp.GetIRI(), nil
- }
- }
- case ap.ActivityAnnounce:
- // ANNOUNCE aka BOOST
- // ID might already be set on an announce we've created, so check it here and return it if it is
- announce, ok := t.(vocab.ActivityStreamsAnnounce)
- if !ok {
- return nil, errors.New("newid: announce couldn't be parsed into vocab.ActivityStreamsAnnounce")
- }
- idProp := announce.GetJSONLDId()
- if idProp != nil {
- if idProp.IsIRI() {
- return idProp.GetIRI(), nil
- }
- }
- case ap.ActivityUpdate:
- // UPDATE
- // ID might already be set on an update we've created, so check it here and return it if it is
- update, ok := t.(vocab.ActivityStreamsUpdate)
- if !ok {
- return nil, errors.New("newid: update couldn't be parsed into vocab.ActivityStreamsUpdate")
- }
- idProp := update.GetJSONLDId()
- if idProp != nil {
- if idProp.IsIRI() {
- return idProp.GetIRI(), nil
- }
- }
- case ap.ActivityBlock:
- // BLOCK
- // ID might already be set on a block we've created, so check it here and return it if it is
- block, ok := t.(vocab.ActivityStreamsBlock)
- if !ok {
- return nil, errors.New("newid: block couldn't be parsed into vocab.ActivityStreamsBlock")
- }
- idProp := block.GetJSONLDId()
- if idProp != nil {
- if idProp.IsIRI() {
- return idProp.GetIRI(), nil
- }
- }
- case ap.ActivityUndo:
- // UNDO
- // ID might already be set on an undo we've created, so check it here and return it if it is
- undo, ok := t.(vocab.ActivityStreamsUndo)
- if !ok {
- return nil, errors.New("newid: undo couldn't be parsed into vocab.ActivityStreamsUndo")
- }
- idProp := undo.GetJSONLDId()
- if idProp != nil {
- if idProp.IsIRI() {
- return idProp.GetIRI(), nil
+ return url.Parse(uris.GenerateURIForFollow(actorAccount.Username, newID))
}
}
}
- // fallback default behavior: just return a random ULID after our protocol and host
+ // Default fallback behaviour:
+ // {proto}://{host}/{randomID}
newID, err := id.NewRandomULID()
if err != nil {
return nil, err
diff --git a/internal/federation/federatingprotocol.go b/internal/federation/federatingprotocol.go
index 28dc145af..5a913dbbe 100644
--- a/internal/federation/federatingprotocol.go
+++ b/internal/federation/federatingprotocol.go
@@ -522,9 +522,6 @@ func (f *Federator) FederatingCallbacks(ctx context.Context) (wrapped pub.Federa
func(ctx context.Context, announce vocab.ActivityStreamsAnnounce) error {
return f.FederatingDB().Announce(ctx, announce)
},
- func(ctx context.Context, question vocab.ActivityStreamsQuestion) error {
- return f.FederatingDB().Question(ctx, question)
- },
}
return