diff options
Diffstat (limited to 'internal/federation')
-rw-r--r-- | internal/federation/dereferencing/dereferencer.go | 10 | ||||
-rw-r--r-- | internal/federation/dereferencing/emoji.go | 101 | ||||
-rw-r--r-- | internal/federation/dereferencing/media.go | 184 |
3 files changed, 174 insertions, 121 deletions
diff --git a/internal/federation/dereferencing/dereferencer.go b/internal/federation/dereferencing/dereferencer.go index f7f4d975e..bcc145c27 100644 --- a/internal/federation/dereferencing/dereferencer.go +++ b/internal/federation/dereferencing/dereferencer.go @@ -85,12 +85,9 @@ type Dereferencer struct { mediaManager *media.Manager visibility *visibility.Filter - // in-progress dereferencing emoji. we already perform - // locks per-status and per-account so we don't need - // processing maps for other media which won't often - // end up being repeated. worst case we run into an - // db.ErrAlreadyExists error which then gets handled - // appropriately by enrich{Account,Status}Safely(). + // in-progress dereferencing media / emoji + derefMedia map[string]*media.ProcessingMedia + derefMediaMu sync.Mutex derefEmojis map[string]*media.ProcessingEmoji derefEmojisMu sync.Mutex @@ -119,6 +116,7 @@ func NewDereferencer( transportController: transportController, mediaManager: mediaManager, visibility: visFilter, + derefMedia: make(map[string]*media.ProcessingMedia), derefEmojis: make(map[string]*media.ProcessingEmoji), handshakes: make(map[string][]*url.URL), } diff --git a/internal/federation/dereferencing/emoji.go b/internal/federation/dereferencing/emoji.go index 806a3f5ee..22b5a0442 100644 --- a/internal/federation/dereferencing/emoji.go +++ b/internal/federation/dereferencing/emoji.go @@ -77,32 +77,34 @@ func (d *Dereferencer) GetEmoji( // Generate shortcode domain for locks + logging. shortcodeDomain := shortcode + "@" + domain - // Ensure we have a valid remote URL. - url, err := url.Parse(remoteURL) - if err != nil { - err := gtserror.Newf("invalid image remote url %s for emoji %s: %w", remoteURL, shortcodeDomain, err) - return nil, err - } - - // Acquire new instance account transport for emoji dereferencing. - tsport, err := d.transportController.NewTransportForUsername(ctx, "") - if err != nil { - err := gtserror.Newf("error getting instance transport: %w", err) - return nil, err - } - - // Get maximum supported remote emoji size. - maxsz := config.GetMediaEmojiRemoteMaxSize() - - // Prepare data function to dereference remote emoji media. - data := func(context.Context) (io.ReadCloser, error) { - return tsport.DereferenceMedia(ctx, url, int64(maxsz)) - } - // Pass along for safe processing. return d.processEmojiSafely(ctx, shortcodeDomain, func() (*media.ProcessingEmoji, error) { + + // Ensure we have a valid remote URL. + url, err := url.Parse(remoteURL) + if err != nil { + err := gtserror.Newf("invalid image remote url %s for emoji %s: %w", remoteURL, shortcodeDomain, err) + return nil, err + } + + // Acquire new instance account transport for emoji dereferencing. + tsport, err := d.transportController.NewTransportForUsername(ctx, "") + if err != nil { + err := gtserror.Newf("error getting instance transport: %w", err) + return nil, err + } + + // Get maximum supported remote emoji size. + maxsz := config.GetMediaEmojiRemoteMaxSize() + + // Prepare data function to dereference remote emoji media. + data := func(context.Context) (io.ReadCloser, error) { + return tsport.DereferenceMedia(ctx, url, int64(maxsz)) + } + + // Create new emoji with prepared info. return d.mediaManager.CreateEmoji(ctx, shortcode, domain, @@ -142,24 +144,25 @@ func (d *Dereferencer) RefreshEmoji( switch { case info.URI != nil && *info.URI != emoji.URI: + emoji.URI = *info.URI force = true case info.ImageRemoteURL != nil && *info.ImageRemoteURL != emoji.ImageRemoteURL: + emoji.ImageRemoteURL = *info.ImageRemoteURL force = true case info.ImageStaticRemoteURL != nil && *info.ImageStaticRemoteURL != emoji.ImageStaticRemoteURL: + emoji.ImageStaticRemoteURL = *info.ImageStaticRemoteURL force = true } // Check if needs updating. - if !force && *emoji.Cached { + if *emoji.Cached && !force { return emoji, nil } - // TODO: more finegrained freshness checks. - - // Generate shortcode domain for locks + logging. - shortcodeDomain := emoji.Shortcode + "@" + emoji.Domain + // Get shortcode domain for locks + logging. + shortcodeDomain := emoji.ShortcodeDomain() // Ensure we have a valid image remote URL. url, err := url.Parse(emoji.ImageRemoteURL) @@ -168,25 +171,27 @@ func (d *Dereferencer) RefreshEmoji( return nil, err } - // Acquire new instance account transport for emoji dereferencing. - tsport, err := d.transportController.NewTransportForUsername(ctx, "") - if err != nil { - err := gtserror.Newf("error getting instance transport: %w", err) - return nil, err - } - - // Get maximum supported remote emoji size. - maxsz := config.GetMediaEmojiRemoteMaxSize() - - // Prepare data function to dereference remote emoji media. - data := func(context.Context) (io.ReadCloser, error) { - return tsport.DereferenceMedia(ctx, url, int64(maxsz)) - } - // Pass along for safe processing. return d.processEmojiSafely(ctx, shortcodeDomain, func() (*media.ProcessingEmoji, error) { + + // Acquire new instance account transport for emoji dereferencing. + tsport, err := d.transportController.NewTransportForUsername(ctx, "") + if err != nil { + err := gtserror.Newf("error getting instance transport: %w", err) + return nil, err + } + + // Get maximum supported remote emoji size. + maxsz := config.GetMediaEmojiRemoteMaxSize() + + // Prepare data function to dereference remote emoji media. + data := func(context.Context) (io.ReadCloser, error) { + return tsport.DereferenceMedia(ctx, url, int64(maxsz)) + } + + // Refresh emoji with prepared info. return d.mediaManager.RefreshEmoji(ctx, emoji, data, @@ -226,6 +231,13 @@ func (d *Dereferencer) processEmojiSafely( if err != nil { return nil, err } + + defer func() { + // Remove on finish. + d.derefEmojisMu.Lock() + delete(d.derefEmojis, shortcodeDomain) + d.derefEmojisMu.Unlock() + }() } // Unlock map. @@ -240,10 +252,7 @@ func (d *Dereferencer) processEmojiSafely( // which can determine if loading error should allow remaining placeholder. } - // Return a COPY of emoji. - emoji2 := new(gtsmodel.Emoji) - *emoji2 = *emoji - return emoji2, err + return } func (d *Dereferencer) fetchEmojis( diff --git a/internal/federation/dereferencing/media.go b/internal/federation/dereferencing/media.go index 956866e94..48c0e258e 100644 --- a/internal/federation/dereferencing/media.go +++ b/internal/federation/dereferencing/media.go @@ -26,6 +26,7 @@ import ( "github.com/superseriousbusiness/gotosocial/internal/gtserror" "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" "github.com/superseriousbusiness/gotosocial/internal/media" + "github.com/superseriousbusiness/gotosocial/internal/util" ) // GetMedia fetches the media at given remote URL by @@ -56,46 +57,39 @@ func (d *Dereferencer) GetMedia( *gtsmodel.MediaAttachment, error, ) { - // Parse str as valid URL object. + // Ensure we have a valid remote URL. url, err := url.Parse(remoteURL) if err != nil { - return nil, gtserror.Newf("invalid remote media url %q: %v", remoteURL, err) + err := gtserror.Newf("invalid media remote url %s: %w", remoteURL, err) + return nil, err } - // Fetch transport for the provided request user from controller. - tsport, err := d.transportController.NewTransportForUsername(ctx, - requestUser, - ) - if err != nil { - return nil, gtserror.Newf("failed getting transport for %s: %w", requestUser, err) - } + return d.processMediaSafeley(ctx, + remoteURL, + func() (*media.ProcessingMedia, error) { - // Get maximum supported remote media size. - maxsz := config.GetMediaRemoteMaxSize() + // Fetch transport for the provided request user from controller. + tsport, err := d.transportController.NewTransportForUsername(ctx, + requestUser, + ) + if err != nil { + return nil, gtserror.Newf("failed getting transport for %s: %w", requestUser, err) + } - // Start processing remote attachment at URL. - processing, err := d.mediaManager.CreateMedia( - ctx, - accountID, - func(ctx context.Context) (io.ReadCloser, error) { - return tsport.DereferenceMedia(ctx, url, int64(maxsz)) + // Get maximum supported remote media size. + maxsz := config.GetMediaRemoteMaxSize() + + // Create media with prepared info. + return d.mediaManager.CreateMedia( + ctx, + accountID, + func(ctx context.Context) (io.ReadCloser, error) { + return tsport.DereferenceMedia(ctx, url, int64(maxsz)) + }, + info, + ) }, - info, ) - if err != nil { - return nil, err - } - - // Perform media load operation. - media, err := processing.Load(ctx) - if err != nil { - err = gtserror.Newf("error loading media %s: %w", media.RemoteURL, err) - - // TODO: in time we should return checkable flags by gtserror.Is___() - // which can determine if loading error should allow remaining placeholder. - } - - return media, err } // RefreshMedia ensures that given media is up-to-date, @@ -119,7 +113,7 @@ func (d *Dereferencer) GetMedia( func (d *Dereferencer) RefreshMedia( ctx context.Context, requestUser string, - media *gtsmodel.MediaAttachment, + attach *gtsmodel.MediaAttachment, info media.AdditionalMediaInfo, force bool, ) ( @@ -127,67 +121,65 @@ func (d *Dereferencer) RefreshMedia( error, ) { // Can't refresh local. - if media.IsLocal() { - return media, nil + if attach.IsLocal() { + return attach, nil } // Check emoji is up-to-date // with provided extra info. switch { case info.Blurhash != nil && - *info.Blurhash != media.Blurhash: + *info.Blurhash != attach.Blurhash: + attach.Blurhash = *info.Blurhash force = true case info.Description != nil && - *info.Description != media.Description: + *info.Description != attach.Description: + attach.Description = *info.Description force = true case info.RemoteURL != nil && - *info.RemoteURL != media.RemoteURL: + *info.RemoteURL != attach.RemoteURL: + attach.RemoteURL = *info.RemoteURL force = true } // Check if needs updating. - if !force && *media.Cached { - return media, nil + if *attach.Cached && !force { + return attach, nil } - // TODO: more finegrained freshness checks. - // Ensure we have a valid remote URL. - url, err := url.Parse(media.RemoteURL) + url, err := url.Parse(attach.RemoteURL) if err != nil { - err := gtserror.Newf("invalid media remote url %s: %w", media.RemoteURL, err) + err := gtserror.Newf("invalid media remote url %s: %w", attach.RemoteURL, err) return nil, err } - // Fetch transport for the provided request user from controller. - tsport, err := d.transportController.NewTransportForUsername(ctx, - requestUser, - ) - if err != nil { - return nil, gtserror.Newf("failed getting transport for %s: %w", requestUser, err) - } + // Pass along for safe processing. + return d.processMediaSafeley(ctx, + attach.RemoteURL, + func() (*media.ProcessingMedia, error) { + + // Fetch transport for the provided request user from controller. + tsport, err := d.transportController.NewTransportForUsername(ctx, + requestUser, + ) + if err != nil { + return nil, gtserror.Newf("failed getting transport for %s: %w", requestUser, err) + } - // Get maximum supported remote media size. - maxsz := config.GetMediaRemoteMaxSize() + // Get maximum supported remote media size. + maxsz := config.GetMediaRemoteMaxSize() - // Start processing remote attachment recache. - processing := d.mediaManager.RecacheMedia( - media, - func(ctx context.Context) (io.ReadCloser, error) { - return tsport.DereferenceMedia(ctx, url, int64(maxsz)) + // Recache media with prepared info, + // this will also update media in db. + return d.mediaManager.RecacheMedia( + attach, + func(ctx context.Context) (io.ReadCloser, error) { + return tsport.DereferenceMedia(ctx, url, int64(maxsz)) + }, + ), nil }, ) - - // Perform media load operation. - media, err = processing.Load(ctx) - if err != nil { - err = gtserror.Newf("error loading media %s: %w", media.RemoteURL, err) - - // TODO: in time we should return checkable flags by gtserror.Is___() - // which can determine if loading error should allow remaining placeholder. - } - - return media, err } // updateAttachment handles the case of an existing media attachment @@ -220,3 +212,57 @@ func (d *Dereferencer) updateAttachment( false, ) } + +// processingEmojiSafely provides concurrency-safe processing of +// an emoji with given shortcode+domain. if a copy of the emoji is +// not already being processed, the given 'process' callback will +// be used to generate new *media.ProcessingEmoji{} instance. +func (d *Dereferencer) processMediaSafeley( + ctx context.Context, + remoteURL string, + process func() (*media.ProcessingMedia, error), +) ( + media *gtsmodel.MediaAttachment, + err error, +) { + + // Acquire map lock. + d.derefMediaMu.Lock() + + // Ensure unlock only done once. + unlock := d.derefMediaMu.Unlock + unlock = util.DoOnce(unlock) + defer unlock() + + // Look for an existing deref in progress. + processing, ok := d.derefMedia[remoteURL] + + if !ok { + // Start new processing emoji. + processing, err = process() + if err != nil { + return nil, err + } + + defer func() { + // Remove on finish. + d.derefMediaMu.Lock() + delete(d.derefMedia, remoteURL) + d.derefMediaMu.Unlock() + }() + } + + // Unlock map. + unlock() + + // Perform media load operation. + media, err = processing.Load(ctx) + if err != nil { + err = gtserror.Newf("error loading media %s: %w", remoteURL, err) + + // TODO: in time we should return checkable flags by gtserror.Is___() + // which can determine if loading error should allow remaining placeholder. + } + + return +} |