diff options
author | 2022-03-07 11:08:26 +0100 | |
---|---|---|
committer | 2022-03-07 11:08:26 +0100 | |
commit | 07727753b96d209406783e5e539725bcdafebdc7 (patch) | |
tree | b32f11cbc304d633ed0acd8f84b4c11e909bb5f3 /internal/media | |
parent | [documentation] Creates Docker documentation and docker-compose.yaml (#416) (diff) | |
download | gotosocial-07727753b96d209406783e5e539725bcdafebdc7.tar.xz |
[feature] Clean up/uncache remote media (#407)
* Add whereNotEmptyAndNotNull
* Add GetRemoteOlderThanDays
* Add GetRemoteOlderThanDays
* Add PruneRemote to Manager interface
* Start implementing PruneRemote
* add new attachment + status to tests
* fix up and test GetRemoteOlderThan
* fix bad import
* PruneRemote: return number pruned
* add Cached column to mediaattachment
* update + test pruneRemote
* update mediaTest
* use Cached column
* upstep bun to latest version
* embed structs in mediaAttachment
* migrate mediaAttachment to new format
* don't default cached to true
* select only remote media
* update db dependencies
* step bun back to last working version
* update pruneRemote to use Cached field
* fix storage path of test attachments
* add recache logic to manager
* fix trimmed aspect ratio
* test prune and recache
* return errwithcode
* tidy up different paths for emoji vs attachment
* fix incorrect thumbnail type being stored
* expose TransportController to media processor
* implement tee-ing recached content
* add thoughts of dog to test fedi attachments
* test get remote files
* add comment on PruneRemote
* add postData cleanup to recache
* test thumbnail fetching
* add incredible diagram
* go mod tidy
* buffer pipes for recache streaming
* test for client stops reading after 1kb
* add media-remote-cache-days to config
* add cron package
* wrap logrus so it's available to cron
* start and stop cron jobs gracefully
Diffstat (limited to 'internal/media')
-rw-r--r-- | internal/media/manager.go | 107 | ||||
-rw-r--r-- | internal/media/manager_test.go | 2 | ||||
-rw-r--r-- | internal/media/media_test.go | 9 | ||||
-rw-r--r-- | internal/media/processingmedia.go | 38 | ||||
-rw-r--r-- | internal/media/pruneremote.go | 96 | ||||
-rw-r--r-- | internal/media/pruneremote_test.go | 111 | ||||
-rw-r--r-- | internal/media/util.go | 15 |
7 files changed, 365 insertions, 13 deletions
diff --git a/internal/media/manager.go b/internal/media/manager.go index 3901bae00..deb7e34f7 100644 --- a/internal/media/manager.go +++ b/internal/media/manager.go @@ -21,11 +21,16 @@ package media import ( "context" "errors" + "fmt" "runtime" + "time" "codeberg.org/gruf/go-runners" "codeberg.org/gruf/go-store/kv" + "github.com/robfig/cron/v3" "github.com/sirupsen/logrus" + "github.com/spf13/viper" + "github.com/superseriousbusiness/gotosocial/internal/config" "github.com/superseriousbusiness/gotosocial/internal/db" ) @@ -61,6 +66,12 @@ type Manager interface { // // ai is optional and can be nil. Any additional information about the emoji provided will be put in the database. ProcessEmoji(ctx context.Context, data DataFunc, postData PostDataCallbackFunc, shortcode string, id string, uri string, ai *AdditionalEmojiInfo) (*ProcessingEmoji, error) + // RecacheMedia refetches, reprocesses, and recaches an existing attachment that has been uncached via pruneRemote. + RecacheMedia(ctx context.Context, data DataFunc, postData PostDataCallbackFunc, attachmentID string) (*ProcessingMedia, error) + // PruneRemote prunes all remote media cached on this instance that's older than the given amount of days. + // 'Pruning' in this context means removing the locally stored data of the attachment (both thumbnail and full size), + // and setting 'cached' to false on the associated attachment. + PruneRemote(ctx context.Context, olderThanDays int) (int, error) // NumWorkers returns the total number of workers available to this manager. NumWorkers() int // QueueSize returns the total capacity of the queue. @@ -76,11 +87,12 @@ type Manager interface { } type manager struct { - db db.DB - storage *kv.KVStore - pool runners.WorkerPool - numWorkers int - queueSize int + db db.DB + storage *kv.KVStore + pool runners.WorkerPool + stopCronJobs func() error + numWorkers int + queueSize int } // NewManager returns a media manager with the given db and underlying storage. @@ -97,8 +109,10 @@ type manager struct { // For a 4 core machine, this will be 2 workers, and a queue length of 20. // For a single or 2-core machine, the media manager will get 1 worker, and a queue of length 10. func NewManager(database db.DB, storage *kv.KVStore) (Manager, error) { - numWorkers := runtime.NumCPU() / 2 + + // configure the worker pool // make sure we always have at least 1 worker even on single-core machines + numWorkers := runtime.NumCPU() / 2 if numWorkers == 0 { numWorkers = 1 } @@ -112,11 +126,61 @@ func NewManager(database db.DB, storage *kv.KVStore) (Manager, error) { queueSize: queueSize, } + // start the worker pool if start := m.pool.Start(); !start { return nil, errors.New("could not start worker pool") } logrus.Debugf("started media manager worker pool with %d workers and queue capacity of %d", numWorkers, queueSize) + // start remote cache cleanup cronjob if configured + cacheCleanupDays := viper.GetInt(config.Keys.MediaRemoteCacheDays) + if cacheCleanupDays != 0 { + // we need a way of cancelling running jobs if the media manager is told to stop + pruneCtx, pruneCancel := context.WithCancel(context.Background()) + + // create a new cron instance and add a function to it + c := cron.New(cron.WithLogger(&logrusWrapper{})) + + pruneFunc := func() { + begin := time.Now() + pruned, err := m.PruneRemote(pruneCtx, cacheCleanupDays) + if err != nil { + logrus.Errorf("media manager: error pruning remote cache: %s", err) + return + } + logrus.Infof("media manager: pruned %d remote cache entries in %s", pruned, time.Since(begin)) + } + + // run every night + entryID, err := c.AddFunc("@midnight", pruneFunc) + if err != nil { + pruneCancel() + return nil, fmt.Errorf("error starting media manager remote cache cleanup job: %s", err) + } + + // since we're running a cron job, we should define how the manager should stop them + m.stopCronJobs = func() error { + // try to stop any jobs gracefully by waiting til they're finished + cronCtx := c.Stop() + + select { + case <-cronCtx.Done(): + logrus.Infof("media manager: cron finished jobs and stopped gracefully") + case <-time.After(1 * time.Minute): + logrus.Infof("media manager: cron didn't stop after 60 seconds, will force close") + break + } + + // whether the job is finished neatly or we had to wait a minute, cancel the context on the prune job + pruneCancel() + return nil + } + + // now start all the cron stuff we've lined up + c.Start() + logrus.Infof("started media manager remote cache cleanup job: will run next at %s", c.Entry(entryID).Next) + } + return m, nil } @@ -168,6 +232,30 @@ func (m *manager) ProcessEmoji(ctx context.Context, data DataFunc, postData Post return processingEmoji, nil } +func (m *manager) RecacheMedia(ctx context.Context, data DataFunc, postData PostDataCallbackFunc, attachmentID string) (*ProcessingMedia, error) { + processingRecache, err := m.preProcessRecache(ctx, data, postData, attachmentID) + if err != nil { + return nil, err + } + + logrus.Tracef("RecacheMedia: about to enqueue recache with attachmentID %s, queue length is %d", processingRecache.AttachmentID(), m.pool.Queue()) + m.pool.Enqueue(func(innerCtx context.Context) { + select { + case <-innerCtx.Done(): + // if the inner context is done that means the worker pool is closing, so we should just return + return + default: + // start loading the media already for the caller's convenience + if _, err := processingRecache.LoadAttachment(innerCtx); err != nil { + logrus.Errorf("RecacheMedia: error processing recache with attachmentID %s: %s", processingRecache.AttachmentID(), err) + } + } + }) + logrus.Tracef("RecacheMedia: succesfully queued recache with attachmentID %s, queue length is %d", processingRecache.AttachmentID(), m.pool.Queue()) + + return processingRecache, nil +} + func (m *manager) NumWorkers() int { return m.numWorkers } @@ -186,10 +274,15 @@ func (m *manager) ActiveWorkers() int { func (m *manager) Stop() error { logrus.Info("stopping media manager worker pool") - stopped := m.pool.Stop() if !stopped { return errors.New("could not stop media manager worker pool") } + + if m.stopCronJobs != nil { // only defined if cron jobs are actually running + logrus.Info("stopping media manager cache cleanup jobs") + return m.stopCronJobs() + } + return nil } diff --git a/internal/media/manager_test.go b/internal/media/manager_test.go index a962c2a44..95cefe1db 100644 --- a/internal/media/manager_test.go +++ b/internal/media/manager_test.go @@ -31,7 +31,7 @@ import ( "codeberg.org/gruf/go-store/kv" "codeberg.org/gruf/go-store/storage" "github.com/stretchr/testify/suite" - gtsmodel "github.com/superseriousbusiness/gotosocial/internal/db/bundb/migrations/20211113114307_init" + gtsmodel "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" "github.com/superseriousbusiness/gotosocial/internal/media" ) diff --git a/internal/media/media_test.go b/internal/media/media_test.go index f3e73ed79..ee0fd8eea 100644 --- a/internal/media/media_test.go +++ b/internal/media/media_test.go @@ -22,6 +22,7 @@ import ( "codeberg.org/gruf/go-store/kv" "github.com/stretchr/testify/suite" "github.com/superseriousbusiness/gotosocial/internal/db" + gtsmodel "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" "github.com/superseriousbusiness/gotosocial/internal/media" "github.com/superseriousbusiness/gotosocial/testrig" ) @@ -29,9 +30,10 @@ import ( type MediaStandardTestSuite struct { suite.Suite - db db.DB - storage *kv.KVStore - manager media.Manager + db db.DB + storage *kv.KVStore + manager media.Manager + testAttachments map[string]*gtsmodel.MediaAttachment } func (suite *MediaStandardTestSuite) SetupSuite() { @@ -45,6 +47,7 @@ func (suite *MediaStandardTestSuite) SetupSuite() { func (suite *MediaStandardTestSuite) SetupTest() { testrig.StandardStorageSetup(suite.storage, "../../testrig/media") testrig.StandardDBSetup(suite.db, nil) + suite.testAttachments = testrig.NewTestAttachments() suite.manager = testrig.NewTestMediaManager(suite.db, suite.storage) } diff --git a/internal/media/processingmedia.go b/internal/media/processingmedia.go index 5d5cfd249..634d4eb48 100644 --- a/internal/media/processingmedia.go +++ b/internal/media/processingmedia.go @@ -66,6 +66,9 @@ type ProcessingMedia struct { // track whether this media has already been put in the databse insertedInDB bool + + // true if this is a recache, false if it's brand new media + recache bool } // AttachmentID returns the ID of the underlying media attachment without blocking processing. @@ -93,8 +96,16 @@ func (p *ProcessingMedia) LoadAttachment(ctx context.Context) (*gtsmodel.MediaAt // store the result in the database before returning it if !p.insertedInDB { - if err := p.database.Put(ctx, p.attachment); err != nil { - return nil, err + if p.recache { + // if it's a recache we should only need to update + if err := p.database.UpdateByPrimaryKey(ctx, p.attachment); err != nil { + return nil, err + } + } else { + // otherwise we need to really PUT it + if err := p.database.Put(ctx, p.attachment); err != nil { + return nil, err + } } p.insertedInDB = true } @@ -305,6 +316,7 @@ func (p *ProcessingMedia) store(ctx context.Context) error { if err := p.storage.PutStream(p.attachment.File.Path, clean); err != nil { return fmt.Errorf("store: error storing stream: %s", err) } + p.attachment.Cached = true // if the original reader is a readcloser, close it since we're done with it now if rc, ok := reader.(io.ReadCloser); ok { @@ -360,6 +372,7 @@ func (m *manager) preProcessMedia(ctx context.Context, data DataFunc, postData P Thumbnail: thumbnail, Avatar: false, Header: false, + Cached: false, } // check if we have additional info to add to the attachment, @@ -418,3 +431,24 @@ func (m *manager) preProcessMedia(ctx context.Context, data DataFunc, postData P return processingMedia, nil } + +func (m *manager) preProcessRecache(ctx context.Context, data DataFunc, postData PostDataCallbackFunc, attachmentID string) (*ProcessingMedia, error) { + // get the existing attachment + attachment, err := m.db.GetAttachmentByID(ctx, attachmentID) + if err != nil { + return nil, err + } + + processingMedia := &ProcessingMedia{ + attachment: attachment, + data: data, + postData: postData, + thumbState: int32(received), + fullSizeState: int32(received), + database: m.db, + storage: m.storage, + recache: true, // indicate it's a recache + } + + return processingMedia, nil +} diff --git a/internal/media/pruneremote.go b/internal/media/pruneremote.go new file mode 100644 index 000000000..372f7bbb9 --- /dev/null +++ b/internal/media/pruneremote.go @@ -0,0 +1,96 @@ +/* + GoToSocial + Copyright (C) 2021-2022 GoToSocial Authors admin@gotosocial.org + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU Affero General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +package media + +import ( + "context" + "fmt" + "time" + + "codeberg.org/gruf/go-store/storage" + "github.com/sirupsen/logrus" + "github.com/superseriousbusiness/gotosocial/internal/db" + "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" +) + +// amount of media attachments to select at a time from the db when pruning +const selectPruneLimit = 20 + +func (m *manager) PruneRemote(ctx context.Context, olderThanDays int) (int, error) { + var totalPruned int + + // convert days into a duration string + olderThanHoursString := fmt.Sprintf("%dh", olderThanDays*24) + // parse the duration string into a duration + olderThanHours, err := time.ParseDuration(olderThanHoursString) + if err != nil { + return totalPruned, fmt.Errorf("PruneRemote: %d", err) + } + // 'subtract' that from the time now to give our threshold + olderThan := time.Now().Add(-olderThanHours) + logrus.Infof("PruneRemote: pruning media older than %s", olderThan) + + // select 20 attachments at a time and prune them + for attachments, err := m.db.GetRemoteOlderThan(ctx, olderThan, selectPruneLimit); err == nil && len(attachments) != 0; attachments, err = m.db.GetRemoteOlderThan(ctx, olderThan, selectPruneLimit) { + + // use the age of the oldest attachment (the last one in the slice) as the next 'older than' value + l := len(attachments) + logrus.Tracef("PruneRemote: got %d attachments older than %s", l, olderThan) + olderThan = attachments[l-1].CreatedAt + + // prune each attachment + for _, attachment := range attachments { + if err := m.PruneOne(ctx, attachment); err != nil { + return totalPruned, err + } + totalPruned++ + } + } + + // make sure we don't have a real error when we leave the loop + if err != nil && err != db.ErrNoEntries { + return totalPruned, err + } + + logrus.Infof("PruneRemote: finished pruning remote media: pruned %d entries", totalPruned) + return totalPruned, nil +} + +func (m *manager) PruneOne(ctx context.Context, attachment *gtsmodel.MediaAttachment) error { + if attachment.File.Path != "" { + // delete the full size attachment from storage + logrus.Tracef("PruneOne: deleting %s", attachment.File.Path) + if err := m.storage.Delete(attachment.File.Path); err != nil && err != storage.ErrNotFound { + return err + } + attachment.Cached = false + } + + if attachment.Thumbnail.Path != "" { + // delete the thumbnail from storage + logrus.Tracef("PruneOne: deleting %s", attachment.Thumbnail.Path) + if err := m.storage.Delete(attachment.Thumbnail.Path); err != nil && err != storage.ErrNotFound { + return err + } + attachment.Cached = false + } + + // update the attachment to reflect that we no longer have it cached + return m.db.UpdateByPrimaryKey(ctx, attachment) +} diff --git a/internal/media/pruneremote_test.go b/internal/media/pruneremote_test.go new file mode 100644 index 000000000..f9d71cae2 --- /dev/null +++ b/internal/media/pruneremote_test.go @@ -0,0 +1,111 @@ +/* + GoToSocial + Copyright (C) 2021-2022 GoToSocial Authors admin@gotosocial.org + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU Affero General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +package media_test + +import ( + "bytes" + "context" + "io" + "os" + "testing" + + "codeberg.org/gruf/go-store/storage" + "github.com/stretchr/testify/suite" +) + +type PruneRemoteTestSuite struct { + MediaStandardTestSuite +} + +func (suite *PruneRemoteTestSuite) TestPruneRemote() { + testAttachment := suite.testAttachments["remote_account_1_status_1_attachment_1"] + suite.True(testAttachment.Cached) + + totalPruned, err := suite.manager.PruneRemote(context.Background(), 1) + suite.NoError(err) + suite.Equal(1, totalPruned) + + prunedAttachment, err := suite.db.GetAttachmentByID(context.Background(), testAttachment.ID) + suite.NoError(err) + + // the media should no longer be cached + suite.False(prunedAttachment.Cached) +} + +func (suite *PruneRemoteTestSuite) TestPruneRemoteTwice() { + totalPruned, err := suite.manager.PruneRemote(context.Background(), 1) + suite.NoError(err) + suite.Equal(1, totalPruned) + + // final prune should prune nothing, since the first prune already happened + totalPrunedAgain, err := suite.manager.PruneRemote(context.Background(), 1) + suite.NoError(err) + suite.Equal(0, totalPrunedAgain) +} + +func (suite *PruneRemoteTestSuite) TestPruneAndRecache() { + ctx := context.Background() + testAttachment := suite.testAttachments["remote_account_1_status_1_attachment_1"] + + totalPruned, err := suite.manager.PruneRemote(ctx, 1) + suite.NoError(err) + suite.Equal(1, totalPruned) + + // media should no longer be stored + _, err = suite.storage.Get(testAttachment.File.Path) + suite.Error(err) + suite.ErrorIs(err, storage.ErrNotFound) + _, err = suite.storage.Get(testAttachment.Thumbnail.Path) + suite.Error(err) + suite.ErrorIs(err, storage.ErrNotFound) + + // now recache the image.... + data := func(_ context.Context) (io.Reader, int, error) { + // load bytes from a test image + b, err := os.ReadFile("../../testrig/media/thoughtsofdog-original.jpeg") + if err != nil { + panic(err) + } + return bytes.NewBuffer(b), len(b), nil + } + processingRecache, err := suite.manager.RecacheMedia(ctx, data, nil, testAttachment.ID) + suite.NoError(err) + + // synchronously load the recached attachment + recachedAttachment, err := processingRecache.LoadAttachment(ctx) + suite.NoError(err) + suite.NotNil(recachedAttachment) + + // recachedAttachment should be basically the same as the old attachment + suite.True(recachedAttachment.Cached) + suite.Equal(testAttachment.ID, recachedAttachment.ID) + suite.Equal(testAttachment.File.Path, recachedAttachment.File.Path) // file should be stored in the same place + suite.Equal(testAttachment.Thumbnail.Path, recachedAttachment.Thumbnail.Path) // as should the thumbnail + suite.EqualValues(testAttachment.FileMeta, recachedAttachment.FileMeta) // and the filemeta should be the same + + // recached files should be back in storage + _, err = suite.storage.Get(recachedAttachment.File.Path) + suite.NoError(err) + _, err = suite.storage.Get(recachedAttachment.Thumbnail.Path) + suite.NoError(err) +} + +func TestPruneRemoteTestSuite(t *testing.T) { + suite.Run(t, &PruneRemoteTestSuite{}) +} diff --git a/internal/media/util.go b/internal/media/util.go index 248d5fb19..f3cd1b986 100644 --- a/internal/media/util.go +++ b/internal/media/util.go @@ -23,6 +23,7 @@ import ( "fmt" "github.com/h2non/filetype" + "github.com/sirupsen/logrus" ) // parseContentType parses the MIME content type from a file, returning it as a string in the form (eg., "image/jpeg"). @@ -103,3 +104,17 @@ func ParseMediaSize(s string) (Size, error) { } return "", fmt.Errorf("%s not a recognized MediaSize", s) } + +// logrusWrapper is just a util for passing the logrus logger into the cron logging system. +type logrusWrapper struct { +} + +// Info logs routine messages about cron's operation. +func (l *logrusWrapper) Info(msg string, keysAndValues ...interface{}) { + logrus.Info("media manager cron logger: ", msg, keysAndValues) +} + +// Error logs an error condition. +func (l *logrusWrapper) Error(err error, msg string, keysAndValues ...interface{}) { + logrus.Error("media manager cron logger: ", err, msg, keysAndValues) +} |