diff options
| author | 2022-12-21 11:17:43 +0100 | |
|---|---|---|
| committer | 2022-12-21 11:17:43 +0100 | |
| commit | 6ebdc306edd9b1ee0d853bdad63c0fb418382eb7 (patch) | |
| tree | e2e2e5262af41cbeea7dd716e9cdc53092078200 /internal/processing | |
| parent | [chore] note broken go v1.19.4 in contributing.md (#1278) (diff) | |
| download | gotosocial-6ebdc306edd9b1ee0d853bdad63c0fb418382eb7.tar.xz | |
[bugfix] Close reader gracefully when streaming recache of remote media to fileserver api caller (#1281)
* close pipereader on failed data function
* gently slurp the bytes
* readability updates
* go fmt
* tidy up file server tests + add more cases
* start moving io wrappers to separate iotools package. Remove use of buffering while piping recache stream
Signed-off-by: kim <grufwub@gmail.com>
* add license text
Signed-off-by: kim <grufwub@gmail.com>
Co-authored-by: kim <grufwub@gmail.com>
Diffstat (limited to 'internal/processing')
| -rw-r--r-- | internal/processing/media/getfile.go | 70 | |
| -rw-r--r-- | internal/processing/media/getfile_test.go | 11 | |
| -rw-r--r-- | internal/processing/media/util.go | 14 | |
3 files changed, 44 insertions, 51 deletions
diff --git a/internal/processing/media/getfile.go b/internal/processing/media/getfile.go index ddc14479a..eba3fdb7e 100644 --- a/internal/processing/media/getfile.go +++ b/internal/processing/media/getfile.go @@ -19,7 +19,6 @@  package media  import ( -	"bufio"  	"context"  	"fmt"  	"io" @@ -29,7 +28,7 @@ import (  	apimodel "github.com/superseriousbusiness/gotosocial/internal/api/model"  	"github.com/superseriousbusiness/gotosocial/internal/gtserror"  	"github.com/superseriousbusiness/gotosocial/internal/gtsmodel" -	"github.com/superseriousbusiness/gotosocial/internal/log" +	"github.com/superseriousbusiness/gotosocial/internal/iotools"  	"github.com/superseriousbusiness/gotosocial/internal/media"  	"github.com/superseriousbusiness/gotosocial/internal/transport"  	"github.com/superseriousbusiness/gotosocial/internal/uris" @@ -135,7 +134,6 @@ func (p *processor) getAttachmentContent(ctx context.Context, requestingAccount  	}  	var data media.DataFunc -	var postDataCallback media.PostDataCallbackFunc  	if mediaSize == media.SizeSmall {  		// if it's the thumbnail that's requested then the user will have to wait a bit while we process the @@ -155,7 +153,7 @@ func (p *processor) getAttachmentContent(ctx context.Context, requestingAccount  		//  		// this looks a bit like this:  		// -		//                http fetch                   buffered pipe +		//                http fetch                       pipe  		// remote server ------------> data function ----------------> api caller  		//                                   |  		//                                   | tee @@ -163,54 +161,58 @@ func (p *processor) getAttachmentContent(ctx context.Context, requestingAccount  		//                                   ▼  		//                            instance storage -		// Buffer each end of the pipe, so that if the caller drops the connection during the flow, the tee -		// reader can continue without having to worry about tee-ing into a closed or blocked pipe. 
+		// This pipe will connect the caller to the in-process media retrieval...  		pipeReader, pipeWriter := io.Pipe() -		bufferedWriter := bufio.NewWriterSize(pipeWriter, int(attachmentContent.ContentLength)) -		bufferedReader := bufio.NewReaderSize(pipeReader, int(attachmentContent.ContentLength)) -		// the caller will read from the buffered reader, so it doesn't matter if they drop out without reading everything -		attachmentContent.Content = io.NopCloser(bufferedReader) +		// Wrap the output pipe to silence any errors during the actual media +		// streaming process. We catch the error later but they must be silenced +		// during stream to prevent interruptions to storage of the actual media. +		silencedWriter := iotools.SilenceWriter(pipeWriter) +		// Pass the reader side of the pipe to the caller to slurp from. +		attachmentContent.Content = pipeReader + +		// Create a data function which injects the writer end of the pipe +		// into the data retrieval process. If something goes wrong while +		// doing the data retrieval, we hang up the underlying pipeReader +		// to indicate to the caller that no data is available. It's up to +		// the caller of this processor function to handle that gracefully.  		data = func(innerCtx context.Context) (io.ReadCloser, int64, error) {  			t, err := p.transportController.NewTransportForUsername(innerCtx, requestingUsername)  			if err != nil { +				// propagate the transport error to read end of pipe. +				_ = pipeWriter.CloseWithError(fmt.Errorf("error getting transport for user: %w", err))  				return nil, 0, err  			}  			readCloser, fileSize, err := t.DereferenceMedia(transport.WithFastfail(innerCtx), remoteMediaIRI)  			if err != nil { +				// propagate the dereference error to read end of pipe. 
+				_ = pipeWriter.CloseWithError(fmt.Errorf("error dereferencing media: %w", err))  				return nil, 0, err  			} -			// Make a TeeReader so that everything read from the readCloser by the media manager will be written into the bufferedWriter. -			// We wrap this in a teeReadCloser which implements io.ReadCloser, so that whoever uses the teeReader can close the readCloser -			// when they're done with it. -			trc := teeReadCloser{ -				teeReader: io.TeeReader(readCloser, bufferedWriter), -				close:     readCloser.Close, -			} - -			return trc, fileSize, nil -		} - -		// close the pipewriter after data has been piped into it, so the reader on the other side doesn't block; -		// we don't need to close the reader here because that's the caller's responsibility -		postDataCallback = func(innerCtx context.Context) error { -			// close the underlying pipe writer when we're done with it -			defer func() { -				if err := pipeWriter.Close(); err != nil { -					log.Errorf("getAttachmentContent: error closing pipeWriter: %s", err) -				} -			}() - -			// and flush the buffered writer into the buffer of the reader -			return bufferedWriter.Flush() +			// Make a TeeReader so that everything read from the readCloser, +			// aka the remote instance, will also be written into the pipe. +			teeReader := io.TeeReader(readCloser, silencedWriter) + +			// Wrap teereader to implement original readcloser's close, +			// and also ensuring that we close the pipe from write end. +			return iotools.ReadFnCloser(teeReader, func() error { +				defer func() { +					// We use the error (if any) encountered by the +					// silenced writer to close connection to make sure it +					// gets propagated to the attachment.Content reader. 
+					_ = pipeWriter.CloseWithError(silencedWriter.Error()) +				}() + +				return readCloser.Close() +			}), fileSize, nil  		}  	}  	// put the media recached in the queue -	processingMedia, err := p.mediaManager.RecacheMedia(ctx, data, postDataCallback, wantedMediaID) +	processingMedia, err := p.mediaManager.RecacheMedia(ctx, data, nil, wantedMediaID)  	if err != nil {  		return nil, gtserror.NewErrorNotFound(fmt.Errorf("error recaching media: %s", err))  	} diff --git a/internal/processing/media/getfile_test.go b/internal/processing/media/getfile_test.go index ba7269535..7b9786914 100644 --- a/internal/processing/media/getfile_test.go +++ b/internal/processing/media/getfile_test.go @@ -19,6 +19,7 @@  package media_test  import ( +	"bytes"  	"context"  	"io"  	"path" @@ -143,9 +144,13 @@ func (suite *GetFileTestSuite) TestGetRemoteFileUncachedInterrupted() {  	suite.NotNil(content)  	// only read the first kilobyte and then stop -	b := make([]byte, 1024) -	_, err = content.Content.Read(b) -	suite.NoError(err) +	b := make([]byte, 0, 1024) +	if !testrig.WaitFor(func() bool { +		read, err := io.CopyN(bytes.NewBuffer(b), content.Content, 1024) +		return err == nil && read == 1024 +	}) { +		suite.FailNow("timed out trying to read first 1024 bytes") +	}  	// close the reader  	suite.NoError(content.Content.Close()) diff --git a/internal/processing/media/util.go b/internal/processing/media/util.go index 9739e70b7..37dc87979 100644 --- a/internal/processing/media/util.go +++ b/internal/processing/media/util.go @@ -20,7 +20,6 @@ package media  import (  	"fmt" -	"io"  	"strconv"  	"strings"  ) @@ -62,16 +61,3 @@ func parseFocus(focus string) (focusx, focusy float32, err error) {  	focusy = float32(fy)  	return  } - -type teeReadCloser struct { -	teeReader io.Reader -	close     func() error -} - -func (t teeReadCloser) Read(p []byte) (n int, err error) { -	return t.teeReader.Read(p) -} - -func (t teeReadCloser) Close() error { -	return t.close() -}  | 
