Skip to content

Commit

Permalink
AWS: Add configuration and set better defaults for S3 retry behaviour
Browse files Browse the repository at this point in the history
Co-authored-by: Drew Schleit <[email protected]>
  • Loading branch information
ookumuso and drewschleit committed Sep 25, 2024
1 parent 474a770 commit 884dded
Show file tree
Hide file tree
Showing 7 changed files with 156 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ public S3Client s3() {
.applyMutation(s3FileIOProperties::applyEndpointConfigurations)
.applyMutation(s3FileIOProperties::applyServiceConfigurations)
.applyMutation(s3FileIOProperties::applySignerConfiguration)
.applyMutation(s3FileIOProperties::applyRetryConfigurations)
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ public S3Client s3() {
.applyMutation(s3FileIOProperties::applySignerConfiguration)
.applyMutation(s3FileIOProperties::applyS3AccessGrantsConfigurations)
.applyMutation(s3FileIOProperties::applyUserAgentConfigurations)
.applyMutation(s3FileIOProperties::applyRetryConfigurations)
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ public S3Client s3() {
.applyMutation(httpClientProperties()::applyHttpClientConfigurations)
.applyMutation(s3FileIOProperties()::applyEndpointConfigurations)
.applyMutation(s3FileIOProperties()::applyServiceConfigurations)
.applyMutation(s3FileIOProperties()::applyRetryConfigurations)
.credentialsProvider(
new LakeFormationCredentialsProvider(lakeFormation(), buildTableArn()))
.region(Region.of(region()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ public S3Client s3() {
.applyMutation(s3FileIOProperties::applySignerConfiguration)
.applyMutation(s3FileIOProperties::applyS3AccessGrantsConfigurations)
.applyMutation(s3FileIOProperties::applyUserAgentConfigurations)
.applyMutation(s3FileIOProperties::applyRetryConfigurations)
.build();
}
}
124 changes: 124 additions & 0 deletions aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIOProperties.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@

import java.io.Serializable;
import java.net.URI;
import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import javax.xml.stream.XMLStreamException;
import org.apache.iceberg.EnvironmentContext;
import org.apache.iceberg.aws.AwsClientProperties;
import org.apache.iceberg.aws.glue.GlueCatalog;
Expand All @@ -38,6 +40,14 @@
import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider;
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
import software.amazon.awssdk.core.client.config.SdkAdvancedClientOption;
import software.amazon.awssdk.core.exception.SdkServiceException;
import software.amazon.awssdk.core.retry.RetryMode;
import software.amazon.awssdk.core.retry.RetryPolicy;
import software.amazon.awssdk.core.retry.backoff.EqualJitterBackoffStrategy;
import software.amazon.awssdk.core.retry.conditions.OrRetryCondition;
import software.amazon.awssdk.core.retry.conditions.RetryCondition;
import software.amazon.awssdk.core.retry.conditions.RetryOnExceptionsCondition;
import software.amazon.awssdk.core.retry.conditions.TokenBucketRetryCondition;
import software.amazon.awssdk.services.s3.S3ClientBuilder;
import software.amazon.awssdk.services.s3.S3Configuration;
import software.amazon.awssdk.services.s3.model.ObjectCannedACL;
Expand Down Expand Up @@ -393,6 +403,21 @@ public class S3FileIOProperties implements Serializable {
*/
private static final String S3_FILE_IO_USER_AGENT = "s3fileio/" + EnvironmentContext.get();

/** Number of times to retry S3 operations. */
public static final String S3_RETRY_NUM_RETRIES = "s3.retry.num-retries";

public static final int S3_RETRY_NUM_RETRIES_DEFAULT = 32;

/** Minimum wait time to retry a S3 operation */
public static final String S3_RETRY_MIN_WAIT_MS = "s3.retry.min-wait-ms";

public static final long S3_RETRY_MIN_WAIT_MS_DEFAULT = 2_000; // 2 seconds

/** Maximum wait time to retry a S3 read operation */
public static final String S3_RETRY_MAX_WAIT_MS = "s3.retry.max-wait-ms";

public static final long S3_RETRY_MAX_WAIT_MS_DEFAULT = 20_000; // 20 seconds

private String sseType;
private String sseKey;
private String sseMd5;
Expand Down Expand Up @@ -423,6 +448,9 @@ public class S3FileIOProperties implements Serializable {
private final String endpoint;
private final boolean isRemoteSigningEnabled;
private String writeStorageClass;
private int s3RetryNumRetries;
private long s3RetryMinWaitMs;
private long s3RetryMaxWaitMs;
private final Map<String, String> allProperties;

public S3FileIOProperties() {
Expand Down Expand Up @@ -455,6 +483,9 @@ public S3FileIOProperties() {
this.isRemoteSigningEnabled = REMOTE_SIGNING_ENABLED_DEFAULT;
this.isS3AccessGrantsEnabled = S3_ACCESS_GRANTS_ENABLED_DEFAULT;
this.isS3AccessGrantsFallbackToIamEnabled = S3_ACCESS_GRANTS_FALLBACK_TO_IAM_ENABLED_DEFAULT;
this.s3RetryNumRetries = S3_RETRY_NUM_RETRIES_DEFAULT;
this.s3RetryMinWaitMs = S3_RETRY_MIN_WAIT_MS_DEFAULT;
this.s3RetryMaxWaitMs = S3_RETRY_MAX_WAIT_MS_DEFAULT;
this.allProperties = Maps.newHashMap();

ValidationException.check(
Expand Down Expand Up @@ -553,6 +584,12 @@ public S3FileIOProperties(Map<String, String> properties) {
properties,
S3_ACCESS_GRANTS_FALLBACK_TO_IAM_ENABLED,
S3_ACCESS_GRANTS_FALLBACK_TO_IAM_ENABLED_DEFAULT);
this.s3RetryNumRetries =
PropertyUtil.propertyAsInt(properties, S3_RETRY_NUM_RETRIES, S3_RETRY_NUM_RETRIES_DEFAULT);
this.s3RetryMinWaitMs =
PropertyUtil.propertyAsLong(properties, S3_RETRY_MIN_WAIT_MS, S3_RETRY_MIN_WAIT_MS_DEFAULT);
this.s3RetryMaxWaitMs =
PropertyUtil.propertyAsLong(properties, S3_RETRY_MAX_WAIT_MS, S3_RETRY_MAX_WAIT_MS_DEFAULT);

ValidationException.check(
keyIdAccessKeyBothConfigured(),
Expand Down Expand Up @@ -753,6 +790,34 @@ public void setS3AccessGrantsFallbackToIamEnabled(boolean s3AccessGrantsFallback
this.isS3AccessGrantsFallbackToIamEnabled = s3AccessGrantsFallbackToIamEnabled;
}

public int s3RetryNumRetries() {
return s3RetryNumRetries;
}

public void setS3RetryNumRetries(int s3RetryNumRetries) {
this.s3RetryNumRetries = s3RetryNumRetries;
}

public long s3RetryMinWaitMs() {
return s3RetryMinWaitMs;
}

public void setS3RetryMinWaitMs(long s3RetryMinWaitMs) {
this.s3RetryMinWaitMs = s3RetryMinWaitMs;
}

public long s3RetryMaxWaitMs() {
return s3RetryMaxWaitMs;
}

public void setS3RetryMaxWaitMs(long s3RetryMaxWaitMs) {
this.s3RetryMaxWaitMs = s3RetryMaxWaitMs;
}

public long s3RetryTotalWaitMs() {
return (long) s3RetryNumRetries() * s3RetryMaxWaitMs();
}

private boolean keyIdAccessKeyBothConfigured() {
return (accessKeyId == null) == (secretAccessKey == null);
}
Expand Down Expand Up @@ -824,6 +889,65 @@ public <T extends S3ClientBuilder> void applyEndpointConfigurations(T builder) {
}
}

/**
* Override the retry configurations for an S3 client.
*
* <p>Sample usage:
*
* <pre>
* S3Client.builder().applyMutation(s3FileIOProperties::applyRetryConfigurations)
* </pre>
*/
public <T extends S3ClientBuilder> void applyRetryConfigurations(T builder) {
builder.overrideConfiguration(
config ->
config.retryPolicy(
// Use a retry strategy which will persistently retry throttled exceptions with
// exponential backoff, to give S3 a chance to autoscale.
// LEGACY mode works best here, as it will allow throttled exceptions to use all of
// the configured retry attempts.
RetryPolicy.builder(RetryMode.LEGACY)
.numRetries(s3RetryNumRetries)
.throttlingBackoffStrategy(
EqualJitterBackoffStrategy.builder()
.baseDelay(Duration.ofMillis(s3RetryMinWaitMs))
.maxBackoffTime(Duration.ofMillis(s3RetryMaxWaitMs))
.build())

// Workaround: add XMLStreamException as a retryable exception.
// https://github.com/aws/aws-sdk-java-v2/issues/5442
// Without this workaround, we see SDK failures if there's a socket exception
// while parsing an error XML response.
.retryCondition(
OrRetryCondition.create(
RetryCondition.defaultRetryCondition(),
RetryOnExceptionsCondition.create(XMLStreamException.class)))

// Workaround: exclude all 503s from consuming retry tokens.
// https://github.com/aws/aws-sdk-java-v2/issues/5414
// Without this workaround, workloads which see 503s from S3 HEAD will fail
// prematurely.
.retryCapacityCondition(
TokenBucketRetryCondition.builder()
.tokenBucketSize(500) // 500 is the SDK default
.exceptionCostFunction(
e -> {
if (e instanceof SdkServiceException) {
SdkServiceException sdkServiceException =
(SdkServiceException) e;
if (sdkServiceException.isThrottlingException()
|| sdkServiceException.statusCode() == 503) {
return 0;
}
}

// 5 is the SDK default for non-throttling exceptions
return 5;
})
.build())
.build()));
}

/**
* Add the S3 Access Grants Plugin for an S3 client.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
import software.amazon.awssdk.core.retry.RetryPolicy;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.S3ClientBuilder;
import software.amazon.awssdk.services.s3.S3Configuration;
import software.amazon.awssdk.services.s3.model.ObjectCannedACL;
Expand Down Expand Up @@ -491,4 +493,17 @@ public void testApplyUserAgentConfigurations() {
Mockito.verify(mockS3ClientBuilder)
.overrideConfiguration(Mockito.any(ClientOverrideConfiguration.class));
}

@Test
public void testApplyRetryConfiguration() {
Map<String, String> properties = Maps.newHashMap();
properties.put(S3FileIOProperties.S3_RETRY_NUM_RETRIES, "999");
S3FileIOProperties s3FileIOProperties = new S3FileIOProperties(properties);

S3ClientBuilder builder = S3Client.builder();
s3FileIOProperties.applyRetryConfigurations(builder);

RetryPolicy retryPolicy = builder.overrideConfiguration().retryPolicy().get();
assertThat(retryPolicy.numRetries()).as("retries was not set").isEqualTo(999);
}
}
13 changes: 13 additions & 0 deletions docs/docs/aws.md
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,19 @@ However, for the older versions up to 0.12.0, the logic is as follows:

For more details, please refer to the [LocationProvider Configuration](custom-catalog.md#custom-location-provider-implementation) section.

### S3 Retries

Workloads which encounter S3 throttling should persistently retry, with exponential backoff, to make progress while S3
automatically scales. The default values for S3 retries are tuned such that most workloads that encounter throttling will
make progress without failing due to retry exhaustion. For workloads with exceptionally high throughput against tables
that S3 has not yet scaled, it may be necessary to increase the retry count further.

| Property | Default | Description |
|----------------------|---------|-------------------------------------------------|
| s3.retry.num-retries | 32 | Number of times to retry S3 operations. |
| s3.retry.min-wait-ms | 2s | Minimum wait time to retry a S3 operation. |
| s3.retry.max-wait-ms | 20s | Maximum wait time to retry a S3 read operation. |

### S3 Strong Consistency

In November 2020, S3 announced [strong consistency](https://aws.amazon.com/s3/consistency/) for all read operations, and Iceberg is updated to fully leverage this feature.
Expand Down

0 comments on commit 884dded

Please sign in to comment.