diff options
Diffstat (limited to 'internal')
-rw-r--r-- | internal/api/client/media/mediacreate_test.go | 9 | ||||
-rw-r--r-- | internal/api/fileserver/servefile.go | 18 | ||||
-rw-r--r-- | internal/cleaner/cleaner.go | 5 | ||||
-rw-r--r-- | internal/cleaner/media.go | 2 | ||||
-rw-r--r-- | internal/cleaner/media_test.go | 8 | ||||
-rw-r--r-- | internal/db/bundb/migrations/20220612091800_duplicated_media_cleanup.go | 14 | ||||
-rw-r--r-- | internal/iotools/io.go | 4 | ||||
-rw-r--r-- | internal/media/manager.go | 7 | ||||
-rw-r--r-- | internal/media/manager_test.go | 12 | ||||
-rw-r--r-- | internal/media/processingmedia.go | 3 | ||||
-rw-r--r-- | internal/media/video.go | 30 | ||||
-rw-r--r-- | internal/processing/media/delete.go | 6 | ||||
-rw-r--r-- | internal/storage/storage.go | 66 |
13 files changed, 101 insertions, 83 deletions
diff --git a/internal/api/client/media/mediacreate_test.go b/internal/api/client/media/mediacreate_test.go index 41a1fc16f..00f385032 100644 --- a/internal/api/client/media/mediacreate_test.go +++ b/internal/api/client/media/mediacreate_test.go @@ -19,7 +19,6 @@ package media_test import ( "bytes" - "context" "crypto/rand" "encoding/base64" "encoding/json" @@ -152,7 +151,7 @@ func (suite *MediaCreateTestSuite) TestMediaCreateSuccessful() { // see what's in storage *before* the request var storageKeysBeforeRequest []string - if err := suite.storage.WalkKeys(ctx, func(ctx context.Context, key string) error { + if err := suite.storage.WalkKeys(ctx, func(key string) error { storageKeysBeforeRequest = append(storageKeysBeforeRequest, key) return nil }); err != nil { @@ -177,7 +176,7 @@ func (suite *MediaCreateTestSuite) TestMediaCreateSuccessful() { // check what's in storage *after* the request var storageKeysAfterRequest []string - if err := suite.storage.WalkKeys(ctx, func(ctx context.Context, key string) error { + if err := suite.storage.WalkKeys(ctx, func(key string) error { storageKeysAfterRequest = append(storageKeysAfterRequest, key) return nil }); err != nil { @@ -237,7 +236,7 @@ func (suite *MediaCreateTestSuite) TestMediaCreateSuccessfulV2() { // see what's in storage *before* the request var storageKeysBeforeRequest []string - if err := suite.storage.WalkKeys(ctx, func(ctx context.Context, key string) error { + if err := suite.storage.WalkKeys(ctx, func(key string) error { storageKeysBeforeRequest = append(storageKeysBeforeRequest, key) return nil }); err != nil { @@ -262,7 +261,7 @@ func (suite *MediaCreateTestSuite) TestMediaCreateSuccessfulV2() { // check what's in storage *after* the request var storageKeysAfterRequest []string - if err := suite.storage.WalkKeys(ctx, func(ctx context.Context, key string) error { + if err := suite.storage.WalkKeys(ctx, func(key string) error { storageKeysAfterRequest = append(storageKeysAfterRequest, key) return nil }); err != nil { diff --git a/internal/api/fileserver/servefile.go b/internal/api/fileserver/servefile.go index 8fb5a838e..fc6ef0e7e 100644 --- a/internal/api/fileserver/servefile.go +++ b/internal/api/fileserver/servefile.go @@ -224,10 +224,20 @@ func serveFileRange(rw http.ResponseWriter, r *http.Request, src io.Reader, rng return } - // Dump the first 'start' many bytes into the void... - if _, err := fastcopy.CopyN(io.Discard, src, start); err != nil { - log.Errorf(r.Context(), "error reading from source: %v", err) - return + if rs, ok := src.(io.ReadSeeker); ok { + // Source supports seeking (usually *os.File), + // seek to the 'start' byte position in file. + if _, err := rs.Seek(start, 0); err != nil { + log.Errorf(r.Context(), "error seeking in source: %v", err) + return + } + } else { + // Compat for when no seek call is implemented, + // dump the first 'start' many bytes into void. + if _, err := fastcopy.CopyN(io.Discard, src, start); err != nil { + log.Errorf(r.Context(), "error reading from source: %v", err) + return + } } // Determine new content length diff --git a/internal/cleaner/cleaner.go b/internal/cleaner/cleaner.go index a1209ae08..e87041d71 100644 --- a/internal/cleaner/cleaner.go +++ b/internal/cleaner/cleaner.go @@ -19,15 +19,14 @@ package cleaner import ( "context" - "errors" "time" - "codeberg.org/gruf/go-store/v2/storage" "github.com/superseriousbusiness/gotosocial/internal/config" "github.com/superseriousbusiness/gotosocial/internal/gtscontext" "github.com/superseriousbusiness/gotosocial/internal/gtserror" "github.com/superseriousbusiness/gotosocial/internal/log" "github.com/superseriousbusiness/gotosocial/internal/state" + "github.com/superseriousbusiness/gotosocial/internal/storage" ) const ( @@ -89,7 +88,7 @@ func (c *Cleaner) removeFiles(ctx context.Context, files ...string) (int, error) // Remove each provided storage path. log.Debugf(ctx, "removing file: %s", path) err := c.state.Storage.Delete(ctx, path) - if err != nil && !errors.Is(err, storage.ErrNotFound) { + if err != nil && !storage.IsNotFound(err) { errs.Appendf("error removing %s: %w", path, err) errCount++ } diff --git a/internal/cleaner/media.go b/internal/cleaner/media.go index f3cda5d87..185c64fb9 100644 --- a/internal/cleaner/media.go +++ b/internal/cleaner/media.go @@ -96,7 +96,7 @@ func (m *Media) PruneOrphaned(ctx context.Context) (int, error) { var files []string // All media files in storage will have path fitting: {$account}/{$type}/{$size}/{$id}.{$ext} - if err := m.state.Storage.WalkKeys(ctx, func(ctx context.Context, path string) error { + if err := m.state.Storage.WalkKeys(ctx, func(path string) error { // Check for our expected fileserver path format. if !regexes.FilePath.MatchString(path) { log.Warn(ctx, "unexpected storage item: %s", path) diff --git a/internal/cleaner/media_test.go b/internal/cleaner/media_test.go index c27890f55..b33ae4b4f 100644 --- a/internal/cleaner/media_test.go +++ b/internal/cleaner/media_test.go @@ -364,13 +364,13 @@ func (suite *MediaTestSuite) TestUncacheAndRecache() { // media should no longer be stored _, err = suite.storage.Get(ctx, testStatusAttachment.File.Path) - suite.ErrorIs(err, storage.ErrNotFound) + suite.True(storage.IsNotFound(err)) _, err = suite.storage.Get(ctx, testStatusAttachment.Thumbnail.Path) - suite.ErrorIs(err, storage.ErrNotFound) + suite.True(storage.IsNotFound(err)) _, err = suite.storage.Get(ctx, testHeader.File.Path) - suite.ErrorIs(err, storage.ErrNotFound) + suite.True(storage.IsNotFound(err)) _, err = suite.storage.Get(ctx, testHeader.Thumbnail.Path) - suite.ErrorIs(err, storage.ErrNotFound) + suite.True(storage.IsNotFound(err)) // now recache the image.... data := func(_ context.Context) (io.ReadCloser, int64, error) { diff --git a/internal/db/bundb/migrations/20220612091800_duplicated_media_cleanup.go b/internal/db/bundb/migrations/20220612091800_duplicated_media_cleanup.go index 28bbb3a81..6c280fb11 100644 --- a/internal/db/bundb/migrations/20220612091800_duplicated_media_cleanup.go +++ b/internal/db/bundb/migrations/20220612091800_duplicated_media_cleanup.go @@ -20,10 +20,11 @@ package migrations import ( "context" "database/sql" + "errors" "fmt" - "path" - "codeberg.org/gruf/go-store/v2/storage" + "codeberg.org/gruf/go-storage" + "codeberg.org/gruf/go-storage/disk" "github.com/superseriousbusiness/gotosocial/internal/config" "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" "github.com/superseriousbusiness/gotosocial/internal/log" @@ -32,13 +33,13 @@ import ( func init() { deleteAttachment := func(ctx context.Context, l log.Entry, a *gtsmodel.MediaAttachment, s storage.Storage, tx bun.Tx) { - if err := s.Remove(ctx, a.File.Path); err != nil && err != storage.ErrNotFound { + if err := s.Remove(ctx, a.File.Path); err != nil && !errors.Is(err, storage.ErrNotFound) { l.Errorf("error removing file %s: %s", a.File.Path, err) } else { l.Debugf("deleted %s", a.File.Path) } - if err := s.Remove(ctx, a.Thumbnail.Path); err != nil && err != storage.ErrNotFound { + if err := s.Remove(ctx, a.Thumbnail.Path); err != nil && !errors.Is(err, storage.ErrNotFound) { l.Errorf("error removing file %s: %s", a.Thumbnail.Path, err) } else { l.Debugf("deleted %s", a.Thumbnail.Path) @@ -68,13 +69,10 @@ func init() { } return db.RunInTx(ctx, nil, func(ctx context.Context, tx bun.Tx) error { - s, err := storage.OpenDisk(storageBasePath, &storage.DiskConfig{ - LockFile: path.Join(storageBasePath, "store.lock"), - }) + s, err := disk.Open(storageBasePath, nil) if err != nil { return fmt.Errorf("error creating storage backend: %s", err) } - defer s.Close() // step 1. select all media attachment remote URLs that have duplicates var dupes int diff --git a/internal/iotools/io.go b/internal/iotools/io.go index d79843341..1c5da25d9 100644 --- a/internal/iotools/io.go +++ b/internal/iotools/io.go @@ -20,6 +20,8 @@ package iotools import ( "io" "os" + + "codeberg.org/gruf/go-fastcopy" ) // ReadFnCloser takes an io.Reader and wraps it to use the provided function to implement io.Closer. @@ -179,7 +181,7 @@ func TempFileSeeker(r io.Reader) (io.ReadSeekCloser, error) { return nil, err } - if _, err := io.Copy(tmp, r); err != nil { + if _, err := fastcopy.Copy(tmp, r); err != nil { return nil, err } diff --git a/internal/media/manager.go b/internal/media/manager.go index 73494881c..be428aa3b 100644 --- a/internal/media/manager.go +++ b/internal/media/manager.go @@ -19,17 +19,16 @@ package media import ( "context" - "errors" "io" "time" "codeberg.org/gruf/go-iotools" - "codeberg.org/gruf/go-store/v2/storage" "github.com/superseriousbusiness/gotosocial/internal/gtserror" "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" "github.com/superseriousbusiness/gotosocial/internal/id" "github.com/superseriousbusiness/gotosocial/internal/log" "github.com/superseriousbusiness/gotosocial/internal/state" + "github.com/superseriousbusiness/gotosocial/internal/storage" "github.com/superseriousbusiness/gotosocial/internal/uris" "github.com/superseriousbusiness/gotosocial/internal/util" ) @@ -260,11 +259,11 @@ func (m *Manager) PreProcessEmoji( // Wrap closer to cleanup old data. c := iotools.CloserCallback(rc, func() { - if err := m.state.Storage.Delete(ctx, originalImagePath); err != nil && !errors.Is(err, storage.ErrNotFound) { + if err := m.state.Storage.Delete(ctx, originalImagePath); err != nil && !storage.IsNotFound(err) { log.Errorf(ctx, "error removing old emoji %s@%s from storage: %v", emoji.Shortcode, emoji.Domain, err) } - if err := m.state.Storage.Delete(ctx, originalImageStaticPath); err != nil && !errors.Is(err, storage.ErrNotFound) { + if err := m.state.Storage.Delete(ctx, originalImageStaticPath); err != nil && !storage.IsNotFound(err) { log.Errorf(ctx, "error removing old static emoji %s@%s from storage: %v", emoji.Shortcode, emoji.Domain, err) } }) diff --git a/internal/media/manager_test.go b/internal/media/manager_test.go index ac4286c73..d184e4605 100644 --- a/internal/media/manager_test.go +++ b/internal/media/manager_test.go @@ -23,15 +23,15 @@ import ( "fmt" "io" "os" - "path" "testing" "time" - "codeberg.org/gruf/go-store/v2/storage" + "codeberg.org/gruf/go-storage/disk" "github.com/stretchr/testify/suite" gtsmodel "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" "github.com/superseriousbusiness/gotosocial/internal/media" "github.com/superseriousbusiness/gotosocial/internal/state" + "github.com/superseriousbusiness/gotosocial/internal/storage" gtsstorage "github.com/superseriousbusiness/gotosocial/internal/storage" "github.com/superseriousbusiness/gotosocial/testrig" ) @@ -189,9 +189,9 @@ func (suite *ManagerTestSuite) TestEmojiProcessBlockingRefresh() { // the old image files should no longer be in storage _, err = suite.storage.Get(ctx, oldEmojiImagePath) - suite.ErrorIs(err, storage.ErrNotFound) + suite.True(storage.IsNotFound(err)) _, err = suite.storage.Get(ctx, oldEmojiImageStaticPath) - suite.ErrorIs(err, storage.ErrNotFound) + suite.True(storage.IsNotFound(err)) } func (suite *ManagerTestSuite) TestEmojiProcessBlockingTooLarge() { @@ -1189,9 +1189,7 @@ func (suite *ManagerTestSuite) TestSimpleJpegProcessBlockingWithDiskStorage() { temp := fmt.Sprintf("%s/gotosocial-test", os.TempDir()) defer os.RemoveAll(temp) - disk, err := storage.OpenDisk(temp, &storage.DiskConfig{ - LockFile: path.Join(temp, "store.lock"), - }) + disk, err := disk.Open(temp, nil) if err != nil { panic(err) } diff --git a/internal/media/processingmedia.go b/internal/media/processingmedia.go index 5975de859..b65e3cd48 100644 --- a/internal/media/processingmedia.go +++ b/internal/media/processingmedia.go @@ -20,7 +20,6 @@ package media import ( "bytes" "context" - "errors" "image/jpeg" "io" "time" @@ -156,7 +155,7 @@ func (p *ProcessingMedia) load(ctx context.Context) (*gtsmodel.MediaAttachment, // never decoded). Try to clean up in this case. if p.media.Type == gtsmodel.FileTypeUnknown { deleteErr := p.mgr.state.Storage.Delete(ctx, p.media.File.Path) - if deleteErr != nil && !errors.Is(deleteErr, storage.ErrNotFound) { + if deleteErr != nil && !storage.IsNotFound(deleteErr) { errs.Append(deleteErr) } } diff --git a/internal/media/video.go b/internal/media/video.go index f98880615..5068be636 100644 --- a/internal/media/video.go +++ b/internal/media/video.go @@ -36,20 +36,30 @@ type gtsVideo struct { // decodeVideoFrame decodes and returns an image from a single frame in the given video stream. // (note: currently this only returns a blank image resized to fit video dimensions). func decodeVideoFrame(r io.Reader) (*gtsVideo, error) { - // we need a readseeker to decode the video... - tfs, err := iotools.TempFileSeeker(r) - if err != nil { - return nil, fmt.Errorf("error creating temp file seeker: %w", err) - } - defer func() { - if err := tfs.Close(); err != nil { - log.Errorf(nil, "error closing temp file seeker: %s", err) + // Check if video stream supports + // seeking, usually when *os.File. + rsc, ok := r.(io.ReadSeekCloser) + if !ok { + var err error + + // Store stream to temporary location + // in order that we can get seek-reads. + rsc, err = iotools.TempFileSeeker(r) + if err != nil { + return nil, fmt.Errorf("error creating temp file seeker: %w", err) } - }() + + defer func() { + // Ensure temp. read seeker closed. + if err := rsc.Close(); err != nil { + log.Errorf(nil, "error closing temp file seeker: %s", err) + } + }() + } // probe the video file to extract useful metadata from it; for methodology, see: // https://github.com/abema/go-mp4/blob/7d8e5a7c5e644e0394261b0cf72fef79ce246d31/mp4tool/probe/probe.go#L85-L154 - info, err := mp4.Probe(tfs) + info, err := mp4.Probe(rsc) if err != nil { return nil, fmt.Errorf("error during mp4 probe: %w", err) } diff --git a/internal/processing/media/delete.go b/internal/processing/media/delete.go index 2b215dc81..32650fb2c 100644 --- a/internal/processing/media/delete.go +++ b/internal/processing/media/delete.go @@ -23,9 +23,9 @@ import ( "fmt" "strings" - "codeberg.org/gruf/go-store/v2/storage" "github.com/superseriousbusiness/gotosocial/internal/db" "github.com/superseriousbusiness/gotosocial/internal/gtserror" + "github.com/superseriousbusiness/gotosocial/internal/storage" ) // Delete deletes the media attachment with the given ID, including all files pertaining to that attachment. @@ -44,14 +44,14 @@ func (p *Processor) Delete(ctx context.Context, mediaAttachmentID string) gtserr // delete the thumbnail from storage if attachment.Thumbnail.Path != "" { - if err := p.state.Storage.Delete(ctx, attachment.Thumbnail.Path); err != nil && !errors.Is(err, storage.ErrNotFound) { + if err := p.state.Storage.Delete(ctx, attachment.Thumbnail.Path); err != nil && !storage.IsNotFound(err) { errs = append(errs, fmt.Sprintf("remove thumbnail at path %s: %s", attachment.Thumbnail.Path, err)) } } // delete the file from storage if attachment.File.Path != "" { - if err := p.state.Storage.Delete(ctx, attachment.File.Path); err != nil && !errors.Is(err, storage.ErrNotFound) { + if err := p.state.Storage.Delete(ctx, attachment.File.Path); err != nil && !storage.IsNotFound(err) { errs = append(errs, fmt.Sprintf("remove file at path %s: %s", attachment.File.Path, err)) } } diff --git a/internal/storage/storage.go b/internal/storage/storage.go index c27037fba..872ea1210 100644 --- a/internal/storage/storage.go +++ b/internal/storage/storage.go @@ -19,16 +19,20 @@ package storage import ( "context" + "errors" "fmt" "io" "mime" "net/url" "path" + "syscall" "time" "codeberg.org/gruf/go-bytesize" "codeberg.org/gruf/go-cache/v3/ttl" - "codeberg.org/gruf/go-store/v2/storage" + "codeberg.org/gruf/go-storage" + "codeberg.org/gruf/go-storage/disk" + "codeberg.org/gruf/go-storage/s3" "github.com/minio/minio-go/v7" "github.com/minio/minio-go/v7/pkg/credentials" "github.com/superseriousbusiness/gotosocial/internal/config" @@ -48,11 +52,17 @@ type PresignedURL struct { Expiry time.Time // link expires at this time } -var ( - // Ptrs to underlying storage library errors. - ErrAlreadyExists = storage.ErrAlreadyExists - ErrNotFound = storage.ErrNotFound -) +// IsAlreadyExist returns whether error is an already-exists +// type error returned by the underlying storage library. +func IsAlreadyExist(err error) bool { + return errors.Is(err, storage.ErrAlreadyExists) +} + +// IsNotFound returns whether error is a not-found error +// type returned by the underlying storage library. +func IsNotFound(err error) bool { + return errors.Is(err, storage.ErrNotFound) +} // Driver wraps a kv.KVStore to also provide S3 presigned GET URLs. type Driver struct { @@ -92,30 +102,23 @@ func (d *Driver) Delete(ctx context.Context, key string) error { // Has checks if the supplied key is in the storage. func (d *Driver) Has(ctx context.Context, key string) (bool, error) { - return d.Storage.Stat(ctx, key) + stat, err := d.Storage.Stat(ctx, key) + return (stat != nil), err } // WalkKeys walks the keys in the storage. -func (d *Driver) WalkKeys(ctx context.Context, walk func(context.Context, string) error) error { - return d.Storage.WalkKeys(ctx, storage.WalkKeysOptions{ - WalkFn: func(ctx context.Context, entry storage.Entry) error { - if entry.Key == "store.lock" { - return nil // skip this. - } - return walk(ctx, entry.Key) +func (d *Driver) WalkKeys(ctx context.Context, walk func(string) error) error { + return d.Storage.WalkKeys(ctx, storage.WalkKeysOpts{ + Step: func(entry storage.Entry) error { + return walk(entry.Key) }, }) } -// Close will close the storage, releasing any file locks. -func (d *Driver) Close() error { - return d.Storage.Close() -} - // URL will return a presigned GET object URL, but only if running on S3 storage with proxying disabled. func (d *Driver) URL(ctx context.Context, key string) *PresignedURL { // Check whether S3 *without* proxying is enabled - s3, ok := d.Storage.(*storage.S3Storage) + s3, ok := d.Storage.(*s3.S3Storage) if !ok || d.Proxy { return nil } @@ -166,7 +169,7 @@ func (d *Driver) ProbeCSPUri(ctx context.Context) (string, error) { // Check whether S3 without proxying // is enabled. If it's not, there's // no need to add anything to the CSP. - s3, ok := d.Storage.(*storage.S3Storage) + s3, ok := d.Storage.(*s3.S3Storage) if !ok || d.Proxy { return "", nil } @@ -217,16 +220,17 @@ func NewFileStorage() (*Driver, error) { // Load runtime configuration basePath := config.GetStorageLocalBasePath() + // Use default disk config but with + // increased write buffer size and + // 'exclusive' bit sets when creating + // files to ensure we don't overwrite + // existing files unless intending to. + diskCfg := disk.DefaultConfig() + diskCfg.OpenWrite.Flags |= syscall.O_EXCL + diskCfg.WriteBufSize = int(16 * bytesize.KiB) + // Open the disk storage implementation - disk, err := storage.OpenDisk(basePath, &storage.DiskConfig{ - // Put the store lockfile in the storage dir itself. - // Normally this would not be safe, since we could end up - // overwriting the lockfile if we store a file called 'store.lock'. - // However, in this case it's OK because the keys are set by - // GtS and not the user, so we know we're never going to overwrite it. - LockFile: path.Join(basePath, "store.lock"), - WriteBufSize: int(16 * bytesize.KiB), - }) + disk, err := disk.Open(basePath, &diskCfg) if err != nil { return nil, fmt.Errorf("error opening disk storage: %w", err) } @@ -245,7 +249,7 @@ func NewS3Storage() (*Driver, error) { bucket := config.GetStorageS3BucketName() // Open the s3 storage implementation - s3, err := storage.OpenS3(endpoint, bucket, &storage.S3Config{ + s3, err := s3.Open(endpoint, bucket, &s3.Config{ CoreOpts: minio.Options{ Creds: credentials.NewStaticV4(access, secret, ""), Secure: secure, |