summaryrefslogtreecommitdiff
path: root/internal/federation/dereferencing/account.go
diff options
context:
space:
mode:
Diffstat (limited to 'internal/federation/dereferencing/account.go')
-rw-r--r--internal/federation/dereferencing/account.go139
1 files changed, 100 insertions, 39 deletions
diff --git a/internal/federation/dereferencing/account.go b/internal/federation/dereferencing/account.go
index 670c8e2c8..58f07f9cd 100644
--- a/internal/federation/dereferencing/account.go
+++ b/internal/federation/dereferencing/account.go
@@ -122,7 +122,7 @@ func (d *Dereferencer) getAccountByURI(ctx context.Context, requestUser string,
}
// Create and pass-through a new bare-bones model for dereferencing.
- return d.enrichAccount(ctx, requestUser, uri, &gtsmodel.Account{
+ return d.enrichAccountSafely(ctx, requestUser, uri, &gtsmodel.Account{
ID: id.NewULID(),
Domain: uri.Host,
URI: uriStr,
@@ -139,7 +139,7 @@ func (d *Dereferencer) getAccountByURI(ctx context.Context, requestUser string,
}
// Try to update existing account model.
- latest, apubAcc, err := d.enrichAccount(ctx,
+ latest, apubAcc, err := d.enrichAccountSafely(ctx,
requestUser,
uri,
account,
@@ -148,10 +148,6 @@ func (d *Dereferencer) getAccountByURI(ctx context.Context, requestUser string,
if err != nil {
log.Errorf(ctx, "error enriching remote account: %v", err)
- // Update fetch-at to slow re-attempts.
- account.FetchedAt = time.Now()
- _ = d.state.DB.UpdateAccount(ctx, account, "fetched_at")
-
// Fallback to existing.
return account, nil, nil
}
@@ -218,7 +214,7 @@ func (d *Dereferencer) getAccountByUsernameDomain(
}
// Create and pass-through a new bare-bones model for dereferencing.
- account, apubAcc, err := d.enrichAccount(ctx, requestUser, nil, &gtsmodel.Account{
+ account, apubAcc, err := d.enrichAccountSafely(ctx, requestUser, nil, &gtsmodel.Account{
ID: id.NewULID(),
Username: username,
Domain: domain,
@@ -244,7 +240,7 @@ func (d *Dereferencer) getAccountByUsernameDomain(
if apubAcc == nil {
// This is existing up-to-date account, ensure it is populated.
- if err := d.state.DB.PopulateAccount(ctx, account); err != nil {
+ if err := d.state.DB.PopulateAccount(ctx, latest); err != nil {
log.Errorf(ctx, "error populating existing account: %v", err)
}
}
@@ -267,8 +263,8 @@ func (d *Dereferencer) RefreshAccount(ctx context.Context, requestUser string, a
return nil, nil, gtserror.Newf("invalid account uri %q: %w", account.URI, err)
}
- // Try to update + deref existing account model.
- latest, apubAcc, err := d.enrichAccount(ctx,
+ // Try to update + deref passed account model.
+ latest, apubAcc, err := d.enrichAccountSafely(ctx,
requestUser,
uri,
account,
@@ -276,20 +272,17 @@ func (d *Dereferencer) RefreshAccount(ctx context.Context, requestUser string, a
)
if err != nil {
log.Errorf(ctx, "error enriching remote account: %v", err)
-
- // Update fetch-at to slow re-attempts.
- account.FetchedAt = time.Now()
- _ = d.state.DB.UpdateAccount(ctx, account, "fetched_at")
-
- return nil, nil, err
+ return nil, nil, gtserror.Newf("error enriching remote account: %w", err)
}
- // This account was updated, enqueue re-dereference featured posts.
- d.state.Workers.Federator.MustEnqueueCtx(ctx, func(ctx context.Context) {
- if err := d.dereferenceAccountFeatured(ctx, requestUser, account); err != nil {
- log.Errorf(ctx, "error fetching account featured collection: %v", err)
- }
- })
+ if apubAcc != nil {
+ // This account was updated, enqueue re-dereference featured posts.
+ d.state.Workers.Federator.MustEnqueueCtx(ctx, func(ctx context.Context) {
+ if err := d.dereferenceAccountFeatured(ctx, requestUser, latest); err != nil {
+ log.Errorf(ctx, "error fetching account featured collection: %v", err)
+ }
+ })
+ }
return latest, apubAcc, nil
}
@@ -311,21 +304,94 @@ func (d *Dereferencer) RefreshAccountAsync(ctx context.Context, requestUser stri
// Enqueue a worker function to enrich this account async.
d.state.Workers.Federator.MustEnqueueCtx(ctx, func(ctx context.Context) {
- latest, _, err := d.enrichAccount(ctx, requestUser, uri, account, apubAcc)
+ latest, apubAcc, err := d.enrichAccountSafely(ctx, requestUser, uri, account, apubAcc)
if err != nil {
log.Errorf(ctx, "error enriching remote account: %v", err)
return
}
- // This account was updated, re-dereference account featured posts.
- if err := d.dereferenceAccountFeatured(ctx, requestUser, latest); err != nil {
- log.Errorf(ctx, "error fetching account featured collection: %v", err)
+ if apubAcc != nil {
+ // This account was updated, enqueue re-dereference featured posts.
+ d.state.Workers.Federator.MustEnqueueCtx(ctx, func(ctx context.Context) {
+ if err := d.dereferenceAccountFeatured(ctx, requestUser, latest); err != nil {
+ log.Errorf(ctx, "error fetching account featured collection: %v", err)
+ }
+ })
}
})
}
-// enrichAccount will enrich the given account, whether a new barebones model, or existing model from the database. It handles necessary dereferencing, webfingering etc.
-func (d *Dereferencer) enrichAccount(ctx context.Context, requestUser string, uri *url.URL, account *gtsmodel.Account, apubAcc ap.Accountable) (*gtsmodel.Account, ap.Accountable, error) {
+// enrichAccountSafely wraps enrichAccount() to perform
+// it within the State{}.FedLocks mutexmap, which protects
+// dereferencing actions with per-URI mutex locks.
+func (d *Dereferencer) enrichAccountSafely(
+ ctx context.Context,
+ requestUser string,
+ uri *url.URL,
+ account *gtsmodel.Account,
+ apubAcc ap.Accountable,
+) (*gtsmodel.Account, ap.Accountable, error) {
+ // By default use account.URI
+ // as the per-URI deref lock.
+ uriStr := account.URI
+
+ if uriStr == "" {
+ // No URI is set yet, instead generate a faux-one from user+domain.
+ uriStr = "https://" + account.Domain + "/user/" + account.Username
+ }
+
+ // Acquire per-URI deref lock, wraping unlock
+ // to safely defer in case of panic, while still
+ // performing more granular unlocks when needed.
+ unlock := d.state.FedLocks.Lock(uriStr)
+ unlock = doOnce(unlock)
+ defer unlock()
+
+ // Perform status enrichment with passed vars.
+ latest, apubAcc, err := d.enrichAccount(ctx,
+ requestUser,
+ uri,
+ account,
+ apubAcc,
+ )
+
+ if gtserror.StatusCode(err) >= 400 {
+ // Update fetch-at to slow re-attempts.
+ account.FetchedAt = time.Now()
+ _ = d.state.DB.UpdateAccount(ctx, account, "fetched_at")
+ }
+
+ // Unlock now
+ // we're done.
+ unlock()
+
+ if errors.Is(err, db.ErrAlreadyExists) {
+ // Ensure AP model isn't set,
+ // otherwise this indicates WE
+ // enriched the account.
+ apubAcc = nil
+
+ // DATA RACE! We likely lost out to another goroutine
+ // in a call to db.Put(Account). Look again in DB by URI.
+ latest, err = d.state.DB.GetAccountByURI(ctx, account.URI)
+ if err != nil {
+ err = gtserror.Newf("error getting account %s from database after race: %w", uriStr, err)
+ }
+ }
+
+ return latest, apubAcc, err
+}
+
+// enrichAccount will enrich the given account, whether a
+// new barebones model, or existing model from the database.
+// It handles necessary dereferencing, webfingering etc.
+func (d *Dereferencer) enrichAccount(
+ ctx context.Context,
+ requestUser string,
+ uri *url.URL,
+ account *gtsmodel.Account,
+ apubAcc ap.Accountable,
+) (*gtsmodel.Account, ap.Accountable, error) {
// Pre-fetch a transport for requesting username, used by later deref procedures.
tsport, err := d.transportController.NewTransportForUsername(ctx, requestUser)
if err != nil {
@@ -476,13 +542,6 @@ func (d *Dereferencer) enrichAccount(ctx context.Context, requestUser string, ur
// This is new, put it in the database.
err := d.state.DB.PutAccount(ctx, latestAcc)
-
- if errors.Is(err, db.ErrAlreadyExists) {
- // TODO: replace this quick fix with per-URI deref locks.
- latestAcc, err = d.state.DB.GetAccountByURI(ctx, latestAcc.URI)
- return latestAcc, nil, err
- }
-
if err != nil {
return nil, nil, gtserror.Newf("error putting in database: %w", err)
}
@@ -545,7 +604,8 @@ func (d *Dereferencer) fetchRemoteAccountAvatar(ctx context.Context, tsport tran
}
// Acquire lock for derefs map.
- unlock := d.derefAvatarsMu.Lock()
+ unlock := d.state.FedLocks.Lock(latestAcc.AvatarRemoteURL)
+ unlock = doOnce(unlock)
defer unlock()
// Look for an existing dereference in progress.
@@ -573,7 +633,7 @@ func (d *Dereferencer) fetchRemoteAccountAvatar(ctx context.Context, tsport tran
defer func() {
// On exit safely remove media from map.
- unlock := d.derefAvatarsMu.Lock()
+ unlock := d.state.FedLocks.Lock(latestAcc.AvatarRemoteURL)
delete(d.derefAvatars, latestAcc.AvatarRemoteURL)
unlock()
}()
@@ -635,7 +695,8 @@ func (d *Dereferencer) fetchRemoteAccountHeader(ctx context.Context, tsport tran
}
// Acquire lock for derefs map.
- unlock := d.derefHeadersMu.Lock()
+ unlock := d.state.FedLocks.Lock(latestAcc.HeaderRemoteURL)
+ unlock = doOnce(unlock)
defer unlock()
// Look for an existing dereference in progress.
@@ -663,7 +724,7 @@ func (d *Dereferencer) fetchRemoteAccountHeader(ctx context.Context, tsport tran
defer func() {
// On exit safely remove media from map.
- unlock := d.derefHeadersMu.Lock()
+ unlock := d.state.FedLocks.Lock(latestAcc.HeaderRemoteURL)
delete(d.derefHeaders, latestAcc.HeaderRemoteURL)
unlock()
}()