diff options
Diffstat (limited to 'internal/federation/dereferencing/status.go')
-rw-r--r-- | internal/federation/dereferencing/status.go | 167 |
1 files changed, 110 insertions, 57 deletions
diff --git a/internal/federation/dereferencing/status.go b/internal/federation/dereferencing/status.go index adc73e843..712692814 100644 --- a/internal/federation/dereferencing/status.go +++ b/internal/federation/dereferencing/status.go @@ -115,7 +115,7 @@ func (d *Dereferencer) getStatusByURI(ctx context.Context, requestUser string, u } // Create and pass-through a new bare-bones model for deref. - return d.enrichStatus(ctx, requestUser, uri, &gtsmodel.Status{ + return d.enrichStatusSafely(ctx, requestUser, uri, &gtsmodel.Status{ Local: func() *bool { var false bool; return &false }(), URI: uriStr, }, nil) @@ -131,7 +131,7 @@ func (d *Dereferencer) getStatusByURI(ctx context.Context, requestUser string, u } // Try to update + deref existing status model. - latest, apubStatus, err := d.enrichStatus(ctx, + latest, apubStatus, err := d.enrichStatusSafely(ctx, requestUser, uri, status, @@ -140,10 +140,6 @@ func (d *Dereferencer) getStatusByURI(ctx context.Context, requestUser string, u if err != nil { log.Errorf(ctx, "error enriching remote status: %v", err) - // Update fetch-at to slow re-attempts. - status.FetchedAt = time.Now() - _ = d.state.DB.UpdateStatus(ctx, status, "fetched_at") - // Fallback to existing. return status, nil, nil } @@ -166,8 +162,8 @@ func (d *Dereferencer) RefreshStatus(ctx context.Context, requestUser string, st return nil, nil, gtserror.Newf("invalid status uri %q: %w", status.URI, err) } - // Try to update + deref existing status model. - latest, apubStatus, err := d.enrichStatus(ctx, + // Try to update + deref the passed status model. + latest, apubStatus, err := d.enrichStatusSafely(ctx, requestUser, uri, status, @@ -189,7 +185,7 @@ func (d *Dereferencer) RefreshStatus(ctx context.Context, requestUser string, st // This is a more optimized form of manually enqueueing .UpdateStatus() to the federation worker, since it only enqueues update if necessary. 
func (d *Dereferencer) RefreshStatusAsync(ctx context.Context, requestUser string, status *gtsmodel.Status, apubStatus ap.Statusable, force bool) { // Check whether needs update. - if statusUpToDate(status) { + if !force && statusUpToDate(status) { return } @@ -202,17 +198,81 @@ func (d *Dereferencer) RefreshStatusAsync(ctx context.Context, requestUser strin // Enqueue a worker function to re-fetch this status async. d.state.Workers.Federator.MustEnqueueCtx(ctx, func(ctx context.Context) { - latest, apubStatus, err := d.enrichStatus(ctx, requestUser, uri, status, apubStatus) + latest, apubStatus, err := d.enrichStatusSafely(ctx, requestUser, uri, status, apubStatus) if err != nil { log.Errorf(ctx, "error enriching remote status: %v", err) return } - // This status was updated, re-dereference the whole thread. - d.dereferenceThread(ctx, requestUser, uri, latest, apubStatus) + if apubStatus != nil { + // This status was updated, re-dereference the whole thread. + d.dereferenceThread(ctx, requestUser, uri, latest, apubStatus) + } }) } +// enrichStatusSafely wraps enrichStatus() to perform +// it within the State{}.FedLocks mutexmap, which protects +// dereferencing actions with per-URI mutex locks. +func (d *Dereferencer) enrichStatusSafely( + ctx context.Context, + requestUser string, + uri *url.URL, + status *gtsmodel.Status, + apubStatus ap.Statusable, +) (*gtsmodel.Status, ap.Statusable, error) { + uriStr := status.URI + + if status.ID != "" { + // This is an existing status, first try to populate it. This + // is required by the checks below for existing tags, media etc. + if err := d.state.DB.PopulateStatus(ctx, status); err != nil { + log.Errorf(ctx, "error populating existing status %s: %v", uriStr, err) + } + } + + // Acquire per-URI deref lock, wraping unlock + // to safely defer in case of panic, while still + // performing more granular unlocks when needed. 
+ unlock := d.state.FedLocks.Lock(uriStr) + unlock = doOnce(unlock) + defer unlock() + + // Perform status enrichment with passed vars. + latest, apubStatus, err := d.enrichStatus(ctx, + requestUser, + uri, + status, + apubStatus, + ) + + if gtserror.StatusCode(err) >= 400 { + // Update fetch-at to slow re-attempts. + status.FetchedAt = time.Now() + _ = d.state.DB.UpdateStatus(ctx, status, "fetched_at") + } + + // Unlock now + // we're done. + unlock() + + if errors.Is(err, db.ErrAlreadyExists) { + // Ensure AP model isn't set, + // otherwise this indicates WE + // enriched the status. + apubStatus = nil + + // DATA RACE! We likely lost out to another goroutine + // in a call to db.Put(Status). Look again in DB by URI. + latest, err = d.state.DB.GetStatusByURI(ctx, status.URI) + if err != nil { + err = gtserror.Newf("error getting status %s from database after race: %w", uriStr, err) + } + } + + return latest, apubStatus, err +} + // enrichStatus will enrich the given status, whether a new // barebones model, or existing model from the database. // It handles necessary dereferencing, database updates, etc. @@ -258,15 +318,10 @@ func (d *Dereferencer) enrichStatus( return nil, nil, gtserror.New("attributedTo was empty") } - // Ensure we have the author account of the status dereferenced (+ up-to-date). - if author, _, err := d.getAccountByURI(ctx, requestUser, attributedTo); err != nil { - if status.AccountID == "" { - // Provided status account is nil, i.e. this is a new status / author, so a deref fail is unrecoverable. - return nil, nil, gtserror.Newf("failed to dereference status author %s: %w", uri, err) - } - } else if status.AccountID != "" && status.AccountID != author.ID { - // There already existed an account for this status author, but account ID changed. This shouldn't happen! - log.Warnf(ctx, "status author account ID changed: old=%s new=%s", status.AccountID, author.ID) + // Ensure we have the author account of the status dereferenced (+ up-to-date). 
If this is a new status + // (i.e. status.AccountID == "") then any error here is irrecoverable. AccountID must ALWAYS be set. + if _, _, err := d.getAccountByURI(ctx, requestUser, attributedTo); err != nil && status.AccountID == "" { + return nil, nil, gtserror.Newf("failed to dereference status author %s: %w", uri, err) } // ActivityPub model was recently dereferenced, so assume that passed status @@ -303,7 +358,7 @@ func (d *Dereferencer) enrichStatus( } // Ensure the status' tags are populated, (changes are expected / okay). - if err := d.fetchStatusTags(ctx, latestStatus); err != nil { + if err := d.fetchStatusTags(ctx, status, latestStatus); err != nil { return nil, nil, gtserror.Newf("error populating tags for status %s: %w", uri, err) } @@ -323,13 +378,6 @@ func (d *Dereferencer) enrichStatus( // // This is new, put the status in the database. err := d.state.DB.PutStatus(ctx, latestStatus) - - if errors.Is(err, db.ErrAlreadyExists) { - // TODO: replace this quick fix with per-URI deref locks. - latestStatus, err = d.state.DB.GetStatusByURI(ctx, latestStatus.URI) - return latestStatus, nil, err - } - if err != nil { return nil, nil, gtserror.Newf("error putting in database: %w", err) } @@ -545,36 +593,41 @@ func (d *Dereferencer) threadStatus(ctx context.Context, status *gtsmodel.Status return nil } -func (d *Dereferencer) fetchStatusTags(ctx context.Context, status *gtsmodel.Status) error { +func (d *Dereferencer) fetchStatusTags(ctx context.Context, existing, status *gtsmodel.Status) error { // Allocate new slice to take the yet-to-be determined tag IDs. status.TagIDs = make([]string, len(status.Tags)) for i := range status.Tags { - placeholder := status.Tags[i] + tag := status.Tags[i] + + // Look for tag in existing status with name. + existing, ok := existing.GetTagByName(tag.Name) + if ok && existing.ID != "" { + status.Tags[i] = existing + status.TagIDs[i] = existing.ID + continue + } - // Look for existing tag with this name first. 
- tag, err := d.state.DB.GetTagByName(ctx, placeholder.Name) + // Look for existing tag with name in the database. + existing, err := d.state.DB.GetTagByName(ctx, tag.Name) if err != nil && !errors.Is(err, db.ErrNoEntries) { - log.Errorf(ctx, "db error getting tag %s: %v", tag.Name, err) + return gtserror.Newf("db error getting tag %s: %w", tag.Name, err) + } else if existing != nil { + status.Tags[i] = existing + status.TagIDs[i] = existing.ID continue } - if tag == nil { - // Create new ID for tag name. - tag = &gtsmodel.Tag{ - ID: id.NewULID(), - Name: placeholder.Name, - } + // Create new ID for tag. + tag.ID = id.NewULID() - // Insert this tag with new name into the database. - if err := d.state.DB.PutTag(ctx, tag); err != nil { - log.Errorf(ctx, "db error putting tag %s: %v", tag.Name, err) - continue - } + // Insert this tag with new name into the database. + if err := d.state.DB.PutTag(ctx, tag); err != nil { + log.Errorf(ctx, "db error putting tag %s: %v", tag.Name, err) + continue } - // Set the *new* tag and ID. - status.Tags[i] = tag + // Set new tag ID in slice. status.TagIDs[i] = tag.ID } @@ -600,10 +653,10 @@ func (d *Dereferencer) fetchStatusAttachments(ctx context.Context, tsport transp status.AttachmentIDs = make([]string, len(status.Attachments)) for i := range status.Attachments { - placeholder := status.Attachments[i] + attachment := status.Attachments[i] // Look for existing media attachment with remoet URL first. - existing, ok := existing.GetAttachmentByRemoteURL(placeholder.RemoteURL) + existing, ok := existing.GetAttachmentByRemoteURL(attachment.RemoteURL) if ok && existing.ID != "" && *existing.Cached { status.Attachments[i] = existing status.AttachmentIDs[i] = existing.ID @@ -611,9 +664,9 @@ func (d *Dereferencer) fetchStatusAttachments(ctx context.Context, tsport transp } // Ensure a valid media attachment remote URL. 
- remoteURL, err := url.Parse(placeholder.RemoteURL) + remoteURL, err := url.Parse(attachment.RemoteURL) if err != nil { - log.Errorf(ctx, "invalid remote media url %q: %v", placeholder.RemoteURL, err) + log.Errorf(ctx, "invalid remote media url %q: %v", attachment.RemoteURL, err) continue } @@ -622,9 +675,9 @@ func (d *Dereferencer) fetchStatusAttachments(ctx context.Context, tsport transp return tsport.DereferenceMedia(ctx, remoteURL) }, status.AccountID, &media.AdditionalMediaInfo{ StatusID: &status.ID, - RemoteURL: &placeholder.RemoteURL, - Description: &placeholder.Description, - Blurhash: &placeholder.Blurhash, + RemoteURL: &attachment.RemoteURL, + Description: &attachment.Description, + Blurhash: &attachment.Blurhash, }) if err != nil { log.Errorf(ctx, "error processing attachment: %v", err) @@ -632,15 +685,15 @@ func (d *Dereferencer) fetchStatusAttachments(ctx context.Context, tsport transp } // Force attachment loading *right now*. - media, err := processing.LoadAttachment(ctx) + attachment, err = processing.LoadAttachment(ctx) if err != nil { log.Errorf(ctx, "error loading attachment: %v", err) continue } // Set the *new* attachment and ID. - status.Attachments[i] = media - status.AttachmentIDs[i] = media.ID + status.Attachments[i] = attachment + status.AttachmentIDs[i] = attachment.ID } for i := 0; i < len(status.AttachmentIDs); { |