summary | refs | log | tree | commit | diff
path: root/internal/federation/dereferencing/thread.go
diff options
context:
space:
mode:
Diffstat (limited to 'internal/federation/dereferencing/thread.go')
-rw-r--r-- internal/federation/dereferencing/thread.go 251
1 files changed, 148 insertions, 103 deletions
diff --git a/internal/federation/dereferencing/thread.go b/internal/federation/dereferencing/thread.go
index ad677b228..d35627ff1 100644
--- a/internal/federation/dereferencing/thread.go
+++ b/internal/federation/dereferencing/thread.go
@@ -56,9 +56,20 @@ func (d *deref) DereferenceStatusAncestors(
username string,
status *gtsmodel.Status,
) error {
+ // Start log entry with fields
+ l := log.WithContext(ctx).
+ WithFields(kv.Fields{
+ {"username", username},
+ {"original", status.URI},
+ }...)
+
+ // Keep track of already dereferenced statuses
+ // for this ancestor thread to prevent recursion.
+ derefdStatuses := make(map[string]struct{}, 10)
+
// Mark given status as the one
// we're currently working on.
- var current = status
+ current := status
for i := 0; i < maxIter; i++ {
if current.InReplyToURI == "" {
@@ -67,14 +78,21 @@ func (d *deref) DereferenceStatusAncestors(
return nil
}
- l := log.
- WithContext(ctx).
- WithFields(kv.Fields{
- {"username", username},
- {"originalStatusIRI", status.URI},
- {"currentStatusURI", current.URI},
- {"currentInReplyToURI", current.InReplyToURI},
- }...)
+ // Add new log fields for this iteration.
+ l = l.WithFields(kv.Fields{
+ {"current", current.URI},
+ {"parent", current.InReplyToURI},
+ }...)
+ l.Trace("following status ancestors")
+
+ // Check whether this parent has already been deref'd.
+ if _, ok := derefdStatuses[current.InReplyToURI]; ok {
+ l.Warn("self referencing status ancestors")
+ return nil
+ }
+
+ // Add this status URI to map of deref'd.
+ derefdStatuses[current.URI] = struct{}{}
if current.InReplyToID != "" {
// We already have an InReplyToID set. This means
@@ -123,7 +141,7 @@ func (d *deref) DereferenceStatusAncestors(
// by another action.
//
// TODO: clean this up in a nightly task.
- l.Warnf("current status has been orphaned (parent %s no longer exists in database)", current.InReplyToID)
+ l.Warn("orphaned status (parent no longer exists)")
return nil // Cannot iterate further.
}
@@ -134,16 +152,15 @@ func (d *deref) DereferenceStatusAncestors(
inReplyToURI, err := url.Parse(current.InReplyToURI)
if err != nil || inReplyToURI == nil {
// Parent URI is not something we can handle.
- l.Debug("current status has been orphaned (invalid InReplyToURI)")
+ l.Warn("orphaned status (invalid InReplyToURI)")
return nil //nolint:nilerr
}
// Parent URI is valid, try to get it.
// getStatusByURI guards against the following conditions:
- //
+ // - refetching recently fetched statuses (recursion!)
// - remote domain is blocked (will return unretrievable)
- // - domain is local (will try to return something, or
- // return unretrievable).
+ // - any http type error for a new status returns unretrievable
parent, _, err := d.getStatusByURI(ctx, username, inReplyToURI)
if err == nil {
// We successfully fetched the parent.
@@ -171,7 +188,7 @@ func (d *deref) DereferenceStatusAncestors(
case code == http.StatusGone:
// 410 means the status has definitely been deleted.
// Update this status to reflect that, then bail.
- l.Debug("current status has been orphaned (call to parent returned code 410 Gone)")
+ l.Debug("orphaned status: parent returned 410 Gone")
current.InReplyToURI = ""
if err := d.state.DB.UpdateStatus(
@@ -180,19 +197,19 @@ func (d *deref) DereferenceStatusAncestors(
); err != nil {
return gtserror.Newf("db error updating status %s: %w", current.ID, err)
}
+
return nil
case code != 0:
- // We had a code, but not one indicating deletion,
- // log the code but don't return error or update the
- // status; we can try again later.
- l.Warnf("cannot dereference parent (%q)", err)
+ // We had a code, but not one indicating deletion, log the code
+ // but don't return error or update the status; we can try again later.
+ l.Warnf("orphaned status: http error dereferencing parent: %v)", err)
return nil
case gtserror.Unretrievable(err):
// Not retrievable for some other reason, so just
- // bail; we can try again later if necessary.
- l.Debugf("parent unretrievable (%q)", err)
+ // bail for now; we can try again later if necessary.
+ l.Warnf("orphaned status: parent unretrievable: %v)", err)
return nil
default:
@@ -205,35 +222,44 @@ func (d *deref) DereferenceStatusAncestors(
}
func (d *deref) DereferenceStatusDescendants(ctx context.Context, username string, statusIRI *url.URL, parent ap.Statusable) error {
- // Take ref to original
- ogIRI := statusIRI
+ statusIRIStr := statusIRI.String()
// Start log entry with fields
l := log.WithContext(ctx).
WithFields(kv.Fields{
{"username", username},
- {"statusIRI", ogIRI},
+ {"status", statusIRIStr},
}...)
// Log function start
l.Trace("beginning")
- // frame represents a single stack frame when iteratively
- // dereferencing status descendants. where statusIRI and
- // statusable are of the status whose children we are to
- // descend, page is the current activity streams collection
- // page of entities we are on (as we often push a frame to
- // stack mid-paging), and item___ are entity iterators for
- // this activity streams collection page.
+ // OUR instance hostname.
+ localhost := config.GetHost()
+
+ // Keep track of already dereferenced collection
+ // pages for this thread to prevent recursion.
+ derefdPages := make(map[string]struct{}, 10)
+
+ // frame represents a single stack frame when
+ // iteratively dereferencing status descendants.
type frame struct {
- statusIRI *url.URL
- statusable ap.Statusable
- page ap.CollectionPageable
- itemIter vocab.ActivityStreamsItemsPropertyIterator
+ // page is the current activity streams
+ // collection page we are on (as we often
+ // push a frame to stack mid-paging).
+ page ap.CollectionPageable
+
+ // pageURI is the URI string of
+ // the frame's collection page
+ // (is useful for logging).
+ pageURI string
+
+ // items is the entity iterator for frame's page.
+ items vocab.ActivityStreamsItemsPropertyIterator
}
var (
- // current is the current stack frame
+ // current stack frame
current *frame
// stack is a list of "shelved" descendant iterator
@@ -242,11 +268,14 @@ func (d *deref) DereferenceStatusDescendants(ctx context.Context, username strin
// popped from into 'current' when that child's tree
// of further descendants is exhausted.
stack = []*frame{
- {
- // Starting input is first frame
- statusIRI: statusIRI,
- statusable: parent,
- },
+ func() *frame {
+ // Start input frame is built from the first input.
+ page, pageURI := getAttachedStatusCollection(parent)
+ if page == nil {
+ return nil
+ }
+ return &frame{page: page, pageURI: pageURI}
+ }(),
}
// popStack will remove and return the top frame
@@ -274,42 +303,9 @@ stackLoop:
return nil
}
- if current.page == nil {
- if current.statusIRI.Host == config.GetHost() {
- // This is a local status, no looping to do
- continue stackLoop
- }
-
- l.Tracef("following remote status descendants: %s", current.statusIRI)
-
- // Look for an attached status replies (as collection)
- replies := current.statusable.GetActivityStreamsReplies()
- if replies == nil {
- continue stackLoop
- }
-
- // Get the status replies collection
- collection := replies.GetActivityStreamsCollection()
- if collection == nil {
- continue stackLoop
- }
-
- // Get the "first" property of the replies collection
- first := collection.GetActivityStreamsFirst()
- if first == nil {
- continue stackLoop
- }
-
- // Set the first activity stream collection page
- current.page = first.GetActivityStreamsCollectionPage()
- if current.page == nil {
- continue stackLoop
- }
- }
-
pageLoop:
for {
- if current.itemIter == nil {
+ if current.items == nil {
// Get the items associated with this page
items := current.page.GetActivityStreamsItems()
if items == nil {
@@ -317,21 +313,23 @@ stackLoop:
}
// Start off the item iterator
- current.itemIter = items.Begin()
+ current.items = items.Begin()
}
+ l.Tracef("following collection page: %s", current.pageURI)
+
itemLoop:
for {
// Check for remaining iter
- if current.itemIter == nil {
+ if current.items == nil {
break itemLoop
}
// Get current item iterator
- itemIter := current.itemIter
+ itemIter := current.items
// Set the next available iterator
- current.itemIter = itemIter.Next()
+ current.items = itemIter.Next()
// Check for available IRI on item
itemIRI, _ := pub.ToId(itemIter)
@@ -339,76 +337,123 @@ stackLoop:
continue itemLoop
}
- if itemIRI.Host == config.GetHost() {
+ if itemIRI.Host == localhost {
// This child is one of ours,
continue itemLoop
}
// Dereference the remote status and store in the database.
// getStatusByURI guards against the following conditions:
- //
+ // - refetching recently fetched statuses (recursion!)
// - remote domain is blocked (will return unretrievable)
- // - domain is local (will try to return something, or
- // return unretrievable).
+ // - any http type error for a new status returns unretrievable
_, statusable, err := d.getStatusByURI(ctx, username, itemIRI)
if err != nil {
if !gtserror.Unretrievable(err) {
l.Errorf("error dereferencing remote status %s: %v", itemIRI, err)
}
-
continue itemLoop
}
if statusable == nil {
- // Already up-to-date.
+ // A nil statusable return from
+ // getStatusByURI() indicates a
+ // remote status that was already
+ // dereferenced recently (so no
+ // need to go through descendants).
+ continue itemLoop
+ }
+
+ // Extract any attached collection + URI from status.
+ page, pageURI := getAttachedStatusCollection(statusable)
+ if page == nil {
continue itemLoop
}
// Put current and next frame at top of stack
stack = append(stack, current, &frame{
- statusIRI: itemIRI,
- statusable: statusable,
+ pageURI: pageURI,
+ page: page,
})
// Now start at top of loop
continue stackLoop
}
- // Get the current page's "next" property
+ // Get the current page's "next" property.
pageNext := current.page.GetActivityStreamsNext()
if pageNext == nil || !pageNext.IsIRI() {
continue stackLoop
}
// Get the IRI of the "next" property.
- pageNextIRI := pageNext.GetIRI()
-
- // Ensure this isn't a self-referencing page...
- // We don't need to store / check against a map of IRIs
- // as our getStatusByIRI() function above prevents iter'ing
- // over statuses that have been dereferenced recently, due to
- // the `fetched_at` field preventing frequent refetches.
- if id := current.page.GetJSONLDId(); id != nil &&
- pageNextIRI.String() == id.Get().String() {
- log.Warnf(ctx, "self referencing collection page: %s", pageNextIRI)
+ pageNextURI := pageNext.GetIRI()
+ pageNextURIStr := pageNextURI.String()
+
+ // Check whether this page has already been deref'd.
+ if _, ok := derefdPages[pageNextURIStr]; ok {
+ l.Warnf("self referencing collection page(s): %s", pageNextURIStr)
continue stackLoop
}
- // Dereference this next collection page by its IRI
+ // Mark this collection page as deref'd.
+ derefdPages[pageNextURIStr] = struct{}{}
+
+ // Dereference this next collection page by its IRI.
collectionPage, err := d.dereferenceCollectionPage(ctx,
username,
- pageNextIRI,
+ pageNextURI,
)
if err != nil {
- l.Errorf("error dereferencing remote collection page %q: %s", pageNextIRI.String(), err)
+ l.Errorf("error dereferencing collection page %q: %s", pageNextURIStr, err)
continue stackLoop
}
- // Set the updated collection page
+ // Set the next collection page.
current.page = collectionPage
+ current.pageURI = pageNextURIStr
continue pageLoop
}
}
- return gtserror.Newf("reached %d descendant iterations for %q", maxIter, ogIRI.String())
+ return gtserror.Newf("reached %d descendant iterations for %q", maxIter, statusIRIStr)
+}
+
+// getAttachedStatusCollection fetches the first page of the replies collection attached to the given
+// statusable, plus an identifying URI string (page ID, else status ID). Returns nil, "" if unavailable.
+func getAttachedStatusCollection(status ap.Statusable) (page ap.CollectionPageable, uri string) { //nolint:gocritic
+ // Look for a replies property attached to the status.
+ replies := status.GetActivityStreamsReplies()
+ if replies == nil {
+ return nil, ""
+ }
+
+ // Get the replies property as a collection.
+ collection := replies.GetActivityStreamsCollection()
+ if collection == nil {
+ return nil, ""
+ }
+
+ // Get the "first" property of the replies collection.
+ first := collection.GetActivityStreamsFirst()
+ if first == nil {
+ return nil, ""
+ }
+
+ // Get the first activity streams collection page.
+ page = first.GetActivityStreamsCollectionPage()
+ if page == nil {
+ return nil, ""
+ }
+
+ if pageID := page.GetJSONLDId(); pageID != nil {
+ // By default use the collection page's JSONLD ID.
+ return page, pageID.Get().String()
+ } else if statusID := status.GetJSONLDId(); statusID != nil {
+ // Else, if possible, fall back to the status' JSONLD ID.
+ return page, statusID.Get().String()
+ } else {
+ // Neither page nor status has an ID: report no collection.
+ return nil, ""
+ }
 }