Skip to content

Commit

Permalink
Add persistent ephemeral node support to metadata store
Browse files Browse the repository at this point in the history
  • Loading branch information
bryanlb committed Feb 14, 2024
1 parent bfa3a76 commit 47a3179
Show file tree
Hide file tree
Showing 4 changed files with 108 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import static com.slack.kaldb.util.ArgValidationUtils.ensureTrue;

import com.slack.kaldb.proto.config.KaldbConfigs;
import com.slack.kaldb.util.RuntimeHalterImpl;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
import org.apache.curator.RetryPolicy;
Expand Down Expand Up @@ -73,9 +72,10 @@ public static AsyncCuratorFramework build(
&& curatorEvent.getWatchedEvent().getState()
== Watcher.Event.KeeperState.Expired) {
LOG.warn("The ZK session has expired {}.", curatorEvent);
new RuntimeHalterImpl().handleFatal(new Throwable("ZK session expired."));
// new RuntimeHalterImpl().handleFatal(new Throwable("ZK session expired."));
}
});

curator.start();

LOG.info(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,15 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.curator.framework.recipes.nodes.PersistentNode;
import org.apache.curator.x.async.AsyncCuratorFramework;
import org.apache.curator.x.async.api.CreateOption;
import org.apache.curator.x.async.modeled.ModelSerializer;
Expand All @@ -35,6 +38,12 @@ public class KaldbMetadataStore<T extends KaldbMetadata> implements Closeable {

private final ZPath zPath;

private final CreateMode createMode;

private final AsyncCuratorFramework curator;

private final ModelSpec<T> modelSpec;

private final CountDownLatch cacheInitialized = new CountDownLatch(1);

protected final ModeledFramework<T> modeledClient;
Expand All @@ -44,17 +53,21 @@ public class KaldbMetadataStore<T extends KaldbMetadata> implements Closeable {
private final Map<KaldbMetadataStoreChangeListener<T>, ModeledCacheListener<T>> listenerMap =
new ConcurrentHashMap<>();

private final Map<String, PersistentNode> persistentNodeMap = new ConcurrentHashMap<>();

public KaldbMetadataStore(
AsyncCuratorFramework curator,
CreateMode createMode,
boolean shouldCache,
ModelSerializer<T> modelSerializer,
String storeFolder) {

this.createMode = createMode;
this.curator = curator;
this.storeFolder = storeFolder;
this.zPath = ZPath.parseWithIds(String.format("%s/{name}", storeFolder));

ModelSpec<T> modelSpec =
this.modelSpec =
ModelSpec.builder(modelSerializer)
.withPath(zPath)
.withCreateOptions(
Expand All @@ -73,8 +86,41 @@ public KaldbMetadataStore(
}

public CompletionStage<String> createAsync(T metadataNode) {
// by passing the version 0, this will throw if we attempt to create and it already exists
return modeledClient.set(metadataNode, 0);
if (createMode == CreateMode.EPHEMERAL) {
String nodePath = resolvePath(metadataNode);
// persistent node already implements NodeDataChanged
PersistentNode node =
new PersistentNode(
curator.unwrap(),
createMode,
false,
nodePath,
modelSpec.serializer().serialize(metadataNode));
persistentNodeMap.put(nodePath, node);
node.start();
// todo - what happens when attempting to create over existing?
return new CompletableFuture<String>()
.completeAsync(
() -> {
try {
node.waitForInitialCreate(DEFAULT_ZK_TIMEOUT_SECS, TimeUnit.SECONDS);
return nodePath;
} catch (Exception e) {
throw new CompletionException(e);
}
});
} else {
// by passing the version 0, this will throw if we attempt to create and it already exists
return modeledClient.set(metadataNode, 0);
}
}

// resolveForSet
private String resolvePath(T model) {
if (modelSpec.path().isResolved()) {
return modelSpec.path().fullPath();
}
return modelSpec.path().resolved(model).fullPath();
}

public void createSync(T metadataNode) {
Expand Down Expand Up @@ -120,7 +166,18 @@ public boolean hasSync(String path) {
}

public CompletionStage<Stat> updateAsync(T metadataNode) {
return modeledClient.update(metadataNode);
PersistentNode node = getPersistentNodeIfExists(metadataNode);
if (node != null) {
try {
node.waitForInitialCreate(DEFAULT_ZK_TIMEOUT_SECS, TimeUnit.SECONDS);
node.setData(modelSpec.serializer().serialize(metadataNode));
return CompletableFuture.completedFuture(null);
} catch (Exception e) {
throw new CompletionException(e);
}
} else {
return modeledClient.update(metadataNode);
}
}

public void updateSync(T metadataNode) {
Expand All @@ -134,7 +191,18 @@ public void updateSync(T metadataNode) {
}

public CompletionStage<Void> deleteAsync(String path) {
return modeledClient.withPath(zPath.resolved(path)).delete();
PersistentNode node = removePersistentNodeIfExists(path);
if (node != null) {
try {
node.waitForInitialCreate(DEFAULT_ZK_TIMEOUT_SECS, TimeUnit.SECONDS);
node.close();
return CompletableFuture.completedFuture(null);
} catch (Exception e) {
throw new CompletionException(e);
}
} else {
return modeledClient.withPath(zPath.resolved(path)).delete();
}
}

public void deleteSync(String path) {
Expand All @@ -146,7 +214,18 @@ public void deleteSync(String path) {
}

public CompletionStage<Void> deleteAsync(T metadataNode) {
return modeledClient.withPath(zPath.resolved(metadataNode)).delete();
PersistentNode node = removePersistentNodeIfExists(metadataNode);
if (node != null) {
try {
node.waitForInitialCreate(DEFAULT_ZK_TIMEOUT_SECS, TimeUnit.SECONDS);
node.close();
return CompletableFuture.completedFuture(null);
} catch (Exception e) {
throw new CompletionException(e);
}
} else {
return modeledClient.withPath(zPath.resolved(metadataNode)).delete();
}
}

public void deleteSync(T metadataNode) {
Expand Down Expand Up @@ -210,6 +289,18 @@ private void awaitCacheInitialized() {
}
}

private PersistentNode getPersistentNodeIfExists(T metadataNode) {
return persistentNodeMap.getOrDefault(resolvePath(metadataNode), null);
}

private PersistentNode removePersistentNodeIfExists(T metadataNode) {
return persistentNodeMap.remove(resolvePath(metadataNode));
}

private PersistentNode removePersistentNodeIfExists(String path) {
return persistentNodeMap.remove(path);
}

private ModeledCacheListener<T> getCacheInitializedListener() {
return new ModeledCacheListener<T>() {
@Override
Expand All @@ -227,9 +318,17 @@ public void initialized() {

@Override
public void close() {
persistentNodeMap.forEach(
(_, persistentNode) -> {
try {
persistentNode.close();
} catch (Exception ignored) {
}
});

if (cachedModeledFramework != null) {
listenerMap.forEach(
(kaldbMetadataStoreChangeListener, tModeledCacheListener) ->
(_, tModeledCacheListener) ->
cachedModeledFramework.listenable().removeListener(tModeledCacheListener));
cachedModeledFramework.close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import org.apache.zookeeper.CreateMode;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;

public class KaldbMetadataStoreTest {
Expand Down Expand Up @@ -266,7 +265,6 @@ public EphemeralMetadataStore() {
}

@Test
@Disabled("ZK reconnect support currently disabled")
public void testListenersWithZkReconnect() throws Exception {
class TestMetadataStore extends KaldbMetadataStore<TestMetadata> {
public TestMetadataStore() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.apache.zookeeper.server.EphemeralType;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -432,7 +431,6 @@ public EphemeralMetadataStore() {
}

@Test
@Disabled("ZK reconnect support currently disabled")
void testListenersWithZkReconnect() throws Exception {
class TestMetadataStore extends KaldbPartitioningMetadataStore<ExampleMetadata> {
public TestMetadataStore() {
Expand Down

0 comments on commit 47a3179

Please sign in to comment.