summaryrefslogtreecommitdiff
path: root/internal/media/processingemoji.go
diff options
context:
space:
mode:
authorLibravatar kim <89579420+NyaaaWhatsUpDoc@users.noreply.github.com>2024-06-26 15:01:16 +0000
committerLibravatar GitHub <noreply@github.com>2024-06-26 16:01:16 +0100
commit21bb324156f582e918a097ea744e52fc21b2ddf4 (patch)
tree50db5cfd42e26224591f59ff62de14a3715677b5 /internal/media/processingemoji.go
parent[docs] restructure federation section (#3038) (diff)
downloadgotosocial-21bb324156f582e918a097ea744e52fc21b2ddf4.tar.xz
[chore] media and emoji refactoring (#3000)
* start updating media manager interface ready for storing attachments / emoji right away * store emoji and media as uncached immediately, then (re-)cache on Processing{}.Load() * remove now unused media workers * fix tests and issues * fix another test! * fix emoji activitypub uri setting behaviour, fix remainder of test compilation issues * fix more tests * fix (most of) remaining tests, add debouncing to repeatedly failing media / emojis * whoops, rebase issue * remove kim's whacky experiments * do some reshuffling, ensure emoji uri gets set * ensure marked as not cached on cleanup * tweaks to media / emoji processing to handle context canceled better * ensure newly fetched emojis actually get set in returned slice * use different varnames to be a bit more obvious * move emoji refresh rate limiting to dereferencer * add exported dereferencer functions for remote media, use these for recaching in processor * add check for nil attachment in updateAttachment() * remove unused emoji and media fields + columns * see previous commit * fix old migrations expecting image_updated_at to exists (from copies of old models) * remove freshness checking code (seems to be broken...) * fix error arg causing nil ptr exception * finish documentating functions with comments, slight tweaks to media / emoji deref error logic * remove some extra unneeded boolean checking * finish writing documentation (code comments) for exported media manager methods * undo changes to migration snapshot gtsmodels, updated failing migration to have its own snapshot * move doesColumnExist() to util.go in migrations package
Diffstat (limited to 'internal/media/processingemoji.go')
-rw-r--r--internal/media/processingemoji.go207
1 files changed, 126 insertions, 81 deletions
diff --git a/internal/media/processingemoji.go b/internal/media/processingemoji.go
index b62c4f76e..d61043523 100644
--- a/internal/media/processingemoji.go
+++ b/internal/media/processingemoji.go
@@ -24,14 +24,16 @@ import (
"slices"
"codeberg.org/gruf/go-bytesize"
- "codeberg.org/gruf/go-errors/v2"
+ errorsv2 "codeberg.org/gruf/go-errors/v2"
"codeberg.org/gruf/go-runners"
"github.com/h2non/filetype"
"github.com/superseriousbusiness/gotosocial/internal/config"
+ "github.com/superseriousbusiness/gotosocial/internal/gtscontext"
"github.com/superseriousbusiness/gotosocial/internal/gtserror"
"github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
"github.com/superseriousbusiness/gotosocial/internal/log"
"github.com/superseriousbusiness/gotosocial/internal/regexes"
+ "github.com/superseriousbusiness/gotosocial/internal/storage"
"github.com/superseriousbusiness/gotosocial/internal/uris"
"github.com/superseriousbusiness/gotosocial/internal/util"
)
@@ -40,7 +42,6 @@ import (
// various functions for retrieving data from the process.
type ProcessingEmoji struct {
emoji *gtsmodel.Emoji // processing emoji details
- existing bool // indicates whether this is an existing emoji ID being refreshed / recached
newPathID string // new emoji path ID to use when being refreshed
dataFn DataFunc // load-data function, returns media stream
done bool // done is set when process finishes with non ctx canceled type error
@@ -49,61 +50,72 @@ type ProcessingEmoji struct {
mgr *Manager // mgr instance (access to db / storage)
}
-// EmojiID returns the ID of the underlying emoji without blocking processing.
-func (p *ProcessingEmoji) EmojiID() string {
+// ID returns the ID of the underlying emoji.
+func (p *ProcessingEmoji) ID() string {
return p.emoji.ID // immutable, safe outside mutex.
}
// LoadEmoji blocks until the static and fullsize image has been processed, and then returns the completed emoji.
-func (p *ProcessingEmoji) LoadEmoji(ctx context.Context) (*gtsmodel.Emoji, error) {
- // Attempt to load synchronously.
+func (p *ProcessingEmoji) Load(ctx context.Context) (*gtsmodel.Emoji, error) {
emoji, done, err := p.load(ctx)
- if err == nil {
- // No issue, return media.
- return emoji, nil
- }
-
if !done {
- // Provided context was cancelled, e.g. request cancelled
- // early. Queue this item for asynchronous processing.
- log.Warnf(ctx, "reprocessing emoji %s after canceled ctx", p.emoji.ID)
- p.mgr.state.Workers.Media.Queue.Push(p.Process)
- }
-
- return nil, err
-}
-
-// Process allows the receiving object to fit the runners.WorkerFunc signature. It performs a (blocking) load and logs on error.
-func (p *ProcessingEmoji) Process(ctx context.Context) {
- if _, _, err := p.load(ctx); err != nil {
- log.Errorf(ctx, "error processing emoji: %v", err)
+ // On a context-canceled error (marked as !done), requeue for loading.
+ p.mgr.state.Workers.Dereference.Queue.Push(func(ctx context.Context) {
+ if _, _, err := p.load(ctx); err != nil {
+ log.Errorf(ctx, "error loading emoji: %v", err)
+ }
+ })
}
+ return emoji, err
}
-// load performs a concurrency-safe load of ProcessingEmoji, only marking itself as complete when returned error is NOT a context cancel.
-func (p *ProcessingEmoji) load(ctx context.Context) (*gtsmodel.Emoji, bool, error) {
- var (
- done bool
- err error
- )
-
+// load is the package private form of load() that is wrapped to catch context canceled.
+func (p *ProcessingEmoji) load(ctx context.Context) (
+ emoji *gtsmodel.Emoji,
+ done bool,
+ err error,
+) {
err = p.proc.Process(func() error {
- if p.done {
+ if done = p.done; done {
// Already proc'd.
return p.err
}
defer func() {
// This is only done when ctx NOT cancelled.
- done = err == nil || !errors.IsV2(err,
+ done = (err == nil || !errorsv2.IsV2(err,
context.Canceled,
context.DeadlineExceeded,
+ ))
+
+ if !done {
+ return
+ }
+
+ // Anything from here, we
+ // need to ensure happens
+ // (i.e. no ctx canceled).
+ ctx = gtscontext.WithValues(
+ context.Background(),
+ ctx, // values
)
+ // On error, clean
+ // downloaded files.
+ if err != nil {
+ p.cleanup(ctx)
+ }
+
if !done {
return
}
+ // Update with latest details, whatever happened.
+ e := p.mgr.state.DB.UpdateEmoji(ctx, p.emoji)
+ if e != nil {
+ log.Errorf(ctx, "error updating emoji in db: %v", e)
+ }
+
// Store final values.
p.done = true
p.err = err
@@ -111,39 +123,31 @@ func (p *ProcessingEmoji) load(ctx context.Context) (*gtsmodel.Emoji, bool, erro
// Attempt to store media and calculate
// full-size media attachment details.
+ //
+ // This will update p.emoji as it goes.
if err = p.store(ctx); err != nil {
return err
}
// Finish processing by reloading media into
// memory to get dimension and generate a thumb.
+ //
+ // This will update p.emoji as it goes.
if err = p.finish(ctx); err != nil {
- return err
- }
-
- if p.existing {
- // Existing emoji we're updating, so only update.
- err = p.mgr.state.DB.UpdateEmoji(ctx, p.emoji)
- return err
+ return err //nolint:revive
}
- // New emoji media, first time caching.
- err = p.mgr.state.DB.PutEmoji(ctx, p.emoji)
- return err
+ return nil
})
-
- if err != nil {
- return nil, done, err
- }
-
- return p.emoji, done, nil
+ emoji = p.emoji
+ return
}
// store calls the data function attached to p if it hasn't been called yet,
// and updates the underlying attachment fields as necessary. It will then stream
// bytes from p's reader directly into storage so that it can be retrieved later.
func (p *ProcessingEmoji) store(ctx context.Context) error {
- // Load media from provided data fn.
+ // Load media from provided data fun
rc, sz, err := p.dataFn(ctx)
if err != nil {
return gtserror.Newf("error executing data function: %w", err)
@@ -168,8 +172,9 @@ func (p *ProcessingEmoji) store(ctx context.Context) error {
// Check that provided size isn't beyond max. We check beforehand
// so that we don't attempt to stream the emoji into storage if not needed.
- if size := bytesize.Size(sz); sz > 0 && size > maxSize {
- return gtserror.Newf("given emoji size %s greater than max allowed %s", size, maxSize)
+ if sz > 0 && sz > int64(maxSize) {
+ sz := bytesize.Size(sz) // improves log readability
+ return gtserror.Newf("given emoji size %s greater than max allowed %s", sz, maxSize)
}
// Prepare to read bytes from
@@ -196,14 +201,14 @@ func (p *ProcessingEmoji) store(ctx context.Context) error {
// Initial file size was misreported, so we didn't read
// fully into hdrBuf. Reslice it to the size we did read.
- log.Warnf(ctx,
- "recovered from misreported file size; reported %d; read %d",
- fileSize, n,
- )
hdrBuf = hdrBuf[:n]
+ fileSize = n
+ p.emoji.ImageFileSize = fileSize
}
// Parse file type info from header buffer.
+ // This should only ever error if the buffer
+ // is empty (ie., the attachment is 0 bytes).
info, err := filetype.Match(hdrBuf)
if err != nil {
return gtserror.Newf("error parsing file type: %w", err)
@@ -227,10 +232,13 @@ func (p *ProcessingEmoji) store(ctx context.Context) error {
pathID = p.emoji.ID
}
- // Determine instance account ID from already generated image static path.
- instanceAccID := regexes.FilePath.FindStringSubmatch(p.emoji.ImageStaticPath)[1]
+ // Determine instance account ID from generated image static path.
+ instanceAccID, ok := getInstanceAccountID(p.emoji.ImageStaticPath)
+ if !ok {
+ return gtserror.Newf("invalid emoji static path; no instance account id: %s", p.emoji.ImageStaticPath)
+ }
- // Calculate emoji file path.
+ // Calculate final media attachment file path.
p.emoji.ImagePath = uris.StoragePathForAttachment(
instanceAccID,
string(TypeEmoji),
@@ -239,32 +247,32 @@ func (p *ProcessingEmoji) store(ctx context.Context) error {
info.Extension,
)
- // This shouldn't already exist, but we do a check as it's worth logging.
+ // File shouldn't already exist in storage at this point,
+ // but we do a check as it's worth logging / cleaning up.
if have, _ := p.mgr.state.Storage.Has(ctx, p.emoji.ImagePath); have {
- log.Warnf(ctx, "emoji already exists at storage path: %s", p.emoji.ImagePath)
+ log.Warnf(ctx, "emoji already exists at: %s", p.emoji.ImagePath)
// Attempt to remove existing emoji at storage path (might be broken / out-of-date)
if err := p.mgr.state.Storage.Delete(ctx, p.emoji.ImagePath); err != nil {
- return gtserror.Newf("error removing emoji from storage: %v", err)
+ return gtserror.Newf("error removing emoji %s from storage: %v", p.emoji.ImagePath, err)
}
}
// Write the final image reader stream to our storage.
- wroteSize, err := p.mgr.state.Storage.PutStream(ctx, p.emoji.ImagePath, r)
+ sz, err = p.mgr.state.Storage.PutStream(ctx, p.emoji.ImagePath, r)
if err != nil {
return gtserror.Newf("error writing emoji to storage: %w", err)
}
- // Once again check size in case none was provided previously.
- if size := bytesize.Size(wroteSize); size > maxSize {
- if err := p.mgr.state.Storage.Delete(ctx, p.emoji.ImagePath); err != nil {
- log.Errorf(ctx, "error removing too-large-emoji from storage: %v", err)
- }
-
- return gtserror.Newf("calculated emoji size %s greater than max allowed %s", size, maxSize)
+ // Perform final size check in case none was
+ // given previously, or size was mis-reported.
+ // (error here will later perform p.cleanup()).
+ if sz > int64(maxSize) {
+ sz := bytesize.Size(sz) // improves log readability
+ return gtserror.Newf("written emoji size %s greater than max allowed %s", sz, maxSize)
}
- // Fill in remaining attachment data now it's stored.
+ // Fill in remaining emoji data now it's stored.
p.emoji.ImageURL = uris.URIForAttachment(
instanceAccID,
string(TypeEmoji),
@@ -273,14 +281,14 @@ func (p *ProcessingEmoji) store(ctx context.Context) error {
info.Extension,
)
p.emoji.ImageContentType = info.MIME.Value
- p.emoji.ImageFileSize = int(wroteSize)
+ p.emoji.ImageFileSize = int(sz)
p.emoji.Cached = util.Ptr(true)
return nil
}
func (p *ProcessingEmoji) finish(ctx context.Context) error {
- // Fetch a stream to the original file in storage.
+ // Get a stream to the original file for further processing.
rc, err := p.mgr.state.Storage.GetStream(ctx, p.emoji.ImagePath)
if err != nil {
return gtserror.Newf("error loading file from storage: %w", err)
@@ -293,32 +301,69 @@ func (p *ProcessingEmoji) finish(ctx context.Context) error {
return gtserror.Newf("error decoding image: %w", err)
}
- // The image should be in-memory by now.
+ // staticImg should be in-memory by
+ // now so we're done with storage.
if err := rc.Close(); err != nil {
return gtserror.Newf("error closing file: %w", err)
}
- // This shouldn't already exist, but we do a check as it's worth logging.
+ // Static img shouldn't exist in storage at this point,
+ // but we do a check as it's worth logging / cleaning up.
if have, _ := p.mgr.state.Storage.Has(ctx, p.emoji.ImageStaticPath); have {
- log.Warnf(ctx, "static emoji already exists at storage path: %s", p.emoji.ImagePath)
+ log.Warnf(ctx, "static emoji already exists at: %s", p.emoji.ImageStaticPath)
- // Attempt to remove static existing emoji at storage path (might be broken / out-of-date)
+ // Attempt to remove existing thumbnail (might be broken / out-of-date).
if err := p.mgr.state.Storage.Delete(ctx, p.emoji.ImageStaticPath); err != nil {
- return gtserror.Newf("error removing static emoji from storage: %v", err)
+ return gtserror.Newf("error removing static emoji %s from storage: %v", p.emoji.ImageStaticPath, err)
}
}
- // Create an emoji PNG encoder stream.
+ // Create emoji PNG encoder stream.
enc := staticImg.ToPNG()
- // Stream-encode the PNG static image into storage.
+ // Stream-encode the PNG static emoji image into our storage driver.
sz, err := p.mgr.state.Storage.PutStream(ctx, p.emoji.ImageStaticPath, enc)
if err != nil {
return gtserror.Newf("error stream-encoding static emoji to storage: %w", err)
}
- // Set written image size.
+ // Set final written thumb size.
p.emoji.ImageStaticFileSize = int(sz)
return nil
}
+
+// cleanup will remove any traces of processing emoji from storage,
+// and perform any other necessary cleanup steps after failure.
+func (p *ProcessingEmoji) cleanup(ctx context.Context) {
+ var err error
+
+ if p.emoji.ImagePath != "" {
+ // Ensure emoji file at path is deleted from storage.
+ err = p.mgr.state.Storage.Delete(ctx, p.emoji.ImagePath)
+ if err != nil && !storage.IsNotFound(err) {
+ log.Errorf(ctx, "error deleting %s: %v", p.emoji.ImagePath, err)
+ }
+ }
+
+ if p.emoji.ImageStaticPath != "" {
+ // Ensure emoji static file at path is deleted from storage.
+ err = p.mgr.state.Storage.Delete(ctx, p.emoji.ImageStaticPath)
+ if err != nil && !storage.IsNotFound(err) {
+ log.Errorf(ctx, "error deleting %s: %v", p.emoji.ImageStaticPath, err)
+ }
+ }
+
+ // Ensure marked as not cached.
+ p.emoji.Cached = util.Ptr(false)
+}
+
+// getInstanceAccountID determines the instance account ID from
+// emoji static image storage path. returns false on failure.
+func getInstanceAccountID(staticPath string) (string, bool) {
+ matches := regexes.FilePath.FindStringSubmatch(staticPath)
+ if len(matches) < 2 {
+ return "", false
+ }
+ return matches[1], true
+}