Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add error handling when reaching max keys listing S3 objects #676

Merged
merged 1 commit into from
Sep 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions kaldb/src/main/java/com/slack/kaldb/blobfs/s3/S3BlobFs.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ public class S3BlobFs extends BlobFs {
public static final String S3_SCHEME = "s3://";
private static final Logger LOG = LoggerFactory.getLogger(S3BlobFs.class);
private static final String DELIMITER = "/";
private static final int LIST_MAX_KEYS = 2500;
private S3Client s3Client;

public S3BlobFs(S3Client s3Client) {
Expand Down Expand Up @@ -381,6 +382,7 @@ public String[] listFiles(URI fileUri, boolean recursive) throws IOException {
String continuationToken = null;
boolean isDone = false;
String prefix = normalizeToDirectoryPrefix(fileUri);
int fileCount = 0;
while (!isDone) {
ListObjectsV2Request.Builder listObjectsV2RequestBuilder =
ListObjectsV2Request.builder().bucket(fileUri.getHost());
Expand All @@ -398,6 +400,7 @@ public String[] listFiles(URI fileUri, boolean recursive) throws IOException {
ListObjectsV2Response listObjectsV2Response = s3Client.listObjectsV2(listObjectsV2Request);
LOG.debug("Getting ListObjectsV2Response: {}", listObjectsV2Response);
List<S3Object> filesReturned = listObjectsV2Response.contents();
fileCount += filesReturned.size();
filesReturned.stream()
.forEach(
object -> {
Expand All @@ -411,6 +414,15 @@ public String[] listFiles(URI fileUri, boolean recursive) throws IOException {
builder.add(S3_SCHEME + fileUri.getHost() + DELIMITER + fileKey);
}
});
if (fileCount == LIST_MAX_KEYS) {
// check if we reached the max keys returned, if so abort and throw an error message
LOG.error(
"Too many files ({}) returned from S3 when attempting to list object prefixes",
LIST_MAX_KEYS);
throw new IllegalStateException(
String.format(
"Max keys (%s) reached when attempting to list S3 objects", LIST_MAX_KEYS));
}
isDone = !listObjectsV2Response.isTruncated();
continuationToken = listObjectsV2Response.nextContinuationToken();
}
Expand Down
19 changes: 17 additions & 2 deletions kaldb/src/main/java/com/slack/kaldb/blobfs/s3/S3CrtBlobFs.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import software.amazon.awssdk.services.s3.crt.S3CrtConnectionHealthConfiguration;
import software.amazon.awssdk.services.s3.crt.S3CrtHttpConfiguration;
import software.amazon.awssdk.services.s3.crt.S3CrtProxyConfiguration;
import software.amazon.awssdk.services.s3.crt.S3CrtRetryConfiguration;
import software.amazon.awssdk.services.s3.model.CopyObjectRequest;
import software.amazon.awssdk.services.s3.model.CopyObjectResponse;
import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;
Expand All @@ -55,6 +56,7 @@ public class S3CrtBlobFs extends BlobFs {
public static final String S3_SCHEME = "s3://";
private static final Logger LOG = LoggerFactory.getLogger(S3CrtBlobFs.class);
private static final String DELIMITER = "/";
private static final int LIST_MAX_KEYS = 2500;

private final S3AsyncClient s3AsyncClient;

Expand Down Expand Up @@ -84,6 +86,7 @@ public static S3AsyncClient initS3Client(KaldbConfigs.S3Config config) {

S3CrtAsyncClientBuilder s3AsyncClient =
S3AsyncClient.crtBuilder()
.retryConfiguration(S3CrtRetryConfiguration.builder().numRetries(3).build())
.targetThroughputInGbps(config.getS3TargetThroughputGbps())
.region(Region.of(region))
.credentialsProvider(awsCredentialsProvider);
Expand All @@ -92,9 +95,10 @@ public static S3AsyncClient initS3Client(KaldbConfigs.S3Config config) {
// continue to attempt to read data from a socket that is no longer returning data
S3CrtHttpConfiguration.Builder httpConfigurationBuilder =
S3CrtHttpConfiguration.builder()
.connectionTimeout(Duration.ofSeconds(5))
.connectionHealthConfiguration(
S3CrtConnectionHealthConfiguration.builder()
.minimumThroughputTimeout(Duration.ofSeconds(15))
.minimumThroughputTimeout(Duration.ofSeconds(3))
.minimumThroughputInBps(32000L)
.build());

Expand Down Expand Up @@ -427,9 +431,10 @@ public String[] listFiles(URI fileUri, boolean recursive) throws IOException {
String continuationToken = null;
boolean isDone = false;
String prefix = normalizeToDirectoryPrefix(fileUri);
int fileCount = 0;
while (!isDone) {
ListObjectsV2Request.Builder listObjectsV2RequestBuilder =
ListObjectsV2Request.builder().bucket(fileUri.getHost());
ListObjectsV2Request.builder().maxKeys(LIST_MAX_KEYS).bucket(fileUri.getHost());
if (!prefix.equals(DELIMITER)) {
listObjectsV2RequestBuilder = listObjectsV2RequestBuilder.prefix(prefix);
}
Expand All @@ -445,6 +450,7 @@ public String[] listFiles(URI fileUri, boolean recursive) throws IOException {
s3AsyncClient.listObjectsV2(listObjectsV2Request).get();
LOG.debug("Getting ListObjectsV2Response: {}", listObjectsV2Response);
List<S3Object> filesReturned = listObjectsV2Response.contents();
fileCount += filesReturned.size();
filesReturned.stream()
.forEach(
object -> {
Expand All @@ -458,6 +464,15 @@ public String[] listFiles(URI fileUri, boolean recursive) throws IOException {
builder.add(S3_SCHEME + fileUri.getHost() + DELIMITER + fileKey);
}
});
if (fileCount == LIST_MAX_KEYS) {
// check if we reached the max keys returned, if so abort and throw an error message
LOG.error(
"Too many files ({}) returned from S3 when attempting to list object prefixes",
LIST_MAX_KEYS);
throw new IllegalStateException(
String.format(
"Max keys (%s) reached when attempting to list S3 objects", LIST_MAX_KEYS));
}
isDone = !listObjectsV2Response.isTruncated();
continuationToken = listObjectsV2Response.nextContinuationToken();
}
Expand Down