summaryrefslogtreecommitdiff
path: root/internal/processing
diff options
context:
space:
mode:
Diffstat (limited to 'internal/processing')
-rw-r--r--internal/processing/media/getfile.go70
-rw-r--r--internal/processing/media/getfile_test.go11
-rw-r--r--internal/processing/media/util.go14
3 files changed, 44 insertions, 51 deletions
diff --git a/internal/processing/media/getfile.go b/internal/processing/media/getfile.go
index ddc14479a..eba3fdb7e 100644
--- a/internal/processing/media/getfile.go
+++ b/internal/processing/media/getfile.go
@@ -19,7 +19,6 @@
package media
import (
- "bufio"
"context"
"fmt"
"io"
@@ -29,7 +28,7 @@ import (
apimodel "github.com/superseriousbusiness/gotosocial/internal/api/model"
"github.com/superseriousbusiness/gotosocial/internal/gtserror"
"github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
- "github.com/superseriousbusiness/gotosocial/internal/log"
+ "github.com/superseriousbusiness/gotosocial/internal/iotools"
"github.com/superseriousbusiness/gotosocial/internal/media"
"github.com/superseriousbusiness/gotosocial/internal/transport"
"github.com/superseriousbusiness/gotosocial/internal/uris"
@@ -135,7 +134,6 @@ func (p *processor) getAttachmentContent(ctx context.Context, requestingAccount
}
var data media.DataFunc
- var postDataCallback media.PostDataCallbackFunc
if mediaSize == media.SizeSmall {
// if it's the thumbnail that's requested then the user will have to wait a bit while we process the
@@ -155,7 +153,7 @@ func (p *processor) getAttachmentContent(ctx context.Context, requestingAccount
//
// this looks a bit like this:
//
- // http fetch buffered pipe
+ // http fetch pipe
// remote server ------------> data function ----------------> api caller
// |
// | tee
@@ -163,54 +161,58 @@ func (p *processor) getAttachmentContent(ctx context.Context, requestingAccount
// ▼
// instance storage
- // Buffer each end of the pipe, so that if the caller drops the connection during the flow, the tee
- // reader can continue without having to worry about tee-ing into a closed or blocked pipe.
+ // This pipe will connect the caller to the in-process media retrieval...
pipeReader, pipeWriter := io.Pipe()
- bufferedWriter := bufio.NewWriterSize(pipeWriter, int(attachmentContent.ContentLength))
- bufferedReader := bufio.NewReaderSize(pipeReader, int(attachmentContent.ContentLength))
- // the caller will read from the buffered reader, so it doesn't matter if they drop out without reading everything
- attachmentContent.Content = io.NopCloser(bufferedReader)
+ // Wrap the output pipe to silence any errors during the actual media
+ // streaming process. We catch the error later but they must be silenced
+ // during stream to prevent interruptions to storage of the actual media.
+ silencedWriter := iotools.SilenceWriter(pipeWriter)
+ // Pass the reader side of the pipe to the caller to slurp from.
+ attachmentContent.Content = pipeReader
+
+ // Create a data function which injects the writer end of the pipe
+ // into the data retrieval process. If something goes wrong while
+ // doing the data retrieval, we hang up the underlying pipeReader
+ // to indicate to the caller that no data is available. It's up to
+ // the caller of this processor function to handle that gracefully.
data = func(innerCtx context.Context) (io.ReadCloser, int64, error) {
t, err := p.transportController.NewTransportForUsername(innerCtx, requestingUsername)
if err != nil {
+ // propagate the transport error to read end of pipe.
+ _ = pipeWriter.CloseWithError(fmt.Errorf("error getting transport for user: %w", err))
return nil, 0, err
}
readCloser, fileSize, err := t.DereferenceMedia(transport.WithFastfail(innerCtx), remoteMediaIRI)
if err != nil {
+ // propagate the dereference error to read end of pipe.
+ _ = pipeWriter.CloseWithError(fmt.Errorf("error dereferencing media: %w", err))
return nil, 0, err
}
- // Make a TeeReader so that everything read from the readCloser by the media manager will be written into the bufferedWriter.
- // We wrap this in a teeReadCloser which implements io.ReadCloser, so that whoever uses the teeReader can close the readCloser
- // when they're done with it.
- trc := teeReadCloser{
- teeReader: io.TeeReader(readCloser, bufferedWriter),
- close: readCloser.Close,
- }
-
- return trc, fileSize, nil
- }
-
- // close the pipewriter after data has been piped into it, so the reader on the other side doesn't block;
- // we don't need to close the reader here because that's the caller's responsibility
- postDataCallback = func(innerCtx context.Context) error {
- // close the underlying pipe writer when we're done with it
- defer func() {
- if err := pipeWriter.Close(); err != nil {
- log.Errorf("getAttachmentContent: error closing pipeWriter: %s", err)
- }
- }()
-
- // and flush the buffered writer into the buffer of the reader
- return bufferedWriter.Flush()
+ // Make a TeeReader so that everything read from the readCloser,
+ // aka the remote instance, will also be written into the pipe.
+ teeReader := io.TeeReader(readCloser, silencedWriter)
+
+ // Wrap teereader to implement original readcloser's close,
+ // and also ensuring that we close the pipe from write end.
+ return iotools.ReadFnCloser(teeReader, func() error {
+ defer func() {
+ // We use the error (if any) encountered by the
+ // silenced writer to close connection to make sure it
+ // gets propagated to the attachment.Content reader.
+ _ = pipeWriter.CloseWithError(silencedWriter.Error())
+ }()
+
+ return readCloser.Close()
+ }), fileSize, nil
}
}
// put the media recached in the queue
- processingMedia, err := p.mediaManager.RecacheMedia(ctx, data, postDataCallback, wantedMediaID)
+ processingMedia, err := p.mediaManager.RecacheMedia(ctx, data, nil, wantedMediaID)
if err != nil {
return nil, gtserror.NewErrorNotFound(fmt.Errorf("error recaching media: %s", err))
}
diff --git a/internal/processing/media/getfile_test.go b/internal/processing/media/getfile_test.go
index ba7269535..7b9786914 100644
--- a/internal/processing/media/getfile_test.go
+++ b/internal/processing/media/getfile_test.go
@@ -19,6 +19,7 @@
package media_test
import (
+ "bytes"
"context"
"io"
"path"
@@ -143,9 +144,13 @@ func (suite *GetFileTestSuite) TestGetRemoteFileUncachedInterrupted() {
suite.NotNil(content)
// only read the first kilobyte and then stop
- b := make([]byte, 1024)
- _, err = content.Content.Read(b)
- suite.NoError(err)
+ b := make([]byte, 0, 1024)
+ if !testrig.WaitFor(func() bool {
+ read, err := io.CopyN(bytes.NewBuffer(b), content.Content, 1024)
+ return err == nil && read == 1024
+ }) {
+ suite.FailNow("timed out trying to read first 1024 bytes")
+ }
// close the reader
suite.NoError(content.Content.Close())
diff --git a/internal/processing/media/util.go b/internal/processing/media/util.go
index 9739e70b7..37dc87979 100644
--- a/internal/processing/media/util.go
+++ b/internal/processing/media/util.go
@@ -20,7 +20,6 @@ package media
import (
"fmt"
- "io"
"strconv"
"strings"
)
@@ -62,16 +61,3 @@ func parseFocus(focus string) (focusx, focusy float32, err error) {
focusy = float32(fy)
return
}
-
-type teeReadCloser struct {
- teeReader io.Reader
- close func() error
-}
-
-func (t teeReadCloser) Read(p []byte) (n int, err error) {
- return t.teeReader.Read(p)
-}
-
-func (t teeReadCloser) Close() error {
- return t.close()
-}