diff options
Diffstat (limited to 'internal/federation/dereferencing')
| -rw-r--r-- | internal/federation/dereferencing/thread.go | 251 | 
1 files changed, 148 insertions, 103 deletions
| diff --git a/internal/federation/dereferencing/thread.go b/internal/federation/dereferencing/thread.go index ad677b228..d35627ff1 100644 --- a/internal/federation/dereferencing/thread.go +++ b/internal/federation/dereferencing/thread.go @@ -56,9 +56,20 @@ func (d *deref) DereferenceStatusAncestors(  	username string,  	status *gtsmodel.Status,  ) error { +	// Start log entry with fields +	l := log.WithContext(ctx). +		WithFields(kv.Fields{ +			{"username", username}, +			{"original", status.URI}, +		}...) + +	// Keep track of already dereferenced statuses +	// for this ancestor thread to prevent recursion. +	derefdStatuses := make(map[string]struct{}, 10) +  	// Mark given status as the one  	// we're currently working on. -	var current = status +	current := status  	for i := 0; i < maxIter; i++ {  		if current.InReplyToURI == "" { @@ -67,14 +78,21 @@ func (d *deref) DereferenceStatusAncestors(  			return nil  		} -		l := log. -			WithContext(ctx). -			WithFields(kv.Fields{ -				{"username", username}, -				{"originalStatusIRI", status.URI}, -				{"currentStatusURI", current.URI}, -				{"currentInReplyToURI", current.InReplyToURI}, -			}...) +		// Add new log fields for this iteration. +		l = l.WithFields(kv.Fields{ +			{"current", current.URI}, +			{"parent", current.InReplyToURI}, +		}...) +		l.Trace("following status ancestors") + +		// Check whether this parent has already been deref'd. +		if _, ok := derefdStatuses[current.InReplyToURI]; ok { +			l.Warn("self referencing status ancestors") +			return nil +		} + +		// Add this status URI to map of deref'd. +		derefdStatuses[current.URI] = struct{}{}  		if current.InReplyToID != "" {  			// We already have an InReplyToID set. This means @@ -123,7 +141,7 @@ func (d *deref) DereferenceStatusAncestors(  			// by another action.  			//  			// TODO: clean this up in a nightly task. -			l.Warnf("current status has been orphaned (parent %s no longer exists in database)", current.InReplyToID) +			l.Warn("orphaned status (parent no longer exists)")  			return nil // Cannot iterate further.  		} @@ -134,16 +152,15 @@ func (d *deref) DereferenceStatusAncestors(  		inReplyToURI, err := url.Parse(current.InReplyToURI)  		if err != nil || inReplyToURI == nil {  			// Parent URI is not something we can handle. -			l.Debug("current status has been orphaned (invalid InReplyToURI)") +			l.Warn("orphaned status (invalid InReplyToURI)")  			return nil //nolint:nilerr  		}  		// Parent URI is valid, try to get it.  		// getStatusByURI guards against the following conditions: -		// +		//   - refetching recently fetched statuses (recursion!)  		//   - remote domain is blocked (will return unretrievable) -		//   - domain is local (will try to return something, or -		//     return unretrievable). +		//   - any http type error for a new status returns unretrievable  		parent, _, err := d.getStatusByURI(ctx, username, inReplyToURI)  		if err == nil {  			// We successfully fetched the parent. @@ -171,7 +188,7 @@ func (d *deref) DereferenceStatusAncestors(  		case code == http.StatusGone:  			// 410 means the status has definitely been deleted.  			// Update this status to reflect that, then bail. -			l.Debug("current status has been orphaned (call to parent returned code 410 Gone)") +			l.Debug("orphaned status: parent returned 410 Gone")  			current.InReplyToURI = ""  			if err := d.state.DB.UpdateStatus( @@ -180,19 +197,19 @@ func (d *deref) DereferenceStatusAncestors(  			); err != nil {  				return gtserror.Newf("db error updating status %s: %w", current.ID, err)  			} +  			return nil  		case code != 0: -			// We had a code, but not one indicating deletion, -			// log the code but don't return error or update the -			// status; we can try again later. -			l.Warnf("cannot dereference parent (%q)", err) +			// We had a code, but not one indicating deletion, log the code +			// but don't return error or update the status; we can try again later. +			l.Warnf("orphaned status: http error dereferencing parent: %v)", err)  			return nil  		case gtserror.Unretrievable(err):  			// Not retrievable for some other reason, so just -			// bail; we can try again later if necessary. -			l.Debugf("parent unretrievable (%q)", err) +			// bail for now; we can try again later if necessary. +			l.Warnf("orphaned status: parent unretrievable: %v)", err)  			return nil  		default: @@ -205,35 +222,44 @@ func (d *deref) DereferenceStatusAncestors(  }  func (d *deref) DereferenceStatusDescendants(ctx context.Context, username string, statusIRI *url.URL, parent ap.Statusable) error { -	// Take ref to original -	ogIRI := statusIRI +	statusIRIStr := statusIRI.String()  	// Start log entry with fields  	l := log.WithContext(ctx).  		WithFields(kv.Fields{  			{"username", username}, -			{"statusIRI", ogIRI}, +			{"status", statusIRIStr},  		}...)  	// Log function start  	l.Trace("beginning") -	// frame represents a single stack frame when iteratively -	// dereferencing status descendants. where statusIRI and -	// statusable are of the status whose children we are to -	// descend, page is the current activity streams collection -	// page of entities we are on (as we often push a frame to -	// stack mid-paging), and item___ are entity iterators for -	// this activity streams collection page. +	// OUR instance hostname. +	localhost := config.GetHost() + +	// Keep track of already dereferenced collection +	// pages for this thread to prevent recursion. +	derefdPages := make(map[string]struct{}, 10) + +	// frame represents a single stack frame when +	// iteratively derefencing status descendants.  	type frame struct { -		statusIRI  *url.URL -		statusable ap.Statusable -		page       ap.CollectionPageable -		itemIter   vocab.ActivityStreamsItemsPropertyIterator +		// page is the current activity streams +		// collection page we are on (as we often +		// push a frame to stack mid-paging). +		page ap.CollectionPageable + +		// pageURI is the URI string of +		// the frame's collection page +		// (is useful for logging). +		pageURI string + +		// items is the entity iterator for frame's page. +		items vocab.ActivityStreamsItemsPropertyIterator  	}  	var ( -		// current is the current stack frame +		// current stack frame  		current *frame  		// stack is a list of "shelved" descendand iterator @@ -242,11 +268,14 @@ func (d *deref) DereferenceStatusDescendants(ctx context.Context, username strin  		// popped from into 'current' when that child's tree  		// of further descendants is exhausted.  		stack = []*frame{ -			{ -				// Starting input is first frame -				statusIRI:  statusIRI, -				statusable: parent, -			}, +			func() *frame { +				// Start input frame is built from the first input. +				page, pageURI := getAttachedStatusCollection(parent) +				if page == nil { +					return nil +				} +				return &frame{page: page, pageURI: pageURI} +			}(),  		}  		// popStack will remove and return the top frame @@ -274,42 +303,9 @@ stackLoop:  			return nil  		} -		if current.page == nil { -			if current.statusIRI.Host == config.GetHost() { -				// This is a local status, no looping to do -				continue stackLoop -			} - -			l.Tracef("following remote status descendants: %s", current.statusIRI) - -			// Look for an attached status replies (as collection) -			replies := current.statusable.GetActivityStreamsReplies() -			if replies == nil { -				continue stackLoop -			} - -			// Get the status replies collection -			collection := replies.GetActivityStreamsCollection() -			if collection == nil { -				continue stackLoop -			} - -			// Get the "first" property of the replies collection -			first := collection.GetActivityStreamsFirst() -			if first == nil { -				continue stackLoop -			} - -			// Set the first activity stream collection page -			current.page = first.GetActivityStreamsCollectionPage() -			if current.page == nil { -				continue stackLoop -			} -		} -  	pageLoop:  		for { -			if current.itemIter == nil { +			if current.items == nil {  				// Get the items associated with this page  				items := current.page.GetActivityStreamsItems()  				if items == nil { @@ -317,21 +313,23 @@ stackLoop:  				}  				// Start off the item iterator -				current.itemIter = items.Begin() +				current.items = items.Begin()  			} +			l.Tracef("following collection page: %s", current.pageURI) +  		itemLoop:  			for {  				// Check for remaining iter -				if current.itemIter == nil { +				if current.items == nil {  					break itemLoop  				}  				// Get current item iterator -				itemIter := current.itemIter +				itemIter := current.items  				// Set the next available iterator -				current.itemIter = itemIter.Next() +				current.items = itemIter.Next()  				// Check for available IRI on item  				itemIRI, _ := pub.ToId(itemIter) @@ -339,76 +337,123 @@ stackLoop:  					continue itemLoop  				} -				if itemIRI.Host == config.GetHost() { +				if itemIRI.Host == localhost {  					// This child is one of ours,  					continue itemLoop  				}  				// Dereference the remote status and store in the database.  				// getStatusByURI guards against the following conditions: -				// +				//   - refetching recently fetched statuses (recursion!)  				//   - remote domain is blocked (will return unretrievable) -				//   - domain is local (will try to return something, or -				//     return unretrievable). +				//   - any http type error for a new status returns unretrievable  				_, statusable, err := d.getStatusByURI(ctx, username, itemIRI)  				if err != nil {  					if !gtserror.Unretrievable(err) {  						l.Errorf("error dereferencing remote status %s: %v", itemIRI, err)  					} -  					continue itemLoop  				}  				if statusable == nil { -					// Already up-to-date. +					// A nil statusable return from +					// getStatusByURI() indicates a +					// remote status that was already +					// dereferenced recently (so no +					// need to go through descendents). +					continue itemLoop +				} + +				// Extract any attached collection + URI from status. +				page, pageURI := getAttachedStatusCollection(statusable) +				if page == nil {  					continue itemLoop  				}  				// Put current and next frame at top of stack  				stack = append(stack, current, &frame{ -					statusIRI:  itemIRI, -					statusable: statusable, +					pageURI: pageURI, +					page:    page,  				})  				// Now start at top of loop  				continue stackLoop  			} -			// Get the current page's "next" property +			// Get the current page's "next" property.  			pageNext := current.page.GetActivityStreamsNext()  			if pageNext == nil || !pageNext.IsIRI() {  				continue stackLoop  			}  			// Get the IRI of the "next" property. -			pageNextIRI := pageNext.GetIRI() - -			// Ensure this isn't a self-referencing page... -			// We don't need to store / check against a map of IRIs -			// as our getStatusByIRI() function above prevents iter'ing -			// over statuses that have been dereferenced recently, due to -			// the `fetched_at` field preventing frequent refetches. -			if id := current.page.GetJSONLDId(); id != nil && -				pageNextIRI.String() == id.Get().String() { -				log.Warnf(ctx, "self referencing collection page: %s", pageNextIRI) +			pageNextURI := pageNext.GetIRI() +			pageNextURIStr := pageNextURI.String() + +			// Check whether this page has already been deref'd. +			if _, ok := derefdPages[pageNextURIStr]; ok { +				l.Warnf("self referencing collection page(s): %s", pageNextURIStr)  				continue stackLoop  			} -			// Dereference this next collection page by its IRI +			// Mark this collection page as deref'd. +			derefdPages[pageNextURIStr] = struct{}{} + +			// Dereference this next collection page by its IRI.  			collectionPage, err := d.dereferenceCollectionPage(ctx,  				username, -				pageNextIRI, +				pageNextURI,  			)  			if err != nil { -				l.Errorf("error dereferencing remote collection page %q: %s", pageNextIRI.String(), err) +				l.Errorf("error dereferencing collection page %q: %s", pageNextURIStr, err)  				continue stackLoop  			} -			// Set the updated collection page +			// Set the next collection page.  			current.page = collectionPage +			current.pageURI = pageNextURIStr  			continue pageLoop  		}  	} -	return gtserror.Newf("reached %d descendant iterations for %q", maxIter, ogIRI.String()) +	return gtserror.Newf("reached %d descendant iterations for %q", maxIter, statusIRIStr) +} + +// getAttachedStatusCollection is a small utility function to fetch the first page +// of an attached activity streams collection from a provided statusable object . +func getAttachedStatusCollection(status ap.Statusable) (page ap.CollectionPageable, uri string) { //nolint:gocritic +	// Look for an attached status replies (as collection) +	replies := status.GetActivityStreamsReplies() +	if replies == nil { +		return nil, "" +	} + +	// Get the status replies collection +	collection := replies.GetActivityStreamsCollection() +	if collection == nil { +		return nil, "" +	} + +	// Get the "first" property of the replies collection +	first := collection.GetActivityStreamsFirst() +	if first == nil { +		return nil, "" +	} + +	// Return the first activity stream collection page +	page = first.GetActivityStreamsCollectionPage() +	if page == nil { +		return nil, "" +	} + +	if pageID := page.GetJSONLDId(); pageID != nil { +		// By default use collection JSONLD ID +		return page, pageID.Get().String() +	} else if statusID := status.GetJSONLDId(); statusID != nil { +		// Else, if possible use status JSONLD ID +		return page, statusID.Get().String() +	} else { +		// MUST have some kind of ID +		return nil, "" +	}  } | 
