Skip to content

Commit

Permalink
Add persistent ephemeral node support to metadata store
Browse files Browse the repository at this point in the history
  • Loading branch information
bryanlb committed Feb 14, 2024
1 parent bfa3a76 commit d5e4111
Show file tree
Hide file tree
Showing 45 changed files with 638 additions and 172 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,7 @@ public void close() throws IOException {
handleChunkEviction(cacheSlotMetadata);
}
cacheSlotMetadataStore.removeListener(cacheSlotListener);
cacheSlotMetadataStore.close();
// the cacheSlotMetadataStore is a passed param, and shouldn't be closed here
LOG.debug("Closed chunk");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,10 @@ public CachingChunkManager(
protected void startUp() throws Exception {
LOG.info("Starting caching chunk manager");

replicaMetadataStore = new ReplicaMetadataStore(curatorFramework);
snapshotMetadataStore = new SnapshotMetadataStore(curatorFramework);
searchMetadataStore = new SearchMetadataStore(curatorFramework, false);
cacheSlotMetadataStore = new CacheSlotMetadataStore(curatorFramework);
replicaMetadataStore = new ReplicaMetadataStore(curatorFramework, meterRegistry);
snapshotMetadataStore = new SnapshotMetadataStore(curatorFramework, meterRegistry);
searchMetadataStore = new SearchMetadataStore(curatorFramework, false, meterRegistry);
cacheSlotMetadataStore = new CacheSlotMetadataStore(curatorFramework, meterRegistry);

for (int i = 0; i < slotCountPerInstance; i++) {
chunkList.add(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -387,8 +387,8 @@ public ListenableFuture<?> getRolloverFuture() {
protected void startUp() throws Exception {
LOG.info("Starting indexing chunk manager");

searchMetadataStore = new SearchMetadataStore(curatorFramework, false);
snapshotMetadataStore = new SnapshotMetadataStore(curatorFramework);
searchMetadataStore = new SearchMetadataStore(curatorFramework, false, meterRegistry);
snapshotMetadataStore = new SnapshotMetadataStore(curatorFramework, meterRegistry);

stopIngestion = false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.google.common.util.concurrent.ListenableFuture;
import com.slack.kaldb.metadata.core.KaldbPartitioningMetadataStore;
import com.slack.kaldb.proto.metadata.Metadata;
import io.micrometer.core.instrument.MeterRegistry;
import java.time.Instant;
import org.apache.curator.x.async.AsyncCuratorFramework;
import org.apache.zookeeper.CreateMode;
Expand All @@ -15,12 +16,14 @@ public class CacheSlotMetadataStore extends KaldbPartitioningMetadataStore<Cache
* Initializes a cache slot metadata store at the CACHE_SLOT_ZK_PATH. This should be used to
* create/update the cache slots, and for listening to all cache slot events.
*/
public CacheSlotMetadataStore(AsyncCuratorFramework curatorFramework) throws Exception {
public CacheSlotMetadataStore(AsyncCuratorFramework curatorFramework, MeterRegistry meterRegistry)
throws Exception {
super(
curatorFramework,
CreateMode.EPHEMERAL,
new CacheSlotMetadataSerializer().toModelSerializer(),
CACHE_SLOT_ZK_PATH);
CACHE_SLOT_ZK_PATH,
meterRegistry);
}

/** Update the cache slot state, if the slot is not FREE. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,13 @@ public static AsyncCuratorFramework build(
&& curatorEvent.getWatchedEvent().getState()
== Watcher.Event.KeeperState.Expired) {
LOG.warn("The ZK session has expired {}.", curatorEvent);
new RuntimeHalterImpl().handleFatal(new Throwable("ZK session expired."));

if (!KaldbMetadataStore.persistentEphemeralModeEnabled()) {
new RuntimeHalterImpl().handleFatal(new Throwable("ZK session expired."));
}
}
});

curator.start();

LOG.info(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,21 @@
import static com.slack.kaldb.server.KaldbConfig.DEFAULT_ZK_TIMEOUT_SECS;

import com.slack.kaldb.util.RuntimeHalterImpl;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
import java.io.Closeable;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.curator.framework.recipes.nodes.PersistentNode;
import org.apache.curator.x.async.AsyncCuratorFramework;
import org.apache.curator.x.async.api.CreateOption;
import org.apache.curator.x.async.modeled.ModelSerializer;
Expand All @@ -23,18 +28,31 @@
import org.apache.curator.x.async.modeled.cached.ModeledCacheListener;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* KaldbMetadataStore is a class which provides consistent ZK apis for all the metadata store class.
*
* <p>Every method provides an async and a sync API. In general, use the async API you are
* performing batch operations and a sync if you are performing a synchronous operation on a node.
*
* <p><a href="https://curator.apache.org/docs/recipes-persistent-node">Persistent node recipie</a>
*/
public class KaldbMetadataStore<T extends KaldbMetadata> implements Closeable {
private static final Logger LOG = LoggerFactory.getLogger(KaldbMetadataStore.class);

public static String PERSISTENT_EPHEMERAL_PROPERTY = "kaldb.metadata.persistentEphemeral";
protected final String storeFolder;

private final ZPath zPath;

private final CreateMode createMode;

private final AsyncCuratorFramework curator;

private final ModelSpec<T> modelSpec;

private final CountDownLatch cacheInitialized = new CountDownLatch(1);

protected final ModeledFramework<T> modeledClient;
Expand All @@ -44,17 +62,26 @@ public class KaldbMetadataStore<T extends KaldbMetadata> implements Closeable {
private final Map<KaldbMetadataStoreChangeListener<T>, ModeledCacheListener<T>> listenerMap =
new ConcurrentHashMap<>();

private final Map<String, PersistentNode> persistentNodeMap = new ConcurrentHashMap<>();

public static final String PERSISTENT_NODE_RECREATED_COUNTER =
"metadata_persistent_node_recreated";
private final Counter persistentNodeRecreatedCounter;

public KaldbMetadataStore(
AsyncCuratorFramework curator,
CreateMode createMode,
boolean shouldCache,
ModelSerializer<T> modelSerializer,
String storeFolder) {
String storeFolder,
MeterRegistry meterRegistry) {

this.createMode = createMode;
this.curator = curator;
this.storeFolder = storeFolder;
this.zPath = ZPath.parseWithIds(String.format("%s/{name}", storeFolder));

ModelSpec<T> modelSpec =
this.modelSpec =
ModelSpec.builder(modelSerializer)
.withPath(zPath)
.withCreateOptions(
Expand All @@ -70,11 +97,54 @@ public KaldbMetadataStore(
} else {
cachedModeledFramework = null;
}

persistentNodeRecreatedCounter = meterRegistry.counter(PERSISTENT_NODE_RECREATED_COUNTER);
LOG.info(
"Persistent ephemeral mode '{}' enabled - {}",
PERSISTENT_EPHEMERAL_PROPERTY,
persistentEphemeralModeEnabled());
}

public static boolean persistentEphemeralModeEnabled() {
return Boolean.parseBoolean(System.getProperty(PERSISTENT_EPHEMERAL_PROPERTY, "true"));
}

public CompletionStage<String> createAsync(T metadataNode) {
// by passing the version 0, this will throw if we attempt to create and it already exists
return modeledClient.set(metadataNode, 0);
if (createMode == CreateMode.EPHEMERAL && persistentEphemeralModeEnabled()) {
String nodePath = resolvePath(metadataNode);
// persistent node already implements NodeDataChanged
PersistentNode node =
new PersistentNode(
curator.unwrap(),
createMode,
false,
nodePath,
modelSpec.serializer().serialize(metadataNode));
persistentNodeMap.put(nodePath, node);
node.start();
// todo - what happens when attempting to create over existing?
return CompletableFuture.supplyAsync(
() -> {
try {
node.waitForInitialCreate(DEFAULT_ZK_TIMEOUT_SECS, TimeUnit.SECONDS);
node.getListenable().addListener(_ -> persistentNodeRecreatedCounter.increment());
return nodePath;
} catch (Exception e) {
throw new CompletionException(e);
}
});
} else {
// by passing the version 0, this will throw if we attempt to create and it already exists
return modeledClient.set(metadataNode, 0);
}
}

// resolveForSet
private String resolvePath(T model) {
if (modelSpec.path().isResolved()) {
return modelSpec.path().fullPath();
}
return modelSpec.path().resolved(model).fullPath();
}

public void createSync(T metadataNode) {
Expand Down Expand Up @@ -120,7 +190,18 @@ public boolean hasSync(String path) {
}

public CompletionStage<Stat> updateAsync(T metadataNode) {
return modeledClient.update(metadataNode);
PersistentNode node = getPersistentNodeIfExists(metadataNode);
if (node != null) {
try {
node.waitForInitialCreate(DEFAULT_ZK_TIMEOUT_SECS, TimeUnit.SECONDS);
node.setData(modelSpec.serializer().serialize(metadataNode));
return CompletableFuture.completedFuture(null);
} catch (Exception e) {
throw new CompletionException(e);
}
} else {
return modeledClient.update(metadataNode);
}
}

public void updateSync(T metadataNode) {
Expand All @@ -134,7 +215,18 @@ public void updateSync(T metadataNode) {
}

public CompletionStage<Void> deleteAsync(String path) {
return modeledClient.withPath(zPath.resolved(path)).delete();
PersistentNode node = removePersistentNodeIfExists(path);
if (node != null) {
try {
node.waitForInitialCreate(DEFAULT_ZK_TIMEOUT_SECS, TimeUnit.SECONDS);
node.close();
return CompletableFuture.completedFuture(null);
} catch (Exception e) {
throw new CompletionException(e);
}
} else {
return modeledClient.withPath(zPath.resolved(path)).delete();
}
}

public void deleteSync(String path) {
Expand All @@ -146,7 +238,18 @@ public void deleteSync(String path) {
}

public CompletionStage<Void> deleteAsync(T metadataNode) {
return modeledClient.withPath(zPath.resolved(metadataNode)).delete();
PersistentNode node = removePersistentNodeIfExists(metadataNode);
if (node != null) {
try {
node.waitForInitialCreate(DEFAULT_ZK_TIMEOUT_SECS, TimeUnit.SECONDS);
node.close();
return CompletableFuture.completedFuture(null);
} catch (Exception e) {
throw new CompletionException(e);
}
} else {
return modeledClient.withPath(zPath.resolved(metadataNode)).delete();
}
}

public void deleteSync(T metadataNode) {
Expand Down Expand Up @@ -210,6 +313,18 @@ private void awaitCacheInitialized() {
}
}

private PersistentNode getPersistentNodeIfExists(T metadataNode) {
return persistentNodeMap.getOrDefault(resolvePath(metadataNode), null);
}

private PersistentNode removePersistentNodeIfExists(T metadataNode) {
return persistentNodeMap.remove(resolvePath(metadataNode));
}

private PersistentNode removePersistentNodeIfExists(String path) {
return persistentNodeMap.remove(zPath.resolved(path).fullPath());
}

private ModeledCacheListener<T> getCacheInitializedListener() {
return new ModeledCacheListener<T>() {
@Override
Expand All @@ -227,9 +342,17 @@ public void initialized() {

@Override
public void close() {
persistentNodeMap.forEach(
(_, persistentNode) -> {
try {
persistentNode.close();
} catch (Exception ignored) {
}
});

if (cachedModeledFramework != null) {
listenerMap.forEach(
(kaldbMetadataStoreChangeListener, tModeledCacheListener) ->
(_, tModeledCacheListener) ->
cachedModeledFramework.listenable().removeListener(tModeledCacheListener));
cachedModeledFramework.close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import static com.slack.kaldb.server.KaldbConfig.DEFAULT_ZK_TIMEOUT_SECS;

import com.google.common.collect.Sets;
import io.micrometer.core.instrument.MeterRegistry;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
Expand Down Expand Up @@ -55,11 +56,15 @@ public class KaldbPartitioningMetadataStore<T extends KaldbPartitionedMetadata>
protected final ModelSerializer<T> modelSerializer;
private final Watcher watcher;

private final MeterRegistry meterRegistry;

public KaldbPartitioningMetadataStore(
AsyncCuratorFramework curator,
CreateMode createMode,
ModelSerializer<T> modelSerializer,
String storeFolder) {
String storeFolder,
MeterRegistry meterRegistry) {
this.meterRegistry = meterRegistry;
this.curator = curator;
this.storeFolder = storeFolder;
this.createMode = createMode;
Expand Down Expand Up @@ -108,7 +113,7 @@ public KaldbPartitioningMetadataStore(
* map, and removes stores that are in the map that no longer exist in ZK.
*
* @see KaldbMetadataStore#KaldbMetadataStore(AsyncCuratorFramework, CreateMode, boolean,
* ModelSerializer, String)
* ModelSerializer, String, MeterRegistry)
*/
private Watcher buildWatcher() {
return event -> {
Expand Down Expand Up @@ -261,7 +266,8 @@ private KaldbMetadataStore<T> getOrCreateMetadataStore(String partition) {
"Creating new metadata store for partition - {}, at path - {}", partition, path);

KaldbMetadataStore<T> newStore =
new KaldbMetadataStore<>(curator, createMode, true, modelSerializer, path);
new KaldbMetadataStore<>(
curator, createMode, true, modelSerializer, path, meterRegistry);
listeners.forEach(newStore::addListener);

return newStore;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,19 +1,23 @@
package com.slack.kaldb.metadata.dataset;

import com.slack.kaldb.metadata.core.KaldbMetadataStore;
import io.micrometer.core.instrument.MeterRegistry;
import org.apache.curator.x.async.AsyncCuratorFramework;
import org.apache.zookeeper.CreateMode;

public class DatasetMetadataStore extends KaldbMetadataStore<DatasetMetadata> {
// TODO: The path should be dataset, but leaving it as /service for backwards compatibility.
public static final String DATASET_METADATA_STORE_ZK_PATH = "/service";

public DatasetMetadataStore(AsyncCuratorFramework curator, boolean shouldCache) throws Exception {
public DatasetMetadataStore(
AsyncCuratorFramework curator, boolean shouldCache, MeterRegistry meterRegistry)
throws Exception {
super(
curator,
CreateMode.PERSISTENT,
shouldCache,
new DatasetMetadataSerializer().toModelSerializer(),
DATASET_METADATA_STORE_ZK_PATH);
DATASET_METADATA_STORE_ZK_PATH,
meterRegistry);
}
}
Loading

0 comments on commit d5e4111

Please sign in to comment.