diff options
Diffstat (limited to 'internal/federation/dereferencing/status.go')
-rw-r--r-- | internal/federation/dereferencing/status.go | 121 |
1 files changed, 77 insertions, 44 deletions
diff --git a/internal/federation/dereferencing/status.go b/internal/federation/dereferencing/status.go index 712692814..4dd6d3baf 100644 --- a/internal/federation/dereferencing/status.go +++ b/internal/federation/dereferencing/status.go @@ -22,9 +22,8 @@ import ( "errors" "io" "net/url" - "time" - "slices" + "time" "github.com/superseriousbusiness/gotosocial/internal/ap" "github.com/superseriousbusiness/gotosocial/internal/config" @@ -55,12 +54,12 @@ func statusUpToDate(status *gtsmodel.Status) bool { return false } -// GetStatusByURI will attempt to fetch a status by its URI, first checking the database. In the case of a newly-met remote model, or a remote model -// whose last_fetched date is beyond a certain interval, the status will be dereferenced. In the case of dereferencing, some low-priority status information -// may be enqueued for asynchronous fetching, e.g. dereferencing the remainder of the status thread. An ActivityPub object indicates the status was dereferenced. +// GetStatusByURI will attempt to fetch a status by its URI, first checking the database. In the case of a newly-met remote model, or a remote model whose 'last_fetched' date +// is beyond a certain interval, the status will be dereferenced. In the case of dereferencing, some low-priority status information may be enqueued for asynchronous fetching, +// e.g. dereferencing the status thread. Param 'syncParent' = true indicates to fetch status ancestors synchronously. An ActivityPub object indicates the status was dereferenced. func (d *Dereferencer) GetStatusByURI(ctx context.Context, requestUser string, uri *url.URL) (*gtsmodel.Status, ap.Statusable, error) { // Fetch and dereference status if necessary. - status, apubStatus, err := d.getStatusByURI(ctx, + status, statusable, isNew, err := d.getStatusByURI(ctx, requestUser, uri, ) @@ -68,18 +67,22 @@ func (d *Dereferencer) GetStatusByURI(ctx context.Context, requestUser string, u return nil, nil, err } - if apubStatus != nil { - // This status was updated, enqueue re-dereferencing the whole thread. - d.state.Workers.Federator.MustEnqueueCtx(ctx, func(ctx context.Context) { - d.dereferenceThread(ctx, requestUser, uri, status, apubStatus) - }) + if statusable != nil { + // Deref parents + children. + d.dereferenceThread(ctx, + requestUser, + uri, + status, + statusable, + isNew, + ) } - return status, apubStatus, nil + return status, statusable, nil } // getStatusByURI is a package internal form of .GetStatusByURI() that doesn't bother dereferencing the whole thread on update. -func (d *Dereferencer) getStatusByURI(ctx context.Context, requestUser string, uri *url.URL) (*gtsmodel.Status, ap.Statusable, error) { +func (d *Dereferencer) getStatusByURI(ctx context.Context, requestUser string, uri *url.URL) (*gtsmodel.Status, ap.Statusable, bool, error) { var ( status *gtsmodel.Status uriStr = uri.String() @@ -94,7 +97,7 @@ func (d *Dereferencer) getStatusByURI(ctx context.Context, requestUser string, u uriStr, ) if err != nil && !errors.Is(err, db.ErrNoEntries) { - return nil, nil, gtserror.Newf("error checking database for status %s by uri: %w", uriStr, err) + return nil, nil, false, gtserror.Newf("error checking database for status %s by uri: %w", uriStr, err) } if status == nil { @@ -104,14 +107,14 @@ func (d *Dereferencer) getStatusByURI(ctx context.Context, requestUser string, u uriStr, ) if err != nil && !errors.Is(err, db.ErrNoEntries) { - return nil, nil, gtserror.Newf("error checking database for status %s by url: %w", uriStr, err) + return nil, nil, false, gtserror.Newf("error checking database for status %s by url: %w", uriStr, err) } } if status == nil { // Ensure that this isn't a search for a local status. if uri.Host == config.GetHost() || uri.Host == config.GetAccountDomain() { - return nil, nil, gtserror.SetUnretrievable(err) // this will be db.ErrNoEntries + return nil, nil, false, gtserror.SetUnretrievable(err) // this will be db.ErrNoEntries } // Create and pass-through a new bare-bones model for deref. @@ -127,11 +130,11 @@ func (d *Dereferencer) getStatusByURI(ctx context.Context, requestUser string, u if err := d.state.DB.PopulateStatus(ctx, status); err != nil { log.Errorf(ctx, "error populating existing status: %v", err) } - return status, nil, nil + return status, nil, false, nil } // Try to update + deref existing status model. - latest, apubStatus, err := d.enrichStatusSafely(ctx, + latest, statusable, isNew, err := d.enrichStatusSafely(ctx, requestUser, uri, status, @@ -140,17 +143,22 @@ func (d *Dereferencer) getStatusByURI(ctx context.Context, requestUser string, u if err != nil { log.Errorf(ctx, "error enriching remote status: %v", err) - // Fallback to existing. - return status, nil, nil + // Fallback to existing status. + return status, nil, false, nil } - return latest, apubStatus, nil + return latest, statusable, isNew, nil } -// RefreshStatus updates the given status if remote and last_fetched is beyond fetch interval, or if force is set. An updated status model is returned, -// but in the case of dereferencing, some low-priority status information may be enqueued for asynchronous fetching, e.g. dereferencing the remainder of the -// status thread. An ActivityPub object indicates the status was dereferenced (i.e. updated). -func (d *Dereferencer) RefreshStatus(ctx context.Context, requestUser string, status *gtsmodel.Status, apubStatus ap.Statusable, force bool) (*gtsmodel.Status, ap.Statusable, error) { +// RefreshStatus is functionally equivalent to GetStatusByURI(), except that it requires a pre +// populated status model (with AT LEAST uri set), and ALL thread dereferencing is asynchronous. +func (d *Dereferencer) RefreshStatus( + ctx context.Context, + requestUser string, + status *gtsmodel.Status, + statusable ap.Statusable, + force bool, +) (*gtsmodel.Status, ap.Statusable, error) { // Check whether needs update. if !force && statusUpToDate(status) { return status, nil, nil @@ -162,28 +170,40 @@ func (d *Dereferencer) RefreshStatus(ctx context.Context, requestUser string, st return nil, nil, gtserror.Newf("invalid status uri %q: %w", status.URI, err) } - // Try to update + deref the passed status model. - latest, apubStatus, err := d.enrichStatusSafely(ctx, + // Try to update + dereference the passed status model. + latest, statusable, isNew, err := d.enrichStatusSafely(ctx, requestUser, uri, status, - apubStatus, + statusable, ) if err != nil { return nil, nil, err } - // This status was updated, enqueue re-dereferencing the whole thread. - d.state.Workers.Federator.MustEnqueueCtx(ctx, func(ctx context.Context) { - d.dereferenceThread(ctx, requestUser, uri, latest, apubStatus) - }) + if statusable != nil { + // Deref parents + children. + d.dereferenceThread(ctx, + requestUser, + uri, + status, + statusable, + isNew, + ) + } - return latest, apubStatus, nil + return latest, statusable, nil } -// RefreshStatusAsync enqueues the given status for an asychronous update fetching, if last_fetched is beyond fetch interval, or if force is set. -// This is a more optimized form of manually enqueueing .UpdateStatus() to the federation worker, since it only enqueues update if necessary. -func (d *Dereferencer) RefreshStatusAsync(ctx context.Context, requestUser string, status *gtsmodel.Status, apubStatus ap.Statusable, force bool) { +// RefreshStatusAsync is functionally equivalent to RefreshStatus(), except that ALL +// dereferencing is queued for asynchronous processing, (both thread AND status). +func (d *Dereferencer) RefreshStatusAsync( + ctx context.Context, + requestUser string, + status *gtsmodel.Status, + statusable ap.Statusable, + force bool, +) { // Check whether needs update. if !force && statusUpToDate(status) { return @@ -196,17 +216,25 @@ func (d *Dereferencer) RefreshStatusAsync(ctx context.Context, requestUser strin return } - // Enqueue a worker function to re-fetch this status async. + // Enqueue a worker function to re-fetch this status entirely async. d.state.Workers.Federator.MustEnqueueCtx(ctx, func(ctx context.Context) { - latest, apubStatus, err := d.enrichStatusSafely(ctx, requestUser, uri, status, apubStatus) + latest, statusable, _, err := d.enrichStatusSafely(ctx, + requestUser, + uri, + status, + statusable, + ) if err != nil { log.Errorf(ctx, "error enriching remote status: %v", err) return } - - if apubStatus != nil { - // This status was updated, re-dereference the whole thread. - d.dereferenceThread(ctx, requestUser, uri, latest, apubStatus) + if statusable != nil { + if err := d.DereferenceStatusAncestors(ctx, requestUser, latest); err != nil { + log.Error(ctx, err) + } + if err := d.DereferenceStatusDescendants(ctx, requestUser, uri, statusable); err != nil { + log.Error(ctx, err) + } } }) } @@ -220,7 +248,7 @@ func (d *Dereferencer) enrichStatusSafely( uri *url.URL, status *gtsmodel.Status, apubStatus ap.Statusable, -) (*gtsmodel.Status, ap.Statusable, error) { +) (*gtsmodel.Status, ap.Statusable, bool, error) { uriStr := status.URI if status.ID != "" { @@ -238,6 +266,9 @@ func (d *Dereferencer) enrichStatusSafely( unlock = doOnce(unlock) defer unlock() + // This is a NEW status (to us). + isNew := (status.ID == "") + // Perform status enrichment with passed vars. latest, apubStatus, err := d.enrichStatus(ctx, requestUser, @@ -261,6 +292,7 @@ func (d *Dereferencer) enrichStatusSafely( // otherwise this indicates WE // enriched the status. apubStatus = nil + isNew = false // DATA RACE! We likely lost out to another goroutine // in a call to db.Put(Status). Look again in DB by URI. @@ -270,7 +302,7 @@ func (d *Dereferencer) enrichStatusSafely( } } - return latest, apubStatus, err + return latest, apubStatus, isNew, err } // enrichStatus will enrich the given status, whether a new @@ -343,6 +375,7 @@ func (d *Dereferencer) enrichStatus( } // Carry-over values and set fetch time. + latestStatus.UpdatedAt = status.UpdatedAt latestStatus.FetchedAt = time.Now() latestStatus.Local = status.Local |