summary | refs | log | tree | commit | diff
path: root/internal/media/manager.go
diff options
context:
space:
mode:
Diffstat (limited to 'internal/media/manager.go')
-rw-r--r-- internal/media/manager.go 107
1 files changed, 100 insertions, 7 deletions
diff --git a/internal/media/manager.go b/internal/media/manager.go
index 3901bae00..deb7e34f7 100644
--- a/internal/media/manager.go
+++ b/internal/media/manager.go
@@ -21,11 +21,16 @@ package media
import (
"context"
"errors"
+ "fmt"
"runtime"
+ "time"
"codeberg.org/gruf/go-runners"
"codeberg.org/gruf/go-store/kv"
+ "github.com/robfig/cron/v3"
"github.com/sirupsen/logrus"
+ "github.com/spf13/viper"
+ "github.com/superseriousbusiness/gotosocial/internal/config"
"github.com/superseriousbusiness/gotosocial/internal/db"
)
@@ -61,6 +66,12 @@ type Manager interface {
//
// ai is optional and can be nil. Any additional information about the emoji provided will be put in the database.
ProcessEmoji(ctx context.Context, data DataFunc, postData PostDataCallbackFunc, shortcode string, id string, uri string, ai *AdditionalEmojiInfo) (*ProcessingEmoji, error)
+ // RecacheMedia refetches, reprocesses, and recaches an existing attachment that has been uncached via pruneRemote.
+ RecacheMedia(ctx context.Context, data DataFunc, postData PostDataCallbackFunc, attachmentID string) (*ProcessingMedia, error)
+ // PruneRemote prunes all remote media cached on this instance that's older than the given amount of days.
+ // 'Pruning' in this context means removing the locally stored data of the attachment (both thumbnail and full size),
+ // and setting 'cached' to false on the associated attachment.
+ PruneRemote(ctx context.Context, olderThanDays int) (int, error)
// NumWorkers returns the total number of workers available to this manager.
NumWorkers() int
// QueueSize returns the total capacity of the queue.
@@ -76,11 +87,12 @@ type Manager interface {
}
// manager is the state behind the Manager methods implemented in this file.
type manager struct {
- db db.DB
- storage *kv.KVStore
- pool runners.WorkerPool
- numWorkers int
- queueSize int
+ db db.DB // NOTE(review): presumably the database passed to NewManager -- the assignment is outside this hunk
+ storage *kv.KVStore // NOTE(review): presumably the storage passed to NewManager -- the assignment is outside this hunk
+ pool runners.WorkerPool // worker pool that queued jobs run on; started in NewManager and stopped in Stop
+ stopCronJobs func() error // set by NewManager only when the remote cache cleanup cron job is scheduled, nil otherwise; called by Stop
+ numWorkers int // value returned by NumWorkers
+ queueSize int // queue capacity, logged by NewManager when the pool starts
}
// NewManager returns a media manager with the given db and underlying storage.
@@ -97,8 +109,10 @@ type manager struct {
// For a 4 core machine, this will be 2 workers, and a queue length of 20.
// For a single or 2-core machine, the media manager will get 1 worker, and a queue of length 10.
func NewManager(database db.DB, storage *kv.KVStore) (Manager, error) {
- numWorkers := runtime.NumCPU() / 2
+
+ // configure the worker pool
// make sure we always have at least 1 worker even on single-core machines
+ numWorkers := runtime.NumCPU() / 2
if numWorkers == 0 {
numWorkers = 1
}
@@ -112,11 +126,61 @@ func NewManager(database db.DB, storage *kv.KVStore) (Manager, error) {
queueSize: queueSize,
}
+ // start the worker pool
if start := m.pool.Start(); !start {
return nil, errors.New("could not start worker pool")
}
logrus.Debugf("started media manager worker pool with %d workers and queue capacity of %d", numWorkers, queueSize)
+	// start remote cache cleanup cronjob if configured; a value of 0 means no cleanup job is scheduled,
+	// in which case m.stopCronJobs stays nil
+	cacheCleanupDays := viper.GetInt(config.Keys.MediaRemoteCacheDays)
+	if cacheCleanupDays != 0 {
+		// we need a way of cancelling running jobs if the media manager is told to stop:
+		// pruneCtx is passed to every prune run, and pruneCancel is called from stopCronJobs
+		pruneCtx, pruneCancel := context.WithCancel(context.Background())
+
+		// create a new cron instance and add a function to it
+		c := cron.New(cron.WithLogger(&logrusWrapper{}))
+
+		// pruneFunc is the job body: it prunes once, then logs either the error or the count and duration
+		pruneFunc := func() {
+			begin := time.Now()
+			pruned, err := m.PruneRemote(pruneCtx, cacheCleanupDays)
+			if err != nil {
+				logrus.Errorf("media manager: error pruning remote cache: %s", err)
+				return
+			}
+			logrus.Infof("media manager: pruned %d remote cache entries in %s", pruned, time.Since(begin))
+		}
+
+		// run every night
+		entryID, err := c.AddFunc("@midnight", pruneFunc)
+		if err != nil {
+			// release what has been started so far: the prune context, and the worker pool,
+			// which would otherwise keep running with no manager returned to stop it
+			pruneCancel()
+			m.pool.Stop() // best effort; the scheduling error below is the one worth reporting
+			return nil, fmt.Errorf("error starting media manager remote cache cleanup job: %w", err)
+		}
+
+		// since we're running a cron job, we should define how the manager should stop them
+		m.stopCronJobs = func() error {
+			// try to stop any jobs gracefully by waiting until they're finished;
+			// the context returned by c.Stop lets us wait for running jobs to complete
+			cronCtx := c.Stop()
+
+			select {
+			case <-cronCtx.Done():
+				logrus.Infof("media manager: cron finished jobs and stopped gracefully")
+			case <-time.After(1 * time.Minute):
+				logrus.Infof("media manager: cron didn't stop after 60 seconds, will force close")
+			}
+
+			// whether the job is finished neatly or we had to wait a minute, cancel the context on the prune job
+			pruneCancel()
+			return nil
+		}
+
+		// now start all the cron stuff we've lined up
+		c.Start()
+		logrus.Infof("started media manager remote cache cleanup job: will run next at %s", c.Entry(entryID).Next)
+	}
+
+
return m, nil
}
@@ -168,6 +232,30 @@ func (m *manager) ProcessEmoji(ctx context.Context, data DataFunc, postData Post
return processingEmoji, nil
}
+// RecacheMedia prepares a recache of the attachment with the given ID via preProcessRecache,
+// enqueues a job on the worker pool that loads the attachment, and returns the in-progress recache.
+// If preProcessRecache fails, its error is returned unchanged and nothing is enqueued.
+func (m *manager) RecacheMedia(ctx context.Context, data DataFunc, postData PostDataCallbackFunc, attachmentID string) (*ProcessingMedia, error) {
+	recache, err := m.preProcessRecache(ctx, data, postData, attachmentID)
+	if err != nil {
+		return nil, err
+	}
+
+	// the job handed to the worker pool: it starts loading the media so the caller doesn't have to
+	loadRecache := func(innerCtx context.Context) {
+		select {
+		case <-innerCtx.Done():
+			// a finished inner context means the worker pool is closing, so don't start any work
+			return
+		default:
+			if _, err := recache.LoadAttachment(innerCtx); err != nil {
+				logrus.Errorf("RecacheMedia: error processing recache with attachmentID %s: %s", recache.AttachmentID(), err)
+			}
+		}
+	}
+
+	logrus.Tracef("RecacheMedia: about to enqueue recache with attachmentID %s, queue length is %d", recache.AttachmentID(), m.pool.Queue())
+	m.pool.Enqueue(loadRecache)
+	logrus.Tracef("RecacheMedia: succesfully queued recache with attachmentID %s, queue length is %d", recache.AttachmentID(), m.pool.Queue())
+
+	return recache, nil
+}
+
+
func (m *manager) NumWorkers() int {
return m.numWorkers
}
@@ -186,10 +274,15 @@ func (m *manager) ActiveWorkers() int {
+// Stop stops the worker pool and then, if a cache cleanup cron job was scheduled, stops that too.
+// A worker pool that fails to stop no longer prevents the cron jobs from being stopped: the pool
+// error is remembered and returned once the cron jobs have been dealt with. If both steps fail,
+// the pool error is the one returned.
func (m *manager) Stop() error {
logrus.Info("stopping media manager worker pool")
-
-stopped := m.pool.Stop()
-if !stopped {
-return errors.New("could not stop media manager worker pool")
-}
-return nil
+	// don't return early on failure here, or the cron scheduler and its prune context would be left running
+	var poolErr error
+	if stopped := m.pool.Stop(); !stopped {
+		poolErr = errors.New("could not stop media manager worker pool")
+	}
+
+	if m.stopCronJobs != nil { // only defined if cron jobs are actually running
+		logrus.Info("stopping media manager cache cleanup jobs")
+		if err := m.stopCronJobs(); err != nil && poolErr == nil {
+			return err
+		}
+	}
+
+	return poolErr
}