Skip to content

Commit

Permalink
Add a retry and hard timeout for copy file
Browse files Browse the repository at this point in the history
  • Loading branch information
bryanlb committed Sep 15, 2023
1 parent 0cc2f9d commit 7497778
Showing 1 changed file with 33 additions and 19 deletions.
52 changes: 33 additions & 19 deletions kaldb/src/main/java/com/slack/kaldb/blobfs/s3/S3CrtBlobFs.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.commons.io.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -35,6 +37,7 @@
import software.amazon.awssdk.services.s3.crt.S3CrtConnectionHealthConfiguration;
import software.amazon.awssdk.services.s3.crt.S3CrtHttpConfiguration;
import software.amazon.awssdk.services.s3.crt.S3CrtProxyConfiguration;
import software.amazon.awssdk.services.s3.crt.S3CrtRetryConfiguration;
import software.amazon.awssdk.services.s3.model.CopyObjectRequest;
import software.amazon.awssdk.services.s3.model.CopyObjectResponse;
import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;
Expand Down Expand Up @@ -84,6 +87,7 @@ public static S3AsyncClient initS3Client(KaldbConfigs.S3Config config) {

S3CrtAsyncClientBuilder s3AsyncClient =
S3AsyncClient.crtBuilder()
.retryConfiguration(S3CrtRetryConfiguration.builder().numRetries(3).build())
.targetThroughputInGbps(config.getS3TargetThroughputGbps())
.region(Region.of(region))
.credentialsProvider(awsCredentialsProvider);
Expand Down Expand Up @@ -151,8 +155,8 @@ private HeadObjectResponse getS3ObjectMetadata(URI uri) throws IOException {
HeadObjectRequest.builder().bucket(uri.getHost()).key(path).build();

try {
return s3AsyncClient.headObject(headObjectRequest).get();
} catch (InterruptedException | ExecutionException e) {
return s3AsyncClient.headObject(headObjectRequest).get(15, TimeUnit.SECONDS);
} catch (TimeoutException | InterruptedException | ExecutionException e) {
throw new IOException(e);
}
}
Expand Down Expand Up @@ -204,7 +208,7 @@ private boolean existsFile(URI uri) throws IOException {
HeadObjectRequest headObjectRequest =
HeadObjectRequest.builder().bucket(uri.getHost()).key(path).build();

s3AsyncClient.headObject(headObjectRequest).get();
s3AsyncClient.headObject(headObjectRequest).get(15, TimeUnit.SECONDS);
return true;
} catch (Exception e) {
if (e instanceof ExecutionException && e.getCause() instanceof NoSuchKeyException) {
Expand All @@ -231,8 +235,9 @@ private boolean isEmptyDirectory(URI uri) throws IOException {

ListObjectsV2Request listObjectsV2Request = listObjectsV2RequestBuilder.build();
try {
listObjectsV2Response = s3AsyncClient.listObjectsV2(listObjectsV2Request).get();
} catch (InterruptedException | ExecutionException e) {
listObjectsV2Response =
s3AsyncClient.listObjectsV2(listObjectsV2Request).get(15, TimeUnit.SECONDS);
} catch (TimeoutException | InterruptedException | ExecutionException e) {
throw new IOException(e);
}

Expand Down Expand Up @@ -266,9 +271,10 @@ private boolean copyFile(URI srcUri, URI dstUri) throws IOException {
.destinationKey(dstPath)
.build();

CopyObjectResponse copyObjectResponse = s3AsyncClient.copyObject(copyReq).get();
CopyObjectResponse copyObjectResponse =
s3AsyncClient.copyObject(copyReq).get(10, TimeUnit.MINUTES);
return copyObjectResponse.sdkHttpResponse().isSuccessful();
} catch (S3Exception | ExecutionException | InterruptedException e) {
} catch (TimeoutException | S3Exception | ExecutionException | InterruptedException e) {
throw new IOException(e);
}
}
Expand All @@ -288,7 +294,9 @@ public boolean mkdir(URI uri) throws IOException {
PutObjectRequest.builder().bucket(uri.getHost()).key(path).build();

PutObjectResponse putObjectResponse =
s3AsyncClient.putObject(putObjectRequest, AsyncRequestBody.fromBytes(new byte[0])).get();
s3AsyncClient
.putObject(putObjectRequest, AsyncRequestBody.fromBytes(new byte[0]))
.get(15, TimeUnit.SECONDS);

return putObjectResponse.sdkHttpResponse().isSuccessful();
} catch (Throwable t) {
Expand All @@ -314,11 +322,13 @@ public boolean delete(URI segmentUri, boolean forceDelete) throws IOException {

if (prefix.equals(DELIMITER)) {
ListObjectsV2Request listObjectsV2Request = listObjectsV2RequestBuilder.build();
listObjectsV2Response = s3AsyncClient.listObjectsV2(listObjectsV2Request).get();
listObjectsV2Response =
s3AsyncClient.listObjectsV2(listObjectsV2Request).get(15, TimeUnit.SECONDS);
} else {
ListObjectsV2Request listObjectsV2Request =
listObjectsV2RequestBuilder.prefix(prefix).build();
listObjectsV2Response = s3AsyncClient.listObjectsV2(listObjectsV2Request).get();
listObjectsV2Response =
s3AsyncClient.listObjectsV2(listObjectsV2Request).get(15, TimeUnit.SECONDS);
}
boolean deleteSucceeded = true;
for (S3Object s3Object : listObjectsV2Response.contents()) {
Expand All @@ -329,7 +339,7 @@ public boolean delete(URI segmentUri, boolean forceDelete) throws IOException {
.build();

DeleteObjectResponse deleteObjectResponse =
s3AsyncClient.deleteObject(deleteObjectRequest).get();
s3AsyncClient.deleteObject(deleteObjectRequest).get(15, TimeUnit.SECONDS);

deleteSucceeded &= deleteObjectResponse.sdkHttpResponse().isSuccessful();
}
Expand All @@ -340,7 +350,7 @@ public boolean delete(URI segmentUri, boolean forceDelete) throws IOException {
DeleteObjectRequest.builder().bucket(segmentUri.getHost()).key(prefix).build();

DeleteObjectResponse deleteObjectResponse =
s3AsyncClient.deleteObject(deleteObjectRequest).get();
s3AsyncClient.deleteObject(deleteObjectRequest).get(15, TimeUnit.SECONDS);

return deleteObjectResponse.sdkHttpResponse().isSuccessful();
}
Expand Down Expand Up @@ -443,7 +453,7 @@ public String[] listFiles(URI fileUri, boolean recursive) throws IOException {
ListObjectsV2Request listObjectsV2Request = listObjectsV2RequestBuilder.build();
LOG.debug("Trying to send ListObjectsV2Request {}", listObjectsV2Request);
ListObjectsV2Response listObjectsV2Response =
s3AsyncClient.listObjectsV2(listObjectsV2Request).get();
s3AsyncClient.listObjectsV2(listObjectsV2Request).get(15, TimeUnit.SECONDS);
LOG.debug("Getting ListObjectsV2Response: {}", listObjectsV2Response);
List<S3Object> filesReturned = listObjectsV2Response.contents();
filesReturned.stream()
Expand Down Expand Up @@ -480,7 +490,9 @@ public void copyToLocalFile(URI srcUri, File dstFile) throws Exception {
GetObjectRequest getObjectRequest =
GetObjectRequest.builder().bucket(srcUri.getHost()).key(prefix).build();

s3AsyncClient.getObject(getObjectRequest, AsyncResponseTransformer.toFile(dstFile)).get();
s3AsyncClient
.getObject(getObjectRequest, AsyncResponseTransformer.toFile(dstFile))
.get(10, TimeUnit.MINUTES);
}

@Override
Expand All @@ -491,7 +503,9 @@ public void copyFromLocalFile(File srcFile, URI dstUri) throws Exception {
PutObjectRequest putObjectRequest =
PutObjectRequest.builder().bucket(dstUri.getHost()).key(prefix).build();

s3AsyncClient.putObject(putObjectRequest, AsyncRequestBody.fromFile(srcFile)).get();
s3AsyncClient
.putObject(putObjectRequest, AsyncRequestBody.fromFile(srcFile))
.get(10, TimeUnit.MINUTES);
}

@Override
Expand All @@ -505,12 +519,12 @@ public boolean isDirectory(URI uri) throws IOException {
ListObjectsV2Request listObjectsV2Request =
ListObjectsV2Request.builder().bucket(uri.getHost()).prefix(prefix).maxKeys(2).build();
ListObjectsV2Response listObjectsV2Response =
s3AsyncClient.listObjectsV2(listObjectsV2Request).get();
s3AsyncClient.listObjectsV2(listObjectsV2Request).get(15, TimeUnit.SECONDS);
return listObjectsV2Response.hasContents();
} catch (NoSuchKeyException e) {
LOG.error("Could not get directory entry for {}", uri);
return false;
} catch (ExecutionException | InterruptedException e) {
} catch (TimeoutException | ExecutionException | InterruptedException e) {
throw new IOException(e);
}
}
Expand Down Expand Up @@ -544,7 +558,7 @@ public boolean touch(URI uri) throws IOException {
.metadataDirective(MetadataDirective.REPLACE)
.build();

s3AsyncClient.copyObject(request).get();
s3AsyncClient.copyObject(request).get(15, TimeUnit.SECONDS);
long newUpdateTime = getS3ObjectMetadata(uri).lastModified().toEpochMilli();
return newUpdateTime > s3ObjectMetadata.lastModified().toEpochMilli();
} catch (NoSuchKeyException e) {
Expand All @@ -559,7 +573,7 @@ public boolean touch(URI uri) throws IOException {
throw new IOException(ex);
}
return true;
} catch (S3Exception | ExecutionException | InterruptedException e) {
} catch (TimeoutException | S3Exception | ExecutionException | InterruptedException e) {
throw new IOException(e);
}
}
Expand Down

0 comments on commit 7497778

Please sign in to comment.