diff options
| author | 2024-06-06 08:50:14 +0000 | |
|---|---|---|
| committer | 2024-06-06 10:50:14 +0200 | |
| commit | 3b7faac604000297b74baf8f922c79c6b387217d (patch) | |
| tree | 05b1d32eeaa902571a9ec1fb0d643a08b9be2a86 /internal/federation | |
| parent | [chore] Fiddle with CI tests; use wasmsqlite3 for CI tests (#2966) (diff) | |
| download | gotosocial-3b7faac604000297b74baf8f922c79c6b387217d.tar.xz | |
[bugfix] concurrent map writes in dereferencer media processing maps (#2964)
* removes the avatar / header deref maps as we now have per-URI status / account locks, adds retries on data races, and adds a separate mutex for the emoji map
* work with a copy of the account / status on each iteration of the retry loop
* revert to the old data race behaviour, as it gets too complicated otherwise
---------
Co-authored-by: tobi <tobi.smethurst@protonmail.com>
Diffstat (limited to 'internal/federation')
| -rw-r--r-- | internal/federation/dereferencing/account.go | 97 | ||||
| -rw-r--r-- | internal/federation/dereferencing/dereferencer.go | 21 | ||||
| -rw-r--r-- | internal/federation/dereferencing/emoji.go | 69 | ||||
| -rw-r--r-- | internal/federation/dereferencing/status.go | 4 | 
4 files changed, 78 insertions, 113 deletions
| diff --git a/internal/federation/dereferencing/account.go b/internal/federation/dereferencing/account.go index 94df9538a..bd97b91ed 100644 --- a/internal/federation/dereferencing/account.go +++ b/internal/federation/dereferencing/account.go @@ -496,7 +496,7 @@ func (d *Dereferencer) enrichAccount(  				account.Username, account.Domain, err,  			) -		case err == nil && account.Domain != accDomain: +		case account.Domain != accDomain:  			// After webfinger, we now have correct account domain from which we can do a final DB check.  			alreadyAcc, err := d.state.DB.GetAccountByUsernameDomain(ctx, account.Username, accDomain)  			if err != nil && !errors.Is(err, db.ErrNoEntries) { @@ -518,7 +518,7 @@ func (d *Dereferencer) enrichAccount(  			// or the stub account we were passed.  			fallthrough -		case err == nil: +		default:  			// Update account with latest info.  			account.URI = accURI.String()  			account.Domain = accDomain @@ -531,19 +531,14 @@ func (d *Dereferencer) enrichAccount(  		// must parse from account.  		uri, err = url.Parse(account.URI)  		if err != nil { -			return nil, nil, gtserror.Newf( -				"invalid uri %q: %w", -				account.URI, gtserror.SetUnretrievable(err), -			) +			err := gtserror.Newf("invalid uri %q: %w", account.URI, err) +			return nil, nil, gtserror.SetUnretrievable(err)  		}  		// Check URI scheme ahead of time for more useful errs.  		if uri.Scheme != "http" && uri.Scheme != "https" { -			err := errors.New("account URI scheme must be http or https") -			return nil, nil, gtserror.Newf( -				"invalid uri %q: %w", -				account.URI, gtserror.SetUnretrievable(err), -			) +			err := gtserror.Newf("invalid uri %q: scheme must be http(s)", account.URI) +			return nil, nil, gtserror.SetUnretrievable(err)  		}  	} @@ -634,7 +629,7 @@ func (d *Dereferencer) enrichAccount(  	if err != nil {  		// ASRepresentationToAccount will set Malformed on the  		// returned error, so we don't need to do it here. 
-		err = gtserror.Newf("error converting accountable to gts model for account %s: %w", uri, err) +		err = gtserror.Newf("error converting %s to gts model: %w", uri, err)  		return nil, nil, err  	} @@ -798,39 +793,16 @@ func (d *Dereferencer) fetchRemoteAccountAvatar(ctx context.Context, tsport tran  		return gtserror.Newf("error parsing url %s: %w", latestAcc.AvatarRemoteURL, err)  	} -	// Acquire lock for derefs map. -	unlock := d.state.FedLocks.Lock(latestAcc.AvatarRemoteURL) -	unlock = util.DoOnce(unlock) -	defer unlock() - -	// Look for an existing dereference in progress. -	processing, ok := d.derefAvatars[latestAcc.AvatarRemoteURL] - -	if !ok { -		// Set the media data function to dereference avatar from URI. -		data := func(ctx context.Context) (io.ReadCloser, int64, error) { -			return tsport.DereferenceMedia(ctx, avatarURI) -		} - -		// Create new media processing request from the media manager instance. -		processing = d.mediaManager.PreProcessMedia(data, latestAcc.ID, &media.AdditionalMediaInfo{ -			Avatar:    func() *bool { v := true; return &v }(), -			RemoteURL: &latestAcc.AvatarRemoteURL, -		}) - -		// Store media in map to mark as processing. -		d.derefAvatars[latestAcc.AvatarRemoteURL] = processing - -		defer func() { -			// On exit safely remove media from map. -			unlock := d.state.FedLocks.Lock(latestAcc.AvatarRemoteURL) -			delete(d.derefAvatars, latestAcc.AvatarRemoteURL) -			unlock() -		}() +	// Set the media data function to dereference avatar from URI. +	data := func(ctx context.Context) (io.ReadCloser, int64, error) { +		return tsport.DereferenceMedia(ctx, avatarURI)  	} -	// Unlock map. -	unlock() +	// Create new media processing request from the media manager instance. +	processing := d.mediaManager.PreProcessMedia(data, latestAcc.ID, &media.AdditionalMediaInfo{ +		Avatar:    func() *bool { v := true; return &v }(), +		RemoteURL: &latestAcc.AvatarRemoteURL, +	})  	// Start media attachment loading (blocking call).  	
if _, err := processing.LoadAttachment(ctx); err != nil { @@ -884,39 +856,16 @@ func (d *Dereferencer) fetchRemoteAccountHeader(ctx context.Context, tsport tran  		return gtserror.Newf("error parsing url %s: %w", latestAcc.HeaderRemoteURL, err)  	} -	// Acquire lock for derefs map. -	unlock := d.state.FedLocks.Lock(latestAcc.HeaderRemoteURL) -	unlock = util.DoOnce(unlock) -	defer unlock() - -	// Look for an existing dereference in progress. -	processing, ok := d.derefHeaders[latestAcc.HeaderRemoteURL] - -	if !ok { -		// Set the media data function to dereference avatar from URI. -		data := func(ctx context.Context) (io.ReadCloser, int64, error) { -			return tsport.DereferenceMedia(ctx, headerURI) -		} - -		// Create new media processing request from the media manager instance. -		processing = d.mediaManager.PreProcessMedia(data, latestAcc.ID, &media.AdditionalMediaInfo{ -			Header:    func() *bool { v := true; return &v }(), -			RemoteURL: &latestAcc.HeaderRemoteURL, -		}) - -		// Store media in map to mark as processing. -		d.derefHeaders[latestAcc.HeaderRemoteURL] = processing - -		defer func() { -			// On exit safely remove media from map. -			unlock := d.state.FedLocks.Lock(latestAcc.HeaderRemoteURL) -			delete(d.derefHeaders, latestAcc.HeaderRemoteURL) -			unlock() -		}() +	// Set the media data function to dereference avatar from URI. +	data := func(ctx context.Context) (io.ReadCloser, int64, error) { +		return tsport.DereferenceMedia(ctx, headerURI)  	} -	// Unlock map. -	unlock() +	// Create new media processing request from the media manager instance. +	processing := d.mediaManager.PreProcessMedia(data, latestAcc.ID, &media.AdditionalMediaInfo{ +		Header:    func() *bool { v := true; return &v }(), +		RemoteURL: &latestAcc.HeaderRemoteURL, +	})  	// Start media attachment loading (blocking call).  	
if _, err := processing.LoadAttachment(ctx); err != nil { diff --git a/internal/federation/dereferencing/dereferencer.go b/internal/federation/dereferencing/dereferencer.go index 3fa199345..f7f4d975e 100644 --- a/internal/federation/dereferencing/dereferencer.go +++ b/internal/federation/dereferencing/dereferencer.go @@ -85,11 +85,22 @@ type Dereferencer struct {  	mediaManager        *media.Manager  	visibility          *visibility.Filter -	// all protected by State{}.FedLocks. -	derefAvatars map[string]*media.ProcessingMedia -	derefHeaders map[string]*media.ProcessingMedia -	derefEmojis  map[string]*media.ProcessingEmoji +	// in-progress dereferencing emoji. we already perform +	// locks per-status and per-account so we don't need +	// processing maps for other media which won't often +	// end up being repeated. worst case we run into an +	// db.ErrAlreadyExists error which then gets handled +	// appropriately by enrich{Account,Status}Safely(). +	derefEmojis   map[string]*media.ProcessingEmoji +	derefEmojisMu sync.Mutex +	// handshakes marks current in-progress handshakes +	// occurring, useful to prevent a deadlock between +	// gotosocial instances attempting to dereference +	// accounts for the first time. when a handshake is +	// currently ongoing we know not to block waiting +	// on certain data and instead return an in-progress +	// form of the data as we currently see it.  	
handshakes   map[string][]*url.URL  	handshakesMu sync.Mutex  } @@ -108,8 +119,6 @@ func NewDereferencer(  		transportController: transportController,  		mediaManager:        mediaManager,  		visibility:          visFilter, -		derefAvatars:        make(map[string]*media.ProcessingMedia), -		derefHeaders:        make(map[string]*media.ProcessingMedia),  		derefEmojis:         make(map[string]*media.ProcessingEmoji),  		handshakes:          make(map[string][]*url.URL),  	} diff --git a/internal/federation/dereferencing/emoji.go b/internal/federation/dereferencing/emoji.go index 009191780..e81737d04 100644 --- a/internal/federation/dereferencing/emoji.go +++ b/internal/federation/dereferencing/emoji.go @@ -24,6 +24,7 @@ import (  	"net/url"  	"github.com/superseriousbusiness/gotosocial/internal/db" +	"github.com/superseriousbusiness/gotosocial/internal/gtserror"  	"github.com/superseriousbusiness/gotosocial/internal/gtsmodel"  	"github.com/superseriousbusiness/gotosocial/internal/id"  	"github.com/superseriousbusiness/gotosocial/internal/log" @@ -31,11 +32,8 @@ import (  	"github.com/superseriousbusiness/gotosocial/internal/util"  ) -func (d *Dereferencer) GetRemoteEmoji(ctx context.Context, requestingUsername string, remoteURL string, shortcode string, domain string, id string, emojiURI string, ai *media.AdditionalEmojiInfo, refresh bool) (*media.ProcessingEmoji, error) { -	var ( -		shortcodeDomain = shortcode + "@" + domain -		processingEmoji *media.ProcessingEmoji -	) +func (d *Dereferencer) GetRemoteEmoji(ctx context.Context, requestUser string, remoteURL string, shortcode string, domain string, id string, emojiURI string, ai *media.AdditionalEmojiInfo, refresh bool) (*media.ProcessingEmoji, error) { +	var shortcodeDomain = shortcode + "@" + domain  	// Ensure we have been passed a valid URL.  	
derefURI, err := url.Parse(remoteURL) @@ -43,52 +41,61 @@ func (d *Dereferencer) GetRemoteEmoji(ctx context.Context, requestingUsername st  		return nil, fmt.Errorf("GetRemoteEmoji: error parsing url for emoji %s: %s", shortcodeDomain, err)  	} -	// Acquire lock for derefs map. -	unlock := d.state.FedLocks.Lock(remoteURL) +	// Acquire derefs lock. +	d.derefEmojisMu.Lock() + +	// Ensure unlock only done once. +	unlock := d.derefEmojisMu.Unlock  	unlock = util.DoOnce(unlock)  	defer unlock() -	// first check if we're already processing this emoji -	if alreadyProcessing, ok := d.derefEmojis[shortcodeDomain]; ok { -		// we're already on it, no worries -		processingEmoji = alreadyProcessing -	} else { -		// not processing it yet, let's start -		t, err := d.transportController.NewTransportForUsername(ctx, requestingUsername) +	// Look for an existing dereference in progress. +	processing, ok := d.derefEmojis[shortcodeDomain] + +	if !ok { +		// Fetch a transport for current request user in order to perform request. +		tsport, err := d.transportController.NewTransportForUsername(ctx, requestUser)  		if err != nil { -			return nil, fmt.Errorf("GetRemoteEmoji: error creating transport to fetch emoji %s: %s", shortcodeDomain, err) +			return nil, gtserror.Newf("couldn't create transport: %w", err)  		} -		dataFunc := func(innerCtx context.Context) (io.ReadCloser, int64, error) { -			return t.DereferenceMedia(innerCtx, derefURI) +		// Set the media data function to dereference emoji from URI. +		data := func(ctx context.Context) (io.ReadCloser, int64, error) { +			return tsport.DereferenceMedia(ctx, derefURI)  		} -		newProcessing, err := d.mediaManager.PreProcessEmoji(ctx, dataFunc, shortcode, id, emojiURI, ai, refresh) +		// Create new emoji processing request from the media manager. 
+		processing, err = d.mediaManager.PreProcessEmoji(ctx, data, +			shortcode, +			id, +			emojiURI, +			ai, +			refresh, +		)  		if err != nil { -			return nil, fmt.Errorf("GetRemoteEmoji: error processing emoji %s: %s", shortcodeDomain, err) +			return nil, gtserror.Newf("error preprocessing emoji %s: %s", shortcodeDomain, err)  		} -		// store it in our map to indicate it's in process -		d.derefEmojis[shortcodeDomain] = newProcessing -		processingEmoji = newProcessing +		// Store media in map to mark as processing. +		d.derefEmojis[shortcodeDomain] = processing + +		defer func() { +			// On exit safely remove emoji from map. +			d.derefEmojisMu.Lock() +			delete(d.derefEmojis, shortcodeDomain) +			d.derefEmojisMu.Unlock() +		}()  	}  	// Unlock map.  	unlock() -	defer func() { -		// On exit safely remove emoji from map. -		unlock := d.state.FedLocks.Lock(remoteURL) -		delete(d.derefEmojis, shortcodeDomain) -		unlock() -	}() -  	// Start emoji attachment loading (blocking call). -	if _, err := processingEmoji.LoadEmoji(ctx); err != nil { +	if _, err := processing.LoadEmoji(ctx); err != nil {  		return nil, err  	} -	return processingEmoji, nil +	return processing, nil  }  func (d *Dereferencer) populateEmojis(ctx context.Context, rawEmojis []*gtsmodel.Emoji, requestingUsername string) ([]*gtsmodel.Emoji, error) { diff --git a/internal/federation/dereferencing/status.go b/internal/federation/dereferencing/status.go index bd50a08fd..69627adc2 100644 --- a/internal/federation/dereferencing/status.go +++ b/internal/federation/dereferencing/status.go @@ -285,7 +285,7 @@ func (d *Dereferencer) enrichStatusSafely(  	requestUser string,  	uri *url.URL,  	status *gtsmodel.Status, -	apubStatus ap.Statusable, +	statusable ap.Statusable,  ) (*gtsmodel.Status, ap.Statusable, bool, error) {  	uriStr := status.URI @@ -313,7 +313,7 @@ func (d *Dereferencer) enrichStatusSafely(  		requestUser,  		uri,  		status, -		apubStatus, +		statusable,  	)  	if gtserror.StatusCode(err) >= 
400 { | 
