From b3719403247c4c68d32bb90d2dcc2f0e38228dc5 Mon Sep 17 00:00:00 2001
From: Bryan Burkholder
Date: Thu, 15 Feb 2024 09:59:55 -0700
Subject: [PATCH] Add persistent watcher for persistentNode changes

Register a PersistentWatcher for every persistent ephemeral node created by
the store, so that when another client updates the node the owner refreshes
its local PersistentNode copy. getAsync/hasAsync answer from the local
PersistentNode when the store owns one for the requested path.

The watcher only pushes data into the local node when the bytes actually
differ, and watcher removal tolerates paths that never had a watcher.

---
 .../metadata/core/KaldbMetadataStore.java     | 82 ++++++++++++++++++-
 .../metadata/core/KaldbMetadataStoreTest.java | 65 ++++++++++++++-
 2 files changed, 145 insertions(+), 2 deletions(-)

diff --git a/kaldb/src/main/java/com/slack/kaldb/metadata/core/KaldbMetadataStore.java b/kaldb/src/main/java/com/slack/kaldb/metadata/core/KaldbMetadataStore.java
index b685e9261f..838ca5fd01 100644
--- a/kaldb/src/main/java/com/slack/kaldb/metadata/core/KaldbMetadataStore.java
+++ b/kaldb/src/main/java/com/slack/kaldb/metadata/core/KaldbMetadataStore.java
@@ -18,6 +18,7 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import org.apache.curator.framework.recipes.nodes.PersistentNode;
+import org.apache.curator.framework.recipes.watch.PersistentWatcher;
 import org.apache.curator.x.async.AsyncCuratorFramework;
 import org.apache.curator.x.async.api.CreateOption;
 import org.apache.curator.x.async.modeled.ModelSerializer;
@@ -27,6 +28,7 @@
 import org.apache.curator.x.async.modeled.cached.CachedModeledFramework;
 import org.apache.curator.x.async.modeled.cached.ModeledCacheListener;
 import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -63,6 +65,7 @@ public class KaldbMetadataStore implements Closeable {
       new ConcurrentHashMap<>();

   private final Map<String, PersistentNode> persistentNodeMap = new ConcurrentHashMap<>();
+  private final Map<String, PersistentWatcher> persistentWatcherMap = new ConcurrentHashMap<>();

   public static final String PERSISTENT_NODE_RECREATED_COUNTER =
       "metadata_persistent_node_recreated";
@@ -112,7 +115,6 @@ public static boolean persistentEphemeralModeEnabled() {
   public CompletionStage<String> createAsync(T metadataNode) {
     if (createMode == CreateMode.EPHEMERAL && persistentEphemeralModeEnabled()) {
       String nodePath = resolvePath(metadataNode);
-      // persistent node already implements NodeDataChanged
       PersistentNode node =
           new PersistentNode(
               curator.unwrap(),
@@ -128,6 +130,48 @@ public CompletionStage<String> createAsync(T metadataNode) {
       try {
         node.waitForInitialCreate(DEFAULT_ZK_TIMEOUT_SECS, TimeUnit.SECONDS);
         node.getListenable().addListener(_ -> persistentNodeRecreatedCounter.increment());
+
+        // add a persistent watcher for node data changes on this persistent ephemeral node
+        // this is so when someone else updates a field on the ephemeral node, the owner also
+        // updates their local copy
+        PersistentWatcher persistentWatcher =
+            new PersistentWatcher(curator.unwrap(), node.getActualPath(), false);
+        persistentWatcher
+            .getListenable()
+            .addListener(
+                event -> {
+                  try {
+                    if (event.getType() == Watcher.Event.EventType.NodeDataChanged) {
+                      node.waitForInitialCreate(DEFAULT_ZK_TIMEOUT_SECS, TimeUnit.SECONDS);
+                      modeledClient
+                          .withPath(ZPath.parse(event.getPath()))
+                          .read()
+                          .thenAccept(
+                              (updated) -> {
+                                try {
+                                  byte[] updatedBytes = modelSpec.serializer().serialize(updated);
+                                  // only push a real change into the local node: setData writes
+                                  // back to ZK, which fires NodeDataChanged again and would
+                                  // otherwise re-trigger this listener endlessly
+                                  if (!java.util.Arrays.equals(node.getData(), updatedBytes)) {
+                                    node.setData(updatedBytes);
+                                  }
+                                } catch (Exception e) {
+                                  LOG.error(
+                                      "Error attempting to set local node data - fatal ZK error",
+                                      e);
+                                  new RuntimeHalterImpl().handleFatal(e);
+                                }
+                              });
+                    }
+                  } catch (Exception e) {
+                    LOG.error(
+                        "Error attempting to watch NodeDataChanged - fatal ZK error", e);
+                    new RuntimeHalterImpl().handleFatal(e);
+                  }
+                });
+        persistentWatcherMap.put(nodePath, persistentWatcher);
+        persistentWatcher.start();
         return nodePath;
       } catch (Exception e) {
         throw new CompletionException(e);
@@ -158,6 +202,11 @@ public void createSync(T metadataNode) {
   }

   public CompletionStage<T> getAsync(String path) {
+    PersistentNode node = getPersistentNodeIfExists(path);
+    if (node != null) {
+      return CompletableFuture.supplyAsync(
+          () -> modelSpec.serializer().deserialize(node.getData()));
+    }
     if (cachedModeledFramework != null) {
       return cachedModeledFramework.withPath(zPath.resolved(path)).readThrough();
     }
@@ -173,6 +222,12 @@ public T getSync(String path) {
   }

   public CompletionStage<Stat> hasAsync(String path) {
+    PersistentNode node = getPersistentNodeIfExists(path);
+    if (node != null) {
+      // we return an empty stat so that we can distinguish between not there (null) and from our
+      // local cache
+      return CompletableFuture.supplyAsync(Stat::new);
+    }
     if (cachedModeledFramework != null) {
       awaitCacheInitialized();
       return cachedModeledFramework.withPath(zPath.resolved(path)).checkExists();
@@ -317,11 +372,28 @@ private PersistentNode getPersistentNodeIfExists(T metadataNode) {
     return persistentNodeMap.getOrDefault(resolvePath(metadataNode), null);
   }

+  private PersistentNode getPersistentNodeIfExists(String path) {
+    return persistentNodeMap.getOrDefault(zPath.resolved(path).fullPath(), null);
+  }
+
+  /**
+   * Closes and forgets the watcher registered for the given full path, if any. Paths this store
+   * never registered a watcher for are skipped instead of failing with a NullPointerException.
+   */
+  private void closePersistentWatcherIfExists(String fullPath) {
+    PersistentWatcher persistentWatcher = persistentWatcherMap.remove(fullPath);
+    if (persistentWatcher != null) {
+      persistentWatcher.close();
+    }
+  }
+
   private PersistentNode removePersistentNodeIfExists(T metadataNode) {
+    closePersistentWatcherIfExists(resolvePath(metadataNode));
     return persistentNodeMap.remove(resolvePath(metadataNode));
   }

   private PersistentNode removePersistentNodeIfExists(String path) {
+    closePersistentWatcherIfExists(zPath.resolved(path).fullPath());
     return persistentNodeMap.remove(zPath.resolved(path).fullPath());
   }

@@ -342,6 +414,14 @@ public void initialized() {

   @Override
   public void close() {
+    persistentWatcherMap.forEach(
+        (_, persistentWatcher) -> {
+          try {
+            persistentWatcher.close();
+          } catch (Exception ignored) {
+          }
+        });
+
     persistentNodeMap.forEach(
         (_, persistentNode) -> {
           try {
diff --git a/kaldb/src/test/java/com/slack/kaldb/metadata/core/KaldbMetadataStoreTest.java b/kaldb/src/test/java/com/slack/kaldb/metadata/core/KaldbMetadataStoreTest.java
index 11524b56bd..1931be9022 100644
--- a/kaldb/src/test/java/com/slack/kaldb/metadata/core/KaldbMetadataStoreTest.java
+++ b/kaldb/src/test/java/com/slack/kaldb/metadata/core/KaldbMetadataStoreTest.java
@@ -14,6 +14,7 @@
 import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
 import java.util.List;
 import java.util.Objects;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.curator.RetryPolicy;
@@ -31,7 +32,6 @@
 import org.junit.jupiter.api.Test;

 public class KaldbMetadataStoreTest {
-
   private TestingServer testingServer;

   private MeterRegistry meterRegistry;
@@ -712,4 +712,67 @@ public TestMetadataStore() {

     curator.close();
   }
+
+  @Test
+  public void testWhenPersistentEphemeralIsUpdatedByAnotherCurator()
+      throws ExecutionException, InterruptedException {
+    System.setProperty(KaldbMetadataStore.PERSISTENT_EPHEMERAL_PROPERTY, "true");
+    RetryPolicy retryPolicy = new RetryNTimes(1, 10);
+    CuratorFramework curator1 =
+        CuratorFrameworkFactory.builder()
+            .connectString(zookeeperConfig.getZkConnectString())
+            .namespace(zookeeperConfig.getZkPathPrefix())
+            .connectionTimeoutMs(50)
+            .sessionTimeoutMs(50)
+            .retryPolicy(retryPolicy)
+            .build();
+    curator1.start();
+    AsyncCuratorFramework asyncCuratorFramework1 = AsyncCuratorFramework.wrap(curator1);
+
+    CuratorFramework curator2 =
+        CuratorFrameworkFactory.builder()
+            .connectString(zookeeperConfig.getZkConnectString())
+            .namespace(zookeeperConfig.getZkPathPrefix())
+            .connectionTimeoutMs(50)
+            .sessionTimeoutMs(50)
+            .retryPolicy(retryPolicy)
+            .build();
+    curator2.start();
+    AsyncCuratorFramework asyncCuratorFramework2 = AsyncCuratorFramework.wrap(curator2);
+
+    class TestMetadataStore extends KaldbMetadataStore<TestMetadata> {
+      public TestMetadataStore(AsyncCuratorFramework curator) {
+        super(
+            curator,
+            CreateMode.EPHEMERAL,
+            true,
+            new JacksonModelSerializer<>(TestMetadata.class),
+            STORE_FOLDER,
+            meterRegistry);
+      }
+    }
+
+    TestMetadata metadata1 = new TestMetadata("foo", "val1");
+    try (KaldbMetadataStore<TestMetadata> store1 = new TestMetadataStore(asyncCuratorFramework1)) {
+      store1.createSync(metadata1);
+
+      try (KaldbMetadataStore<TestMetadata> store2 =
+          new TestMetadataStore(asyncCuratorFramework2)) {
+        // curator2 updates the value
+        TestMetadata metadata2 = store2.getSync(metadata1.name);
+        metadata2.value = "val2";
+        store2.updateSync(metadata2);
+      }
+
+      // curator1 should pick up the new update
+      await().until(() -> store1.getSync(metadata1.name).getValue().equals("val2"));
+      // hasAsync should return an empty Stat, indicating that it was fetched from the
+      // persistentNode
+      assertThat(store1.hasAsync(metadata1.name).toCompletableFuture().get()).isEqualTo(new Stat());
+      assertThat(store1.hasSync(metadata1.name)).isTrue();
+    }
+
+    curator2.close();
+    curator1.close();
+  }
 }