From 3b7faac604000297b74baf8f922c79c6b387217d Mon Sep 17 00:00:00 2001 From: kim <89579420+NyaaaWhatsUpDoc@users.noreply.github.com> Date: Thu, 6 Jun 2024 08:50:14 +0000 Subject: [bugfix] concurrent map writes in dereferencer media processing maps (#2964) * removes the avatar / header deref maps as we now have per-uri status / account locks, adds retries on data-races, adds separate emoji map mutex * work with a copy of account / status for each retry loop * revert to old data race behaviour, it gets too complicated otherwise --------- Co-authored-by: tobi --- internal/federation/dereferencing/account.go | 97 +++++++--------------------- 1 file changed, 23 insertions(+), 74 deletions(-) (limited to 'internal/federation/dereferencing/account.go') diff --git a/internal/federation/dereferencing/account.go b/internal/federation/dereferencing/account.go index 94df9538a..bd97b91ed 100644 --- a/internal/federation/dereferencing/account.go +++ b/internal/federation/dereferencing/account.go @@ -496,7 +496,7 @@ func (d *Dereferencer) enrichAccount( account.Username, account.Domain, err, ) - case err == nil && account.Domain != accDomain: + case account.Domain != accDomain: // After webfinger, we now have correct account domain from which we can do a final DB check. alreadyAcc, err := d.state.DB.GetAccountByUsernameDomain(ctx, account.Username, accDomain) if err != nil && !errors.Is(err, db.ErrNoEntries) { @@ -518,7 +518,7 @@ func (d *Dereferencer) enrichAccount( // or the stub account we were passed. fallthrough - case err == nil: + default: // Update account with latest info. account.URI = accURI.String() account.Domain = accDomain @@ -531,19 +531,14 @@ func (d *Dereferencer) enrichAccount( // must parse from account. uri, err = url.Parse(account.URI) if err != nil { - return nil, nil, gtserror.Newf( - "invalid uri %q: %w", - account.URI, gtserror.SetUnretrievable(err), - ) + err := gtserror.Newf("invalid uri %q: %w", account.URI, err) + return nil, nil, gtserror.SetUnretrievable(err) } // Check URI scheme ahead of time for more useful errs. if uri.Scheme != "http" && uri.Scheme != "https" { - err := errors.New("account URI scheme must be http or https") - return nil, nil, gtserror.Newf( - "invalid uri %q: %w", - account.URI, gtserror.SetUnretrievable(err), - ) + err := gtserror.Newf("invalid uri %q: scheme must be http(s)", account.URI) + return nil, nil, gtserror.SetUnretrievable(err) } } @@ -634,7 +629,7 @@ func (d *Dereferencer) enrichAccount( if err != nil { // ASRepresentationToAccount will set Malformed on the // returned error, so we don't need to do it here. - err = gtserror.Newf("error converting accountable to gts model for account %s: %w", uri, err) + err = gtserror.Newf("error converting %s to gts model: %w", uri, err) return nil, nil, err } @@ -798,39 +793,16 @@ func (d *Dereferencer) fetchRemoteAccountAvatar(ctx context.Context, tsport tran return gtserror.Newf("error parsing url %s: %w", latestAcc.AvatarRemoteURL, err) } - // Acquire lock for derefs map. - unlock := d.state.FedLocks.Lock(latestAcc.AvatarRemoteURL) - unlock = util.DoOnce(unlock) - defer unlock() - - // Look for an existing dereference in progress. - processing, ok := d.derefAvatars[latestAcc.AvatarRemoteURL] - - if !ok { - // Set the media data function to dereference avatar from URI. - data := func(ctx context.Context) (io.ReadCloser, int64, error) { - return tsport.DereferenceMedia(ctx, avatarURI) - } - - // Create new media processing request from the media manager instance. - processing = d.mediaManager.PreProcessMedia(data, latestAcc.ID, &media.AdditionalMediaInfo{ - Avatar: func() *bool { v := true; return &v }(), - RemoteURL: &latestAcc.AvatarRemoteURL, - }) - - // Store media in map to mark as processing. - d.derefAvatars[latestAcc.AvatarRemoteURL] = processing - - defer func() { - // On exit safely remove media from map. - unlock := d.state.FedLocks.Lock(latestAcc.AvatarRemoteURL) - delete(d.derefAvatars, latestAcc.AvatarRemoteURL) - unlock() - }() + // Set the media data function to dereference avatar from URI. + data := func(ctx context.Context) (io.ReadCloser, int64, error) { + return tsport.DereferenceMedia(ctx, avatarURI) } - // Unlock map. - unlock() + // Create new media processing request from the media manager instance. + processing := d.mediaManager.PreProcessMedia(data, latestAcc.ID, &media.AdditionalMediaInfo{ + Avatar: func() *bool { v := true; return &v }(), + RemoteURL: &latestAcc.AvatarRemoteURL, + }) // Start media attachment loading (blocking call). if _, err := processing.LoadAttachment(ctx); err != nil { @@ -884,39 +856,16 @@ func (d *Dereferencer) fetchRemoteAccountHeader(ctx context.Context, tsport tran return gtserror.Newf("error parsing url %s: %w", latestAcc.HeaderRemoteURL, err) } - // Acquire lock for derefs map. - unlock := d.state.FedLocks.Lock(latestAcc.HeaderRemoteURL) - unlock = util.DoOnce(unlock) - defer unlock() - - // Look for an existing dereference in progress. - processing, ok := d.derefHeaders[latestAcc.HeaderRemoteURL] - - if !ok { - // Set the media data function to dereference avatar from URI. - data := func(ctx context.Context) (io.ReadCloser, int64, error) { - return tsport.DereferenceMedia(ctx, headerURI) - } - - // Create new media processing request from the media manager instance. - processing = d.mediaManager.PreProcessMedia(data, latestAcc.ID, &media.AdditionalMediaInfo{ - Header: func() *bool { v := true; return &v }(), - RemoteURL: &latestAcc.HeaderRemoteURL, - }) - - // Store media in map to mark as processing. - d.derefHeaders[latestAcc.HeaderRemoteURL] = processing - - defer func() { - // On exit safely remove media from map. - unlock := d.state.FedLocks.Lock(latestAcc.HeaderRemoteURL) - delete(d.derefHeaders, latestAcc.HeaderRemoteURL) - unlock() - }() + // Set the media data function to dereference avatar from URI. + data := func(ctx context.Context) (io.ReadCloser, int64, error) { + return tsport.DereferenceMedia(ctx, headerURI) } - // Unlock map. - unlock() + // Create new media processing request from the media manager instance. + processing := d.mediaManager.PreProcessMedia(data, latestAcc.ID, &media.AdditionalMediaInfo{ + Header: func() *bool { v := true; return &v }(), + RemoteURL: &latestAcc.HeaderRemoteURL, + }) // Start media attachment loading (blocking call). if _, err := processing.LoadAttachment(ctx); err != nil { -- cgit v1.2.3