Remove use of BlobFsUtils copyFromS3 test helper
bryanlb committed Aug 13, 2024
1 parent 908e769 commit a542cc3
Showing 3 changed files with 34 additions and 60 deletions.
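In short, the commit replaces calls to the deprecated BlobFsUtils.copyFromS3 test helper with the ChunkStore API that the updated tests construct directly. A minimal before/after sketch of the call pattern (imports as in the surrounding test classes; it assumes, per the diff below, that ChunkStore is built from an S3AsyncClient plus bucket name and that download returns the local java.nio.file.Path):

// Before: the deprecated helper copied everything under bucket/prefix and returned the file paths.
String[] files =
    BlobFsUtils.copyFromS3(TEST_S3_BUCKET, snapshotId, s3CrtBlobFs, Path.of("/tmp/test1"));

// After: ChunkStore addresses data by chunk/snapshot id; callers list the directory themselves.
ChunkStore chunkStore = new ChunkStore(s3AsyncClient, TEST_S3_BUCKET);
Path localDir = chunkStore.download(snapshotId, Path.of("/tmp/test1"));
File[] downloaded = Objects.requireNonNull(localDir.toFile().listFiles());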
20 changes: 0 additions & 20 deletions astra/src/main/java/com/slack/astra/blobfs/BlobFsUtils.java
@@ -4,17 +4,12 @@
import java.io.IOException;
import java.net.URI;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Collection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** This class contains static methods that help with blobfs operations. */
@Deprecated
public class BlobFsUtils {

private static final Logger LOG = LoggerFactory.getLogger(BlobFsUtils.class);

public static final String SCHEME = "s3";
public static final String DELIMITER = "/";
public static final String FILE_FORMAT = "%s://%s/%s";
@@ -41,19 +36,4 @@ public static URI createURI(String bucket, String prefix, String fileName) {
? URI.create(String.format(FILE_FORMAT, SCHEME, bucket + DELIMITER + prefix, fileName))
: URI.create(String.format(FILE_FORMAT, SCHEME, bucket, fileName));
}

// TODO: Can we copy files without list files and a prefix only?
// TODO: Take a complete URI as this is the format stored in snapshot data
@Deprecated
public static String[] copyFromS3(
String bucket, String prefix, BlobFs s3BlobFs, Path localDirPath) throws Exception {
LOG.debug("Copying files from bucket={} prefix={} using directory", bucket, prefix);
URI directoryToCopy = createURI(bucket, prefix, "");
s3BlobFs.copyToLocalFile(directoryToCopy, localDirPath.toFile());
LOG.debug("Copying S3 files complete");
return Arrays.stream(localDirPath.toFile().listFiles())
.map(File::toString)
.distinct()
.toArray(String[]::new);
}
}
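The removed helper also returned the copied files as a String[]. Tests that still need that list can rebuild it from the download directory with plain JDK calls; a sketch reusing the ChunkStore.download call from the tests below (the local variable names are illustrative):

Path localDir = chunkStore.download(snapshotId, localDirPath);
String[] copied =
    Arrays.stream(Objects.requireNonNull(localDir.toFile().listFiles()))
        .map(File::toString)
        .distinct()
        .toArray(String[]::new);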
@@ -1,6 +1,5 @@
package com.slack.astra.chunkManager;

import static com.slack.astra.blobfs.BlobFsUtils.copyFromS3;
import static com.slack.astra.blobfs.BlobFsUtils.copyToS3;
import static com.slack.astra.chunk.ReadWriteChunk.SCHEMA_FILE_NAME;
import static com.slack.astra.chunkManager.CachingChunkManager.ASTRA_NG_DYNAMIC_CHUNK_SIZES_FLAG;
@@ -44,6 +43,7 @@
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -262,7 +262,12 @@ public void testCreatesChunksOnAssignment() throws Exception {
.ignoreExceptions()
.until(
() ->
copyFromS3(TEST_S3_BUCKET, snapshotId, s3CrtBlobFs, Path.of("/tmp/test1")).length
Objects.requireNonNull(
chunkStore
.download(snapshotId, Path.of("/tmp/test1"))
.toFile()
.listFiles())
.length
> 0);
initAssignment(snapshotId);

@@ -300,7 +305,12 @@ public void testBasicChunkEviction() throws Exception {
.ignoreExceptions()
.until(
() ->
copyFromS3(TEST_S3_BUCKET, snapshotId, s3CrtBlobFs, Path.of("/tmp/test2")).length
Objects.requireNonNull(
chunkStore
.download(snapshotId, Path.of("/tmp/test2"))
.toFile()
.listFiles())
.length
> 0);
CacheNodeAssignment assignment = initAssignment(snapshotId);
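Both hunks above change only the condition inside an Awaitility wait; assembled into one self-contained statement (the enclosing await() call is implied by the context lines and its timeout settings are not shown here), the new polling condition reads roughly as:

await()
    .ignoreExceptions()
    .until(
        () ->
            Objects.requireNonNull(
                    chunkStore.download(snapshotId, Path.of("/tmp/test1")).toFile().listFiles())
                .length
                > 0);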

@@ -1,8 +1,5 @@
package com.slack.astra.logstore;

import static com.slack.astra.blobfs.BlobFsUtils.DELIMITER;
import static com.slack.astra.blobfs.BlobFsUtils.copyFromS3;
import static com.slack.astra.blobfs.BlobFsUtils.copyToS3;
import static com.slack.astra.logstore.LuceneIndexStoreImpl.COMMITS_TIMER;
import static com.slack.astra.logstore.LuceneIndexStoreImpl.MESSAGES_FAILED_COUNTER;
import static com.slack.astra.logstore.LuceneIndexStoreImpl.MESSAGES_RECEIVED_COUNTER;
@@ -18,7 +15,7 @@
import brave.Tracing;
import com.adobe.testing.s3mock.junit5.S3MockExtension;
import com.google.protobuf.ByteString;
import com.slack.astra.blobfs.S3CrtBlobFs;
import com.slack.astra.blobfs.ChunkStore;
import com.slack.astra.blobfs.s3.S3TestUtils;
import com.slack.astra.logstore.LogMessage.ReservedField;
import com.slack.astra.logstore.schema.SchemaAwareLogDocumentBuilderImpl;
@@ -38,6 +35,8 @@
import java.time.temporal.ChronoUnit;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.commons.io.FileUtils;
@@ -337,18 +336,9 @@ public class SnapshotTester {
public SnapshotTester() throws IOException {}

@Test
public void testS3SnapshotWithPrefix() throws Exception {
testS3Snapshot("test-bucket-with-prefix", "snapshot_prefix1");
}

@Test
public void testS3SnapshotWithEmptyPrefix() throws Exception {
testS3Snapshot("test-bucket-no-prefix", "");
}

private void testS3Snapshot(String bucket, String prefix) throws Exception {
public void testS3Snapshot() throws Exception {
LuceneIndexStoreImpl logStore = strictLogStore.logStore;
addMessages(logStore, 1, 100, true);
addMessages(strictLogStore.logStore, 1, 100, true);
Collection<LogMessage> results =
findAllMessages(
strictLogStore.logSearcher, MessageUtil.TEST_DATASET_NAME, "Message1", 100);
@@ -359,24 +349,27 @@ private void testS3Snapshot(String bucket, String prefix) throws Exception {
assertThat(getTimerCount(REFRESHES_TIMER, strictLogStore.metricsRegistry)).isEqualTo(1);
assertThat(getTimerCount(COMMITS_TIMER, strictLogStore.metricsRegistry)).isEqualTo(1);

Path dirPath = logStore.getDirectory().getDirectory().toAbsolutePath();
IndexCommit indexCommit = logStore.getIndexCommit();
Path dirPath = strictLogStore.logStore.getDirectory().getDirectory().toAbsolutePath();
IndexCommit indexCommit = strictLogStore.logStore.getIndexCommit();
Collection<String> activeFiles = indexCommit.getFileNames();

logStore.close();
strictLogStore.logStore.close();
strictLogStore.logSearcher.close();
strictLogStore.logStore = null;
strictLogStore.logSearcher = null;
assertThat(dirPath.toFile().listFiles().length).isGreaterThanOrEqualTo(activeFiles.size());

assertThat(Objects.requireNonNull(dirPath.toFile().listFiles()).length)
.isGreaterThanOrEqualTo(activeFiles.size());

// create an S3 client
S3AsyncClient s3AsyncClient =
S3TestUtils.createS3CrtClient(S3_MOCK_EXTENSION.getServiceEndpoint());
S3CrtBlobFs s3CrtBlobFs = new S3CrtBlobFs(s3AsyncClient);
String bucket = "snapshot-test";
s3AsyncClient.createBucket(CreateBucketRequest.builder().bucket(bucket).build()).get();
ChunkStore chunkStore = new ChunkStore(s3AsyncClient, bucket);

// Copy files to S3.
copyToS3(dirPath, activeFiles, bucket, prefix, s3CrtBlobFs);
String chunkId = UUID.randomUUID().toString();
chunkStore.upload(chunkId, dirPath);

for (String fileName : activeFiles) {
File fileToCopy = new File(dirPath.toString(), fileName);
@@ -385,10 +378,7 @@ private void testS3Snapshot(String bucket, String prefix) throws Exception {
.headObject(
HeadObjectRequest.builder()
.bucket(bucket)
.key(
prefix != null && !prefix.isEmpty()
? prefix + DELIMITER + fileName
: fileName)
.key(String.format("%s/%s", chunkId, fileName))
.build())
.get();
assertThat(headObjectResponse.contentLength()).isEqualTo(fileToCopy.length());
@@ -404,15 +394,11 @@ private void testS3Snapshot(String bucket, String prefix) throws Exception {
// then fail)
FileUtils.cleanDirectory(tmpPath.toFile());
// Download files from S3 to local FS.
String[] s3Files =
copyFromS3(
bucket,
prefix,
s3CrtBlobFs,
tmpPath.toAbsolutePath()); // IO java.util.concurrent.ExecutionException:
// software.amazon.awssdk.core.exception.SdkClientException: Unexpected exception
// occurred: s3metaRequest is not initialized yet
return s3Files.length == activeFiles.size();
chunkStore.download(chunkId, tmpPath.toAbsolutePath());
// the delta is the presence of the write.lock file, which is released but still in
// the directory
return Objects.requireNonNull(tmpPath.toFile().listFiles()).length
>= activeFiles.size();
});

// Search files in local FS.
@@ -425,9 +411,7 @@ private void testS3Snapshot(String bucket, String prefix) throws Exception {
assertThat(newResults.size()).isEqualTo(1);

// Clean up
logStore.releaseIndexCommit(indexCommit);
newSearcher.close();
s3CrtBlobFs.close();
}
}
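Taken together, the reworked test now round-trips a snapshot through the new store. Condensed to only the calls that appear in the diff (the bucket name and chunk id are arbitrary test values, and checked exceptions from .get() are left to the test method's throws clause):

S3AsyncClient s3AsyncClient =
    S3TestUtils.createS3CrtClient(S3_MOCK_EXTENSION.getServiceEndpoint());
s3AsyncClient.createBucket(CreateBucketRequest.builder().bucket("snapshot-test").build()).get();
ChunkStore chunkStore = new ChunkStore(s3AsyncClient, "snapshot-test");

String chunkId = UUID.randomUUID().toString();
chunkStore.upload(chunkId, dirPath);                    // objects are keyed as "<chunkId>/<fileName>"
chunkStore.download(chunkId, tmpPath.toAbsolutePath()); // pulls them back under tmpPath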
