diff options
author | 2023-01-11 11:13:13 +0000 | |
---|---|---|
committer | 2023-01-11 12:13:13 +0100 | |
commit | 53180548083c0a100db2f703d5f5da047a9e0031 (patch) | |
tree | a8eb1df9d03b37f907a747ae42cc8992d2ff9f52 /internal/processing/media/getfile.go | |
parent | [feature] Add local user and post count to nodeinfo responses (#1325) (diff) | |
download | gotosocial-53180548083c0a100db2f703d5f5da047a9e0031.tar.xz |
[performance] media processing improvements (#1288)
* media processor consolidation and reformatting, reduce amount of required syscalls
Signed-off-by: kim <grufwub@gmail.com>
* update go-store library, stream jpeg/png encoding + use buffer pools, improved media processing AlreadyExists error handling
Signed-off-by: kim <grufwub@gmail.com>
* fix duration not being set, fix mp4 test expecting error
Signed-off-by: kim <grufwub@gmail.com>
* fix test expecting media files with different extension
Signed-off-by: kim <grufwub@gmail.com>
* remove unused code
Signed-off-by: kim <grufwub@gmail.com>
* fix expected storage paths in tests, update expected test thumbnails
Signed-off-by: kim <grufwub@gmail.com>
* remove dead code
Signed-off-by: kim <grufwub@gmail.com>
* fix cached presigned s3 url fetching
Signed-off-by: kim <grufwub@gmail.com>
* fix tests
Signed-off-by: kim <grufwub@gmail.com>
* fix test models
Signed-off-by: kim <grufwub@gmail.com>
* update media processing to use sync.Once{} for concurrency protection
Signed-off-by: kim <grufwub@gmail.com>
* shutup linter
Signed-off-by: kim <grufwub@gmail.com>
* fix passing in KVStore GetStream() as stream to PutStream()
Signed-off-by: kim <grufwub@gmail.com>
* fix unlocks of storage keys
Signed-off-by: kim <grufwub@gmail.com>
* whoops, return the error...
Signed-off-by: kim <grufwub@gmail.com>
* pour one out for tobi's code <3
Signed-off-by: kim <grufwub@gmail.com>
* add back the byte slurping code
Signed-off-by: kim <grufwub@gmail.com>
* check for both ErrUnexpectedEOF and EOF
Signed-off-by: kim <grufwub@gmail.com>
* add back links to file format header information
Signed-off-by: kim <grufwub@gmail.com>
Signed-off-by: kim <grufwub@gmail.com>
Diffstat (limited to 'internal/processing/media/getfile.go')
-rw-r--r-- | internal/processing/media/getfile.go | 164 |
1 files changed, 49 insertions, 115 deletions
diff --git a/internal/processing/media/getfile.go b/internal/processing/media/getfile.go index 14e031e52..d5f74926a 100644 --- a/internal/processing/media/getfile.go +++ b/internal/processing/media/getfile.go @@ -28,7 +28,6 @@ import ( apimodel "github.com/superseriousbusiness/gotosocial/internal/api/model" "github.com/superseriousbusiness/gotosocial/internal/gtserror" "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" - "github.com/superseriousbusiness/gotosocial/internal/iotools" "github.com/superseriousbusiness/gotosocial/internal/media" "github.com/superseriousbusiness/gotosocial/internal/transport" "github.com/superseriousbusiness/gotosocial/internal/uris" @@ -99,135 +98,70 @@ func (p *processor) getAttachmentContent(ctx context.Context, requestingAccount return nil, gtserror.NewErrorNotFound(fmt.Errorf("attachment %s is not owned by %s", wantedMediaID, owningAccountID)) } - // get file information from the attachment depending on the requested media size - switch mediaSize { - case media.SizeOriginal: - attachmentContent.ContentType = a.File.ContentType - attachmentContent.ContentLength = int64(a.File.FileSize) - storagePath = a.File.Path - case media.SizeSmall: - attachmentContent.ContentType = a.Thumbnail.ContentType - attachmentContent.ContentLength = int64(a.Thumbnail.FileSize) - storagePath = a.Thumbnail.Path - default: - return nil, gtserror.NewErrorNotFound(fmt.Errorf("media size %s not recognized for attachment", mediaSize)) - } - - // if we have the media cached on our server already, we can now simply return it from storage - if *a.Cached { - return p.retrieveFromStorage(ctx, storagePath, attachmentContent) - } - - // if we don't have it cached, then we can assume two things: - // 1. this is remote media, since local media should never be uncached - // 2. we need to fetch it again using a transport and the media manager - remoteMediaIRI, err := url.Parse(a.RemoteURL) - if err != nil { - return nil, gtserror.NewErrorNotFound(fmt.Errorf("error parsing remote media iri %s: %s", a.RemoteURL, err)) - } - - // use an empty string as requestingUsername to use the instance account, unless the request for this - // media has been http signed, then use the requesting account to make the request to remote server - var requestingUsername string - if requestingAccount != nil { - requestingUsername = requestingAccount.Username - } + if !*a.Cached { + // if we don't have it cached, then we can assume two things: + // 1. this is remote media, since local media should never be uncached + // 2. we need to fetch it again using a transport and the media manager + remoteMediaIRI, err := url.Parse(a.RemoteURL) + if err != nil { + return nil, gtserror.NewErrorNotFound(fmt.Errorf("error parsing remote media iri %s: %s", a.RemoteURL, err)) + } - var data media.DataFunc + // use an empty string as requestingUsername to use the instance account, unless the request for this + // media has been http signed, then use the requesting account to make the request to remote server + var requestingUsername string + if requestingAccount != nil { + requestingUsername = requestingAccount.Username + } - if mediaSize == media.SizeSmall { - // if it's the thumbnail that's requested then the user will have to wait a bit while we process the - // large version and derive a thumbnail from it, so use the normal recaching procedure: fetch the media, - // process it, then return the thumbnail data - data = func(innerCtx context.Context) (io.ReadCloser, int64, error) { + // Pour one out for tobi's original streamed recache + // (streaming data both to the client and storage). + // Gone and forever missed <3 + // + // [ + // the reason it was removed was because a slow + // client connection could hold open a storage + // recache operation, and so holding open a media + // worker worker. + // ] + + dataFn := func(innerCtx context.Context) (io.ReadCloser, int64, error) { t, err := p.transportController.NewTransportForUsername(innerCtx, requestingUsername) if err != nil { return nil, 0, err } return t.DereferenceMedia(transport.WithFastfail(innerCtx), remoteMediaIRI) } - } else { - // if it's the full-sized version being requested, we can cheat a bit by streaming data to the user as - // it's retrieved from the remote server, using tee; this saves the user from having to wait while - // we process the media on our side - // - // this looks a bit like this: - // - // http fetch pipe - // remote server ------------> data function ----------------> api caller - // | - // | tee - // | - // ▼ - // instance storage - - // This pipe will connect the caller to the in-process media retrieval... - pipeReader, pipeWriter := io.Pipe() - // Wrap the output pipe to silence any errors during the actual media - // streaming process. We catch the error later but they must be silenced - // during stream to prevent interruptions to storage of the actual media. - silencedWriter := iotools.SilenceWriter(pipeWriter) - - // Pass the reader side of the pipe to the caller to slurp from. - attachmentContent.Content = pipeReader - - // Create a data function which injects the writer end of the pipe - // into the data retrieval process. If something goes wrong while - // doing the data retrieval, we hang up the underlying pipeReader - // to indicate to the caller that no data is available. It's up to - // the caller of this processor function to handle that gracefully. - data = func(innerCtx context.Context) (io.ReadCloser, int64, error) { - t, err := p.transportController.NewTransportForUsername(innerCtx, requestingUsername) - if err != nil { - // propagate the transport error to read end of pipe. - _ = pipeWriter.CloseWithError(fmt.Errorf("error getting transport for user: %w", err)) - return nil, 0, err - } - - readCloser, fileSize, err := t.DereferenceMedia(transport.WithFastfail(innerCtx), remoteMediaIRI) - if err != nil { - // propagate the dereference error to read end of pipe. - _ = pipeWriter.CloseWithError(fmt.Errorf("error dereferencing media: %w", err)) - return nil, 0, err - } - - // Make a TeeReader so that everything read from the readCloser, - // aka the remote instance, will also be written into the pipe. - teeReader := io.TeeReader(readCloser, silencedWriter) - - // Wrap teereader to implement original readcloser's close, - // and also ensuring that we close the pipe from write end. - return iotools.ReadFnCloser(teeReader, func() error { - defer func() { - // We use the error (if any) encountered by the - // silenced writer to close connection to make sure it - // gets propagated to the attachment.Content reader. - _ = pipeWriter.CloseWithError(silencedWriter.Error()) - }() - - return readCloser.Close() - }), fileSize, nil + // Start recaching this media with the prepared data function. + processingMedia, err := p.mediaManager.RecacheMedia(ctx, dataFn, nil, wantedMediaID) + if err != nil { + return nil, gtserror.NewErrorNotFound(fmt.Errorf("error recaching media: %s", err)) } - } - - // put the media recached in the queue - processingMedia, err := p.mediaManager.RecacheMedia(ctx, data, nil, wantedMediaID) - if err != nil { - return nil, gtserror.NewErrorNotFound(fmt.Errorf("error recaching media: %s", err)) - } - // if it's the thumbnail, stream the processed thumbnail from storage, after waiting for processing to finish - if mediaSize == media.SizeSmall { - // below function call blocks until all processing on the attachment has finished... - if _, err := processingMedia.LoadAttachment(ctx); err != nil { + // Load attachment and block until complete + a, err = processingMedia.LoadAttachment(ctx) + if err != nil { return nil, gtserror.NewErrorNotFound(fmt.Errorf("error loading recached attachment: %s", err)) } - // ... so now we can safely return it - return p.retrieveFromStorage(ctx, storagePath, attachmentContent) } - return attachmentContent, nil + // get file information from the attachment depending on the requested media size + switch mediaSize { + case media.SizeOriginal: + attachmentContent.ContentType = a.File.ContentType + attachmentContent.ContentLength = int64(a.File.FileSize) + storagePath = a.File.Path + case media.SizeSmall: + attachmentContent.ContentType = a.Thumbnail.ContentType + attachmentContent.ContentLength = int64(a.Thumbnail.FileSize) + storagePath = a.Thumbnail.Path + default: + return nil, gtserror.NewErrorNotFound(fmt.Errorf("media size %s not recognized for attachment", mediaSize)) + } + + // ... so now we can safely return it + return p.retrieveFromStorage(ctx, storagePath, attachmentContent) } func (p *processor) getEmojiContent(ctx context.Context, fileName string, owningAccountID string, emojiSize media.Size) (*apimodel.Content, gtserror.WithCode) { |