diff --git a/kaldb/src/main/java/com/slack/kaldb/blobfs/s3/S3CrtBlobFs.java b/kaldb/src/main/java/com/slack/kaldb/blobfs/s3/S3CrtBlobFs.java index f5540c63da..3c9f72559f 100644 --- a/kaldb/src/main/java/com/slack/kaldb/blobfs/s3/S3CrtBlobFs.java +++ b/kaldb/src/main/java/com/slack/kaldb/blobfs/s3/S3CrtBlobFs.java @@ -20,6 +20,8 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import org.apache.commons.io.FileUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -35,6 +37,7 @@ import software.amazon.awssdk.services.s3.crt.S3CrtConnectionHealthConfiguration; import software.amazon.awssdk.services.s3.crt.S3CrtHttpConfiguration; import software.amazon.awssdk.services.s3.crt.S3CrtProxyConfiguration; +import software.amazon.awssdk.services.s3.crt.S3CrtRetryConfiguration; import software.amazon.awssdk.services.s3.model.CopyObjectRequest; import software.amazon.awssdk.services.s3.model.CopyObjectResponse; import software.amazon.awssdk.services.s3.model.DeleteObjectRequest; @@ -84,6 +87,7 @@ public static S3AsyncClient initS3Client(KaldbConfigs.S3Config config) { S3CrtAsyncClientBuilder s3AsyncClient = S3AsyncClient.crtBuilder() + .retryConfiguration(S3CrtRetryConfiguration.builder().numRetries(3).build()) .targetThroughputInGbps(config.getS3TargetThroughputGbps()) .region(Region.of(region)) .credentialsProvider(awsCredentialsProvider); @@ -151,8 +155,8 @@ private HeadObjectResponse getS3ObjectMetadata(URI uri) throws IOException { HeadObjectRequest.builder().bucket(uri.getHost()).key(path).build(); try { - return s3AsyncClient.headObject(headObjectRequest).get(); - } catch (InterruptedException | ExecutionException e) { + return s3AsyncClient.headObject(headObjectRequest).get(15, TimeUnit.SECONDS); + } catch (TimeoutException | InterruptedException | ExecutionException e) { throw new IOException(e); } } @@ -204,7 +208,7 @@ private boolean existsFile(URI uri) throws IOException { HeadObjectRequest headObjectRequest = HeadObjectRequest.builder().bucket(uri.getHost()).key(path).build(); - s3AsyncClient.headObject(headObjectRequest).get(); + s3AsyncClient.headObject(headObjectRequest).get(15, TimeUnit.SECONDS); return true; } catch (Exception e) { if (e instanceof ExecutionException && e.getCause() instanceof NoSuchKeyException) { @@ -231,8 +235,9 @@ private boolean isEmptyDirectory(URI uri) throws IOException { ListObjectsV2Request listObjectsV2Request = listObjectsV2RequestBuilder.build(); try { - listObjectsV2Response = s3AsyncClient.listObjectsV2(listObjectsV2Request).get(); - } catch (InterruptedException | ExecutionException e) { + listObjectsV2Response = + s3AsyncClient.listObjectsV2(listObjectsV2Request).get(15, TimeUnit.SECONDS); + } catch (TimeoutException | InterruptedException | ExecutionException e) { throw new IOException(e); } @@ -266,9 +271,10 @@ private boolean copyFile(URI srcUri, URI dstUri) throws IOException { .destinationKey(dstPath) .build(); - CopyObjectResponse copyObjectResponse = s3AsyncClient.copyObject(copyReq).get(); + CopyObjectResponse copyObjectResponse = + s3AsyncClient.copyObject(copyReq).get(10, TimeUnit.MINUTES); return copyObjectResponse.sdkHttpResponse().isSuccessful(); - } catch (S3Exception | ExecutionException | InterruptedException e) { + } catch (TimeoutException | S3Exception | ExecutionException | InterruptedException e) { throw new IOException(e); } } @@ -288,7 +294,9 @@ public boolean mkdir(URI uri) throws IOException { PutObjectRequest.builder().bucket(uri.getHost()).key(path).build(); PutObjectResponse putObjectResponse = - s3AsyncClient.putObject(putObjectRequest, AsyncRequestBody.fromBytes(new byte[0])).get(); + s3AsyncClient + .putObject(putObjectRequest, AsyncRequestBody.fromBytes(new byte[0])) + .get(15, TimeUnit.SECONDS); return putObjectResponse.sdkHttpResponse().isSuccessful(); } catch (Throwable t) { @@ -314,11 +322,13 @@ public boolean delete(URI segmentUri, boolean forceDelete) throws IOException { if (prefix.equals(DELIMITER)) { ListObjectsV2Request listObjectsV2Request = listObjectsV2RequestBuilder.build(); - listObjectsV2Response = s3AsyncClient.listObjectsV2(listObjectsV2Request).get(); + listObjectsV2Response = + s3AsyncClient.listObjectsV2(listObjectsV2Request).get(15, TimeUnit.SECONDS); } else { ListObjectsV2Request listObjectsV2Request = listObjectsV2RequestBuilder.prefix(prefix).build(); - listObjectsV2Response = s3AsyncClient.listObjectsV2(listObjectsV2Request).get(); + listObjectsV2Response = + s3AsyncClient.listObjectsV2(listObjectsV2Request).get(15, TimeUnit.SECONDS); } boolean deleteSucceeded = true; for (S3Object s3Object : listObjectsV2Response.contents()) { @@ -329,7 +339,7 @@ public boolean delete(URI segmentUri, boolean forceDelete) throws IOException { .build(); DeleteObjectResponse deleteObjectResponse = - s3AsyncClient.deleteObject(deleteObjectRequest).get(); + s3AsyncClient.deleteObject(deleteObjectRequest).get(15, TimeUnit.SECONDS); deleteSucceeded &= deleteObjectResponse.sdkHttpResponse().isSuccessful(); } @@ -340,7 +350,7 @@ public boolean delete(URI segmentUri, boolean forceDelete) throws IOException { DeleteObjectRequest.builder().bucket(segmentUri.getHost()).key(prefix).build(); DeleteObjectResponse deleteObjectResponse = - s3AsyncClient.deleteObject(deleteObjectRequest).get(); + s3AsyncClient.deleteObject(deleteObjectRequest).get(15, TimeUnit.SECONDS); return deleteObjectResponse.sdkHttpResponse().isSuccessful(); } @@ -443,7 +453,7 @@ public String[] listFiles(URI fileUri, boolean recursive) throws IOException { ListObjectsV2Request listObjectsV2Request = listObjectsV2RequestBuilder.build(); LOG.debug("Trying to send ListObjectsV2Request {}", listObjectsV2Request); ListObjectsV2Response listObjectsV2Response = - s3AsyncClient.listObjectsV2(listObjectsV2Request).get(); + s3AsyncClient.listObjectsV2(listObjectsV2Request).get(15, TimeUnit.SECONDS); LOG.debug("Getting ListObjectsV2Response: {}", listObjectsV2Response); List filesReturned = listObjectsV2Response.contents(); filesReturned.stream() @@ -480,7 +490,9 @@ public void copyToLocalFile(URI srcUri, File dstFile) throws Exception { GetObjectRequest getObjectRequest = GetObjectRequest.builder().bucket(srcUri.getHost()).key(prefix).build(); - s3AsyncClient.getObject(getObjectRequest, AsyncResponseTransformer.toFile(dstFile)).get(); + s3AsyncClient + .getObject(getObjectRequest, AsyncResponseTransformer.toFile(dstFile)) + .get(10, TimeUnit.MINUTES); } @Override @@ -491,7 +503,9 @@ public void copyFromLocalFile(File srcFile, URI dstUri) throws Exception { PutObjectRequest putObjectRequest = PutObjectRequest.builder().bucket(dstUri.getHost()).key(prefix).build(); - s3AsyncClient.putObject(putObjectRequest, AsyncRequestBody.fromFile(srcFile)).get(); + s3AsyncClient + .putObject(putObjectRequest, AsyncRequestBody.fromFile(srcFile)) + .get(10, TimeUnit.MINUTES); } @Override @@ -505,12 +519,12 @@ public boolean isDirectory(URI uri) throws IOException { ListObjectsV2Request listObjectsV2Request = ListObjectsV2Request.builder().bucket(uri.getHost()).prefix(prefix).maxKeys(2).build(); ListObjectsV2Response listObjectsV2Response = - s3AsyncClient.listObjectsV2(listObjectsV2Request).get(); + s3AsyncClient.listObjectsV2(listObjectsV2Request).get(15, TimeUnit.SECONDS); return listObjectsV2Response.hasContents(); } catch (NoSuchKeyException e) { LOG.error("Could not get directory entry for {}", uri); return false; - } catch (ExecutionException | InterruptedException e) { + } catch (TimeoutException | ExecutionException | InterruptedException e) { throw new IOException(e); } } @@ -544,7 +558,7 @@ public boolean touch(URI uri) throws IOException { .metadataDirective(MetadataDirective.REPLACE) .build(); - s3AsyncClient.copyObject(request).get(); + s3AsyncClient.copyObject(request).get(15, TimeUnit.SECONDS); long newUpdateTime = getS3ObjectMetadata(uri).lastModified().toEpochMilli(); return newUpdateTime > s3ObjectMetadata.lastModified().toEpochMilli(); } catch (NoSuchKeyException e) { @@ -559,7 +573,7 @@ public boolean touch(URI uri) throws IOException { throw new IOException(ex); } return true; - } catch (S3Exception | ExecutionException | InterruptedException e) { + } catch (TimeoutException | S3Exception | ExecutionException | InterruptedException e) { throw new IOException(e); } }