Skip to content

Commit

Permalink
Fix Retry middleware not releasing retry token
Browse files Browse the repository at this point in the history
Updates the Retry middleware to release the retry token, on subsequent
attempts. This fixes aws#1413, and is based on PR aws#1424.
  • Loading branch information
jasdel committed Jan 14, 2022
1 parent 07e5d3e commit 172fdb1
Show file tree
Hide file tree
Showing 5 changed files with 245 additions and 125 deletions.
8 changes: 8 additions & 0 deletions .changelog/781710a7ecb24b9290b2642bd90b42c9.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{
"id": "781710a7-ecb2-4b92-90b2-642bd90b42c9",
"type": "bugfix",
"description": "Updates the Retry middleware to release the retry token, on subsequent attempts. This fixes #1413, and is based on PR #1424",
"modules": [
"."
]
}
123 changes: 81 additions & 42 deletions aws/retry/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ import (
"github.com/aws/smithy-go/transport/http"
)

// RequestCloner is a function that can take an input request type and clone the request
// for use in a subsequent retry attempt
// RequestCloner is a function that can take an input request type and clone
// the request for use in a subsequent retry attempt.
type RequestCloner func(interface{}) interface{}

type retryMetadata struct {
Expand All @@ -27,11 +27,12 @@ type retryMetadata struct {
AttemptClockSkew time.Duration
}

// Attempt is a Smithy FinalizeMiddleware that handles retry attempts using the provided
// Retryer implementation
// Attempt is a Smithy Finalize middleware that handles retry attempts using
// the provided Retryer implementation.
type Attempt struct {
// Enable the logging of retry attempts performed by the SDK.
// This will include logging retry attempts, unretryable errors, and when max attempts are reached.
// Enable the logging of retry attempts performed by the SDK. This will
// include logging retry attempts, unretryable errors, and when max
// attempts are reached.
LogAttempts bool

retryer aws.Retryer
Expand Down Expand Up @@ -59,21 +60,24 @@ func (r Attempt) logf(logger logging.Logger, classification logging.Classificati
logger.Logf(classification, format, v...)
}

// HandleFinalize utilizes the provider Retryer implementation to attempt retries over the next handler
func (r Attempt) HandleFinalize(ctx context.Context, in smithymiddle.FinalizeInput, next smithymiddle.FinalizeHandler) (
// HandleFinalize utilizes the provider Retryer implementation to attempt
// retries over the next handler
func (r *Attempt) HandleFinalize(ctx context.Context, in smithymiddle.FinalizeInput, next smithymiddle.FinalizeHandler) (
out smithymiddle.FinalizeOutput, metadata smithymiddle.Metadata, err error,
) {
var attemptNum int
var attemptClockSkew time.Duration
var attemptResults AttemptResults

maxAttempts := r.retryer.MaxAttempts()
releaseRetryToken := nopRelease

for {
attemptNum++
attemptInput := in
attemptInput.Request = r.requestCloner(attemptInput.Request)

// Record the metadata for the for attempt being started.
attemptCtx := setRetryMetadata(ctx, retryMetadata{
AttemptNum: attemptNum,
AttemptTime: sdk.NowTime().UTC(),
Expand All @@ -82,23 +86,20 @@ func (r Attempt) HandleFinalize(ctx context.Context, in smithymiddle.FinalizeInp
})

var attemptResult AttemptResult
out, attemptResult, releaseRetryToken, err = r.handleAttempt(attemptCtx, attemptInput, releaseRetryToken, next)
attemptClockSkew, _ = awsmiddle.GetAttemptSkew(attemptResult.ResponseMetadata)

out, attemptResult, err = r.handleAttempt(attemptCtx, attemptInput, next)

var ok bool
attemptClockSkew, ok = awsmiddle.GetAttemptSkew(attemptResult.ResponseMetadata)
if !ok {
attemptClockSkew = 0
}

// AttempResult Retried states that the attempt was not successful, and
// should be retried.
shouldRetry := attemptResult.Retried

// add attempt metadata to list of all attempt metadata
// Add attempt metadata to list of all attempt metadata
attemptResults.Results = append(attemptResults.Results, attemptResult)

if !shouldRetry {
// Ensure the last response's metadata is used as the bases for result
// metadata returned by the stack.
// metadata returned by the stack. The Slice of attempt results
// will be added to this cloned metadata.
metadata = attemptResult.ResponseMetadata.Clone()

break
Expand All @@ -110,26 +111,36 @@ func (r Attempt) HandleFinalize(ctx context.Context, in smithymiddle.FinalizeInp
}

// handleAttempt handles an individual request attempt.
func (r Attempt) handleAttempt(ctx context.Context, in smithymiddle.FinalizeInput, next smithymiddle.FinalizeHandler) (
out smithymiddle.FinalizeOutput, attemptResult AttemptResult, err error,
func (r *Attempt) handleAttempt(
ctx context.Context, in smithymiddle.FinalizeInput, releaseRetryToken func(error) error, next smithymiddle.FinalizeHandler,
) (
out smithymiddle.FinalizeOutput, attemptResult AttemptResult, _ func(error) error, err error,
) {
defer func() {
attemptResult.Err = err
}()

relRetryToken := r.retryer.GetInitialToken()
//------------------------------
// Get Initial (aka Send) Token
//------------------------------
releaseInitialToken := r.retryer.GetInitialToken()

//------------------------------
// Send Attempt
//------------------------------
logger := smithymiddle.GetLogger(ctx)
service, operation := awsmiddle.GetServiceID(ctx), awsmiddle.GetOperationName(ctx)

retryMetadata, _ := getRetryMetadata(ctx)
attemptNum := retryMetadata.AttemptNum
maxAttempts := retryMetadata.MaxAttempts

// Following attempts must ensure the request payload stream starts in a
// rewound state.
if attemptNum > 1 {
if rewindable, ok := in.Request.(interface{ RewindStream() error }); ok {
if rewindErr := rewindable.RewindStream(); rewindErr != nil {
err = fmt.Errorf("failed to rewind transport stream for retry, %w", rewindErr)
return out, attemptResult, err
return out, attemptResult, nopRelease, err
}
}

Expand All @@ -140,51 +151,78 @@ func (r Attempt) handleAttempt(ctx context.Context, in smithymiddle.FinalizeInpu
out, metadata, err = next.HandleFinalize(ctx, in)
attemptResult.ResponseMetadata = metadata

if releaseError := relRetryToken(err); releaseError != nil && err != nil {
err = fmt.Errorf("failed to release token after request error, %w", err)
return out, attemptResult, err
//------------------------------
// Bookkeeping
//------------------------------
// Release the initial send token based on the state of the attempt's error (if any).
if releaseError := releaseInitialToken(err); releaseError != nil && err != nil {
err = fmt.Errorf("failed to release initial token after request error, %w", err)
return out, attemptResult, nopRelease, err
}

// Release the retry token based on the state of the attempt's error (if any).
if releaseError := releaseRetryToken(err); releaseError != nil && err != nil {
err = fmt.Errorf("failed to release retry token after request error, %w", err)
return out, attemptResult, nopRelease, err
}
// If there was no error making the attempt, nothing further to do. There
// will be nothing to retry.
if err == nil {
return out, attemptResult, err
return out, attemptResult, nopRelease, err
}

//------------------------------
// Is Retryable and Should Retry
//------------------------------
// If the attempt failed with an unretryable error, nothing further to do
// but return, and inform the caller about the terminal failure.
retryable := r.retryer.IsErrorRetryable(err)
if !retryable {
r.logf(logger, logging.Debug, "request failed with unretryable error %v", err)
return out, attemptResult, err
return out, attemptResult, nopRelease, err
}

// set retryable to true
attemptResult.Retryable = true

// Once the maximum number of attempts have been exhausted there is nothing
// further to do other than inform the caller about the terminal failure.
if maxAttempts > 0 && attemptNum >= maxAttempts {
r.logf(logger, logging.Debug, "max retry attempts exhausted, max %d", maxAttempts)
err = &MaxAttemptsError{
Attempt: attemptNum,
Err: err,
}
return out, attemptResult, err
return out, attemptResult, nopRelease, err
}

relRetryToken, reqErr := r.retryer.GetRetryToken(ctx, err)
if reqErr != nil {
return out, attemptResult, reqErr
//------------------------------
// Get Retry (aka Retry Quota) Token
//------------------------------
// Get a retry token that will be released after the
releaseRetryToken, retryTokenErr := r.retryer.GetRetryToken(ctx, err)
if retryTokenErr != nil {
return out, attemptResult, nopRelease, retryTokenErr
}

//------------------------------
// Retry Delay and Sleep
//------------------------------
// Get the retry delay before another attempt can be made, and sleep for
// that time. Potentially early exist if the sleep is canceled via the
// context.
retryDelay, reqErr := r.retryer.RetryDelay(attemptNum, err)
if reqErr != nil {
return out, attemptResult, reqErr
return out, attemptResult, releaseRetryToken, reqErr
}

if reqErr = sdk.SleepWithContext(ctx, retryDelay); reqErr != nil {
err = &aws.RequestCanceledError{Err: reqErr}
return out, attemptResult, err
return out, attemptResult, releaseRetryToken, err
}

// The request should be re-attempted.
attemptResult.Retried = true

return out, attemptResult, err
return out, attemptResult, releaseRetryToken, err
}

// MetricsHeader attaches SDK request metric header for retries to the transport
Expand All @@ -195,7 +233,7 @@ func (r *MetricsHeader) ID() string {
return "RetryMetricsHeader"
}

// HandleFinalize attaches the sdk request metric header to the transport layer
// HandleFinalize attaches the SDK request metric header to the transport layer
func (r MetricsHeader) HandleFinalize(ctx context.Context, in smithymiddle.FinalizeInput, next smithymiddle.FinalizeHandler) (
out smithymiddle.FinalizeOutput, metadata smithymiddle.Metadata, err error,
) {
Expand Down Expand Up @@ -251,13 +289,14 @@ func setRetryMetadata(ctx context.Context, metadata retryMetadata) context.Conte
return middleware.WithStackValue(ctx, retryMetadataKey{}, metadata)
}

// AddRetryMiddlewaresOptions is the set of options that can be passed to AddRetryMiddlewares for configuring retry
// associated middleware.
// AddRetryMiddlewaresOptions is the set of options that can be passed to
// AddRetryMiddlewares for configuring retry associated middleware.
type AddRetryMiddlewaresOptions struct {
Retryer aws.Retryer

// Enable the logging of retry attempts performed by the SDK.
// This will include logging retry attempts, unretryable errors, and when max attempts are reached.
// Enable the logging of retry attempts performed by the SDK. This will
// include logging retry attempts, unretryable errors, and when max
// attempts are reached.
LogRetryAttempts bool
}

Expand Down
Loading

0 comments on commit 172fdb1

Please sign in to comment.