From 7a6ada0983bf843405028e28c886bb15bb4b05f3 Mon Sep 17 00:00:00 2001 From: Benjamin Rewis <32186188+benjirewis@users.noreply.github.com> Date: Tue, 4 May 2021 12:07:53 -0400 Subject: [PATCH] GODRIVER-1925 Surface cursor errors in DownloadStream fillBuffer (#653) --- mongo/gridfs/download_stream.go | 8 +++++ mongo/integration/gridfs_test.go | 56 ++++++++++++++++++++++++++++++++ 2 files changed, 64 insertions(+) diff --git a/mongo/gridfs/download_stream.go b/mongo/gridfs/download_stream.go index 23e90677279..79b9c2916bd 100644 --- a/mongo/gridfs/download_stream.go +++ b/mongo/gridfs/download_stream.go @@ -119,6 +119,9 @@ func (ds *DownloadStream) Close() error { } ds.closed = true + if ds.cursor != nil { + return ds.cursor.Close(context.Background()) + } return nil } @@ -226,6 +229,11 @@ func (ds *DownloadStream) GetFile() *File { func (ds *DownloadStream) fillBuffer(ctx context.Context) error { if !ds.cursor.Next(ctx) { ds.done = true + // Check for cursor error, otherwise there are no more chunks. + if ds.cursor.Err() != nil { + _ = ds.cursor.Close(ctx) + return ds.cursor.Err() + } return errNoMoreChunks } diff --git a/mongo/integration/gridfs_test.go b/mongo/integration/gridfs_test.go index cd8d4a92c24..849ac93c935 100644 --- a/mongo/integration/gridfs_test.go +++ b/mongo/integration/gridfs_test.go @@ -12,6 +12,7 @@ import ( "io" "math/rand" "runtime" + "strings" "testing" "time" @@ -369,6 +370,61 @@ func TestGridFS(x *testing.T) { _, err = bucket.OpenDownloadStream(oid) assert.Equal(mt, gridfs.ErrMissingChunkSize, err, "expected error %v, got %v", gridfs.ErrMissingChunkSize, err) }) + mt.Run("cursor error during read after downloading", func(mt *mtest.T) { + // To simulate a cursor error we upload a file larger than the 16MB default batch size, + // so the underlying cursor remains open on the server. Since the ReadDeadline is + // set in the past, Read should cause a timeout. 
+

		fileName := "read-error-test"
		fileData := make([]byte, 17000000)

		bucket, err := gridfs.NewBucket(mt.DB)
		assert.Nil(mt, err, "NewBucket error: %v", err)
		defer func() { _ = bucket.Drop() }()

		dataReader := bytes.NewReader(fileData)
		_, err = bucket.UploadFromStream(fileName, dataReader)
		assert.Nil(mt, err, "UploadFromStream error: %v", err)

		ds, err := bucket.OpenDownloadStreamByName(fileName)
		assert.Nil(mt, err, "OpenDownloadStreamByName error: %v", err)

		err = ds.SetReadDeadline(time.Now().Add(-1 * time.Second))
		assert.Nil(mt, err, "SetReadDeadline error: %v", err)

		p := make([]byte, len(fileData))
		_, err = ds.Read(p)
		assert.NotNil(mt, err, "expected error from Read, got nil")
		assert.True(mt, strings.Contains(err.Error(), "context deadline exceeded"),
			"expected error to contain 'context deadline exceeded', got %v", err.Error())
	})
	mt.Run("cursor error during skip after downloading", func(mt *mtest.T) {
		// To simulate a cursor error we upload a file larger than the 16MB default batch size,
		// so the underlying cursor remains open on the server. Since the ReadDeadline is
		// set in the past, Skip should cause a timeout.
+ + fileName := "skip-error-test" + fileData := make([]byte, 17000000) + + bucket, err := gridfs.NewBucket(mt.DB) + assert.Nil(mt, err, "NewBucket error: %v", err) + defer func() { _ = bucket.Drop() }() + + dataReader := bytes.NewReader(fileData) + _, err = bucket.UploadFromStream(fileName, dataReader) + assert.Nil(mt, err, "UploadFromStream error: %v", err) + + ds, err := bucket.OpenDownloadStreamByName(fileName) + assert.Nil(mt, err, "OpenDownloadStreamByName error: %v", err) + + err = ds.SetReadDeadline(time.Now().Add(-1 * time.Second)) + assert.Nil(mt, err, "SetReadDeadline error: %v", err) + + _, err = ds.Skip(int64(len(fileData))) + assert.NotNil(mt, err, "expected error from Skip, got nil") + assert.True(mt, strings.Contains(err.Error(), "context deadline exceeded"), + "expected error to contain 'context deadline exceeded', got %v", err.Error()) + }) }) mt.RunOpts("bucket collection accessors", noClientOpts, func(mt *mtest.T) {