Skip to content

Commit

Permalink
Make ZK Cache Init Timeout Configurable (#1153)
Browse files Browse the repository at this point in the history
  • Loading branch information
ermontross authored Nov 11, 2024
1 parent b344875 commit 713a6fb
Show file tree
Hide file tree
Showing 51 changed files with 480 additions and 205 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ public class CachingChunkManager<T> extends ChunkManagerBase<T> {

private final MeterRegistry meterRegistry;
private final AsyncCuratorFramework curatorFramework;
private final AstraConfigs.ZookeeperConfig zkConfig;
private final BlobStore blobStore;
private final SearchContext searchContext;
private final String s3Bucket;
Expand All @@ -68,6 +69,7 @@ public class CachingChunkManager<T> extends ChunkManagerBase<T> {
public CachingChunkManager(
MeterRegistry registry,
AsyncCuratorFramework curatorFramework,
AstraConfigs.ZookeeperConfig zkConfig,
BlobStore blobStore,
SearchContext searchContext,
String s3Bucket,
Expand All @@ -77,6 +79,7 @@ public CachingChunkManager(
long capacityBytes) {
this.meterRegistry = registry;
this.curatorFramework = curatorFramework;
this.zkConfig = zkConfig;
this.blobStore = blobStore;
this.searchContext = searchContext;
this.s3Bucket = s3Bucket;
Expand All @@ -91,12 +94,13 @@ public CachingChunkManager(
protected void startUp() throws Exception {
LOG.info("Starting caching chunk manager");

replicaMetadataStore = new ReplicaMetadataStore(curatorFramework);
snapshotMetadataStore = new SnapshotMetadataStore(curatorFramework);
searchMetadataStore = new SearchMetadataStore(curatorFramework, false);
cacheSlotMetadataStore = new CacheSlotMetadataStore(curatorFramework);
cacheNodeAssignmentStore = new CacheNodeAssignmentStore(curatorFramework, cacheNodeId);
cacheNodeMetadataStore = new CacheNodeMetadataStore(curatorFramework);
replicaMetadataStore = new ReplicaMetadataStore(curatorFramework, zkConfig);
snapshotMetadataStore = new SnapshotMetadataStore(curatorFramework, zkConfig);
searchMetadataStore = new SearchMetadataStore(curatorFramework, zkConfig, false);
cacheSlotMetadataStore = new CacheSlotMetadataStore(curatorFramework, zkConfig);
cacheNodeAssignmentStore =
new CacheNodeAssignmentStore(curatorFramework, zkConfig, cacheNodeId);
cacheNodeMetadataStore = new CacheNodeMetadataStore(curatorFramework, zkConfig);

if (Boolean.getBoolean(ASTRA_NG_DYNAMIC_CHUNK_SIZES_FLAG)) {
cacheNodeAssignmentStore.addListener(cacheNodeAssignmentChangeListener);
Expand Down Expand Up @@ -158,13 +162,15 @@ protected void shutDown() throws Exception {
public static CachingChunkManager<LogMessage> fromConfig(
MeterRegistry meterRegistry,
AsyncCuratorFramework curatorFramework,
AstraConfigs.ZookeeperConfig zkConfig,
AstraConfigs.S3Config s3Config,
AstraConfigs.CacheConfig cacheConfig,
BlobStore blobStore)
throws Exception {
return new CachingChunkManager<>(
meterRegistry,
curatorFramework,
zkConfig,
blobStore,
SearchContext.fromConfig(cacheConfig.getServerConfig()),
s3Config.getS3Bucket(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ public class IndexingChunkManager<T> extends ChunkManagerBase<T> {
private final AsyncCuratorFramework curatorFramework;
private final SearchContext searchContext;
private final AstraConfigs.IndexerConfig indexerConfig;
private final AstraConfigs.ZookeeperConfig zkConfig;
private ReadWriteChunk<T> activeChunk;

private final MeterRegistry meterRegistry;
Expand Down Expand Up @@ -120,7 +121,8 @@ public IndexingChunkManager(
ListeningExecutorService rolloverExecutorService,
AsyncCuratorFramework curatorFramework,
SearchContext searchContext,
AstraConfigs.IndexerConfig indexerConfig) {
AstraConfigs.IndexerConfig indexerConfig,
AstraConfigs.ZookeeperConfig zkConfig) {

ensureNonNullString(dataDirectory, "The data directory shouldn't be empty");
this.dataDirectory = new File(dataDirectory);
Expand All @@ -138,6 +140,7 @@ public IndexingChunkManager(
this.curatorFramework = curatorFramework;
this.searchContext = searchContext;
this.indexerConfig = indexerConfig;
this.zkConfig = zkConfig;

stopIngestion = true;
activeChunk = null;
Expand Down Expand Up @@ -384,8 +387,8 @@ public ListenableFuture<?> getRolloverFuture() {
protected void startUp() throws Exception {
LOG.info("Starting indexing chunk manager");

searchMetadataStore = new SearchMetadataStore(curatorFramework, false);
snapshotMetadataStore = new SnapshotMetadataStore(curatorFramework);
searchMetadataStore = new SearchMetadataStore(curatorFramework, zkConfig, false);
snapshotMetadataStore = new SnapshotMetadataStore(curatorFramework, zkConfig);

stopIngestion = false;
}
Expand Down Expand Up @@ -444,6 +447,7 @@ public static IndexingChunkManager<LogMessage> fromConfig(
MeterRegistry meterRegistry,
AsyncCuratorFramework curatorFramework,
AstraConfigs.IndexerConfig indexerConfig,
AstraConfigs.ZookeeperConfig zkConfig,
BlobStore blobStore,
AstraConfigs.S3Config s3Config) {

Expand All @@ -459,6 +463,7 @@ public static IndexingChunkManager<LogMessage> fromConfig(
makeDefaultRollOverExecutor(),
curatorFramework,
SearchContext.fromConfig(indexerConfig.getServerConfig()),
indexerConfig);
indexerConfig,
zkConfig);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.google.common.util.concurrent.JdkFutureAdapters;
import com.google.common.util.concurrent.ListenableFuture;
import com.slack.astra.metadata.core.AstraPartitioningMetadataStore;
import com.slack.astra.proto.config.AstraConfigs;
import com.slack.astra.proto.metadata.Metadata;
import java.util.List;
import org.apache.curator.x.async.AsyncCuratorFramework;
Expand All @@ -11,18 +12,22 @@
public class CacheNodeAssignmentStore extends AstraPartitioningMetadataStore<CacheNodeAssignment> {
public static final String CACHE_NODE_ASSIGNMENT_STORE_ZK_PATH = "/cacheAssignment";

public CacheNodeAssignmentStore(AsyncCuratorFramework curator) {
public CacheNodeAssignmentStore(
AsyncCuratorFramework curator, AstraConfigs.ZookeeperConfig zkConfig) {
super(
curator,
zkConfig,
CreateMode.PERSISTENT,
new CacheNodeAssignmentSerializer().toModelSerializer(),
CACHE_NODE_ASSIGNMENT_STORE_ZK_PATH);
}

/** Restricts the cache node assignment store to only watching events for cacheNodeId */
public CacheNodeAssignmentStore(AsyncCuratorFramework curator, String cacheNodeId) {
public CacheNodeAssignmentStore(
AsyncCuratorFramework curator, AstraConfigs.ZookeeperConfig zkConfig, String cacheNodeId) {
super(
curator,
zkConfig,
CreateMode.PERSISTENT,
new CacheNodeAssignmentSerializer().toModelSerializer(),
CACHE_NODE_ASSIGNMENT_STORE_ZK_PATH,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
package com.slack.astra.metadata.cache;

import com.slack.astra.metadata.core.AstraMetadataStore;
import com.slack.astra.proto.config.AstraConfigs;
import org.apache.curator.x.async.AsyncCuratorFramework;
import org.apache.zookeeper.CreateMode;

public class CacheNodeMetadataStore extends AstraMetadataStore<CacheNodeMetadata> {
public static final String CACHE_NODE_METADATA_STORE_ZK_PATH = "/cacheNodes";

public CacheNodeMetadataStore(AsyncCuratorFramework curator) {
public CacheNodeMetadataStore(
AsyncCuratorFramework curator, AstraConfigs.ZookeeperConfig zkConfig) {
super(
curator,
zkConfig,
CreateMode.EPHEMERAL,
true,
new CacheNodeMetadataSerializer().toModelSerializer(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.google.common.util.concurrent.JdkFutureAdapters;
import com.google.common.util.concurrent.ListenableFuture;
import com.slack.astra.metadata.core.AstraPartitioningMetadataStore;
import com.slack.astra.proto.config.AstraConfigs;
import com.slack.astra.proto.metadata.Metadata;
import java.time.Instant;
import org.apache.curator.x.async.AsyncCuratorFramework;
Expand All @@ -15,9 +16,12 @@ public class CacheSlotMetadataStore extends AstraPartitioningMetadataStore<Cache
* Initializes a cache slot metadata store at the CACHE_SLOT_ZK_PATH. This should be used to
* create/update the cache slots, and for listening to all cache slot events.
*/
public CacheSlotMetadataStore(AsyncCuratorFramework curatorFramework) throws Exception {
public CacheSlotMetadataStore(
AsyncCuratorFramework curatorFramework, AstraConfigs.ZookeeperConfig zkConfig)
throws Exception {
super(
curatorFramework,
zkConfig,
CreateMode.EPHEMERAL,
new CacheSlotMetadataSerializer().toModelSerializer(),
CACHE_SLOT_ZK_PATH);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
package com.slack.astra.metadata.core;

import static com.slack.astra.server.AstraConfig.DEFAULT_ZK_TIMEOUT_SECS;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.slack.astra.proto.config.AstraConfigs;
import com.slack.astra.util.RuntimeHalterImpl;
import java.io.Closeable;
import java.util.List;
Expand Down Expand Up @@ -50,15 +49,19 @@ public class AstraMetadataStore<T extends AstraMetadata> implements Closeable {
private final ExecutorService cacheInitializedService;
private final ModeledCacheListener<T> initializedListener = getCacheInitializedListener();

private final AstraConfigs.ZookeeperConfig zkConfig;

public AstraMetadataStore(
AsyncCuratorFramework curator,
AstraConfigs.ZookeeperConfig zkConfig,
CreateMode createMode,
boolean shouldCache,
ModelSerializer<T> modelSerializer,
String storeFolder) {

this.storeFolder = storeFolder;
this.zPath = ZPath.parseWithIds(String.format("%s/{name}", storeFolder));
this.zkConfig = zkConfig;

ModelSpec<T> modelSpec =
ModelSpec.builder(modelSerializer)
Expand Down Expand Up @@ -91,7 +94,7 @@ public void createSync(T metadataNode) {
try {
createAsync(metadataNode)
.toCompletableFuture()
.get(DEFAULT_ZK_TIMEOUT_SECS, TimeUnit.SECONDS);
.get(zkConfig.getZkConnectionTimeoutMs(), TimeUnit.MILLISECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
throw new InternalMetadataStoreException("Error creating node " + metadataNode, e);
}
Expand All @@ -106,7 +109,9 @@ public CompletionStage<T> getAsync(String path) {

public T getSync(String path) {
try {
return getAsync(path).toCompletableFuture().get(DEFAULT_ZK_TIMEOUT_SECS, TimeUnit.SECONDS);
return getAsync(path)
.toCompletableFuture()
.get(zkConfig.getZkConnectionTimeoutMs(), TimeUnit.MILLISECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
throw new InternalMetadataStoreException("Error fetching node at path " + path, e);
}
Expand All @@ -122,7 +127,9 @@ public CompletionStage<Stat> hasAsync(String path) {

public boolean hasSync(String path) {
try {
return hasAsync(path).toCompletableFuture().get(DEFAULT_ZK_TIMEOUT_SECS, TimeUnit.SECONDS)
return hasAsync(path)
.toCompletableFuture()
.get(zkConfig.getZkConnectionTimeoutMs(), TimeUnit.MILLISECONDS)
!= null;
} catch (InterruptedException | ExecutionException | TimeoutException e) {
throw new InternalMetadataStoreException("Error fetching node at path " + path, e);
Expand All @@ -137,7 +144,7 @@ public void updateSync(T metadataNode) {
try {
updateAsync(metadataNode)
.toCompletableFuture()
.get(DEFAULT_ZK_TIMEOUT_SECS, TimeUnit.SECONDS);
.get(zkConfig.getZkConnectionTimeoutMs(), TimeUnit.MILLISECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
throw new InternalMetadataStoreException("Error updating node: " + metadataNode, e);
}
Expand All @@ -149,7 +156,9 @@ public CompletionStage<Void> deleteAsync(String path) {

public void deleteSync(String path) {
try {
deleteAsync(path).toCompletableFuture().get(DEFAULT_ZK_TIMEOUT_SECS, TimeUnit.SECONDS);
deleteAsync(path)
.toCompletableFuture()
.get(zkConfig.getZkConnectionTimeoutMs(), TimeUnit.MILLISECONDS);
} catch (ExecutionException | InterruptedException | TimeoutException e) {
throw new InternalMetadataStoreException("Error deleting node under at path: " + path, e);
}
Expand All @@ -163,7 +172,7 @@ public void deleteSync(T metadataNode) {
try {
deleteAsync(metadataNode)
.toCompletableFuture()
.get(DEFAULT_ZK_TIMEOUT_SECS, TimeUnit.SECONDS);
.get(zkConfig.getZkConnectionTimeoutMs(), TimeUnit.MILLISECONDS);
} catch (ExecutionException | InterruptedException | TimeoutException e) {
throw new InternalMetadataStoreException(
"Error deleting node under at path: " + metadataNode.name, e);
Expand All @@ -181,7 +190,9 @@ public CompletionStage<List<T>> listAsync() {

public List<T> listSync() {
try {
return listAsync().toCompletableFuture().get(DEFAULT_ZK_TIMEOUT_SECS, TimeUnit.SECONDS);
return listAsync()
.toCompletableFuture()
.get(zkConfig.getZkConnectionTimeoutMs(), TimeUnit.MILLISECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
throw new InternalMetadataStoreException("Error getting cached nodes", e);
}
Expand Down Expand Up @@ -214,7 +225,7 @@ public void removeListener(AstraMetadataStoreChangeListener<T> watcher) {

private void awaitCacheInitialized() {
try {
if (!cacheInitialized.await(30, TimeUnit.SECONDS)) {
if (!cacheInitialized.await(zkConfig.getZkCacheInitTimeoutMs(), TimeUnit.MILLISECONDS)) {
// in the event we deadlock, go ahead and time this out at 30s and restart the pod
new RuntimeHalterImpl()
.handleFatal(
Expand Down
Loading

0 comments on commit 713a6fb

Please sign in to comment.