Skip to content

Commit

Permalink
Add persistent watcher for persistentNode changes
Browse files Browse the repository at this point in the history
  • Loading branch information
bryanlb committed Feb 15, 2024
1 parent 1d8469e commit b371940
Show file tree
Hide file tree
Showing 2 changed files with 128 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.curator.framework.recipes.nodes.PersistentNode;
import org.apache.curator.framework.recipes.watch.PersistentWatcher;
import org.apache.curator.x.async.AsyncCuratorFramework;
import org.apache.curator.x.async.api.CreateOption;
import org.apache.curator.x.async.modeled.ModelSerializer;
Expand All @@ -27,6 +28,7 @@
import org.apache.curator.x.async.modeled.cached.CachedModeledFramework;
import org.apache.curator.x.async.modeled.cached.ModeledCacheListener;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -63,6 +65,7 @@ public class KaldbMetadataStore<T extends KaldbMetadata> implements Closeable {
new ConcurrentHashMap<>();

private final Map<String, PersistentNode> persistentNodeMap = new ConcurrentHashMap<>();
private final Map<String, PersistentWatcher> persistentWatcherMap = new ConcurrentHashMap<>();

public static final String PERSISTENT_NODE_RECREATED_COUNTER =
"metadata_persistent_node_recreated";
Expand Down Expand Up @@ -112,7 +115,6 @@ public static boolean persistentEphemeralModeEnabled() {
public CompletionStage<String> createAsync(T metadataNode) {
if (createMode == CreateMode.EPHEMERAL && persistentEphemeralModeEnabled()) {
String nodePath = resolvePath(metadataNode);
// persistent node already implements NodeDataChanged
PersistentNode node =
new PersistentNode(
curator.unwrap(),
Expand All @@ -128,6 +130,42 @@ public CompletionStage<String> createAsync(T metadataNode) {
try {
node.waitForInitialCreate(DEFAULT_ZK_TIMEOUT_SECS, TimeUnit.SECONDS);
node.getListenable().addListener(_ -> persistentNodeRecreatedCounter.increment());

// add a persistent watcher for node data changes on this persistent ephemeral node
// this is so when someone else updates a field on the ephemeral node, the owner also
// updates their local copy
PersistentWatcher persistentWatcher =
new PersistentWatcher(curator.unwrap(), node.getActualPath(), false);
persistentWatcher
.getListenable()
.addListener(
event -> {
try {
if (event.getType() == Watcher.Event.EventType.NodeDataChanged) {
node.waitForInitialCreate(DEFAULT_ZK_TIMEOUT_SECS, TimeUnit.SECONDS);
modeledClient
.withPath(ZPath.parse(event.getPath()))
.read()
.thenAccept(
(updated) -> {
try {
node.setData(modelSpec.serializer().serialize(updated));
} catch (Exception e) {
LOG.error(
"Error attempting to set local node data - fatal ZK error",
e);
new RuntimeHalterImpl().handleFatal(e);
}
});
}
} catch (Exception e) {
LOG.error(
"Error attempting to watch NodeDataChanged - fatal ZK error", e);
new RuntimeHalterImpl().handleFatal(e);
}
});
persistentWatcherMap.put(nodePath, persistentWatcher);
persistentWatcher.start();
return nodePath;
} catch (Exception e) {
throw new CompletionException(e);
Expand Down Expand Up @@ -158,6 +196,11 @@ public void createSync(T metadataNode) {
}

public CompletionStage<T> getAsync(String path) {
PersistentNode node = getPersistentNodeIfExists(path);
if (node != null) {
return CompletableFuture.supplyAsync(
() -> modelSpec.serializer().deserialize(node.getData()));
}
if (cachedModeledFramework != null) {
return cachedModeledFramework.withPath(zPath.resolved(path)).readThrough();
}
Expand All @@ -173,6 +216,12 @@ public T getSync(String path) {
}

public CompletionStage<Stat> hasAsync(String path) {
PersistentNode node = getPersistentNodeIfExists(path);
if (node != null) {
// we return an empty stat so that we can distinguish between not there (null) and from our
// local cache
return CompletableFuture.supplyAsync(Stat::new);
}
if (cachedModeledFramework != null) {
awaitCacheInitialized();
return cachedModeledFramework.withPath(zPath.resolved(path)).checkExists();
Expand Down Expand Up @@ -317,11 +366,17 @@ private PersistentNode getPersistentNodeIfExists(T metadataNode) {
return persistentNodeMap.getOrDefault(resolvePath(metadataNode), null);
}

private PersistentNode getPersistentNodeIfExists(String path) {
return persistentNodeMap.getOrDefault(zPath.resolved(path).fullPath(), null);
}

private PersistentNode removePersistentNodeIfExists(T metadataNode) {
persistentWatcherMap.remove(resolvePath(metadataNode)).close();
return persistentNodeMap.remove(resolvePath(metadataNode));
}

private PersistentNode removePersistentNodeIfExists(String path) {
persistentWatcherMap.remove(zPath.resolved(path).fullPath()).close();
return persistentNodeMap.remove(zPath.resolved(path).fullPath());
}

Expand All @@ -342,6 +397,14 @@ public void initialized() {

@Override
public void close() {
persistentWatcherMap.forEach(
(_, persistentWatcher) -> {
try {
persistentWatcher.close();
} catch (Exception ignored) {
}
});

persistentNodeMap.forEach(
(_, persistentNode) -> {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.curator.RetryPolicy;
Expand All @@ -31,7 +32,6 @@
import org.junit.jupiter.api.Test;

public class KaldbMetadataStoreTest {

private TestingServer testingServer;
private MeterRegistry meterRegistry;

Expand Down Expand Up @@ -712,4 +712,67 @@ public TestMetadataStore() {

curator.close();
}

@Test
public void testWhenPersistentEphemeralIsUpdatedByAnotherCurator()
throws ExecutionException, InterruptedException {
System.setProperty(KaldbMetadataStore.PERSISTENT_EPHEMERAL_PROPERTY, "true");
RetryPolicy retryPolicy = new RetryNTimes(1, 10);
CuratorFramework curator1 =
CuratorFrameworkFactory.builder()
.connectString(zookeeperConfig.getZkConnectString())
.namespace(zookeeperConfig.getZkPathPrefix())
.connectionTimeoutMs(50)
.sessionTimeoutMs(50)
.retryPolicy(retryPolicy)
.build();
curator1.start();
AsyncCuratorFramework asyncCuratorFramework1 = AsyncCuratorFramework.wrap(curator1);

CuratorFramework curator2 =
CuratorFrameworkFactory.builder()
.connectString(zookeeperConfig.getZkConnectString())
.namespace(zookeeperConfig.getZkPathPrefix())
.connectionTimeoutMs(50)
.sessionTimeoutMs(50)
.retryPolicy(retryPolicy)
.build();
curator2.start();
AsyncCuratorFramework asyncCuratorFramework2 = AsyncCuratorFramework.wrap(curator2);

class TestMetadataStore extends KaldbMetadataStore<TestMetadata> {
public TestMetadataStore(AsyncCuratorFramework curator) {
super(
curator,
CreateMode.EPHEMERAL,
true,
new JacksonModelSerializer<>(TestMetadata.class),
STORE_FOLDER,
meterRegistry);
}
}

TestMetadata metadata1 = new TestMetadata("foo", "val1");
try (KaldbMetadataStore<TestMetadata> store1 = new TestMetadataStore(asyncCuratorFramework1)) {
store1.createSync(metadata1);

try (KaldbMetadataStore<TestMetadata> store2 =
new TestMetadataStore(asyncCuratorFramework2)) {
// curator2 updates the value
TestMetadata metadata2 = store2.getSync(metadata1.name);
metadata2.value = "val2";
store2.updateSync(metadata2);
}

// curator1 should pickup the new update
await().until(() -> store1.getSync(metadata1.name).getValue().equals("val2"));
// hasAsync should return an empty Stat, indicating that it was fetched from the
// persistentNode
assertThat(store1.hasAsync(metadata1.name).toCompletableFuture().get()).isEqualTo(new Stat());
assertThat(store1.hasSync(metadata1.name)).isTrue();
}

curator2.close();
curator1.close();
}
}

0 comments on commit b371940

Please sign in to comment.