From 6c7faae980dcae8e0974936bffebf12481d8b28a Mon Sep 17 00:00:00 2001 From: Kai Chen Date: Fri, 31 May 2024 17:08:01 -0700 Subject: [PATCH] review --- .../java/com/slack/astra/chunk/ChunkInfo.java | 30 +++++++------ .../slack/astra/chunk/IndexingChunkImpl.java | 2 +- .../com/slack/astra/chunk/ReadWriteChunk.java | 10 ++--- .../metadata/snapshot/SnapshotMetadata.java | 19 ++++---- .../snapshot/SnapshotMetadataSerializer.java | 2 +- .../com/slack/astra/chunk/ChunkInfoTest.java | 3 +- .../astra/chunk/IndexingChunkImplTest.java | 2 +- .../AstraDistributedQueryServiceTest.java | 3 +- .../SnapshotMetadataSerializerTest.java | 44 ++++++++++++++++++- 9 files changed, 81 insertions(+), 34 deletions(-) diff --git a/astra/src/main/java/com/slack/astra/chunk/ChunkInfo.java b/astra/src/main/java/com/slack/astra/chunk/ChunkInfo.java index 54bec143dc..b7c09e598f 100644 --- a/astra/src/main/java/com/slack/astra/chunk/ChunkInfo.java +++ b/astra/src/main/java/com/slack/astra/chunk/ChunkInfo.java @@ -31,7 +31,8 @@ public static ChunkInfo fromSnapshotMetadata(SnapshotMetadata snapshotMetadata) snapshotMetadata.endTimeEpochMs, snapshotMetadata.maxOffset, snapshotMetadata.partitionId, - snapshotMetadata.snapshotPath); + snapshotMetadata.snapshotPath, + snapshotMetadata.sizeInBytesOnDisk); } public static SnapshotMetadata toSnapshotMetadata(ChunkInfo chunkInfo, String chunkPrefix) { @@ -43,7 +44,7 @@ public static SnapshotMetadata toSnapshotMetadata(ChunkInfo chunkInfo, String ch chunkInfo.maxOffset, chunkInfo.kafkaPartitionId, Metadata.IndexType.LOGS_LUCENE9, - chunkInfo.sizeInBytes); + chunkInfo.sizeInBytesOnDisk); } /* A unique identifier for a the chunk. */ @@ -79,8 +80,8 @@ public static SnapshotMetadata toSnapshotMetadata(ChunkInfo chunkInfo, String ch // Path to S3 snapshot. private String snapshotPath; - // Size of chunk in bytes - private long sizeInBytes; + // Size of chunk on disk in bytes + private long sizeInBytesOnDisk; public ChunkInfo( String chunkId, long chunkCreationTimeEpochMs, String kafkaPartitionId, String snapshotPath) { @@ -94,7 +95,8 @@ public ChunkInfo( 0, DEFAULT_MAX_OFFSET, kafkaPartitionId, - snapshotPath); + snapshotPath, + 0); } public ChunkInfo( @@ -106,7 +108,8 @@ public ChunkInfo( long chunkSnapshotTimeEpochMs, long maxOffset, String kafkaPartitionId, - String snapshotPath) { + String snapshotPath, + long sizeInBytesOnDisk) { ensureTrue(chunkId != null && !chunkId.isEmpty(), "Invalid chunk dataset name " + chunkId); ensureTrue( chunkCreationTimeEpochMs >= 0, @@ -122,6 +125,7 @@ public ChunkInfo( this.maxOffset = maxOffset; this.kafkaPartitionId = kafkaPartitionId; this.snapshotPath = snapshotPath; + this.sizeInBytesOnDisk = sizeInBytesOnDisk; } public long getChunkSnapshotTimeEpochMs() { @@ -168,12 +172,12 @@ public String getSnapshotPath() { return snapshotPath; } - public long getSizeInBytes() { - return sizeInBytes; + public long getSizeInBytesOnDisk() { + return sizeInBytesOnDisk; } - public void setSizeInBytes(long sizeInBytes) { - this.sizeInBytes = sizeInBytes; + public void setSizeInBytesOnDisk(long sizeInBytesOnDisk) { + this.sizeInBytesOnDisk = sizeInBytesOnDisk; } public void updateMaxOffset(long newOffset) { @@ -236,8 +240,8 @@ public String toString() { + chunkSnapshotTimeEpochMs + ", snapshotPath='" + snapshotPath - + ", sizeInBytes='" - + sizeInBytes + + ", sizeInBytesOnDisk='" + + sizeInBytesOnDisk + '}'; } @@ -255,7 +259,7 @@ public boolean equals(Object o) { && Objects.equals(chunkId, chunkInfo.chunkId) && Objects.equals(kafkaPartitionId, chunkInfo.kafkaPartitionId) && Objects.equals(snapshotPath, chunkInfo.snapshotPath) - && sizeInBytes == chunkInfo.sizeInBytes; + && sizeInBytesOnDisk == chunkInfo.sizeInBytesOnDisk; } @Override diff --git a/astra/src/main/java/com/slack/astra/chunk/IndexingChunkImpl.java b/astra/src/main/java/com/slack/astra/chunk/IndexingChunkImpl.java index ce07ad09d8..70fc76e74d 100644 --- a/astra/src/main/java/com/slack/astra/chunk/IndexingChunkImpl.java +++ b/astra/src/main/java/com/slack/astra/chunk/IndexingChunkImpl.java @@ -77,7 +77,7 @@ public void postSnapshot() { chunkInfo.getMaxOffset(), chunkInfo.getKafkaPartitionId(), Metadata.IndexType.LOGS_LUCENE9, - chunkInfo.getSizeInBytes()); + chunkInfo.getSizeInBytesOnDisk()); snapshotMetadataStore.updateSync(updatedSnapshotMetadata); liveSnapshotMetadata = updatedSnapshotMetadata; diff --git a/astra/src/main/java/com/slack/astra/chunk/ReadWriteChunk.java b/astra/src/main/java/com/slack/astra/chunk/ReadWriteChunk.java index a7a13573ea..3f2dffcca5 100644 --- a/astra/src/main/java/com/slack/astra/chunk/ReadWriteChunk.java +++ b/astra/src/main/java/com/slack/astra/chunk/ReadWriteChunk.java @@ -25,6 +25,7 @@ import io.micrometer.core.instrument.Timer; import java.io.File; import java.io.IOException; +import java.nio.file.Files; import java.nio.file.Path; import java.time.Instant; import java.util.ArrayList; @@ -89,7 +90,6 @@ public abstract class ReadWriteChunk implements Chunk { // TODO: Move this flag into LogStore?. private boolean readOnly; - protected long size; protected ReadWriteChunk( LogStore logStore, @@ -245,9 +245,9 @@ public boolean snapshotToS3(String bucket, String prefix, BlobFs blobFs) { // Upload files logger.info("{} active files in {} in index", filesToUpload.size(), dirPath); for (String fileName : filesToUpload) { - File fileToCopy = new File(dirPath.toString(), fileName); - totalBytes += fileToCopy.length(); - logger.debug("File name is {} ({} bytes)", fileName, fileToCopy.length()); + long sizeOfFile = Files.size(Path.of(dirPath + "/" + fileName)); + totalBytes += sizeOfFile; + logger.debug("File name is {} ({} bytes)", fileName, sizeOfFile); } this.fileUploadAttempts.increment(filesToUpload.size()); Timer.Sample snapshotTimer = Timer.start(meterRegistry); @@ -255,7 +255,7 @@ public boolean snapshotToS3(String bucket, String prefix, BlobFs blobFs) { snapshotTimer.stop(meterRegistry.timer(SNAPSHOT_TIMER)); this.fileUploadFailures.increment(filesToUpload.size() - success); chunkInfo.setSnapshotPath(createURI(bucket, prefix, "").toString()); - chunkInfo.setSizeInBytes(totalBytes); + chunkInfo.setSizeInBytesOnDisk(totalBytes); logger.info("Finished RW chunk snapshot to S3 {}.", chunkInfo); return true; } catch (Exception e) { diff --git a/astra/src/main/java/com/slack/astra/metadata/snapshot/SnapshotMetadata.java b/astra/src/main/java/com/slack/astra/metadata/snapshot/SnapshotMetadata.java index 1a01983e1c..0f0dd25873 100644 --- a/astra/src/main/java/com/slack/astra/metadata/snapshot/SnapshotMetadata.java +++ b/astra/src/main/java/com/slack/astra/metadata/snapshot/SnapshotMetadata.java @@ -36,7 +36,7 @@ public static boolean isLive(SnapshotMetadata snapshotMetadata) { public final long maxOffset; public final String partitionId; public final Metadata.IndexType indexType; - public long sizeInBytes; + public long sizeInBytesOnDisk; public SnapshotMetadata( String snapshotId, @@ -46,7 +46,7 @@ public SnapshotMetadata( long maxOffset, String partitionId, Metadata.IndexType indexType, - long sizeInBytes) { + long sizeInBytesOnDisk) { this( snapshotId, snapshotPath, @@ -55,7 +55,7 @@ public SnapshotMetadata( endTimeEpochMs, maxOffset, partitionId, - sizeInBytes, + sizeInBytesOnDisk, indexType); } @@ -67,7 +67,7 @@ private SnapshotMetadata( long endTimeEpochMs, long maxOffset, String partitionId, - long sizeInBytes, + long sizeInBytesOnDisk, Metadata.IndexType indexType) { super(name); checkArgument(snapshotId != null && !snapshotId.isEmpty(), "snapshotId can't be null or empty"); @@ -81,7 +81,6 @@ private SnapshotMetadata( partitionId != null && !partitionId.isEmpty(), "partitionId can't be null or empty"); checkArgument( snapshotPath != null && !snapshotPath.isEmpty(), "snapshotPath can't be null or empty"); - checkArgument(sizeInBytes >= 0, "size should be greater than or equal to zero."); this.snapshotPath = snapshotPath; this.snapshotId = snapshotId; @@ -90,7 +89,7 @@ private SnapshotMetadata( this.maxOffset = maxOffset; this.partitionId = partitionId; this.indexType = indexType; - this.sizeInBytes = sizeInBytes; + this.sizeInBytesOnDisk = sizeInBytesOnDisk; } @Override @@ -110,7 +109,7 @@ public boolean equals(Object o) { return false; if (partitionId != null ? !partitionId.equals(that.partitionId) : that.partitionId != null) return false; - if (sizeInBytes != that.sizeInBytes) return false; + if (sizeInBytesOnDisk != that.sizeInBytesOnDisk) return false; return indexType == that.indexType; } @@ -124,7 +123,7 @@ public int hashCode() { result = 31 * result + (int) (maxOffset ^ (maxOffset >>> 32)); result = 31 * result + (partitionId != null ? partitionId.hashCode() : 0); result = 31 * result + (indexType != null ? indexType.hashCode() : 0); - result = 31 * result + Long.hashCode(sizeInBytes); + result = 31 * result + Long.hashCode(sizeInBytesOnDisk); return result; } @@ -152,8 +151,8 @@ public String toString() { + '\'' + ", indexType=" + indexType - + ", sizeInBytes=" - + sizeInBytes + + ", sizeInBytesOnDisk=" + + sizeInBytesOnDisk + '}'; } diff --git a/astra/src/main/java/com/slack/astra/metadata/snapshot/SnapshotMetadataSerializer.java b/astra/src/main/java/com/slack/astra/metadata/snapshot/SnapshotMetadataSerializer.java index f25aef80cc..f2224adfb3 100644 --- a/astra/src/main/java/com/slack/astra/metadata/snapshot/SnapshotMetadataSerializer.java +++ b/astra/src/main/java/com/slack/astra/metadata/snapshot/SnapshotMetadataSerializer.java @@ -17,7 +17,7 @@ private static Metadata.SnapshotMetadata toSnapshotMetadataProto( .setPartitionId(snapshotMetadata.partitionId) .setMaxOffset(snapshotMetadata.maxOffset) .setIndexType(snapshotMetadata.indexType) - .setSizeInBytes(snapshotMetadata.sizeInBytes) + .setSizeInBytes(snapshotMetadata.sizeInBytesOnDisk) .build(); } diff --git a/astra/src/test/java/com/slack/astra/chunk/ChunkInfoTest.java b/astra/src/test/java/com/slack/astra/chunk/ChunkInfoTest.java index a057ada7da..c547d6993b 100644 --- a/astra/src/test/java/com/slack/astra/chunk/ChunkInfoTest.java +++ b/astra/src/test/java/com/slack/astra/chunk/ChunkInfoTest.java @@ -357,7 +357,8 @@ public void snapshotMetadataConversion() { dataEnd, 1000, TEST_KAFKA_PARTITION_ID, - TEST_SNAPSHOT_PATH); + TEST_SNAPSHOT_PATH, + 0); assertThat(fromSnapshotMetadata(toSnapshotMetadata(chunkInfo, ""))).isEqualTo(chunkInfo); } } diff --git a/astra/src/test/java/com/slack/astra/chunk/IndexingChunkImplTest.java b/astra/src/test/java/com/slack/astra/chunk/IndexingChunkImplTest.java index 0324ce89c3..60e7044a57 100644 --- a/astra/src/test/java/com/slack/astra/chunk/IndexingChunkImplTest.java +++ b/astra/src/test/java/com/slack/astra/chunk/IndexingChunkImplTest.java @@ -737,7 +737,7 @@ public void testSnapshotToS3UsingChunkApi() throws Exception { .contains(SnapshotMetadata.LIVE_SNAPSHOT_PATH); // Check total size of objects uploaded was correctly tracked - assertThat(chunk.info().getSizeInBytes()) + assertThat(chunk.info().getSizeInBytesOnDisk()) .isEqualTo(objectsResponse.contents().stream().mapToLong(S3Object::size).sum()); chunk.close(); diff --git a/astra/src/test/java/com/slack/astra/logstore/search/AstraDistributedQueryServiceTest.java b/astra/src/test/java/com/slack/astra/logstore/search/AstraDistributedQueryServiceTest.java index 86be317386..0831b04c16 100644 --- a/astra/src/test/java/com/slack/astra/logstore/search/AstraDistributedQueryServiceTest.java +++ b/astra/src/test/java/com/slack/astra/logstore/search/AstraDistributedQueryServiceTest.java @@ -969,7 +969,8 @@ private SnapshotMetadata createSnapshot( chunkEndTime.toEpochMilli(), 1234, partition, - isLive ? LIVE_SNAPSHOT_PATH : "cacheSnapshotPath"); + isLive ? LIVE_SNAPSHOT_PATH : "cacheSnapshotPath", + 0); SnapshotMetadata snapshotMetadata = toSnapshotMetadata(chunkInfo, isLive ? LIVE_SNAPSHOT_PREFIX : ""); diff --git a/astra/src/test/java/com/slack/astra/metadata/snapshot/SnapshotMetadataSerializerTest.java b/astra/src/test/java/com/slack/astra/metadata/snapshot/SnapshotMetadataSerializerTest.java index ba838b1573..2d8987e124 100644 --- a/astra/src/test/java/com/slack/astra/metadata/snapshot/SnapshotMetadataSerializerTest.java +++ b/astra/src/test/java/com/slack/astra/metadata/snapshot/SnapshotMetadataSerializerTest.java @@ -5,6 +5,7 @@ import static org.assertj.core.api.AssertionsForClassTypes.assertThatExceptionOfType; import com.google.protobuf.InvalidProtocolBufferException; +import com.slack.astra.proto.metadata.Metadata; import org.junit.jupiter.api.Test; public class SnapshotMetadataSerializerTest { @@ -18,10 +19,11 @@ public void testSnapshotMetadataSerializer() throws InvalidProtocolBufferExcepti final long endTime = 100; final long maxOffset = 123; final String partitionId = "1"; + final long sizeInBytes = 0; SnapshotMetadata snapshotMetadata = new SnapshotMetadata( - name, path, startTime, endTime, maxOffset, partitionId, LOGS_LUCENE9, 0); + name, path, startTime, endTime, maxOffset, partitionId, LOGS_LUCENE9, sizeInBytes); String serializedSnapshot = serDe.toJsonStr(snapshotMetadata); assertThat(serializedSnapshot).isNotEmpty(); @@ -37,6 +39,46 @@ public void testSnapshotMetadataSerializer() throws InvalidProtocolBufferExcepti assertThat(deserializedSnapshotMetadata.maxOffset).isEqualTo(maxOffset); assertThat(deserializedSnapshotMetadata.partitionId).isEqualTo(partitionId); assertThat(deserializedSnapshotMetadata.indexType).isEqualTo(LOGS_LUCENE9); + assertThat(deserializedSnapshotMetadata.sizeInBytesOnDisk).isEqualTo(sizeInBytes); + } + + @Test + public void testDeserializingWithoutSizeField() throws InvalidProtocolBufferException { + final String name = "testSnapshotId"; + final String path = "/testPath_" + name; + final long startTime = 1; + final long endTime = 100; + final long maxOffset = 123; + final String partitionId = "1"; + + Metadata.SnapshotMetadata protoSnapshotMetadata = + Metadata.SnapshotMetadata.newBuilder() + .setName(name) + .setSnapshotPath(path) + .setSnapshotId(name) + .setStartTimeEpochMs(startTime) + .setEndTimeEpochMs(endTime) + .setMaxOffset(maxOffset) + .setPartitionId(partitionId) + .setIndexType(LOGS_LUCENE9) + // leaving out the `size` field + .build(); + + SnapshotMetadata deserializedSnapshotMetadata = + serDe.fromJsonStr(serDe.printer.print(protoSnapshotMetadata)); + + // Assert size is 0 + assertThat(deserializedSnapshotMetadata.sizeInBytesOnDisk).isEqualTo(0); + + // Assert everything else is deserialized correctly + assertThat(deserializedSnapshotMetadata.name).isEqualTo(name); + assertThat(deserializedSnapshotMetadata.snapshotPath).isEqualTo(path); + assertThat(deserializedSnapshotMetadata.snapshotId).isEqualTo(name); + assertThat(deserializedSnapshotMetadata.startTimeEpochMs).isEqualTo(startTime); + assertThat(deserializedSnapshotMetadata.endTimeEpochMs).isEqualTo(endTime); + assertThat(deserializedSnapshotMetadata.maxOffset).isEqualTo(maxOffset); + assertThat(deserializedSnapshotMetadata.partitionId).isEqualTo(partitionId); + assertThat(deserializedSnapshotMetadata.indexType).isEqualTo(LOGS_LUCENE9); } @Test