diff options
author | 2023-02-13 18:40:48 +0000 | |
---|---|---|
committer | 2023-02-13 18:40:48 +0000 | |
commit | acc95923da555b2bf17a5638e62e533218c5840a (patch) | |
tree | 7df5d0636137efa5b49298a8f0ced81d35767a5b /internal/media/processingmedia.go | |
parent | [docs] move federating with gotosocial documentation into single file (#1494) (diff) | |
download | gotosocial-acc95923da555b2bf17a5638e62e533218c5840a.tar.xz |
[performance] processing media and scheduled jobs improvements (#1482)
* replace media workers with just runners.WorkerPool, move to state structure, use go-sched for global task scheduling
* improved code comment
* fix worker tryUntil function, update go-runners/go-sched
* make preprocess functions package public, use these where possible to stop doubled up processing
* remove separate emoji worker pool
* limit calls to time.Now() during media preprocessing
* use Processor{} to manage singular runtime of processing media
* ensure workers get started when media manager is used
* improved error setting in processing media, fix media test
* port changes from processingmedia to processing emoji
* finish code commenting
* finish code commenting and comment-out client API + federator worker pools until concurrency worker pools replaced
* linterrrrrrrrrrrrrrrr
---------
Signed-off-by: kim <grufwub@gmail.com>
Diffstat (limited to 'internal/media/processingmedia.go')
-rw-r--r-- | internal/media/processingmedia.go | 215 |
1 files changed, 72 insertions, 143 deletions
diff --git a/internal/media/processingmedia.go b/internal/media/processingmedia.go index 34f8dc26b..b4eda4072 100644 --- a/internal/media/processingmedia.go +++ b/internal/media/processingmedia.go @@ -24,14 +24,14 @@ import ( "fmt" "image/jpeg" "io" - "sync" "time" + "codeberg.org/gruf/go-errors/v2" + "codeberg.org/gruf/go-runners" "github.com/disintegration/imaging" "github.com/h2non/filetype" terminator "github.com/superseriousbusiness/exif-terminator" "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" - "github.com/superseriousbusiness/gotosocial/internal/id" "github.com/superseriousbusiness/gotosocial/internal/log" "github.com/superseriousbusiness/gotosocial/internal/uris" ) @@ -40,12 +40,13 @@ import ( // various functions for retrieving data from the process. type ProcessingMedia struct { media *gtsmodel.MediaAttachment // processing media attachment details - recache bool // recaching existing (uncached) media dataFn DataFunc // load-data function, returns media stream postFn PostDataCallbackFunc // post data callback function - err error // error encountered during processing - manager *manager // manager instance (access to db / storage) - once sync.Once // once ensures processing only occurs once + recache bool // recaching existing (uncached) media + done bool // done is set when process finishes with non ctx canceled type error + proc runners.Processor // proc helps synchronize only a singular running processing instance + err error // error stores permanent error value when done + mgr *manager // mgr instance (access to db / storage) } // AttachmentID returns the ID of the underlying media attachment without blocking processing. @@ -53,58 +54,90 @@ func (p *ProcessingMedia) AttachmentID() string { return p.media.ID // immutable, safe outside mutex. } -// LoadAttachment blocks until the thumbnail and fullsize content -// has been processed, and then returns the completed attachment. 
+// LoadAttachment blocks until the thumbnail and fullsize content has been processed, and then returns the completed attachment. func (p *ProcessingMedia) LoadAttachment(ctx context.Context) (*gtsmodel.MediaAttachment, error) { - // only process once. - p.once.Do(func() { - var err error + // Attempt to load synchronously. + media, done, err := p.load(ctx) + + if err == nil { + // No issue, return media. + return media, nil + } + + if !done { + // Provided context was cancelled, e.g. request cancelled + // early. Queue this item for asynchronous processing. + log.Warnf("reprocessing media %s after canceled ctx", p.media.ID) + go p.mgr.state.Workers.Media.Enqueue(p.Process) + } + + return nil, err +} + +// Process allows the receiving object to fit the runners.WorkerFunc signature. It performs a (blocking) load and logs on error. +func (p *ProcessingMedia) Process(ctx context.Context) { + if _, _, err := p.load(ctx); err != nil { + log.Errorf("error processing media: %v", err) + } +} + +// load performs a concurrency-safe load of ProcessingMedia, only marking itself as complete when returned error is NOT a context cancel. +func (p *ProcessingMedia) load(ctx context.Context) (*gtsmodel.MediaAttachment, bool, error) { + var ( + done bool + err error + ) + + err = p.proc.Process(func() error { + if p.done { + // Already proc'd. + return p.err + } defer func() { - if r := recover(); r != nil { - if err != nil { - rOld := r // wrap the panic so we don't lose existing returned error - r = fmt.Errorf("panic occured after error %q: %v", err.Error(), rOld) - } - - // Catch any panics and wrap as error. - err = fmt.Errorf("caught panic: %v", r) + // This is only done when ctx NOT cancelled. + done = err == nil || !errors.Is(err, + context.Canceled, + context.DeadlineExceeded, + ) + + if !done { + return } - if err != nil { - // Store error. - p.err = err - } + // Store final values. 
+ p.done = true + p.err = err }() // Attempt to store media and calculate // full-size media attachment details. if err = p.store(ctx); err != nil { - return + return err } // Finish processing by reloading media into // memory to get dimension and generate a thumb. if err = p.finish(ctx); err != nil { - return + return err } if p.recache { // Existing attachment we're recaching, so only need to update. - err = p.manager.db.UpdateByID(ctx, p.media, p.media.ID) - return + err = p.mgr.state.DB.UpdateByID(ctx, p.media, p.media.ID) + return err } // New attachment, first time caching. - err = p.manager.db.Put(ctx, p.media) - return //nolint shutup linter i like this here + err = p.mgr.state.DB.Put(ctx, p.media) + return err }) - if p.err != nil { - return nil, p.err + if err != nil { + return nil, done, err } - return p.media, nil + return p.media, done, nil } // store calls the data function attached to p if it hasn't been called yet, @@ -186,17 +219,17 @@ func (p *ProcessingMedia) store(ctx context.Context) error { ) // This shouldn't already exist, but we do a check as it's worth logging. - if have, _ := p.manager.storage.Has(ctx, p.media.File.Path); have { + if have, _ := p.mgr.state.Storage.Has(ctx, p.media.File.Path); have { log.Warnf("media already exists at storage path: %s", p.media.File.Path) // Attempt to remove existing media at storage path (might be broken / out-of-date) - if err := p.manager.storage.Delete(ctx, p.media.File.Path); err != nil { + if err := p.mgr.state.Storage.Delete(ctx, p.media.File.Path); err != nil { return fmt.Errorf("error removing media from storage: %v", err) } } // Write the final image reader stream to our storage. 
- sz, err = p.manager.storage.PutStream(ctx, p.media.File.Path, r) + sz, err = p.mgr.state.Storage.PutStream(ctx, p.media.File.Path, r) if err != nil { return fmt.Errorf("error writing media to storage: %w", err) } @@ -221,7 +254,7 @@ func (p *ProcessingMedia) store(ctx context.Context) error { func (p *ProcessingMedia) finish(ctx context.Context) error { // Fetch a stream to the original file in storage. - rc, err := p.manager.storage.GetStream(ctx, p.media.File.Path) + rc, err := p.mgr.state.Storage.GetStream(ctx, p.media.File.Path) if err != nil { return fmt.Errorf("error loading file from storage: %w", err) } @@ -299,11 +332,11 @@ func (p *ProcessingMedia) finish(ctx context.Context) error { p.media.Blurhash = hash // This shouldn't already exist, but we do a check as it's worth logging. - if have, _ := p.manager.storage.Has(ctx, p.media.Thumbnail.Path); have { + if have, _ := p.mgr.state.Storage.Has(ctx, p.media.Thumbnail.Path); have { log.Warnf("thumbnail already exists at storage path: %s", p.media.Thumbnail.Path) // Attempt to remove existing thumbnail at storage path (might be broken / out-of-date) - if err := p.manager.storage.Delete(ctx, p.media.Thumbnail.Path); err != nil { + if err := p.mgr.state.Storage.Delete(ctx, p.media.Thumbnail.Path); err != nil { return fmt.Errorf("error removing thumbnail from storage: %v", err) } } @@ -314,7 +347,7 @@ func (p *ProcessingMedia) finish(ctx context.Context) error { }) // Stream-encode the JPEG thumbnail image into storage. 
- sz, err := p.manager.storage.PutStream(ctx, p.media.Thumbnail.Path, enc) + sz, err := p.mgr.state.Storage.PutStream(ctx, p.media.Thumbnail.Path, enc) if err != nil { return fmt.Errorf("error stream-encoding thumbnail to storage: %w", err) } @@ -346,107 +379,3 @@ func (p *ProcessingMedia) finish(ctx context.Context) error { return nil } - -func (m *manager) preProcessMedia(ctx context.Context, data DataFunc, postData PostDataCallbackFunc, accountID string, ai *AdditionalMediaInfo) (*ProcessingMedia, error) { - id, err := id.NewRandomULID() - if err != nil { - return nil, err - } - - avatar := false - header := false - cached := false - - // populate initial fields on the media attachment -- some of these will be overwritten as we proceed - attachment := &gtsmodel.MediaAttachment{ - ID: id, - CreatedAt: time.Now(), - UpdatedAt: time.Now(), - StatusID: "", - URL: "", // we don't know yet because it depends on the uncalled DataFunc - RemoteURL: "", - Type: gtsmodel.FileTypeUnknown, // we don't know yet because it depends on the uncalled DataFunc - FileMeta: gtsmodel.FileMeta{}, - AccountID: accountID, - Description: "", - ScheduledStatusID: "", - Blurhash: "", - Processing: gtsmodel.ProcessingStatusReceived, - File: gtsmodel.File{UpdatedAt: time.Now()}, - Thumbnail: gtsmodel.Thumbnail{UpdatedAt: time.Now()}, - Avatar: &avatar, - Header: &header, - Cached: &cached, - } - - // check if we have additional info to add to the attachment, - // and overwrite some of the attachment fields if so - if ai != nil { - if ai.CreatedAt != nil { - attachment.CreatedAt = *ai.CreatedAt - } - - if ai.StatusID != nil { - attachment.StatusID = *ai.StatusID - } - - if ai.RemoteURL != nil { - attachment.RemoteURL = *ai.RemoteURL - } - - if ai.Description != nil { - attachment.Description = *ai.Description - } - - if ai.ScheduledStatusID != nil { - attachment.ScheduledStatusID = *ai.ScheduledStatusID - } - - if ai.Blurhash != nil { - attachment.Blurhash = *ai.Blurhash - } - - if ai.Avatar != 
nil { - attachment.Avatar = ai.Avatar - } - - if ai.Header != nil { - attachment.Header = ai.Header - } - - if ai.FocusX != nil { - attachment.FileMeta.Focus.X = *ai.FocusX - } - - if ai.FocusY != nil { - attachment.FileMeta.Focus.Y = *ai.FocusY - } - } - - processingMedia := &ProcessingMedia{ - media: attachment, - dataFn: data, - postFn: postData, - manager: m, - } - - return processingMedia, nil -} - -func (m *manager) preProcessRecache(ctx context.Context, data DataFunc, postData PostDataCallbackFunc, id string) (*ProcessingMedia, error) { - // get the existing attachment from database. - attachment, err := m.db.GetAttachmentByID(ctx, id) - if err != nil { - return nil, err - } - - processingMedia := &ProcessingMedia{ - media: attachment, - dataFn: data, - postFn: postData, - manager: m, - recache: true, // indicate it's a recache - } - - return processingMedia, nil -} |