Skip to content

Commit

Permalink
Add better error handling for s3 transfer manager (#1103)
Browse files Browse the repository at this point in the history
* Add better error handling for s3 transfer manager

* Switch list to threadsafe variant

---------

Co-authored-by: Bryan Burkholder <[email protected]>
  • Loading branch information
bryanlb and bryanlb authored Oct 2, 2024
1 parent aaab576 commit bf9538c
Showing 1 changed file with 40 additions and 11 deletions.
51 changes: 40 additions & 11 deletions astra/src/main/java/com/slack/astra/blobfs/BlobStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@

import com.slack.astra.chunk.ReadWriteChunk;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.core.internal.async.ByteArrayAsyncResponseTransformer;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.model.Delete;
Expand All @@ -18,6 +20,7 @@
import software.amazon.awssdk.services.s3.model.ObjectIdentifier;
import software.amazon.awssdk.services.s3.paginators.ListObjectsV2Publisher;
import software.amazon.awssdk.transfer.s3.S3TransferManager;
import software.amazon.awssdk.transfer.s3.model.CompletedDirectoryDownload;
import software.amazon.awssdk.transfer.s3.model.CompletedDirectoryUpload;
import software.amazon.awssdk.transfer.s3.model.DownloadDirectoryRequest;
import software.amazon.awssdk.transfer.s3.model.DownloadRequest;
Expand All @@ -29,6 +32,8 @@
* stored to a consistent location of "{prefix}/{filename}".
*/
public class BlobStore {
private static final Logger LOG = LoggerFactory.getLogger(BlobStore.class);

protected final String bucketName;
protected final S3AsyncClient s3AsyncClient;
private final S3TransferManager transferManager;
Expand Down Expand Up @@ -64,6 +69,14 @@ public void upload(String prefix, Path directoryToUpload) {
.completionFuture()
.get();
if (!upload.failedTransfers().isEmpty()) {
// Log each failed transfer with its exception
upload
.failedTransfers()
.forEach(
failedFileUpload ->
LOG.error(
"Error attempting to upload file to S3", failedFileUpload.exception()));

throw new IllegalStateException(
String.format(
"Some files failed to upload - attempted to upload %s files, failed %s.",
Expand All @@ -87,15 +100,31 @@ public void download(String prefix, Path destinationDirectory) {
assert !destinationDirectory.toFile().exists() || destinationDirectory.toFile().isDirectory();

try {
transferManager
.downloadDirectory(
DownloadDirectoryRequest.builder()
.bucket(bucketName)
.destination(destinationDirectory)
.listObjectsV2RequestTransformer(l -> l.prefix(prefix))
.build())
.completionFuture()
.get();
CompletedDirectoryDownload download =
transferManager
.downloadDirectory(
DownloadDirectoryRequest.builder()
.bucket(bucketName)
.destination(destinationDirectory)
.listObjectsV2RequestTransformer(l -> l.prefix(prefix))
.build())
.completionFuture()
.get();

if (!download.failedTransfers().isEmpty()) {
// Log each failed transfer with its exception
download
.failedTransfers()
.forEach(
failedFileUpload ->
LOG.error(
"Error attempting to download file from S3", failedFileUpload.exception()));

throw new IllegalStateException(
String.format(
"Some files failed to download - failed to download %s files.",
download.failedTransfers().size()));
}
} catch (ExecutionException | InterruptedException e) {
throw new RuntimeException(e);
}
Expand Down Expand Up @@ -137,7 +166,7 @@ public List<String> listFiles(String prefix) {
ListObjectsV2Publisher asyncPaginatedListResponse =
s3AsyncClient.listObjectsV2Paginator(listRequest);

List<String> filesList = new ArrayList<>();
List<String> filesList = new CopyOnWriteArrayList<>();
try {
asyncPaginatedListResponse
.subscribe(
Expand Down

0 comments on commit bf9538c

Please sign in to comment.