Skip to content

Commit

Permalink
Remove snapshotpath, index type from snapshots
Browse files Browse the repository at this point in the history
  • Loading branch information
bryanlb committed Aug 14, 2024
1 parent a50d63e commit 04090c5
Show file tree
Hide file tree
Showing 42 changed files with 284 additions and 1,402 deletions.
27 changes: 2 additions & 25 deletions astra/src/main/java/com/slack/astra/chunk/ChunkInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import static com.slack.astra.util.ArgValidationUtils.ensureTrue;

import com.slack.astra.metadata.snapshot.SnapshotMetadata;
import com.slack.astra.proto.metadata.Metadata;
import java.util.Objects;

/**
Expand Down Expand Up @@ -31,19 +30,16 @@ public static ChunkInfo fromSnapshotMetadata(SnapshotMetadata snapshotMetadata)
snapshotMetadata.endTimeEpochMs,
snapshotMetadata.maxOffset,
snapshotMetadata.partitionId,
snapshotMetadata.snapshotPath,
snapshotMetadata.sizeInBytesOnDisk);
}

public static SnapshotMetadata toSnapshotMetadata(ChunkInfo chunkInfo, String chunkPrefix) {
return new SnapshotMetadata(
chunkPrefix + chunkInfo.chunkId,
chunkInfo.snapshotPath,
chunkInfo.getDataStartTimeEpochMs(),
chunkInfo.getDataEndTimeEpochMs(),
chunkInfo.maxOffset,
chunkInfo.kafkaPartitionId,
Metadata.IndexType.LOGS_LUCENE9,
chunkInfo.sizeInBytesOnDisk);
}

Expand Down Expand Up @@ -77,14 +73,10 @@ public static SnapshotMetadata toSnapshotMetadata(ChunkInfo chunkInfo, String ch
// indexing and snapshotting and is not useful afterwards.
private long chunkSnapshotTimeEpochMs;

// Path to S3 snapshot.
private String snapshotPath;

// Size of chunk on disk in bytes
private long sizeInBytesOnDisk;

public ChunkInfo(
String chunkId, long chunkCreationTimeEpochMs, String kafkaPartitionId, String snapshotPath) {
public ChunkInfo(String chunkId, long chunkCreationTimeEpochMs, String kafkaPartitionId) {
// TODO: Should we set the snapshot time to creation time also?
this(
chunkId,
Expand All @@ -95,7 +87,6 @@ public ChunkInfo(
0,
DEFAULT_MAX_OFFSET,
kafkaPartitionId,
snapshotPath,
0);
}

Expand All @@ -108,7 +99,6 @@ public ChunkInfo(
long chunkSnapshotTimeEpochMs,
long maxOffset,
String kafkaPartitionId,
String snapshotPath,
long sizeInBytesOnDisk) {
ensureTrue(chunkId != null && !chunkId.isEmpty(), "Invalid chunk dataset name " + chunkId);
ensureTrue(
Expand All @@ -124,7 +114,6 @@ public ChunkInfo(
this.chunkSnapshotTimeEpochMs = chunkSnapshotTimeEpochMs;
this.maxOffset = maxOffset;
this.kafkaPartitionId = kafkaPartitionId;
this.snapshotPath = snapshotPath;
this.sizeInBytesOnDisk = sizeInBytesOnDisk;
}

Expand Down Expand Up @@ -164,14 +153,6 @@ public void setChunkLastUpdatedTimeEpochMs(long chunkLastUpdatedTimeEpochMs) {
this.chunkLastUpdatedTimeEpochMs = chunkLastUpdatedTimeEpochMs;
}

public void setSnapshotPath(String snapshotPath) {
this.snapshotPath = snapshotPath;
}

public String getSnapshotPath() {
return snapshotPath;
}

public long getSizeInBytesOnDisk() {
return sizeInBytesOnDisk;
}
Expand Down Expand Up @@ -238,8 +219,6 @@ public String toString() {
+ dataEndTimeEpochMs
+ ", chunkSnapshotTimeEpochMs="
+ chunkSnapshotTimeEpochMs
+ ", snapshotPath='"
+ snapshotPath
+ ", sizeInBytesOnDisk='"
+ sizeInBytesOnDisk
+ '}';
Expand All @@ -258,7 +237,6 @@ public boolean equals(Object o) {
&& chunkSnapshotTimeEpochMs == chunkInfo.chunkSnapshotTimeEpochMs
&& Objects.equals(chunkId, chunkInfo.chunkId)
&& Objects.equals(kafkaPartitionId, chunkInfo.kafkaPartitionId)
&& Objects.equals(snapshotPath, chunkInfo.snapshotPath)
&& sizeInBytesOnDisk == chunkInfo.sizeInBytesOnDisk;
}

Expand All @@ -272,7 +250,6 @@ public int hashCode() {
chunkLastUpdatedTimeEpochMs,
dataStartTimeEpochMs,
dataEndTimeEpochMs,
chunkSnapshotTimeEpochMs,
snapshotPath);
chunkSnapshotTimeEpochMs);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import com.slack.astra.metadata.search.SearchMetadataStore;
import com.slack.astra.metadata.snapshot.SnapshotMetadata;
import com.slack.astra.metadata.snapshot.SnapshotMetadataStore;
import com.slack.astra.proto.metadata.Metadata;
import io.micrometer.core.instrument.MeterRegistry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -66,18 +65,16 @@ public void postSnapshot() {
SnapshotMetadata nonLiveSnapshotMetadata = toSnapshotMetadata(chunkInfo, "");
snapshotMetadataStore.createSync(nonLiveSnapshotMetadata);

// Update the live snapshot. Keep the same snapshotId and snapshotPath to
// Update the live snapshot. Keep the same snapshotId to
// ensure it's a live snapshot.
SnapshotMetadata updatedSnapshotMetadata =
new SnapshotMetadata(
liveSnapshotMetadata.snapshotId,
liveSnapshotMetadata.snapshotPath,
chunkInfo.getDataStartTimeEpochMs(),
chunkInfo.getDataEndTimeEpochMs(),
chunkInfo.getMaxOffset(),
chunkInfo.getKafkaPartitionId(),
Metadata.IndexType.LOGS_LUCENE9,
chunkInfo.getSizeInBytesOnDisk());
liveSnapshotMetadata.sizeInBytesOnDisk);
snapshotMetadataStore.updateSync(updatedSnapshotMetadata);
liveSnapshotMetadata = updatedSnapshotMetadata;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
Expand Down Expand Up @@ -152,7 +151,6 @@ public ReadOnlyChunkImpl(
Metadata.CacheSlotMetadata.CacheSlotState.FREE,
"",
Instant.now().toEpochMilli(),
List.of(Metadata.IndexType.LOGS_LUCENE9),
searchContext.hostname,
replicaSet);
cacheSlotMetadataStore.createSync(cacheSlotMetadata);
Expand Down Expand Up @@ -265,9 +263,8 @@ public void downloadChunkData() {
long durationNanos = assignmentTimer.stop(chunkAssignmentTimerSuccess);

LOG.info(
"Downloaded chunk with snapshot id '{}' at path '{}' in {} seconds, was {}",
"Downloaded chunk with snapshot id '{}' in {} seconds, was {}",
snapshotMetadata.snapshotId,
snapshotMetadata.snapshotPath,
TimeUnit.SECONDS.convert(durationNanos, TimeUnit.NANOSECONDS),
FileUtils.byteCountToDisplaySize(FileUtils.sizeOfDirectory(dataDirectory.toFile())));
} catch (Exception e) {
Expand Down Expand Up @@ -408,9 +405,8 @@ private void handleChunkAssignment(CacheSlotMetadata cacheSlotMetadata) {
long durationNanos = assignmentTimer.stop(chunkAssignmentTimerSuccess);

LOG.debug(
"Downloaded chunk with snapshot id '{}' at path '{}' in {} seconds, was {}",
"Downloaded chunk with snapshot id '{}' in {} seconds, was {}",
snapshotMetadata.snapshotId,
snapshotMetadata.snapshotPath,
TimeUnit.SECONDS.convert(durationNanos, TimeUnit.NANOSECONDS),
FileUtils.byteCountToDisplaySize(FileUtils.sizeOfDirectory(dataDirectory.toFile())));
} catch (Exception e) {
Expand Down
6 changes: 2 additions & 4 deletions astra/src/main/java/com/slack/astra/chunk/ReadWriteChunk.java
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public abstract class ReadWriteChunk<T> implements Chunk<T> {
public static final String INDEX_FILES_UPLOAD = "index_files_upload";
public static final String INDEX_FILES_UPLOAD_FAILED = "index_files_upload_failed";
public static final String SNAPSHOT_TIMER = "snapshot.timer";
public static final String LIVE_SNAPSHOT_PREFIX = SnapshotMetadata.LIVE_SNAPSHOT_PATH + "_";
public static final String LIVE_SNAPSHOT_PREFIX = "LIVE_";
public static final String SCHEMA_FILE_NAME = "schema.json";

private final LogStore logStore;
Expand Down Expand Up @@ -111,8 +111,7 @@ protected ReadWriteChunk(
new ChunkInfo(
chunkDataPrefix + "_" + chunkCreationTime.getEpochSecond() + "_" + logStoreId,
chunkCreationTime.toEpochMilli(),
kafkaPartitionId,
SnapshotMetadata.LIVE_SNAPSHOT_PATH);
kafkaPartitionId);

readOnly = false;
this.meterRegistry = meterRegistry;
Expand Down Expand Up @@ -253,7 +252,6 @@ public boolean snapshotToS3(ChunkStore chunkStore) {
chunkStore.upload(chunkInfo.chunkId, dirPath);

snapshotTimer.stop(meterRegistry.timer(SNAPSHOT_TIMER));
chunkInfo.setSnapshotPath(chunkStore.getRemotePath(chunkInfo.chunkId));
chunkInfo.setSizeInBytesOnDisk(totalBytes);
logger.info("Finished RW chunk snapshot to S3 {}.", chunkInfo);
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,6 @@ public SearchResult<T> query(SearchQuery query, Duration queryTimeout) {
Tracing.currentTracer()
.startScopedSpan("ChunkManagerBase.chunkQuery");
span.tag("chunkId", chunk.id());
span.tag("chunkSnapshotPath", chunk.info().getSnapshotPath());
concurrentQueries.acquire();
try {
return chunk.query(query);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import com.slack.astra.metadata.snapshot.SnapshotMetadata;
import com.slack.astra.metadata.snapshot.SnapshotMetadataStore;
import com.slack.astra.proto.config.AstraConfigs;
import com.slack.astra.proto.metadata.Metadata;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Timer;
Expand Down Expand Up @@ -177,7 +176,7 @@ protected Map<String, Integer> createReplicasForUnassignedSnapshots() {
.filter(
snapshotMetadata ->
snapshotMetadata.endTimeEpochMs > snapshotExpiration
&& !SnapshotMetadata.isLive(snapshotMetadata)
&& !snapshotMetadata.isLive()
&& !existingReplicas.contains(snapshotMetadata.snapshotId))
.map(
(snapshotMetadata) -> {
Expand Down Expand Up @@ -249,7 +248,6 @@ public static ReplicaMetadata replicaMetadataFromSnapshotId(
replicaSet,
Instant.now().toEpochMilli(),
expireAfter.toEpochMilli(),
isRestored,
Metadata.IndexType.LOGS_LUCENE9);
isRestored);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.slack.astra.blobfs.ChunkStore;
import com.slack.astra.metadata.replica.ReplicaMetadataStore;
import com.slack.astra.metadata.snapshot.SnapshotMetadata;
import com.slack.astra.metadata.snapshot.SnapshotMetadataStore;
import com.slack.astra.proto.config.AstraConfigs;
import io.micrometer.core.instrument.Counter;
Expand Down Expand Up @@ -175,7 +174,7 @@ protected int deleteExpiredSnapshotsWithoutReplicas() {
// served from the indexers. To avoid the whole headache of managing all the
// different states we could be in, we should just disable the deletion of live
// snapshots whole-cloth. We clean those up when a node boots anyhow
.filter(snapshotMetadata -> !SnapshotMetadata.isLive(snapshotMetadata))
.filter(snapshotMetadata -> !snapshotMetadata.isLive())
.map(
snapshotMetadata -> {
ListenableFuture<?> future =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@

import com.slack.astra.metadata.core.AstraPartitionedMetadata;
import com.slack.astra.proto.metadata.Metadata;
import java.util.Collections;
import java.util.List;
import java.util.Objects;

/**
Expand All @@ -18,23 +16,18 @@ public class CacheSlotMetadata extends AstraPartitionedMetadata {
public final Metadata.CacheSlotMetadata.CacheSlotState cacheSlotState;
public final String replicaId;
public final long updatedTimeEpochMs;
public final List<Metadata.IndexType> supportedIndexTypes;

public CacheSlotMetadata(
String name,
Metadata.CacheSlotMetadata.CacheSlotState cacheSlotState,
String replicaId,
long updatedTimeEpochMs,
List<Metadata.IndexType> supportedIndexTypes,
String hostname,
String replicaSet) {
super(name);
checkArgument(hostname != null && !hostname.isEmpty(), "Hostname cannot be null or empty");
checkArgument(cacheSlotState != null, "Cache slot state cannot be null");
checkArgument(updatedTimeEpochMs > 0, "Updated time must be greater than 0");
checkArgument(
supportedIndexTypes != null && !supportedIndexTypes.isEmpty(),
"supported index types shouldn't be empty");
if (cacheSlotState.equals(Metadata.CacheSlotMetadata.CacheSlotState.FREE)) {
checkArgument(
replicaId != null && replicaId.isEmpty(),
Expand All @@ -50,7 +43,6 @@ public CacheSlotMetadata(
this.cacheSlotState = cacheSlotState;
this.replicaId = replicaId;
this.updatedTimeEpochMs = updatedTimeEpochMs;
this.supportedIndexTypes = Collections.unmodifiableList(supportedIndexTypes);
}

public String getHostname() {
Expand All @@ -67,8 +59,7 @@ public boolean equals(Object o) {
if (!hostname.equals(that.hostname)) return false;
if (!Objects.equals(replicaSet, that.replicaSet)) return false;
if (cacheSlotState != that.cacheSlotState) return false;
if (!replicaId.equals(that.replicaId)) return false;
return supportedIndexTypes.equals(that.supportedIndexTypes);
return replicaId.equals(that.replicaId);
}

@Override
Expand All @@ -79,7 +70,6 @@ public int hashCode() {
result = 31 * result + cacheSlotState.hashCode();
result = 31 * result + replicaId.hashCode();
result = 31 * result + (int) (updatedTimeEpochMs ^ (updatedTimeEpochMs >>> 32));
result = 31 * result + supportedIndexTypes.hashCode();
return result;
}

Expand All @@ -99,8 +89,6 @@ public String toString() {
+ '\''
+ ", updatedTimeEpochMs="
+ updatedTimeEpochMs
+ ", supportedIndexTypes="
+ supportedIndexTypes
+ ", name='"
+ name
+ '\''
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ private static CacheSlotMetadata fromCacheSlotMetadataProto(
cacheSlotMetadataProto.getCacheSlotState().name()),
cacheSlotMetadataProto.getReplicaId(),
cacheSlotMetadataProto.getUpdatedTimeEpochMs(),
cacheSlotMetadataProto.getSupportedIndexTypesList(),
cacheSlotMetadataProto.getHostname(),
cacheSlotMetadataProto.getReplicaSet());
}
Expand All @@ -25,7 +24,6 @@ private static Metadata.CacheSlotMetadata toCacheSlotMetadataProto(CacheSlotMeta
.setReplicaId(metadata.replicaId)
.setCacheSlotState(metadata.cacheSlotState)
.setUpdatedTimeEpochMs(metadata.updatedTimeEpochMs)
.addAllSupportedIndexTypes(metadata.supportedIndexTypes)
.setHostname(metadata.hostname)
.setReplicaSet(metadata.replicaSet)
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ public ListenableFuture<?> updateCacheSlotStateStateWithReplicaId(
newState,
replicaId,
Instant.now().toEpochMilli(),
cacheSlotMetadata.supportedIndexTypes,
cacheSlotMetadata.hostname,
cacheSlotMetadata.replicaSet);
// todo - consider refactoring this to return a completable future instead
Expand Down
Loading

0 comments on commit 04090c5

Please sign in to comment.