diff options
author | 2022-12-21 11:17:43 +0100 | |
---|---|---|
committer | 2022-12-21 11:17:43 +0100 | |
commit | 6ebdc306edd9b1ee0d853bdad63c0fb418382eb7 (patch) | |
tree | e2e2e5262af41cbeea7dd716e9cdc53092078200 /internal/processing/media/getfile.go | |
parent | [chore] note broken go v1.19.4 in contributing.md (#1278) (diff) | |
download | gotosocial-6ebdc306edd9b1ee0d853bdad63c0fb418382eb7.tar.xz |
[bugfix] Close reader gracefully when streaming recache of remote media to fileserver api caller (#1281)
* close pipereader on failed data function
* gently slurp the bytes
* readability updates
* go fmt
* tidy up file server tests + add more cases
* start moving io wrappers to separate iotools package. Remove use of buffering while piping recache stream
Signed-off-by: kim <grufwub@gmail.com>
* add license text
Signed-off-by: kim <grufwub@gmail.com>
Co-authored-by: kim <grufwub@gmail.com>
Diffstat (limited to 'internal/processing/media/getfile.go')
-rw-r--r-- | internal/processing/media/getfile.go | 70 |
1 files changed, 36 insertions, 34 deletions
diff --git a/internal/processing/media/getfile.go b/internal/processing/media/getfile.go index ddc14479a..eba3fdb7e 100644 --- a/internal/processing/media/getfile.go +++ b/internal/processing/media/getfile.go @@ -19,7 +19,6 @@ package media import ( - "bufio" "context" "fmt" "io" @@ -29,7 +28,7 @@ import ( apimodel "github.com/superseriousbusiness/gotosocial/internal/api/model" "github.com/superseriousbusiness/gotosocial/internal/gtserror" "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" - "github.com/superseriousbusiness/gotosocial/internal/log" + "github.com/superseriousbusiness/gotosocial/internal/iotools" "github.com/superseriousbusiness/gotosocial/internal/media" "github.com/superseriousbusiness/gotosocial/internal/transport" "github.com/superseriousbusiness/gotosocial/internal/uris" @@ -135,7 +134,6 @@ func (p *processor) getAttachmentContent(ctx context.Context, requestingAccount } var data media.DataFunc - var postDataCallback media.PostDataCallbackFunc if mediaSize == media.SizeSmall { // if it's the thumbnail that's requested then the user will have to wait a bit while we process the @@ -155,7 +153,7 @@ func (p *processor) getAttachmentContent(ctx context.Context, requestingAccount // // this looks a bit like this: // - // http fetch buffered pipe + // http fetch pipe // remote server ------------> data function ----------------> api caller // | // | tee @@ -163,54 +161,58 @@ func (p *processor) getAttachmentContent(ctx context.Context, requestingAccount // ▼ // instance storage - // Buffer each end of the pipe, so that if the caller drops the connection during the flow, the tee - // reader can continue without having to worry about tee-ing into a closed or blocked pipe. + // This pipe will connect the caller to the in-process media retrieval... pipeReader, pipeWriter := io.Pipe() - bufferedWriter := bufio.NewWriterSize(pipeWriter, int(attachmentContent.ContentLength)) - bufferedReader := bufio.NewReaderSize(pipeReader, int(attachmentContent.ContentLength)) - // the caller will read from the buffered reader, so it doesn't matter if they drop out without reading everything - attachmentContent.Content = io.NopCloser(bufferedReader) + // Wrap the output pipe to silence any errors during the actual media + // streaming process. We catch the error later but they must be silenced + // during stream to prevent interruptions to storage of the actual media. + silencedWriter := iotools.SilenceWriter(pipeWriter) + // Pass the reader side of the pipe to the caller to slurp from. + attachmentContent.Content = pipeReader + + // Create a data function which injects the writer end of the pipe + // into the data retrieval process. If something goes wrong while + // doing the data retrieval, we hang up the underlying pipeReader + // to indicate to the caller that no data is available. It's up to + // the caller of this processor function to handle that gracefully. data = func(innerCtx context.Context) (io.ReadCloser, int64, error) { t, err := p.transportController.NewTransportForUsername(innerCtx, requestingUsername) if err != nil { + // propagate the transport error to read end of pipe. + _ = pipeWriter.CloseWithError(fmt.Errorf("error getting transport for user: %w", err)) return nil, 0, err } readCloser, fileSize, err := t.DereferenceMedia(transport.WithFastfail(innerCtx), remoteMediaIRI) if err != nil { + // propagate the dereference error to read end of pipe. + _ = pipeWriter.CloseWithError(fmt.Errorf("error dereferencing media: %w", err)) return nil, 0, err } - // Make a TeeReader so that everything read from the readCloser by the media manager will be written into the bufferedWriter. - // We wrap this in a teeReadCloser which implements io.ReadCloser, so that whoever uses the teeReader can close the readCloser - // when they're done with it. - trc := teeReadCloser{ - teeReader: io.TeeReader(readCloser, bufferedWriter), - close: readCloser.Close, - } - - return trc, fileSize, nil - } - - // close the pipewriter after data has been piped into it, so the reader on the other side doesn't block; - // we don't need to close the reader here because that's the caller's responsibility - postDataCallback = func(innerCtx context.Context) error { - // close the underlying pipe writer when we're done with it - defer func() { - if err := pipeWriter.Close(); err != nil { - log.Errorf("getAttachmentContent: error closing pipeWriter: %s", err) - } - }() - - // and flush the buffered writer into the buffer of the reader - return bufferedWriter.Flush() + // Make a TeeReader so that everything read from the readCloser, + // aka the remote instance, will also be written into the pipe. + teeReader := io.TeeReader(readCloser, silencedWriter) + + // Wrap teereader to implement original readcloser's close, + // and also ensuring that we close the pipe from write end. + return iotools.ReadFnCloser(teeReader, func() error { + defer func() { + // We use the error (if any) encountered by the + // silenced writer to close connection to make sure it + // gets propagated to the attachment.Content reader. + _ = pipeWriter.CloseWithError(silencedWriter.Error()) + }() + + return readCloser.Close() + }), fileSize, nil } } // put the media recached in the queue - processingMedia, err := p.mediaManager.RecacheMedia(ctx, data, postDataCallback, wantedMediaID) + processingMedia, err := p.mediaManager.RecacheMedia(ctx, data, nil, wantedMediaID) if err != nil { return nil, gtserror.NewErrorNotFound(fmt.Errorf("error recaching media: %s", err)) } |