Skip to content

Commit

Permalink
Add error handling when reaching max keys listing S3 objects (#676)
Browse files Browse the repository at this point in the history
Co-authored-by: Bryan Burkholder <[email protected]>
  • Loading branch information
bryanlb and bryanlb authored Sep 19, 2023
1 parent afa1ba9 commit eef7a7e
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 2 deletions.
12 changes: 12 additions & 0 deletions kaldb/src/main/java/com/slack/kaldb/blobfs/s3/S3BlobFs.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ public class S3BlobFs extends BlobFs {
public static final String S3_SCHEME = "s3://";
private static final Logger LOG = LoggerFactory.getLogger(S3BlobFs.class);
private static final String DELIMITER = "/";
private static final int LIST_MAX_KEYS = 2500;
private S3Client s3Client;

public S3BlobFs(S3Client s3Client) {
Expand Down Expand Up @@ -381,6 +382,7 @@ public String[] listFiles(URI fileUri, boolean recursive) throws IOException {
String continuationToken = null;
boolean isDone = false;
String prefix = normalizeToDirectoryPrefix(fileUri);
int fileCount = 0;
while (!isDone) {
ListObjectsV2Request.Builder listObjectsV2RequestBuilder =
ListObjectsV2Request.builder().bucket(fileUri.getHost());
Expand All @@ -398,6 +400,7 @@ public String[] listFiles(URI fileUri, boolean recursive) throws IOException {
ListObjectsV2Response listObjectsV2Response = s3Client.listObjectsV2(listObjectsV2Request);
LOG.debug("Getting ListObjectsV2Response: {}", listObjectsV2Response);
List<S3Object> filesReturned = listObjectsV2Response.contents();
fileCount += filesReturned.size();
filesReturned.stream()
.forEach(
object -> {
Expand All @@ -411,6 +414,15 @@ public String[] listFiles(URI fileUri, boolean recursive) throws IOException {
builder.add(S3_SCHEME + fileUri.getHost() + DELIMITER + fileKey);
}
});
if (fileCount == LIST_MAX_KEYS) {
// If the running file count has hit LIST_MAX_KEYS, log an error and abort by throwing.
LOG.error(
"Too many files ({}) returned from S3 when attempting to list object prefixes",
LIST_MAX_KEYS);
throw new IllegalStateException(
String.format(
"Max keys (%s) reached when attempting to list S3 objects", LIST_MAX_KEYS));
}
isDone = !listObjectsV2Response.isTruncated();
continuationToken = listObjectsV2Response.nextContinuationToken();
}
Expand Down
19 changes: 17 additions & 2 deletions kaldb/src/main/java/com/slack/kaldb/blobfs/s3/S3CrtBlobFs.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import software.amazon.awssdk.services.s3.crt.S3CrtConnectionHealthConfiguration;
import software.amazon.awssdk.services.s3.crt.S3CrtHttpConfiguration;
import software.amazon.awssdk.services.s3.crt.S3CrtProxyConfiguration;
import software.amazon.awssdk.services.s3.crt.S3CrtRetryConfiguration;
import software.amazon.awssdk.services.s3.model.CopyObjectRequest;
import software.amazon.awssdk.services.s3.model.CopyObjectResponse;
import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;
Expand All @@ -55,6 +56,7 @@ public class S3CrtBlobFs extends BlobFs {
public static final String S3_SCHEME = "s3://";
private static final Logger LOG = LoggerFactory.getLogger(S3CrtBlobFs.class);
private static final String DELIMITER = "/";
private static final int LIST_MAX_KEYS = 2500;

private final S3AsyncClient s3AsyncClient;

Expand Down Expand Up @@ -84,6 +86,7 @@ public static S3AsyncClient initS3Client(KaldbConfigs.S3Config config) {

S3CrtAsyncClientBuilder s3AsyncClient =
S3AsyncClient.crtBuilder()
.retryConfiguration(S3CrtRetryConfiguration.builder().numRetries(3).build())
.targetThroughputInGbps(config.getS3TargetThroughputGbps())
.region(Region.of(region))
.credentialsProvider(awsCredentialsProvider);
Expand All @@ -92,9 +95,10 @@ public static S3AsyncClient initS3Client(KaldbConfigs.S3Config config) {
// continue to attempt to read data from a socket that is no longer returning data
S3CrtHttpConfiguration.Builder httpConfigurationBuilder =
S3CrtHttpConfiguration.builder()
.connectionTimeout(Duration.ofSeconds(5))
.connectionHealthConfiguration(
S3CrtConnectionHealthConfiguration.builder()
.minimumThroughputTimeout(Duration.ofSeconds(15))
.minimumThroughputTimeout(Duration.ofSeconds(3))
.minimumThroughputInBps(32000L)
.build());

Expand Down Expand Up @@ -427,9 +431,10 @@ public String[] listFiles(URI fileUri, boolean recursive) throws IOException {
String continuationToken = null;
boolean isDone = false;
String prefix = normalizeToDirectoryPrefix(fileUri);
int fileCount = 0;
while (!isDone) {
ListObjectsV2Request.Builder listObjectsV2RequestBuilder =
ListObjectsV2Request.builder().bucket(fileUri.getHost());
ListObjectsV2Request.builder().maxKeys(LIST_MAX_KEYS).bucket(fileUri.getHost());
if (!prefix.equals(DELIMITER)) {
listObjectsV2RequestBuilder = listObjectsV2RequestBuilder.prefix(prefix);
}
Expand All @@ -445,6 +450,7 @@ public String[] listFiles(URI fileUri, boolean recursive) throws IOException {
s3AsyncClient.listObjectsV2(listObjectsV2Request).get();
LOG.debug("Getting ListObjectsV2Response: {}", listObjectsV2Response);
List<S3Object> filesReturned = listObjectsV2Response.contents();
fileCount += filesReturned.size();
filesReturned.stream()
.forEach(
object -> {
Expand All @@ -458,6 +464,15 @@ public String[] listFiles(URI fileUri, boolean recursive) throws IOException {
builder.add(S3_SCHEME + fileUri.getHost() + DELIMITER + fileKey);
}
});
if (fileCount == LIST_MAX_KEYS) {
// If the running file count has hit LIST_MAX_KEYS, log an error and abort by throwing.
LOG.error(
"Too many files ({}) returned from S3 when attempting to list object prefixes",
LIST_MAX_KEYS);
throw new IllegalStateException(
String.format(
"Max keys (%s) reached when attempting to list S3 objects", LIST_MAX_KEYS));
}
isDone = !listObjectsV2Response.isTruncated();
continuationToken = listObjectsV2Response.nextContinuationToken();
}
Expand Down

0 comments on commit eef7a7e

Please sign in to comment.