diff options
author | 2024-06-26 15:01:16 +0000 | |
---|---|---|
committer | 2024-06-26 16:01:16 +0100 | |
commit | 21bb324156f582e918a097ea744e52fc21b2ddf4 (patch) | |
tree | 50db5cfd42e26224591f59ff62de14a3715677b5 /internal/media/processingmedia.go | |
parent | [docs] restructure federation section (#3038) (diff) | |
download | gotosocial-21bb324156f582e918a097ea744e52fc21b2ddf4.tar.xz |
[chore] media and emoji refactoring (#3000)
* start updating media manager interface ready for storing attachments / emoji right away
* store emoji and media as uncached immediately, then (re-)cache on Processing{}.Load()
* remove now unused media workers
* fix tests and issues
* fix another test!
* fix emoji activitypub uri setting behaviour, fix remainder of test compilation issues
* fix more tests
* fix (most of) remaining tests, add debouncing to repeatedly failing media / emojis
* whoops, rebase issue
* remove kim's whacky experiments
* do some reshuffling, ensure emoji uri gets set
* ensure marked as not cached on cleanup
* tweaks to media / emoji processing to handle context canceled better
* ensure newly fetched emojis actually get set in returned slice
* use different varnames to be a bit more obvious
* move emoji refresh rate limiting to dereferencer
* add exported dereferencer functions for remote media, use these for recaching in processor
* add check for nil attachment in updateAttachment()
* remove unused emoji and media fields + columns
* see previous commit
* fix old migrations expecting image_updated_at to exist (from copies of old models)
* remove freshness checking code (seems to be broken...)
* fix error arg causing nil ptr exception
* finish documenting functions with comments, slight tweaks to media / emoji deref error logic
* remove some extra unneeded boolean checking
* finish writing documentation (code comments) for exported media manager methods
* undo changes to migration snapshot gtsmodels, updated failing migration to have its own snapshot
* move doesColumnExist() to util.go in migrations package
Diffstat (limited to 'internal/media/processingmedia.go')
-rw-r--r-- | internal/media/processingmedia.go | 287 |
1 files changed, 125 insertions, 162 deletions
diff --git a/internal/media/processingmedia.go b/internal/media/processingmedia.go index b65e3cd48..466c3443f 100644 --- a/internal/media/processingmedia.go +++ b/internal/media/processingmedia.go @@ -19,6 +19,7 @@ package media import ( "bytes" + "cmp" "context" "image/jpeg" "io" @@ -29,6 +30,7 @@ import ( terminator "codeberg.org/superseriousbusiness/exif-terminator" "github.com/disintegration/imaging" "github.com/h2non/filetype" + "github.com/superseriousbusiness/gotosocial/internal/gtscontext" "github.com/superseriousbusiness/gotosocial/internal/gtserror" "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" "github.com/superseriousbusiness/gotosocial/internal/log" @@ -41,18 +43,16 @@ import ( // currently being processed. It exposes functions // for retrieving data from the process. type ProcessingMedia struct { - media *gtsmodel.MediaAttachment // processing media attachment details - dataFn DataFunc // load-data function, returns media stream - recache bool // recaching existing (uncached) media - done bool // done is set when process finishes with non ctx canceled type error - proc runners.Processor // proc helps synchronize only a singular running processing instance - err error // error stores permanent error value when done - mgr *Manager // mgr instance (access to db / storage) + media *gtsmodel.MediaAttachment // processing media attachment details + dataFn DataFunc // load-data function, returns media stream + done bool // done is set when process finishes with non ctx canceled type error + proc runners.Processor // proc helps synchronize only a singular running processing instance + err error // error stores permanent error value when done + mgr *Manager // mgr instance (access to db / storage) } -// AttachmentID returns the ID of the underlying -// media attachment without blocking processing. -func (p *ProcessingMedia) AttachmentID() string { +// ID returns the ID of the underlying media. 
+func (p *ProcessingMedia) ID() string { return p.media.ID // immutable, safe outside mutex. } @@ -65,124 +65,102 @@ func (p *ProcessingMedia) AttachmentID() string { // will still be returned in that case, but it will // only be partially complete and should be treated // as a placeholder. -func (p *ProcessingMedia) LoadAttachment(ctx context.Context) (*gtsmodel.MediaAttachment, error) { - // Attempt to load synchronously. +func (p *ProcessingMedia) Load(ctx context.Context) (*gtsmodel.MediaAttachment, error) { media, done, err := p.load(ctx) - if err == nil { - // No issue, return media. - return media, nil - } - if !done { - // Provided context was cancelled, - // e.g. request aborted early before - // its context could be used to finish - // loading the attachment. Enqueue for - // asynchronous processing, which will - // use a background context. - log.Warnf(ctx, "reprocessing media %s after canceled ctx", p.media.ID) - p.mgr.state.Workers.Media.Queue.Push(p.Process) + // On a context-canceled error (marked as !done), requeue for loading. + p.mgr.state.Workers.Dereference.Queue.Push(func(ctx context.Context) { + if _, _, err := p.load(ctx); err != nil { + log.Errorf(ctx, "error loading media: %v", err) + } + }) } - - // Media could not be retrieved FULLY, - // but partial attachment should be present. return media, err } -// Process allows the receiving object to fit the -// runners.WorkerFunc signature. It performs a -// (blocking) load and logs on error. -func (p *ProcessingMedia) Process(ctx context.Context) { - if _, _, err := p.load(ctx); err != nil { - log.Errorf(ctx, "error(s) processing media: %v", err) - } -} - -// load performs a concurrency-safe load of ProcessingMedia, only -// marking itself as complete when returned error is NOT a context cancel. 
-func (p *ProcessingMedia) load(ctx context.Context) (*gtsmodel.MediaAttachment, bool, error) { - var ( - done bool - err error - ) - +// load is the package private form of load() that is wrapped to catch context canceled. +func (p *ProcessingMedia) load(ctx context.Context) ( + media *gtsmodel.MediaAttachment, + done bool, + err error, +) { err = p.proc.Process(func() error { - if p.done { + if done = p.done; done { // Already proc'd. return p.err } defer func() { // This is only done when ctx NOT cancelled. - done = err == nil || !errorsv2.IsV2(err, + done = (err == nil || !errorsv2.IsV2(err, context.Canceled, context.DeadlineExceeded, - ) + )) if !done { return } + // Anything from here, we + // need to ensure happens + // (i.e. no ctx canceled). + ctx = gtscontext.WithValues( + context.Background(), + ctx, // values + ) + + // On error or unknown media types, perform error cleanup. + if err != nil || p.media.Type == gtsmodel.FileTypeUnknown { + p.cleanup(ctx) + } + + // Update with latest details, whatever happened. + e := p.mgr.state.DB.UpdateAttachment(ctx, p.media) + if e != nil { + log.Errorf(ctx, "error updating media in db: %v", e) + } + // Store final values. p.done = true p.err = err }() - // Gather errors as we proceed. - var errs = gtserror.NewMultiError(4) + // TODO: in time update this + // to perhaps follow a similar + // freshness window to statuses + // / accounts? But that's a big + // maybe, media don't change in + // the same way so this is largely + // just to slow down fail retries. + const maxfreq = 6 * time.Hour + + // Check whether media is uncached but repeatedly failing, + // specifically limit the frequency at which we allow this. + if !p.media.UpdatedAt.Equal(p.media.CreatedAt) && // i.e. not new + p.media.UpdatedAt.Add(maxfreq).Before(time.Now()) { + return nil + } // Attempt to store media and calculate // full-size media attachment details. // // This will update p.media as it goes. 
- storeErr := p.store(ctx) - if storeErr != nil { - errs.Append(storeErr) + if err = p.store(ctx); err != nil { + return err } // Finish processing by reloading media into // memory to get dimension and generate a thumb. // // This will update p.media as it goes. - if finishErr := p.finish(ctx); finishErr != nil { - errs.Append(finishErr) - } - - // If this isn't a file we were able to process, - // we may have partially stored it (eg., it's a - // jpeg, which is fine, but streaming it to storage - // was interrupted halfway through and so it was - // never decoded). Try to clean up in this case. - if p.media.Type == gtsmodel.FileTypeUnknown { - deleteErr := p.mgr.state.Storage.Delete(ctx, p.media.File.Path) - if deleteErr != nil && !storage.IsNotFound(deleteErr) { - errs.Append(deleteErr) - } - } - - var dbErr error - switch { - case !p.recache: - // First time caching this attachment, insert it. - dbErr = p.mgr.state.DB.PutAttachment(ctx, p.media) - - case p.recache && len(errs) == 0: - // Existing attachment we're recaching, update it. - // - // (We only want to update if everything went OK so far, - // otherwise we'd better leave previous version alone.) - dbErr = p.mgr.state.DB.UpdateAttachment(ctx, p.media) + if err = p.finish(ctx); err != nil { + return err //nolint:revive } - if dbErr != nil { - errs.Append(dbErr) - } - - err = errs.Combine() - return err + return nil }) - - return p.media, done, err + media = p.media + return } // store calls the data function attached to p if it hasn't been called yet, @@ -231,10 +209,6 @@ func (p *ProcessingMedia) store(ctx context.Context) error { // Initial file size was misreported, so we didn't read // fully into hdrBuf. Reslice it to the size we did read. 
- log.Warnf(ctx, - "recovered from misreported file size; reported %d; read %d", - fileSize, n, - ) hdrBuf = hdrBuf[:n] fileSize = n p.media.File.FileSize = fileSize @@ -273,20 +247,13 @@ func (p *ProcessingMedia) store(ctx context.Context) error { } default: - // The file is not a supported format that - // we can process, so we can't do much with it. - log.Warnf(ctx, - "media extension '%s' not officially supported, will be processed as "+ - "type '%s' with minimal metadata, and will not be cached locally", - info.Extension, gtsmodel.FileTypeUnknown, - ) - - // Don't bother storing this. + // The file is not a supported format that we can process, so we can't do much with it. + log.Warnf(ctx, "unsupported media extension '%s'; not caching locally", info.Extension) store = false } // Fill in correct attachment - // data now we're parsed it. + // data now we've parsed it. p.media.URL = uris.URIForAttachment( p.media.AccountID, string(TypeAttachment), @@ -295,15 +262,11 @@ func (p *ProcessingMedia) store(ctx context.Context) error { info.Extension, ) - // Prefer discovered mime type, fall back to - // generic "this contains some bytes" type. - mime := info.MIME.Value - if mime == "" { - mime = "application/octet-stream" - } + // Prefer discovered MIME, fallback to generic data stream. + mime := cmp.Or(info.MIME.Value, "application/octet-stream") p.media.File.ContentType = mime - // Calculate attachment file path. + // Calculate final media attachment file path. p.media.File.Path = uris.StoragePathForAttachment( p.media.AccountID, string(TypeAttachment), @@ -323,23 +286,23 @@ func (p *ProcessingMedia) store(ctx context.Context) error { // File shouldn't already exist in storage at this point, // but we do a check as it's worth logging / cleaning up. 
if have, _ := p.mgr.state.Storage.Has(ctx, p.media.File.Path); have { - log.Warnf(ctx, "media already exists at storage path: %s", p.media.File.Path) + log.Warnf(ctx, "media already exists at: %s", p.media.File.Path) // Attempt to remove existing media at storage path (might be broken / out-of-date) if err := p.mgr.state.Storage.Delete(ctx, p.media.File.Path); err != nil { - return gtserror.Newf("error removing media from storage: %v", err) + return gtserror.Newf("error removing media %s from storage: %v", p.media.File.Path, err) } } - // Write the final reader stream to our storage. - wroteSize, err := p.mgr.state.Storage.PutStream(ctx, p.media.File.Path, r) + // Write the final reader stream to our storage driver. + sz, err = p.mgr.state.Storage.PutStream(ctx, p.media.File.Path, r) if err != nil { return gtserror.Newf("error writing media to storage: %w", err) } // Set actual written size // as authoritative file size. - p.media.File.FileSize = int(wroteSize) + p.media.File.FileSize = int(sz) // We can now consider this cached. p.media.Cached = util.Ptr(true) @@ -348,36 +311,9 @@ func (p *ProcessingMedia) store(ctx context.Context) error { } func (p *ProcessingMedia) finish(ctx context.Context) error { - // Make a jolly assumption about thumbnail type. - p.media.Thumbnail.ContentType = mimeImageJpeg - - // Calculate attachment thumbnail file path - p.media.Thumbnail.Path = uris.StoragePathForAttachment( - p.media.AccountID, - string(TypeAttachment), - string(SizeSmall), - p.media.ID, - // Always encode attachment - // thumbnails as jpg. - "jpg", - ) - - // Calculate attachment thumbnail serve path. - p.media.Thumbnail.URL = uris.URIForAttachment( - p.media.AccountID, - string(TypeAttachment), - string(SizeSmall), - p.media.ID, - // Always encode attachment - // thumbnails as jpg. - "jpg", - ) - - // If original file hasn't been stored, there's - // likely something wrong with the data, or we - // don't want to store it. Skip everything else. 
+ // Nothing else to do if + // media was not cached. if !*p.media.Cached { - p.media.Processing = gtsmodel.ProcessingStatusProcessed return nil } @@ -398,8 +334,7 @@ func (p *ProcessingMedia) finish(ctx context.Context) error { // .jpeg, .gif, .webp image type case mimeImageJpeg, mimeImageGif, mimeImageWebp: - fullImg, err = decodeImage( - rc, + fullImg, err = decodeImage(rc, imaging.AutoOrientation(true), ) if err != nil { @@ -451,9 +386,9 @@ func (p *ProcessingMedia) finish(ctx context.Context) error { } // Set full-size dimensions in attachment info. - p.media.FileMeta.Original.Width = int(fullImg.Width()) - p.media.FileMeta.Original.Height = int(fullImg.Height()) - p.media.FileMeta.Original.Size = int(fullImg.Size()) + p.media.FileMeta.Original.Width = fullImg.Width() + p.media.FileMeta.Original.Height = fullImg.Height() + p.media.FileMeta.Original.Size = fullImg.Size() p.media.FileMeta.Original.Aspect = fullImg.AspectRatio() // Get smaller thumbnail image @@ -475,44 +410,72 @@ func (p *ProcessingMedia) finish(ctx context.Context) error { p.media.Blurhash = hash } - // Thumbnail shouldn't already exist in storage at this point, + // Thumbnail shouldn't exist in storage at this point, // but we do a check as it's worth logging / cleaning up. if have, _ := p.mgr.state.Storage.Has(ctx, p.media.Thumbnail.Path); have { - log.Warnf(ctx, "thumbnail already exists at storage path: %s", p.media.Thumbnail.Path) + log.Warnf(ctx, "thumbnail already exists at: %s", p.media.Thumbnail.Path) - // Attempt to remove existing thumbnail at storage path (might be broken / out-of-date) + // Attempt to remove existing thumbnail (might be broken / out-of-date). if err := p.mgr.state.Storage.Delete(ctx, p.media.Thumbnail.Path); err != nil { - return gtserror.Newf("error removing thumbnail from storage: %v", err) + return gtserror.Newf("error removing thumbnail %s from storage: %v", p.media.Thumbnail.Path, err) } } // Create a thumbnail JPEG encoder stream. 
enc := thumbImg.ToJPEG(&jpeg.Options{ + // Good enough for // a thumbnail. Quality: 70, }) - // Stream-encode the JPEG thumbnail image into storage. + // Stream-encode the JPEG thumbnail image into our storage driver. sz, err := p.mgr.state.Storage.PutStream(ctx, p.media.Thumbnail.Path, enc) if err != nil { return gtserror.Newf("error stream-encoding thumbnail to storage: %w", err) } + // Set final written thumb size. + p.media.Thumbnail.FileSize = int(sz) + // Set thumbnail dimensions in attachment info. p.media.FileMeta.Small = gtsmodel.Small{ - Width: int(thumbImg.Width()), - Height: int(thumbImg.Height()), - Size: int(thumbImg.Size()), + Width: thumbImg.Width(), + Height: thumbImg.Height(), + Size: thumbImg.Size(), Aspect: thumbImg.AspectRatio(), } - // Set written image size. - p.media.Thumbnail.FileSize = int(sz) - - // Finally set the attachment as processed and update time. + // Finally set the attachment as processed. p.media.Processing = gtsmodel.ProcessingStatusProcessed - p.media.File.UpdatedAt = time.Now() return nil } + +// cleanup will remove any traces of processing media from storage. +// and perform any other necessary cleanup steps after failure. +func (p *ProcessingMedia) cleanup(ctx context.Context) { + var err error + + if p.media.File.Path != "" { + // Ensure media file at path is deleted from storage. + err = p.mgr.state.Storage.Delete(ctx, p.media.File.Path) + if err != nil && !storage.IsNotFound(err) { + log.Errorf(ctx, "error deleting %s: %v", p.media.File.Path, err) + } + } + + if p.media.Thumbnail.Path != "" { + // Ensure media thumbnail at path is deleted from storage. + err = p.mgr.state.Storage.Delete(ctx, p.media.Thumbnail.Path) + if err != nil && !storage.IsNotFound(err) { + log.Errorf(ctx, "error deleting %s: %v", p.media.Thumbnail.Path, err) + } + } + + // Also ensure marked as unknown and finished + // processing so gets inserted as placeholder URL. 
+ p.media.Processing = gtsmodel.ProcessingStatusProcessed + p.media.Type = gtsmodel.FileTypeUnknown + p.media.Cached = util.Ptr(false) +} |