author    tobi <31960611+tsmethurst@users.noreply.github.com>  2022-03-07 11:08:26 +0100
committer GitHub <noreply@github.com>  2022-03-07 11:08:26 +0100
commit    07727753b96d209406783e5e539725bcdafebdc7 (patch)
tree      b32f11cbc304d633ed0acd8f84b4c11e909bb5f3 /internal/processing/media/getfile.go
parent    [documentation] Creates Docker documentation and docker-compose.yaml (#416)
[feature] Clean up/uncache remote media (#407)
* Add whereNotEmptyAndNotNull
* Add GetRemoteOlderThanDays
* Add GetRemoteOlderThanDays
* Add PruneRemote to Manager interface
* Start implementing PruneRemote
* add new attachment + status to tests
* fix up and test GetRemoteOlderThan
* fix bad import
* PruneRemote: return number pruned
* add Cached column to mediaattachment
* update + test pruneRemote
* update mediaTest
* use Cached column
* upstep bun to latest version
* embed structs in mediaAttachment
* migrate mediaAttachment to new format
* don't default cached to true
* select only remote media
* update db dependencies
* step bun back to last working version
* update pruneRemote to use Cached field
* fix storage path of test attachments
* add recache logic to manager
* fix trimmed aspect ratio
* test prune and recache
* return errwithcode
* tidy up different paths for emoji vs attachment
* fix incorrect thumbnail type being stored
* expose TransportController to media processor
* implement tee-ing recached content
* add thoughts of dog to test fedi attachments
* test get remote files
* add comment on PruneRemote
* add postData cleanup to recache
* test thumbnail fetching
* add incredible diagram
* go mod tidy
* buffer pipes for recache streaming
* test for client stops reading after 1kb
* add media-remote-cache-days to config
* add cron package
* wrap logrus so it's available to cron
* start and stop cron jobs gracefully
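The "implement tee-ing recached content" and "buffer pipes for recache streaming" items above boil down to the pattern sketched below. This is a minimal, self-contained illustration of the pattern only, not the GoToSocial code: the remote fetch, the storage target, and every name in it are stand-ins, while the real getfile.go wires the same pieces (io.Pipe, bufio, io.TeeReader) through the media manager's DataFunc and PostDataCallbackFunc.

package main

import (
	"bufio"
	"bytes"
	"fmt"
	"io"
	"strings"
)

func main() {
	// stands in for the remote fetch (transport.DereferenceMedia in the real code)
	remote := strings.NewReader("pretend this is remote media content")
	contentLength := remote.Len()

	// buffer both ends of the pipe so a caller that stops reading partway
	// through doesn't block the copy into storage
	pipeReader, pipeWriter := io.Pipe()
	bufferedWriter := bufio.NewWriterSize(pipeWriter, contentLength)
	bufferedReader := bufio.NewReaderSize(pipeReader, contentLength)

	// stands in for instance storage
	storage := &bytes.Buffer{}

	// everything read from the remote is also written into the buffered pipe
	teeReader := io.TeeReader(remote, bufferedWriter)

	// "media manager" side: drain the tee into storage, then flush the buffer
	// and close the write end so the caller's read end sees EOF
	done := make(chan error, 1)
	go func() {
		if _, err := io.Copy(storage, teeReader); err != nil {
			done <- err
			return
		}
		if err := bufferedWriter.Flush(); err != nil {
			done <- err
			return
		}
		done <- pipeWriter.Close()
	}()

	// "api caller" side: stream the same bytes from the buffered pipe
	streamed, _ := io.ReadAll(bufferedReader)

	if err := <-done; err != nil {
		fmt.Println("recache error:", err)
		return
	}
	fmt.Printf("caller received %d bytes, storage received %d bytes\n", len(streamed), storage.Len())
}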
Diffstat (limited to 'internal/processing/media/getfile.go')
-rw-r--r--  internal/processing/media/getfile.go  |  225
1 file changed, 181 insertions(+), 44 deletions(-)
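The pruning side of the change ("add media-remote-cache-days to config", "add cron package", "start and stop cron jobs gracefully") does not touch getfile.go, but a rough sketch of how a scheduled prune could be wired up looks like the following. It assumes the robfig/cron/v3 package and a hypothetical pruneRemote function standing in for the manager's PruneRemote; the config value and the schedule are illustrative.

package main

import (
	"context"
	"log"

	"github.com/robfig/cron/v3"
)

// pruneRemote is a hypothetical stand-in for the media manager's PruneRemote:
// it would delete cached copies of remote media older than the given number
// of days and report how many attachments were pruned.
func pruneRemote(ctx context.Context, olderThanDays int) (int, error) {
	// ... database lookup + storage deletion would go here ...
	return 0, nil
}

func main() {
	const mediaRemoteCacheDays = 30 // illustrative value for media-remote-cache-days

	c := cron.New()
	if _, err := c.AddFunc("@midnight", func() {
		pruned, err := pruneRemote(context.Background(), mediaRemoteCacheDays)
		if err != nil {
			log.Printf("error pruning remote media: %s", err)
			return
		}
		log.Printf("pruned %d remote media attachments", pruned)
	}); err != nil {
		log.Fatal(err)
	}

	c.Start()

	// ... server runs ...

	// on shutdown, stop scheduling and wait for any in-flight prune job to finish
	<-c.Stop().Done()
}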
diff --git a/internal/processing/media/getfile.go b/internal/processing/media/getfile.go
index 7431224c4..1faa8702f 100644
--- a/internal/processing/media/getfile.go
+++ b/internal/processing/media/getfile.go
@@ -19,8 +19,11 @@
package media
import (
+ "bufio"
"context"
"fmt"
+ "io"
+ "net/url"
"strings"
apimodel "github.com/superseriousbusiness/gotosocial/internal/api/model"
@@ -29,7 +32,7 @@ import (
"github.com/superseriousbusiness/gotosocial/internal/media"
)
-func (p *processor) GetFile(ctx context.Context, account *gtsmodel.Account, form *apimodel.GetContentRequestForm) (*apimodel.Content, error) {
+func (p *processor) GetFile(ctx context.Context, account *gtsmodel.Account, form *apimodel.GetContentRequestForm) (*apimodel.Content, gtserror.WithCode) {
// parse the form fields
mediaSize, err := media.ParseMediaSize(form.MediaSize)
if err != nil {
@@ -46,74 +49,208 @@ func (p *processor) GetFile(ctx context.Context, account *gtsmodel.Account, form
return nil, gtserror.NewErrorNotFound(fmt.Errorf("file name %s not parseable", form.FileName))
}
wantedMediaID := spl[0]
+ expectedAccountID := form.AccountID
// get the account that owns the media and make sure it's not suspended
- acct, err := p.db.GetAccountByID(ctx, form.AccountID)
+ acct, err := p.db.GetAccountByID(ctx, expectedAccountID)
if err != nil {
- return nil, gtserror.NewErrorNotFound(fmt.Errorf("account with id %s could not be selected from the db: %s", form.AccountID, err))
+ return nil, gtserror.NewErrorNotFound(fmt.Errorf("account with id %s could not be selected from the db: %s", expectedAccountID, err))
}
if !acct.SuspendedAt.IsZero() {
- return nil, gtserror.NewErrorNotFound(fmt.Errorf("account with id %s is suspended", form.AccountID))
+ return nil, gtserror.NewErrorNotFound(fmt.Errorf("account with id %s is suspended", expectedAccountID))
}
// make sure the requesting account and the media account don't block each other
if account != nil {
- blocked, err := p.db.IsBlocked(ctx, account.ID, form.AccountID, true)
+ blocked, err := p.db.IsBlocked(ctx, account.ID, expectedAccountID, true)
if err != nil {
- return nil, gtserror.NewErrorNotFound(fmt.Errorf("block status could not be established between accounts %s and %s: %s", form.AccountID, account.ID, err))
+ return nil, gtserror.NewErrorNotFound(fmt.Errorf("block status could not be established between accounts %s and %s: %s", expectedAccountID, account.ID, err))
}
if blocked {
- return nil, gtserror.NewErrorNotFound(fmt.Errorf("block exists between accounts %s and %s", form.AccountID, account.ID))
+ return nil, gtserror.NewErrorNotFound(fmt.Errorf("block exists between accounts %s and %s", expectedAccountID, account.ID))
}
}
// the way we store emojis is a little different from the way we store other attachments,
// so we need to take different steps depending on the media type being requested
- content := &apimodel.Content{}
- var storagePath string
switch mediaType {
case media.TypeEmoji:
- e := &gtsmodel.Emoji{}
- if err := p.db.GetByID(ctx, wantedMediaID, e); err != nil {
- return nil, gtserror.NewErrorNotFound(fmt.Errorf("emoji %s could not be taken from the db: %s", wantedMediaID, err))
- }
- if e.Disabled {
- return nil, gtserror.NewErrorNotFound(fmt.Errorf("emoji %s has been disabled", wantedMediaID))
- }
- switch mediaSize {
- case media.SizeOriginal:
- content.ContentType = e.ImageContentType
- content.ContentLength = int64(e.ImageFileSize)
- storagePath = e.ImagePath
- case media.SizeStatic:
- content.ContentType = e.ImageStaticContentType
- content.ContentLength = int64(e.ImageStaticFileSize)
- storagePath = e.ImageStaticPath
- default:
- return nil, gtserror.NewErrorNotFound(fmt.Errorf("media size %s not recognized for emoji", mediaSize))
- }
+ return p.getEmojiContent(ctx, wantedMediaID, mediaSize)
case media.TypeAttachment, media.TypeHeader, media.TypeAvatar:
- a, err := p.db.GetAttachmentByID(ctx, wantedMediaID)
- if err != nil {
- return nil, gtserror.NewErrorNotFound(fmt.Errorf("attachment %s could not be taken from the db: %s", wantedMediaID, err))
+ return p.getAttachmentContent(ctx, account, wantedMediaID, expectedAccountID, mediaSize)
+ default:
+ return nil, gtserror.NewErrorNotFound(fmt.Errorf("media type %s not recognized", mediaType))
+ }
+}
+
+func (p *processor) getAttachmentContent(ctx context.Context, requestingAccount *gtsmodel.Account, wantedMediaID string, expectedAccountID string, mediaSize media.Size) (*apimodel.Content, gtserror.WithCode) {
+ attachmentContent := &apimodel.Content{}
+ var storagePath string
+
+ // retrieve attachment from the database and do basic checks on it
+ a, err := p.db.GetAttachmentByID(ctx, wantedMediaID)
+ if err != nil {
+ return nil, gtserror.NewErrorNotFound(fmt.Errorf("attachment %s could not be taken from the db: %s", wantedMediaID, err))
+ }
+
+ if a.AccountID != expectedAccountID {
+ return nil, gtserror.NewErrorNotFound(fmt.Errorf("attachment %s is not owned by %s", wantedMediaID, expectedAccountID))
+ }
+
+ // get file information from the attachment depending on the requested media size
+ switch mediaSize {
+ case media.SizeOriginal:
+ attachmentContent.ContentType = a.File.ContentType
+ attachmentContent.ContentLength = int64(a.File.FileSize)
+ storagePath = a.File.Path
+ case media.SizeSmall:
+ attachmentContent.ContentType = a.Thumbnail.ContentType
+ attachmentContent.ContentLength = int64(a.Thumbnail.FileSize)
+ storagePath = a.Thumbnail.Path
+ default:
+ return nil, gtserror.NewErrorNotFound(fmt.Errorf("media size %s not recognized for attachment", mediaSize))
+ }
+
+ // if we have the media cached on our server already, we can now simply return it from storage
+ if a.Cached {
+ return p.streamFromStorage(storagePath, attachmentContent)
+ }
+
+ // if we don't have it cached, then we can assume two things:
+ // 1. this is remote media, since local media should never be uncached
+ // 2. we need to fetch it again using a transport and the media manager
+ remoteMediaIRI, err := url.Parse(a.RemoteURL)
+ if err != nil {
+ return nil, gtserror.NewErrorNotFound(fmt.Errorf("error parsing remote media iri %s: %s", a.RemoteURL, err))
+ }
+
+ // use an empty string as requestingUsername to use the instance account, unless the request for this
+ // media has been http signed, then use the requesting account to make the request to remote server
+ var requestingUsername string
+ if requestingAccount != nil {
+ requestingUsername = requestingAccount.Username
+ }
+
+ var data media.DataFunc
+ var postDataCallback media.PostDataCallbackFunc
+
+ if mediaSize == media.SizeSmall {
+ // if it's the thumbnail that's requested then the user will have to wait a bit while we process the
+ // large version and derive a thumbnail from it, so use the normal recaching procedure: fetch the media,
+ // process it, then return the thumbnail data
+ data = func(innerCtx context.Context) (io.Reader, int, error) {
+ transport, err := p.transportController.NewTransportForUsername(innerCtx, requestingUsername)
+ if err != nil {
+ return nil, 0, err
+ }
+ return transport.DereferenceMedia(innerCtx, remoteMediaIRI)
}
- if a.AccountID != form.AccountID {
- return nil, gtserror.NewErrorNotFound(fmt.Errorf("attachment %s is not owned by %s", wantedMediaID, form.AccountID))
+ } else {
+ // if it's the full-sized version being requested, we can cheat a bit by streaming data to the user as
+ // it's retrieved from the remote server, using tee; this saves the user from having to wait while
+ // we process the media on our side
+ //
+ // this looks a bit like this:
+ //
+ //                http fetch                        buffered pipe
+ // remote server ------------> data function ----------------> api caller
+ //                                   |
+ //                                   | tee
+ //                                   |
+ //                                   ▼
+ //                            instance storage
+
+ // Buffer each end of the pipe, so that if the caller drops the connection during the flow, the tee
+ // reader can continue without having to worry about tee-ing into a closed or blocked pipe.
+ pipeReader, pipeWriter := io.Pipe()
+ bufferedWriter := bufio.NewWriterSize(pipeWriter, int(attachmentContent.ContentLength))
+ bufferedReader := bufio.NewReaderSize(pipeReader, int(attachmentContent.ContentLength))
+
+ // the caller will read from the buffered reader, so it doesn't matter if they drop out without reading everything
+ attachmentContent.Content = bufferedReader
+
+ data = func(innerCtx context.Context) (io.Reader, int, error) {
+ transport, err := p.transportController.NewTransportForUsername(innerCtx, requestingUsername)
+ if err != nil {
+ return nil, 0, err
+ }
+
+ readCloser, fileSize, err := transport.DereferenceMedia(innerCtx, remoteMediaIRI)
+ if err != nil {
+ return nil, 0, err
+ }
+
+ // everything read from the readCloser by the media manager will be written into the bufferedWriter
+ teeReader := io.TeeReader(readCloser, bufferedWriter)
+ return teeReader, fileSize, nil
}
- switch mediaSize {
- case media.SizeOriginal:
- content.ContentType = a.File.ContentType
- content.ContentLength = int64(a.File.FileSize)
- storagePath = a.File.Path
- case media.SizeSmall:
- content.ContentType = a.Thumbnail.ContentType
- content.ContentLength = int64(a.Thumbnail.FileSize)
- storagePath = a.Thumbnail.Path
- default:
- return nil, gtserror.NewErrorNotFound(fmt.Errorf("media size %s not recognized for attachment", mediaSize))
+
+ // close the pipewriter after data has been piped into it, so the reader on the other side doesn't block;
+ // we don't need to close the reader here because that's the caller's responsibility
+ postDataCallback = func(innerCtx context.Context) error {
+ // flush the buffered writer into the buffer of the reader...
+ if err := bufferedWriter.Flush(); err != nil {
+ return err
+ }
+
+ // and close the underlying pipe writer
+ if err := pipeWriter.Close(); err != nil {
+ return err
+ }
+
+ return nil
+ }
+ }
+
+ // queue the media to be recached
+ processingMedia, err := p.mediaManager.RecacheMedia(ctx, data, postDataCallback, wantedMediaID)
+ if err != nil {
+ return nil, gtserror.NewErrorNotFound(fmt.Errorf("error recaching media: %s", err))
+ }
+
+ // if it's the thumbnail, stream the processed thumbnail from storage, after waiting for processing to finish
+ if mediaSize == media.SizeSmall {
+ // below function call blocks until all processing on the attachment has finished...
+ if _, err := processingMedia.LoadAttachment(ctx); err != nil {
+ return nil, gtserror.NewErrorNotFound(fmt.Errorf("error loading recached attachment: %s", err))
}
+ // ... so now we can safely return it
+ return p.streamFromStorage(storagePath, attachmentContent)
}
+ return attachmentContent, nil
+}
+
+func (p *processor) getEmojiContent(ctx context.Context, wantedEmojiID string, emojiSize media.Size) (*apimodel.Content, gtserror.WithCode) {
+ emojiContent := &apimodel.Content{}
+ var storagePath string
+
+ e := &gtsmodel.Emoji{}
+ if err := p.db.GetByID(ctx, wantedEmojiID, e); err != nil {
+ return nil, gtserror.NewErrorNotFound(fmt.Errorf("emoji %s could not be taken from the db: %s", wantedEmojiID, err))
+ }
+
+ if e.Disabled {
+ return nil, gtserror.NewErrorNotFound(fmt.Errorf("emoji %s has been disabled", wantedEmojiID))
+ }
+
+ switch emojiSize {
+ case media.SizeOriginal:
+ emojiContent.ContentType = e.ImageContentType
+ emojiContent.ContentLength = int64(e.ImageFileSize)
+ storagePath = e.ImagePath
+ case media.SizeStatic:
+ emojiContent.ContentType = e.ImageStaticContentType
+ emojiContent.ContentLength = int64(e.ImageStaticFileSize)
+ storagePath = e.ImageStaticPath
+ default:
+ return nil, gtserror.NewErrorNotFound(fmt.Errorf("media size %s not recognized for emoji", emojiSize))
+ }
+
+ return p.streamFromStorage(storagePath, emojiContent)
+}
+
+func (p *processor) streamFromStorage(storagePath string, content *apimodel.Content) (*apimodel.Content, gtserror.WithCode) {
reader, err := p.storage.GetStream(storagePath)
if err != nil {
return nil, gtserror.NewErrorNotFound(fmt.Errorf("error retrieving from storage: %s", err))