diff options
Diffstat (limited to 'internal/federation/dereferencing')
-rw-r--r-- | internal/federation/dereferencing/account.go | 2 | ||||
-rw-r--r-- | internal/federation/dereferencing/status.go | 121 | ||||
-rw-r--r-- | internal/federation/dereferencing/thread.go | 47 |
3 files changed, 115 insertions, 55 deletions
diff --git a/internal/federation/dereferencing/account.go b/internal/federation/dereferencing/account.go index 58f07f9cd..a4e74de3c 100644 --- a/internal/federation/dereferencing/account.go +++ b/internal/federation/dereferencing/account.go @@ -945,7 +945,7 @@ func (d *Dereferencer) dereferenceAccountFeatured(ctx context.Context, requestUs // we still know it was *meant* to be pinned. statusURIs = append(statusURIs, statusURI) - status, _, err := d.getStatusByURI(ctx, requestUser, statusURI) + status, _, _, err := d.getStatusByURI(ctx, requestUser, statusURI) if err != nil { // We couldn't get the status, bummer. Just log + move on, we can try later. log.Errorf(ctx, "error getting status from featured collection %s: %v", statusURI, err) diff --git a/internal/federation/dereferencing/status.go b/internal/federation/dereferencing/status.go index 712692814..4dd6d3baf 100644 --- a/internal/federation/dereferencing/status.go +++ b/internal/federation/dereferencing/status.go @@ -22,9 +22,8 @@ import ( "errors" "io" "net/url" - "time" - "slices" + "time" "github.com/superseriousbusiness/gotosocial/internal/ap" "github.com/superseriousbusiness/gotosocial/internal/config" @@ -55,12 +54,12 @@ func statusUpToDate(status *gtsmodel.Status) bool { return false } -// GetStatusByURI will attempt to fetch a status by its URI, first checking the database. In the case of a newly-met remote model, or a remote model -// whose last_fetched date is beyond a certain interval, the status will be dereferenced. In the case of dereferencing, some low-priority status information -// may be enqueued for asynchronous fetching, e.g. dereferencing the remainder of the status thread. An ActivityPub object indicates the status was dereferenced. +// GetStatusByURI will attempt to fetch a status by its URI, first checking the database. In the case of a newly-met remote model, or a remote model whose 'last_fetched' date +// is beyond a certain interval, the status will be dereferenced. In the case of dereferencing, some low-priority status information may be enqueued for asynchronous fetching, +// e.g. dereferencing the status thread. Param 'syncParent' = true indicates to fetch status ancestors synchronously. An ActivityPub object indicates the status was dereferenced. func (d *Dereferencer) GetStatusByURI(ctx context.Context, requestUser string, uri *url.URL) (*gtsmodel.Status, ap.Statusable, error) { // Fetch and dereference status if necessary. - status, apubStatus, err := d.getStatusByURI(ctx, + status, statusable, isNew, err := d.getStatusByURI(ctx, requestUser, uri, ) @@ -68,18 +67,22 @@ func (d *Dereferencer) GetStatusByURI(ctx context.Context, requestUser string, u return nil, nil, err } - if apubStatus != nil { - // This status was updated, enqueue re-dereferencing the whole thread. - d.state.Workers.Federator.MustEnqueueCtx(ctx, func(ctx context.Context) { - d.dereferenceThread(ctx, requestUser, uri, status, apubStatus) - }) + if statusable != nil { + // Deref parents + children. + d.dereferenceThread(ctx, + requestUser, + uri, + status, + statusable, + isNew, + ) } - return status, apubStatus, nil + return status, statusable, nil } // getStatusByURI is a package internal form of .GetStatusByURI() that doesn't bother dereferencing the whole thread on update. -func (d *Dereferencer) getStatusByURI(ctx context.Context, requestUser string, uri *url.URL) (*gtsmodel.Status, ap.Statusable, error) { +func (d *Dereferencer) getStatusByURI(ctx context.Context, requestUser string, uri *url.URL) (*gtsmodel.Status, ap.Statusable, bool, error) { var ( status *gtsmodel.Status uriStr = uri.String() @@ -94,7 +97,7 @@ func (d *Dereferencer) getStatusByURI(ctx context.Context, requestUser string, u uriStr, ) if err != nil && !errors.Is(err, db.ErrNoEntries) { - return nil, nil, gtserror.Newf("error checking database for status %s by uri: %w", uriStr, err) + return nil, nil, false, gtserror.Newf("error checking database for status %s by uri: %w", uriStr, err) } if status == nil { @@ -104,14 +107,14 @@ func (d *Dereferencer) getStatusByURI(ctx context.Context, requestUser string, u uriStr, ) if err != nil && !errors.Is(err, db.ErrNoEntries) { - return nil, nil, gtserror.Newf("error checking database for status %s by url: %w", uriStr, err) + return nil, nil, false, gtserror.Newf("error checking database for status %s by url: %w", uriStr, err) } } if status == nil { // Ensure that this isn't a search for a local status. if uri.Host == config.GetHost() || uri.Host == config.GetAccountDomain() { - return nil, nil, gtserror.SetUnretrievable(err) // this will be db.ErrNoEntries + return nil, nil, false, gtserror.SetUnretrievable(err) // this will be db.ErrNoEntries } // Create and pass-through a new bare-bones model for deref. @@ -127,11 +130,11 @@ func (d *Dereferencer) getStatusByURI(ctx context.Context, requestUser string, u if err := d.state.DB.PopulateStatus(ctx, status); err != nil { log.Errorf(ctx, "error populating existing status: %v", err) } - return status, nil, nil + return status, nil, false, nil } // Try to update + deref existing status model. - latest, apubStatus, err := d.enrichStatusSafely(ctx, + latest, statusable, isNew, err := d.enrichStatusSafely(ctx, requestUser, uri, status, @@ -140,17 +143,22 @@ func (d *Dereferencer) getStatusByURI(ctx context.Context, requestUser string, u if err != nil { log.Errorf(ctx, "error enriching remote status: %v", err) - // Fallback to existing. - return status, nil, nil + // Fallback to existing status. + return status, nil, false, nil } - return latest, apubStatus, nil + return latest, statusable, isNew, nil } -// RefreshStatus updates the given status if remote and last_fetched is beyond fetch interval, or if force is set. An updated status model is returned, -// but in the case of dereferencing, some low-priority status information may be enqueued for asynchronous fetching, e.g. dereferencing the remainder of the -// status thread. An ActivityPub object indicates the status was dereferenced (i.e. updated). -func (d *Dereferencer) RefreshStatus(ctx context.Context, requestUser string, status *gtsmodel.Status, apubStatus ap.Statusable, force bool) (*gtsmodel.Status, ap.Statusable, error) { +// RefreshStatus is functionally equivalent to GetStatusByURI(), except that it requires a pre +// populated status model (with AT LEAST uri set), and ALL thread dereferencing is asynchronous. +func (d *Dereferencer) RefreshStatus( + ctx context.Context, + requestUser string, + status *gtsmodel.Status, + statusable ap.Statusable, + force bool, +) (*gtsmodel.Status, ap.Statusable, error) { // Check whether needs update. if !force && statusUpToDate(status) { return status, nil, nil @@ -162,28 +170,40 @@ func (d *Dereferencer) RefreshStatus(ctx context.Context, requestUser string, st return nil, nil, gtserror.Newf("invalid status uri %q: %w", status.URI, err) } - // Try to update + deref the passed status model. - latest, apubStatus, err := d.enrichStatusSafely(ctx, + // Try to update + dereference the passed status model. + latest, statusable, isNew, err := d.enrichStatusSafely(ctx, requestUser, uri, status, - apubStatus, + statusable, ) if err != nil { return nil, nil, err } - // This status was updated, enqueue re-dereferencing the whole thread. - d.state.Workers.Federator.MustEnqueueCtx(ctx, func(ctx context.Context) { - d.dereferenceThread(ctx, requestUser, uri, latest, apubStatus) - }) + if statusable != nil { + // Deref parents + children. + d.dereferenceThread(ctx, + requestUser, + uri, + status, + statusable, + isNew, + ) + } - return latest, apubStatus, nil + return latest, statusable, nil } -// RefreshStatusAsync enqueues the given status for an asychronous update fetching, if last_fetched is beyond fetch interval, or if force is set. -// This is a more optimized form of manually enqueueing .UpdateStatus() to the federation worker, since it only enqueues update if necessary. -func (d *Dereferencer) RefreshStatusAsync(ctx context.Context, requestUser string, status *gtsmodel.Status, apubStatus ap.Statusable, force bool) { +// RefreshStatusAsync is functionally equivalent to RefreshStatus(), except that ALL +// dereferencing is queued for asynchronous processing, (both thread AND status). +func (d *Dereferencer) RefreshStatusAsync( + ctx context.Context, + requestUser string, + status *gtsmodel.Status, + statusable ap.Statusable, + force bool, +) { // Check whether needs update. if !force && statusUpToDate(status) { return @@ -196,17 +216,25 @@ func (d *Dereferencer) RefreshStatusAsync(ctx context.Context, requestUser strin return } - // Enqueue a worker function to re-fetch this status async. + // Enqueue a worker function to re-fetch this status entirely async. d.state.Workers.Federator.MustEnqueueCtx(ctx, func(ctx context.Context) { - latest, apubStatus, err := d.enrichStatusSafely(ctx, requestUser, uri, status, apubStatus) + latest, statusable, _, err := d.enrichStatusSafely(ctx, + requestUser, + uri, + status, + statusable, + ) if err != nil { log.Errorf(ctx, "error enriching remote status: %v", err) return } - - if apubStatus != nil { - // This status was updated, re-dereference the whole thread. - d.dereferenceThread(ctx, requestUser, uri, latest, apubStatus) + if statusable != nil { + if err := d.DereferenceStatusAncestors(ctx, requestUser, latest); err != nil { + log.Error(ctx, err) + } + if err := d.DereferenceStatusDescendants(ctx, requestUser, uri, statusable); err != nil { + log.Error(ctx, err) + } } }) } @@ -220,7 +248,7 @@ func (d *Dereferencer) enrichStatusSafely( uri *url.URL, status *gtsmodel.Status, apubStatus ap.Statusable, -) (*gtsmodel.Status, ap.Statusable, error) { +) (*gtsmodel.Status, ap.Statusable, bool, error) { uriStr := status.URI if status.ID != "" { @@ -238,6 +266,9 @@ func (d *Dereferencer) enrichStatusSafely( unlock = doOnce(unlock) defer unlock() + // This is a NEW status (to us). + isNew := (status.ID == "") + // Perform status enrichment with passed vars. latest, apubStatus, err := d.enrichStatus(ctx, requestUser, @@ -261,6 +292,7 @@ func (d *Dereferencer) enrichStatusSafely( // otherwise this indicates WE // enriched the status. apubStatus = nil + isNew = false // DATA RACE! We likely lost out to another goroutine // in a call to db.Put(Status). Look again in DB by URI. @@ -270,7 +302,7 @@ func (d *Dereferencer) enrichStatusSafely( } } - return latest, apubStatus, err + return latest, apubStatus, isNew, err } // enrichStatus will enrich the given status, whether a new @@ -343,6 +375,7 @@ func (d *Dereferencer) enrichStatus( } // Carry-over values and set fetch time. + latestStatus.UpdatedAt = status.UpdatedAt latestStatus.FetchedAt = time.Now() latestStatus.Local = status.Local diff --git a/internal/federation/dereferencing/thread.go b/internal/federation/dereferencing/thread.go index 5753ce4dd..0ad8f09e4 100644 --- a/internal/federation/dereferencing/thread.go +++ b/internal/federation/dereferencing/thread.go @@ -38,15 +38,42 @@ import ( // ancesters we are willing to follow before returning error. const maxIter = 1000 -func (d *Dereferencer) dereferenceThread(ctx context.Context, username string, statusIRI *url.URL, status *gtsmodel.Status, statusable ap.Statusable) { - // Ensure that ancestors have been fully dereferenced - if err := d.DereferenceStatusAncestors(ctx, username, status); err != nil { - log.Error(ctx, err) - } +// dereferenceThread handles dereferencing status thread after +// fetch. Passing off appropriate parts to be enqueued for async +// processing, or handling some parts synchronously when required. +func (d *Dereferencer) dereferenceThread( + ctx context.Context, + requestUser string, + uri *url.URL, + status *gtsmodel.Status, + statusable ap.Statusable, + isNew bool, +) { + if isNew { + // This is a new status that we need the ancestors of in + // order to determine visibility. Perform the initial part + // of thread dereferencing, i.e. parents, synchronously. + err := d.DereferenceStatusAncestors(ctx, requestUser, status) + if err != nil { + log.Error(ctx, err) + } - // Ensure that descendants have been fully dereferenced - if err := d.DereferenceStatusDescendants(ctx, username, statusIRI, statusable); err != nil { - log.Error(ctx, err) + // Enqueue dereferencing remaining status thread, (children), asychronously . + d.state.Workers.Federator.MustEnqueueCtx(ctx, func(ctx context.Context) { + if err := d.DereferenceStatusDescendants(ctx, requestUser, uri, statusable); err != nil { + log.Error(ctx, err) + } + }) + } else { + // This is an existing status, dereference the WHOLE thread asynchronously. + d.state.Workers.Federator.MustEnqueueCtx(ctx, func(ctx context.Context) { + if err := d.DereferenceStatusAncestors(ctx, requestUser, status); err != nil { + log.Error(ctx, err) + } + if err := d.DereferenceStatusDescendants(ctx, requestUser, uri, statusable); err != nil { + log.Error(ctx, err) + } + }) } } @@ -157,7 +184,7 @@ func (d *Dereferencer) DereferenceStatusAncestors(ctx context.Context, username // - refetching recently fetched statuses (recursion!) // - remote domain is blocked (will return unretrievable) // - any http type error for a new status returns unretrievable - parent, _, err := d.getStatusByURI(ctx, username, inReplyToURI) + parent, _, _, err := d.getStatusByURI(ctx, username, inReplyToURI) if err == nil { // We successfully fetched the parent. // Update current status with new info. @@ -325,7 +352,7 @@ stackLoop: // - refetching recently fetched statuses (recursion!) // - remote domain is blocked (will return unretrievable) // - any http type error for a new status returns unretrievable - _, statusable, err := d.getStatusByURI(ctx, username, itemIRI) + _, statusable, _, err := d.getStatusByURI(ctx, username, itemIRI) if err != nil { if !gtserror.Unretrievable(err) { l.Errorf("error dereferencing remote status %s: %v", itemIRI, err) |