Skip to content

Commit

Permalink
Add AWS S3 Transfer Manager (#680)
Browse files Browse the repository at this point in the history
* Add experimental transfer manager code

* Add Transfer Manager to S3CrtBlobFs

Adds transfer manager for upload/download methods, adds tests, fixes an edge case with getS3ObjectMetadata

* Disable flaky test

* Improve race condition in flaky test

* PR feedback

---------

Co-authored-by: Bryan Burkholder <[email protected]>
  • Loading branch information
bryanlb and bryanlb authored Sep 21, 2023
1 parent a6bc9c5 commit c4787d5
Show file tree
Hide file tree
Showing 9 changed files with 505 additions and 24 deletions.
5 changes: 5 additions & 0 deletions kaldb/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,11 @@
<artifactId>aws-crt</artifactId>
<version>0.22.1</version>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>s3-transfer-manager</artifactId>
<version>${aws.sdk.version}</version>
</dependency>

<dependency>
<groupId>org.apache.httpcomponents</groupId>
Expand Down
101 changes: 93 additions & 8 deletions kaldb/src/main/java/com/slack/kaldb/blobfs/s3/S3CrtBlobFs.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,17 +51,35 @@
import software.amazon.awssdk.services.s3.model.PutObjectResponse;
import software.amazon.awssdk.services.s3.model.S3Exception;
import software.amazon.awssdk.services.s3.model.S3Object;

import software.amazon.awssdk.transfer.s3.S3TransferManager;
import software.amazon.awssdk.transfer.s3.model.CompletedDirectoryDownload;
import software.amazon.awssdk.transfer.s3.model.CompletedDirectoryUpload;
import software.amazon.awssdk.transfer.s3.model.DownloadDirectoryRequest;
import software.amazon.awssdk.transfer.s3.model.DownloadFileRequest;
import software.amazon.awssdk.transfer.s3.model.UploadDirectoryRequest;
import software.amazon.awssdk.transfer.s3.model.UploadFileRequest;

/**
* This class is a duplicate of the original S3BlobFs, but modified to support the new S3 CRT client
* and S3 transfer manager. As part of this all internal api calls to S3 were moved to async, as
* this is the only client type supported by the new CRT code.
*
* <p>Todo - this class would hugely benefit from a clean-sheet rewrite, as many of the original
* assumptions it was based on no longer apply. Additionally, several retrofits have been made to
* support new API approaches, which have left it overly complex.
*/
public class S3CrtBlobFs extends BlobFs {
public static final String S3_SCHEME = "s3://";
private static final Logger LOG = LoggerFactory.getLogger(S3CrtBlobFs.class);
private static final String DELIMITER = "/";
private static final int LIST_MAX_KEYS = 2500;

private final S3AsyncClient s3AsyncClient;
private final S3TransferManager transferManager;

/**
 * Creates a blob filesystem that issues all S3 calls through the given async client.
 *
 * <p>A transfer manager is built on top of the same client and is used by the upload/download
 * methods of this class.
 *
 * @param s3AsyncClient async S3 client shared by direct API calls and the transfer manager
 */
public S3CrtBlobFs(S3AsyncClient s3AsyncClient) {
  // Configure the transfer manager first so both fields are assigned from the same client.
  S3TransferManager.Builder managerBuilder = S3TransferManager.builder();
  managerBuilder.s3Client(s3AsyncClient);

  this.s3AsyncClient = s3AsyncClient;
  this.transferManager = managerBuilder.build();
}

static boolean isNullOrEmpty(String target) {
Expand Down Expand Up @@ -156,7 +174,11 @@ private HeadObjectResponse getS3ObjectMetadata(URI uri) throws IOException {
try {
return s3AsyncClient.headObject(headObjectRequest).get();
} catch (InterruptedException | ExecutionException e) {
throw new IOException(e);
if (e instanceof ExecutionException && e.getCause() instanceof NoSuchKeyException) {
throw NoSuchKeyException.builder().cause(e.getCause()).build();
} else {
throw new IOException(e);
}
}
}

Expand Down Expand Up @@ -491,21 +513,84 @@ public void copyToLocalFile(URI srcUri, File dstFile) throws Exception {
URI base = getBase(srcUri);
FileUtils.forceMkdir(dstFile.getParentFile());
String prefix = sanitizePath(base.relativize(srcUri).getPath());
GetObjectRequest getObjectRequest =
GetObjectRequest.builder().bucket(srcUri.getHost()).key(prefix).build();

s3AsyncClient.getObject(getObjectRequest, AsyncResponseTransformer.toFile(dstFile)).get();
if (isDirectory(srcUri)) {
CompletedDirectoryDownload completedDirectoryDownload =
transferManager
.downloadDirectory(
DownloadDirectoryRequest.builder()
.destination(dstFile.toPath())
.bucket(srcUri.getHost())
.listObjectsV2RequestTransformer(
builder -> {
builder.maxKeys(LIST_MAX_KEYS);
builder.prefix(prefix);
})
.build())
.completionFuture()
.get();
if (!completedDirectoryDownload.failedTransfers().isEmpty()) {
completedDirectoryDownload
.failedTransfers()
.forEach(
failedFileDownload -> LOG.warn("Failed to download file '{}'", failedFileDownload));
throw new IllegalStateException(
String.format(
"Was unable to download all files - failed %s",
completedDirectoryDownload.failedTransfers().size()));
}
} else {
GetObjectRequest getObjectRequest =
GetObjectRequest.builder().bucket(srcUri.getHost()).key(prefix).build();
transferManager
.downloadFile(
DownloadFileRequest.builder()
.getObjectRequest(getObjectRequest)
.destination(dstFile)
.build())
.completionFuture()
.get();
}
}

/**
 * Uploads a local file or directory to S3 through the transfer manager.
 *
 * <p>The bucket is taken from the host of {@code dstUri} and the object key (or key prefix, for
 * a directory) from its path. When {@code srcFile} is a directory, its contents are uploaded
 * under that key prefix; otherwise the single file is uploaded to that key.
 *
 * @param srcFile local file or directory to upload
 * @param dstUri destination S3 URI
 * @throws IllegalStateException if one or more files of a directory upload failed
 * @throws Exception if waiting on the transfer is interrupted or the transfer itself fails
 */
@Override
public void copyFromLocalFile(File srcFile, URI dstUri) throws Exception {
  LOG.debug("Copy {} from local to {}", srcFile.getAbsolutePath(), dstUri);
  URI base = getBase(dstUri);
  String prefix = sanitizePath(base.relativize(dstUri).getPath());

  if (srcFile.isDirectory()) {
    CompletedDirectoryUpload completedDirectoryUpload =
        transferManager
            .uploadDirectory(
                UploadDirectoryRequest.builder()
                    .source(srcFile.toPath())
                    // Without the prefix the files would be written to the bucket root,
                    // ignoring the path component of dstUri.
                    .s3Prefix(prefix)
                    .bucket(dstUri.getHost())
                    .build())
            .completionFuture()
            .get();

    // A directory upload completes normally even when individual files fail, so the
    // failures have to be inspected explicitly.
    if (!completedDirectoryUpload.failedTransfers().isEmpty()) {
      completedDirectoryUpload
          .failedTransfers()
          .forEach(failedFileUpload -> LOG.warn("Failed to upload file '{}'", failedFileUpload));
      throw new IllegalStateException(
          String.format(
              "Was unable to upload all files - failed %s",
              completedDirectoryUpload.failedTransfers().size()));
    }
  } else {
    PutObjectRequest putObjectRequest =
        PutObjectRequest.builder().bucket(dstUri.getHost()).key(prefix).build();
    transferManager
        .uploadFile(
            UploadFileRequest.builder()
                .putObjectRequest(putObjectRequest)
                .source(srcFile)
                .build())
        .completionFuture()
        .get();
  }
}

@Override
Expand Down
19 changes: 8 additions & 11 deletions kaldb/src/main/java/com/slack/kaldb/logstore/BlobFsUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import java.net.URI;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Collection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -44,18 +45,14 @@ public static URI createURI(String bucket, String prefix, String fileName) {
// TODO: Take a complete URI as this is the format stored in snapshot data
/**
 * Downloads everything under {@code prefix} in {@code bucket} into {@code localDirPath} with a
 * single directory copy and returns the paths of the entries now present in that directory.
 *
 * @param bucket source S3 bucket
 * @param prefix key prefix identifying the directory to copy
 * @param s3BlobFs blob filesystem used to perform the copy
 * @param localDirPath local destination directory
 * @return paths of the entries found directly inside {@code localDirPath} after the copy; empty
 *     if the directory could not be listed
 * @throws Exception if the underlying copy fails
 */
public static String[] copyFromS3(
    String bucket, String prefix, BlobFs s3BlobFs, Path localDirPath) throws Exception {
  LOG.info("Copying files from bucket={} prefix={} using directory", bucket, prefix);
  URI directoryToCopy = createURI(bucket, prefix, "");
  s3BlobFs.copyToLocalFile(directoryToCopy, localDirPath.toFile());
  LOG.info("Copying S3 files complete");

  // File.listFiles() returns null (not an empty array) when the path is not a directory or an
  // I/O error occurs; report that as "no files" instead of failing with a NullPointerException.
  File[] copiedFiles = localDirPath.toFile().listFiles();
  if (copiedFiles == null) {
    LOG.warn("Unable to list local directory {} after copy", localDirPath);
    return new String[0];
  }
  return Arrays.stream(copiedFiles).map(File::toString).toArray(String[]::new);
}

public static void copyToLocalPath(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import software.amazon.awssdk.services.s3.model.ListObjectsV2Response;
import software.amazon.awssdk.services.s3.model.S3Object;

@Deprecated
public class S3BlobFsTest {
@RegisterExtension
public static final S3MockExtension S3_MOCK_EXTENSION =
Expand Down
Loading

0 comments on commit c4787d5

Please sign in to comment.