Skip to content

Commit

Permalink
Add persistent watcher for persistentNode changes
Browse files Browse the repository at this point in the history
  • Loading branch information
bryanlb committed Feb 15, 2024
1 parent 1d8469e commit e4a0c14
Show file tree
Hide file tree
Showing 5 changed files with 311 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,6 @@ public static AsyncCuratorFramework build(
});

curator.start();

LOG.info(
"Started curator server with the following config zkhost: {}, path prefix: {}, "
+ "connection timeout ms: {}, session timeout ms {} and retry policy {}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.curator.framework.recipes.nodes.PersistentNode;
import org.apache.curator.framework.recipes.watch.PersistentWatcher;
import org.apache.curator.x.async.AsyncCuratorFramework;
import org.apache.curator.x.async.api.CreateOption;
import org.apache.curator.x.async.modeled.ModelSerializer;
Expand All @@ -27,6 +28,7 @@
import org.apache.curator.x.async.modeled.cached.CachedModeledFramework;
import org.apache.curator.x.async.modeled.cached.ModeledCacheListener;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -63,6 +65,7 @@ public class KaldbMetadataStore<T extends KaldbMetadata> implements Closeable {
new ConcurrentHashMap<>();

private final Map<String, PersistentNode> persistentNodeMap = new ConcurrentHashMap<>();
private final Map<String, PersistentWatcher> persistentWatcherMap = new ConcurrentHashMap<>();

public static final String PERSISTENT_NODE_RECREATED_COUNTER =
"metadata_persistent_node_recreated";
Expand Down Expand Up @@ -112,34 +115,85 @@ public static boolean persistentEphemeralModeEnabled() {
/**
 * Asynchronously creates the given metadata node.
 *
 * <p>When the store's create mode is {@link CreateMode#EPHEMERAL} and persistent ephemeral mode is
 * enabled, the node is backed by a Curator {@link PersistentNode} and a {@link PersistentWatcher}
 * is registered on the node's actual path so that data changes made to the node are written back
 * into the local {@link PersistentNode}. Otherwise the node is written through the modeled client
 * with an expected version of 0.
 *
 * <p>If creating the persistent node fails (including the initial create not completing within
 * {@code DEFAULT_ZK_TIMEOUT_SECS}), the partially registered node and watcher are removed from the
 * local maps and closed, so a later create for the same path is not rejected as a duplicate.
 *
 * @param metadataNode the metadata to store
 * @return a stage that completes with the node path in persistent ephemeral mode, or with the
 *     result of the modeled client's {@code set} otherwise. In persistent ephemeral mode the stage
 *     completes exceptionally with an {@link IllegalArgumentException} cause when a node already
 *     exists at the path, and with a {@link TimeoutException} cause when the initial create does
 *     not complete in time.
 */
public CompletionStage<String> createAsync(T metadataNode) {
  if (createMode == CreateMode.EPHEMERAL && persistentEphemeralModeEnabled()) {
    String nodePath = resolvePath(metadataNode);
    return hasAsync(metadataNode.name)
        .thenApplyAsync(
            (stat) -> {
              // it is possible that we have a node that hasn't been yet async persisted to ZK
              if (stat != null || persistentNodeMap.containsKey(nodePath)) {
                throw nodeAlreadyExists(nodePath);
              }

              // persistent node already implements NodeDataChanged
              PersistentNode node =
                  new PersistentNode(
                      curator.unwrap(),
                      createMode,
                      false,
                      nodePath,
                      modelSpec.serializer().serialize(metadataNode));
              // putIfAbsent makes the reservation atomic: two concurrent creates for the same
              // path cannot both pass the containsKey check above and then both register a node
              if (persistentNodeMap.putIfAbsent(nodePath, node) != null) {
                throw nodeAlreadyExists(nodePath);
              }

              try {
                node.start();
                // waitForInitialCreate reports a timeout via its return value rather than by
                // throwing; without this check we would continue with a null actual path
                if (!node.waitForInitialCreate(DEFAULT_ZK_TIMEOUT_SECS, TimeUnit.SECONDS)) {
                  throw new TimeoutException(
                      String.format(
                          "Timed out waiting for initial create of node at '%s'", nodePath));
                }
                node.getListenable().addListener(_ -> persistentNodeRecreatedCounter.increment());

                // add a persistent watcher for node data changes on this persistent ephemeral
                // node. This is so that when someone else updates a field on the ephemeral node,
                // the owner also updates their local copy
                PersistentWatcher persistentWatcher = newNodeDataChangedWatcher(node);
                persistentWatcherMap.put(nodePath, persistentWatcher);
                persistentWatcher.start();
                return nodePath;
              } catch (Exception e) {
                if (e instanceof InterruptedException) {
                  // preserve the interrupt for whoever owns this thread
                  Thread.currentThread().interrupt();
                }
                discardFailedPersistentNode(nodePath, node);
                throw new CompletionException(e);
              }
            });
  } else {
    // by passing the version 0, this will throw if we attempt to create and it already exists
    return modeledClient.set(metadataNode, 0);
  }
}

/** Builds the exception used to reject a create for a path that already has a node. */
private CompletionException nodeAlreadyExists(String nodePath) {
  return new CompletionException(
      new IllegalArgumentException(String.format("Node already exists at '%s'", nodePath)));
}

/**
 * Builds (but does not start) a watcher on the node's actual path that, on NodeDataChanged events,
 * re-reads the node through the modeled client and stores the serialized result into the local
 * persistent node. Failures are treated as fatal and handed to a RuntimeHalterImpl.
 */
private PersistentWatcher newNodeDataChangedWatcher(PersistentNode node) {
  PersistentWatcher persistentWatcher =
      new PersistentWatcher(curator.unwrap(), node.getActualPath(), false);
  persistentWatcher
      .getListenable()
      .addListener(
          event -> {
            try {
              if (event.getType() == Watcher.Event.EventType.NodeDataChanged) {
                node.waitForInitialCreate(DEFAULT_ZK_TIMEOUT_SECS, TimeUnit.SECONDS);
                modeledClient
                    .withPath(ZPath.parse(event.getPath()))
                    .read()
                    .thenAcceptAsync(
                        (updated) -> {
                          try {
                            // a null actual path means the node is not currently created, in
                            // which case there is nothing to update locally
                            if (node.getActualPath() != null) {
                              node.setData(modelSpec.serializer().serialize(updated));
                            }
                          } catch (Exception e) {
                            LOG.error(
                                "Error attempting to set local node data - fatal ZK error", e);
                            new RuntimeHalterImpl().handleFatal(e);
                          }
                        });
              }
            } catch (Exception e) {
              LOG.error("Error attempting to watch NodeDataChanged - fatal ZK error", e);
              new RuntimeHalterImpl().handleFatal(e);
            }
          });
  return persistentWatcher;
}

/**
 * Best-effort cleanup after a failed persistent node create: unregisters and closes the watcher
 * (if one was registered) and the node so no half-created state is left in the local maps.
 */
private void discardFailedPersistentNode(String nodePath, PersistentNode node) {
  PersistentWatcher watcher = persistentWatcherMap.remove(nodePath);
  if (watcher != null) {
    watcher.close();
  }
  // two-arg remove: only drop the mapping if it still points at the node we registered
  persistentNodeMap.remove(nodePath, node);
  try {
    node.close();
  } catch (Exception closeError) {
    LOG.warn("Error closing persistent node at '{}' after failed create", nodePath, closeError);
  }
}

// resolveForSet
/**
* Based on the private ModeledFrameworkImpl resolveForSet
*
* @see org.apache.curator.x.async.modeled.details.ModeledFrameworkImpl.resolveForSet
*/
private String resolvePath(T model) {
if (modelSpec.path().isResolved()) {
return modelSpec.path().fullPath();
Expand All @@ -158,6 +212,11 @@ public void createSync(T metadataNode) {
}

public CompletionStage<T> getAsync(String path) {
PersistentNode node = getPersistentNodeIfExists(path);
if (node != null) {
return CompletableFuture.supplyAsync(
() -> modelSpec.serializer().deserialize(node.getData()));
}
if (cachedModeledFramework != null) {
return cachedModeledFramework.withPath(zPath.resolved(path)).readThrough();
}
Expand All @@ -173,6 +232,8 @@ public T getSync(String path) {
}

public CompletionStage<Stat> hasAsync(String path) {
// We don't use the persistent node here, as we want the actual stat details, which aren't
// available on the PersistentNode
if (cachedModeledFramework != null) {
awaitCacheInitialized();
return cachedModeledFramework.withPath(zPath.resolved(path)).checkExists();
Expand Down Expand Up @@ -317,11 +378,23 @@ private PersistentNode getPersistentNodeIfExists(T metadataNode) {
return persistentNodeMap.getOrDefault(resolvePath(metadataNode), null);
}

/**
 * Looks up the locally tracked persistent node for {@code path}, after resolving it against this
 * store's zPath.
 *
 * @return the tracked node, or {@code null} when none is registered under the resolved path
 */
private PersistentNode getPersistentNodeIfExists(String path) {
  final String fullPath = zPath.resolved(path).fullPath();
  // Map.get already yields null for a missing key
  return persistentNodeMap.get(fullPath);
}

/**
 * Stops tracking the persistent node for the given metadata: closes and unregisters its watcher
 * when one is registered, then unregisters the node itself.
 *
 * @return the node that was registered under the metadata's resolved path, or {@code null}
 */
private PersistentNode removePersistentNodeIfExists(T metadataNode) {
  final String nodePath = resolvePath(metadataNode);
  final PersistentWatcher registeredWatcher = persistentWatcherMap.remove(nodePath);
  if (registeredWatcher != null) {
    registeredWatcher.close();
  }
  return persistentNodeMap.remove(nodePath);
}

/**
 * Stops tracking the persistent node for {@code path} (resolved against this store's zPath):
 * closes and unregisters its watcher when one is registered, then unregisters the node itself.
 *
 * @return the node that was registered under the resolved path, or {@code null}
 */
private PersistentNode removePersistentNodeIfExists(String path) {
  final String fullPath = zPath.resolved(path).fullPath();
  final PersistentWatcher registeredWatcher = persistentWatcherMap.remove(fullPath);
  if (registeredWatcher != null) {
    registeredWatcher.close();
  }
  return persistentNodeMap.remove(fullPath);
}

Expand All @@ -342,11 +415,21 @@ public void initialized() {

@Override
public void close() {
persistentWatcherMap.forEach(
(_, persistentWatcher) -> {
try {
persistentWatcher.close();
} catch (Exception e) {
LOG.error("Error removing persistent watchers", e);
}
});

persistentNodeMap.forEach(
(_, persistentNode) -> {
try {
persistentNode.close();
} catch (Exception ignored) {
} catch (Exception e) {
LOG.error("Error removing persistent nodes", e);
}
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,8 +154,11 @@ public void shouldHandleChunkLivecycle() throws Exception {

// ensure that the chunk was marked LIVE
await().until(() -> KaldbMetadataTestUtils.listSyncUncached(searchMetadataStore).size() == 1);
assertThat(readOnlyChunk.getChunkMetadataState())
.isEqualTo(Metadata.CacheSlotMetadata.CacheSlotState.LIVE);
await()
.until(
() ->
readOnlyChunk.getChunkMetadataState()
== Metadata.CacheSlotMetadata.CacheSlotState.LIVE);

SearchResult<LogMessage> logMessageSearchResult =
readOnlyChunk.query(
Expand Down Expand Up @@ -230,6 +233,10 @@ public void shouldHandleChunkLivecycle() throws Exception {
assertThat(meterRegistry.get(CHUNK_ASSIGNMENT_TIMER).tag("successful", "false").timer().count())
.isEqualTo(0);

cacheSlotMetadataStore.close();
searchMetadataStore.close();
snapshotMetadataStore.close();
replicaMetadataStore.close();
curatorFramework.unwrap().close();
}

Expand Down Expand Up @@ -300,6 +307,10 @@ public void shouldHandleMissingS3Assets() throws Exception {
assertThat(meterRegistry.get(CHUNK_ASSIGNMENT_TIMER).tag("successful", "false").timer().count())
.isEqualTo(1);

cacheSlotMetadataStore.close();
searchMetadataStore.close();
snapshotMetadataStore.close();
replicaMetadataStore.close();
curatorFramework.unwrap().close();
}

Expand Down Expand Up @@ -370,6 +381,10 @@ public void shouldHandleMissingZkData() throws Exception {
assertThat(meterRegistry.get(CHUNK_ASSIGNMENT_TIMER).tag("successful", "false").timer().count())
.isEqualTo(1);

cacheSlotMetadataStore.close();
searchMetadataStore.close();
snapshotMetadataStore.close();
replicaMetadataStore.close();
curatorFramework.unwrap().close();
}

Expand Down Expand Up @@ -473,6 +488,10 @@ public void closeShouldCleanupLiveChunkCorrectly() throws Exception {
assertThat(files.findFirst().isPresent()).isFalse();
}

cacheSlotMetadataStore.close();
searchMetadataStore.close();
snapshotMetadataStore.close();
replicaMetadataStore.close();
curatorFramework.unwrap().close();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ public void setUp() throws Exception {

@AfterEach
public void tearDown() throws IOException {
store.close();
curatorFramework.unwrap().close();
testingServer.close();
meterRegistry.close();
Expand Down Expand Up @@ -82,6 +83,7 @@ public void testUpdateNonFreeCacheSlotStateSync() throws Exception {
assertThat(cacheSlotMetadata.supportedIndexTypes).isEqualTo(SUPPORTED_INDEX_TYPES);
assertThat(cacheSlotMetadata.hostname).isEqualTo(hostname);

System.out.println("step1");
store.createSync(cacheSlotMetadata);
assertThat(KaldbMetadataTestUtils.listSyncUncached(store).size()).isEqualTo(1);

Expand All @@ -93,13 +95,15 @@ public void testUpdateNonFreeCacheSlotStateSync() throws Exception {
() ->
store.listSync().size() == 1
&& store.listSync().get(0).cacheSlotState == CacheSlotState.LIVE);
System.out.println("step2");
final CacheSlotMetadata liveNode = store.getSync(hostname, name);
assertThat(liveNode.name).isEqualTo(name);
assertThat(liveNode.cacheSlotState).isEqualTo(CacheSlotState.LIVE);
assertThat(liveNode.replicaId).isEqualTo(replicaId);
assertThat(liveNode.updatedTimeEpochMs).isGreaterThan(cacheSlotMetadata.updatedTimeEpochMs);
assertThat(liveNode.supportedIndexTypes).isEqualTo(SUPPORTED_INDEX_TYPES);

System.out.println("step3");
store
.updateNonFreeCacheSlotState(cacheSlotMetadata, CacheSlotState.EVICT)
.get(1, TimeUnit.SECONDS);
Expand All @@ -115,6 +119,7 @@ public void testUpdateNonFreeCacheSlotStateSync() throws Exception {
assertThat(evictNode.updatedTimeEpochMs).isGreaterThan(liveNode.updatedTimeEpochMs);
assertThat(evictNode.supportedIndexTypes).isEqualTo(SUPPORTED_INDEX_TYPES);

System.out.println("step4");
store
.updateNonFreeCacheSlotState(cacheSlotMetadata, CacheSlotState.FREE)
.get(1, TimeUnit.SECONDS);
Expand All @@ -130,7 +135,9 @@ public void testUpdateNonFreeCacheSlotStateSync() throws Exception {
assertThat(freeNode.updatedTimeEpochMs).isGreaterThan(evictNode.updatedTimeEpochMs);
assertThat(freeNode.supportedIndexTypes).isEqualTo(SUPPORTED_INDEX_TYPES);

System.out.println("step5");
// Only non-free states can be set.

assertThatIllegalArgumentException()
.isThrownBy(
() ->
Expand Down
Loading

0 comments on commit e4a0c14

Please sign in to comment.