Skip to content

Commit

Permalink
review
Browse files Browse the repository at this point in the history
  • Loading branch information
kx-chen committed Jun 4, 2024
1 parent 414ef9c commit 6c7faae
Show file tree
Hide file tree
Showing 9 changed files with 81 additions and 34 deletions.
30 changes: 17 additions & 13 deletions astra/src/main/java/com/slack/astra/chunk/ChunkInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ public static ChunkInfo fromSnapshotMetadata(SnapshotMetadata snapshotMetadata)
snapshotMetadata.endTimeEpochMs,
snapshotMetadata.maxOffset,
snapshotMetadata.partitionId,
snapshotMetadata.snapshotPath);
snapshotMetadata.snapshotPath,
snapshotMetadata.sizeInBytesOnDisk);
}

public static SnapshotMetadata toSnapshotMetadata(ChunkInfo chunkInfo, String chunkPrefix) {
Expand All @@ -43,7 +44,7 @@ public static SnapshotMetadata toSnapshotMetadata(ChunkInfo chunkInfo, String ch
chunkInfo.maxOffset,
chunkInfo.kafkaPartitionId,
Metadata.IndexType.LOGS_LUCENE9,
chunkInfo.sizeInBytes);
chunkInfo.sizeInBytesOnDisk);
}

/* A unique identifier for the chunk. */
Expand Down Expand Up @@ -79,8 +80,8 @@ public static SnapshotMetadata toSnapshotMetadata(ChunkInfo chunkInfo, String ch
// Path to S3 snapshot.
private String snapshotPath;

// Size of chunk in bytes
private long sizeInBytes;
// Size of chunk on disk in bytes
private long sizeInBytesOnDisk;

public ChunkInfo(
String chunkId, long chunkCreationTimeEpochMs, String kafkaPartitionId, String snapshotPath) {
Expand All @@ -94,7 +95,8 @@ public ChunkInfo(
0,
DEFAULT_MAX_OFFSET,
kafkaPartitionId,
snapshotPath);
snapshotPath,
0);
}

public ChunkInfo(
Expand All @@ -106,7 +108,8 @@ public ChunkInfo(
long chunkSnapshotTimeEpochMs,
long maxOffset,
String kafkaPartitionId,
String snapshotPath) {
String snapshotPath,
long sizeInBytesOnDisk) {
ensureTrue(chunkId != null && !chunkId.isEmpty(), "Invalid chunk dataset name " + chunkId);
ensureTrue(
chunkCreationTimeEpochMs >= 0,
Expand All @@ -122,6 +125,7 @@ public ChunkInfo(
this.maxOffset = maxOffset;
this.kafkaPartitionId = kafkaPartitionId;
this.snapshotPath = snapshotPath;
this.sizeInBytesOnDisk = sizeInBytesOnDisk;
}

public long getChunkSnapshotTimeEpochMs() {
Expand Down Expand Up @@ -168,12 +172,12 @@ public String getSnapshotPath() {
return snapshotPath;
}

public long getSizeInBytes() {
return sizeInBytes;
public long getSizeInBytesOnDisk() {
return sizeInBytesOnDisk;
}

public void setSizeInBytes(long sizeInBytes) {
this.sizeInBytes = sizeInBytes;
public void setSizeInBytesOnDisk(long sizeInBytesOnDisk) {
this.sizeInBytesOnDisk = sizeInBytesOnDisk;
}

public void updateMaxOffset(long newOffset) {
Expand Down Expand Up @@ -236,8 +240,8 @@ public String toString() {
+ chunkSnapshotTimeEpochMs
+ ", snapshotPath='"
+ snapshotPath
+ ", sizeInBytes='"
+ sizeInBytes
+ ", sizeInBytesOnDisk='"
+ sizeInBytesOnDisk
+ '}';
}

Expand All @@ -255,7 +259,7 @@ public boolean equals(Object o) {
&& Objects.equals(chunkId, chunkInfo.chunkId)
&& Objects.equals(kafkaPartitionId, chunkInfo.kafkaPartitionId)
&& Objects.equals(snapshotPath, chunkInfo.snapshotPath)
&& sizeInBytes == chunkInfo.sizeInBytes;
&& sizeInBytesOnDisk == chunkInfo.sizeInBytesOnDisk;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public void postSnapshot() {
chunkInfo.getMaxOffset(),
chunkInfo.getKafkaPartitionId(),
Metadata.IndexType.LOGS_LUCENE9,
chunkInfo.getSizeInBytes());
chunkInfo.getSizeInBytesOnDisk());
snapshotMetadataStore.updateSync(updatedSnapshotMetadata);
liveSnapshotMetadata = updatedSnapshotMetadata;

Expand Down
10 changes: 5 additions & 5 deletions astra/src/main/java/com/slack/astra/chunk/ReadWriteChunk.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.micrometer.core.instrument.Timer;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Instant;
import java.util.ArrayList;
Expand Down Expand Up @@ -89,7 +90,6 @@ public abstract class ReadWriteChunk<T> implements Chunk<T> {

// TODO: Move this flag into LogStore?.
private boolean readOnly;
protected long size;

protected ReadWriteChunk(
LogStore logStore,
Expand Down Expand Up @@ -245,17 +245,17 @@ public boolean snapshotToS3(String bucket, String prefix, BlobFs blobFs) {
// Upload files
logger.info("{} active files in {} in index", filesToUpload.size(), dirPath);
for (String fileName : filesToUpload) {
File fileToCopy = new File(dirPath.toString(), fileName);
totalBytes += fileToCopy.length();
logger.debug("File name is {} ({} bytes)", fileName, fileToCopy.length());
long sizeOfFile = Files.size(Path.of(dirPath + "/" + fileName));
totalBytes += sizeOfFile;
logger.debug("File name is {} ({} bytes)", fileName, sizeOfFile);
}
this.fileUploadAttempts.increment(filesToUpload.size());
Timer.Sample snapshotTimer = Timer.start(meterRegistry);
final int success = copyToS3(dirPath, filesToUpload, bucket, prefix, blobFs);
snapshotTimer.stop(meterRegistry.timer(SNAPSHOT_TIMER));
this.fileUploadFailures.increment(filesToUpload.size() - success);
chunkInfo.setSnapshotPath(createURI(bucket, prefix, "").toString());
chunkInfo.setSizeInBytes(totalBytes);
chunkInfo.setSizeInBytesOnDisk(totalBytes);
logger.info("Finished RW chunk snapshot to S3 {}.", chunkInfo);
return true;
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public static boolean isLive(SnapshotMetadata snapshotMetadata) {
public final long maxOffset;
public final String partitionId;
public final Metadata.IndexType indexType;
public long sizeInBytes;
public long sizeInBytesOnDisk;

public SnapshotMetadata(
String snapshotId,
Expand All @@ -46,7 +46,7 @@ public SnapshotMetadata(
long maxOffset,
String partitionId,
Metadata.IndexType indexType,
long sizeInBytes) {
long sizeInBytesOnDisk) {
this(
snapshotId,
snapshotPath,
Expand All @@ -55,7 +55,7 @@ public SnapshotMetadata(
endTimeEpochMs,
maxOffset,
partitionId,
sizeInBytes,
sizeInBytesOnDisk,
indexType);
}

Expand All @@ -67,7 +67,7 @@ private SnapshotMetadata(
long endTimeEpochMs,
long maxOffset,
String partitionId,
long sizeInBytes,
long sizeInBytesOnDisk,
Metadata.IndexType indexType) {
super(name);
checkArgument(snapshotId != null && !snapshotId.isEmpty(), "snapshotId can't be null or empty");
Expand All @@ -81,7 +81,6 @@ private SnapshotMetadata(
partitionId != null && !partitionId.isEmpty(), "partitionId can't be null or empty");
checkArgument(
snapshotPath != null && !snapshotPath.isEmpty(), "snapshotPath can't be null or empty");
checkArgument(sizeInBytes >= 0, "size should be greater than or equal to zero.");

this.snapshotPath = snapshotPath;
this.snapshotId = snapshotId;
Expand All @@ -90,7 +89,7 @@ private SnapshotMetadata(
this.maxOffset = maxOffset;
this.partitionId = partitionId;
this.indexType = indexType;
this.sizeInBytes = sizeInBytes;
this.sizeInBytesOnDisk = sizeInBytesOnDisk;
}

@Override
Expand All @@ -110,7 +109,7 @@ public boolean equals(Object o) {
return false;
if (partitionId != null ? !partitionId.equals(that.partitionId) : that.partitionId != null)
return false;
if (sizeInBytes != that.sizeInBytes) return false;
if (sizeInBytesOnDisk != that.sizeInBytesOnDisk) return false;
return indexType == that.indexType;
}

Expand All @@ -124,7 +123,7 @@ public int hashCode() {
result = 31 * result + (int) (maxOffset ^ (maxOffset >>> 32));
result = 31 * result + (partitionId != null ? partitionId.hashCode() : 0);
result = 31 * result + (indexType != null ? indexType.hashCode() : 0);
result = 31 * result + Long.hashCode(sizeInBytes);
result = 31 * result + Long.hashCode(sizeInBytesOnDisk);
return result;
}

Expand Down Expand Up @@ -152,8 +151,8 @@ public String toString() {
+ '\''
+ ", indexType="
+ indexType
+ ", sizeInBytes="
+ sizeInBytes
+ ", sizeInBytesOnDisk="
+ sizeInBytesOnDisk
+ '}';
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ private static Metadata.SnapshotMetadata toSnapshotMetadataProto(
.setPartitionId(snapshotMetadata.partitionId)
.setMaxOffset(snapshotMetadata.maxOffset)
.setIndexType(snapshotMetadata.indexType)
.setSizeInBytes(snapshotMetadata.sizeInBytes)
.setSizeInBytes(snapshotMetadata.sizeInBytesOnDisk)
.build();
}

Expand Down
3 changes: 2 additions & 1 deletion astra/src/test/java/com/slack/astra/chunk/ChunkInfoTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,8 @@ public void snapshotMetadataConversion() {
dataEnd,
1000,
TEST_KAFKA_PARTITION_ID,
TEST_SNAPSHOT_PATH);
TEST_SNAPSHOT_PATH,
0);
assertThat(fromSnapshotMetadata(toSnapshotMetadata(chunkInfo, ""))).isEqualTo(chunkInfo);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -737,7 +737,7 @@ public void testSnapshotToS3UsingChunkApi() throws Exception {
.contains(SnapshotMetadata.LIVE_SNAPSHOT_PATH);

// Check total size of objects uploaded was correctly tracked
assertThat(chunk.info().getSizeInBytes())
assertThat(chunk.info().getSizeInBytesOnDisk())
.isEqualTo(objectsResponse.contents().stream().mapToLong(S3Object::size).sum());

chunk.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -969,7 +969,8 @@ private SnapshotMetadata createSnapshot(
chunkEndTime.toEpochMilli(),
1234,
partition,
isLive ? LIVE_SNAPSHOT_PATH : "cacheSnapshotPath");
isLive ? LIVE_SNAPSHOT_PATH : "cacheSnapshotPath",
0);
SnapshotMetadata snapshotMetadata =
toSnapshotMetadata(chunkInfo, isLive ? LIVE_SNAPSHOT_PREFIX : "");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import static org.assertj.core.api.AssertionsForClassTypes.assertThatExceptionOfType;

import com.google.protobuf.InvalidProtocolBufferException;
import com.slack.astra.proto.metadata.Metadata;
import org.junit.jupiter.api.Test;

public class SnapshotMetadataSerializerTest {
Expand All @@ -18,10 +19,11 @@ public void testSnapshotMetadataSerializer() throws InvalidProtocolBufferExcepti
final long endTime = 100;
final long maxOffset = 123;
final String partitionId = "1";
final long sizeInBytes = 0;

SnapshotMetadata snapshotMetadata =
new SnapshotMetadata(
name, path, startTime, endTime, maxOffset, partitionId, LOGS_LUCENE9, 0);
name, path, startTime, endTime, maxOffset, partitionId, LOGS_LUCENE9, sizeInBytes);

String serializedSnapshot = serDe.toJsonStr(snapshotMetadata);
assertThat(serializedSnapshot).isNotEmpty();
Expand All @@ -37,6 +39,46 @@ public void testSnapshotMetadataSerializer() throws InvalidProtocolBufferExcepti
assertThat(deserializedSnapshotMetadata.maxOffset).isEqualTo(maxOffset);
assertThat(deserializedSnapshotMetadata.partitionId).isEqualTo(partitionId);
assertThat(deserializedSnapshotMetadata.indexType).isEqualTo(LOGS_LUCENE9);
assertThat(deserializedSnapshotMetadata.sizeInBytesOnDisk).isEqualTo(sizeInBytes);
}

@Test
public void testDeserializingWithoutSizeField() throws InvalidProtocolBufferException {
  final String name = "testSnapshotId";
  final String path = "/testPath_" + name;
  final long startTime = 1;
  final long endTime = 100;
  final long maxOffset = 123;
  final String partitionId = "1";

  // Build a proto snapshot that deliberately omits the `size` field, mimicking
  // metadata written before the field existed.
  Metadata.SnapshotMetadata.Builder builder =
      Metadata.SnapshotMetadata.newBuilder()
          .setName(name)
          .setSnapshotPath(path)
          .setSnapshotId(name)
          .setStartTimeEpochMs(startTime)
          .setEndTimeEpochMs(endTime)
          .setMaxOffset(maxOffset)
          .setPartitionId(partitionId)
          .setIndexType(LOGS_LUCENE9);
  Metadata.SnapshotMetadata protoWithoutSize = builder.build();

  SnapshotMetadata roundTripped = serDe.fromJsonStr(serDe.printer.print(protoWithoutSize));

  // The absent size field must come back as 0 (the proto default).
  assertThat(roundTripped.sizeInBytesOnDisk).isEqualTo(0);

  // Every field that was present must survive the round trip unchanged.
  assertThat(roundTripped.name).isEqualTo(name);
  assertThat(roundTripped.snapshotPath).isEqualTo(path);
  assertThat(roundTripped.snapshotId).isEqualTo(name);
  assertThat(roundTripped.startTimeEpochMs).isEqualTo(startTime);
  assertThat(roundTripped.endTimeEpochMs).isEqualTo(endTime);
  assertThat(roundTripped.maxOffset).isEqualTo(maxOffset);
  assertThat(roundTripped.partitionId).isEqualTo(partitionId);
  assertThat(roundTripped.indexType).isEqualTo(LOGS_LUCENE9);
}

@Test
Expand Down

0 comments on commit 6c7faae

Please sign in to comment.