From 60a130ab95f75ed4b056d6f3fad12ab29a738849 Mon Sep 17 00:00:00 2001 From: Stan Hu Date: Wed, 9 Oct 2024 11:45:19 -0400 Subject: [PATCH] aws: add rate_limiter_capacity to configure client-side rate limits Issuing many S3 uploads in quick succession causes the following error: operation error S3: PutObject, failed to get rate limit token, retry quota exceeded, 0 available, 5 requested By default, this rate limit is disabled to restore the same behavior as SDK v1. If `rate_limiter_capacity` is set, then client-side rate limits will take effect. Closes https://github.com/google/go-cloud/issues/3496 --- aws/aws.go | 27 +++++++++++++++++++++++++++ aws/aws_test.go | 11 +++++++++++ 2 files changed, 38 insertions(+) diff --git a/aws/aws.go b/aws/aws.go index f4a6dce45e..ac8af5d6be 100644 --- a/aws/aws.go +++ b/aws/aws.go @@ -22,6 +22,9 @@ import ( "strconv" awsv2 "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/aws/ratelimit" + "github.com/aws/aws-sdk-go-v2/aws/retry" + "github.com/aws/aws-sdk-go-v2/config" awsv2cfg "github.com/aws/aws-sdk-go-v2/config" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/client" @@ -197,9 +200,13 @@ func NewDefaultV2Config(ctx context.Context) (awsv2.Config, error) { // - hostname_immutable: Make the hostname immutable, only works if endpoint is also set. // - dualstack: A value of "true" enables dual stack (IPv4 and IPv6) endpoints. // - fips: A value of "true" enables the use of FIPS endpoints. +// - rate_limiter_capacity: A integer value configures the capacity of a token bucket used +// in client-side rate limits. If no value is set, the client-side rate limiting is disabled. +// See https://aws.github.io/aws-sdk-go-v2/docs/configuring-sdk/retries-timeouts/#client-side-rate-limiting. func V2ConfigFromURLParams(ctx context.Context, q url.Values) (awsv2.Config, error) { var endpoint string var hostnameImmutable bool + var rateLimitCapacity int64 var opts []func(*awsv2cfg.LoadOptions) error for param, values := range q { value := values[0] @@ -232,6 +239,12 @@ func V2ConfigFromURLParams(ctx context.Context, q url.Values) (awsv2.Config, err if fips { opts = append(opts, awsv2cfg.WithUseFIPSEndpoint(awsv2.FIPSEndpointStateEnabled)) } + case "rate_limiter_capacity": + var err error + rateLimitCapacity, err = strconv.ParseInt(value, 10, 32) + if err != nil { + return awsv2.Config{}, fmt.Errorf("invalid value for capacity: %w", err) + } case "awssdk": // ignore, should be handled before this default: @@ -251,5 +264,19 @@ func V2ConfigFromURLParams(ctx context.Context, q url.Values) (awsv2.Config, err opts = append(opts, awsv2cfg.WithEndpointResolverWithOptions(customResolver)) } + // SDK v2 adds client-side rate limiting: https://aws.github.io/aws-sdk-go-v2/docs/configuring-sdk/retries-timeouts/#client-side-rate-limiting + // This can cause "failed to get rate limit token" errors. Disable this to restore + // the behavior of SDK v1. + var rateLimiter retry.RateLimiter + rateLimiter = ratelimit.None + if rateLimitCapacity > 0 { + rateLimiter = ratelimit.NewTokenRateLimit(uint(rateLimitCapacity)) + } + opts = append(opts, config.WithRetryer(func() awsv2.Retryer { + return retry.NewStandard(func(so *retry.StandardOptions) { + so.RateLimiter = rateLimiter + }) + })) + return awsv2cfg.LoadDefaultConfig(ctx, opts...) } diff --git a/aws/aws_test.go b/aws/aws_test.go index 471580a1d8..e48e16aac0 100644 --- a/aws/aws_test.go +++ b/aws/aws_test.go @@ -21,6 +21,7 @@ import ( "testing" awsv2 "github.com/aws/aws-sdk-go-v2/aws" + awsv2retry "github.com/aws/aws-sdk-go-v2/aws/retry" "github.com/aws/aws-sdk-go/aws" "github.com/google/go-cmp/cmp" gcaws "gocloud.dev/aws" @@ -198,6 +199,10 @@ func TestV2ConfigFromURLParams(t *testing.T) { name: "FIPS and dual stack", query: url.Values{"fips": {"true"}, "dualstack": {"true"}}, }, + { + name: "Rate limit capacity", + query: url.Values{"rate_limiter_capacity": {"500"}}, + }, // Can't test "profile", since AWS validates that the profile exists. } @@ -227,6 +232,12 @@ func TestV2ConfigFromURLParams(t *testing.T) { t.Errorf("got endpoint %+v, want %+v", gotE, *test.wantEndpoint) } } + + // Unfortunately, we can't look at the options set for the rate limiter. + r, ok := got.Retryer().(*awsv2retry.Standard) + if !ok { + t.Errorf("expected a standard retryer, got %v, expected awsv2retry.Standard", r) + } }) } }