From f026f94f529796b9fb4600f98054eac34ac4171d Mon Sep 17 00:00:00 2001 From: Bryan Burkholder Date: Tue, 6 Aug 2024 13:36:50 -0700 Subject: [PATCH] Add optional partition filters for ZK partitioning store --- .../chunkManager/CachingChunkManager.java | 2 +- .../cache/CacheNodeAssignmentStore.java | 11 +++ .../core/AstraPartitioningMetadataStore.java | 61 +++++++++++++--- .../AstraPartitioningMetadataStoreTest.java | 72 +++++++++++++++++-- 4 files changed, 130 insertions(+), 16 deletions(-) diff --git a/astra/src/main/java/com/slack/astra/chunkManager/CachingChunkManager.java b/astra/src/main/java/com/slack/astra/chunkManager/CachingChunkManager.java index 3066a2593a..dd20758331 100644 --- a/astra/src/main/java/com/slack/astra/chunkManager/CachingChunkManager.java +++ b/astra/src/main/java/com/slack/astra/chunkManager/CachingChunkManager.java @@ -88,7 +88,7 @@ protected void startUp() throws Exception { snapshotMetadataStore = new SnapshotMetadataStore(curatorFramework); searchMetadataStore = new SearchMetadataStore(curatorFramework, false); cacheSlotMetadataStore = new CacheSlotMetadataStore(curatorFramework); - cacheNodeAssignmentStore = new CacheNodeAssignmentStore(curatorFramework); + cacheNodeAssignmentStore = new CacheNodeAssignmentStore(curatorFramework, cacheNodeId); cacheNodeMetadataStore = new CacheNodeMetadataStore(curatorFramework); if (Boolean.getBoolean(ASTRA_NG_DYNAMIC_CHUNK_SIZES_FLAG)) { diff --git a/astra/src/main/java/com/slack/astra/metadata/cache/CacheNodeAssignmentStore.java b/astra/src/main/java/com/slack/astra/metadata/cache/CacheNodeAssignmentStore.java index f91300cae5..56b83eeecf 100644 --- a/astra/src/main/java/com/slack/astra/metadata/cache/CacheNodeAssignmentStore.java +++ b/astra/src/main/java/com/slack/astra/metadata/cache/CacheNodeAssignmentStore.java @@ -4,6 +4,7 @@ import com.google.common.util.concurrent.ListenableFuture; import com.slack.astra.metadata.core.AstraPartitioningMetadataStore; import com.slack.astra.proto.metadata.Metadata; +import java.util.List; import org.apache.curator.x.async.AsyncCuratorFramework; import org.apache.zookeeper.CreateMode; @@ -18,6 +19,16 @@ public CacheNodeAssignmentStore(AsyncCuratorFramework curator) { CACHE_NODE_ASSIGNMENT_STORE_ZK_PATH); } + /** Restricts the cache node assignment store to only watching events for cacheNodeId */ + public CacheNodeAssignmentStore(AsyncCuratorFramework curator, String cacheNodeId) { + super( + curator, + CreateMode.PERSISTENT, + new CacheNodeAssignmentSerializer().toModelSerializer(), + CACHE_NODE_ASSIGNMENT_STORE_ZK_PATH, + List.of(cacheNodeId)); + } + public ListenableFuture updateAssignmentState( final CacheNodeAssignment cacheNodeAssignment, final Metadata.CacheNodeAssignment.CacheNodeAssignmentState state) { diff --git a/astra/src/main/java/com/slack/astra/metadata/core/AstraPartitioningMetadataStore.java b/astra/src/main/java/com/slack/astra/metadata/core/AstraPartitioningMetadataStore.java index ed71f22059..b08b3800c4 100644 --- a/astra/src/main/java/com/slack/astra/metadata/core/AstraPartitioningMetadataStore.java +++ b/astra/src/main/java/com/slack/astra/metadata/core/AstraPartitioningMetadataStore.java @@ -54,17 +54,28 @@ public class AstraPartitioningMetadataStore private final CreateMode createMode; protected final ModelSerializer modelSerializer; private final Watcher watcher; + private final List partitionFilters; public AstraPartitioningMetadataStore( AsyncCuratorFramework curator, CreateMode createMode, ModelSerializer modelSerializer, String storeFolder) { + this(curator, createMode, modelSerializer, storeFolder, List.of()); + } + + public AstraPartitioningMetadataStore( + AsyncCuratorFramework curator, + CreateMode createMode, + ModelSerializer modelSerializer, + String storeFolder, + List partitionFilters) { this.curator = curator; this.storeFolder = storeFolder; this.createMode = createMode; this.modelSerializer = modelSerializer; this.watcher = buildWatcher(); + this.partitionFilters = partitionFilters; // register watchers for when partitions are added or removed curator @@ -88,15 +99,32 @@ public AstraPartitioningMetadataStore( return CompletableFuture.failedFuture(throwable); } }) - .thenAccept((children) -> children.forEach(this::getOrCreateMetadataStore)) + .thenAccept( + (children) -> { + if (partitionFilters.isEmpty()) { + children.forEach(this::getOrCreateMetadataStore); + } else { + children.stream() + .filter(partitionFilters::contains) + .forEach(this::getOrCreateMetadataStore); + } + }) .toCompletableFuture() // wait for all the stores to be initialized prior to exiting the constructor .join(); - LOG.info( - "The metadata store for folder '{}' was initialized with {} partitions", - storeFolder, - metadataStoreMap.size()); + if (partitionFilters.isEmpty()) { + LOG.info( + "The metadata store for folder '{}' was initialized with {} partitions", + storeFolder, + metadataStoreMap.size()); + } else { + LOG.info( + "The metadata store for folder '{}' was initialized with {} partitions (using partition filters: {})", + storeFolder, + metadataStoreMap.size(), + String.join(",", partitionFilters)); + } } /** @@ -118,8 +146,14 @@ private Watcher buildWatcher() { .forPath(storeFolder) .thenAcceptAsync( (partitions) -> { - // create internal stores foreach partition that do not already exist - partitions.forEach(this::getOrCreateMetadataStore); + if (partitionFilters.isEmpty()) { + // create internal stores foreach partition that do not already exist + partitions.forEach(this::getOrCreateMetadataStore); + } else { + partitions.stream() + .filter(partitionFilters::contains) + .forEach(this::getOrCreateMetadataStore); + } // remove metadata stores that exist in memory but no longer exist on ZK Set partitionsToRemove = @@ -253,6 +287,15 @@ public List listSync() { } private AstraMetadataStore getOrCreateMetadataStore(String partition) { + if (!partitionFilters.isEmpty() && !partitionFilters.contains(partition)) { + LOG.error( + "Partitioning metadata store attempted to use partition {}, filters restricted to {}", + partition, + String.join(",", partitionFilters)); + throw new InternalMetadataStoreException( + "Partitioning metadata store using filters that does not include provided partition"); + } + return metadataStoreMap.computeIfAbsent( partition, (p1) -> { @@ -289,12 +332,12 @@ public void addListener(AstraMetadataStoreChangeListener watcher) { // add this watcher to the list for new stores to add listeners.add(watcher); // add this watcher to existing stores - metadataStoreMap.forEach((partition, store) -> store.addListener(watcher)); + metadataStoreMap.forEach((_, store) -> store.addListener(watcher)); } public void removeListener(AstraMetadataStoreChangeListener watcher) { listeners.remove(watcher); - metadataStoreMap.forEach((partition, store) -> store.removeListener(watcher)); + metadataStoreMap.forEach((_, store) -> store.removeListener(watcher)); } @Override diff --git a/astra/src/test/java/com/slack/astra/metadata/core/AstraPartitioningMetadataStoreTest.java b/astra/src/test/java/com/slack/astra/metadata/core/AstraPartitioningMetadataStoreTest.java index c26635ef78..822c20cb1e 100644 --- a/astra/src/test/java/com/slack/astra/metadata/core/AstraPartitioningMetadataStoreTest.java +++ b/astra/src/test/java/com/slack/astra/metadata/core/AstraPartitioningMetadataStoreTest.java @@ -2,6 +2,7 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatExceptionOfType; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.awaitility.Awaitility.await; import com.fasterxml.jackson.annotation.JsonCreator; @@ -112,17 +113,22 @@ public ExampleMetadata fromJsonStr(String data) { static class FixedPartitionExampleMetadata extends AstraPartitionedMetadata { + private final String partitionId; private String extraField = null; - public FixedPartitionExampleMetadata(String name) { + public FixedPartitionExampleMetadata(String name, String partitionId) { super(name); + this.partitionId = partitionId; } @JsonCreator public FixedPartitionExampleMetadata( - @JsonProperty("name") String name, @JsonProperty("extraField") String extraField) { + @JsonProperty("name") String name, + @JsonProperty("extraField") String extraField, + @JsonProperty("partitionId") String partitionId) { super(name); this.extraField = extraField; + this.partitionId = partitionId; } public void setExtraField(String extraField) { @@ -137,7 +143,7 @@ public String getExtraField() { @JsonIgnore public String getPartition() { // use a fixed partition - return "1"; + return partitionId; } @Override @@ -640,16 +646,18 @@ public TestMetadataStore() { store.addListener(beforeListener); AtomicInteger afterCounter = new AtomicInteger(0); - FixedPartitionExampleMetadata metadata0 = new FixedPartitionExampleMetadata("foo0", "val1"); + FixedPartitionExampleMetadata metadata0 = + new FixedPartitionExampleMetadata("foo0", "val1", "1"); store.createSync(metadata0); - FixedPartitionExampleMetadata metadata1 = new FixedPartitionExampleMetadata("foo1", "val1"); + FixedPartitionExampleMetadata metadata1 = + new FixedPartitionExampleMetadata("foo1", "val1", "1"); store.createSync(metadata1); // create metadata for (int i = 2; i < 10; i++) { FixedPartitionExampleMetadata otherMetadata = - new FixedPartitionExampleMetadata("foo" + i, "val1"); + new FixedPartitionExampleMetadata("foo" + i, "val1", "1"); store.createSync(otherMetadata); } @@ -678,4 +686,56 @@ public TestMetadataStore() { await().until(afterCounter::get, (value) -> value == 2); } } + + @Test + void testPartitionFilters() throws Exception { + final String partitionStoreFolder = "/test_partition_filters"; + + class TestMetadataStore extends AstraPartitioningMetadataStore { + public TestMetadataStore() { + super( + curatorFramework, + CreateMode.PERSISTENT, + new FixedPartitionMetadataSerializer().toModelSerializer(), + partitionStoreFolder); + } + } + + class FilteredTestMetadataStore + extends AstraPartitioningMetadataStore { + public FilteredTestMetadataStore() { + super( + curatorFramework, + CreateMode.PERSISTENT, + new FixedPartitionMetadataSerializer().toModelSerializer(), + partitionStoreFolder, + List.of("1", "2", "4")); + } + } + + try (AstraPartitioningMetadataStore store = + new TestMetadataStore()) { + store.createSync(new FixedPartitionExampleMetadata("example1", "1")); + store.createSync(new FixedPartitionExampleMetadata("example2", "2")); + store.createSync(new FixedPartitionExampleMetadata("example3", "3")); + store.createSync(new FixedPartitionExampleMetadata("example4", "3")); + } + + AtomicInteger counter = new AtomicInteger(0); + try (AstraPartitioningMetadataStore store = + new FilteredTestMetadataStore()) { + store.addListener(_ -> counter.incrementAndGet()); + + store.createSync(new FixedPartitionExampleMetadata("example5", "1")); + await().until(() -> counter.get() >= 1); + + assertThatThrownBy( + () -> store.createSync(new FixedPartitionExampleMetadata("example6", "3"))); + + // example1, example2, and example 5 + assertThat(store.listSync().size()).isEqualTo(3); + + store.createSync(new FixedPartitionExampleMetadata("example7", "4")); + } + } }