From 664aa34745ccf3971fcce313886bec6dfbb9d62f Mon Sep 17 00:00:00 2001 From: Bryan Burkholder Date: Mon, 12 Aug 2024 14:26:46 -0700 Subject: [PATCH] Initial chunk store implementation --- .../com/slack/astra/blobfs/ChunkStore.java | 100 +++++++++++++++++ .../slack/astra/chunk/ReadOnlyChunkImpl.java | 36 +++--- .../com/slack/astra/chunk/ReadWriteChunk.java | 2 + .../chunk/SerialS3ChunkDownloaderImpl.java | 1 + .../chunkManager/CachingChunkManager.java | 16 +-- .../java/com/slack/astra/server/Astra.java | 7 +- .../slack/astra/blobfs/ChunkStoreTest.java | 106 ++++++++++++++++++ .../astra/chunk/ReadOnlyChunkImplTest.java | 13 ++- .../chunkManager/CachingChunkManagerTest.java | 5 +- 9 files changed, 251 insertions(+), 35 deletions(-) create mode 100644 astra/src/main/java/com/slack/astra/blobfs/ChunkStore.java create mode 100644 astra/src/test/java/com/slack/astra/blobfs/ChunkStoreTest.java diff --git a/astra/src/main/java/com/slack/astra/blobfs/ChunkStore.java b/astra/src/main/java/com/slack/astra/blobfs/ChunkStore.java new file mode 100644 index 0000000000..806fbfdcd7 --- /dev/null +++ b/astra/src/main/java/com/slack/astra/blobfs/ChunkStore.java @@ -0,0 +1,100 @@ +package com.slack.astra.blobfs; + +import static software.amazon.awssdk.services.s3.model.ListObjectsV2Request.builder; + +import java.nio.file.Path; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicBoolean; +import software.amazon.awssdk.services.s3.S3AsyncClient; +import software.amazon.awssdk.services.s3.model.Delete; +import software.amazon.awssdk.services.s3.model.DeleteObjectsRequest; +import software.amazon.awssdk.services.s3.model.ListObjectsV2Request; +import software.amazon.awssdk.services.s3.model.ObjectIdentifier; +import software.amazon.awssdk.services.s3.paginators.ListObjectsV2Publisher; +import software.amazon.awssdk.transfer.s3.S3TransferManager; +import software.amazon.awssdk.transfer.s3.model.DownloadDirectoryRequest; 
+import software.amazon.awssdk.transfer.s3.model.UploadDirectoryRequest; + +public class ChunkStore { + + private final String bucketName; + private final S3AsyncClient s3AsyncClient; + private final S3TransferManager transferManager; + + public ChunkStore(S3AsyncClient s3AsyncClient, String bucketName) { + this.bucketName = bucketName; + this.s3AsyncClient = s3AsyncClient; + this.transferManager = S3TransferManager.builder().s3Client(s3AsyncClient).build(); + } + + public void upload(String chunkId, Path directoryToUpload) { + try { + transferManager + .uploadDirectory( + UploadDirectoryRequest.builder() + .source(directoryToUpload) + .s3Prefix(chunkId) + .bucket(bucketName) + .build()) + .completionFuture() + .get(); + } catch (ExecutionException | InterruptedException e) { + throw new RuntimeException(e); + } + } + + // todo - make destination optional + public Path download(String chunkId, Path destination) { + try { + transferManager + .downloadDirectory( + DownloadDirectoryRequest.builder() + .bucket(bucketName) + .destination(destination) + .listObjectsV2RequestTransformer(l -> l.prefix(chunkId)) + .build()) + .completionFuture() + .get(); + return destination; + } catch (ExecutionException | InterruptedException e) { + throw new RuntimeException(e); + } + } + + public boolean delete(String chunkId) { + ListObjectsV2Request listRequest = builder().bucket(bucketName).prefix(chunkId).build(); + ListObjectsV2Publisher asyncPaginatedListResponse = + s3AsyncClient.listObjectsV2Paginator(listRequest); + + AtomicBoolean deleted = new AtomicBoolean(false); + try { + asyncPaginatedListResponse + .subscribe( + listResponse -> { + List<ObjectIdentifier> objects = + listResponse.contents().stream() + .map(s3Object -> ObjectIdentifier.builder().key(s3Object.key()).build()) + .toList(); + if (objects.isEmpty()) { + return; + } + DeleteObjectsRequest deleteRequest = + DeleteObjectsRequest.builder() + .bucket(bucketName) + .delete(Delete.builder().objects(objects).build()) + .build(); + try 
{ + s3AsyncClient.deleteObjects(deleteRequest).get(); + deleted.set(true); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + }) + .get(); + return deleted.get(); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + } +} diff --git a/astra/src/main/java/com/slack/astra/chunk/ReadOnlyChunkImpl.java b/astra/src/main/java/com/slack/astra/chunk/ReadOnlyChunkImpl.java index 5646254eca..8dc1b5bd3c 100644 --- a/astra/src/main/java/com/slack/astra/chunk/ReadOnlyChunkImpl.java +++ b/astra/src/main/java/com/slack/astra/chunk/ReadOnlyChunkImpl.java @@ -4,7 +4,7 @@ import static com.slack.astra.server.AstraConfig.DEFAULT_ZK_TIMEOUT_SECS; import com.google.common.annotations.VisibleForTesting; -import com.slack.astra.blobfs.BlobFs; +import com.slack.astra.blobfs.ChunkStore; import com.slack.astra.logstore.search.LogIndexSearcher; import com.slack.astra.logstore.search.LogIndexSearcherImpl; import com.slack.astra.logstore.search.SearchQuery; @@ -64,7 +64,6 @@ public class ReadOnlyChunkImpl implements Chunk { private Metadata.CacheNodeAssignment.CacheNodeAssignmentState lastKnownAssignmentState; private final String dataDirectoryPrefix; - private final String s3Bucket; protected final SearchContext searchContext; protected final String slotId; private final CacheSlotMetadataStore cacheSlotMetadataStore; @@ -73,7 +72,7 @@ public class ReadOnlyChunkImpl implements Chunk { private final SearchMetadataStore searchMetadataStore; private CacheNodeAssignmentStore cacheNodeAssignmentStore; private final MeterRegistry meterRegistry; - private final BlobFs blobFs; + private final ChunkStore chunkStore; public static final String CHUNK_ASSIGNMENT_TIMER = "chunk_assignment_timer"; public static final String CHUNK_EVICTION_TIMER = "chunk_eviction_timer"; @@ -91,7 +90,7 @@ public class ReadOnlyChunkImpl implements Chunk { public ReadOnlyChunkImpl( AsyncCuratorFramework curatorFramework, MeterRegistry 
meterRegistry, - BlobFs blobFs, + ChunkStore chunkStore, SearchContext searchContext, String s3Bucket, String dataDirectoryPrefix, @@ -107,7 +106,7 @@ public ReadOnlyChunkImpl( this( curatorFramework, meterRegistry, - blobFs, + chunkStore, searchContext, s3Bucket, dataDirectoryPrefix, @@ -125,7 +124,7 @@ public ReadOnlyChunkImpl( public ReadOnlyChunkImpl( AsyncCuratorFramework curatorFramework, MeterRegistry meterRegistry, - BlobFs blobFs, + ChunkStore chunkStore, SearchContext searchContext, String s3Bucket, String dataDirectoryPrefix, @@ -136,8 +135,7 @@ public ReadOnlyChunkImpl( SearchMetadataStore searchMetadataStore) throws Exception { this.meterRegistry = meterRegistry; - this.blobFs = blobFs; - this.s3Bucket = s3Bucket; + this.chunkStore = chunkStore; this.dataDirectoryPrefix = dataDirectoryPrefix; this.searchContext = searchContext; this.slotId = UUID.randomUUID().toString(); @@ -235,12 +233,12 @@ public void downloadChunkData() { } } } - // init SerialS3DownloaderImpl w/ bucket, snapshotId, blob, data directory - SerialS3ChunkDownloaderImpl chunkDownloader = - new SerialS3ChunkDownloaderImpl( - s3Bucket, snapshotMetadata.snapshotId, blobFs, dataDirectory); - if (chunkDownloader.download()) { - throw new IOException("No files found on blob storage, released slot for re-assignment"); + + chunkStore.download(snapshotMetadata.snapshotId, dataDirectory); + try (Stream<Path> fileList = Files.list(dataDirectory)) { + if (fileList.findAny().isEmpty()) { + throw new IOException("No files found on blob storage, released slot for re-assignment"); + } } Path schemaPath = Path.of(dataDirectory.toString(), ReadWriteChunk.SCHEMA_FILE_NAME); @@ -379,11 +377,11 @@ private void handleChunkAssignment(CacheSlotMetadata cacheSlotMetadata) { } SnapshotMetadata snapshotMetadata = getSnapshotMetadata(cacheSlotMetadata.replicaId); - SerialS3ChunkDownloaderImpl chunkDownloader = - new SerialS3ChunkDownloaderImpl( - s3Bucket, snapshotMetadata.snapshotId, blobFs, dataDirectory); - if 
(chunkDownloader.download()) { - throw new IOException("No files found on blob storage, released slot for re-assignment"); + chunkStore.download(snapshotMetadata.snapshotId, dataDirectory); + try (Stream<Path> fileList = Files.list(dataDirectory)) { + if (fileList.findAny().isEmpty()) { + throw new IOException("No files found on blob storage, released slot for re-assignment"); + } } Path schemaPath = Path.of(dataDirectory.toString(), ReadWriteChunk.SCHEMA_FILE_NAME); diff --git a/astra/src/main/java/com/slack/astra/chunk/ReadWriteChunk.java b/astra/src/main/java/com/slack/astra/chunk/ReadWriteChunk.java index fe10992218..49fe7bc5c1 100644 --- a/astra/src/main/java/com/slack/astra/chunk/ReadWriteChunk.java +++ b/astra/src/main/java/com/slack/astra/chunk/ReadWriteChunk.java @@ -254,7 +254,9 @@ public boolean snapshotToS3(String bucket, String prefix, BlobFs blobFs) { } this.fileUploadAttempts.increment(filesToUpload.size()); Timer.Sample snapshotTimer = Timer.start(meterRegistry); + final int success = copyToS3(dirPath, filesToUpload, bucket, prefix, blobFs); + snapshotTimer.stop(meterRegistry.timer(SNAPSHOT_TIMER)); this.fileUploadFailures.increment(filesToUpload.size() - success); chunkInfo.setSnapshotPath(createURI(bucket, prefix, "").toString()); diff --git a/astra/src/main/java/com/slack/astra/chunk/SerialS3ChunkDownloaderImpl.java b/astra/src/main/java/com/slack/astra/chunk/SerialS3ChunkDownloaderImpl.java index 1c50493329..c88f0488a3 100644 --- a/astra/src/main/java/com/slack/astra/chunk/SerialS3ChunkDownloaderImpl.java +++ b/astra/src/main/java/com/slack/astra/chunk/SerialS3ChunkDownloaderImpl.java @@ -9,6 +9,7 @@ * SerialS3ChunkDownloaderImpl downloads all the chunk related data from S3 to a local store one * file at a time. 
*/ +@Deprecated public class SerialS3ChunkDownloaderImpl implements ChunkDownloader { private final String s3Bucket; private final String snapshotId; diff --git a/astra/src/main/java/com/slack/astra/chunkManager/CachingChunkManager.java b/astra/src/main/java/com/slack/astra/chunkManager/CachingChunkManager.java index dd20758331..e454bc2825 100644 --- a/astra/src/main/java/com/slack/astra/chunkManager/CachingChunkManager.java +++ b/astra/src/main/java/com/slack/astra/chunkManager/CachingChunkManager.java @@ -2,7 +2,7 @@ import static com.slack.astra.clusterManager.CacheNodeAssignmentService.snapshotMetadataBySnapshotId; -import com.slack.astra.blobfs.BlobFs; +import com.slack.astra.blobfs.ChunkStore; import com.slack.astra.chunk.Chunk; import com.slack.astra.chunk.ReadOnlyChunkImpl; import com.slack.astra.chunk.SearchContext; @@ -39,7 +39,7 @@ public class CachingChunkManager extends ChunkManagerBase { private final MeterRegistry meterRegistry; private final AsyncCuratorFramework curatorFramework; - private final BlobFs blobFs; + private final ChunkStore chunkStore; private final SearchContext searchContext; private final String s3Bucket; private final String dataDirectoryPrefix; @@ -61,7 +61,7 @@ public class CachingChunkManager extends ChunkManagerBase { public CachingChunkManager( MeterRegistry registry, AsyncCuratorFramework curatorFramework, - BlobFs blobFs, + ChunkStore chunkStore, SearchContext searchContext, String s3Bucket, String dataDirectoryPrefix, @@ -70,7 +70,7 @@ public CachingChunkManager( long capacityBytes) { this.meterRegistry = registry; this.curatorFramework = curatorFramework; - this.blobFs = blobFs; + this.chunkStore = chunkStore; this.searchContext = searchContext; this.s3Bucket = s3Bucket; this.dataDirectoryPrefix = dataDirectoryPrefix; @@ -103,7 +103,7 @@ protected void startUp() throws Exception { new ReadOnlyChunkImpl<>( curatorFramework, meterRegistry, - blobFs, + chunkStore, searchContext, s3Bucket, dataDirectoryPrefix, @@ -153,12 
+153,12 @@ public static CachingChunkManager fromConfig( AsyncCuratorFramework curatorFramework, AstraConfigs.S3Config s3Config, AstraConfigs.CacheConfig cacheConfig, - BlobFs blobFs) + ChunkStore chunkStore) throws Exception { return new CachingChunkManager<>( meterRegistry, curatorFramework, - blobFs, + chunkStore, SearchContext.fromConfig(cacheConfig.getServerConfig()), s3Config.getS3Bucket(), cacheConfig.getDataDirectory(), @@ -211,7 +211,7 @@ private void onAssignmentHandler(CacheNodeAssignment assignment) { new ReadOnlyChunkImpl<>( curatorFramework, meterRegistry, - blobFs, + chunkStore, searchContext, s3Bucket, dataDirectoryPrefix, diff --git a/astra/src/main/java/com/slack/astra/server/Astra.java b/astra/src/main/java/com/slack/astra/server/Astra.java index 0cb2370f60..d8f69a5a41 100644 --- a/astra/src/main/java/com/slack/astra/server/Astra.java +++ b/astra/src/main/java/com/slack/astra/server/Astra.java @@ -4,6 +4,7 @@ import com.google.common.util.concurrent.Service; import com.google.common.util.concurrent.ServiceManager; import com.slack.astra.blobfs.BlobFs; +import com.slack.astra.blobfs.ChunkStore; import com.slack.astra.blobfs.s3.S3CrtBlobFs; import com.slack.astra.bulkIngestApi.BulkIngestApi; import com.slack.astra.bulkIngestApi.BulkIngestKafkaProducer; @@ -142,9 +143,10 @@ public void start() throws Exception { // Initialize blobfs. Only S3 is supported currently. 
S3CrtBlobFs s3BlobFs = new S3CrtBlobFs(s3Client); + ChunkStore chunkStore = new ChunkStore(s3Client, astraConfig.getS3Config().getS3Bucket()); Set<Service> services = - getServices(curatorFramework, astraConfig, s3BlobFs, prometheusMeterRegistry); + getServices(curatorFramework, astraConfig, s3BlobFs, chunkStore, prometheusMeterRegistry); serviceManager = new ServiceManager(services); serviceManager.addListener(getServiceManagerListener(), MoreExecutors.directExecutor()); @@ -155,6 +157,7 @@ private static Set<Service> getServices( AsyncCuratorFramework curatorFramework, AstraConfigs.AstraConfig astraConfig, BlobFs blobFs, + ChunkStore chunkStore, PrometheusMeterRegistry meterRegistry) throws Exception { Set<Service> services = new HashSet<>(); @@ -238,7 +241,7 @@ private static Set<Service> getServices( curatorFramework, astraConfig.getS3Config(), astraConfig.getCacheConfig(), - blobFs); + chunkStore); services.add(chunkManager); HpaMetricMetadataStore hpaMetricMetadataStore = diff --git a/astra/src/test/java/com/slack/astra/blobfs/ChunkStoreTest.java b/astra/src/test/java/com/slack/astra/blobfs/ChunkStoreTest.java new file mode 100644 index 0000000000..d3125fed9e --- /dev/null +++ b/astra/src/test/java/com/slack/astra/blobfs/ChunkStoreTest.java @@ -0,0 +1,106 @@ +package com.slack.astra.blobfs; + +import static org.assertj.core.api.AssertionsForClassTypes.assertThat; + +import com.adobe.testing.s3mock.junit5.S3MockExtension; +import com.slack.astra.blobfs.s3.S3TestUtils; +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Objects; +import java.util.UUID; +import java.util.concurrent.ExecutionException; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; +import software.amazon.awssdk.services.s3.S3AsyncClient; +import software.amazon.awssdk.services.s3.model.ListObjectsRequest; + +class ChunkStoreTest { + private static final String TEST_BUCKET = 
"chunkStoreTest"; + + @RegisterExtension + public static final S3MockExtension S3_MOCK_EXTENSION = + S3MockExtension.builder() + .silent() + .withInitialBuckets(TEST_BUCKET) + .withSecureConnection(false) + .build(); + + private final S3AsyncClient s3Client = + S3TestUtils.createS3CrtClient(S3_MOCK_EXTENSION.getServiceEndpoint()); + + @Test + void testUploadDownload() throws IOException { + ChunkStore chunkStore = new ChunkStore(s3Client, TEST_BUCKET); + + Path directoryUpload = Files.createTempDirectory(""); + Path foo = Files.createTempFile(directoryUpload, "", ""); + try (FileWriter fileWriter = new FileWriter(foo.toFile())) { + fileWriter.write("Example test"); + } + String chunkId = UUID.randomUUID().toString(); + chunkStore.upload(chunkId, directoryUpload); + + // what goes up, must come down + Path directoryDownloaded = Files.createTempDirectory(""); + chunkStore.download(chunkId, directoryDownloaded); + + File[] filesDownloaded = directoryDownloaded.toFile().listFiles(); + assertThat(Objects.requireNonNull(filesDownloaded).length).isEqualTo(1); + + // contents of the file we uploaded should match + assertThat(Files.readAllBytes(filesDownloaded[0].toPath())).isEqualTo(Files.readAllBytes(foo)); + } + + @Test + void testDownloadDoesNotExist() throws IOException { + ChunkStore chunkStore = new ChunkStore(s3Client, TEST_BUCKET); + Path directoryDownloaded = Files.createTempDirectory(""); + chunkStore.download(UUID.randomUUID().toString(), directoryDownloaded); + } + + @Test + void testDeleteMultipleFiles() throws IOException, ExecutionException, InterruptedException { + ChunkStore chunkStore = new ChunkStore(s3Client, TEST_BUCKET); + + Path directoryUpload = Files.createTempDirectory(""); + Path foo = Files.createTempFile(directoryUpload, "", ""); + try (FileWriter fileWriter = new FileWriter(foo.toFile())) { + fileWriter.write("Example test 1"); + } + Path bar = Files.createTempFile(directoryUpload, "", ""); + try (FileWriter fileWriter = new 
FileWriter(bar.toFile())) { + fileWriter.write("Example test 2"); + } + String chunkId = UUID.randomUUID().toString(); + chunkStore.upload(chunkId, directoryUpload); + assertThat( + s3Client + .listObjects( + ListObjectsRequest.builder().bucket(TEST_BUCKET).prefix(chunkId).build()) + .get() + .contents() + .size()) + .isEqualTo(2); + + boolean deleted = chunkStore.delete(chunkId); + assertThat(deleted).isTrue(); + assertThat( + s3Client + .listObjects( + ListObjectsRequest.builder().bucket(TEST_BUCKET).prefix(chunkId).build()) + .get() + .contents() + .size()) + .isEqualTo(0); + } + + @Test + void testDeleteDoesNotExist() { + ChunkStore chunkStore = new ChunkStore(s3Client, TEST_BUCKET); + boolean deleted = chunkStore.delete(UUID.randomUUID().toString()); + assertThat(deleted).isFalse(); + } +} diff --git a/astra/src/test/java/com/slack/astra/chunk/ReadOnlyChunkImplTest.java b/astra/src/test/java/com/slack/astra/chunk/ReadOnlyChunkImplTest.java index cf579a446c..4dc0d2dccb 100644 --- a/astra/src/test/java/com/slack/astra/chunk/ReadOnlyChunkImplTest.java +++ b/astra/src/test/java/com/slack/astra/chunk/ReadOnlyChunkImplTest.java @@ -17,6 +17,7 @@ import brave.Tracing; import com.adobe.testing.s3mock.junit5.S3MockExtension; +import com.slack.astra.blobfs.ChunkStore; import com.slack.astra.blobfs.LocalBlobFs; import com.slack.astra.blobfs.s3.S3CrtBlobFs; import com.slack.astra.blobfs.s3.S3TestUtils; @@ -73,6 +74,7 @@ public class ReadOnlyChunkImplTest { private TestingServer testingServer; private MeterRegistry meterRegistry; private S3CrtBlobFs s3CrtBlobFs; + private ChunkStore chunkStore; @RegisterExtension public static final S3MockExtension S3_MOCK_EXTENSION = @@ -91,6 +93,7 @@ public void startup() throws Exception { S3AsyncClient s3AsyncClient = S3TestUtils.createS3CrtClient(S3_MOCK_EXTENSION.getServiceEndpoint()); s3CrtBlobFs = new S3CrtBlobFs(s3AsyncClient); + chunkStore = new ChunkStore(s3AsyncClient, TEST_S3_BUCKET); } @AfterEach @@ -132,7 +135,7 @@ public 
void shouldHandleChunkLivecycle() throws Exception { new ReadOnlyChunkImpl<>( curatorFramework, meterRegistry, - s3CrtBlobFs, + chunkStore, searchContext, AstraConfig.getS3Config().getS3Bucket(), AstraConfig.getCacheConfig().getDataDirectory(), @@ -264,7 +267,7 @@ public void shouldHandleMissingS3Assets() throws Exception { new ReadOnlyChunkImpl<>( curatorFramework, meterRegistry, - s3CrtBlobFs, + chunkStore, SearchContext.fromConfig(AstraConfig.getCacheConfig().getServerConfig()), AstraConfig.getS3Config().getS3Bucket(), AstraConfig.getCacheConfig().getDataDirectory(), @@ -330,7 +333,7 @@ public void shouldHandleMissingZkData() throws Exception { new ReadOnlyChunkImpl<>( curatorFramework, meterRegistry, - s3CrtBlobFs, + chunkStore, SearchContext.fromConfig(AstraConfig.getCacheConfig().getServerConfig()), AstraConfig.getS3Config().getS3Bucket(), AstraConfig.getCacheConfig().getDataDirectory(), @@ -397,7 +400,7 @@ public void closeShouldCleanupLiveChunkCorrectly() throws Exception { new ReadOnlyChunkImpl<>( curatorFramework, meterRegistry, - s3CrtBlobFs, + chunkStore, SearchContext.fromConfig(AstraConfig.getCacheConfig().getServerConfig()), AstraConfig.getS3Config().getS3Bucket(), AstraConfig.getCacheConfig().getDataDirectory(), @@ -528,7 +531,7 @@ public void shouldHandleDynamicChunkSizeLifecycle() throws Exception { new ReadOnlyChunkImpl<>( curatorFramework, meterRegistry, - s3CrtBlobFs, + chunkStore, searchContext, AstraConfig.getS3Config().getS3Bucket(), AstraConfig.getCacheConfig().getDataDirectory(), diff --git a/astra/src/test/java/com/slack/astra/chunkManager/CachingChunkManagerTest.java b/astra/src/test/java/com/slack/astra/chunkManager/CachingChunkManagerTest.java index 84333b2510..d7c52e8d56 100644 --- a/astra/src/test/java/com/slack/astra/chunkManager/CachingChunkManagerTest.java +++ b/astra/src/test/java/com/slack/astra/chunkManager/CachingChunkManagerTest.java @@ -16,6 +16,7 @@ import static org.awaitility.Awaitility.await; import 
com.adobe.testing.s3mock.junit5.S3MockExtension; +import com.slack.astra.blobfs.ChunkStore; import com.slack.astra.blobfs.LocalBlobFs; import com.slack.astra.blobfs.s3.S3CrtBlobFs; import com.slack.astra.blobfs.s3.S3TestUtils; @@ -64,6 +65,7 @@ public class CachingChunkManagerTest { private TestingServer testingServer; private MeterRegistry meterRegistry; private S3CrtBlobFs s3CrtBlobFs; + private ChunkStore chunkStore; @RegisterExtension public static final S3MockExtension S3_MOCK_EXTENSION = @@ -86,6 +88,7 @@ public void startup() throws Exception { S3AsyncClient s3AsyncClient = S3TestUtils.createS3CrtClient(S3_MOCK_EXTENSION.getServiceEndpoint()); s3CrtBlobFs = new S3CrtBlobFs(s3AsyncClient); + chunkStore = new ChunkStore(s3AsyncClient, TEST_S3_BUCKET); } @AfterEach @@ -146,7 +149,7 @@ private CachingChunkManager initChunkManager() throws TimeoutExcepti new CachingChunkManager<>( meterRegistry, curatorFramework, - s3CrtBlobFs, + chunkStore, SearchContext.fromConfig(AstraConfig.getCacheConfig().getServerConfig()), AstraConfig.getS3Config().getS3Bucket(), AstraConfig.getCacheConfig().getDataDirectory(),