summaryrefslogtreecommitdiff
path: root/internal
diff options
context:
space:
mode:
Diffstat (limited to 'internal')
-rw-r--r--internal/api/client/media/mediacreate_test.go9
-rw-r--r--internal/api/fileserver/servefile.go18
-rw-r--r--internal/cleaner/cleaner.go5
-rw-r--r--internal/cleaner/media.go2
-rw-r--r--internal/cleaner/media_test.go8
-rw-r--r--internal/db/bundb/migrations/20220612091800_duplicated_media_cleanup.go14
-rw-r--r--internal/iotools/io.go4
-rw-r--r--internal/media/manager.go7
-rw-r--r--internal/media/manager_test.go12
-rw-r--r--internal/media/processingmedia.go3
-rw-r--r--internal/media/video.go30
-rw-r--r--internal/processing/media/delete.go6
-rw-r--r--internal/storage/storage.go66
13 files changed, 101 insertions, 83 deletions
diff --git a/internal/api/client/media/mediacreate_test.go b/internal/api/client/media/mediacreate_test.go
index 41a1fc16f..00f385032 100644
--- a/internal/api/client/media/mediacreate_test.go
+++ b/internal/api/client/media/mediacreate_test.go
@@ -19,7 +19,6 @@ package media_test
import (
"bytes"
- "context"
"crypto/rand"
"encoding/base64"
"encoding/json"
@@ -152,7 +151,7 @@ func (suite *MediaCreateTestSuite) TestMediaCreateSuccessful() {
// see what's in storage *before* the request
var storageKeysBeforeRequest []string
- if err := suite.storage.WalkKeys(ctx, func(ctx context.Context, key string) error {
+ if err := suite.storage.WalkKeys(ctx, func(key string) error {
storageKeysBeforeRequest = append(storageKeysBeforeRequest, key)
return nil
}); err != nil {
@@ -177,7 +176,7 @@ func (suite *MediaCreateTestSuite) TestMediaCreateSuccessful() {
// check what's in storage *after* the request
var storageKeysAfterRequest []string
- if err := suite.storage.WalkKeys(ctx, func(ctx context.Context, key string) error {
+ if err := suite.storage.WalkKeys(ctx, func(key string) error {
storageKeysAfterRequest = append(storageKeysAfterRequest, key)
return nil
}); err != nil {
@@ -237,7 +236,7 @@ func (suite *MediaCreateTestSuite) TestMediaCreateSuccessfulV2() {
// see what's in storage *before* the request
var storageKeysBeforeRequest []string
- if err := suite.storage.WalkKeys(ctx, func(ctx context.Context, key string) error {
+ if err := suite.storage.WalkKeys(ctx, func(key string) error {
storageKeysBeforeRequest = append(storageKeysBeforeRequest, key)
return nil
}); err != nil {
@@ -262,7 +261,7 @@ func (suite *MediaCreateTestSuite) TestMediaCreateSuccessfulV2() {
// check what's in storage *after* the request
var storageKeysAfterRequest []string
- if err := suite.storage.WalkKeys(ctx, func(ctx context.Context, key string) error {
+ if err := suite.storage.WalkKeys(ctx, func(key string) error {
storageKeysAfterRequest = append(storageKeysAfterRequest, key)
return nil
}); err != nil {
diff --git a/internal/api/fileserver/servefile.go b/internal/api/fileserver/servefile.go
index 8fb5a838e..fc6ef0e7e 100644
--- a/internal/api/fileserver/servefile.go
+++ b/internal/api/fileserver/servefile.go
@@ -224,10 +224,20 @@ func serveFileRange(rw http.ResponseWriter, r *http.Request, src io.Reader, rng
return
}
- // Dump the first 'start' many bytes into the void...
- if _, err := fastcopy.CopyN(io.Discard, src, start); err != nil {
- log.Errorf(r.Context(), "error reading from source: %v", err)
- return
+ if rs, ok := src.(io.ReadSeeker); ok {
+ // Source supports seeking (usually *os.File),
+ // seek to the 'start' byte position in file.
+ if _, err := rs.Seek(start, 0); err != nil {
+ log.Errorf(r.Context(), "error seeking in source: %v", err)
+ return
+ }
+ } else {
+ // Compat for when no seek call is implemented,
+ // dump the first 'start' many bytes into void.
+ if _, err := fastcopy.CopyN(io.Discard, src, start); err != nil {
+ log.Errorf(r.Context(), "error reading from source: %v", err)
+ return
+ }
}
// Determine new content length
diff --git a/internal/cleaner/cleaner.go b/internal/cleaner/cleaner.go
index a1209ae08..e87041d71 100644
--- a/internal/cleaner/cleaner.go
+++ b/internal/cleaner/cleaner.go
@@ -19,15 +19,14 @@ package cleaner
import (
"context"
- "errors"
"time"
- "codeberg.org/gruf/go-store/v2/storage"
"github.com/superseriousbusiness/gotosocial/internal/config"
"github.com/superseriousbusiness/gotosocial/internal/gtscontext"
"github.com/superseriousbusiness/gotosocial/internal/gtserror"
"github.com/superseriousbusiness/gotosocial/internal/log"
"github.com/superseriousbusiness/gotosocial/internal/state"
+ "github.com/superseriousbusiness/gotosocial/internal/storage"
)
const (
@@ -89,7 +88,7 @@ func (c *Cleaner) removeFiles(ctx context.Context, files ...string) (int, error)
// Remove each provided storage path.
log.Debugf(ctx, "removing file: %s", path)
err := c.state.Storage.Delete(ctx, path)
- if err != nil && !errors.Is(err, storage.ErrNotFound) {
+ if err != nil && !storage.IsNotFound(err) {
errs.Appendf("error removing %s: %w", path, err)
errCount++
}
diff --git a/internal/cleaner/media.go b/internal/cleaner/media.go
index f3cda5d87..185c64fb9 100644
--- a/internal/cleaner/media.go
+++ b/internal/cleaner/media.go
@@ -96,7 +96,7 @@ func (m *Media) PruneOrphaned(ctx context.Context) (int, error) {
var files []string
// All media files in storage will have path fitting: {$account}/{$type}/{$size}/{$id}.{$ext}
- if err := m.state.Storage.WalkKeys(ctx, func(ctx context.Context, path string) error {
+ if err := m.state.Storage.WalkKeys(ctx, func(path string) error {
// Check for our expected fileserver path format.
if !regexes.FilePath.MatchString(path) {
log.Warn(ctx, "unexpected storage item: %s", path)
diff --git a/internal/cleaner/media_test.go b/internal/cleaner/media_test.go
index c27890f55..b33ae4b4f 100644
--- a/internal/cleaner/media_test.go
+++ b/internal/cleaner/media_test.go
@@ -364,13 +364,13 @@ func (suite *MediaTestSuite) TestUncacheAndRecache() {
// media should no longer be stored
_, err = suite.storage.Get(ctx, testStatusAttachment.File.Path)
- suite.ErrorIs(err, storage.ErrNotFound)
+ suite.True(storage.IsNotFound(err))
_, err = suite.storage.Get(ctx, testStatusAttachment.Thumbnail.Path)
- suite.ErrorIs(err, storage.ErrNotFound)
+ suite.True(storage.IsNotFound(err))
_, err = suite.storage.Get(ctx, testHeader.File.Path)
- suite.ErrorIs(err, storage.ErrNotFound)
+ suite.True(storage.IsNotFound(err))
_, err = suite.storage.Get(ctx, testHeader.Thumbnail.Path)
- suite.ErrorIs(err, storage.ErrNotFound)
+ suite.True(storage.IsNotFound(err))
// now recache the image....
data := func(_ context.Context) (io.ReadCloser, int64, error) {
diff --git a/internal/db/bundb/migrations/20220612091800_duplicated_media_cleanup.go b/internal/db/bundb/migrations/20220612091800_duplicated_media_cleanup.go
index 28bbb3a81..6c280fb11 100644
--- a/internal/db/bundb/migrations/20220612091800_duplicated_media_cleanup.go
+++ b/internal/db/bundb/migrations/20220612091800_duplicated_media_cleanup.go
@@ -20,10 +20,11 @@ package migrations
import (
"context"
"database/sql"
+ "errors"
"fmt"
- "path"
- "codeberg.org/gruf/go-store/v2/storage"
+ "codeberg.org/gruf/go-storage"
+ "codeberg.org/gruf/go-storage/disk"
"github.com/superseriousbusiness/gotosocial/internal/config"
"github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
"github.com/superseriousbusiness/gotosocial/internal/log"
@@ -32,13 +33,13 @@ import (
func init() {
deleteAttachment := func(ctx context.Context, l log.Entry, a *gtsmodel.MediaAttachment, s storage.Storage, tx bun.Tx) {
- if err := s.Remove(ctx, a.File.Path); err != nil && err != storage.ErrNotFound {
+ if err := s.Remove(ctx, a.File.Path); err != nil && !errors.Is(err, storage.ErrNotFound) {
l.Errorf("error removing file %s: %s", a.File.Path, err)
} else {
l.Debugf("deleted %s", a.File.Path)
}
- if err := s.Remove(ctx, a.Thumbnail.Path); err != nil && err != storage.ErrNotFound {
+ if err := s.Remove(ctx, a.Thumbnail.Path); err != nil && !errors.Is(err, storage.ErrNotFound) {
l.Errorf("error removing file %s: %s", a.Thumbnail.Path, err)
} else {
l.Debugf("deleted %s", a.Thumbnail.Path)
@@ -68,13 +69,10 @@ func init() {
}
return db.RunInTx(ctx, nil, func(ctx context.Context, tx bun.Tx) error {
- s, err := storage.OpenDisk(storageBasePath, &storage.DiskConfig{
- LockFile: path.Join(storageBasePath, "store.lock"),
- })
+ s, err := disk.Open(storageBasePath, nil)
if err != nil {
return fmt.Errorf("error creating storage backend: %s", err)
}
- defer s.Close()
// step 1. select all media attachment remote URLs that have duplicates
var dupes int
diff --git a/internal/iotools/io.go b/internal/iotools/io.go
index d79843341..1c5da25d9 100644
--- a/internal/iotools/io.go
+++ b/internal/iotools/io.go
@@ -20,6 +20,8 @@ package iotools
import (
"io"
"os"
+
+ "codeberg.org/gruf/go-fastcopy"
)
// ReadFnCloser takes an io.Reader and wraps it to use the provided function to implement io.Closer.
@@ -179,7 +181,7 @@ func TempFileSeeker(r io.Reader) (io.ReadSeekCloser, error) {
return nil, err
}
- if _, err := io.Copy(tmp, r); err != nil {
+ if _, err := fastcopy.Copy(tmp, r); err != nil {
return nil, err
}
diff --git a/internal/media/manager.go b/internal/media/manager.go
index 73494881c..be428aa3b 100644
--- a/internal/media/manager.go
+++ b/internal/media/manager.go
@@ -19,17 +19,16 @@ package media
import (
"context"
- "errors"
"io"
"time"
"codeberg.org/gruf/go-iotools"
- "codeberg.org/gruf/go-store/v2/storage"
"github.com/superseriousbusiness/gotosocial/internal/gtserror"
"github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
"github.com/superseriousbusiness/gotosocial/internal/id"
"github.com/superseriousbusiness/gotosocial/internal/log"
"github.com/superseriousbusiness/gotosocial/internal/state"
+ "github.com/superseriousbusiness/gotosocial/internal/storage"
"github.com/superseriousbusiness/gotosocial/internal/uris"
"github.com/superseriousbusiness/gotosocial/internal/util"
)
@@ -260,11 +259,11 @@ func (m *Manager) PreProcessEmoji(
// Wrap closer to cleanup old data.
c := iotools.CloserCallback(rc, func() {
- if err := m.state.Storage.Delete(ctx, originalImagePath); err != nil && !errors.Is(err, storage.ErrNotFound) {
+ if err := m.state.Storage.Delete(ctx, originalImagePath); err != nil && !storage.IsNotFound(err) {
log.Errorf(ctx, "error removing old emoji %s@%s from storage: %v", emoji.Shortcode, emoji.Domain, err)
}
- if err := m.state.Storage.Delete(ctx, originalImageStaticPath); err != nil && !errors.Is(err, storage.ErrNotFound) {
+ if err := m.state.Storage.Delete(ctx, originalImageStaticPath); err != nil && !storage.IsNotFound(err) {
log.Errorf(ctx, "error removing old static emoji %s@%s from storage: %v", emoji.Shortcode, emoji.Domain, err)
}
})
diff --git a/internal/media/manager_test.go b/internal/media/manager_test.go
index ac4286c73..d184e4605 100644
--- a/internal/media/manager_test.go
+++ b/internal/media/manager_test.go
@@ -23,15 +23,15 @@ import (
"fmt"
"io"
"os"
- "path"
"testing"
"time"
- "codeberg.org/gruf/go-store/v2/storage"
+ "codeberg.org/gruf/go-storage/disk"
"github.com/stretchr/testify/suite"
gtsmodel "github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
"github.com/superseriousbusiness/gotosocial/internal/media"
"github.com/superseriousbusiness/gotosocial/internal/state"
+ "github.com/superseriousbusiness/gotosocial/internal/storage"
gtsstorage "github.com/superseriousbusiness/gotosocial/internal/storage"
"github.com/superseriousbusiness/gotosocial/testrig"
)
@@ -189,9 +189,9 @@ func (suite *ManagerTestSuite) TestEmojiProcessBlockingRefresh() {
// the old image files should no longer be in storage
_, err = suite.storage.Get(ctx, oldEmojiImagePath)
- suite.ErrorIs(err, storage.ErrNotFound)
+ suite.True(storage.IsNotFound(err))
_, err = suite.storage.Get(ctx, oldEmojiImageStaticPath)
- suite.ErrorIs(err, storage.ErrNotFound)
+ suite.True(storage.IsNotFound(err))
}
func (suite *ManagerTestSuite) TestEmojiProcessBlockingTooLarge() {
@@ -1189,9 +1189,7 @@ func (suite *ManagerTestSuite) TestSimpleJpegProcessBlockingWithDiskStorage() {
temp := fmt.Sprintf("%s/gotosocial-test", os.TempDir())
defer os.RemoveAll(temp)
- disk, err := storage.OpenDisk(temp, &storage.DiskConfig{
- LockFile: path.Join(temp, "store.lock"),
- })
+ disk, err := disk.Open(temp, nil)
if err != nil {
panic(err)
}
diff --git a/internal/media/processingmedia.go b/internal/media/processingmedia.go
index 5975de859..b65e3cd48 100644
--- a/internal/media/processingmedia.go
+++ b/internal/media/processingmedia.go
@@ -20,7 +20,6 @@ package media
import (
"bytes"
"context"
- "errors"
"image/jpeg"
"io"
"time"
@@ -156,7 +155,7 @@ func (p *ProcessingMedia) load(ctx context.Context) (*gtsmodel.MediaAttachment,
// never decoded). Try to clean up in this case.
if p.media.Type == gtsmodel.FileTypeUnknown {
deleteErr := p.mgr.state.Storage.Delete(ctx, p.media.File.Path)
- if deleteErr != nil && !errors.Is(deleteErr, storage.ErrNotFound) {
+ if deleteErr != nil && !storage.IsNotFound(deleteErr) {
errs.Append(deleteErr)
}
}
diff --git a/internal/media/video.go b/internal/media/video.go
index f98880615..5068be636 100644
--- a/internal/media/video.go
+++ b/internal/media/video.go
@@ -36,20 +36,30 @@ type gtsVideo struct {
// decodeVideoFrame decodes and returns an image from a single frame in the given video stream.
// (note: currently this only returns a blank image resized to fit video dimensions).
func decodeVideoFrame(r io.Reader) (*gtsVideo, error) {
- // we need a readseeker to decode the video...
- tfs, err := iotools.TempFileSeeker(r)
- if err != nil {
- return nil, fmt.Errorf("error creating temp file seeker: %w", err)
- }
- defer func() {
- if err := tfs.Close(); err != nil {
- log.Errorf(nil, "error closing temp file seeker: %s", err)
+ // Check if video stream supports
+ // seeking, usually when *os.File.
+ rsc, ok := r.(io.ReadSeekCloser)
+ if !ok {
+ var err error
+
+ // Store stream to temporary location
+ // in order that we can get seek-reads.
+ rsc, err = iotools.TempFileSeeker(r)
+ if err != nil {
+ return nil, fmt.Errorf("error creating temp file seeker: %w", err)
}
- }()
+
+ defer func() {
+ // Ensure temp. read seeker closed.
+ if err := rsc.Close(); err != nil {
+ log.Errorf(nil, "error closing temp file seeker: %s", err)
+ }
+ }()
+ }
// probe the video file to extract useful metadata from it; for methodology, see:
// https://github.com/abema/go-mp4/blob/7d8e5a7c5e644e0394261b0cf72fef79ce246d31/mp4tool/probe/probe.go#L85-L154
- info, err := mp4.Probe(tfs)
+ info, err := mp4.Probe(rsc)
if err != nil {
return nil, fmt.Errorf("error during mp4 probe: %w", err)
}
diff --git a/internal/processing/media/delete.go b/internal/processing/media/delete.go
index 2b215dc81..32650fb2c 100644
--- a/internal/processing/media/delete.go
+++ b/internal/processing/media/delete.go
@@ -23,9 +23,9 @@ import (
"fmt"
"strings"
- "codeberg.org/gruf/go-store/v2/storage"
"github.com/superseriousbusiness/gotosocial/internal/db"
"github.com/superseriousbusiness/gotosocial/internal/gtserror"
+ "github.com/superseriousbusiness/gotosocial/internal/storage"
)
// Delete deletes the media attachment with the given ID, including all files pertaining to that attachment.
@@ -44,14 +44,14 @@ func (p *Processor) Delete(ctx context.Context, mediaAttachmentID string) gtserr
// delete the thumbnail from storage
if attachment.Thumbnail.Path != "" {
- if err := p.state.Storage.Delete(ctx, attachment.Thumbnail.Path); err != nil && !errors.Is(err, storage.ErrNotFound) {
+ if err := p.state.Storage.Delete(ctx, attachment.Thumbnail.Path); err != nil && !storage.IsNotFound(err) {
errs = append(errs, fmt.Sprintf("remove thumbnail at path %s: %s", attachment.Thumbnail.Path, err))
}
}
// delete the file from storage
if attachment.File.Path != "" {
- if err := p.state.Storage.Delete(ctx, attachment.File.Path); err != nil && !errors.Is(err, storage.ErrNotFound) {
+ if err := p.state.Storage.Delete(ctx, attachment.File.Path); err != nil && !storage.IsNotFound(err) {
errs = append(errs, fmt.Sprintf("remove file at path %s: %s", attachment.File.Path, err))
}
}
diff --git a/internal/storage/storage.go b/internal/storage/storage.go
index c27037fba..872ea1210 100644
--- a/internal/storage/storage.go
+++ b/internal/storage/storage.go
@@ -19,16 +19,20 @@ package storage
import (
"context"
+ "errors"
"fmt"
"io"
"mime"
"net/url"
"path"
+ "syscall"
"time"
"codeberg.org/gruf/go-bytesize"
"codeberg.org/gruf/go-cache/v3/ttl"
- "codeberg.org/gruf/go-store/v2/storage"
+ "codeberg.org/gruf/go-storage"
+ "codeberg.org/gruf/go-storage/disk"
+ "codeberg.org/gruf/go-storage/s3"
"github.com/minio/minio-go/v7"
"github.com/minio/minio-go/v7/pkg/credentials"
"github.com/superseriousbusiness/gotosocial/internal/config"
@@ -48,11 +52,17 @@ type PresignedURL struct {
Expiry time.Time // link expires at this time
}
-var (
- // Ptrs to underlying storage library errors.
- ErrAlreadyExists = storage.ErrAlreadyExists
- ErrNotFound = storage.ErrNotFound
-)
+// IsAlreadyExist returns whether error is an already-exists
+// type error returned by the underlying storage library.
+func IsAlreadyExist(err error) bool {
+ return errors.Is(err, storage.ErrAlreadyExists)
+}
+
+// IsNotFound returns whether error is a not-found error
+// type returned by the underlying storage library.
+func IsNotFound(err error) bool {
+ return errors.Is(err, storage.ErrNotFound)
+}
// Driver wraps a kv.KVStore to also provide S3 presigned GET URLs.
type Driver struct {
@@ -92,30 +102,23 @@ func (d *Driver) Delete(ctx context.Context, key string) error {
// Has checks if the supplied key is in the storage.
func (d *Driver) Has(ctx context.Context, key string) (bool, error) {
- return d.Storage.Stat(ctx, key)
+ stat, err := d.Storage.Stat(ctx, key)
+ return (stat != nil), err
}
// WalkKeys walks the keys in the storage.
-func (d *Driver) WalkKeys(ctx context.Context, walk func(context.Context, string) error) error {
- return d.Storage.WalkKeys(ctx, storage.WalkKeysOptions{
- WalkFn: func(ctx context.Context, entry storage.Entry) error {
- if entry.Key == "store.lock" {
- return nil // skip this.
- }
- return walk(ctx, entry.Key)
+func (d *Driver) WalkKeys(ctx context.Context, walk func(string) error) error {
+ return d.Storage.WalkKeys(ctx, storage.WalkKeysOpts{
+ Step: func(entry storage.Entry) error {
+ return walk(entry.Key)
},
})
}
-// Close will close the storage, releasing any file locks.
-func (d *Driver) Close() error {
- return d.Storage.Close()
-}
-
// URL will return a presigned GET object URL, but only if running on S3 storage with proxying disabled.
func (d *Driver) URL(ctx context.Context, key string) *PresignedURL {
// Check whether S3 *without* proxying is enabled
- s3, ok := d.Storage.(*storage.S3Storage)
+ s3, ok := d.Storage.(*s3.S3Storage)
if !ok || d.Proxy {
return nil
}
@@ -166,7 +169,7 @@ func (d *Driver) ProbeCSPUri(ctx context.Context) (string, error) {
// Check whether S3 without proxying
// is enabled. If it's not, there's
// no need to add anything to the CSP.
- s3, ok := d.Storage.(*storage.S3Storage)
+ s3, ok := d.Storage.(*s3.S3Storage)
if !ok || d.Proxy {
return "", nil
}
@@ -217,16 +220,17 @@ func NewFileStorage() (*Driver, error) {
// Load runtime configuration
basePath := config.GetStorageLocalBasePath()
+ // Use default disk config but with
+ // increased write buffer size and
+ // 'exclusive' bit sets when creating
+ // files to ensure we don't overwrite
+ // existing files unless intending to.
+ diskCfg := disk.DefaultConfig()
+ diskCfg.OpenWrite.Flags |= syscall.O_EXCL
+ diskCfg.WriteBufSize = int(16 * bytesize.KiB)
+
// Open the disk storage implementation
- disk, err := storage.OpenDisk(basePath, &storage.DiskConfig{
- // Put the store lockfile in the storage dir itself.
- // Normally this would not be safe, since we could end up
- // overwriting the lockfile if we store a file called 'store.lock'.
- // However, in this case it's OK because the keys are set by
- // GtS and not the user, so we know we're never going to overwrite it.
- LockFile: path.Join(basePath, "store.lock"),
- WriteBufSize: int(16 * bytesize.KiB),
- })
+ disk, err := disk.Open(basePath, &diskCfg)
if err != nil {
return nil, fmt.Errorf("error opening disk storage: %w", err)
}
@@ -245,7 +249,7 @@ func NewS3Storage() (*Driver, error) {
bucket := config.GetStorageS3BucketName()
// Open the s3 storage implementation
- s3, err := storage.OpenS3(endpoint, bucket, &storage.S3Config{
+ s3, err := s3.Open(endpoint, bucket, &s3.Config{
CoreOpts: minio.Options{
Creds: credentials.NewStaticV4(access, secret, ""),
Secure: secure,