Skip to content

Commit

Permalink
Remove remaining blobfs implementations
Browse files Browse the repository at this point in the history
  • Loading branch information
bryanlb committed Aug 13, 2024
1 parent a542cc3 commit a50d63e
Show file tree
Hide file tree
Showing 27 changed files with 252 additions and 1,004 deletions.
90 changes: 0 additions & 90 deletions astra/src/main/java/com/slack/astra/blobfs/BlobFs.java

This file was deleted.

39 changes: 0 additions & 39 deletions astra/src/main/java/com/slack/astra/blobfs/BlobFsUtils.java

This file was deleted.

23 changes: 14 additions & 9 deletions astra/src/main/java/com/slack/astra/blobfs/ChunkStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import software.amazon.awssdk.services.s3.model.ObjectIdentifier;
import software.amazon.awssdk.services.s3.paginators.ListObjectsV2Publisher;
import software.amazon.awssdk.transfer.s3.S3TransferManager;
import software.amazon.awssdk.transfer.s3.model.CompletedDirectoryUpload;
import software.amazon.awssdk.transfer.s3.model.DownloadDirectoryRequest;
import software.amazon.awssdk.transfer.s3.model.UploadDirectoryRequest;

Expand All @@ -31,15 +32,19 @@ public ChunkStore(S3AsyncClient s3AsyncClient, String bucketName) {

public void upload(String chunkId, Path directoryToUpload) {
try {
transferManager
.uploadDirectory(
UploadDirectoryRequest.builder()
.source(directoryToUpload)
.s3Prefix(chunkId)
.bucket(bucketName)
.build())
.completionFuture()
.get();
CompletedDirectoryUpload upload =
transferManager
.uploadDirectory(
UploadDirectoryRequest.builder()
.source(directoryToUpload)
.s3Prefix(chunkId)
.bucket(bucketName)
.build())
.completionFuture()
.get();
if (!upload.failedTransfers().isEmpty()) {
throw new IllegalStateException("Some or all files failed to upload");
}
} catch (ExecutionException | InterruptedException e) {
throw new RuntimeException(e);
}
Expand Down
92 changes: 92 additions & 0 deletions astra/src/main/java/com/slack/astra/blobfs/S3AsyncUtil.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package com.slack.astra.blobfs;

import com.google.common.base.Preconditions;
import com.slack.astra.proto.config.AstraConfigs;
import java.net.URI;
import java.net.URISyntaxException;
import java.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.S3CrtAsyncClientBuilder;
import software.amazon.awssdk.services.s3.crt.S3CrtConnectionHealthConfiguration;
import software.amazon.awssdk.services.s3.crt.S3CrtHttpConfiguration;
import software.amazon.awssdk.services.s3.crt.S3CrtProxyConfiguration;
import software.amazon.awssdk.services.s3.crt.S3CrtRetryConfiguration;
import software.amazon.awssdk.services.s3.model.S3Exception;

public class S3AsyncUtil {
private static final Logger LOG = LoggerFactory.getLogger(S3AsyncUtil.class);

public static S3AsyncClient initS3Client(AstraConfigs.S3Config config) {
Preconditions.checkArgument(!isNullOrEmpty(config.getS3Region()));
String region = config.getS3Region();

AwsCredentialsProvider awsCredentialsProvider;
try {

if (!isNullOrEmpty(config.getS3AccessKey()) && !isNullOrEmpty(config.getS3SecretKey())) {
String accessKey = config.getS3AccessKey();
String secretKey = config.getS3SecretKey();
AwsBasicCredentials awsBasicCredentials = AwsBasicCredentials.create(accessKey, secretKey);
awsCredentialsProvider = StaticCredentialsProvider.create(awsBasicCredentials);
} else {
awsCredentialsProvider = DefaultCredentialsProvider.create();
}

// default to 5% of the heap size for the max crt off-heap or 1GiB (min for client)
long jvmMaxHeapSizeBytes = Runtime.getRuntime().maxMemory();
long defaultCrtMemoryLimit = Math.max(Math.round(jvmMaxHeapSizeBytes * 0.05), 1073741824);
long maxNativeMemoryLimitBytes =
Long.parseLong(
System.getProperty(
"astra.s3CrtBlobFs.maxNativeMemoryLimitBytes",
String.valueOf(defaultCrtMemoryLimit)));
LOG.info(
"Using a maxNativeMemoryLimitInBytes for the S3AsyncClient of '{}' bytes",
maxNativeMemoryLimitBytes);
S3CrtAsyncClientBuilder s3AsyncClient =
S3AsyncClient.crtBuilder()
.retryConfiguration(S3CrtRetryConfiguration.builder().numRetries(3).build())
.targetThroughputInGbps(config.getS3TargetThroughputGbps())
.region(Region.of(region))
.maxNativeMemoryLimitInBytes(maxNativeMemoryLimitBytes)
.credentialsProvider(awsCredentialsProvider);

// We add a healthcheck to prevent an error with the CRT client, where it will
// continue to attempt to read data from a socket that is no longer returning data
S3CrtHttpConfiguration.Builder httpConfigurationBuilder =
S3CrtHttpConfiguration.builder()
.proxyConfiguration(
S3CrtProxyConfiguration.builder().useEnvironmentVariableValues(false).build())
.connectionTimeout(Duration.ofSeconds(5))
.connectionHealthConfiguration(
S3CrtConnectionHealthConfiguration.builder()
.minimumThroughputTimeout(Duration.ofSeconds(3))
.minimumThroughputInBps(32000L)
.build());
s3AsyncClient.httpConfiguration(httpConfigurationBuilder.build());

if (!isNullOrEmpty(config.getS3EndPoint())) {
String endpoint = config.getS3EndPoint();
try {
s3AsyncClient.endpointOverride(new URI(endpoint));
} catch (URISyntaxException e) {
throw new RuntimeException(e);
}
}
return s3AsyncClient.build();
} catch (S3Exception e) {
throw new RuntimeException("Could not initialize S3blobFs", e);
}
}

static boolean isNullOrEmpty(String target) {
return target == null || "".equals(target);
}
}
Loading

0 comments on commit a50d63e

Please sign in to comment.