Skip to content

Commit

Permalink
feat(gensupport): per-chunk transfer timeout configs (#2865)
Browse files Browse the repository at this point in the history
feat(gensupport): per-chunk transfer timeout configs

Allow users to configure the per-chunk transfer timeout for retries
that's used during resumable uploads.

Needs to be exposed via the manual layer for storage.

Tested the feature(with default timeout and with some random value) with storagetestbench server emulator.
  • Loading branch information
Tulsishah authored Nov 20, 2024
1 parent 44435a9 commit 09fa125
Show file tree
Hide file tree
Showing 5 changed files with 82 additions and 16 deletions.
15 changes: 15 additions & 0 deletions googleapi/googleapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,20 @@ func ChunkSize(size int) MediaOption {
return chunkSizeOption(size)
}

type chunkTransferTimeoutOption time.Duration

func (cd chunkTransferTimeoutOption) setOptions(o *MediaOptions) {
o.ChunkTransferTimeout = time.Duration(cd)
}

// ChunkTransferTimeout returns a MediaOption which sets a per-chunk
// transfer timeout for resumable uploads. If a single chunk has been
// attempting to upload for longer than this time then the old req got canceled and retried.
// The default is no timeout for the request.
func ChunkTransferTimeout(timeout time.Duration) MediaOption {
return chunkTransferTimeoutOption(timeout)
}

type chunkRetryDeadlineOption time.Duration

func (cd chunkRetryDeadlineOption) setOptions(o *MediaOptions) {
Expand All @@ -283,6 +297,7 @@ type MediaOptions struct {
ForceEmptyContentType bool
ChunkSize int
ChunkRetryDeadline time.Duration
ChunkTransferTimeout time.Duration
}

// ProcessMediaOptions stores options from opts in a MediaOptions.
Expand Down
19 changes: 11 additions & 8 deletions internal/gensupport/media.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,13 +135,14 @@ func PrepareUpload(media io.Reader, chunkSize int) (r io.Reader, mb *MediaBuffer
// code only.
type MediaInfo struct {
// At most one of Media and MediaBuffer will be set.
media io.Reader
buffer *MediaBuffer
singleChunk bool
mType string
size int64 // mediaSize, if known. Used only for calls to progressUpdater_.
progressUpdater googleapi.ProgressUpdater
chunkRetryDeadline time.Duration
media io.Reader
buffer *MediaBuffer
singleChunk bool
mType string
size int64 // mediaSize, if known. Used only for calls to progressUpdater_.
progressUpdater googleapi.ProgressUpdater
chunkRetryDeadline time.Duration
chunkTransferTimeout time.Duration
}

// NewInfoFromMedia should be invoked from the Media method of a call. It returns a
Expand All @@ -157,6 +158,7 @@ func NewInfoFromMedia(r io.Reader, options []googleapi.MediaOption) *MediaInfo {
}
}
mi.chunkRetryDeadline = opts.ChunkRetryDeadline
mi.chunkTransferTimeout = opts.ChunkTransferTimeout
mi.media, mi.buffer, mi.singleChunk = PrepareUpload(r, opts.ChunkSize)
return mi
}
Expand Down Expand Up @@ -294,7 +296,8 @@ func (mi *MediaInfo) ResumableUpload(locURI string) *ResumableUpload {
mi.progressUpdater(curr, mi.size)
}
},
ChunkRetryDeadline: mi.chunkRetryDeadline,
ChunkRetryDeadline: mi.chunkRetryDeadline,
ChunkTransferTimeout: mi.chunkTransferTimeout,
}
}

Expand Down
33 changes: 27 additions & 6 deletions internal/gensupport/media_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,12 +218,13 @@ func TestUploadRequestGetBody(t *testing.T) {

func TestResumableUpload(t *testing.T) {
for _, test := range []struct {
desc string
r io.Reader
chunkSize int
wantUploadType string
wantResumableUpload bool
chunkRetryDeadline time.Duration
desc string
r io.Reader
chunkSize int
wantUploadType string
wantResumableUpload bool
chunkRetryDeadline time.Duration
chunkTransferTimeOut time.Duration
}{
{
desc: "chunk size of zero: don't use a MediaBuffer; upload as a single chunk",
Expand Down Expand Up @@ -263,11 +264,22 @@ func TestResumableUpload(t *testing.T) {
wantResumableUpload: true,
chunkRetryDeadline: 1 * time.Second,
},
{
desc: "confirm that ChunkTransferTimeout is carried to ResumableUpload",
r: &nullReader{2 * googleapi.MinUploadChunkSize},
chunkSize: 1,
wantUploadType: "resumable",
wantResumableUpload: true,
chunkTransferTimeOut: 5 * time.Second,
},
} {
opts := []googleapi.MediaOption{googleapi.ChunkSize(test.chunkSize)}
if test.chunkRetryDeadline != 0 {
opts = append(opts, googleapi.ChunkRetryDeadline(test.chunkRetryDeadline))
}
if test.chunkTransferTimeOut != 0 {
opts = append(opts, googleapi.ChunkTransferTimeout(test.chunkTransferTimeOut))
}
mi := NewInfoFromMedia(test.r, opts)
if got, want := mi.UploadType(), test.wantUploadType; got != want {
t.Errorf("%s: upload type: got %q, want %q", test.desc, got, want)
Expand All @@ -284,6 +296,15 @@ func TestResumableUpload(t *testing.T) {
t.Errorf("%s: test case invalid; resumable upload is nil", test.desc)
}
}
if test.chunkTransferTimeOut != 0 {
if got := mi.ResumableUpload(""); got != nil {
if got.ChunkTransferTimeout != test.chunkTransferTimeOut {
t.Errorf("%s: ChunkTransferTimeout: got %v, want %v", test.desc, got.ChunkTransferTimeout, test.chunkTransferTimeOut)
}
} else {
t.Errorf("%s: test case invalid; resumable upload is nil", test.desc)
}
}
}
}

Expand Down
28 changes: 27 additions & 1 deletion internal/gensupport/resumable.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ type ResumableUpload struct {
// retries should happen.
ChunkRetryDeadline time.Duration

// ChunkTransferTimeout configures the per-chunk transfer timeout. If a chunk upload stalls for longer than
// this duration, the upload will be retried.
ChunkTransferTimeout time.Duration

// Track current request invocation ID and attempt count for retry metrics
// and idempotency headers.
invocationID string
Expand Down Expand Up @@ -241,13 +245,35 @@ func (rx *ResumableUpload) Upload(ctx context.Context) (resp *http.Response, err
default:
}

resp, err = rx.transferChunk(ctx)
// rCtx is derived from a context with a defined transferTimeout with non-zero value.
// If a particular request exceeds this transfer time for getting response, the rCtx deadline will be exceeded,
// triggering a retry of the request.
var rCtx context.Context
var cancel context.CancelFunc

rCtx = ctx
if rx.ChunkTransferTimeout != 0 {
rCtx, cancel = context.WithTimeout(ctx, rx.ChunkTransferTimeout)
}

resp, err = rx.transferChunk(rCtx)

var status int
if resp != nil {
status = resp.StatusCode
}

// The upload should be retried if the rCtx is canceled due to a timeout.
select {
case <-rCtx.Done():
if errors.Is(rCtx.Err(), context.DeadlineExceeded) {
// Cancel the context for rCtx
cancel()
continue
}
default:
}

// Check if we should retry the request.
if !errorFunc(status, err) {
quitAfterTimer.Stop()
Expand Down
3 changes: 2 additions & 1 deletion internal/gensupport/resumable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package gensupport

import (
"context"
"errors"
"fmt"
"io"
"net/http"
Expand Down Expand Up @@ -280,7 +281,7 @@ func TestCancelUploadBasic(t *testing.T) {
defer func() { backoff = oldBackoff }()

res, err := rx.Upload(ctx)
if err != context.Canceled {
if !errors.Is(err, context.Canceled) {
t.Fatalf("Upload err: got: %v; want: context cancelled", err)
}
if res != nil {
Expand Down

0 comments on commit 09fa125

Please sign in to comment.