summaryrefslogtreecommitdiff
path: root/internal/federation/dereferencing/thread.go
diff options
context:
space:
mode:
Diffstat (limited to 'internal/federation/dereferencing/thread.go')
-rw-r--r--internal/federation/dereferencing/thread.go47
1 files changed, 37 insertions, 10 deletions
diff --git a/internal/federation/dereferencing/thread.go b/internal/federation/dereferencing/thread.go
index 5753ce4dd..0ad8f09e4 100644
--- a/internal/federation/dereferencing/thread.go
+++ b/internal/federation/dereferencing/thread.go
@@ -38,15 +38,42 @@ import (
// ancesters we are willing to follow before returning error.
const maxIter = 1000
-func (d *Dereferencer) dereferenceThread(ctx context.Context, username string, statusIRI *url.URL, status *gtsmodel.Status, statusable ap.Statusable) {
- // Ensure that ancestors have been fully dereferenced
- if err := d.DereferenceStatusAncestors(ctx, username, status); err != nil {
- log.Error(ctx, err)
- }
+// dereferenceThread handles dereferencing status thread after
+// fetch. Passing off appropriate parts to be enqueued for async
+// processing, or handling some parts synchronously when required.
+func (d *Dereferencer) dereferenceThread(
+ ctx context.Context,
+ requestUser string,
+ uri *url.URL,
+ status *gtsmodel.Status,
+ statusable ap.Statusable,
+ isNew bool,
+) {
+ if isNew {
+ // This is a new status that we need the ancestors of in
+ // order to determine visibility. Perform the initial part
+ // of thread dereferencing, i.e. parents, synchronously.
+ err := d.DereferenceStatusAncestors(ctx, requestUser, status)
+ if err != nil {
+ log.Error(ctx, err)
+ }
- // Ensure that descendants have been fully dereferenced
- if err := d.DereferenceStatusDescendants(ctx, username, statusIRI, statusable); err != nil {
- log.Error(ctx, err)
+ // Enqueue dereferencing remaining status thread, (children), asychronously .
+ d.state.Workers.Federator.MustEnqueueCtx(ctx, func(ctx context.Context) {
+ if err := d.DereferenceStatusDescendants(ctx, requestUser, uri, statusable); err != nil {
+ log.Error(ctx, err)
+ }
+ })
+ } else {
+ // This is an existing status, dereference the WHOLE thread asynchronously.
+ d.state.Workers.Federator.MustEnqueueCtx(ctx, func(ctx context.Context) {
+ if err := d.DereferenceStatusAncestors(ctx, requestUser, status); err != nil {
+ log.Error(ctx, err)
+ }
+ if err := d.DereferenceStatusDescendants(ctx, requestUser, uri, statusable); err != nil {
+ log.Error(ctx, err)
+ }
+ })
}
}
@@ -157,7 +184,7 @@ func (d *Dereferencer) DereferenceStatusAncestors(ctx context.Context, username
// - refetching recently fetched statuses (recursion!)
// - remote domain is blocked (will return unretrievable)
// - any http type error for a new status returns unretrievable
- parent, _, err := d.getStatusByURI(ctx, username, inReplyToURI)
+ parent, _, _, err := d.getStatusByURI(ctx, username, inReplyToURI)
if err == nil {
// We successfully fetched the parent.
// Update current status with new info.
@@ -325,7 +352,7 @@ stackLoop:
// - refetching recently fetched statuses (recursion!)
// - remote domain is blocked (will return unretrievable)
// - any http type error for a new status returns unretrievable
- _, statusable, err := d.getStatusByURI(ctx, username, itemIRI)
+ _, statusable, _, err := d.getStatusByURI(ctx, username, itemIRI)
if err != nil {
if !gtserror.Unretrievable(err) {
l.Errorf("error dereferencing remote status %s: %v", itemIRI, err)