diff options
Diffstat (limited to 'internal/federation/dereferencing/account.go')
-rw-r--r-- | internal/federation/dereferencing/account.go | 139 |
1 files changed, 100 insertions, 39 deletions
diff --git a/internal/federation/dereferencing/account.go b/internal/federation/dereferencing/account.go index 670c8e2c8..58f07f9cd 100644 --- a/internal/federation/dereferencing/account.go +++ b/internal/federation/dereferencing/account.go @@ -122,7 +122,7 @@ func (d *Dereferencer) getAccountByURI(ctx context.Context, requestUser string, } // Create and pass-through a new bare-bones model for dereferencing. - return d.enrichAccount(ctx, requestUser, uri, >smodel.Account{ + return d.enrichAccountSafely(ctx, requestUser, uri, >smodel.Account{ ID: id.NewULID(), Domain: uri.Host, URI: uriStr, @@ -139,7 +139,7 @@ func (d *Dereferencer) getAccountByURI(ctx context.Context, requestUser string, } // Try to update existing account model. - latest, apubAcc, err := d.enrichAccount(ctx, + latest, apubAcc, err := d.enrichAccountSafely(ctx, requestUser, uri, account, @@ -148,10 +148,6 @@ func (d *Dereferencer) getAccountByURI(ctx context.Context, requestUser string, if err != nil { log.Errorf(ctx, "error enriching remote account: %v", err) - // Update fetch-at to slow re-attempts. - account.FetchedAt = time.Now() - _ = d.state.DB.UpdateAccount(ctx, account, "fetched_at") - // Fallback to existing. return account, nil, nil } @@ -218,7 +214,7 @@ func (d *Dereferencer) getAccountByUsernameDomain( } // Create and pass-through a new bare-bones model for dereferencing. - account, apubAcc, err := d.enrichAccount(ctx, requestUser, nil, >smodel.Account{ + account, apubAcc, err := d.enrichAccountSafely(ctx, requestUser, nil, >smodel.Account{ ID: id.NewULID(), Username: username, Domain: domain, @@ -244,7 +240,7 @@ func (d *Dereferencer) getAccountByUsernameDomain( if apubAcc == nil { // This is existing up-to-date account, ensure it is populated. - if err := d.state.DB.PopulateAccount(ctx, account); err != nil { + if err := d.state.DB.PopulateAccount(ctx, latest); err != nil { log.Errorf(ctx, "error populating existing account: %v", err) } } @@ -267,8 +263,8 @@ func (d *Dereferencer) RefreshAccount(ctx context.Context, requestUser string, a return nil, nil, gtserror.Newf("invalid account uri %q: %w", account.URI, err) } - // Try to update + deref existing account model. - latest, apubAcc, err := d.enrichAccount(ctx, + // Try to update + deref passed account model. + latest, apubAcc, err := d.enrichAccountSafely(ctx, requestUser, uri, account, @@ -276,20 +272,17 @@ func (d *Dereferencer) RefreshAccount(ctx context.Context, requestUser string, a ) if err != nil { log.Errorf(ctx, "error enriching remote account: %v", err) - - // Update fetch-at to slow re-attempts. - account.FetchedAt = time.Now() - _ = d.state.DB.UpdateAccount(ctx, account, "fetched_at") - - return nil, nil, err + return nil, nil, gtserror.Newf("error enriching remote account: %w", err) } - // This account was updated, enqueue re-dereference featured posts. - d.state.Workers.Federator.MustEnqueueCtx(ctx, func(ctx context.Context) { - if err := d.dereferenceAccountFeatured(ctx, requestUser, account); err != nil { - log.Errorf(ctx, "error fetching account featured collection: %v", err) - } - }) + if apubAcc != nil { + // This account was updated, enqueue re-dereference featured posts. + d.state.Workers.Federator.MustEnqueueCtx(ctx, func(ctx context.Context) { + if err := d.dereferenceAccountFeatured(ctx, requestUser, latest); err != nil { + log.Errorf(ctx, "error fetching account featured collection: %v", err) + } + }) + } return latest, apubAcc, nil } @@ -311,21 +304,94 @@ func (d *Dereferencer) RefreshAccountAsync(ctx context.Context, requestUser stri // Enqueue a worker function to enrich this account async. d.state.Workers.Federator.MustEnqueueCtx(ctx, func(ctx context.Context) { - latest, _, err := d.enrichAccount(ctx, requestUser, uri, account, apubAcc) + latest, apubAcc, err := d.enrichAccountSafely(ctx, requestUser, uri, account, apubAcc) if err != nil { log.Errorf(ctx, "error enriching remote account: %v", err) return } - // This account was updated, re-dereference account featured posts. - if err := d.dereferenceAccountFeatured(ctx, requestUser, latest); err != nil { - log.Errorf(ctx, "error fetching account featured collection: %v", err) + if apubAcc != nil { + // This account was updated, enqueue re-dereference featured posts. + d.state.Workers.Federator.MustEnqueueCtx(ctx, func(ctx context.Context) { + if err := d.dereferenceAccountFeatured(ctx, requestUser, latest); err != nil { + log.Errorf(ctx, "error fetching account featured collection: %v", err) + } + }) } }) } -// enrichAccount will enrich the given account, whether a new barebones model, or existing model from the database. It handles necessary dereferencing, webfingering etc. -func (d *Dereferencer) enrichAccount(ctx context.Context, requestUser string, uri *url.URL, account *gtsmodel.Account, apubAcc ap.Accountable) (*gtsmodel.Account, ap.Accountable, error) { +// enrichAccountSafely wraps enrichAccount() to perform +// it within the State{}.FedLocks mutexmap, which protects +// dereferencing actions with per-URI mutex locks. +func (d *Dereferencer) enrichAccountSafely( + ctx context.Context, + requestUser string, + uri *url.URL, + account *gtsmodel.Account, + apubAcc ap.Accountable, +) (*gtsmodel.Account, ap.Accountable, error) { + // By default use account.URI + // as the per-URI deref lock. + uriStr := account.URI + + if uriStr == "" { + // No URI is set yet, instead generate a faux-one from user+domain. + uriStr = "https://" + account.Domain + "/user/" + account.Username + } + + // Acquire per-URI deref lock, wraping unlock + // to safely defer in case of panic, while still + // performing more granular unlocks when needed. + unlock := d.state.FedLocks.Lock(uriStr) + unlock = doOnce(unlock) + defer unlock() + + // Perform status enrichment with passed vars. + latest, apubAcc, err := d.enrichAccount(ctx, + requestUser, + uri, + account, + apubAcc, + ) + + if gtserror.StatusCode(err) >= 400 { + // Update fetch-at to slow re-attempts. + account.FetchedAt = time.Now() + _ = d.state.DB.UpdateAccount(ctx, account, "fetched_at") + } + + // Unlock now + // we're done. + unlock() + + if errors.Is(err, db.ErrAlreadyExists) { + // Ensure AP model isn't set, + // otherwise this indicates WE + // enriched the account. + apubAcc = nil + + // DATA RACE! We likely lost out to another goroutine + // in a call to db.Put(Account). Look again in DB by URI. + latest, err = d.state.DB.GetAccountByURI(ctx, account.URI) + if err != nil { + err = gtserror.Newf("error getting account %s from database after race: %w", uriStr, err) + } + } + + return latest, apubAcc, err +} + +// enrichAccount will enrich the given account, whether a +// new barebones model, or existing model from the database. +// It handles necessary dereferencing, webfingering etc. +func (d *Dereferencer) enrichAccount( + ctx context.Context, + requestUser string, + uri *url.URL, + account *gtsmodel.Account, + apubAcc ap.Accountable, +) (*gtsmodel.Account, ap.Accountable, error) { // Pre-fetch a transport for requesting username, used by later deref procedures. tsport, err := d.transportController.NewTransportForUsername(ctx, requestUser) if err != nil { @@ -476,13 +542,6 @@ func (d *Dereferencer) enrichAccount(ctx context.Context, requestUser string, ur // This is new, put it in the database. err := d.state.DB.PutAccount(ctx, latestAcc) - - if errors.Is(err, db.ErrAlreadyExists) { - // TODO: replace this quick fix with per-URI deref locks. - latestAcc, err = d.state.DB.GetAccountByURI(ctx, latestAcc.URI) - return latestAcc, nil, err - } - if err != nil { return nil, nil, gtserror.Newf("error putting in database: %w", err) } @@ -545,7 +604,8 @@ func (d *Dereferencer) fetchRemoteAccountAvatar(ctx context.Context, tsport tran } // Acquire lock for derefs map. - unlock := d.derefAvatarsMu.Lock() + unlock := d.state.FedLocks.Lock(latestAcc.AvatarRemoteURL) + unlock = doOnce(unlock) defer unlock() // Look for an existing dereference in progress. @@ -573,7 +633,7 @@ func (d *Dereferencer) fetchRemoteAccountAvatar(ctx context.Context, tsport tran defer func() { // On exit safely remove media from map. - unlock := d.derefAvatarsMu.Lock() + unlock := d.state.FedLocks.Lock(latestAcc.AvatarRemoteURL) delete(d.derefAvatars, latestAcc.AvatarRemoteURL) unlock() }() @@ -635,7 +695,8 @@ func (d *Dereferencer) fetchRemoteAccountHeader(ctx context.Context, tsport tran } // Acquire lock for derefs map. - unlock := d.derefHeadersMu.Lock() + unlock := d.state.FedLocks.Lock(latestAcc.HeaderRemoteURL) + unlock = doOnce(unlock) defer unlock() // Look for an existing dereference in progress. @@ -663,7 +724,7 @@ func (d *Dereferencer) fetchRemoteAccountHeader(ctx context.Context, tsport tran defer func() { // On exit safely remove media from map. - unlock := d.derefHeadersMu.Lock() + unlock := d.state.FedLocks.Lock(latestAcc.HeaderRemoteURL) delete(d.derefHeaders, latestAcc.HeaderRemoteURL) unlock() }() |