diff options
author | 2022-03-07 11:08:26 +0100 | |
---|---|---|
committer | 2022-03-07 11:08:26 +0100 | |
commit | 07727753b96d209406783e5e539725bcdafebdc7 (patch) | |
tree | b32f11cbc304d633ed0acd8f84b4c11e909bb5f3 /internal/media/manager.go | |
parent | [documentation] Creates Docker documentation and docker-compose.yaml (#416) (diff) | |
download | gotosocial-07727753b96d209406783e5e539725bcdafebdc7.tar.xz |
[feature] Clean up/uncache remote media (#407)
* Add whereNotEmptyAndNotNull
* Add GetRemoteOlderThanDays
* Add GetRemoteOlderThanDays
* Add PruneRemote to Manager interface
* Start implementing PruneRemote
* add new attachment + status to tests
* fix up and test GetRemoteOlderThan
* fix bad import
* PruneRemote: return number pruned
* add Cached column to mediaattachment
* update + test pruneRemote
* update mediaTest
* use Cached column
* upstep bun to latest version
* embed structs in mediaAttachment
* migrate mediaAttachment to new format
* don't default cached to true
* select only remote media
* update db dependencies
* step bun back to last working version
* update pruneRemote to use Cached field
* fix storage path of test attachments
* add recache logic to manager
* fix trimmed aspect ratio
* test prune and recache
* return errwithcode
* tidy up different paths for emoji vs attachment
* fix incorrect thumbnail type being stored
* expose TransportController to media processor
* implement tee-ing recached content
* add thoughts of dog to test fedi attachments
* test get remote files
* add comment on PruneRemote
* add postData cleanup to recache
* test thumbnail fetching
* add incredible diagram
* go mod tidy
* buffer pipes for recache streaming
* test for client stops reading after 1kb
* add media-remote-cache-days to config
* add cron package
* wrap logrus so it's available to cron
* start and stop cron jobs gracefully
Diffstat (limited to 'internal/media/manager.go')
-rw-r--r-- | internal/media/manager.go | 107 |
1 files changed, 100 insertions, 7 deletions
```diff
diff --git a/internal/media/manager.go b/internal/media/manager.go
index 3901bae00..deb7e34f7 100644
--- a/internal/media/manager.go
+++ b/internal/media/manager.go
@@ -21,11 +21,16 @@ package media
 import (
 	"context"
 	"errors"
+	"fmt"
 	"runtime"
+	"time"

 	"codeberg.org/gruf/go-runners"
 	"codeberg.org/gruf/go-store/kv"
+	"github.com/robfig/cron/v3"
 	"github.com/sirupsen/logrus"
+	"github.com/spf13/viper"
+	"github.com/superseriousbusiness/gotosocial/internal/config"
 	"github.com/superseriousbusiness/gotosocial/internal/db"
 )

@@ -61,6 +66,12 @@ type Manager interface {
 	//
 	// ai is optional and can be nil. Any additional information about the emoji provided will be put in the database.
 	ProcessEmoji(ctx context.Context, data DataFunc, postData PostDataCallbackFunc, shortcode string, id string, uri string, ai *AdditionalEmojiInfo) (*ProcessingEmoji, error)
+	// RecacheMedia refetches, reprocesses, and recaches an existing attachment that has been uncached via pruneRemote.
+	RecacheMedia(ctx context.Context, data DataFunc, postData PostDataCallbackFunc, attachmentID string) (*ProcessingMedia, error)
+	// PruneRemote prunes all remote media cached on this instance that's older than the given amount of days.
+	// 'Pruning' in this context means removing the locally stored data of the attachment (both thumbnail and full size),
+	// and setting 'cached' to false on the associated attachment.
+	PruneRemote(ctx context.Context, olderThanDays int) (int, error)
 	// NumWorkers returns the total number of workers available to this manager.
 	NumWorkers() int
 	// QueueSize returns the total capacity of the queue.
@@ -76,11 +87,12 @@ type Manager interface {
 }

 type manager struct {
-	db         db.DB
-	storage    *kv.KVStore
-	pool       runners.WorkerPool
-	numWorkers int
-	queueSize  int
+	db           db.DB
+	storage      *kv.KVStore
+	pool         runners.WorkerPool
+	stopCronJobs func() error
+	numWorkers   int
+	queueSize    int
 }

 // NewManager returns a media manager with the given db and underlying storage.
@@ -97,8 +109,10 @@ type manager struct {
 // For a 4 core machine, this will be 2 workers, and a queue length of 20.
 // For a single or 2-core machine, the media manager will get 1 worker, and a queue of length 10.
 func NewManager(database db.DB, storage *kv.KVStore) (Manager, error) {
-	numWorkers := runtime.NumCPU() / 2
+
+	// configure the worker pool
 	// make sure we always have at least 1 worker even on single-core machines
+	numWorkers := runtime.NumCPU() / 2
 	if numWorkers == 0 {
 		numWorkers = 1
 	}
@@ -112,11 +126,61 @@ func NewManager(database db.DB, storage *kv.KVStore) (Manager, error) {
 		queueSize:  queueSize,
 	}

+	// start the worker pool
 	if start := m.pool.Start(); !start {
 		return nil, errors.New("could not start worker pool")
 	}
 	logrus.Debugf("started media manager worker pool with %d workers and queue capacity of %d", numWorkers, queueSize)

+	// start remote cache cleanup cronjob if configured
+	cacheCleanupDays := viper.GetInt(config.Keys.MediaRemoteCacheDays)
+	if cacheCleanupDays != 0 {
+		// we need a way of cancelling running jobs if the media manager is told to stop
+		pruneCtx, pruneCancel := context.WithCancel(context.Background())
+
+		// create a new cron instance and add a function to it
+		c := cron.New(cron.WithLogger(&logrusWrapper{}))
+
+		pruneFunc := func() {
+			begin := time.Now()
+			pruned, err := m.PruneRemote(pruneCtx, cacheCleanupDays)
+			if err != nil {
+				logrus.Errorf("media manager: error pruning remote cache: %s", err)
+				return
+			}
+			logrus.Infof("media manager: pruned %d remote cache entries in %s", pruned, time.Since(begin))
+		}
+
+		// run every night
+		entryID, err := c.AddFunc("@midnight", pruneFunc)
+		if err != nil {
+			pruneCancel()
+			return nil, fmt.Errorf("error starting media manager remote cache cleanup job: %s", err)
+		}
+
+		// since we're running a cron job, we should define how the manager should stop them
+		m.stopCronJobs = func() error {
+			// try to stop any jobs gracefully by waiting til they're finished
+			cronCtx := c.Stop()
+
+			select {
+			case <-cronCtx.Done():
+				logrus.Infof("media manager: cron finished jobs and stopped gracefully")
+			case <-time.After(1 * time.Minute):
+				logrus.Infof("media manager: cron didn't stop after 60 seconds, will force close")
+				break
+			}
+
+			// whether the job is finished neatly or we had to wait a minute, cancel the context on the prune job
+			pruneCancel()
+			return nil
+		}
+
+		// now start all the cron stuff we've lined up
+		c.Start()
+		logrus.Infof("started media manager remote cache cleanup job: will run next at %s", c.Entry(entryID).Next)
+	}
+
 	return m, nil
 }

@@ -168,6 +232,30 @@ func (m *manager) ProcessEmoji(ctx context.Context, data DataFunc, postData Post
 	return processingEmoji, nil
 }

+func (m *manager) RecacheMedia(ctx context.Context, data DataFunc, postData PostDataCallbackFunc, attachmentID string) (*ProcessingMedia, error) {
+	processingRecache, err := m.preProcessRecache(ctx, data, postData, attachmentID)
+	if err != nil {
+		return nil, err
+	}
+
+	logrus.Tracef("RecacheMedia: about to enqueue recache with attachmentID %s, queue length is %d", processingRecache.AttachmentID(), m.pool.Queue())
+	m.pool.Enqueue(func(innerCtx context.Context) {
+		select {
+		case <-innerCtx.Done():
+			// if the inner context is done that means the worker pool is closing, so we should just return
+			return
+		default:
+			// start loading the media already for the caller's convenience
+			if _, err := processingRecache.LoadAttachment(innerCtx); err != nil {
+				logrus.Errorf("RecacheMedia: error processing recache with attachmentID %s: %s", processingRecache.AttachmentID(), err)
+			}
+		}
+	})
+	logrus.Tracef("RecacheMedia: succesfully queued recache with attachmentID %s, queue length is %d", processingRecache.AttachmentID(), m.pool.Queue())
+
+	return processingRecache, nil
+}
+
 func (m *manager) NumWorkers() int {
 	return m.numWorkers
 }
@@ -186,10 +274,15 @@ func (m *manager) ActiveWorkers() int {

 func (m *manager) Stop() error {
 	logrus.Info("stopping media manager worker pool")
-
 	stopped := m.pool.Stop()
 	if !stopped {
 		return errors.New("could not stop media manager worker pool")
 	}
+
+	if m.stopCronJobs != nil { // only defined if cron jobs are actually running
+		logrus.Info("stopping media manager cache cleanup jobs")
+		return m.stopCronJobs()
+	}
+
 	return nil
 }
```