summaryrefslogtreecommitdiff
path: root/internal/processing/media/getfile.go
diff options
context:
space:
mode:
authorLibravatar tobi <31960611+tsmethurst@users.noreply.github.com>2022-12-21 11:17:43 +0100
committerLibravatar GitHub <noreply@github.com>2022-12-21 11:17:43 +0100
commit6ebdc306edd9b1ee0d853bdad63c0fb418382eb7 (patch)
treee2e2e5262af41cbeea7dd716e9cdc53092078200 /internal/processing/media/getfile.go
parent[chore] note broken go v1.19.4 in contributing.md (#1278) (diff)
downloadgotosocial-6ebdc306edd9b1ee0d853bdad63c0fb418382eb7.tar.xz
[bugfix] Close reader gracefully when streaming recache of remote media to fileserver api caller (#1281)
* close pipereader on failed data function * gently slurp the bytes * readability updates * go fmt * tidy up file server tests + add more cases * start moving io wrappers to separate iotools package. Remove use of buffering while piping recache stream Signed-off-by: kim <grufwub@gmail.com> * add license text Signed-off-by: kim <grufwub@gmail.com> Co-authored-by: kim <grufwub@gmail.com>
Diffstat (limited to 'internal/processing/media/getfile.go')
-rw-r--r--internal/processing/media/getfile.go70
1 files changed, 36 insertions, 34 deletions
diff --git a/internal/processing/media/getfile.go b/internal/processing/media/getfile.go
index ddc14479a..eba3fdb7e 100644
--- a/internal/processing/media/getfile.go
+++ b/internal/processing/media/getfile.go
@@ -19,7 +19,6 @@
package media
import (
- "bufio"
"context"
"fmt"
"io"
@@ -29,7 +28,7 @@ import (
apimodel "github.com/superseriousbusiness/gotosocial/internal/api/model"
"github.com/superseriousbusiness/gotosocial/internal/gtserror"
"github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
- "github.com/superseriousbusiness/gotosocial/internal/log"
+ "github.com/superseriousbusiness/gotosocial/internal/iotools"
"github.com/superseriousbusiness/gotosocial/internal/media"
"github.com/superseriousbusiness/gotosocial/internal/transport"
"github.com/superseriousbusiness/gotosocial/internal/uris"
@@ -135,7 +134,6 @@ func (p *processor) getAttachmentContent(ctx context.Context, requestingAccount
}
var data media.DataFunc
- var postDataCallback media.PostDataCallbackFunc
if mediaSize == media.SizeSmall {
// if it's the thumbnail that's requested then the user will have to wait a bit while we process the
@@ -155,7 +153,7 @@ func (p *processor) getAttachmentContent(ctx context.Context, requestingAccount
//
// this looks a bit like this:
//
- // http fetch buffered pipe
+ // http fetch pipe
// remote server ------------> data function ----------------> api caller
// |
// | tee
@@ -163,54 +161,58 @@ func (p *processor) getAttachmentContent(ctx context.Context, requestingAccount
// ▼
// instance storage
- // Buffer each end of the pipe, so that if the caller drops the connection during the flow, the tee
- // reader can continue without having to worry about tee-ing into a closed or blocked pipe.
+ // This pipe will connect the caller to the in-process media retrieval...
pipeReader, pipeWriter := io.Pipe()
- bufferedWriter := bufio.NewWriterSize(pipeWriter, int(attachmentContent.ContentLength))
- bufferedReader := bufio.NewReaderSize(pipeReader, int(attachmentContent.ContentLength))
- // the caller will read from the buffered reader, so it doesn't matter if they drop out without reading everything
- attachmentContent.Content = io.NopCloser(bufferedReader)
+ // Wrap the output pipe to silence any errors during the actual media
+ // streaming process. We catch the error later but they must be silenced
+ // during stream to prevent interruptions to storage of the actual media.
+ silencedWriter := iotools.SilenceWriter(pipeWriter)
+ // Pass the reader side of the pipe to the caller to slurp from.
+ attachmentContent.Content = pipeReader
+
+ // Create a data function which injects the writer end of the pipe
+ // into the data retrieval process. If something goes wrong while
+ // doing the data retrieval, we hang up the underlying pipeReader
+ // to indicate to the caller that no data is available. It's up to
+ // the caller of this processor function to handle that gracefully.
data = func(innerCtx context.Context) (io.ReadCloser, int64, error) {
t, err := p.transportController.NewTransportForUsername(innerCtx, requestingUsername)
if err != nil {
+ // propagate the transport error to read end of pipe.
+ _ = pipeWriter.CloseWithError(fmt.Errorf("error getting transport for user: %w", err))
return nil, 0, err
}
readCloser, fileSize, err := t.DereferenceMedia(transport.WithFastfail(innerCtx), remoteMediaIRI)
if err != nil {
+ // propagate the dereference error to read end of pipe.
+ _ = pipeWriter.CloseWithError(fmt.Errorf("error dereferencing media: %w", err))
return nil, 0, err
}
- // Make a TeeReader so that everything read from the readCloser by the media manager will be written into the bufferedWriter.
- // We wrap this in a teeReadCloser which implements io.ReadCloser, so that whoever uses the teeReader can close the readCloser
- // when they're done with it.
- trc := teeReadCloser{
- teeReader: io.TeeReader(readCloser, bufferedWriter),
- close: readCloser.Close,
- }
-
- return trc, fileSize, nil
- }
-
- // close the pipewriter after data has been piped into it, so the reader on the other side doesn't block;
- // we don't need to close the reader here because that's the caller's responsibility
- postDataCallback = func(innerCtx context.Context) error {
- // close the underlying pipe writer when we're done with it
- defer func() {
- if err := pipeWriter.Close(); err != nil {
- log.Errorf("getAttachmentContent: error closing pipeWriter: %s", err)
- }
- }()
-
- // and flush the buffered writer into the buffer of the reader
- return bufferedWriter.Flush()
+ // Make a TeeReader so that everything read from the readCloser,
+ // aka the remote instance, will also be written into the pipe.
+ teeReader := io.TeeReader(readCloser, silencedWriter)
+
+ // Wrap teereader to implement original readcloser's close,
+ // and also ensuring that we close the pipe from write end.
+ return iotools.ReadFnCloser(teeReader, func() error {
+ defer func() {
+ // We use the error (if any) encountered by the
+ // silenced writer to close connection to make sure it
+ // gets propagated to the attachment.Content reader.
+ _ = pipeWriter.CloseWithError(silencedWriter.Error())
+ }()
+
+ return readCloser.Close()
+ }), fileSize, nil
}
}
// put the media recached in the queue
- processingMedia, err := p.mediaManager.RecacheMedia(ctx, data, postDataCallback, wantedMediaID)
+ processingMedia, err := p.mediaManager.RecacheMedia(ctx, data, nil, wantedMediaID)
if err != nil {
return nil, gtserror.NewErrorNotFound(fmt.Errorf("error recaching media: %s", err))
}