diff options
author | 2023-02-13 18:40:48 +0000 | |
---|---|---|
committer | 2023-02-13 18:40:48 +0000 | |
commit | acc95923da555b2bf17a5638e62e533218c5840a (patch) | |
tree | 7df5d0636137efa5b49298a8f0ced81d35767a5b /internal/media/manager.go | |
parent | [docs] move federating with gotosocial documentation into single file (#1494) (diff) | |
download | gotosocial-acc95923da555b2bf17a5638e62e533218c5840a.tar.xz |
[performance] processing media and scheduled jobs improvements (#1482)
* replace media workers with just runners.WorkerPool, move to state structure, use go-sched for global task scheduling
* improved code comment
* fix worker tryUntil function, update go-runners/go-sched
* make preprocess functions package public, use these where possible to stop doubled up processing
* remove separate emoji worker pool
* limit calls to time.Now() during media preprocessing
* use Processor{} to manage singular runtime of processing media
* ensure workers get started when media manager is used
* improved error setting in processing media, fix media test
* port changes from processingmedia to processing emoji
* finish code commenting
* finish code commenting and comment-out client API + federator worker pools until concurrency worker pools replaced
* linterrrrrrrrrrrrrrrr
---------
Signed-off-by: kim <grufwub@gmail.com>
Diffstat (limited to 'internal/media/manager.go')
-rw-r--r-- | internal/media/manager.go | 358 |
1 files changed, 290 insertions, 68 deletions
diff --git a/internal/media/manager.go b/internal/media/manager.go index b770a7dcd..ba89aff13 100644 --- a/internal/media/manager.go +++ b/internal/media/manager.go @@ -20,11 +20,19 @@ package media import ( "context" + "errors" "fmt" + "time" - "github.com/superseriousbusiness/gotosocial/internal/concurrency" - "github.com/superseriousbusiness/gotosocial/internal/db" - "github.com/superseriousbusiness/gotosocial/internal/storage" + "codeberg.org/gruf/go-runners" + "codeberg.org/gruf/go-sched" + "codeberg.org/gruf/go-store/v2/storage" + "github.com/superseriousbusiness/gotosocial/internal/config" + "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" + "github.com/superseriousbusiness/gotosocial/internal/id" + "github.com/superseriousbusiness/gotosocial/internal/log" + "github.com/superseriousbusiness/gotosocial/internal/state" + "github.com/superseriousbusiness/gotosocial/internal/uris" ) var SupportedMIMETypes = []string{ @@ -42,16 +50,11 @@ var SupportedEmojiMIMETypes = []string{ // Manager provides an interface for managing media: parsing, storing, and retrieving media objects like photos, videos, and gifs. type Manager interface { - // Stop stops the underlying worker pool of the manager. It should be called - // when closing GoToSocial in order to cleanly finish any in-progress jobs. - // It will block until workers are finished processing. - Stop() error - /* PROCESSING FUNCTIONS */ - // ProcessMedia begins the process of decoding and storing the given data as an attachment. + // PreProcessMedia begins the process of decoding and storing the given data as an attachment. // It will return a pointer to a ProcessingMedia struct upon which further actions can be performed, such as getting // the finished media, thumbnail, attachment, etc. // @@ -63,8 +66,19 @@ type Manager interface { // accountID should be the account that the media belongs to. // // ai is optional and can be nil. Any additional information about the attachment provided will be put in the database. + // + // Note: unlike ProcessMedia, this will NOT queue the media to be asynchronously processed. + PreProcessMedia(ctx context.Context, data DataFunc, postData PostDataCallbackFunc, accountID string, ai *AdditionalMediaInfo) (*ProcessingMedia, error) + + // PreProcessMediaRecache refetches, reprocesses, and recaches an existing attachment that has been uncached via pruneRemote. + // + // Note: unlike ProcessMedia, this will NOT queue the media to be asychronously processed. + PreProcessMediaRecache(ctx context.Context, data DataFunc, postData PostDataCallbackFunc, attachmentID string) (*ProcessingMedia, error) + + // ProcessMedia will call PreProcessMedia, followed by queuing the media to be processing in the media worker queue. ProcessMedia(ctx context.Context, data DataFunc, postData PostDataCallbackFunc, accountID string, ai *AdditionalMediaInfo) (*ProcessingMedia, error) - // ProcessEmoji begins the process of decoding and storing the given data as an emoji. + + // PreProcessEmoji begins the process of decoding and storing the given data as an emoji. // It will return a pointer to a ProcessingEmoji struct upon which further actions can be performed, such as getting // the finished media, thumbnail, attachment, etc. // @@ -81,10 +95,11 @@ type Manager interface { // // ai is optional and can be nil. Any additional information about the emoji provided will be put in the database. // - // If refresh is true, this indicates that the emoji image has changed and should be updated. + // Note: unlike ProcessEmoji, this will NOT queue the emoji to be asynchronously processed. + PreProcessEmoji(ctx context.Context, data DataFunc, postData PostDataCallbackFunc, shortcode string, id string, uri string, ai *AdditionalEmojiInfo, refresh bool) (*ProcessingEmoji, error) + + // ProcessEmoji will call PreProcessEmoji, followed by queuing the emoji to be processing in the emoji worker queue. ProcessEmoji(ctx context.Context, data DataFunc, postData PostDataCallbackFunc, shortcode string, id string, uri string, ai *AdditionalEmojiInfo, refresh bool) (*ProcessingEmoji, error) - // RecacheMedia refetches, reprocesses, and recaches an existing attachment that has been uncached via pruneRemote. - RecacheMedia(ctx context.Context, data DataFunc, postData PostDataCallbackFunc, attachmentID string) (*ProcessingMedia, error) /* PRUNING/UNCACHING FUNCTIONS @@ -139,11 +154,7 @@ type Manager interface { } type manager struct { - db db.DB - storage *storage.Driver - emojiWorker *concurrency.WorkerPool[*ProcessingEmoji] - mediaWorker *concurrency.WorkerPool[*ProcessingMedia] - stopCronJobs func() error + state *state.State } // NewManager returns a media manager with the given db and underlying storage. @@ -152,88 +163,299 @@ type manager struct { // a limited number of media will be processed in parallel. The numbers of workers // is determined from the $GOMAXPROCS environment variable (usually no. CPU cores). // See internal/concurrency.NewWorkerPool() documentation for further information. -func NewManager(database db.DB, storage *storage.Driver) (Manager, error) { - m := &manager{ - db: database, - storage: storage, +func NewManager(state *state.State) Manager { + m := &manager{state: state} + scheduleCleanupJobs(m) + return m +} + +func (m *manager) PreProcessMedia(ctx context.Context, data DataFunc, postData PostDataCallbackFunc, accountID string, ai *AdditionalMediaInfo) (*ProcessingMedia, error) { + id, err := id.NewRandomULID() + if err != nil { + return nil, err } - // Prepare the media worker pool. - m.mediaWorker = concurrency.NewWorkerPool[*ProcessingMedia](-1, 10) - m.mediaWorker.SetProcessor(func(ctx context.Context, media *ProcessingMedia) error { - if _, err := media.LoadAttachment(ctx); err != nil { - return fmt.Errorf("error loading media %s: %v", media.AttachmentID(), err) + avatar := false + header := false + cached := false + now := time.Now() + + // populate initial fields on the media attachment -- some of these will be overwritten as we proceed + attachment := >smodel.MediaAttachment{ + ID: id, + CreatedAt: now, + UpdatedAt: now, + StatusID: "", + URL: "", // we don't know yet because it depends on the uncalled DataFunc + RemoteURL: "", + Type: gtsmodel.FileTypeUnknown, // we don't know yet because it depends on the uncalled DataFunc + FileMeta: gtsmodel.FileMeta{}, + AccountID: accountID, + Description: "", + ScheduledStatusID: "", + Blurhash: "", + Processing: gtsmodel.ProcessingStatusReceived, + File: gtsmodel.File{UpdatedAt: now}, + Thumbnail: gtsmodel.Thumbnail{UpdatedAt: now}, + Avatar: &avatar, + Header: &header, + Cached: &cached, + } + + // check if we have additional info to add to the attachment, + // and overwrite some of the attachment fields if so + if ai != nil { + if ai.CreatedAt != nil { + attachment.CreatedAt = *ai.CreatedAt } - return nil - }) - // Prepare the emoji worker pool. - m.emojiWorker = concurrency.NewWorkerPool[*ProcessingEmoji](-1, 10) - m.emojiWorker.SetProcessor(func(ctx context.Context, emoji *ProcessingEmoji) error { - if _, err := emoji.LoadEmoji(ctx); err != nil { - return fmt.Errorf("error loading emoji %s: %v", emoji.EmojiID(), err) + if ai.StatusID != nil { + attachment.StatusID = *ai.StatusID } - return nil - }) - // Start the worker pools. - if err := m.mediaWorker.Start(); err != nil { - return nil, err + if ai.RemoteURL != nil { + attachment.RemoteURL = *ai.RemoteURL + } + + if ai.Description != nil { + attachment.Description = *ai.Description + } + + if ai.ScheduledStatusID != nil { + attachment.ScheduledStatusID = *ai.ScheduledStatusID + } + + if ai.Blurhash != nil { + attachment.Blurhash = *ai.Blurhash + } + + if ai.Avatar != nil { + attachment.Avatar = ai.Avatar + } + + if ai.Header != nil { + attachment.Header = ai.Header + } + + if ai.FocusX != nil { + attachment.FileMeta.Focus.X = *ai.FocusX + } + + if ai.FocusY != nil { + attachment.FileMeta.Focus.Y = *ai.FocusY + } } - if err := m.emojiWorker.Start(); err != nil { - return nil, err + + processingMedia := &ProcessingMedia{ + media: attachment, + dataFn: data, + postFn: postData, + mgr: m, } - // Schedule cron job(s) for clean up. - if err := scheduleCleanup(m); err != nil { + return processingMedia, nil +} + +func (m *manager) PreProcessMediaRecache(ctx context.Context, data DataFunc, postData PostDataCallbackFunc, attachmentID string) (*ProcessingMedia, error) { + // get the existing attachment from database. + attachment, err := m.state.DB.GetAttachmentByID(ctx, attachmentID) + if err != nil { return nil, err } - return m, nil + processingMedia := &ProcessingMedia{ + media: attachment, + dataFn: data, + postFn: postData, + recache: true, // indicate it's a recache + mgr: m, + } + + return processingMedia, nil } func (m *manager) ProcessMedia(ctx context.Context, data DataFunc, postData PostDataCallbackFunc, accountID string, ai *AdditionalMediaInfo) (*ProcessingMedia, error) { - processingMedia, err := m.preProcessMedia(ctx, data, postData, accountID, ai) + // Create a new processing media object for this media request. + media, err := m.PreProcessMedia(ctx, data, postData, accountID, ai) if err != nil { return nil, err } - m.mediaWorker.Queue(processingMedia) - return processingMedia, nil + + // Attempt to add this media processing item to the worker queue. + _ = m.state.Workers.Media.MustEnqueueCtx(ctx, media.Process) + + return media, nil } -func (m *manager) ProcessEmoji(ctx context.Context, data DataFunc, postData PostDataCallbackFunc, shortcode string, id string, uri string, ai *AdditionalEmojiInfo, refresh bool) (*ProcessingEmoji, error) { - processingEmoji, err := m.preProcessEmoji(ctx, data, postData, shortcode, id, uri, ai, refresh) +func (m *manager) PreProcessEmoji(ctx context.Context, data DataFunc, postData PostDataCallbackFunc, shortcode string, emojiID string, uri string, ai *AdditionalEmojiInfo, refresh bool) (*ProcessingEmoji, error) { + instanceAccount, err := m.state.DB.GetInstanceAccount(ctx, "") if err != nil { - return nil, err + return nil, fmt.Errorf("preProcessEmoji: error fetching this instance account from the db: %s", err) + } + + var ( + newPathID string + emoji *gtsmodel.Emoji + now = time.Now() + ) + + if refresh { + emoji, err = m.state.DB.GetEmojiByID(ctx, emojiID) + if err != nil { + return nil, fmt.Errorf("preProcessEmoji: error fetching emoji to refresh from the db: %s", err) + } + + // if this is a refresh, we will end up with new images + // stored for this emoji, so we can use the postData function + // to perform clean up of the old images from storage + originalPostData := postData + originalImagePath := emoji.ImagePath + originalImageStaticPath := emoji.ImageStaticPath + postData = func(innerCtx context.Context) error { + // trigger the original postData function if it was provided + if originalPostData != nil { + if err := originalPostData(innerCtx); err != nil { + return err + } + } + + l := log.WithField("shortcode@domain", emoji.Shortcode+"@"+emoji.Domain) + l.Debug("postData: cleaning up old emoji files for refreshed emoji") + if err := m.state.Storage.Delete(innerCtx, originalImagePath); err != nil && !errors.Is(err, storage.ErrNotFound) { + l.Errorf("postData: error cleaning up old emoji image at %s for refreshed emoji: %s", originalImagePath, err) + } + if err := m.state.Storage.Delete(innerCtx, originalImageStaticPath); err != nil && !errors.Is(err, storage.ErrNotFound) { + l.Errorf("postData: error cleaning up old emoji static image at %s for refreshed emoji: %s", originalImageStaticPath, err) + } + + return nil + } + + newPathID, err = id.NewRandomULID() + if err != nil { + return nil, fmt.Errorf("preProcessEmoji: error generating alternateID for emoji refresh: %s", err) + } + + // store + serve static image at new path ID + emoji.ImageStaticURL = uris.GenerateURIForAttachment(instanceAccount.ID, string(TypeEmoji), string(SizeStatic), newPathID, mimePng) + emoji.ImageStaticPath = fmt.Sprintf("%s/%s/%s/%s.%s", instanceAccount.ID, TypeEmoji, SizeStatic, newPathID, mimePng) + + emoji.Shortcode = shortcode + emoji.URI = uri + } else { + disabled := false + visibleInPicker := true + + // populate initial fields on the emoji -- some of these will be overwritten as we proceed + emoji = >smodel.Emoji{ + ID: emojiID, + CreatedAt: now, + Shortcode: shortcode, + Domain: "", // assume our own domain unless told otherwise + ImageRemoteURL: "", + ImageStaticRemoteURL: "", + ImageURL: "", // we don't know yet + ImageStaticURL: uris.GenerateURIForAttachment(instanceAccount.ID, string(TypeEmoji), string(SizeStatic), emojiID, mimePng), // all static emojis are encoded as png + ImagePath: "", // we don't know yet + ImageStaticPath: fmt.Sprintf("%s/%s/%s/%s.%s", instanceAccount.ID, TypeEmoji, SizeStatic, emojiID, mimePng), // all static emojis are encoded as png + ImageContentType: "", // we don't know yet + ImageStaticContentType: mimeImagePng, // all static emojis are encoded as png + ImageFileSize: 0, + ImageStaticFileSize: 0, + Disabled: &disabled, + URI: uri, + VisibleInPicker: &visibleInPicker, + CategoryID: "", + } + } + + emoji.ImageUpdatedAt = now + emoji.UpdatedAt = now + + // check if we have additional info to add to the emoji, + // and overwrite some of the emoji fields if so + if ai != nil { + if ai.CreatedAt != nil { + emoji.CreatedAt = *ai.CreatedAt + } + + if ai.Domain != nil { + emoji.Domain = *ai.Domain + } + + if ai.ImageRemoteURL != nil { + emoji.ImageRemoteURL = *ai.ImageRemoteURL + } + + if ai.ImageStaticRemoteURL != nil { + emoji.ImageStaticRemoteURL = *ai.ImageStaticRemoteURL + } + + if ai.Disabled != nil { + emoji.Disabled = ai.Disabled + } + + if ai.VisibleInPicker != nil { + emoji.VisibleInPicker = ai.VisibleInPicker + } + + if ai.CategoryID != nil { + emoji.CategoryID = *ai.CategoryID + } } - m.emojiWorker.Queue(processingEmoji) + + processingEmoji := &ProcessingEmoji{ + instAccID: instanceAccount.ID, + emoji: emoji, + refresh: refresh, + newPathID: newPathID, + dataFn: data, + postFn: postData, + mgr: m, + } + return processingEmoji, nil } -func (m *manager) RecacheMedia(ctx context.Context, data DataFunc, postData PostDataCallbackFunc, attachmentID string) (*ProcessingMedia, error) { - processingRecache, err := m.preProcessRecache(ctx, data, postData, attachmentID) +func (m *manager) ProcessEmoji(ctx context.Context, data DataFunc, postData PostDataCallbackFunc, shortcode string, id string, uri string, ai *AdditionalEmojiInfo, refresh bool) (*ProcessingEmoji, error) { + // Create a new processing emoji object for this emoji request. + emoji, err := m.PreProcessEmoji(ctx, data, postData, shortcode, id, uri, ai, refresh) if err != nil { return nil, err } - m.mediaWorker.Queue(processingRecache) - return processingRecache, nil + + // Attempt to add this emoji processing item to the worker queue. + _ = m.state.Workers.Media.MustEnqueueCtx(ctx, emoji.Process) + + return emoji, nil } -func (m *manager) Stop() error { - // Stop worker pools. - mediaErr := m.mediaWorker.Stop() - emojiErr := m.emojiWorker.Stop() +func scheduleCleanupJobs(m *manager) { + const day = time.Hour * 24 - var cronErr error - if m.stopCronJobs != nil { - cronErr = m.stopCronJobs() - } + // Calculate closest midnight. + now := time.Now() + midnight := now.Round(day) - if mediaErr != nil { - return mediaErr - } else if emojiErr != nil { - return emojiErr + if midnight.Before(now) { + // since <= 11:59am rounds down. + midnight = midnight.Add(day) } - return cronErr + // Get ctx associated with scheduler run state. + done := m.state.Workers.Scheduler.Done() + doneCtx := runners.CancelCtx(done) + + // TODO: we'll need to do some thinking to make these + // jobs restartable if we want to implement reloads in + // the future that make call to Workers.Stop() -> Workers.Start(). + + // Schedule the PruneAll task to execute every day at midnight. + m.state.Workers.Scheduler.Schedule(sched.NewJob(func(now time.Time) { + err := m.PruneAll(doneCtx, config.GetMediaRemoteCacheDays(), true) + if err != nil { + log.Errorf("error during prune: %v", err) + } + log.Infof("finished pruning all in %s", time.Since(now)) + }).EveryAt(midnight, day)) } |