
Commit 5a396a8: Initial chunk store implementation

bryanlb committed Aug 12, 2024
1 parent 0cf0a22 commit 5a396a8
Showing 9 changed files with 251 additions and 35 deletions.
100 changes: 100 additions & 0 deletions astra/src/main/java/com/slack/astra/blobfs/ChunkStore.java
@@ -0,0 +1,100 @@
package com.slack.astra.blobfs;

import static software.amazon.awssdk.services.s3.model.ListObjectsV2Request.builder;

import java.nio.file.Path;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.model.Delete;
import software.amazon.awssdk.services.s3.model.DeleteObjectsRequest;
import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
import software.amazon.awssdk.services.s3.model.ObjectIdentifier;
import software.amazon.awssdk.services.s3.paginators.ListObjectsV2Publisher;
import software.amazon.awssdk.transfer.s3.S3TransferManager;
import software.amazon.awssdk.transfer.s3.model.DownloadDirectoryRequest;
import software.amazon.awssdk.transfer.s3.model.UploadDirectoryRequest;

public class ChunkStore {

  private final String bucketName;
  private final S3AsyncClient s3AsyncClient;
  private final S3TransferManager transferManager;

  public ChunkStore(S3AsyncClient s3AsyncClient, String bucketName) {
    this.bucketName = bucketName;
    this.s3AsyncClient = s3AsyncClient;
    this.transferManager = S3TransferManager.builder().s3Client(s3AsyncClient).build();
  }

  public void upload(String chunkId, Path directoryToUpload) {
    try {
      transferManager
          .uploadDirectory(
              UploadDirectoryRequest.builder()
                  .source(directoryToUpload)
                  .s3Prefix(chunkId)
                  .bucket(bucketName)
                  .build())
          .completionFuture()
          .get();
    } catch (ExecutionException | InterruptedException e) {
      throw new RuntimeException(e);
    }
  }

  // todo - make destination optional
  public Path download(String chunkId, Path destination) {
    try {
      transferManager
          .downloadDirectory(
              DownloadDirectoryRequest.builder()
                  .bucket(bucketName)
                  .destination(destination)
                  .listObjectsV2RequestTransformer(l -> l.prefix(chunkId))
                  .build())
          .completionFuture()
          .get();
      return destination;
    } catch (ExecutionException | InterruptedException e) {
      throw new RuntimeException(e);
    }
  }

  public boolean delete(String chunkId) {
    ListObjectsV2Request listRequest = builder().bucket(bucketName).prefix(chunkId).build();
    ListObjectsV2Publisher asyncPaginatedListResponse =
        s3AsyncClient.listObjectsV2Paginator(listRequest);

    AtomicBoolean deleted = new AtomicBoolean(false);
    try {
      asyncPaginatedListResponse
          .subscribe(
              listResponse -> {
                List<ObjectIdentifier> objects =
                    listResponse.contents().stream()
                        .map(s3Object -> ObjectIdentifier.builder().key(s3Object.key()).build())
                        .toList();
                if (objects.isEmpty()) {
                  return;
                }
                DeleteObjectsRequest deleteRequest =
                    DeleteObjectsRequest.builder()
                        .bucket(bucketName)
                        .delete(Delete.builder().objects(objects).build())
                        .build();
                try {
                  s3AsyncClient.deleteObjects(deleteRequest).get();
                  deleted.set(true);
                } catch (InterruptedException | ExecutionException e) {
                  throw new RuntimeException(e);
                }
              })
          .get();
      return deleted.get();
    } catch (InterruptedException | ExecutionException e) {
      throw new RuntimeException(e);
    }
  }
}
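
A minimal usage sketch of the new ChunkStore (not part of this commit; the bucket name, chunk id, and local paths are hypothetical, and the java.nio.file.Path and S3AsyncClient imports are assumed):

    S3AsyncClient s3AsyncClient = S3AsyncClient.create();
    ChunkStore chunkStore = new ChunkStore(s3AsyncClient, "astra-chunk-data"); // hypothetical bucket

    Path chunkDir = Path.of("/tmp/chunk-to-upload"); // hypothetical directory of chunk files
    chunkStore.upload("snapshot-abc123", chunkDir); // blocks until all files are uploaded under the "snapshot-abc123" prefix
    Path downloaded = chunkStore.download("snapshot-abc123", Path.of("/tmp/chunk-download"));
    boolean anyDeleted = chunkStore.delete("snapshot-abc123"); // true only if at least one object was deleted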
36 changes: 17 additions & 19 deletions astra/src/main/java/com/slack/astra/chunk/ReadOnlyChunkImpl.java
@@ -4,7 +4,7 @@
import static com.slack.astra.server.AstraConfig.DEFAULT_ZK_TIMEOUT_SECS;

import com.google.common.annotations.VisibleForTesting;
import com.slack.astra.blobfs.BlobFs;
import com.slack.astra.blobfs.ChunkStore;
import com.slack.astra.logstore.search.LogIndexSearcher;
import com.slack.astra.logstore.search.LogIndexSearcherImpl;
import com.slack.astra.logstore.search.SearchQuery;
@@ -64,7 +64,6 @@ public class ReadOnlyChunkImpl<T> implements Chunk<T> {
private Metadata.CacheNodeAssignment.CacheNodeAssignmentState lastKnownAssignmentState;

private final String dataDirectoryPrefix;
private final String s3Bucket;
protected final SearchContext searchContext;
protected final String slotId;
private final CacheSlotMetadataStore cacheSlotMetadataStore;
@@ -73,7 +72,7 @@ public class ReadOnlyChunkImpl<T> implements Chunk<T> {
private final SearchMetadataStore searchMetadataStore;
private CacheNodeAssignmentStore cacheNodeAssignmentStore;
private final MeterRegistry meterRegistry;
private final BlobFs blobFs;
private final ChunkStore chunkStore;

public static final String CHUNK_ASSIGNMENT_TIMER = "chunk_assignment_timer";
public static final String CHUNK_EVICTION_TIMER = "chunk_eviction_timer";
@@ -91,7 +90,7 @@ public class ReadOnlyChunkImpl<T> implements Chunk<T> {
public ReadOnlyChunkImpl(
AsyncCuratorFramework curatorFramework,
MeterRegistry meterRegistry,
BlobFs blobFs,
ChunkStore chunkStore,
SearchContext searchContext,
String s3Bucket,
String dataDirectoryPrefix,
@@ -107,7 +106,7 @@ public ReadOnlyChunkImpl(
this(
curatorFramework,
meterRegistry,
blobFs,
chunkStore,
searchContext,
s3Bucket,
dataDirectoryPrefix,
@@ -125,7 +124,7 @@ public ReadOnlyChunkImpl(
public ReadOnlyChunkImpl(
AsyncCuratorFramework curatorFramework,
MeterRegistry meterRegistry,
BlobFs blobFs,
ChunkStore chunkStore,
SearchContext searchContext,
String s3Bucket,
String dataDirectoryPrefix,
@@ -136,8 +135,7 @@ public ReadOnlyChunkImpl(
SearchMetadataStore searchMetadataStore)
throws Exception {
this.meterRegistry = meterRegistry;
this.blobFs = blobFs;
this.s3Bucket = s3Bucket;
this.chunkStore = chunkStore;
this.dataDirectoryPrefix = dataDirectoryPrefix;
this.searchContext = searchContext;
this.slotId = UUID.randomUUID().toString();
@@ -235,12 +233,12 @@ public void downloadChunkData() {
}
}
}
// init SerialS3DownloaderImpl w/ bucket, snapshotId, blob, data directory
SerialS3ChunkDownloaderImpl chunkDownloader =
new SerialS3ChunkDownloaderImpl(
s3Bucket, snapshotMetadata.snapshotId, blobFs, dataDirectory);
if (chunkDownloader.download()) {
throw new IOException("No files found on blob storage, released slot for re-assignment");

chunkStore.download(snapshotMetadata.snapshotId, dataDirectory);
try (Stream<Path> fileList = Files.list(dataDirectory)) {
if (fileList.findAny().isEmpty()) {
throw new IOException("No files found on blob storage, released slot for re-assignment");
}
}

Path schemaPath = Path.of(dataDirectory.toString(), ReadWriteChunk.SCHEMA_FILE_NAME);
@@ -379,11 +377,11 @@ private void handleChunkAssignment(CacheSlotMetadata cacheSlotMetadata) {
}

SnapshotMetadata snapshotMetadata = getSnapshotMetadata(cacheSlotMetadata.replicaId);
SerialS3ChunkDownloaderImpl chunkDownloader =
new SerialS3ChunkDownloaderImpl(
s3Bucket, snapshotMetadata.snapshotId, blobFs, dataDirectory);
if (chunkDownloader.download()) {
throw new IOException("No files found on blob storage, released slot for re-assignment");
chunkStore.download(snapshotMetadata.snapshotId, dataDirectory);
try (Stream<Path> fileList = Files.list(dataDirectory)) {
if (fileList.findAny().isEmpty()) {
throw new IOException("No files found on blob storage, released slot for re-assignment");
}
}

Path schemaPath = Path.of(dataDirectory.toString(), ReadWriteChunk.SCHEMA_FILE_NAME);
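
For reference, the download-then-verify pattern that replaces SerialS3ChunkDownloaderImpl at both call sites above, shown in isolation (a sketch only; the data directory path is hypothetical, chunkStore and snapshotMetadata are the fields used in ReadOnlyChunkImpl, and java.nio.file.Files, java.nio.file.Path, java.util.stream.Stream, and java.io.IOException imports are assumed):

    Path dataDirectory = Path.of("/tmp/astra-cache-slot"); // hypothetical slot data directory
    chunkStore.download(snapshotMetadata.snapshotId, dataDirectory); // blocks until the prefix is fully downloaded
    try (Stream<Path> fileList = Files.list(dataDirectory)) {
      if (fileList.findAny().isEmpty()) {
        // nothing exists under that prefix in blob storage, so release the slot for re-assignment
        throw new IOException("No files found on blob storage, released slot for re-assignment");
      }
    }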
2 changes: 2 additions & 0 deletions astra/src/main/java/com/slack/astra/chunk/ReadWriteChunk.java
@@ -254,7 +254,7 @@ public boolean snapshotToS3(String bucket, String prefix, BlobFs blobFs) {
}
this.fileUploadAttempts.increment(filesToUpload.size());
Timer.Sample snapshotTimer = Timer.start(meterRegistry);

final int success = copyToS3(dirPath, filesToUpload, bucket, prefix, blobFs);

snapshotTimer.stop(meterRegistry.timer(SNAPSHOT_TIMER));
this.fileUploadFailures.increment(filesToUpload.size() - success);
chunkInfo.setSnapshotPath(createURI(bucket, prefix, "").toString());
@@ -9,6 +9,7 @@
* SerialS3ChunkDownloaderImpl downloads all the chunk related data from S3 to a local store one
* file at a time.
*/
@Deprecated
public class SerialS3ChunkDownloaderImpl implements ChunkDownloader {
private final String s3Bucket;
private final String snapshotId;
@@ -2,7 +2,7 @@

import static com.slack.astra.clusterManager.CacheNodeAssignmentService.snapshotMetadataBySnapshotId;

import com.slack.astra.blobfs.BlobFs;
import com.slack.astra.blobfs.ChunkStore;
import com.slack.astra.chunk.Chunk;
import com.slack.astra.chunk.ReadOnlyChunkImpl;
import com.slack.astra.chunk.SearchContext;
@@ -39,7 +39,7 @@ public class CachingChunkManager<T> extends ChunkManagerBase<T> {

private final MeterRegistry meterRegistry;
private final AsyncCuratorFramework curatorFramework;
private final BlobFs blobFs;
private final ChunkStore chunkStore;
private final SearchContext searchContext;
private final String s3Bucket;
private final String dataDirectoryPrefix;
@@ -61,7 +61,7 @@ public class CachingChunkManager<T> extends ChunkManagerBase<T> {
public CachingChunkManager(
MeterRegistry registry,
AsyncCuratorFramework curatorFramework,
BlobFs blobFs,
ChunkStore chunkStore,
SearchContext searchContext,
String s3Bucket,
String dataDirectoryPrefix,
@@ -70,7 +70,7 @@ public CachingChunkManager(
long capacityBytes) {
this.meterRegistry = registry;
this.curatorFramework = curatorFramework;
this.blobFs = blobFs;
this.chunkStore = chunkStore;
this.searchContext = searchContext;
this.s3Bucket = s3Bucket;
this.dataDirectoryPrefix = dataDirectoryPrefix;
@@ -103,7 +103,7 @@ protected void startUp() throws Exception {
new ReadOnlyChunkImpl<>(
curatorFramework,
meterRegistry,
blobFs,
chunkStore,
searchContext,
s3Bucket,
dataDirectoryPrefix,
@@ -153,12 +153,12 @@ public static CachingChunkManager<LogMessage> fromConfig(
AsyncCuratorFramework curatorFramework,
AstraConfigs.S3Config s3Config,
AstraConfigs.CacheConfig cacheConfig,
BlobFs blobFs)
ChunkStore chunkStore)
throws Exception {
return new CachingChunkManager<>(
meterRegistry,
curatorFramework,
blobFs,
chunkStore,
SearchContext.fromConfig(cacheConfig.getServerConfig()),
s3Config.getS3Bucket(),
cacheConfig.getDataDirectory(),
@@ -211,7 +211,7 @@ private void onAssignmentHandler(CacheNodeAssignment assignment) {
new ReadOnlyChunkImpl<>(
curatorFramework,
meterRegistry,
blobFs,
chunkStore,
searchContext,
s3Bucket,
dataDirectoryPrefix,
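
How the updated fromConfig factory is expected to be called after this change, combined with the Astra.java wiring shown below (a sketch that mirrors the diff rather than adding behavior; the startAsync()/awaitRunning() call assumes the usual Guava Service lifecycle, which in Astra is normally driven by the ServiceManager):

    ChunkStore chunkStore = new ChunkStore(s3Client, astraConfig.getS3Config().getS3Bucket());
    CachingChunkManager<LogMessage> cachingChunkManager =
        CachingChunkManager.fromConfig(
            meterRegistry,
            curatorFramework,
            astraConfig.getS3Config(),
            astraConfig.getCacheConfig(),
            chunkStore);
    cachingChunkManager.startAsync().awaitRunning();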
7 changes: 5 additions & 2 deletions astra/src/main/java/com/slack/astra/server/Astra.java
@@ -4,6 +4,7 @@
import com.google.common.util.concurrent.Service;
import com.google.common.util.concurrent.ServiceManager;
import com.slack.astra.blobfs.BlobFs;
import com.slack.astra.blobfs.ChunkStore;
import com.slack.astra.blobfs.s3.S3CrtBlobFs;
import com.slack.astra.bulkIngestApi.BulkIngestApi;
import com.slack.astra.bulkIngestApi.BulkIngestKafkaProducer;
@@ -142,9 +143,10 @@ public void start() throws Exception {

// Initialize blobfs. Only S3 is supported currently.
S3CrtBlobFs s3BlobFs = new S3CrtBlobFs(s3Client);
ChunkStore chunkStore = new ChunkStore(s3Client, astraConfig.getS3Config().getS3Bucket());

Set<Service> services =
getServices(curatorFramework, astraConfig, s3BlobFs, prometheusMeterRegistry);
getServices(curatorFramework, astraConfig, s3BlobFs, chunkStore, prometheusMeterRegistry);
serviceManager = new ServiceManager(services);
serviceManager.addListener(getServiceManagerListener(), MoreExecutors.directExecutor());

@@ -155,6 +157,7 @@ private static Set<Service> getServices(
AsyncCuratorFramework curatorFramework,
AstraConfigs.AstraConfig astraConfig,
BlobFs blobFs,
ChunkStore chunkStore,
PrometheusMeterRegistry meterRegistry)
throws Exception {
Set<Service> services = new HashSet<>();
@@ -238,7 +241,7 @@ private static Set<Service> getServices(
curatorFramework,
astraConfig.getS3Config(),
astraConfig.getCacheConfig(),
blobFs);
chunkStore);
services.add(chunkManager);

HpaMetricMetadataStore hpaMetricMetadataStore =