summaryrefslogtreecommitdiff
path: root/internal/media/manager.go
diff options
context:
space:
mode:
Diffstat (limited to 'internal/media/manager.go')
-rw-r--r--internal/media/manager.go358
1 files changed, 290 insertions, 68 deletions
diff --git a/internal/media/manager.go b/internal/media/manager.go
index b770a7dcd..ba89aff13 100644
--- a/internal/media/manager.go
+++ b/internal/media/manager.go
@@ -20,11 +20,19 @@ package media
import (
"context"
+ "errors"
"fmt"
+ "time"
- "github.com/superseriousbusiness/gotosocial/internal/concurrency"
- "github.com/superseriousbusiness/gotosocial/internal/db"
- "github.com/superseriousbusiness/gotosocial/internal/storage"
+ "codeberg.org/gruf/go-runners"
+ "codeberg.org/gruf/go-sched"
+ "codeberg.org/gruf/go-store/v2/storage"
+ "github.com/superseriousbusiness/gotosocial/internal/config"
+ "github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
+ "github.com/superseriousbusiness/gotosocial/internal/id"
+ "github.com/superseriousbusiness/gotosocial/internal/log"
+ "github.com/superseriousbusiness/gotosocial/internal/state"
+ "github.com/superseriousbusiness/gotosocial/internal/uris"
)
var SupportedMIMETypes = []string{
@@ -42,16 +50,11 @@ var SupportedEmojiMIMETypes = []string{
// Manager provides an interface for managing media: parsing, storing, and retrieving media objects like photos, videos, and gifs.
type Manager interface {
- // Stop stops the underlying worker pool of the manager. It should be called
- // when closing GoToSocial in order to cleanly finish any in-progress jobs.
- // It will block until workers are finished processing.
- Stop() error
-
/*
PROCESSING FUNCTIONS
*/
- // ProcessMedia begins the process of decoding and storing the given data as an attachment.
+ // PreProcessMedia begins the process of decoding and storing the given data as an attachment.
// It will return a pointer to a ProcessingMedia struct upon which further actions can be performed, such as getting
// the finished media, thumbnail, attachment, etc.
//
@@ -63,8 +66,19 @@ type Manager interface {
// accountID should be the account that the media belongs to.
//
// ai is optional and can be nil. Any additional information about the attachment provided will be put in the database.
+ //
+ // Note: unlike ProcessMedia, this will NOT queue the media to be asynchronously processed.
+ PreProcessMedia(ctx context.Context, data DataFunc, postData PostDataCallbackFunc, accountID string, ai *AdditionalMediaInfo) (*ProcessingMedia, error)
+
+ // PreProcessMediaRecache refetches, reprocesses, and recaches an existing attachment that has been uncached via pruneRemote.
+ //
+ // Note: unlike ProcessMedia, this will NOT queue the media to be asychronously processed.
+ PreProcessMediaRecache(ctx context.Context, data DataFunc, postData PostDataCallbackFunc, attachmentID string) (*ProcessingMedia, error)
+
+ // ProcessMedia will call PreProcessMedia, followed by queuing the media to be processing in the media worker queue.
ProcessMedia(ctx context.Context, data DataFunc, postData PostDataCallbackFunc, accountID string, ai *AdditionalMediaInfo) (*ProcessingMedia, error)
- // ProcessEmoji begins the process of decoding and storing the given data as an emoji.
+
+ // PreProcessEmoji begins the process of decoding and storing the given data as an emoji.
// It will return a pointer to a ProcessingEmoji struct upon which further actions can be performed, such as getting
// the finished media, thumbnail, attachment, etc.
//
@@ -81,10 +95,11 @@ type Manager interface {
//
// ai is optional and can be nil. Any additional information about the emoji provided will be put in the database.
//
- // If refresh is true, this indicates that the emoji image has changed and should be updated.
+ // Note: unlike ProcessEmoji, this will NOT queue the emoji to be asynchronously processed.
+ PreProcessEmoji(ctx context.Context, data DataFunc, postData PostDataCallbackFunc, shortcode string, id string, uri string, ai *AdditionalEmojiInfo, refresh bool) (*ProcessingEmoji, error)
+
+ // ProcessEmoji will call PreProcessEmoji, followed by queuing the emoji to be processing in the emoji worker queue.
ProcessEmoji(ctx context.Context, data DataFunc, postData PostDataCallbackFunc, shortcode string, id string, uri string, ai *AdditionalEmojiInfo, refresh bool) (*ProcessingEmoji, error)
- // RecacheMedia refetches, reprocesses, and recaches an existing attachment that has been uncached via pruneRemote.
- RecacheMedia(ctx context.Context, data DataFunc, postData PostDataCallbackFunc, attachmentID string) (*ProcessingMedia, error)
/*
PRUNING/UNCACHING FUNCTIONS
@@ -139,11 +154,7 @@ type Manager interface {
}
type manager struct {
- db db.DB
- storage *storage.Driver
- emojiWorker *concurrency.WorkerPool[*ProcessingEmoji]
- mediaWorker *concurrency.WorkerPool[*ProcessingMedia]
- stopCronJobs func() error
+ state *state.State
}
// NewManager returns a media manager with the given db and underlying storage.
@@ -152,88 +163,299 @@ type manager struct {
// a limited number of media will be processed in parallel. The numbers of workers
// is determined from the $GOMAXPROCS environment variable (usually no. CPU cores).
// See internal/concurrency.NewWorkerPool() documentation for further information.
-func NewManager(database db.DB, storage *storage.Driver) (Manager, error) {
- m := &manager{
- db: database,
- storage: storage,
+func NewManager(state *state.State) Manager {
+ m := &manager{state: state}
+ scheduleCleanupJobs(m)
+ return m
+}
+
+func (m *manager) PreProcessMedia(ctx context.Context, data DataFunc, postData PostDataCallbackFunc, accountID string, ai *AdditionalMediaInfo) (*ProcessingMedia, error) {
+ id, err := id.NewRandomULID()
+ if err != nil {
+ return nil, err
}
- // Prepare the media worker pool.
- m.mediaWorker = concurrency.NewWorkerPool[*ProcessingMedia](-1, 10)
- m.mediaWorker.SetProcessor(func(ctx context.Context, media *ProcessingMedia) error {
- if _, err := media.LoadAttachment(ctx); err != nil {
- return fmt.Errorf("error loading media %s: %v", media.AttachmentID(), err)
+ avatar := false
+ header := false
+ cached := false
+ now := time.Now()
+
+ // populate initial fields on the media attachment -- some of these will be overwritten as we proceed
+ attachment := &gtsmodel.MediaAttachment{
+ ID: id,
+ CreatedAt: now,
+ UpdatedAt: now,
+ StatusID: "",
+ URL: "", // we don't know yet because it depends on the uncalled DataFunc
+ RemoteURL: "",
+ Type: gtsmodel.FileTypeUnknown, // we don't know yet because it depends on the uncalled DataFunc
+ FileMeta: gtsmodel.FileMeta{},
+ AccountID: accountID,
+ Description: "",
+ ScheduledStatusID: "",
+ Blurhash: "",
+ Processing: gtsmodel.ProcessingStatusReceived,
+ File: gtsmodel.File{UpdatedAt: now},
+ Thumbnail: gtsmodel.Thumbnail{UpdatedAt: now},
+ Avatar: &avatar,
+ Header: &header,
+ Cached: &cached,
+ }
+
+ // check if we have additional info to add to the attachment,
+ // and overwrite some of the attachment fields if so
+ if ai != nil {
+ if ai.CreatedAt != nil {
+ attachment.CreatedAt = *ai.CreatedAt
}
- return nil
- })
- // Prepare the emoji worker pool.
- m.emojiWorker = concurrency.NewWorkerPool[*ProcessingEmoji](-1, 10)
- m.emojiWorker.SetProcessor(func(ctx context.Context, emoji *ProcessingEmoji) error {
- if _, err := emoji.LoadEmoji(ctx); err != nil {
- return fmt.Errorf("error loading emoji %s: %v", emoji.EmojiID(), err)
+ if ai.StatusID != nil {
+ attachment.StatusID = *ai.StatusID
}
- return nil
- })
- // Start the worker pools.
- if err := m.mediaWorker.Start(); err != nil {
- return nil, err
+ if ai.RemoteURL != nil {
+ attachment.RemoteURL = *ai.RemoteURL
+ }
+
+ if ai.Description != nil {
+ attachment.Description = *ai.Description
+ }
+
+ if ai.ScheduledStatusID != nil {
+ attachment.ScheduledStatusID = *ai.ScheduledStatusID
+ }
+
+ if ai.Blurhash != nil {
+ attachment.Blurhash = *ai.Blurhash
+ }
+
+ if ai.Avatar != nil {
+ attachment.Avatar = ai.Avatar
+ }
+
+ if ai.Header != nil {
+ attachment.Header = ai.Header
+ }
+
+ if ai.FocusX != nil {
+ attachment.FileMeta.Focus.X = *ai.FocusX
+ }
+
+ if ai.FocusY != nil {
+ attachment.FileMeta.Focus.Y = *ai.FocusY
+ }
}
- if err := m.emojiWorker.Start(); err != nil {
- return nil, err
+
+ processingMedia := &ProcessingMedia{
+ media: attachment,
+ dataFn: data,
+ postFn: postData,
+ mgr: m,
}
- // Schedule cron job(s) for clean up.
- if err := scheduleCleanup(m); err != nil {
+ return processingMedia, nil
+}
+
+func (m *manager) PreProcessMediaRecache(ctx context.Context, data DataFunc, postData PostDataCallbackFunc, attachmentID string) (*ProcessingMedia, error) {
+ // get the existing attachment from database.
+ attachment, err := m.state.DB.GetAttachmentByID(ctx, attachmentID)
+ if err != nil {
return nil, err
}
- return m, nil
+ processingMedia := &ProcessingMedia{
+ media: attachment,
+ dataFn: data,
+ postFn: postData,
+ recache: true, // indicate it's a recache
+ mgr: m,
+ }
+
+ return processingMedia, nil
}
func (m *manager) ProcessMedia(ctx context.Context, data DataFunc, postData PostDataCallbackFunc, accountID string, ai *AdditionalMediaInfo) (*ProcessingMedia, error) {
- processingMedia, err := m.preProcessMedia(ctx, data, postData, accountID, ai)
+ // Create a new processing media object for this media request.
+ media, err := m.PreProcessMedia(ctx, data, postData, accountID, ai)
if err != nil {
return nil, err
}
- m.mediaWorker.Queue(processingMedia)
- return processingMedia, nil
+
+ // Attempt to add this media processing item to the worker queue.
+ _ = m.state.Workers.Media.MustEnqueueCtx(ctx, media.Process)
+
+ return media, nil
}
-func (m *manager) ProcessEmoji(ctx context.Context, data DataFunc, postData PostDataCallbackFunc, shortcode string, id string, uri string, ai *AdditionalEmojiInfo, refresh bool) (*ProcessingEmoji, error) {
- processingEmoji, err := m.preProcessEmoji(ctx, data, postData, shortcode, id, uri, ai, refresh)
+func (m *manager) PreProcessEmoji(ctx context.Context, data DataFunc, postData PostDataCallbackFunc, shortcode string, emojiID string, uri string, ai *AdditionalEmojiInfo, refresh bool) (*ProcessingEmoji, error) {
+ instanceAccount, err := m.state.DB.GetInstanceAccount(ctx, "")
if err != nil {
- return nil, err
+ return nil, fmt.Errorf("preProcessEmoji: error fetching this instance account from the db: %s", err)
+ }
+
+ var (
+ newPathID string
+ emoji *gtsmodel.Emoji
+ now = time.Now()
+ )
+
+ if refresh {
+ emoji, err = m.state.DB.GetEmojiByID(ctx, emojiID)
+ if err != nil {
+ return nil, fmt.Errorf("preProcessEmoji: error fetching emoji to refresh from the db: %s", err)
+ }
+
+ // if this is a refresh, we will end up with new images
+ // stored for this emoji, so we can use the postData function
+ // to perform clean up of the old images from storage
+ originalPostData := postData
+ originalImagePath := emoji.ImagePath
+ originalImageStaticPath := emoji.ImageStaticPath
+ postData = func(innerCtx context.Context) error {
+ // trigger the original postData function if it was provided
+ if originalPostData != nil {
+ if err := originalPostData(innerCtx); err != nil {
+ return err
+ }
+ }
+
+ l := log.WithField("shortcode@domain", emoji.Shortcode+"@"+emoji.Domain)
+ l.Debug("postData: cleaning up old emoji files for refreshed emoji")
+ if err := m.state.Storage.Delete(innerCtx, originalImagePath); err != nil && !errors.Is(err, storage.ErrNotFound) {
+ l.Errorf("postData: error cleaning up old emoji image at %s for refreshed emoji: %s", originalImagePath, err)
+ }
+ if err := m.state.Storage.Delete(innerCtx, originalImageStaticPath); err != nil && !errors.Is(err, storage.ErrNotFound) {
+ l.Errorf("postData: error cleaning up old emoji static image at %s for refreshed emoji: %s", originalImageStaticPath, err)
+ }
+
+ return nil
+ }
+
+ newPathID, err = id.NewRandomULID()
+ if err != nil {
+ return nil, fmt.Errorf("preProcessEmoji: error generating alternateID for emoji refresh: %s", err)
+ }
+
+ // store + serve static image at new path ID
+ emoji.ImageStaticURL = uris.GenerateURIForAttachment(instanceAccount.ID, string(TypeEmoji), string(SizeStatic), newPathID, mimePng)
+ emoji.ImageStaticPath = fmt.Sprintf("%s/%s/%s/%s.%s", instanceAccount.ID, TypeEmoji, SizeStatic, newPathID, mimePng)
+
+ emoji.Shortcode = shortcode
+ emoji.URI = uri
+ } else {
+ disabled := false
+ visibleInPicker := true
+
+ // populate initial fields on the emoji -- some of these will be overwritten as we proceed
+ emoji = &gtsmodel.Emoji{
+ ID: emojiID,
+ CreatedAt: now,
+ Shortcode: shortcode,
+ Domain: "", // assume our own domain unless told otherwise
+ ImageRemoteURL: "",
+ ImageStaticRemoteURL: "",
+ ImageURL: "", // we don't know yet
+ ImageStaticURL: uris.GenerateURIForAttachment(instanceAccount.ID, string(TypeEmoji), string(SizeStatic), emojiID, mimePng), // all static emojis are encoded as png
+ ImagePath: "", // we don't know yet
+ ImageStaticPath: fmt.Sprintf("%s/%s/%s/%s.%s", instanceAccount.ID, TypeEmoji, SizeStatic, emojiID, mimePng), // all static emojis are encoded as png
+ ImageContentType: "", // we don't know yet
+ ImageStaticContentType: mimeImagePng, // all static emojis are encoded as png
+ ImageFileSize: 0,
+ ImageStaticFileSize: 0,
+ Disabled: &disabled,
+ URI: uri,
+ VisibleInPicker: &visibleInPicker,
+ CategoryID: "",
+ }
+ }
+
+ emoji.ImageUpdatedAt = now
+ emoji.UpdatedAt = now
+
+ // check if we have additional info to add to the emoji,
+ // and overwrite some of the emoji fields if so
+ if ai != nil {
+ if ai.CreatedAt != nil {
+ emoji.CreatedAt = *ai.CreatedAt
+ }
+
+ if ai.Domain != nil {
+ emoji.Domain = *ai.Domain
+ }
+
+ if ai.ImageRemoteURL != nil {
+ emoji.ImageRemoteURL = *ai.ImageRemoteURL
+ }
+
+ if ai.ImageStaticRemoteURL != nil {
+ emoji.ImageStaticRemoteURL = *ai.ImageStaticRemoteURL
+ }
+
+ if ai.Disabled != nil {
+ emoji.Disabled = ai.Disabled
+ }
+
+ if ai.VisibleInPicker != nil {
+ emoji.VisibleInPicker = ai.VisibleInPicker
+ }
+
+ if ai.CategoryID != nil {
+ emoji.CategoryID = *ai.CategoryID
+ }
}
- m.emojiWorker.Queue(processingEmoji)
+
+ processingEmoji := &ProcessingEmoji{
+ instAccID: instanceAccount.ID,
+ emoji: emoji,
+ refresh: refresh,
+ newPathID: newPathID,
+ dataFn: data,
+ postFn: postData,
+ mgr: m,
+ }
+
return processingEmoji, nil
}
-func (m *manager) RecacheMedia(ctx context.Context, data DataFunc, postData PostDataCallbackFunc, attachmentID string) (*ProcessingMedia, error) {
- processingRecache, err := m.preProcessRecache(ctx, data, postData, attachmentID)
+func (m *manager) ProcessEmoji(ctx context.Context, data DataFunc, postData PostDataCallbackFunc, shortcode string, id string, uri string, ai *AdditionalEmojiInfo, refresh bool) (*ProcessingEmoji, error) {
+ // Create a new processing emoji object for this emoji request.
+ emoji, err := m.PreProcessEmoji(ctx, data, postData, shortcode, id, uri, ai, refresh)
if err != nil {
return nil, err
}
- m.mediaWorker.Queue(processingRecache)
- return processingRecache, nil
+
+ // Attempt to add this emoji processing item to the worker queue.
+ _ = m.state.Workers.Media.MustEnqueueCtx(ctx, emoji.Process)
+
+ return emoji, nil
}
-func (m *manager) Stop() error {
- // Stop worker pools.
- mediaErr := m.mediaWorker.Stop()
- emojiErr := m.emojiWorker.Stop()
+func scheduleCleanupJobs(m *manager) {
+ const day = time.Hour * 24
- var cronErr error
- if m.stopCronJobs != nil {
- cronErr = m.stopCronJobs()
- }
+ // Calculate closest midnight.
+ now := time.Now()
+ midnight := now.Round(day)
- if mediaErr != nil {
- return mediaErr
- } else if emojiErr != nil {
- return emojiErr
+ if midnight.Before(now) {
+ // since <= 11:59am rounds down.
+ midnight = midnight.Add(day)
}
- return cronErr
+ // Get ctx associated with scheduler run state.
+ done := m.state.Workers.Scheduler.Done()
+ doneCtx := runners.CancelCtx(done)
+
+ // TODO: we'll need to do some thinking to make these
+ // jobs restartable if we want to implement reloads in
+ // the future that make call to Workers.Stop() -> Workers.Start().
+
+ // Schedule the PruneAll task to execute every day at midnight.
+ m.state.Workers.Scheduler.Schedule(sched.NewJob(func(now time.Time) {
+ err := m.PruneAll(doneCtx, config.GetMediaRemoteCacheDays(), true)
+ if err != nil {
+ log.Errorf("error during prune: %v", err)
+ }
+ log.Infof("finished pruning all in %s", time.Since(now))
+ }).EveryAt(midnight, day))
}