Mirror of https://github.com/superseriousbusiness/gotosocial, synced 2025-06-05 21:59:39 +02:00
[bugfix] Close reader gracefully when streaming recache of remote media to fileserver api caller (#1281)

* close pipereader on failed data function
* gently slurp the bytes
* readability updates
* go fmt
* tidy up file server tests + add more cases
* start moving io wrappers to separate iotools package. Remove use of buffering while piping recache stream

  Signed-off-by: kim <grufwub@gmail.com>

* add license text

  Signed-off-by: kim <grufwub@gmail.com>

Co-authored-by: kim <grufwub@gmail.com>
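The core of the change: instead of buffering both ends of an io.Pipe and flushing it in a post-data callback, the recache stream now tees the remote response directly into an unbuffered pipe, and any failure is propagated to the API caller by closing the write end with an error. A minimal, self-contained sketch of that pattern, using placeholder data rather than the actual GoToSocial fetch and storage code:

```go
package main

import (
	"fmt"
	"io"
	"strings"
)

func main() {
	// The API caller reads from pr; the storage/recache side writes into pw
	// by way of a TeeReader over the remote response body.
	pr, pw := io.Pipe()

	// Placeholder for the remote media response body (not a GoToSocial type).
	remote := io.NopCloser(strings.NewReader("remote media bytes"))

	// Everything the storage side reads from remote is also copied into pw,
	// and so becomes readable by the caller on pr.
	tee := io.TeeReader(remote, pw)

	go func() {
		// The storage side drains the stream. On success it closes the write
		// end so the caller's reader sees EOF instead of blocking forever; on
		// failure it uses CloseWithError so the caller's reads return that
		// error instead.
		if _, err := io.Copy(io.Discard, tee); err != nil {
			_ = pw.CloseWithError(fmt.Errorf("storing media: %w", err))
			return
		}
		_ = pw.Close()
		_ = remote.Close()
	}()

	// What the API caller receives.
	b, err := io.ReadAll(pr)
	fmt.Println(string(b), err)
}
```

Because io.Pipe is synchronous, the caller and the storage side proceed in lockstep; whichever side fails, CloseWithError makes the other side's reads return that error rather than hang.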
@@ -19,7 +19,6 @@
 package media
 
 import (
-	"bufio"
 	"context"
 	"fmt"
 	"io"
@@ -29,7 +28,7 @@ import (
 	apimodel "github.com/superseriousbusiness/gotosocial/internal/api/model"
 	"github.com/superseriousbusiness/gotosocial/internal/gtserror"
 	"github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
-	"github.com/superseriousbusiness/gotosocial/internal/log"
+	"github.com/superseriousbusiness/gotosocial/internal/iotools"
 	"github.com/superseriousbusiness/gotosocial/internal/media"
 	"github.com/superseriousbusiness/gotosocial/internal/transport"
 	"github.com/superseriousbusiness/gotosocial/internal/uris"
@@ -135,7 +134,6 @@ func (p *processor) getAttachmentContent(ctx context.Context, requestingAccount
 	}
 
 	var data media.DataFunc
-	var postDataCallback media.PostDataCallbackFunc
 
 	if mediaSize == media.SizeSmall {
 		// if it's the thumbnail that's requested then the user will have to wait a bit while we process the
@@ -155,7 +153,7 @@
 		//
 		// this looks a bit like this:
 		//
-		//                http fetch                  buffered pipe
+		//                http fetch                      pipe
 		// remote server ------------> data function ----------------> api caller
 		//                                   |
 		//                                   | tee
@@ -163,54 +161,58 @@
 		//                                   ▼
 		//                           instance storage
 
-		// Buffer each end of the pipe, so that if the caller drops the connection during the flow, the tee
-		// reader can continue without having to worry about tee-ing into a closed or blocked pipe.
+		// This pipe will connect the caller to the in-process media retrieval...
 		pipeReader, pipeWriter := io.Pipe()
-		bufferedWriter := bufio.NewWriterSize(pipeWriter, int(attachmentContent.ContentLength))
-		bufferedReader := bufio.NewReaderSize(pipeReader, int(attachmentContent.ContentLength))
 
-		// the caller will read from the buffered reader, so it doesn't matter if they drop out without reading everything
-		attachmentContent.Content = io.NopCloser(bufferedReader)
+		// Wrap the output pipe to silence any errors during the actual media
+		// streaming process. We catch the error later but they must be silenced
+		// during stream to prevent interruptions to storage of the actual media.
+		silencedWriter := iotools.SilenceWriter(pipeWriter)
+
+		// Pass the reader side of the pipe to the caller to slurp from.
+		attachmentContent.Content = pipeReader
 
+		// Create a data function which injects the writer end of the pipe
+		// into the data retrieval process. If something goes wrong while
+		// doing the data retrieval, we hang up the underlying pipeReader
+		// to indicate to the caller that no data is available. It's up to
+		// the caller of this processor function to handle that gracefully.
 		data = func(innerCtx context.Context) (io.ReadCloser, int64, error) {
 			t, err := p.transportController.NewTransportForUsername(innerCtx, requestingUsername)
 			if err != nil {
+				// propagate the transport error to read end of pipe.
+				_ = pipeWriter.CloseWithError(fmt.Errorf("error getting transport for user: %w", err))
 				return nil, 0, err
 			}
 
 			readCloser, fileSize, err := t.DereferenceMedia(transport.WithFastfail(innerCtx), remoteMediaIRI)
 			if err != nil {
+				// propagate the dereference error to read end of pipe.
+				_ = pipeWriter.CloseWithError(fmt.Errorf("error dereferencing media: %w", err))
 				return nil, 0, err
 			}
 
-			// Make a TeeReader so that everything read from the readCloser by the media manager will be written into the bufferedWriter.
-			// We wrap this in a teeReadCloser which implements io.ReadCloser, so that whoever uses the teeReader can close the readCloser
-			// when they're done with it.
-			trc := teeReadCloser{
-				teeReader: io.TeeReader(readCloser, bufferedWriter),
-				close:     readCloser.Close,
-			}
+			// Make a TeeReader so that everything read from the readCloser,
+			// aka the remote instance, will also be written into the pipe.
+			teeReader := io.TeeReader(readCloser, silencedWriter)
 
-			return trc, fileSize, nil
-		}
-
-		// close the pipewriter after data has been piped into it, so the reader on the other side doesn't block;
-		// we don't need to close the reader here because that's the caller's responsibility
-		postDataCallback = func(innerCtx context.Context) error {
-			// close the underlying pipe writer when we're done with it
-			defer func() {
-				if err := pipeWriter.Close(); err != nil {
-					log.Errorf("getAttachmentContent: error closing pipeWriter: %s", err)
-				}
-			}()
-
-			// and flush the buffered writer into the buffer of the reader
-			return bufferedWriter.Flush()
+			// Wrap teereader to implement original readcloser's close,
+			// and also ensuring that we close the pipe from write end.
+			return iotools.ReadFnCloser(teeReader, func() error {
+				defer func() {
+					// We use the error (if any) encountered by the
+					// silenced writer to close connection to make sure it
+					// gets propagated to the attachment.Content reader.
+					_ = pipeWriter.CloseWithError(silencedWriter.Error())
+				}()
+
+				return readCloser.Close()
+			}), fileSize, nil
 		}
 	}
 
 	// put the media recached in the queue
-	processingMedia, err := p.mediaManager.RecacheMedia(ctx, data, postDataCallback, wantedMediaID)
+	processingMedia, err := p.mediaManager.RecacheMedia(ctx, data, nil, wantedMediaID)
 	if err != nil {
 		return nil, gtserror.NewErrorNotFound(fmt.Errorf("error recaching media: %s", err))
 	}
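The iotools.SilenceWriter wrapper above exists so that an API caller hanging up mid-stream cannot abort the tee into instance storage: write errors are swallowed while streaming and only surfaced afterwards, when the data function's reader is closed and the recorded error is handed to pipeWriter.CloseWithError. The diff only shows how SilenceWriter and its Error method are called; the following is a sketch of one plausible shape for such a writer, not the actual iotools implementation:

```go
// One plausible shape for an error-silencing writer like the
// iotools.SilenceWriter used above; an illustrative assumption, not the
// GoToSocial implementation.
package iotools

import "io"

// SilencedWriter wraps an io.Writer, swallowing write errors so that an
// interrupted consumer cannot abort the producing stream, while keeping
// the first error around for later inspection.
type SilencedWriter struct {
	w   io.Writer
	err error
}

// SilenceWriter wraps w in a SilencedWriter.
func SilenceWriter(w io.Writer) *SilencedWriter {
	return &SilencedWriter{w: w}
}

// Write reports the full length as written even when the underlying writer
// fails, so an io.TeeReader feeding this writer keeps streaming.
func (s *SilencedWriter) Write(p []byte) (int, error) {
	if s.err == nil {
		if _, err := s.w.Write(p); err != nil {
			s.err = err // remember the first error, silence it for now
		}
	}
	return len(p), nil
}

// Error returns the silenced error, if any, so it can be propagated once
// streaming has finished (e.g. via pipeWriter.CloseWithError).
func (s *SilencedWriter) Error() error {
	return s.err
}
```

Reporting len(p) with a nil error is what keeps the tee going: io.TeeReader treats any write error or short write as a read error and stops streaming.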
@@ -19,6 +19,7 @@
 package media_test
 
 import (
+	"bytes"
 	"context"
 	"io"
 	"path"
@@ -143,9 +144,13 @@ func (suite *GetFileTestSuite) TestGetRemoteFileUncachedInterrupted() {
 	suite.NotNil(content)
 
 	// only read the first kilobyte and then stop
-	b := make([]byte, 1024)
-	_, err = content.Content.Read(b)
-	suite.NoError(err)
+	b := make([]byte, 0, 1024)
+	if !testrig.WaitFor(func() bool {
+		read, err := io.CopyN(bytes.NewBuffer(b), content.Content, 1024)
+		return err == nil && read == 1024
+	}) {
+		suite.FailNow("timed out trying to read first 1024 bytes")
+	}
 
 	// close the reader
 	suite.NoError(content.Content.Close())
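The test no longer assumes a single Read call yields the first kilobyte: because the recache stream is now fed asynchronously through the pipe, it retries io.CopyN inside testrig.WaitFor until 1024 bytes have arrived. WaitFor's exact signature and timing are not shown in this diff; a sketch of what such a polling helper typically looks like, with assumed interval and timeout values:

```go
package media_test

import "time"

// waitFor polls condition until it returns true or a timeout elapses. This
// assumes behaviour similar to testrig.WaitFor; the real helper's interval
// and timeout values are not shown in the diff and may differ.
func waitFor(condition func() bool) bool {
	timeout := time.After(5 * time.Second)
	tick := time.NewTicker(50 * time.Millisecond)
	defer tick.Stop()

	for {
		select {
		case <-timeout:
			// Give up: the condition never became true in time.
			return false
		case <-tick.C:
			if condition() {
				return true
			}
		}
	}
}
```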
@@ -20,7 +20,6 @@ package media
 
 import (
 	"fmt"
-	"io"
 	"strconv"
 	"strings"
 )
@@ -62,16 +61,3 @@ func parseFocus(focus string) (focusx, focusy float32, err error)
 	focusy = float32(fy)
 	return
 }
-
-type teeReadCloser struct {
-	teeReader io.Reader
-	close     func() error
-}
-
-func (t teeReadCloser) Read(p []byte) (n int, err error) {
-	return t.teeReader.Read(p)
-}
-
-func (t teeReadCloser) Close() error {
-	return t.close()
-}
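The teeReadCloser type removed here simply paired a reader with a custom close function. It is replaced by the shared iotools.ReadFnCloser helper, whose call shape (an io.Reader plus a func() error, yielding an io.ReadCloser) is visible earlier in the diff; a sketch of what such a generic wrapper could look like, as an assumption rather than the actual iotools code:

```go
package iotools

import "io"

// readFnCloser pairs an io.Reader with an arbitrary close function,
// generalising the teeReadCloser type removed above.
type readFnCloser struct {
	io.Reader
	closeFn func() error
}

// ReadFnCloser wraps r so that Close invokes closeFn. The call shape matches
// how iotools.ReadFnCloser is used in this diff; the body here is an
// illustrative assumption, not the actual iotools code.
func ReadFnCloser(r io.Reader, closeFn func() error) io.ReadCloser {
	return &readFnCloser{Reader: r, closeFn: closeFn}
}

// Close calls the wrapped close function.
func (r *readFnCloser) Close() error {
	return r.closeFn()
}
```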