diff options
author | 2023-02-13 18:40:48 +0000 | |
---|---|---|
committer | 2023-02-13 18:40:48 +0000 | |
commit | acc95923da555b2bf17a5638e62e533218c5840a (patch) | |
tree | 7df5d0636137efa5b49298a8f0ced81d35767a5b /internal/media | |
parent | [docs] move federating with gotosocial documentation into single file (#1494) (diff) | |
download | gotosocial-acc95923da555b2bf17a5638e62e533218c5840a.tar.xz |
[performance] processing media and scheduled jobs improvements (#1482)
* replace media workers with just runners.WorkerPool, move to state structure, use go-sched for global task scheduling
* improved code comment
* fix worker tryUntil function, update go-runners/go-sched
* make preprocess functions package public, use these where possible to stop doubled up processing
* remove separate emoji worker pool
* limit calls to time.Now() during media preprocessing
* use Processor{} to manage singular runtime of processing media
* ensure workers get started when media manager is used
* improved error setting in processing media, fix media test
* port changes from processingmedia to processing emoji
* finish code commenting
* finish code commenting and comment-out client API + federator worker pools until concurrency worker pools replaced
* linterrrrrrrrrrrrrrrr
---------
Signed-off-by: kim <grufwub@gmail.com>
Diffstat (limited to 'internal/media')
-rw-r--r-- | internal/media/cron.go | 73 | ||||
-rw-r--r-- | internal/media/manager.go | 358 | ||||
-rw-r--r-- | internal/media/manager_test.go | 13 | ||||
-rw-r--r-- | internal/media/processingemoji.go | 240 | ||||
-rw-r--r-- | internal/media/processingmedia.go | 215 | ||||
-rw-r--r-- | internal/media/prune.go | 26 | ||||
-rw-r--r-- | internal/media/prune_test.go | 2 | ||||
-rw-r--r-- | internal/media/refetch.go | 10 |
8 files changed, 462 insertions, 475 deletions
diff --git a/internal/media/cron.go b/internal/media/cron.go deleted file mode 100644 index e32a63661..000000000 --- a/internal/media/cron.go +++ /dev/null @@ -1,73 +0,0 @@ -/* - GoToSocial - Copyright (C) 2021-2023 GoToSocial Authors admin@gotosocial.org - - This program is free software: you can redistribute it and/or modify - it under the terms of the GNU Affero General Public License as published by - the Free Software Foundation, either version 3 of the License, or - (at your option) any later version. - - This program is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU Affero General Public License for more details. - - You should have received a copy of the GNU Affero General Public License - along with this program. If not, see <http://www.gnu.org/licenses/>. -*/ - -package media - -import ( - "context" - "fmt" - "time" - - "github.com/robfig/cron/v3" - "github.com/superseriousbusiness/gotosocial/internal/config" - "github.com/superseriousbusiness/gotosocial/internal/log" -) - -type cronLogger struct{} - -func (l *cronLogger) Info(msg string, keysAndValues ...interface{}) { - log.Info("media manager cron logger: ", msg, keysAndValues) -} - -func (l *cronLogger) Error(err error, msg string, keysAndValues ...interface{}) { - log.Error("media manager cron logger: ", err, msg, keysAndValues) -} - -func scheduleCleanup(m *manager) error { - pruneCtx, pruneCancel := context.WithCancel(context.Background()) - - c := cron.New(cron.WithLogger(new(cronLogger))) - defer c.Start() - - if _, err := c.AddFunc("@midnight", func() { - if err := m.PruneAll(pruneCtx, config.GetMediaRemoteCacheDays(), true); err != nil { - log.Error(err) - return - } - }); err != nil { - pruneCancel() - return fmt.Errorf("error starting media manager cleanup job: %s", err) - } - - m.stopCronJobs = func() error { - // Try to stop jobs gracefully by waiting til they're finished. - stopCtx := c.Stop() - - select { - case <-stopCtx.Done(): - log.Infof("media manager: cron finished jobs and stopped gracefully") - case <-time.After(1 * time.Minute): - log.Warnf("media manager: cron didn't stop after 60 seconds, force closing jobs") - pruneCancel() - } - - return nil - } - - return nil -} diff --git a/internal/media/manager.go b/internal/media/manager.go index b770a7dcd..ba89aff13 100644 --- a/internal/media/manager.go +++ b/internal/media/manager.go @@ -20,11 +20,19 @@ package media import ( "context" + "errors" "fmt" + "time" - "github.com/superseriousbusiness/gotosocial/internal/concurrency" - "github.com/superseriousbusiness/gotosocial/internal/db" - "github.com/superseriousbusiness/gotosocial/internal/storage" + "codeberg.org/gruf/go-runners" + "codeberg.org/gruf/go-sched" + "codeberg.org/gruf/go-store/v2/storage" + "github.com/superseriousbusiness/gotosocial/internal/config" + "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" + "github.com/superseriousbusiness/gotosocial/internal/id" + "github.com/superseriousbusiness/gotosocial/internal/log" + "github.com/superseriousbusiness/gotosocial/internal/state" + "github.com/superseriousbusiness/gotosocial/internal/uris" ) var SupportedMIMETypes = []string{ @@ -42,16 +50,11 @@ var SupportedEmojiMIMETypes = []string{ // Manager provides an interface for managing media: parsing, storing, and retrieving media objects like photos, videos, and gifs. type Manager interface { - // Stop stops the underlying worker pool of the manager. It should be called - // when closing GoToSocial in order to cleanly finish any in-progress jobs. - // It will block until workers are finished processing. - Stop() error - /* PROCESSING FUNCTIONS */ - // ProcessMedia begins the process of decoding and storing the given data as an attachment. + // PreProcessMedia begins the process of decoding and storing the given data as an attachment. // It will return a pointer to a ProcessingMedia struct upon which further actions can be performed, such as getting // the finished media, thumbnail, attachment, etc. // @@ -63,8 +66,19 @@ type Manager interface { // accountID should be the account that the media belongs to. // // ai is optional and can be nil. Any additional information about the attachment provided will be put in the database. + // + // Note: unlike ProcessMedia, this will NOT queue the media to be asynchronously processed. + PreProcessMedia(ctx context.Context, data DataFunc, postData PostDataCallbackFunc, accountID string, ai *AdditionalMediaInfo) (*ProcessingMedia, error) + + // PreProcessMediaRecache refetches, reprocesses, and recaches an existing attachment that has been uncached via pruneRemote. + // + // Note: unlike ProcessMedia, this will NOT queue the media to be asychronously processed. + PreProcessMediaRecache(ctx context.Context, data DataFunc, postData PostDataCallbackFunc, attachmentID string) (*ProcessingMedia, error) + + // ProcessMedia will call PreProcessMedia, followed by queuing the media to be processing in the media worker queue. ProcessMedia(ctx context.Context, data DataFunc, postData PostDataCallbackFunc, accountID string, ai *AdditionalMediaInfo) (*ProcessingMedia, error) - // ProcessEmoji begins the process of decoding and storing the given data as an emoji. + + // PreProcessEmoji begins the process of decoding and storing the given data as an emoji. // It will return a pointer to a ProcessingEmoji struct upon which further actions can be performed, such as getting // the finished media, thumbnail, attachment, etc. // @@ -81,10 +95,11 @@ type Manager interface { // // ai is optional and can be nil. Any additional information about the emoji provided will be put in the database. // - // If refresh is true, this indicates that the emoji image has changed and should be updated. + // Note: unlike ProcessEmoji, this will NOT queue the emoji to be asynchronously processed. + PreProcessEmoji(ctx context.Context, data DataFunc, postData PostDataCallbackFunc, shortcode string, id string, uri string, ai *AdditionalEmojiInfo, refresh bool) (*ProcessingEmoji, error) + + // ProcessEmoji will call PreProcessEmoji, followed by queuing the emoji to be processing in the emoji worker queue. ProcessEmoji(ctx context.Context, data DataFunc, postData PostDataCallbackFunc, shortcode string, id string, uri string, ai *AdditionalEmojiInfo, refresh bool) (*ProcessingEmoji, error) - // RecacheMedia refetches, reprocesses, and recaches an existing attachment that has been uncached via pruneRemote. - RecacheMedia(ctx context.Context, data DataFunc, postData PostDataCallbackFunc, attachmentID string) (*ProcessingMedia, error) /* PRUNING/UNCACHING FUNCTIONS @@ -139,11 +154,7 @@ type Manager interface { } type manager struct { - db db.DB - storage *storage.Driver - emojiWorker *concurrency.WorkerPool[*ProcessingEmoji] - mediaWorker *concurrency.WorkerPool[*ProcessingMedia] - stopCronJobs func() error + state *state.State } // NewManager returns a media manager with the given db and underlying storage. @@ -152,88 +163,299 @@ type manager struct { // a limited number of media will be processed in parallel. The numbers of workers // is determined from the $GOMAXPROCS environment variable (usually no. CPU cores). // See internal/concurrency.NewWorkerPool() documentation for further information. -func NewManager(database db.DB, storage *storage.Driver) (Manager, error) { - m := &manager{ - db: database, - storage: storage, +func NewManager(state *state.State) Manager { + m := &manager{state: state} + scheduleCleanupJobs(m) + return m +} + +func (m *manager) PreProcessMedia(ctx context.Context, data DataFunc, postData PostDataCallbackFunc, accountID string, ai *AdditionalMediaInfo) (*ProcessingMedia, error) { + id, err := id.NewRandomULID() + if err != nil { + return nil, err } - // Prepare the media worker pool. - m.mediaWorker = concurrency.NewWorkerPool[*ProcessingMedia](-1, 10) - m.mediaWorker.SetProcessor(func(ctx context.Context, media *ProcessingMedia) error { - if _, err := media.LoadAttachment(ctx); err != nil { - return fmt.Errorf("error loading media %s: %v", media.AttachmentID(), err) + avatar := false + header := false + cached := false + now := time.Now() + + // populate initial fields on the media attachment -- some of these will be overwritten as we proceed + attachment := >smodel.MediaAttachment{ + ID: id, + CreatedAt: now, + UpdatedAt: now, + StatusID: "", + URL: "", // we don't know yet because it depends on the uncalled DataFunc + RemoteURL: "", + Type: gtsmodel.FileTypeUnknown, // we don't know yet because it depends on the uncalled DataFunc + FileMeta: gtsmodel.FileMeta{}, + AccountID: accountID, + Description: "", + ScheduledStatusID: "", + Blurhash: "", + Processing: gtsmodel.ProcessingStatusReceived, + File: gtsmodel.File{UpdatedAt: now}, + Thumbnail: gtsmodel.Thumbnail{UpdatedAt: now}, + Avatar: &avatar, + Header: &header, + Cached: &cached, + } + + // check if we have additional info to add to the attachment, + // and overwrite some of the attachment fields if so + if ai != nil { + if ai.CreatedAt != nil { + attachment.CreatedAt = *ai.CreatedAt } - return nil - }) - // Prepare the emoji worker pool. - m.emojiWorker = concurrency.NewWorkerPool[*ProcessingEmoji](-1, 10) - m.emojiWorker.SetProcessor(func(ctx context.Context, emoji *ProcessingEmoji) error { - if _, err := emoji.LoadEmoji(ctx); err != nil { - return fmt.Errorf("error loading emoji %s: %v", emoji.EmojiID(), err) + if ai.StatusID != nil { + attachment.StatusID = *ai.StatusID } - return nil - }) - // Start the worker pools. - if err := m.mediaWorker.Start(); err != nil { - return nil, err + if ai.RemoteURL != nil { + attachment.RemoteURL = *ai.RemoteURL + } + + if ai.Description != nil { + attachment.Description = *ai.Description + } + + if ai.ScheduledStatusID != nil { + attachment.ScheduledStatusID = *ai.ScheduledStatusID + } + + if ai.Blurhash != nil { + attachment.Blurhash = *ai.Blurhash + } + + if ai.Avatar != nil { + attachment.Avatar = ai.Avatar + } + + if ai.Header != nil { + attachment.Header = ai.Header + } + + if ai.FocusX != nil { + attachment.FileMeta.Focus.X = *ai.FocusX + } + + if ai.FocusY != nil { + attachment.FileMeta.Focus.Y = *ai.FocusY + } } - if err := m.emojiWorker.Start(); err != nil { - return nil, err + + processingMedia := &ProcessingMedia{ + media: attachment, + dataFn: data, + postFn: postData, + mgr: m, } - // Schedule cron job(s) for clean up. - if err := scheduleCleanup(m); err != nil { + return processingMedia, nil +} + +func (m *manager) PreProcessMediaRecache(ctx context.Context, data DataFunc, postData PostDataCallbackFunc, attachmentID string) (*ProcessingMedia, error) { + // get the existing attachment from database. + attachment, err := m.state.DB.GetAttachmentByID(ctx, attachmentID) + if err != nil { return nil, err } - return m, nil + processingMedia := &ProcessingMedia{ + media: attachment, + dataFn: data, + postFn: postData, + recache: true, // indicate it's a recache + mgr: m, + } + + return processingMedia, nil } func (m *manager) ProcessMedia(ctx context.Context, data DataFunc, postData PostDataCallbackFunc, accountID string, ai *AdditionalMediaInfo) (*ProcessingMedia, error) { - processingMedia, err := m.preProcessMedia(ctx, data, postData, accountID, ai) + // Create a new processing media object for this media request. + media, err := m.PreProcessMedia(ctx, data, postData, accountID, ai) if err != nil { return nil, err } - m.mediaWorker.Queue(processingMedia) - return processingMedia, nil + + // Attempt to add this media processing item to the worker queue. + _ = m.state.Workers.Media.MustEnqueueCtx(ctx, media.Process) + + return media, nil } -func (m *manager) ProcessEmoji(ctx context.Context, data DataFunc, postData PostDataCallbackFunc, shortcode string, id string, uri string, ai *AdditionalEmojiInfo, refresh bool) (*ProcessingEmoji, error) { - processingEmoji, err := m.preProcessEmoji(ctx, data, postData, shortcode, id, uri, ai, refresh) +func (m *manager) PreProcessEmoji(ctx context.Context, data DataFunc, postData PostDataCallbackFunc, shortcode string, emojiID string, uri string, ai *AdditionalEmojiInfo, refresh bool) (*ProcessingEmoji, error) { + instanceAccount, err := m.state.DB.GetInstanceAccount(ctx, "") if err != nil { - return nil, err + return nil, fmt.Errorf("preProcessEmoji: error fetching this instance account from the db: %s", err) + } + + var ( + newPathID string + emoji *gtsmodel.Emoji + now = time.Now() + ) + + if refresh { + emoji, err = m.state.DB.GetEmojiByID(ctx, emojiID) + if err != nil { + return nil, fmt.Errorf("preProcessEmoji: error fetching emoji to refresh from the db: %s", err) + } + + // if this is a refresh, we will end up with new images + // stored for this emoji, so we can use the postData function + // to perform clean up of the old images from storage + originalPostData := postData + originalImagePath := emoji.ImagePath + originalImageStaticPath := emoji.ImageStaticPath + postData = func(innerCtx context.Context) error { + // trigger the original postData function if it was provided + if originalPostData != nil { + if err := originalPostData(innerCtx); err != nil { + return err + } + } + + l := log.WithField("shortcode@domain", emoji.Shortcode+"@"+emoji.Domain) + l.Debug("postData: cleaning up old emoji files for refreshed emoji") + if err := m.state.Storage.Delete(innerCtx, originalImagePath); err != nil && !errors.Is(err, storage.ErrNotFound) { + l.Errorf("postData: error cleaning up old emoji image at %s for refreshed emoji: %s", originalImagePath, err) + } + if err := m.state.Storage.Delete(innerCtx, originalImageStaticPath); err != nil && !errors.Is(err, storage.ErrNotFound) { + l.Errorf("postData: error cleaning up old emoji static image at %s for refreshed emoji: %s", originalImageStaticPath, err) + } + + return nil + } + + newPathID, err = id.NewRandomULID() + if err != nil { + return nil, fmt.Errorf("preProcessEmoji: error generating alternateID for emoji refresh: %s", err) + } + + // store + serve static image at new path ID + emoji.ImageStaticURL = uris.GenerateURIForAttachment(instanceAccount.ID, string(TypeEmoji), string(SizeStatic), newPathID, mimePng) + emoji.ImageStaticPath = fmt.Sprintf("%s/%s/%s/%s.%s", instanceAccount.ID, TypeEmoji, SizeStatic, newPathID, mimePng) + + emoji.Shortcode = shortcode + emoji.URI = uri + } else { + disabled := false + visibleInPicker := true + + // populate initial fields on the emoji -- some of these will be overwritten as we proceed + emoji = >smodel.Emoji{ + ID: emojiID, + CreatedAt: now, + Shortcode: shortcode, + Domain: "", // assume our own domain unless told otherwise + ImageRemoteURL: "", + ImageStaticRemoteURL: "", + ImageURL: "", // we don't know yet + ImageStaticURL: uris.GenerateURIForAttachment(instanceAccount.ID, string(TypeEmoji), string(SizeStatic), emojiID, mimePng), // all static emojis are encoded as png + ImagePath: "", // we don't know yet + ImageStaticPath: fmt.Sprintf("%s/%s/%s/%s.%s", instanceAccount.ID, TypeEmoji, SizeStatic, emojiID, mimePng), // all static emojis are encoded as png + ImageContentType: "", // we don't know yet + ImageStaticContentType: mimeImagePng, // all static emojis are encoded as png + ImageFileSize: 0, + ImageStaticFileSize: 0, + Disabled: &disabled, + URI: uri, + VisibleInPicker: &visibleInPicker, + CategoryID: "", + } + } + + emoji.ImageUpdatedAt = now + emoji.UpdatedAt = now + + // check if we have additional info to add to the emoji, + // and overwrite some of the emoji fields if so + if ai != nil { + if ai.CreatedAt != nil { + emoji.CreatedAt = *ai.CreatedAt + } + + if ai.Domain != nil { + emoji.Domain = *ai.Domain + } + + if ai.ImageRemoteURL != nil { + emoji.ImageRemoteURL = *ai.ImageRemoteURL + } + + if ai.ImageStaticRemoteURL != nil { + emoji.ImageStaticRemoteURL = *ai.ImageStaticRemoteURL + } + + if ai.Disabled != nil { + emoji.Disabled = ai.Disabled + } + + if ai.VisibleInPicker != nil { + emoji.VisibleInPicker = ai.VisibleInPicker + } + + if ai.CategoryID != nil { + emoji.CategoryID = *ai.CategoryID + } } - m.emojiWorker.Queue(processingEmoji) + + processingEmoji := &ProcessingEmoji{ + instAccID: instanceAccount.ID, + emoji: emoji, + refresh: refresh, + newPathID: newPathID, + dataFn: data, + postFn: postData, + mgr: m, + } + return processingEmoji, nil } -func (m *manager) RecacheMedia(ctx context.Context, data DataFunc, postData PostDataCallbackFunc, attachmentID string) (*ProcessingMedia, error) { - processingRecache, err := m.preProcessRecache(ctx, data, postData, attachmentID) +func (m *manager) ProcessEmoji(ctx context.Context, data DataFunc, postData PostDataCallbackFunc, shortcode string, id string, uri string, ai *AdditionalEmojiInfo, refresh bool) (*ProcessingEmoji, error) { + // Create a new processing emoji object for this emoji request. + emoji, err := m.PreProcessEmoji(ctx, data, postData, shortcode, id, uri, ai, refresh) if err != nil { return nil, err } - m.mediaWorker.Queue(processingRecache) - return processingRecache, nil + + // Attempt to add this emoji processing item to the worker queue. + _ = m.state.Workers.Media.MustEnqueueCtx(ctx, emoji.Process) + + return emoji, nil } -func (m *manager) Stop() error { - // Stop worker pools. - mediaErr := m.mediaWorker.Stop() - emojiErr := m.emojiWorker.Stop() +func scheduleCleanupJobs(m *manager) { + const day = time.Hour * 24 - var cronErr error - if m.stopCronJobs != nil { - cronErr = m.stopCronJobs() - } + // Calculate closest midnight. + now := time.Now() + midnight := now.Round(day) - if mediaErr != nil { - return mediaErr - } else if emojiErr != nil { - return emojiErr + if midnight.Before(now) { + // since <= 11:59am rounds down. + midnight = midnight.Add(day) } - return cronErr + // Get ctx associated with scheduler run state. + done := m.state.Workers.Scheduler.Done() + doneCtx := runners.CancelCtx(done) + + // TODO: we'll need to do some thinking to make these + // jobs restartable if we want to implement reloads in + // the future that make call to Workers.Stop() -> Workers.Start(). + + // Schedule the PruneAll task to execute every day at midnight. + m.state.Workers.Scheduler.Schedule(sched.NewJob(func(now time.Time) { + err := m.PruneAll(doneCtx, config.GetMediaRemoteCacheDays(), true) + if err != nil { + log.Errorf("error during prune: %v", err) + } + log.Infof("finished pruning all in %s", time.Since(now)) + }).EveryAt(midnight, day)) } diff --git a/internal/media/manager_test.go b/internal/media/manager_test.go index d912c9d87..f4dd8dac7 100644 --- a/internal/media/manager_test.go +++ b/internal/media/manager_test.go @@ -33,6 +33,7 @@ import ( "github.com/stretchr/testify/suite" gtsmodel "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" "github.com/superseriousbusiness/gotosocial/internal/media" + "github.com/superseriousbusiness/gotosocial/internal/state" gtsstorage "github.com/superseriousbusiness/gotosocial/internal/storage" ) @@ -1189,15 +1190,19 @@ func (suite *ManagerTestSuite) TestSimpleJpegProcessBlockingWithDiskStorage() { panic(err) } + var state state.State + + state.Workers.Start() + defer state.Workers.Stop() + storage := >sstorage.Driver{ KVStore: kv.New(disk), Storage: disk, } + state.Storage = storage + state.DB = suite.db - diskManager, err := media.NewManager(suite.db, storage) - if err != nil { - panic(err) - } + diskManager := media.NewManager(&state) suite.manager = diskManager // process the media with no additional info provided diff --git a/internal/media/processingemoji.go b/internal/media/processingemoji.go index b68c9dfe1..0a36174b0 100644 --- a/internal/media/processingemoji.go +++ b/internal/media/processingemoji.go @@ -21,18 +21,15 @@ package media import ( "bytes" "context" - "errors" "fmt" "io" - "sync" - "time" "codeberg.org/gruf/go-bytesize" - gostore "codeberg.org/gruf/go-store/v2/storage" + "codeberg.org/gruf/go-errors/v2" + "codeberg.org/gruf/go-runners" "github.com/h2non/filetype" "github.com/superseriousbusiness/gotosocial/internal/config" "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" - "github.com/superseriousbusiness/gotosocial/internal/id" "github.com/superseriousbusiness/gotosocial/internal/log" "github.com/superseriousbusiness/gotosocial/internal/uris" ) @@ -46,9 +43,10 @@ type ProcessingEmoji struct { newPathID string // new emoji path ID to use if refreshed dataFn DataFunc // load-data function, returns media stream postFn PostDataCallbackFunc // post data callback function - err error // error encountered during processing - manager *manager // manager instance (access to db / storage) - once sync.Once // once ensures processing only occurs once + done bool // done is set when process finishes with non ctx canceled type error + proc runners.Processor // proc helps synchronize only a singular running processing instance + err error // error stores permanent error value when done + mgr *manager // mgr instance (access to db / storage) } // EmojiID returns the ID of the underlying emoji without blocking processing. @@ -56,40 +54,72 @@ func (p *ProcessingEmoji) EmojiID() string { return p.emoji.ID // immutable, safe outside mutex. } -// LoadEmoji blocks until the static and fullsize image -// has been processed, and then returns the completed emoji. +// LoadEmoji blocks until the static and fullsize image has been processed, and then returns the completed emoji. func (p *ProcessingEmoji) LoadEmoji(ctx context.Context) (*gtsmodel.Emoji, error) { - // only process once. - p.once.Do(func() { - var err error + // Attempt to load synchronously. + emoji, done, err := p.load(ctx) + + if err == nil { + // No issue, return media. + return emoji, nil + } + + if !done { + // Provided context was cancelled, e.g. request cancelled + // early. Queue this item for asynchronous processing. + log.Warnf("reprocessing emoji %s after canceled ctx", p.emoji.ID) + go p.mgr.state.Workers.Media.Enqueue(p.Process) + } + + return nil, err +} + +// Process allows the receiving object to fit the runners.WorkerFunc signature. It performs a (blocking) load and logs on error. +func (p *ProcessingEmoji) Process(ctx context.Context) { + if _, _, err := p.load(ctx); err != nil { + log.Errorf("error processing emoji: %v", err) + } +} + +// load performs a concurrency-safe load of ProcessingEmoji, only marking itself as complete when returned error is NOT a context cancel. +func (p *ProcessingEmoji) load(ctx context.Context) (*gtsmodel.Emoji, bool, error) { + var ( + done bool + err error + ) + + err = p.proc.Process(func() error { + if p.done { + // Already proc'd. + return p.err + } defer func() { - if r := recover(); r != nil { - if err != nil { - rOld := r // wrap the panic so we don't lose existing returned error - r = fmt.Errorf("panic occured after error %q: %v", err.Error(), rOld) - } - - // Catch any panics and wrap as error. - err = fmt.Errorf("caught panic: %v", r) + // This is only done when ctx NOT cancelled. + done = err == nil || !errors.Is(err, + context.Canceled, + context.DeadlineExceeded, + ) + + if !done { + return } - if err != nil { - // Store error. - p.err = err - } + // Store final values. + p.done = true + p.err = err }() // Attempt to store media and calculate // full-size media attachment details. if err = p.store(ctx); err != nil { - return + return err } // Finish processing by reloading media into // memory to get dimension and generate a thumb. if err = p.finish(ctx); err != nil { - return + return err } if p.refresh { @@ -110,20 +140,20 @@ func (p *ProcessingEmoji) LoadEmoji(ctx context.Context) (*gtsmodel.Emoji, error } // Existing emoji we're refreshing, so only need to update. - _, err = p.manager.db.UpdateEmoji(ctx, p.emoji, columns...) - return + _, err = p.mgr.state.DB.UpdateEmoji(ctx, p.emoji, columns...) + return err } // New emoji media, first time caching. - err = p.manager.db.PutEmoji(ctx, p.emoji) - return //nolint shutup linter i like this here + err = p.mgr.state.DB.PutEmoji(ctx, p.emoji) + return err }) - if p.err != nil { - return nil, p.err + if err != nil { + return nil, done, err } - return p.emoji, nil + return p.emoji, done, nil } // store calls the data function attached to p if it hasn't been called yet, @@ -220,24 +250,24 @@ func (p *ProcessingEmoji) store(ctx context.Context) error { ) // This shouldn't already exist, but we do a check as it's worth logging. - if have, _ := p.manager.storage.Has(ctx, p.emoji.ImagePath); have { + if have, _ := p.mgr.state.Storage.Has(ctx, p.emoji.ImagePath); have { log.Warnf("emoji already exists at storage path: %s", p.emoji.ImagePath) // Attempt to remove existing emoji at storage path (might be broken / out-of-date) - if err := p.manager.storage.Delete(ctx, p.emoji.ImagePath); err != nil { + if err := p.mgr.state.Storage.Delete(ctx, p.emoji.ImagePath); err != nil { return fmt.Errorf("error removing emoji from storage: %v", err) } } // Write the final image reader stream to our storage. - sz, err = p.manager.storage.PutStream(ctx, p.emoji.ImagePath, r) + sz, err = p.mgr.state.Storage.PutStream(ctx, p.emoji.ImagePath, r) if err != nil { return fmt.Errorf("error writing emoji to storage: %w", err) } // Once again check size in case none was provided previously. if size := bytesize.Size(sz); size > maxSize { - if err := p.manager.storage.Delete(ctx, p.emoji.ImagePath); err != nil { + if err := p.mgr.state.Storage.Delete(ctx, p.emoji.ImagePath); err != nil { log.Errorf("error removing too-large-emoji from storage: %v", err) } return fmt.Errorf("calculated emoji size %s greater than max allowed %s", size, maxSize) @@ -259,7 +289,7 @@ func (p *ProcessingEmoji) store(ctx context.Context) error { func (p *ProcessingEmoji) finish(ctx context.Context) error { // Fetch a stream to the original file in storage. - rc, err := p.manager.storage.GetStream(ctx, p.emoji.ImagePath) + rc, err := p.mgr.state.Storage.GetStream(ctx, p.emoji.ImagePath) if err != nil { return fmt.Errorf("error loading file from storage: %w", err) } @@ -277,11 +307,11 @@ func (p *ProcessingEmoji) finish(ctx context.Context) error { } // This shouldn't already exist, but we do a check as it's worth logging. - if have, _ := p.manager.storage.Has(ctx, p.emoji.ImageStaticPath); have { + if have, _ := p.mgr.state.Storage.Has(ctx, p.emoji.ImageStaticPath); have { log.Warnf("static emoji already exists at storage path: %s", p.emoji.ImagePath) // Attempt to remove static existing emoji at storage path (might be broken / out-of-date) - if err := p.manager.storage.Delete(ctx, p.emoji.ImageStaticPath); err != nil { + if err := p.mgr.state.Storage.Delete(ctx, p.emoji.ImageStaticPath); err != nil { return fmt.Errorf("error removing static emoji from storage: %v", err) } } @@ -290,7 +320,7 @@ func (p *ProcessingEmoji) finish(ctx context.Context) error { enc := staticImg.ToPNG() // Stream-encode the PNG static image into storage. - sz, err := p.manager.storage.PutStream(ctx, p.emoji.ImageStaticPath, enc) + sz, err := p.mgr.state.Storage.PutStream(ctx, p.emoji.ImageStaticPath, enc) if err != nil { return fmt.Errorf("error stream-encoding static emoji to storage: %w", err) } @@ -300,129 +330,3 @@ func (p *ProcessingEmoji) finish(ctx context.Context) error { return nil } - -func (m *manager) preProcessEmoji(ctx context.Context, data DataFunc, postData PostDataCallbackFunc, shortcode string, emojiID string, uri string, ai *AdditionalEmojiInfo, refresh bool) (*ProcessingEmoji, error) { - instanceAccount, err := m.db.GetInstanceAccount(ctx, "") - if err != nil { - return nil, fmt.Errorf("preProcessEmoji: error fetching this instance account from the db: %s", err) - } - - var newPathID string - var emoji *gtsmodel.Emoji - if refresh { - emoji, err = m.db.GetEmojiByID(ctx, emojiID) - if err != nil { - return nil, fmt.Errorf("preProcessEmoji: error fetching emoji to refresh from the db: %s", err) - } - - // if this is a refresh, we will end up with new images - // stored for this emoji, so we can use the postData function - // to perform clean up of the old images from storage - originalPostData := postData - originalImagePath := emoji.ImagePath - originalImageStaticPath := emoji.ImageStaticPath - postData = func(innerCtx context.Context) error { - // trigger the original postData function if it was provided - if originalPostData != nil { - if err := originalPostData(innerCtx); err != nil { - return err - } - } - - l := log.WithField("shortcode@domain", emoji.Shortcode+"@"+emoji.Domain) - l.Debug("postData: cleaning up old emoji files for refreshed emoji") - if err := m.storage.Delete(innerCtx, originalImagePath); err != nil && !errors.Is(err, gostore.ErrNotFound) { - l.Errorf("postData: error cleaning up old emoji image at %s for refreshed emoji: %s", originalImagePath, err) - } - if err := m.storage.Delete(innerCtx, originalImageStaticPath); err != nil && !errors.Is(err, gostore.ErrNotFound) { - l.Errorf("postData: error cleaning up old emoji static image at %s for refreshed emoji: %s", originalImageStaticPath, err) - } - - return nil - } - - newPathID, err = id.NewRandomULID() - if err != nil { - return nil, fmt.Errorf("preProcessEmoji: error generating alternateID for emoji refresh: %s", err) - } - - // store + serve static image at new path ID - emoji.ImageStaticURL = uris.GenerateURIForAttachment(instanceAccount.ID, string(TypeEmoji), string(SizeStatic), newPathID, mimePng) - emoji.ImageStaticPath = fmt.Sprintf("%s/%s/%s/%s.%s", instanceAccount.ID, TypeEmoji, SizeStatic, newPathID, mimePng) - - emoji.Shortcode = shortcode - emoji.URI = uri - } else { - disabled := false - visibleInPicker := true - - // populate initial fields on the emoji -- some of these will be overwritten as we proceed - emoji = >smodel.Emoji{ - ID: emojiID, - CreatedAt: time.Now(), - Shortcode: shortcode, - Domain: "", // assume our own domain unless told otherwise - ImageRemoteURL: "", - ImageStaticRemoteURL: "", - ImageURL: "", // we don't know yet - ImageStaticURL: uris.GenerateURIForAttachment(instanceAccount.ID, string(TypeEmoji), string(SizeStatic), emojiID, mimePng), // all static emojis are encoded as png - ImagePath: "", // we don't know yet - ImageStaticPath: fmt.Sprintf("%s/%s/%s/%s.%s", instanceAccount.ID, TypeEmoji, SizeStatic, emojiID, mimePng), // all static emojis are encoded as png - ImageContentType: "", // we don't know yet - ImageStaticContentType: mimeImagePng, // all static emojis are encoded as png - ImageFileSize: 0, - ImageStaticFileSize: 0, - Disabled: &disabled, - URI: uri, - VisibleInPicker: &visibleInPicker, - CategoryID: "", - } - } - - emoji.ImageUpdatedAt = time.Now() - emoji.UpdatedAt = time.Now() - - // check if we have additional info to add to the emoji, - // and overwrite some of the emoji fields if so - if ai != nil { - if ai.CreatedAt != nil { - emoji.CreatedAt = *ai.CreatedAt - } - - if ai.Domain != nil { - emoji.Domain = *ai.Domain - } - - if ai.ImageRemoteURL != nil { - emoji.ImageRemoteURL = *ai.ImageRemoteURL - } - - if ai.ImageStaticRemoteURL != nil { - emoji.ImageStaticRemoteURL = *ai.ImageStaticRemoteURL - } - - if ai.Disabled != nil { - emoji.Disabled = ai.Disabled - } - - if ai.VisibleInPicker != nil { - emoji.VisibleInPicker = ai.VisibleInPicker - } - - if ai.CategoryID != nil { - emoji.CategoryID = *ai.CategoryID - } - } - - processingEmoji := &ProcessingEmoji{ - instAccID: instanceAccount.ID, - emoji: emoji, - refresh: refresh, - newPathID: newPathID, - dataFn: data, - postFn: postData, - manager: m, - } - - return processingEmoji, nil -} diff --git a/internal/media/processingmedia.go b/internal/media/processingmedia.go index 34f8dc26b..b4eda4072 100644 --- a/internal/media/processingmedia.go +++ b/internal/media/processingmedia.go @@ -24,14 +24,14 @@ import ( "fmt" "image/jpeg" "io" - "sync" "time" + "codeberg.org/gruf/go-errors/v2" + "codeberg.org/gruf/go-runners" "github.com/disintegration/imaging" "github.com/h2non/filetype" terminator "github.com/superseriousbusiness/exif-terminator" "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" - "github.com/superseriousbusiness/gotosocial/internal/id" "github.com/superseriousbusiness/gotosocial/internal/log" "github.com/superseriousbusiness/gotosocial/internal/uris" ) @@ -40,12 +40,13 @@ import ( // various functions for retrieving data from the process. type ProcessingMedia struct { media *gtsmodel.MediaAttachment // processing media attachment details - recache bool // recaching existing (uncached) media dataFn DataFunc // load-data function, returns media stream postFn PostDataCallbackFunc // post data callback function - err error // error encountered during processing - manager *manager // manager instance (access to db / storage) - once sync.Once // once ensures processing only occurs once + recache bool // recaching existing (uncached) media + done bool // done is set when process finishes with non ctx canceled type error + proc runners.Processor // proc helps synchronize only a singular running processing instance + err error // error stores permanent error value when done + mgr *manager // mgr instance (access to db / storage) } // AttachmentID returns the ID of the underlying media attachment without blocking processing. @@ -53,58 +54,90 @@ func (p *ProcessingMedia) AttachmentID() string { return p.media.ID // immutable, safe outside mutex. } -// LoadAttachment blocks until the thumbnail and fullsize content -// has been processed, and then returns the completed attachment. +// LoadAttachment blocks until the thumbnail and fullsize content has been processed, and then returns the completed attachment. func (p *ProcessingMedia) LoadAttachment(ctx context.Context) (*gtsmodel.MediaAttachment, error) { - // only process once. - p.once.Do(func() { - var err error + // Attempt to load synchronously. + media, done, err := p.load(ctx) + + if err == nil { + // No issue, return media. + return media, nil + } + + if !done { + // Provided context was cancelled, e.g. request cancelled + // early. Queue this item for asynchronous processing. + log.Warnf("reprocessing media %s after canceled ctx", p.media.ID) + go p.mgr.state.Workers.Media.Enqueue(p.Process) + } + + return nil, err +} + +// Process allows the receiving object to fit the runners.WorkerFunc signature. It performs a (blocking) load and logs on error. +func (p *ProcessingMedia) Process(ctx context.Context) { + if _, _, err := p.load(ctx); err != nil { + log.Errorf("error processing media: %v", err) + } +} + +// load performs a concurrency-safe load of ProcessingMedia, only marking itself as complete when returned error is NOT a context cancel. +func (p *ProcessingMedia) load(ctx context.Context) (*gtsmodel.MediaAttachment, bool, error) { + var ( + done bool + err error + ) + + err = p.proc.Process(func() error { + if p.done { + // Already proc'd. + return p.err + } defer func() { - if r := recover(); r != nil { - if err != nil { - rOld := r // wrap the panic so we don't lose existing returned error - r = fmt.Errorf("panic occured after error %q: %v", err.Error(), rOld) - } - - // Catch any panics and wrap as error. - err = fmt.Errorf("caught panic: %v", r) + // This is only done when ctx NOT cancelled. + done = err == nil || !errors.Is(err, + context.Canceled, + context.DeadlineExceeded, + ) + + if !done { + return } - if err != nil { - // Store error. - p.err = err - } + // Store final values. + p.done = true + p.err = err }() // Attempt to store media and calculate // full-size media attachment details. if err = p.store(ctx); err != nil { - return + return err } // Finish processing by reloading media into // memory to get dimension and generate a thumb. if err = p.finish(ctx); err != nil { - return + return err } if p.recache { // Existing attachment we're recaching, so only need to update. - err = p.manager.db.UpdateByID(ctx, p.media, p.media.ID) - return + err = p.mgr.state.DB.UpdateByID(ctx, p.media, p.media.ID) + return err } // New attachment, first time caching. - err = p.manager.db.Put(ctx, p.media) - return //nolint shutup linter i like this here + err = p.mgr.state.DB.Put(ctx, p.media) + return err }) - if p.err != nil { - return nil, p.err + if err != nil { + return nil, done, err } - return p.media, nil + return p.media, done, nil } // store calls the data function attached to p if it hasn't been called yet, @@ -186,17 +219,17 @@ func (p *ProcessingMedia) store(ctx context.Context) error { ) // This shouldn't already exist, but we do a check as it's worth logging. - if have, _ := p.manager.storage.Has(ctx, p.media.File.Path); have { + if have, _ := p.mgr.state.Storage.Has(ctx, p.media.File.Path); have { log.Warnf("media already exists at storage path: %s", p.media.File.Path) // Attempt to remove existing media at storage path (might be broken / out-of-date) - if err := p.manager.storage.Delete(ctx, p.media.File.Path); err != nil { + if err := p.mgr.state.Storage.Delete(ctx, p.media.File.Path); err != nil { return fmt.Errorf("error removing media from storage: %v", err) } } // Write the final image reader stream to our storage. - sz, err = p.manager.storage.PutStream(ctx, p.media.File.Path, r) + sz, err = p.mgr.state.Storage.PutStream(ctx, p.media.File.Path, r) if err != nil { return fmt.Errorf("error writing media to storage: %w", err) } @@ -221,7 +254,7 @@ func (p *ProcessingMedia) store(ctx context.Context) error { func (p *ProcessingMedia) finish(ctx context.Context) error { // Fetch a stream to the original file in storage. - rc, err := p.manager.storage.GetStream(ctx, p.media.File.Path) + rc, err := p.mgr.state.Storage.GetStream(ctx, p.media.File.Path) if err != nil { return fmt.Errorf("error loading file from storage: %w", err) } @@ -299,11 +332,11 @@ func (p *ProcessingMedia) finish(ctx context.Context) error { p.media.Blurhash = hash // This shouldn't already exist, but we do a check as it's worth logging. - if have, _ := p.manager.storage.Has(ctx, p.media.Thumbnail.Path); have { + if have, _ := p.mgr.state.Storage.Has(ctx, p.media.Thumbnail.Path); have { log.Warnf("thumbnail already exists at storage path: %s", p.media.Thumbnail.Path) // Attempt to remove existing thumbnail at storage path (might be broken / out-of-date) - if err := p.manager.storage.Delete(ctx, p.media.Thumbnail.Path); err != nil { + if err := p.mgr.state.Storage.Delete(ctx, p.media.Thumbnail.Path); err != nil { return fmt.Errorf("error removing thumbnail from storage: %v", err) } } @@ -314,7 +347,7 @@ func (p *ProcessingMedia) finish(ctx context.Context) error { }) // Stream-encode the JPEG thumbnail image into storage. - sz, err := p.manager.storage.PutStream(ctx, p.media.Thumbnail.Path, enc) + sz, err := p.mgr.state.Storage.PutStream(ctx, p.media.Thumbnail.Path, enc) if err != nil { return fmt.Errorf("error stream-encoding thumbnail to storage: %w", err) } @@ -346,107 +379,3 @@ func (p *ProcessingMedia) finish(ctx context.Context) error { return nil } - -func (m *manager) preProcessMedia(ctx context.Context, data DataFunc, postData PostDataCallbackFunc, accountID string, ai *AdditionalMediaInfo) (*ProcessingMedia, error) { - id, err := id.NewRandomULID() - if err != nil { - return nil, err - } - - avatar := false - header := false - cached := false - - // populate initial fields on the media attachment -- some of these will be overwritten as we proceed - attachment := >smodel.MediaAttachment{ - ID: id, - CreatedAt: time.Now(), - UpdatedAt: time.Now(), - StatusID: "", - URL: "", // we don't know yet because it depends on the uncalled DataFunc - RemoteURL: "", - Type: gtsmodel.FileTypeUnknown, // we don't know yet because it depends on the uncalled DataFunc - FileMeta: gtsmodel.FileMeta{}, - AccountID: accountID, - Description: "", - ScheduledStatusID: "", - Blurhash: "", - Processing: gtsmodel.ProcessingStatusReceived, - File: gtsmodel.File{UpdatedAt: time.Now()}, - Thumbnail: gtsmodel.Thumbnail{UpdatedAt: time.Now()}, - Avatar: &avatar, - Header: &header, - Cached: &cached, - } - - // check if we have additional info to add to the attachment, - // and overwrite some of the attachment fields if so - if ai != nil { - if ai.CreatedAt != nil { - attachment.CreatedAt = *ai.CreatedAt - } - - if ai.StatusID != nil { - attachment.StatusID = *ai.StatusID - } - - if ai.RemoteURL != nil { - attachment.RemoteURL = *ai.RemoteURL - } - - if ai.Description != nil { - attachment.Description = *ai.Description - } - - if ai.ScheduledStatusID != nil { - attachment.ScheduledStatusID = *ai.ScheduledStatusID - } - - if ai.Blurhash != nil { - attachment.Blurhash = *ai.Blurhash - } - - if ai.Avatar != nil { - attachment.Avatar = ai.Avatar - } - - if ai.Header != nil { - attachment.Header = ai.Header - } - - if ai.FocusX != nil { - attachment.FileMeta.Focus.X = *ai.FocusX - } - - if ai.FocusY != nil { - attachment.FileMeta.Focus.Y = *ai.FocusY - } - } - - processingMedia := &ProcessingMedia{ - media: attachment, - dataFn: data, - postFn: postData, - manager: m, - } - - return processingMedia, nil -} - -func (m *manager) preProcessRecache(ctx context.Context, data DataFunc, postData PostDataCallbackFunc, id string) (*ProcessingMedia, error) { - // get the existing attachment from database. - attachment, err := m.db.GetAttachmentByID(ctx, id) - if err != nil { - return nil, err - } - - processingMedia := &ProcessingMedia{ - media: attachment, - dataFn: data, - postFn: postData, - manager: m, - recache: true, // indicate it's a recache - } - - return processingMedia, nil -} diff --git a/internal/media/prune.go b/internal/media/prune.go index 7335feb7a..3509e249d 100644 --- a/internal/media/prune.go +++ b/internal/media/prune.go @@ -72,7 +72,7 @@ func (m *manager) PruneAll(ctx context.Context, mediaCacheRemoteDays int, blocki log.Infof("pruned %d orphaned media", pruned) } - if err := m.storage.Storage.Clean(innerCtx); err != nil { + if err := m.state.Storage.Storage.Clean(innerCtx); err != nil { errs = append(errs, fmt.Sprintf("error cleaning storage: (%s)", err)) } else { log.Info("cleaned storage") @@ -116,7 +116,7 @@ func (m *manager) PruneUnusedRemote(ctx context.Context, dry bool) (int, error) } } - for attachments, err = m.db.GetAvatarsAndHeaders(ctx, maxID, selectPruneLimit); err == nil && len(attachments) != 0; attachments, err = m.db.GetAvatarsAndHeaders(ctx, maxID, selectPruneLimit) { + for attachments, err = m.state.DB.GetAvatarsAndHeaders(ctx, maxID, selectPruneLimit); err == nil && len(attachments) != 0; attachments, err = m.state.DB.GetAvatarsAndHeaders(ctx, maxID, selectPruneLimit) { maxID = attachments[len(attachments)-1].ID // use the id of the last attachment in the slice as the next 'maxID' value // Prune each attachment that meets one of the following criteria: @@ -157,7 +157,7 @@ func (m *manager) PruneOrphaned(ctx context.Context, dry bool) (int, error) { return false } - iterator, err := m.storage.Iterator(ctx, match) // make sure this iterator is always released + iterator, err := m.state.Storage.Iterator(ctx, match) // make sure this iterator is always released if err != nil { return 0, fmt.Errorf("PruneOrphaned: error getting storage iterator: %w", err) } @@ -172,7 +172,7 @@ func (m *manager) PruneOrphaned(ctx context.Context, dry bool) (int, error) { // Emojis are stored under the instance account, // so we need the ID of the instance account for // the next part. - instanceAccount, err := m.db.GetInstanceAccount(ctx, "") + instanceAccount, err := m.state.DB.GetInstanceAccount(ctx, "") if err != nil { iterator.Release() return 0, fmt.Errorf("PruneOrphaned: error getting instance account: %w", err) @@ -223,7 +223,7 @@ func (m *manager) orphaned(ctx context.Context, key string, instanceAccountID st // Look for keys in storage that we don't have an attachment for. switch Type(mediaType) { case TypeAttachment, TypeHeader, TypeAvatar: - if _, err := m.db.GetAttachmentByID(ctx, mediaID); err != nil { + if _, err := m.state.DB.GetAttachmentByID(ctx, mediaID); err != nil { if !errors.Is(err, db.ErrNoEntries) { return false, fmt.Errorf("error calling GetAttachmentByID: %w", err) } @@ -234,7 +234,7 @@ func (m *manager) orphaned(ctx context.Context, key string, instanceAccountID st // the MEDIA_ID part of the key for emojis will not necessarily correspond // to the file that's currently being used as the emoji image. staticURL := uris.GenerateURIForAttachment(instanceAccountID, string(TypeEmoji), string(SizeStatic), mediaID, mimePng) - if _, err := m.db.GetEmojiByStaticURL(ctx, staticURL); err != nil { + if _, err := m.state.DB.GetEmojiByStaticURL(ctx, staticURL); err != nil { if !errors.Is(err, db.ErrNoEntries) { return false, fmt.Errorf("error calling GetEmojiByStaticURL: %w", err) } @@ -254,7 +254,7 @@ func (m *manager) UncacheRemote(ctx context.Context, olderThanDays int, dry bool if dry { // Dry run, just count eligible entries without removing them. - return m.db.CountRemoteOlderThan(ctx, olderThan) + return m.state.DB.CountRemoteOlderThan(ctx, olderThan) } var ( @@ -263,7 +263,7 @@ func (m *manager) UncacheRemote(ctx context.Context, olderThanDays int, dry bool err error ) - for attachments, err = m.db.GetRemoteOlderThan(ctx, olderThan, selectPruneLimit); err == nil && len(attachments) != 0; attachments, err = m.db.GetRemoteOlderThan(ctx, olderThan, selectPruneLimit) { + for attachments, err = m.state.DB.GetRemoteOlderThan(ctx, olderThan, selectPruneLimit); err == nil && len(attachments) != 0; attachments, err = m.state.DB.GetRemoteOlderThan(ctx, olderThan, selectPruneLimit) { olderThan = attachments[len(attachments)-1].CreatedAt // use the created time of the last attachment in the slice as the next 'olderThan' value for _, attachment := range attachments { @@ -287,7 +287,7 @@ func (m *manager) PruneUnusedLocal(ctx context.Context, dry bool) (int, error) { if dry { // Dry run, just count eligible entries without removing them. - return m.db.CountLocalUnattachedOlderThan(ctx, olderThan) + return m.state.DB.CountLocalUnattachedOlderThan(ctx, olderThan) } var ( @@ -296,7 +296,7 @@ func (m *manager) PruneUnusedLocal(ctx context.Context, dry bool) (int, error) { err error ) - for attachments, err = m.db.GetLocalUnattachedOlderThan(ctx, olderThan, selectPruneLimit); err == nil && len(attachments) != 0; attachments, err = m.db.GetLocalUnattachedOlderThan(ctx, olderThan, selectPruneLimit) { + for attachments, err = m.state.DB.GetLocalUnattachedOlderThan(ctx, olderThan, selectPruneLimit); err == nil && len(attachments) != 0; attachments, err = m.state.DB.GetLocalUnattachedOlderThan(ctx, olderThan, selectPruneLimit) { olderThan = attachments[len(attachments)-1].CreatedAt // use the created time of the last attachment in the slice as the next 'olderThan' value for _, attachment := range attachments { @@ -325,7 +325,7 @@ func (m *manager) deleteAttachment(ctx context.Context, attachment *gtsmodel.Med } // Delete attachment completely. - return m.db.DeleteByID(ctx, attachment.ID, attachment) + return m.state.DB.DeleteByID(ctx, attachment.ID, attachment) } func (m *manager) uncacheAttachment(ctx context.Context, attachment *gtsmodel.MediaAttachment) error { @@ -337,14 +337,14 @@ func (m *manager) uncacheAttachment(ctx context.Context, attachment *gtsmodel.Me attachment.UpdatedAt = time.Now() cached := false attachment.Cached = &cached - return m.db.UpdateByID(ctx, attachment, attachment.ID, "updated_at", "cached") + return m.state.DB.UpdateByID(ctx, attachment, attachment.ID, "updated_at", "cached") } func (m *manager) removeFiles(ctx context.Context, keys ...string) error { errs := make(gtserror.MultiError, 0, len(keys)) for _, key := range keys { - if err := m.storage.Delete(ctx, key); err != nil && !errors.Is(err, storage.ErrNotFound) { + if err := m.state.Storage.Delete(ctx, key); err != nil && !errors.Is(err, storage.ErrNotFound) { errs = append(errs, "storage error removing "+key+": "+err.Error()) } } diff --git a/internal/media/prune_test.go b/internal/media/prune_test.go index ed040913f..d96c60ef5 100644 --- a/internal/media/prune_test.go +++ b/internal/media/prune_test.go @@ -313,7 +313,7 @@ func (suite *PruneTestSuite) TestUncacheAndRecache() { testStatusAttachment, testHeader, } { - processingRecache, err := suite.manager.RecacheMedia(ctx, data, nil, original.ID) + processingRecache, err := suite.manager.PreProcessMediaRecache(ctx, data, nil, original.ID) suite.NoError(err) // synchronously load the recached attachment diff --git a/internal/media/refetch.go b/internal/media/refetch.go index e29b4587f..3d572e4b9 100644 --- a/internal/media/refetch.go +++ b/internal/media/refetch.go @@ -47,7 +47,7 @@ func (m *manager) RefetchEmojis(ctx context.Context, domain string, dereferenceM // page through emojis 20 at a time, looking for those with missing images for { // Fetch next block of emojis from database - emojis, err := m.db.GetEmojis(ctx, domain, false, true, "", maxShortcodeDomain, "", 20) + emojis, err := m.state.DB.GetEmojis(ctx, domain, false, true, "", maxShortcodeDomain, "", 20) if err != nil { if !errors.Is(err, db.ErrNoEntries) { // an actual error has occurred @@ -86,7 +86,7 @@ func (m *manager) RefetchEmojis(ctx context.Context, domain string, dereferenceM var totalRefetched int for _, emojiID := range refetchIDs { - emoji, err := m.db.GetEmojiByID(ctx, emojiID) + emoji, err := m.state.DB.GetEmojiByID(ctx, emojiID) if err != nil { // this shouldn't happen--since we know we have the emoji--so return if it does return 0, fmt.Errorf("error getting emoji %s: %w", emojiID, err) @@ -108,7 +108,7 @@ func (m *manager) RefetchEmojis(ctx context.Context, domain string, dereferenceM return dereferenceMedia(ctx, emojiImageIRI) } - processingEmoji, err := m.ProcessEmoji(ctx, dataFunc, nil, emoji.Shortcode, emoji.ID, emoji.URI, &AdditionalEmojiInfo{ + processingEmoji, err := m.PreProcessEmoji(ctx, dataFunc, nil, emoji.Shortcode, emoji.ID, emoji.URI, &AdditionalEmojiInfo{ Domain: &emoji.Domain, ImageRemoteURL: &emoji.ImageRemoteURL, ImageStaticRemoteURL: &emoji.ImageStaticRemoteURL, @@ -133,13 +133,13 @@ func (m *manager) RefetchEmojis(ctx context.Context, domain string, dereferenceM } func (m *manager) emojiRequiresRefetch(ctx context.Context, emoji *gtsmodel.Emoji) (bool, error) { - if has, err := m.storage.Has(ctx, emoji.ImagePath); err != nil { + if has, err := m.state.Storage.Has(ctx, emoji.ImagePath); err != nil { return false, err } else if !has { return true, nil } - if has, err := m.storage.Has(ctx, emoji.ImageStaticPath); err != nil { + if has, err := m.state.Storage.Has(ctx, emoji.ImageStaticPath); err != nil { return false, err } else if !has { return true, nil |