Skip to content

Commit

Permalink
Merge pull request #99811 from stevendanna/retry-connection-timeout
Browse files Browse the repository at this point in the history
cloud: add optional connection timeout retries
  • Loading branch information
dt authored Mar 28, 2023
2 parents 2064a8c + e5862c2 commit b3b04e5
Show file tree
Hide file tree
Showing 5 changed files with 51 additions and 11 deletions.
7 changes: 4 additions & 3 deletions pkg/cloud/amazon/s3_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -741,7 +741,8 @@ func (s *s3Storage) ReadFileAt(
) (ioctx.ReadCloserCtx, int64, error) {
ctx, sp := tracing.ChildSpan(ctx, "s3.ReadFileAt")
defer sp.Finish()
sp.RecordStructured(&types.StringValue{Value: fmt.Sprintf("s3.ReadFileAt: %s", path.Join(s.prefix, basename))})
path := path.Join(s.prefix, basename)
sp.RecordStructured(&types.StringValue{Value: fmt.Sprintf("s3.ReadFileAt: %s", path)})

stream, err := s.openStreamAt(ctx, basename, offset)
if err != nil {
Expand Down Expand Up @@ -778,8 +779,8 @@ func (s *s3Storage) ReadFileAt(
}
return s.Body, nil
}
return cloud.NewResumingReader(ctx, opener, stream.Body, offset,
cloud.IsResumableHTTPError, s3ErrDelay), size, nil
return cloud.NewResumingReader(ctx, opener, stream.Body, offset, path,
cloud.ResumingReaderRetryOnErrFnForSettings(ctx, s.settings), s3ErrDelay), size, nil
}

func (s *s3Storage) List(ctx context.Context, prefix, delim string, fn cloud.ListingFn) error {
Expand Down
43 changes: 38 additions & 5 deletions pkg/cloud/cloud_io.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,13 @@ var WriteChunkSize = settings.RegisterByteSizeSetting(
8<<20,
)

// retryConnectionTimedOut is the boolean setting registered under
// "cloudstorage.connection_timed_out_retries.enabled". When it reads true,
// ResumingReaderRetryOnErrFnForSettings treats ETIMEDOUT errors as retryable
// in addition to the errors accepted by IsResumableHTTPError.
// NOTE(review): the trailing false is presumably the default value, making
// the behavior opt-in — confirm against settings.RegisterBoolSetting.
var retryConnectionTimedOut = settings.RegisterBoolSetting(
settings.TenantWritable,
"cloudstorage.connection_timed_out_retries.enabled",
"retry generic connection timed out errors; use with extreme caution",
false,
)

// HTTPRetryOptions defines the tunable settings which control the retry of HTTP
// operations.
var HTTPRetryOptions = retry.Options{
Expand Down Expand Up @@ -147,6 +154,25 @@ func IsResumableHTTPError(err error) bool {
sysutil.IsErrConnectionRefused(err)
}

// ResumingReaderRetryOnErrFnForSettings builds the retry predicate that
// callers hand to NewResumingReader as its RetryOnErrFn.
//
// The returned function reports true for any error accepted by
// IsResumableHTTPError. Otherwise it reads the retryConnectionTimedOut
// setting from st on every invocation and, when that setting is on and the
// error is a timed-out error per sysutil.IsErrTimedOut, logs a warning on ctx
// and reports true. All other errors report false.
func ResumingReaderRetryOnErrFnForSettings(
	ctx context.Context, st *cluster.Settings,
) func(error) bool {
	return func(err error) bool {
		switch {
		case IsResumableHTTPError(err):
			return true
		// The setting is consulted before the error is inspected, so a
		// disabled setting short-circuits the timeout check.
		case retryConnectionTimedOut.Get(&st.SV) && sysutil.IsErrTimedOut(err):
			log.Warningf(ctx, "retrying connection timed out because %s = true", retryConnectionTimedOut.Key())
			return true
		default:
			return false
		}
	}
}

// maxNoProgressReads is the maximum number of times we can attempt to retry
// reading from external storage without making any progress. Once the retry
// count reaches this value, ResumingReader.Read stops retrying and returns
// the last error instead.
const maxNoProgressReads = 3
Expand All @@ -159,6 +185,7 @@ type ReaderOpenerAt func(ctx context.Context, pos int64) (io.ReadCloser, error)
type ResumingReader struct {
Opener ReaderOpenerAt // Get additional content
Reader io.ReadCloser // Currently opened reader
Filename string // Used for logging
Pos int64 // How much data was received so far
RetryOnErrFn func(error) bool // custom retry-on-error function
// ErrFn injects a delay between retries on errors. nil means no delay.
Expand All @@ -173,13 +200,15 @@ func NewResumingReader(
opener ReaderOpenerAt,
reader io.ReadCloser,
pos int64,
filename string,
retryOnErrFn func(error) bool,
errFn func(error) time.Duration,
) *ResumingReader {
r := &ResumingReader{
Opener: opener,
Reader: reader,
Pos: pos,
Filename: filename,
RetryOnErrFn: retryOnErrFn,
ErrFn: errFn,
}
Expand All @@ -194,8 +223,12 @@ func NewResumingReader(
// Open opens the reader at the current position r.Pos using r.Opener and
// stores the result in r.Reader. The attempt runs under DelayedRetry with
// r.ErrFn as the delay function.
//
// A failed attempt is returned wrapped with the filename, so the resulting
// error identifies which object could not be opened. A successful attempt
// returns nil.
func (r *ResumingReader) Open(ctx context.Context) error {
	return DelayedRetry(ctx, "Open", r.ErrFn, func() error {
		var readErr error

		r.Reader, readErr = r.Opener(ctx, r.Pos)
		// Wrap only on failure; returning the raw error before this check
		// would leave the filename out of every Open error.
		if readErr != nil {
			return errors.Wrapf(readErr, "open %s", r.Filename)
		}
		return nil
	})
}

Expand All @@ -216,19 +249,19 @@ func (r *ResumingReader) Read(ctx context.Context, p []byte) (int, error) {
r.Pos += int64(n)
return n, readErr
}
lastErr = readErr
lastErr = errors.Wrapf(readErr, "read %s", r.Filename)
}

if !errors.IsAny(lastErr, io.EOF, io.ErrUnexpectedEOF) {
log.Errorf(ctx, "Read err: %s", lastErr)
log.Errorf(ctx, "%s", lastErr)
}

// Use the configured retry-on-error decider to check for a resumable error.
if r.RetryOnErrFn(lastErr) {
if retries >= maxNoProgressReads {
return 0, errors.Wrap(lastErr, "multiple Read calls return no data")
return 0, errors.Wrapf(lastErr, "multiple Read calls (%d) return no data", retries)
}
log.Errorf(ctx, "Retry IO: error %s", lastErr)
log.Errorf(ctx, "Retry IO error: %s", lastErr)
lastErr = nil
if r.Reader != nil {
r.Reader.Close()
Expand Down
3 changes: 2 additions & 1 deletion pkg/cloud/gcp/gcs_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,8 @@ func (g *gcsStorage) ReadFileAt(
}, // opener
nil, // reader
offset,
cloud.IsResumableHTTPError,
object,
cloud.ResumingReaderRetryOnErrFnForSettings(ctx, g.settings),
nil, // errFn
)

Expand Down
4 changes: 2 additions & 2 deletions pkg/cloud/httpsink/http_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,8 +168,8 @@ func (h *httpStorage) ReadFileAt(
}
return s.Body, err
}
return cloud.NewResumingReader(ctx, opener, stream.Body, offset,
cloud.IsResumableHTTPError, nil), size, nil
return cloud.NewResumingReader(ctx, opener, stream.Body, offset, basename,
cloud.ResumingReaderRetryOnErrFnForSettings(ctx, h.settings), nil), size, nil
}
return ioctx.ReadCloserAdapter(stream.Body), size, nil
}
Expand Down
5 changes: 5 additions & 0 deletions pkg/util/sysutil/sysutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,3 +70,8 @@ func IsErrConnectionReset(err error) bool {
// IsErrConnectionRefused reports whether err, or an error in its chain,
// matches syscall.ECONNREFUSED.
func IsErrConnectionRefused(err error) bool {
	refused := errors.Is(err, syscall.ECONNREFUSED)
	return refused
}

// IsErrTimedOut reports whether err, or an error in its chain, matches
// syscall.ETIMEDOUT.
func IsErrTimedOut(err error) bool {
	timedOut := errors.Is(err, syscall.ETIMEDOUT)
	return timedOut
}

0 comments on commit b3b04e5

Please sign in to comment.