Commit f54c61f

POC of S3 streaming

bryanlb committed Sep 18, 2024
1 parent 2209408 commit f54c61f
Showing 26 changed files with 1,224 additions and 116 deletions.
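Before the per-file diffs, a minimal usage sketch of the streaming path this POC introduces. This is illustrative only and not part of the commit: the bucket name, chunk ID, and segment file name are hypothetical placeholders, and in practice the client would come from S3AsyncUtil.initS3Client.

// Illustrative sketch only (not part of this commit): open the new S3IndexInput
// against a BlobStore and read from it. Bucket, chunk ID, and file name are
// hypothetical placeholders.
import com.slack.astra.blobfs.BlobStore;
import com.slack.astra.blobfs.S3IndexInput;
import org.apache.lucene.store.IndexInput;
import software.amazon.awssdk.services.s3.S3AsyncClient;

public class S3StreamingSketch {
  public static void main(String[] args) throws Exception {
    S3AsyncClient s3Client = S3AsyncClient.crtBuilder().build(); // or S3AsyncUtil.initS3Client(config)
    BlobStore blobStore = new BlobStore(s3Client, "astra-example-bucket");

    try (IndexInput input =
        new S3IndexInput(blobStore, "example-resource", "chunk-1234", "_0.cfs")) {
      long totalBytes = input.length();  // resolved lazily via a HeadObject call
      byte firstByte = input.readByte(); // pages in the first 1 MB range from S3
      System.out.printf("file is %d bytes, first byte is %d%n", totalBytes, firstByte);
    }
  }
}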
36 changes: 36 additions & 0 deletions astra/src/main/java/com/slack/astra/blobfs/BlobStore.java
@@ -2,21 +2,25 @@

import static software.amazon.awssdk.services.s3.model.ListObjectsV2Request.builder;

import com.slack.astra.chunk.ReadWriteChunk;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import software.amazon.awssdk.core.internal.async.ByteArrayAsyncResponseTransformer;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.model.Delete;
import software.amazon.awssdk.services.s3.model.DeleteObjectsRequest;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
import software.amazon.awssdk.services.s3.model.ObjectIdentifier;
import software.amazon.awssdk.services.s3.paginators.ListObjectsV2Publisher;
import software.amazon.awssdk.transfer.s3.S3TransferManager;
import software.amazon.awssdk.transfer.s3.model.CompletedDirectoryUpload;
import software.amazon.awssdk.transfer.s3.model.DownloadDirectoryRequest;
import software.amazon.awssdk.transfer.s3.model.DownloadRequest;
import software.amazon.awssdk.transfer.s3.model.UploadDirectoryRequest;

/**
@@ -29,6 +33,8 @@ public class BlobStore {
private final S3AsyncClient s3AsyncClient;
private final S3TransferManager transferManager;

// todo - shared cache here?
// what about chunks across multiple files?
public BlobStore(S3AsyncClient s3AsyncClient, String bucketName) {
this.bucketName = bucketName;
this.s3AsyncClient = s3AsyncClient;
@@ -71,6 +77,15 @@ public void upload(String prefix, Path directoryToUpload) {
}
}

// todo - DO NOT DO THIS?
public S3AsyncClient getS3AsyncClient() {
return s3AsyncClient;
}

public String getBucketName() {
return bucketName;
}

/**
* Downloads a directory from the object store by prefix
*
@@ -99,6 +114,27 @@ public void download(String prefix, Path destinationDirectory) {
}
}

public byte[] getSchema(String chunkId) {
try {
return transferManager
.download(
DownloadRequest.builder()
.getObjectRequest(
GetObjectRequest.builder()
.bucket(bucketName)
.key(String.format("%s/%s", chunkId, ReadWriteChunk.SCHEMA_FILE_NAME))
.build())
.responseTransformer(new ByteArrayAsyncResponseTransformer<>())
.build())
.completionFuture()
.get()
.result()
.asByteArray();
} catch (ExecutionException | InterruptedException e) {
throw new RuntimeException(e);
}
}

/**
* Lists all files found on object store by the complete object key (including prefix). This would
* include what is generally considered the directory (i.e. foo/bar.example)
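For orientation, a hedged example of calling the new getSchema helper; the chunk ID and bucket name are placeholders, and error handling is elided.

// Hypothetical caller (not part of this commit): fetch the serialized schema
// bytes for a chunk. getSchema blocks on the transfer manager's completion
// future and returns the object <chunkId>/SCHEMA_FILE_NAME as a byte array.
BlobStore blobStore = new BlobStore(s3AsyncClient, "astra-example-bucket"); // s3AsyncClient assumed from context
byte[] schemaBytes = blobStore.getSchema("chunk-1234");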
4 changes: 3 additions & 1 deletion astra/src/main/java/com/slack/astra/blobfs/S3AsyncUtil.java
@@ -54,7 +54,9 @@ public static S3AsyncClient initS3Client(AstraConfigs.S3Config config) {
maxNativeMemoryLimitBytes);
S3CrtAsyncClientBuilder s3AsyncClient =
S3AsyncClient.crtBuilder()
.retryConfiguration(S3CrtRetryConfiguration.builder().numRetries(3).build())
.retryConfiguration(S3CrtRetryConfiguration.builder()
.numRetries(10)
.build())
.targetThroughputInGbps(config.getS3TargetThroughputGbps())
.region(Region.of(region))
.maxNativeMemoryLimitInBytes(maxNativeMemoryLimitBytes)
222 changes: 222 additions & 0 deletions astra/src/main/java/com/slack/astra/blobfs/S3IndexInput.java
@@ -0,0 +1,222 @@
package com.slack.astra.blobfs;

import static com.slack.astra.util.SizeConstant.MB;

import com.google.common.base.Stopwatch;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.IndexInput;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
import software.amazon.awssdk.services.s3.model.HeadObjectRequest;

public class S3IndexInput extends IndexInput {
private static final Logger LOG = LoggerFactory.getLogger(S3IndexInput.class);
private final BlobStore blobStore;
private final S3AsyncClient s3AsyncClient;

private final String chunkId;
private final String objectName;

private final long pageSize = 1 * MB;

private final Map<Long, byte[]> cachedData = new HashMap<>();

private long relativePos = 0;
private Long size;
private Long fileOffset = 0L;
private Long maxLength = null;

public S3IndexInput(
String resourceDescription,
BlobStore blobStore,
S3AsyncClient s3AsyncClient,
String chunkId,
String objectName,
Long fileOffset,
Map<Long, byte[]> cachedData,
Long length) {

super(resourceDescription);
this.blobStore = blobStore;
this.s3AsyncClient = s3AsyncClient;
this.chunkId = chunkId;
this.objectName = objectName;

this.relativePos = 0;
this.fileOffset = fileOffset;

this.cachedData.putAll(cachedData);
this.maxLength = length;
}

public S3IndexInput(
BlobStore blobStore, String resourceDescription, String chunkId, String objectName) {
super(resourceDescription);
this.blobStore = blobStore;
this.s3AsyncClient = blobStore.getS3AsyncClient();
this.chunkId = chunkId;
this.objectName = objectName;
}

private byte[] getData(long key) throws ExecutionException, InterruptedException {
if (cachedData.containsKey(key)) {
return cachedData.get(key);
} else {
cachedData.clear();

long readFrom = key * pageSize;

// todo - does using size help at all?
long readTo = Math.min((key + 1) * pageSize, Long.MAX_VALUE);

LOG.debug(
"Attempting to download {}, currentPos {}, key {}, bucketName {}, from {} to {} for chunk {}",
objectName,
relativePos,
key,
blobStore.getBucketName(),
readFrom,
readTo,
chunkId);

Stopwatch timeDownload = Stopwatch.createStarted();
byte[] response =
s3AsyncClient
.getObject(
GetObjectRequest.builder()
.bucket(blobStore.getBucketName())
.key(String.format("%s/%s", chunkId, objectName))
.range(String.format("bytes=%s-%s", readFrom, readTo))
.build(),
AsyncResponseTransformer.toBytes())
.get()
.asByteArray();

LOG.debug(
"Downloaded {} - byte length {} in {} ms for chunk {}",
objectName,
response.length,
timeDownload.elapsed(TimeUnit.MILLISECONDS),
chunkId);
cachedData.put(key, response);
return response;
}
}

@Override
public void close() throws IOException {
// nothing to close/cleanup
}

@Override
public long getFilePointer() {
return relativePos;
}

@Override
public void seek(long pos) {
relativePos = pos;
}

@Override
public long length() {
if (maxLength != null) {
return maxLength;
}

if (size == null) {
try {
size =
s3AsyncClient
.headObject(
HeadObjectRequest.builder()
.bucket(blobStore.getBucketName())
.key(String.format("%s/%s", chunkId, objectName))
.build())
.get()
.contentLength();
} catch (InterruptedException | ExecutionException e) {
LOG.error("Error reading length", e);
throw new RuntimeException(e);
}
}
return size;
}

/**
* Creates a slice of this index input, with the given description, offset, and length. The slice
* is sought to the beginning.
*/
@Override
public IndexInput slice(String sliceDescription, long offset, long length) throws IOException {
LOG.debug(
"Slicing {} for chunk ID {}, offset {} length {}", objectName, chunkId, offset, length);
return new S3IndexInput(
sliceDescription,
blobStore,
s3AsyncClient,
chunkId,
objectName,
offset,
cachedData,
length);
}

@Override
public byte readByte() {
try {
long getCacheKey = Math.floorDiv(relativePos + fileOffset, pageSize);
int byteArrayPos = Math.toIntExact(relativePos + fileOffset - (getCacheKey * pageSize));
relativePos++;
return getData(getCacheKey)[byteArrayPos];
} catch (ExecutionException | InterruptedException e) {
LOG.error("Error reading byte", e);
throw new RuntimeException(e);
}
}

@Override
public void readBytes(byte[] b, int offset, int len) throws IOException {
long getCacheKey = Math.floorDiv(relativePos + fileOffset, pageSize);
if (relativePos + fileOffset + len > ((getCacheKey + 1) * pageSize)) {
LOG.debug(
"Will need to page in content for {}, currentPos {}, len {}, currentCacheKey {}, chunkId {}",
objectName,
relativePos + fileOffset,
len,
getCacheKey,
chunkId);
}

for (int i = 0; i < len; i++) {
b[offset + i] = readByte();
}
}

/**
* {@inheritDoc}
*
* <p><b>Warning:</b> Lucene never closes cloned {@code IndexInput}s, it will only call {@link
* #close()} on the original object.
*
* <p>If you access the cloned IndexInput after closing the original object, any <code>readXXX
* </code> methods will throw {@link AlreadyClosedException}.
*
* <p>This method is NOT thread safe, so if the current {@code IndexInput} is being used by one
* thread while {@code clone} is called by another, disaster could strike.
*/
@Override
public IndexInput clone() {
// todo - instead of an entirely new object consider reworking this?
LOG.debug("Cloning object - chunkId {}, objectName {}", chunkId, objectName);
return super.clone();
}
}
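To make the paging arithmetic in getData/readByte concrete, here is a worked example assuming MB is 1024 * 1024 bytes, so pageSize is 1,048,576.

// Worked example of the readByte() paging math, assuming a 1 MB page size.
long pageSize = 1_048_576L;                   // 1 * MB in S3IndexInput
long absolutePos = 3_670_016L;                // relativePos + fileOffset == 3.5 MB
long cacheKey = Math.floorDiv(absolutePos, pageSize);                    // 3
int byteArrayPos = Math.toIntExact(absolutePos - (cacheKey * pageSize)); // 524288
// getData(3) then issues GetObject with Range: bytes=3145728-4194304, caches that
// page under key 3, and later reads inside the same 1 MB window hit the cache.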