diff --git a/googleapi/googleapi.go b/googleapi/googleapi.go index b5e38c66282..04a10f51c97 100644 --- a/googleapi/googleapi.go +++ b/googleapi/googleapi.go @@ -259,6 +259,20 @@ func ChunkSize(size int) MediaOption { return chunkSizeOption(size) } +type chunkTransferTimeoutOption time.Duration + +func (cd chunkTransferTimeoutOption) setOptions(o *MediaOptions) { + o.ChunkTransferTimeout = time.Duration(cd) +} + +// ChunkTransferTimeout returns a MediaOption which sets a per-chunk +// transfer timeout for resumable uploads. If a single chunk has been +// attempting to upload for longer than this time then the old req got canceled and retried. +// The default is no timeout for the request. +func ChunkTransferTimeout(timeout time.Duration) MediaOption { + return chunkTransferTimeoutOption(timeout) +} + type chunkRetryDeadlineOption time.Duration func (cd chunkRetryDeadlineOption) setOptions(o *MediaOptions) { @@ -283,6 +297,7 @@ type MediaOptions struct { ForceEmptyContentType bool ChunkSize int ChunkRetryDeadline time.Duration + ChunkTransferTimeout time.Duration } // ProcessMediaOptions stores options from opts in a MediaOptions. diff --git a/internal/gensupport/media.go b/internal/gensupport/media.go index c048a57084b..0861d4d3c87 100644 --- a/internal/gensupport/media.go +++ b/internal/gensupport/media.go @@ -135,13 +135,14 @@ func PrepareUpload(media io.Reader, chunkSize int) (r io.Reader, mb *MediaBuffer // code only. type MediaInfo struct { // At most one of Media and MediaBuffer will be set. - media io.Reader - buffer *MediaBuffer - singleChunk bool - mType string - size int64 // mediaSize, if known. Used only for calls to progressUpdater_. - progressUpdater googleapi.ProgressUpdater - chunkRetryDeadline time.Duration + media io.Reader + buffer *MediaBuffer + singleChunk bool + mType string + size int64 // mediaSize, if known. Used only for calls to progressUpdater_. + progressUpdater googleapi.ProgressUpdater + chunkRetryDeadline time.Duration + chunkTransferTimeout time.Duration } // NewInfoFromMedia should be invoked from the Media method of a call. It returns a @@ -157,6 +158,7 @@ func NewInfoFromMedia(r io.Reader, options []googleapi.MediaOption) *MediaInfo { } } mi.chunkRetryDeadline = opts.ChunkRetryDeadline + mi.chunkTransferTimeout = opts.ChunkTransferTimeout mi.media, mi.buffer, mi.singleChunk = PrepareUpload(r, opts.ChunkSize) return mi } @@ -294,7 +296,8 @@ func (mi *MediaInfo) ResumableUpload(locURI string) *ResumableUpload { mi.progressUpdater(curr, mi.size) } }, - ChunkRetryDeadline: mi.chunkRetryDeadline, + ChunkRetryDeadline: mi.chunkRetryDeadline, + ChunkTransferTimeout: mi.chunkTransferTimeout, } } diff --git a/internal/gensupport/media_test.go b/internal/gensupport/media_test.go index 65fd62b3dcc..f018fabf8fe 100644 --- a/internal/gensupport/media_test.go +++ b/internal/gensupport/media_test.go @@ -218,12 +218,13 @@ func TestUploadRequestGetBody(t *testing.T) { func TestResumableUpload(t *testing.T) { for _, test := range []struct { - desc string - r io.Reader - chunkSize int - wantUploadType string - wantResumableUpload bool - chunkRetryDeadline time.Duration + desc string + r io.Reader + chunkSize int + wantUploadType string + wantResumableUpload bool + chunkRetryDeadline time.Duration + chunkTransferTimeOut time.Duration }{ { desc: "chunk size of zero: don't use a MediaBuffer; upload as a single chunk", @@ -263,11 +264,22 @@ func TestResumableUpload(t *testing.T) { wantResumableUpload: true, chunkRetryDeadline: 1 * time.Second, }, + { + desc: "confirm that ChunkTransferTimeout is carried to ResumableUpload", + r: &nullReader{2 * googleapi.MinUploadChunkSize}, + chunkSize: 1, + wantUploadType: "resumable", + wantResumableUpload: true, + chunkTransferTimeOut: 5 * time.Second, + }, } { opts := []googleapi.MediaOption{googleapi.ChunkSize(test.chunkSize)} if test.chunkRetryDeadline != 0 { opts = append(opts, googleapi.ChunkRetryDeadline(test.chunkRetryDeadline)) } + if test.chunkTransferTimeOut != 0 { + opts = append(opts, googleapi.ChunkTransferTimeout(test.chunkTransferTimeOut)) + } mi := NewInfoFromMedia(test.r, opts) if got, want := mi.UploadType(), test.wantUploadType; got != want { t.Errorf("%s: upload type: got %q, want %q", test.desc, got, want) @@ -284,6 +296,15 @@ func TestResumableUpload(t *testing.T) { t.Errorf("%s: test case invalid; resumable upload is nil", test.desc) } } + if test.chunkTransferTimeOut != 0 { + if got := mi.ResumableUpload(""); got != nil { + if got.ChunkTransferTimeout != test.chunkTransferTimeOut { + t.Errorf("%s: ChunkTransferTimeout: got %v, want %v", test.desc, got.ChunkTransferTimeout, test.chunkTransferTimeOut) + } + } else { + t.Errorf("%s: test case invalid; resumable upload is nil", test.desc) + } + } } } diff --git a/internal/gensupport/resumable.go b/internal/gensupport/resumable.go index f828ddb60e6..9e3bcf15963 100644 --- a/internal/gensupport/resumable.go +++ b/internal/gensupport/resumable.go @@ -43,6 +43,10 @@ type ResumableUpload struct { // retries should happen. ChunkRetryDeadline time.Duration + // ChunkTransferTimeout configures the per-chunk transfer timeout. If a chunk upload stalls for longer than + // this duration, the upload will be retried. + ChunkTransferTimeout time.Duration + // Track current request invocation ID and attempt count for retry metrics // and idempotency headers. invocationID string @@ -241,13 +245,35 @@ func (rx *ResumableUpload) Upload(ctx context.Context) (resp *http.Response, err default: } - resp, err = rx.transferChunk(ctx) + // rCtx is derived from a context with a defined transferTimeout with non-zero value. + // If a particular request exceeds this transfer time for getting response, the rCtx deadline will be exceeded, + // triggering a retry of the request. + var rCtx context.Context + var cancel context.CancelFunc + + rCtx = ctx + if rx.ChunkTransferTimeout != 0 { + rCtx, cancel = context.WithTimeout(ctx, rx.ChunkTransferTimeout) + } + + resp, err = rx.transferChunk(rCtx) var status int if resp != nil { status = resp.StatusCode } + // The upload should be retried if the rCtx is canceled due to a timeout. + select { + case <-rCtx.Done(): + if errors.Is(rCtx.Err(), context.DeadlineExceeded) { + // Cancel the context for rCtx + cancel() + continue + } + default: + } + // Check if we should retry the request. if !errorFunc(status, err) { quitAfterTimer.Stop() diff --git a/internal/gensupport/resumable_test.go b/internal/gensupport/resumable_test.go index 0000d403e0c..5b1e39a5945 100644 --- a/internal/gensupport/resumable_test.go +++ b/internal/gensupport/resumable_test.go @@ -6,6 +6,7 @@ package gensupport import ( "context" + "errors" "fmt" "io" "net/http" @@ -280,7 +281,7 @@ func TestCancelUploadBasic(t *testing.T) { defer func() { backoff = oldBackoff }() res, err := rx.Upload(ctx) - if err != context.Canceled { + if !errors.Is(err, context.Canceled) { t.Fatalf("Upload err: got: %v; want: context cancelled", err) } if res != nil {