diff --git a/kaldb/src/main/java/com/slack/kaldb/blobfs/s3/S3BlobFs.java b/kaldb/src/main/java/com/slack/kaldb/blobfs/s3/S3BlobFs.java index b003cf2063..dd8375c308 100644 --- a/kaldb/src/main/java/com/slack/kaldb/blobfs/s3/S3BlobFs.java +++ b/kaldb/src/main/java/com/slack/kaldb/blobfs/s3/S3BlobFs.java @@ -55,6 +55,7 @@ public class S3BlobFs extends BlobFs { public static final String S3_SCHEME = "s3://"; private static final Logger LOG = LoggerFactory.getLogger(S3BlobFs.class); private static final String DELIMITER = "/"; + private static final int LIST_MAX_KEYS = 2500; private S3Client s3Client; public S3BlobFs(S3Client s3Client) { @@ -381,6 +382,7 @@ public String[] listFiles(URI fileUri, boolean recursive) throws IOException { String continuationToken = null; boolean isDone = false; String prefix = normalizeToDirectoryPrefix(fileUri); + int fileCount = 0; while (!isDone) { ListObjectsV2Request.Builder listObjectsV2RequestBuilder = ListObjectsV2Request.builder().bucket(fileUri.getHost()); @@ -398,6 +400,7 @@ public String[] listFiles(URI fileUri, boolean recursive) throws IOException { ListObjectsV2Response listObjectsV2Response = s3Client.listObjectsV2(listObjectsV2Request); LOG.debug("Getting ListObjectsV2Response: {}", listObjectsV2Response); List filesReturned = listObjectsV2Response.contents(); + fileCount += filesReturned.size(); filesReturned.stream() .forEach( object -> { @@ -411,6 +414,15 @@ public String[] listFiles(URI fileUri, boolean recursive) throws IOException { builder.add(S3_SCHEME + fileUri.getHost() + DELIMITER + fileKey); } }); + if (fileCount >= LIST_MAX_KEYS) { + // abort at or past the cap; >= since page-sized increments can skip an exact match + LOG.error( + "Too many files ({}) returned from S3 when attempting to list object prefixes", + LIST_MAX_KEYS); + throw new IllegalStateException( + String.format( + "Max keys (%s) reached when attempting to list S3 objects", LIST_MAX_KEYS)); + } isDone = !listObjectsV2Response.isTruncated(); 
continuationToken = listObjectsV2Response.nextContinuationToken(); } diff --git a/kaldb/src/main/java/com/slack/kaldb/blobfs/s3/S3CrtBlobFs.java b/kaldb/src/main/java/com/slack/kaldb/blobfs/s3/S3CrtBlobFs.java index b5c3400dbb..ca2063b9f8 100644 --- a/kaldb/src/main/java/com/slack/kaldb/blobfs/s3/S3CrtBlobFs.java +++ b/kaldb/src/main/java/com/slack/kaldb/blobfs/s3/S3CrtBlobFs.java @@ -35,6 +35,7 @@ import software.amazon.awssdk.services.s3.crt.S3CrtConnectionHealthConfiguration; import software.amazon.awssdk.services.s3.crt.S3CrtHttpConfiguration; import software.amazon.awssdk.services.s3.crt.S3CrtProxyConfiguration; +import software.amazon.awssdk.services.s3.crt.S3CrtRetryConfiguration; import software.amazon.awssdk.services.s3.model.CopyObjectRequest; import software.amazon.awssdk.services.s3.model.CopyObjectResponse; import software.amazon.awssdk.services.s3.model.DeleteObjectRequest; @@ -55,6 +56,7 @@ public class S3CrtBlobFs extends BlobFs { public static final String S3_SCHEME = "s3://"; private static final Logger LOG = LoggerFactory.getLogger(S3CrtBlobFs.class); private static final String DELIMITER = "/"; + private static final int LIST_MAX_KEYS = 2500; private final S3AsyncClient s3AsyncClient; @@ -84,6 +86,7 @@ public static S3AsyncClient initS3Client(KaldbConfigs.S3Config config) { S3CrtAsyncClientBuilder s3AsyncClient = S3AsyncClient.crtBuilder() + .retryConfiguration(S3CrtRetryConfiguration.builder().numRetries(3).build()) .targetThroughputInGbps(config.getS3TargetThroughputGbps()) .region(Region.of(region)) .credentialsProvider(awsCredentialsProvider); @@ -92,9 +95,10 @@ public static S3AsyncClient initS3Client(KaldbConfigs.S3Config config) { // continue to attempt to read data from a socket that is no longer returning data S3CrtHttpConfiguration.Builder httpConfigurationBuilder = S3CrtHttpConfiguration.builder() + .connectionTimeout(Duration.ofSeconds(5)) .connectionHealthConfiguration( S3CrtConnectionHealthConfiguration.builder() - 
.minimumThroughputTimeout(Duration.ofSeconds(15)) + .minimumThroughputTimeout(Duration.ofSeconds(3)) .minimumThroughputInBps(32000L) .build()); @@ -427,9 +431,10 @@ public String[] listFiles(URI fileUri, boolean recursive) throws IOException { String continuationToken = null; boolean isDone = false; String prefix = normalizeToDirectoryPrefix(fileUri); + int fileCount = 0; while (!isDone) { ListObjectsV2Request.Builder listObjectsV2RequestBuilder = - ListObjectsV2Request.builder().bucket(fileUri.getHost()); + ListObjectsV2Request.builder().maxKeys(LIST_MAX_KEYS).bucket(fileUri.getHost()); if (!prefix.equals(DELIMITER)) { listObjectsV2RequestBuilder = listObjectsV2RequestBuilder.prefix(prefix); } @@ -445,6 +450,7 @@ public String[] listFiles(URI fileUri, boolean recursive) throws IOException { s3AsyncClient.listObjectsV2(listObjectsV2Request).get(); LOG.debug("Getting ListObjectsV2Response: {}", listObjectsV2Response); List filesReturned = listObjectsV2Response.contents(); + fileCount += filesReturned.size(); filesReturned.stream() .forEach( object -> { @@ -458,6 +464,15 @@ public String[] listFiles(URI fileUri, boolean recursive) throws IOException { builder.add(S3_SCHEME + fileUri.getHost() + DELIMITER + fileKey); } }); + if (fileCount >= LIST_MAX_KEYS) { + // abort at or past the cap; >= since page-sized increments can skip an exact match + LOG.error( + "Too many files ({}) returned from S3 when attempting to list object prefixes", + LIST_MAX_KEYS); + throw new IllegalStateException( + String.format( + "Max keys (%s) reached when attempting to list S3 objects", LIST_MAX_KEYS)); + } isDone = !listObjectsV2Response.isTruncated(); continuationToken = listObjectsV2Response.nextContinuationToken(); }