Skip to content

Commit

Permalink
aws: add rate_limiter_capacity to configure client-side rate limits
Browse files Browse the repository at this point in the history
Issuing many S3 uploads in quick succession causes the following
error:

operation error S3: PutObject, failed to get rate limit token, retry quota exceeded, 0 available, 5 requested

By default, this rate limit is disabled to restore the same behavior
as SDK v1. If `rate_limiter_capacity` is set, then client-side rate
limits will take effect.

Closes google#3496
  • Loading branch information
stanhu committed Oct 10, 2024
1 parent bbdd0b3 commit 60a130a
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 0 deletions.
27 changes: 27 additions & 0 deletions aws/aws.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ import (
"strconv"

awsv2 "github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/aws/ratelimit"
"github.com/aws/aws-sdk-go-v2/aws/retry"
"github.com/aws/aws-sdk-go-v2/config"
awsv2cfg "github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/client"
Expand Down Expand Up @@ -197,9 +200,13 @@ func NewDefaultV2Config(ctx context.Context) (awsv2.Config, error) {
// - hostname_immutable: Make the hostname immutable, only works if endpoint is also set.
// - dualstack: A value of "true" enables dual stack (IPv4 and IPv6) endpoints.
// - fips: A value of "true" enables the use of FIPS endpoints.
// - rate_limiter_capacity: A integer value configures the capacity of a token bucket used
// in client-side rate limits. If no value is set, the client-side rate limiting is disabled.
// See https://aws.github.io/aws-sdk-go-v2/docs/configuring-sdk/retries-timeouts/#client-side-rate-limiting.
func V2ConfigFromURLParams(ctx context.Context, q url.Values) (awsv2.Config, error) {
var endpoint string
var hostnameImmutable bool
var rateLimitCapacity int64
var opts []func(*awsv2cfg.LoadOptions) error
for param, values := range q {
value := values[0]
Expand Down Expand Up @@ -232,6 +239,12 @@ func V2ConfigFromURLParams(ctx context.Context, q url.Values) (awsv2.Config, err
if fips {
opts = append(opts, awsv2cfg.WithUseFIPSEndpoint(awsv2.FIPSEndpointStateEnabled))
}
case "rate_limiter_capacity":
var err error
rateLimitCapacity, err = strconv.ParseInt(value, 10, 32)
if err != nil {
return awsv2.Config{}, fmt.Errorf("invalid value for capacity: %w", err)
}
case "awssdk":
// ignore, should be handled before this
default:
Expand All @@ -251,5 +264,19 @@ func V2ConfigFromURLParams(ctx context.Context, q url.Values) (awsv2.Config, err
opts = append(opts, awsv2cfg.WithEndpointResolverWithOptions(customResolver))
}

// SDK v2 adds client-side rate limiting: https://aws.github.io/aws-sdk-go-v2/docs/configuring-sdk/retries-timeouts/#client-side-rate-limiting
// This can cause "failed to get rate limit token" errors. Disable this to restore
// the behavior of SDK v1.
var rateLimiter retry.RateLimiter
rateLimiter = ratelimit.None
if rateLimitCapacity > 0 {
rateLimiter = ratelimit.NewTokenRateLimit(uint(rateLimitCapacity))
}
opts = append(opts, config.WithRetryer(func() awsv2.Retryer {
return retry.NewStandard(func(so *retry.StandardOptions) {
so.RateLimiter = rateLimiter
})
}))

return awsv2cfg.LoadDefaultConfig(ctx, opts...)
}
11 changes: 11 additions & 0 deletions aws/aws_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"testing"

awsv2 "github.com/aws/aws-sdk-go-v2/aws"
awsv2retry "github.com/aws/aws-sdk-go-v2/aws/retry"
"github.com/aws/aws-sdk-go/aws"
"github.com/google/go-cmp/cmp"
gcaws "gocloud.dev/aws"
Expand Down Expand Up @@ -198,6 +199,10 @@ func TestV2ConfigFromURLParams(t *testing.T) {
name: "FIPS and dual stack",
query: url.Values{"fips": {"true"}, "dualstack": {"true"}},
},
{
name: "Rate limit capacity",
query: url.Values{"rate_limiter_capacity": {"500"}},
},
// Can't test "profile", since AWS validates that the profile exists.
}

Expand Down Expand Up @@ -227,6 +232,12 @@ func TestV2ConfigFromURLParams(t *testing.T) {
t.Errorf("got endpoint %+v, want %+v", gotE, *test.wantEndpoint)
}
}

// Unfortunately, we can't look at the options set for the rate limiter.
r, ok := got.Retryer().(*awsv2retry.Standard)
if !ok {
t.Errorf("expected a standard retryer, got %v, expected awsv2retry.Standard", r)
}
})
}
}

0 comments on commit 60a130a

Please sign in to comment.