diff options
Diffstat (limited to 'internal/media/processingemoji.go')
-rw-r--r-- | internal/media/processingemoji.go | 207 |
1 files changed, 126 insertions, 81 deletions
diff --git a/internal/media/processingemoji.go b/internal/media/processingemoji.go index b62c4f76e..d61043523 100644 --- a/internal/media/processingemoji.go +++ b/internal/media/processingemoji.go @@ -24,14 +24,16 @@ import ( "slices" "codeberg.org/gruf/go-bytesize" - "codeberg.org/gruf/go-errors/v2" + errorsv2 "codeberg.org/gruf/go-errors/v2" "codeberg.org/gruf/go-runners" "github.com/h2non/filetype" "github.com/superseriousbusiness/gotosocial/internal/config" + "github.com/superseriousbusiness/gotosocial/internal/gtscontext" "github.com/superseriousbusiness/gotosocial/internal/gtserror" "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" "github.com/superseriousbusiness/gotosocial/internal/log" "github.com/superseriousbusiness/gotosocial/internal/regexes" + "github.com/superseriousbusiness/gotosocial/internal/storage" "github.com/superseriousbusiness/gotosocial/internal/uris" "github.com/superseriousbusiness/gotosocial/internal/util" ) @@ -40,7 +42,6 @@ import ( // various functions for retrieving data from the process. type ProcessingEmoji struct { emoji *gtsmodel.Emoji // processing emoji details - existing bool // indicates whether this is an existing emoji ID being refreshed / recached newPathID string // new emoji path ID to use when being refreshed dataFn DataFunc // load-data function, returns media stream done bool // done is set when process finishes with non ctx canceled type error @@ -49,61 +50,72 @@ type ProcessingEmoji struct { mgr *Manager // mgr instance (access to db / storage) } -// EmojiID returns the ID of the underlying emoji without blocking processing. -func (p *ProcessingEmoji) EmojiID() string { +// ID returns the ID of the underlying emoji. +func (p *ProcessingEmoji) ID() string { return p.emoji.ID // immutable, safe outside mutex. } // LoadEmoji blocks until the static and fullsize image has been processed, and then returns the completed emoji. -func (p *ProcessingEmoji) LoadEmoji(ctx context.Context) (*gtsmodel.Emoji, error) { - // Attempt to load synchronously. +func (p *ProcessingEmoji) Load(ctx context.Context) (*gtsmodel.Emoji, error) { emoji, done, err := p.load(ctx) - if err == nil { - // No issue, return media. - return emoji, nil - } - if !done { - // Provided context was cancelled, e.g. request cancelled - // early. Queue this item for asynchronous processing. - log.Warnf(ctx, "reprocessing emoji %s after canceled ctx", p.emoji.ID) - p.mgr.state.Workers.Media.Queue.Push(p.Process) - } - - return nil, err -} - -// Process allows the receiving object to fit the runners.WorkerFunc signature. It performs a (blocking) load and logs on error. -func (p *ProcessingEmoji) Process(ctx context.Context) { - if _, _, err := p.load(ctx); err != nil { - log.Errorf(ctx, "error processing emoji: %v", err) + // On a context-canceled error (marked as !done), requeue for loading. + p.mgr.state.Workers.Dereference.Queue.Push(func(ctx context.Context) { + if _, _, err := p.load(ctx); err != nil { + log.Errorf(ctx, "error loading emoji: %v", err) + } + }) } + return emoji, err } -// load performs a concurrency-safe load of ProcessingEmoji, only marking itself as complete when returned error is NOT a context cancel. -func (p *ProcessingEmoji) load(ctx context.Context) (*gtsmodel.Emoji, bool, error) { - var ( - done bool - err error - ) - +// load is the package private form of load() that is wrapped to catch context canceled. +func (p *ProcessingEmoji) load(ctx context.Context) ( + emoji *gtsmodel.Emoji, + done bool, + err error, +) { err = p.proc.Process(func() error { - if p.done { + if done = p.done; done { // Already proc'd. return p.err } defer func() { // This is only done when ctx NOT cancelled. - done = err == nil || !errors.IsV2(err, + done = (err == nil || !errorsv2.IsV2(err, context.Canceled, context.DeadlineExceeded, + )) + + if !done { + return + } + + // Anything from here, we + // need to ensure happens + // (i.e. no ctx canceled). + ctx = gtscontext.WithValues( + context.Background(), + ctx, // values ) + // On error, clean + // downloaded files. + if err != nil { + p.cleanup(ctx) + } + if !done { return } + // Update with latest details, whatever happened. + e := p.mgr.state.DB.UpdateEmoji(ctx, p.emoji) + if e != nil { + log.Errorf(ctx, "error updating emoji in db: %v", e) + } + // Store final values. p.done = true p.err = err @@ -111,39 +123,31 @@ func (p *ProcessingEmoji) load(ctx context.Context) (*gtsmodel.Emoji, bool, erro // Attempt to store media and calculate // full-size media attachment details. + // + // This will update p.emoji as it goes. if err = p.store(ctx); err != nil { return err } // Finish processing by reloading media into // memory to get dimension and generate a thumb. + // + // This will update p.emoji as it goes. if err = p.finish(ctx); err != nil { - return err - } - - if p.existing { - // Existing emoji we're updating, so only update. - err = p.mgr.state.DB.UpdateEmoji(ctx, p.emoji) - return err + return err //nolint:revive } - // New emoji media, first time caching. - err = p.mgr.state.DB.PutEmoji(ctx, p.emoji) - return err + return nil }) - - if err != nil { - return nil, done, err - } - - return p.emoji, done, nil + emoji = p.emoji + return } // store calls the data function attached to p if it hasn't been called yet, // and updates the underlying attachment fields as necessary. It will then stream // bytes from p's reader directly into storage so that it can be retrieved later. func (p *ProcessingEmoji) store(ctx context.Context) error { - // Load media from provided data fn. + // Load media from provided data fun rc, sz, err := p.dataFn(ctx) if err != nil { return gtserror.Newf("error executing data function: %w", err) @@ -168,8 +172,9 @@ func (p *ProcessingEmoji) store(ctx context.Context) error { // Check that provided size isn't beyond max. We check beforehand // so that we don't attempt to stream the emoji into storage if not needed. - if size := bytesize.Size(sz); sz > 0 && size > maxSize { - return gtserror.Newf("given emoji size %s greater than max allowed %s", size, maxSize) + if sz > 0 && sz > int64(maxSize) { + sz := bytesize.Size(sz) // improves log readability + return gtserror.Newf("given emoji size %s greater than max allowed %s", sz, maxSize) } // Prepare to read bytes from @@ -196,14 +201,14 @@ func (p *ProcessingEmoji) store(ctx context.Context) error { // Initial file size was misreported, so we didn't read // fully into hdrBuf. Reslice it to the size we did read. - log.Warnf(ctx, - "recovered from misreported file size; reported %d; read %d", - fileSize, n, - ) hdrBuf = hdrBuf[:n] + fileSize = n + p.emoji.ImageFileSize = fileSize } // Parse file type info from header buffer. + // This should only ever error if the buffer + // is empty (ie., the attachment is 0 bytes). info, err := filetype.Match(hdrBuf) if err != nil { return gtserror.Newf("error parsing file type: %w", err) @@ -227,10 +232,13 @@ func (p *ProcessingEmoji) store(ctx context.Context) error { pathID = p.emoji.ID } - // Determine instance account ID from already generated image static path. - instanceAccID := regexes.FilePath.FindStringSubmatch(p.emoji.ImageStaticPath)[1] + // Determine instance account ID from generated image static path. + instanceAccID, ok := getInstanceAccountID(p.emoji.ImageStaticPath) + if !ok { + return gtserror.Newf("invalid emoji static path; no instance account id: %s", p.emoji.ImageStaticPath) + } - // Calculate emoji file path. + // Calculate final media attachment file path. p.emoji.ImagePath = uris.StoragePathForAttachment( instanceAccID, string(TypeEmoji), @@ -239,32 +247,32 @@ func (p *ProcessingEmoji) store(ctx context.Context) error { info.Extension, ) - // This shouldn't already exist, but we do a check as it's worth logging. + // File shouldn't already exist in storage at this point, + // but we do a check as it's worth logging / cleaning up. if have, _ := p.mgr.state.Storage.Has(ctx, p.emoji.ImagePath); have { - log.Warnf(ctx, "emoji already exists at storage path: %s", p.emoji.ImagePath) + log.Warnf(ctx, "emoji already exists at: %s", p.emoji.ImagePath) // Attempt to remove existing emoji at storage path (might be broken / out-of-date) if err := p.mgr.state.Storage.Delete(ctx, p.emoji.ImagePath); err != nil { - return gtserror.Newf("error removing emoji from storage: %v", err) + return gtserror.Newf("error removing emoji %s from storage: %v", p.emoji.ImagePath, err) } } // Write the final image reader stream to our storage. - wroteSize, err := p.mgr.state.Storage.PutStream(ctx, p.emoji.ImagePath, r) + sz, err = p.mgr.state.Storage.PutStream(ctx, p.emoji.ImagePath, r) if err != nil { return gtserror.Newf("error writing emoji to storage: %w", err) } - // Once again check size in case none was provided previously. - if size := bytesize.Size(wroteSize); size > maxSize { - if err := p.mgr.state.Storage.Delete(ctx, p.emoji.ImagePath); err != nil { - log.Errorf(ctx, "error removing too-large-emoji from storage: %v", err) - } - - return gtserror.Newf("calculated emoji size %s greater than max allowed %s", size, maxSize) + // Perform final size check in case none was + // given previously, or size was mis-reported. + // (error here will later perform p.cleanup()). + if sz > int64(maxSize) { + sz := bytesize.Size(sz) // improves log readability + return gtserror.Newf("written emoji size %s greater than max allowed %s", sz, maxSize) } - // Fill in remaining attachment data now it's stored. + // Fill in remaining emoji data now it's stored. p.emoji.ImageURL = uris.URIForAttachment( instanceAccID, string(TypeEmoji), @@ -273,14 +281,14 @@ func (p *ProcessingEmoji) store(ctx context.Context) error { info.Extension, ) p.emoji.ImageContentType = info.MIME.Value - p.emoji.ImageFileSize = int(wroteSize) + p.emoji.ImageFileSize = int(sz) p.emoji.Cached = util.Ptr(true) return nil } func (p *ProcessingEmoji) finish(ctx context.Context) error { - // Fetch a stream to the original file in storage. + // Get a stream to the original file for further processing. rc, err := p.mgr.state.Storage.GetStream(ctx, p.emoji.ImagePath) if err != nil { return gtserror.Newf("error loading file from storage: %w", err) @@ -293,32 +301,69 @@ func (p *ProcessingEmoji) finish(ctx context.Context) error { return gtserror.Newf("error decoding image: %w", err) } - // The image should be in-memory by now. + // staticImg should be in-memory by + // now so we're done with storage. if err := rc.Close(); err != nil { return gtserror.Newf("error closing file: %w", err) } - // This shouldn't already exist, but we do a check as it's worth logging. + // Static img shouldn't exist in storage at this point, + // but we do a check as it's worth logging / cleaning up. if have, _ := p.mgr.state.Storage.Has(ctx, p.emoji.ImageStaticPath); have { - log.Warnf(ctx, "static emoji already exists at storage path: %s", p.emoji.ImagePath) + log.Warnf(ctx, "static emoji already exists at: %s", p.emoji.ImageStaticPath) - // Attempt to remove static existing emoji at storage path (might be broken / out-of-date) + // Attempt to remove existing thumbnail (might be broken / out-of-date). if err := p.mgr.state.Storage.Delete(ctx, p.emoji.ImageStaticPath); err != nil { - return gtserror.Newf("error removing static emoji from storage: %v", err) + return gtserror.Newf("error removing static emoji %s from storage: %v", p.emoji.ImageStaticPath, err) } } - // Create an emoji PNG encoder stream. + // Create emoji PNG encoder stream. enc := staticImg.ToPNG() - // Stream-encode the PNG static image into storage. + // Stream-encode the PNG static emoji image into our storage driver. sz, err := p.mgr.state.Storage.PutStream(ctx, p.emoji.ImageStaticPath, enc) if err != nil { return gtserror.Newf("error stream-encoding static emoji to storage: %w", err) } - // Set written image size. + // Set final written thumb size. p.emoji.ImageStaticFileSize = int(sz) return nil } + +// cleanup will remove any traces of processing emoji from storage, +// and perform any other necessary cleanup steps after failure. +func (p *ProcessingEmoji) cleanup(ctx context.Context) { + var err error + + if p.emoji.ImagePath != "" { + // Ensure emoji file at path is deleted from storage. + err = p.mgr.state.Storage.Delete(ctx, p.emoji.ImagePath) + if err != nil && !storage.IsNotFound(err) { + log.Errorf(ctx, "error deleting %s: %v", p.emoji.ImagePath, err) + } + } + + if p.emoji.ImageStaticPath != "" { + // Ensure emoji static file at path is deleted from storage. + err = p.mgr.state.Storage.Delete(ctx, p.emoji.ImageStaticPath) + if err != nil && !storage.IsNotFound(err) { + log.Errorf(ctx, "error deleting %s: %v", p.emoji.ImageStaticPath, err) + } + } + + // Ensure marked as not cached. + p.emoji.Cached = util.Ptr(false) +} + +// getInstanceAccountID determines the instance account ID from +// emoji static image storage path. returns false on failure. +func getInstanceAccountID(staticPath string) (string, bool) { + matches := regexes.FilePath.FindStringSubmatch(staticPath) + if len(matches) < 2 { + return "", false + } + return matches[1], true +} |