diff options
author | 2023-02-13 18:40:48 +0000 | |
---|---|---|
committer | 2023-02-13 18:40:48 +0000 | |
commit | acc95923da555b2bf17a5638e62e533218c5840a (patch) | |
tree | 7df5d0636137efa5b49298a8f0ced81d35767a5b /internal/media/processingemoji.go | |
parent | [docs] move federating with gotosocial documentation into single file (#1494) (diff) | |
download | gotosocial-acc95923da555b2bf17a5638e62e533218c5840a.tar.xz |
[performance] processing media and scheduled jobs improvements (#1482)
* replace media workers with just runners.WorkerPool, move to state structure, use go-sched for global task scheduling
* improved code comment
* fix worker tryUntil function, update go-runners/go-sched
* make preprocess functions package public, use these where possible to stop doubled up processing
* remove separate emoji worker pool
* limit calls to time.Now() during media preprocessing
* use Processor{} to manage singular runtime of processing media
* ensure workers get started when media manager is used
* improved error setting in processing media, fix media test
* port changes from processingmedia to processing emoji
* finish code commenting
* finish code commenting and comment-out client API + federator worker pools until concurrency worker pools replaced
* linterrrrrrrrrrrrrrrr
---------
Signed-off-by: kim <grufwub@gmail.com>
Diffstat (limited to 'internal/media/processingemoji.go')
-rw-r--r-- | internal/media/processingemoji.go | 240 |
1 files changed, 72 insertions, 168 deletions
diff --git a/internal/media/processingemoji.go b/internal/media/processingemoji.go index b68c9dfe1..0a36174b0 100644 --- a/internal/media/processingemoji.go +++ b/internal/media/processingemoji.go @@ -21,18 +21,15 @@ package media import ( "bytes" "context" - "errors" "fmt" "io" - "sync" - "time" "codeberg.org/gruf/go-bytesize" - gostore "codeberg.org/gruf/go-store/v2/storage" + "codeberg.org/gruf/go-errors/v2" + "codeberg.org/gruf/go-runners" "github.com/h2non/filetype" "github.com/superseriousbusiness/gotosocial/internal/config" "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" - "github.com/superseriousbusiness/gotosocial/internal/id" "github.com/superseriousbusiness/gotosocial/internal/log" "github.com/superseriousbusiness/gotosocial/internal/uris" ) @@ -46,9 +43,10 @@ type ProcessingEmoji struct { newPathID string // new emoji path ID to use if refreshed dataFn DataFunc // load-data function, returns media stream postFn PostDataCallbackFunc // post data callback function - err error // error encountered during processing - manager *manager // manager instance (access to db / storage) - once sync.Once // once ensures processing only occurs once + done bool // done is set when process finishes with non ctx canceled type error + proc runners.Processor // proc helps synchronize only a singular running processing instance + err error // error stores permanent error value when done + mgr *manager // mgr instance (access to db / storage) } // EmojiID returns the ID of the underlying emoji without blocking processing. @@ -56,40 +54,72 @@ func (p *ProcessingEmoji) EmojiID() string { return p.emoji.ID // immutable, safe outside mutex. } -// LoadEmoji blocks until the static and fullsize image -// has been processed, and then returns the completed emoji. +// LoadEmoji blocks until the static and fullsize image has been processed, and then returns the completed emoji. func (p *ProcessingEmoji) LoadEmoji(ctx context.Context) (*gtsmodel.Emoji, error) { - // only process once. - p.once.Do(func() { - var err error + // Attempt to load synchronously. + emoji, done, err := p.load(ctx) + + if err == nil { + // No issue, return media. + return emoji, nil + } + + if !done { + // Provided context was cancelled, e.g. request cancelled + // early. Queue this item for asynchronous processing. + log.Warnf("reprocessing emoji %s after canceled ctx", p.emoji.ID) + go p.mgr.state.Workers.Media.Enqueue(p.Process) + } + + return nil, err +} + +// Process allows the receiving object to fit the runners.WorkerFunc signature. It performs a (blocking) load and logs on error. +func (p *ProcessingEmoji) Process(ctx context.Context) { + if _, _, err := p.load(ctx); err != nil { + log.Errorf("error processing emoji: %v", err) + } +} + +// load performs a concurrency-safe load of ProcessingEmoji, only marking itself as complete when returned error is NOT a context cancel. +func (p *ProcessingEmoji) load(ctx context.Context) (*gtsmodel.Emoji, bool, error) { + var ( + done bool + err error + ) + + err = p.proc.Process(func() error { + if p.done { + // Already proc'd. + return p.err + } defer func() { - if r := recover(); r != nil { - if err != nil { - rOld := r // wrap the panic so we don't lose existing returned error - r = fmt.Errorf("panic occured after error %q: %v", err.Error(), rOld) - } - - // Catch any panics and wrap as error. - err = fmt.Errorf("caught panic: %v", r) + // This is only done when ctx NOT cancelled. + done = err == nil || !errors.Is(err, + context.Canceled, + context.DeadlineExceeded, + ) + + if !done { + return } - if err != nil { - // Store error. - p.err = err - } + // Store final values. + p.done = true + p.err = err }() // Attempt to store media and calculate // full-size media attachment details. if err = p.store(ctx); err != nil { - return + return err } // Finish processing by reloading media into // memory to get dimension and generate a thumb. if err = p.finish(ctx); err != nil { - return + return err } if p.refresh { @@ -110,20 +140,20 @@ func (p *ProcessingEmoji) LoadEmoji(ctx context.Context) (*gtsmodel.Emoji, error } // Existing emoji we're refreshing, so only need to update. - _, err = p.manager.db.UpdateEmoji(ctx, p.emoji, columns...) - return + _, err = p.mgr.state.DB.UpdateEmoji(ctx, p.emoji, columns...) + return err } // New emoji media, first time caching. - err = p.manager.db.PutEmoji(ctx, p.emoji) - return //nolint shutup linter i like this here + err = p.mgr.state.DB.PutEmoji(ctx, p.emoji) + return err }) - if p.err != nil { - return nil, p.err + if err != nil { + return nil, done, err } - return p.emoji, nil + return p.emoji, done, nil } // store calls the data function attached to p if it hasn't been called yet, @@ -220,24 +250,24 @@ func (p *ProcessingEmoji) store(ctx context.Context) error { ) // This shouldn't already exist, but we do a check as it's worth logging. - if have, _ := p.manager.storage.Has(ctx, p.emoji.ImagePath); have { + if have, _ := p.mgr.state.Storage.Has(ctx, p.emoji.ImagePath); have { log.Warnf("emoji already exists at storage path: %s", p.emoji.ImagePath) // Attempt to remove existing emoji at storage path (might be broken / out-of-date) - if err := p.manager.storage.Delete(ctx, p.emoji.ImagePath); err != nil { + if err := p.mgr.state.Storage.Delete(ctx, p.emoji.ImagePath); err != nil { return fmt.Errorf("error removing emoji from storage: %v", err) } } // Write the final image reader stream to our storage. - sz, err = p.manager.storage.PutStream(ctx, p.emoji.ImagePath, r) + sz, err = p.mgr.state.Storage.PutStream(ctx, p.emoji.ImagePath, r) if err != nil { return fmt.Errorf("error writing emoji to storage: %w", err) } // Once again check size in case none was provided previously. if size := bytesize.Size(sz); size > maxSize { - if err := p.manager.storage.Delete(ctx, p.emoji.ImagePath); err != nil { + if err := p.mgr.state.Storage.Delete(ctx, p.emoji.ImagePath); err != nil { log.Errorf("error removing too-large-emoji from storage: %v", err) } return fmt.Errorf("calculated emoji size %s greater than max allowed %s", size, maxSize) @@ -259,7 +289,7 @@ func (p *ProcessingEmoji) store(ctx context.Context) error { func (p *ProcessingEmoji) finish(ctx context.Context) error { // Fetch a stream to the original file in storage. - rc, err := p.manager.storage.GetStream(ctx, p.emoji.ImagePath) + rc, err := p.mgr.state.Storage.GetStream(ctx, p.emoji.ImagePath) if err != nil { return fmt.Errorf("error loading file from storage: %w", err) } @@ -277,11 +307,11 @@ func (p *ProcessingEmoji) finish(ctx context.Context) error { } // This shouldn't already exist, but we do a check as it's worth logging. - if have, _ := p.manager.storage.Has(ctx, p.emoji.ImageStaticPath); have { + if have, _ := p.mgr.state.Storage.Has(ctx, p.emoji.ImageStaticPath); have { log.Warnf("static emoji already exists at storage path: %s", p.emoji.ImagePath) // Attempt to remove static existing emoji at storage path (might be broken / out-of-date) - if err := p.manager.storage.Delete(ctx, p.emoji.ImageStaticPath); err != nil { + if err := p.mgr.state.Storage.Delete(ctx, p.emoji.ImageStaticPath); err != nil { return fmt.Errorf("error removing static emoji from storage: %v", err) } } @@ -290,7 +320,7 @@ func (p *ProcessingEmoji) finish(ctx context.Context) error { enc := staticImg.ToPNG() // Stream-encode the PNG static image into storage. - sz, err := p.manager.storage.PutStream(ctx, p.emoji.ImageStaticPath, enc) + sz, err := p.mgr.state.Storage.PutStream(ctx, p.emoji.ImageStaticPath, enc) if err != nil { return fmt.Errorf("error stream-encoding static emoji to storage: %w", err) } @@ -300,129 +330,3 @@ func (p *ProcessingEmoji) finish(ctx context.Context) error { return nil } - -func (m *manager) preProcessEmoji(ctx context.Context, data DataFunc, postData PostDataCallbackFunc, shortcode string, emojiID string, uri string, ai *AdditionalEmojiInfo, refresh bool) (*ProcessingEmoji, error) { - instanceAccount, err := m.db.GetInstanceAccount(ctx, "") - if err != nil { - return nil, fmt.Errorf("preProcessEmoji: error fetching this instance account from the db: %s", err) - } - - var newPathID string - var emoji *gtsmodel.Emoji - if refresh { - emoji, err = m.db.GetEmojiByID(ctx, emojiID) - if err != nil { - return nil, fmt.Errorf("preProcessEmoji: error fetching emoji to refresh from the db: %s", err) - } - - // if this is a refresh, we will end up with new images - // stored for this emoji, so we can use the postData function - // to perform clean up of the old images from storage - originalPostData := postData - originalImagePath := emoji.ImagePath - originalImageStaticPath := emoji.ImageStaticPath - postData = func(innerCtx context.Context) error { - // trigger the original postData function if it was provided - if originalPostData != nil { - if err := originalPostData(innerCtx); err != nil { - return err - } - } - - l := log.WithField("shortcode@domain", emoji.Shortcode+"@"+emoji.Domain) - l.Debug("postData: cleaning up old emoji files for refreshed emoji") - if err := m.storage.Delete(innerCtx, originalImagePath); err != nil && !errors.Is(err, gostore.ErrNotFound) { - l.Errorf("postData: error cleaning up old emoji image at %s for refreshed emoji: %s", originalImagePath, err) - } - if err := m.storage.Delete(innerCtx, originalImageStaticPath); err != nil && !errors.Is(err, gostore.ErrNotFound) { - l.Errorf("postData: error cleaning up old emoji static image at %s for refreshed emoji: %s", originalImageStaticPath, err) - } - - return nil - } - - newPathID, err = id.NewRandomULID() - if err != nil { - return nil, fmt.Errorf("preProcessEmoji: error generating alternateID for emoji refresh: %s", err) - } - - // store + serve static image at new path ID - emoji.ImageStaticURL = uris.GenerateURIForAttachment(instanceAccount.ID, string(TypeEmoji), string(SizeStatic), newPathID, mimePng) - emoji.ImageStaticPath = fmt.Sprintf("%s/%s/%s/%s.%s", instanceAccount.ID, TypeEmoji, SizeStatic, newPathID, mimePng) - - emoji.Shortcode = shortcode - emoji.URI = uri - } else { - disabled := false - visibleInPicker := true - - // populate initial fields on the emoji -- some of these will be overwritten as we proceed - emoji = >smodel.Emoji{ - ID: emojiID, - CreatedAt: time.Now(), - Shortcode: shortcode, - Domain: "", // assume our own domain unless told otherwise - ImageRemoteURL: "", - ImageStaticRemoteURL: "", - ImageURL: "", // we don't know yet - ImageStaticURL: uris.GenerateURIForAttachment(instanceAccount.ID, string(TypeEmoji), string(SizeStatic), emojiID, mimePng), // all static emojis are encoded as png - ImagePath: "", // we don't know yet - ImageStaticPath: fmt.Sprintf("%s/%s/%s/%s.%s", instanceAccount.ID, TypeEmoji, SizeStatic, emojiID, mimePng), // all static emojis are encoded as png - ImageContentType: "", // we don't know yet - ImageStaticContentType: mimeImagePng, // all static emojis are encoded as png - ImageFileSize: 0, - ImageStaticFileSize: 0, - Disabled: &disabled, - URI: uri, - VisibleInPicker: &visibleInPicker, - CategoryID: "", - } - } - - emoji.ImageUpdatedAt = time.Now() - emoji.UpdatedAt = time.Now() - - // check if we have additional info to add to the emoji, - // and overwrite some of the emoji fields if so - if ai != nil { - if ai.CreatedAt != nil { - emoji.CreatedAt = *ai.CreatedAt - } - - if ai.Domain != nil { - emoji.Domain = *ai.Domain - } - - if ai.ImageRemoteURL != nil { - emoji.ImageRemoteURL = *ai.ImageRemoteURL - } - - if ai.ImageStaticRemoteURL != nil { - emoji.ImageStaticRemoteURL = *ai.ImageStaticRemoteURL - } - - if ai.Disabled != nil { - emoji.Disabled = ai.Disabled - } - - if ai.VisibleInPicker != nil { - emoji.VisibleInPicker = ai.VisibleInPicker - } - - if ai.CategoryID != nil { - emoji.CategoryID = *ai.CategoryID - } - } - - processingEmoji := &ProcessingEmoji{ - instAccID: instanceAccount.ID, - emoji: emoji, - refresh: refresh, - newPathID: newPathID, - dataFn: data, - postFn: postData, - manager: m, - } - - return processingEmoji, nil -} |