diff --git a/astra/src/main/java/com/slack/astra/chunkManager/CachingChunkManager.java b/astra/src/main/java/com/slack/astra/chunkManager/CachingChunkManager.java index 030dee047e..f706cc6aa4 100644 --- a/astra/src/main/java/com/slack/astra/chunkManager/CachingChunkManager.java +++ b/astra/src/main/java/com/slack/astra/chunkManager/CachingChunkManager.java @@ -42,6 +42,7 @@ public class CachingChunkManager extends ChunkManagerBase { private final MeterRegistry meterRegistry; private final AsyncCuratorFramework curatorFramework; + private final AstraConfigs.ZookeeperConfig zkConfig; private final BlobStore blobStore; private final SearchContext searchContext; private final String s3Bucket; @@ -68,6 +69,7 @@ public class CachingChunkManager extends ChunkManagerBase { public CachingChunkManager( MeterRegistry registry, AsyncCuratorFramework curatorFramework, + AstraConfigs.ZookeeperConfig zkConfig, BlobStore blobStore, SearchContext searchContext, String s3Bucket, @@ -77,6 +79,7 @@ public CachingChunkManager( long capacityBytes) { this.meterRegistry = registry; this.curatorFramework = curatorFramework; + this.zkConfig = zkConfig; this.blobStore = blobStore; this.searchContext = searchContext; this.s3Bucket = s3Bucket; @@ -91,12 +94,13 @@ public CachingChunkManager( protected void startUp() throws Exception { LOG.info("Starting caching chunk manager"); - replicaMetadataStore = new ReplicaMetadataStore(curatorFramework); - snapshotMetadataStore = new SnapshotMetadataStore(curatorFramework); - searchMetadataStore = new SearchMetadataStore(curatorFramework, false); - cacheSlotMetadataStore = new CacheSlotMetadataStore(curatorFramework); - cacheNodeAssignmentStore = new CacheNodeAssignmentStore(curatorFramework, cacheNodeId); - cacheNodeMetadataStore = new CacheNodeMetadataStore(curatorFramework); + replicaMetadataStore = new ReplicaMetadataStore(curatorFramework, zkConfig); + snapshotMetadataStore = new SnapshotMetadataStore(curatorFramework, zkConfig); + searchMetadataStore = new SearchMetadataStore(curatorFramework, zkConfig, false); + cacheSlotMetadataStore = new CacheSlotMetadataStore(curatorFramework, zkConfig); + cacheNodeAssignmentStore = + new CacheNodeAssignmentStore(curatorFramework, zkConfig, cacheNodeId); + cacheNodeMetadataStore = new CacheNodeMetadataStore(curatorFramework, zkConfig); if (Boolean.getBoolean(ASTRA_NG_DYNAMIC_CHUNK_SIZES_FLAG)) { cacheNodeAssignmentStore.addListener(cacheNodeAssignmentChangeListener); @@ -158,6 +162,7 @@ protected void shutDown() throws Exception { public static CachingChunkManager fromConfig( MeterRegistry meterRegistry, AsyncCuratorFramework curatorFramework, + AstraConfigs.ZookeeperConfig zkConfig, AstraConfigs.S3Config s3Config, AstraConfigs.CacheConfig cacheConfig, BlobStore blobStore) @@ -165,6 +170,7 @@ public static CachingChunkManager fromConfig( return new CachingChunkManager<>( meterRegistry, curatorFramework, + zkConfig, blobStore, SearchContext.fromConfig(cacheConfig.getServerConfig()), s3Config.getS3Bucket(), diff --git a/astra/src/main/java/com/slack/astra/chunkManager/IndexingChunkManager.java b/astra/src/main/java/com/slack/astra/chunkManager/IndexingChunkManager.java index 590f76d609..a4e6661d03 100644 --- a/astra/src/main/java/com/slack/astra/chunkManager/IndexingChunkManager.java +++ b/astra/src/main/java/com/slack/astra/chunkManager/IndexingChunkManager.java @@ -65,6 +65,7 @@ public class IndexingChunkManager extends ChunkManagerBase { private final AsyncCuratorFramework curatorFramework; private final SearchContext searchContext; private final AstraConfigs.IndexerConfig indexerConfig; + private final AstraConfigs.ZookeeperConfig zkConfig; private ReadWriteChunk activeChunk; private final MeterRegistry meterRegistry; @@ -120,7 +121,8 @@ public IndexingChunkManager( ListeningExecutorService rolloverExecutorService, AsyncCuratorFramework curatorFramework, SearchContext searchContext, - AstraConfigs.IndexerConfig indexerConfig) { + AstraConfigs.IndexerConfig indexerConfig, + AstraConfigs.ZookeeperConfig zkConfig) { ensureNonNullString(dataDirectory, "The data directory shouldn't be empty"); this.dataDirectory = new File(dataDirectory); @@ -138,6 +140,7 @@ public IndexingChunkManager( this.curatorFramework = curatorFramework; this.searchContext = searchContext; this.indexerConfig = indexerConfig; + this.zkConfig = zkConfig; stopIngestion = true; activeChunk = null; @@ -384,8 +387,8 @@ public ListenableFuture getRolloverFuture() { protected void startUp() throws Exception { LOG.info("Starting indexing chunk manager"); - searchMetadataStore = new SearchMetadataStore(curatorFramework, false); - snapshotMetadataStore = new SnapshotMetadataStore(curatorFramework); + searchMetadataStore = new SearchMetadataStore(curatorFramework, zkConfig, false); + snapshotMetadataStore = new SnapshotMetadataStore(curatorFramework, zkConfig); stopIngestion = false; } @@ -444,6 +447,7 @@ public static IndexingChunkManager fromConfig( MeterRegistry meterRegistry, AsyncCuratorFramework curatorFramework, AstraConfigs.IndexerConfig indexerConfig, + AstraConfigs.ZookeeperConfig zkConfig, BlobStore blobStore, AstraConfigs.S3Config s3Config) { @@ -459,6 +463,7 @@ public static IndexingChunkManager fromConfig( makeDefaultRollOverExecutor(), curatorFramework, SearchContext.fromConfig(indexerConfig.getServerConfig()), - indexerConfig); + indexerConfig, + zkConfig); } } diff --git a/astra/src/main/java/com/slack/astra/metadata/cache/CacheNodeAssignmentStore.java b/astra/src/main/java/com/slack/astra/metadata/cache/CacheNodeAssignmentStore.java index 56b83eeecf..ac2c8930ec 100644 --- a/astra/src/main/java/com/slack/astra/metadata/cache/CacheNodeAssignmentStore.java +++ b/astra/src/main/java/com/slack/astra/metadata/cache/CacheNodeAssignmentStore.java @@ -3,6 +3,7 @@ import com.google.common.util.concurrent.JdkFutureAdapters; import com.google.common.util.concurrent.ListenableFuture; import com.slack.astra.metadata.core.AstraPartitioningMetadataStore; +import com.slack.astra.proto.config.AstraConfigs; import com.slack.astra.proto.metadata.Metadata; import java.util.List; import org.apache.curator.x.async.AsyncCuratorFramework; @@ -11,18 +12,22 @@ public class CacheNodeAssignmentStore extends AstraPartitioningMetadataStore { public static final String CACHE_NODE_ASSIGNMENT_STORE_ZK_PATH = "/cacheAssignment"; - public CacheNodeAssignmentStore(AsyncCuratorFramework curator) { + public CacheNodeAssignmentStore( + AsyncCuratorFramework curator, AstraConfigs.ZookeeperConfig zkConfig) { super( curator, + zkConfig, CreateMode.PERSISTENT, new CacheNodeAssignmentSerializer().toModelSerializer(), CACHE_NODE_ASSIGNMENT_STORE_ZK_PATH); } /** Restricts the cache node assignment store to only watching events for cacheNodeId */ - public CacheNodeAssignmentStore(AsyncCuratorFramework curator, String cacheNodeId) { + public CacheNodeAssignmentStore( + AsyncCuratorFramework curator, AstraConfigs.ZookeeperConfig zkConfig, String cacheNodeId) { super( curator, + zkConfig, CreateMode.PERSISTENT, new CacheNodeAssignmentSerializer().toModelSerializer(), CACHE_NODE_ASSIGNMENT_STORE_ZK_PATH, diff --git a/astra/src/main/java/com/slack/astra/metadata/cache/CacheNodeMetadataStore.java b/astra/src/main/java/com/slack/astra/metadata/cache/CacheNodeMetadataStore.java index 7f20ae0f4c..28834524b1 100644 --- a/astra/src/main/java/com/slack/astra/metadata/cache/CacheNodeMetadataStore.java +++ b/astra/src/main/java/com/slack/astra/metadata/cache/CacheNodeMetadataStore.java @@ -1,15 +1,18 @@ package com.slack.astra.metadata.cache; import com.slack.astra.metadata.core.AstraMetadataStore; +import com.slack.astra.proto.config.AstraConfigs; import org.apache.curator.x.async.AsyncCuratorFramework; import org.apache.zookeeper.CreateMode; public class CacheNodeMetadataStore extends AstraMetadataStore { public static final String CACHE_NODE_METADATA_STORE_ZK_PATH = "/cacheNodes"; - public CacheNodeMetadataStore(AsyncCuratorFramework curator) { + public CacheNodeMetadataStore( + AsyncCuratorFramework curator, AstraConfigs.ZookeeperConfig zkConfig) { super( curator, + zkConfig, CreateMode.EPHEMERAL, true, new CacheNodeMetadataSerializer().toModelSerializer(), diff --git a/astra/src/main/java/com/slack/astra/metadata/cache/CacheSlotMetadataStore.java b/astra/src/main/java/com/slack/astra/metadata/cache/CacheSlotMetadataStore.java index ad870148ca..94e87b840d 100644 --- a/astra/src/main/java/com/slack/astra/metadata/cache/CacheSlotMetadataStore.java +++ b/astra/src/main/java/com/slack/astra/metadata/cache/CacheSlotMetadataStore.java @@ -3,6 +3,7 @@ import com.google.common.util.concurrent.JdkFutureAdapters; import com.google.common.util.concurrent.ListenableFuture; import com.slack.astra.metadata.core.AstraPartitioningMetadataStore; +import com.slack.astra.proto.config.AstraConfigs; import com.slack.astra.proto.metadata.Metadata; import java.time.Instant; import org.apache.curator.x.async.AsyncCuratorFramework; @@ -15,9 +16,12 @@ public class CacheSlotMetadataStore extends AstraPartitioningMetadataStore implements Closeable { private final ExecutorService cacheInitializedService; private final ModeledCacheListener initializedListener = getCacheInitializedListener(); + private final AstraConfigs.ZookeeperConfig zkConfig; + public AstraMetadataStore( AsyncCuratorFramework curator, + AstraConfigs.ZookeeperConfig zkConfig, CreateMode createMode, boolean shouldCache, ModelSerializer modelSerializer, @@ -59,6 +61,7 @@ public AstraMetadataStore( this.storeFolder = storeFolder; this.zPath = ZPath.parseWithIds(String.format("%s/{name}", storeFolder)); + this.zkConfig = zkConfig; ModelSpec modelSpec = ModelSpec.builder(modelSerializer) @@ -91,7 +94,7 @@ public void createSync(T metadataNode) { try { createAsync(metadataNode) .toCompletableFuture() - .get(DEFAULT_ZK_TIMEOUT_SECS, TimeUnit.SECONDS); + .get(zkConfig.getZkConnectionTimeoutMs(), TimeUnit.MILLISECONDS); } catch (InterruptedException | ExecutionException | TimeoutException e) { throw new InternalMetadataStoreException("Error creating node " + metadataNode, e); } @@ -106,7 +109,9 @@ public CompletionStage getAsync(String path) { public T getSync(String path) { try { - return getAsync(path).toCompletableFuture().get(DEFAULT_ZK_TIMEOUT_SECS, TimeUnit.SECONDS); + return getAsync(path) + .toCompletableFuture() + .get(zkConfig.getZkConnectionTimeoutMs(), TimeUnit.MILLISECONDS); } catch (InterruptedException | ExecutionException | TimeoutException e) { throw new InternalMetadataStoreException("Error fetching node at path " + path, e); } @@ -122,7 +127,9 @@ public CompletionStage hasAsync(String path) { public boolean hasSync(String path) { try { - return hasAsync(path).toCompletableFuture().get(DEFAULT_ZK_TIMEOUT_SECS, TimeUnit.SECONDS) + return hasAsync(path) + .toCompletableFuture() + .get(zkConfig.getZkConnectionTimeoutMs(), TimeUnit.MILLISECONDS) != null; } catch (InterruptedException | ExecutionException | TimeoutException e) { throw new InternalMetadataStoreException("Error fetching node at path " + path, e); @@ -137,7 +144,7 @@ public void updateSync(T metadataNode) { try { updateAsync(metadataNode) .toCompletableFuture() - .get(DEFAULT_ZK_TIMEOUT_SECS, TimeUnit.SECONDS); + .get(zkConfig.getZkConnectionTimeoutMs(), TimeUnit.MILLISECONDS); } catch (InterruptedException | ExecutionException | TimeoutException e) { throw new InternalMetadataStoreException("Error updating node: " + metadataNode, e); } @@ -149,7 +156,9 @@ public CompletionStage deleteAsync(String path) { public void deleteSync(String path) { try { - deleteAsync(path).toCompletableFuture().get(DEFAULT_ZK_TIMEOUT_SECS, TimeUnit.SECONDS); + deleteAsync(path) + .toCompletableFuture() + .get(zkConfig.getZkConnectionTimeoutMs(), TimeUnit.MILLISECONDS); } catch (ExecutionException | InterruptedException | TimeoutException e) { throw new InternalMetadataStoreException("Error deleting node under at path: " + path, e); } @@ -163,7 +172,7 @@ public void deleteSync(T metadataNode) { try { deleteAsync(metadataNode) .toCompletableFuture() - .get(DEFAULT_ZK_TIMEOUT_SECS, TimeUnit.SECONDS); + .get(zkConfig.getZkConnectionTimeoutMs(), TimeUnit.MILLISECONDS); } catch (ExecutionException | InterruptedException | TimeoutException e) { throw new InternalMetadataStoreException( "Error deleting node under at path: " + metadataNode.name, e); @@ -181,7 +190,9 @@ public CompletionStage> listAsync() { public List listSync() { try { - return listAsync().toCompletableFuture().get(DEFAULT_ZK_TIMEOUT_SECS, TimeUnit.SECONDS); + return listAsync() + .toCompletableFuture() + .get(zkConfig.getZkConnectionTimeoutMs(), TimeUnit.MILLISECONDS); } catch (InterruptedException | ExecutionException | TimeoutException e) { throw new InternalMetadataStoreException("Error getting cached nodes", e); } @@ -214,7 +225,7 @@ public void removeListener(AstraMetadataStoreChangeListener watcher) { private void awaitCacheInitialized() { try { - if (!cacheInitialized.await(30, TimeUnit.SECONDS)) { + if (!cacheInitialized.await(zkConfig.getZkCacheInitTimeoutMs(), TimeUnit.MILLISECONDS)) { // in the event we deadlock, go ahead and time this out at 30s and restart the pod new RuntimeHalterImpl() .handleFatal( diff --git a/astra/src/main/java/com/slack/astra/metadata/core/AstraPartitioningMetadataStore.java b/astra/src/main/java/com/slack/astra/metadata/core/AstraPartitioningMetadataStore.java index b08b3800c4..fb577eaa17 100644 --- a/astra/src/main/java/com/slack/astra/metadata/core/AstraPartitioningMetadataStore.java +++ b/astra/src/main/java/com/slack/astra/metadata/core/AstraPartitioningMetadataStore.java @@ -1,8 +1,7 @@ package com.slack.astra.metadata.core; -import static com.slack.astra.server.AstraConfig.DEFAULT_ZK_TIMEOUT_SECS; - import com.google.common.collect.Sets; +import com.slack.astra.proto.config.AstraConfigs; import java.io.Closeable; import java.io.IOException; import java.util.ArrayList; @@ -55,17 +54,20 @@ public class AstraPartitioningMetadataStore protected final ModelSerializer modelSerializer; private final Watcher watcher; private final List partitionFilters; + private final AstraConfigs.ZookeeperConfig zkConfig; public AstraPartitioningMetadataStore( AsyncCuratorFramework curator, + AstraConfigs.ZookeeperConfig zkConfig, CreateMode createMode, ModelSerializer modelSerializer, String storeFolder) { - this(curator, createMode, modelSerializer, storeFolder, List.of()); + this(curator, zkConfig, createMode, modelSerializer, storeFolder, List.of()); } public AstraPartitioningMetadataStore( AsyncCuratorFramework curator, + AstraConfigs.ZookeeperConfig zkConfig, CreateMode createMode, ModelSerializer modelSerializer, String storeFolder, @@ -76,6 +78,7 @@ public AstraPartitioningMetadataStore( this.modelSerializer = modelSerializer; this.watcher = buildWatcher(); this.partitionFilters = partitionFilters; + this.zkConfig = zkConfig; // register watchers for when partitions are added or removed curator @@ -135,8 +138,8 @@ public AstraPartitioningMetadataStore( *

This method creates stores internally when they are detected in ZK storing them to the store * map, and removes stores that are in the map that no longer exist in ZK. * - * @see AstraMetadataStore#AstraMetadataStore(AsyncCuratorFramework, CreateMode, boolean, - * ModelSerializer, String) + * @see AstraMetadataStore#AstraMetadataStore(AsyncCuratorFramework, AstraConfigs.ZookeeperConfig, + * CreateMode, boolean, ModelSerializer, String) */ private Watcher buildWatcher() { return event -> { @@ -189,7 +192,7 @@ public void createSync(T metadataNode) { try { createAsync(metadataNode) .toCompletableFuture() - .get(DEFAULT_ZK_TIMEOUT_SECS, TimeUnit.SECONDS); + .get(zkConfig.getZkConnectionTimeoutMs(), TimeUnit.MILLISECONDS); } catch (InterruptedException | ExecutionException | TimeoutException e) { throw new InternalMetadataStoreException("Error creating node " + metadataNode, e); } @@ -203,7 +206,7 @@ public T getSync(String partition, String path) { try { return getAsync(partition, path) .toCompletableFuture() - .get(DEFAULT_ZK_TIMEOUT_SECS, TimeUnit.SECONDS); + .get(zkConfig.getZkConnectionTimeoutMs(), TimeUnit.MILLISECONDS); } catch (InterruptedException | ExecutionException | TimeoutException e) { throw new InternalMetadataStoreException("Error fetching node at path " + path, e); } @@ -227,7 +230,9 @@ public CompletionStage findAsync(String path) { */ public T findSync(String path) { try { - return findAsync(path).toCompletableFuture().get(DEFAULT_ZK_TIMEOUT_SECS, TimeUnit.SECONDS); + return findAsync(path) + .toCompletableFuture() + .get(zkConfig.getZkConnectionTimeoutMs(), TimeUnit.MILLISECONDS); } catch (InterruptedException | ExecutionException | TimeoutException e) { throw new InternalMetadataStoreException("Error fetching node at path " + path, e); } @@ -241,7 +246,7 @@ public void updateSync(T metadataNode) { try { updateAsync(metadataNode) .toCompletableFuture() - .get(DEFAULT_ZK_TIMEOUT_SECS, TimeUnit.SECONDS); + .get(zkConfig.getZkConnectionTimeoutMs(), TimeUnit.MILLISECONDS); } catch (InterruptedException | ExecutionException | TimeoutException e) { throw new InternalMetadataStoreException("Error updating node: " + metadataNode, e); } @@ -255,7 +260,7 @@ public void deleteSync(T metadataNode) { try { deleteAsync(metadataNode) .toCompletableFuture() - .get(DEFAULT_ZK_TIMEOUT_SECS, TimeUnit.SECONDS); + .get(zkConfig.getZkConnectionTimeoutMs(), TimeUnit.MILLISECONDS); } catch (ExecutionException | InterruptedException | TimeoutException e) { throw new InternalMetadataStoreException( "Error deleting node under at path: " + metadataNode.name, e); @@ -280,7 +285,9 @@ public CompletableFuture> listAsync() { public List listSync() { try { - return listAsync().toCompletableFuture().get(DEFAULT_ZK_TIMEOUT_SECS, TimeUnit.SECONDS); + return listAsync() + .toCompletableFuture() + .get(zkConfig.getZkConnectionTimeoutMs(), TimeUnit.MILLISECONDS); } catch (ExecutionException | InterruptedException | TimeoutException e) { throw new InternalMetadataStoreException("Error listing nodes", e); } @@ -304,7 +311,7 @@ private AstraMetadataStore getOrCreateMetadataStore(String partition) { "Creating new metadata store for partition - {}, at path - {}", partition, path); AstraMetadataStore newStore = - new AstraMetadataStore<>(curator, createMode, true, modelSerializer, path); + new AstraMetadataStore<>(curator, zkConfig, createMode, true, modelSerializer, path); listeners.forEach(newStore::addListener); return newStore; diff --git a/astra/src/main/java/com/slack/astra/metadata/dataset/DatasetMetadataStore.java b/astra/src/main/java/com/slack/astra/metadata/dataset/DatasetMetadataStore.java index ccaf0b6cf7..50be14e79d 100644 --- a/astra/src/main/java/com/slack/astra/metadata/dataset/DatasetMetadataStore.java +++ b/astra/src/main/java/com/slack/astra/metadata/dataset/DatasetMetadataStore.java @@ -1,6 +1,7 @@ package com.slack.astra.metadata.dataset; import com.slack.astra.metadata.core.AstraMetadataStore; +import com.slack.astra.proto.config.AstraConfigs; import org.apache.curator.x.async.AsyncCuratorFramework; import org.apache.zookeeper.CreateMode; @@ -8,9 +9,12 @@ public class DatasetMetadataStore extends AstraMetadataStore { // TODO: The path should be dataset, but leaving it as /service for backwards compatibility. public static final String DATASET_METADATA_STORE_ZK_PATH = "/service"; - public DatasetMetadataStore(AsyncCuratorFramework curator, boolean shouldCache) throws Exception { + public DatasetMetadataStore( + AsyncCuratorFramework curator, AstraConfigs.ZookeeperConfig zkConfig, boolean shouldCache) + throws Exception { super( curator, + zkConfig, CreateMode.PERSISTENT, shouldCache, new DatasetMetadataSerializer().toModelSerializer(), diff --git a/astra/src/main/java/com/slack/astra/metadata/hpa/HpaMetricMetadataStore.java b/astra/src/main/java/com/slack/astra/metadata/hpa/HpaMetricMetadataStore.java index 1dcae5d42e..23fe26c88d 100644 --- a/astra/src/main/java/com/slack/astra/metadata/hpa/HpaMetricMetadataStore.java +++ b/astra/src/main/java/com/slack/astra/metadata/hpa/HpaMetricMetadataStore.java @@ -1,15 +1,18 @@ package com.slack.astra.metadata.hpa; import com.slack.astra.metadata.core.AstraMetadataStore; +import com.slack.astra.proto.config.AstraConfigs; import org.apache.curator.x.async.AsyncCuratorFramework; import org.apache.zookeeper.CreateMode; public class HpaMetricMetadataStore extends AstraMetadataStore { public static final String AUTOSCALER_METADATA_STORE_ZK_PATH = "/hpa_metrics"; - public HpaMetricMetadataStore(AsyncCuratorFramework curator, boolean shouldCache) { + public HpaMetricMetadataStore( + AsyncCuratorFramework curator, AstraConfigs.ZookeeperConfig zkConfig, boolean shouldCache) { super( curator, + zkConfig, CreateMode.EPHEMERAL, shouldCache, new HpaMetricMetadataSerializer().toModelSerializer(), diff --git a/astra/src/main/java/com/slack/astra/metadata/recovery/RecoveryNodeMetadataStore.java b/astra/src/main/java/com/slack/astra/metadata/recovery/RecoveryNodeMetadataStore.java index 5f883c1da5..b7dae241f7 100644 --- a/astra/src/main/java/com/slack/astra/metadata/recovery/RecoveryNodeMetadataStore.java +++ b/astra/src/main/java/com/slack/astra/metadata/recovery/RecoveryNodeMetadataStore.java @@ -1,6 +1,7 @@ package com.slack.astra.metadata.recovery; import com.slack.astra.metadata.core.AstraMetadataStore; +import com.slack.astra.proto.config.AstraConfigs; import org.apache.curator.x.async.AsyncCuratorFramework; import org.apache.zookeeper.CreateMode; @@ -11,9 +12,13 @@ public class RecoveryNodeMetadataStore extends AstraMetadataStore { public static final String RECOVERY_TASK_ZK_PATH = "/recoveryTask"; - public RecoveryTaskMetadataStore(AsyncCuratorFramework curatorFramework, boolean shouldCache) + public RecoveryTaskMetadataStore( + AsyncCuratorFramework curatorFramework, + AstraConfigs.ZookeeperConfig zkConfig, + boolean shouldCache) throws Exception { super( curatorFramework, + zkConfig, CreateMode.PERSISTENT, shouldCache, new RecoveryTaskMetadataSerializer().toModelSerializer(), diff --git a/astra/src/main/java/com/slack/astra/metadata/replica/ReplicaMetadataStore.java b/astra/src/main/java/com/slack/astra/metadata/replica/ReplicaMetadataStore.java index 510d466b76..1afd041dd4 100644 --- a/astra/src/main/java/com/slack/astra/metadata/replica/ReplicaMetadataStore.java +++ b/astra/src/main/java/com/slack/astra/metadata/replica/ReplicaMetadataStore.java @@ -1,15 +1,19 @@ package com.slack.astra.metadata.replica; import com.slack.astra.metadata.core.AstraPartitioningMetadataStore; +import com.slack.astra.proto.config.AstraConfigs; import org.apache.curator.x.async.AsyncCuratorFramework; import org.apache.zookeeper.CreateMode; public class ReplicaMetadataStore extends AstraPartitioningMetadataStore { public static final String REPLICA_STORE_ZK_PATH = "/partitioned_replica"; - public ReplicaMetadataStore(AsyncCuratorFramework curatorFramework) throws Exception { + public ReplicaMetadataStore( + AsyncCuratorFramework curatorFramework, AstraConfigs.ZookeeperConfig zkConfig) + throws Exception { super( curatorFramework, + zkConfig, CreateMode.PERSISTENT, new ReplicaMetadataSerializer().toModelSerializer(), REPLICA_STORE_ZK_PATH); diff --git a/astra/src/main/java/com/slack/astra/metadata/search/SearchMetadataStore.java b/astra/src/main/java/com/slack/astra/metadata/search/SearchMetadataStore.java index bc6c2c339f..fe68c157d3 100644 --- a/astra/src/main/java/com/slack/astra/metadata/search/SearchMetadataStore.java +++ b/astra/src/main/java/com/slack/astra/metadata/search/SearchMetadataStore.java @@ -1,6 +1,7 @@ package com.slack.astra.metadata.search; import com.slack.astra.metadata.core.AstraMetadataStore; +import com.slack.astra.proto.config.AstraConfigs; import org.apache.curator.x.async.AsyncCuratorFramework; import org.apache.curator.x.async.AsyncStage; import org.apache.zookeeper.CreateMode; @@ -9,10 +10,14 @@ public class SearchMetadataStore extends AstraMetadataStore { public static final String SEARCH_METADATA_STORE_ZK_PATH = "/search"; - public SearchMetadataStore(AsyncCuratorFramework curatorFramework, boolean shouldCache) + public SearchMetadataStore( + AsyncCuratorFramework curatorFramework, + AstraConfigs.ZookeeperConfig zkConfig, + boolean shouldCache) throws Exception { super( curatorFramework, + zkConfig, CreateMode.EPHEMERAL, shouldCache, new SearchMetadataSerializer().toModelSerializer(), diff --git a/astra/src/main/java/com/slack/astra/metadata/snapshot/SnapshotMetadataStore.java b/astra/src/main/java/com/slack/astra/metadata/snapshot/SnapshotMetadataStore.java index f05d77974f..09b241d88e 100644 --- a/astra/src/main/java/com/slack/astra/metadata/snapshot/SnapshotMetadataStore.java +++ b/astra/src/main/java/com/slack/astra/metadata/snapshot/SnapshotMetadataStore.java @@ -1,6 +1,7 @@ package com.slack.astra.metadata.snapshot; import com.slack.astra.metadata.core.AstraPartitioningMetadataStore; +import com.slack.astra.proto.config.AstraConfigs; import org.apache.curator.x.async.AsyncCuratorFramework; import org.apache.zookeeper.CreateMode; @@ -9,9 +10,12 @@ public class SnapshotMetadataStore extends AstraPartitioningMetadataStore getServices( meterRegistry, curatorFramework, astraConfig.getIndexerConfig(), + astraConfig.getMetadataStoreConfig().getZookeeperConfig(), blobStore, astraConfig.getS3Config()); services.add(chunkManager); @@ -173,6 +174,7 @@ private static Set getServices( new AstraIndexer( chunkManager, curatorFramework, + astraConfig.getMetadataStoreConfig().getZookeeperConfig(), astraConfig.getIndexerConfig(), astraConfig.getIndexerConfig().getKafkaConfig(), meterRegistry); @@ -195,9 +197,15 @@ private static Set getServices( } if (roles.contains(AstraConfigs.NodeRole.QUERY)) { - SearchMetadataStore searchMetadataStore = new SearchMetadataStore(curatorFramework, true); - SnapshotMetadataStore snapshotMetadataStore = new SnapshotMetadataStore(curatorFramework); - DatasetMetadataStore datasetMetadataStore = new DatasetMetadataStore(curatorFramework, true); + SearchMetadataStore searchMetadataStore = + new SearchMetadataStore( + curatorFramework, astraConfig.getMetadataStoreConfig().getZookeeperConfig(), true); + SnapshotMetadataStore snapshotMetadataStore = + new SnapshotMetadataStore( + curatorFramework, astraConfig.getMetadataStoreConfig().getZookeeperConfig()); + DatasetMetadataStore datasetMetadataStore = + new DatasetMetadataStore( + curatorFramework, astraConfig.getMetadataStoreConfig().getZookeeperConfig(), true); services.add( new CloseableLifecycleManager( @@ -234,13 +242,15 @@ private static Set getServices( CachingChunkManager.fromConfig( meterRegistry, curatorFramework, + astraConfig.getMetadataStoreConfig().getZookeeperConfig(), astraConfig.getS3Config(), astraConfig.getCacheConfig(), blobStore); services.add(chunkManager); HpaMetricMetadataStore hpaMetricMetadataStore = - new HpaMetricMetadataStore(curatorFramework, true); + new HpaMetricMetadataStore( + curatorFramework, astraConfig.getMetadataStoreConfig().getZookeeperConfig(), true); services.add( new CloseableLifecycleManager( AstraConfigs.NodeRole.CACHE, List.of(hpaMetricMetadataStore))); @@ -269,16 +279,27 @@ private static Set getServices( final AstraConfigs.ManagerConfig managerConfig = astraConfig.getManagerConfig(); final int serverPort = managerConfig.getServerConfig().getServerPort(); - ReplicaMetadataStore replicaMetadataStore = new ReplicaMetadataStore(curatorFramework); - SnapshotMetadataStore snapshotMetadataStore = new SnapshotMetadataStore(curatorFramework); + ReplicaMetadataStore replicaMetadataStore = + new ReplicaMetadataStore( + curatorFramework, astraConfig.getMetadataStoreConfig().getZookeeperConfig()); + SnapshotMetadataStore snapshotMetadataStore = + new SnapshotMetadataStore( + curatorFramework, astraConfig.getMetadataStoreConfig().getZookeeperConfig()); RecoveryTaskMetadataStore recoveryTaskMetadataStore = - new RecoveryTaskMetadataStore(curatorFramework, true); + new RecoveryTaskMetadataStore( + curatorFramework, astraConfig.getMetadataStoreConfig().getZookeeperConfig(), true); RecoveryNodeMetadataStore recoveryNodeMetadataStore = - new RecoveryNodeMetadataStore(curatorFramework, true); - CacheSlotMetadataStore cacheSlotMetadataStore = new CacheSlotMetadataStore(curatorFramework); - DatasetMetadataStore datasetMetadataStore = new DatasetMetadataStore(curatorFramework, true); + new RecoveryNodeMetadataStore( + curatorFramework, astraConfig.getMetadataStoreConfig().getZookeeperConfig(), true); + CacheSlotMetadataStore cacheSlotMetadataStore = + new CacheSlotMetadataStore( + curatorFramework, astraConfig.getMetadataStoreConfig().getZookeeperConfig()); + DatasetMetadataStore datasetMetadataStore = + new DatasetMetadataStore( + curatorFramework, astraConfig.getMetadataStoreConfig().getZookeeperConfig(), true); HpaMetricMetadataStore hpaMetricMetadataStore = - new HpaMetricMetadataStore(curatorFramework, true); + new HpaMetricMetadataStore( + curatorFramework, astraConfig.getMetadataStoreConfig().getZookeeperConfig(), true); Duration requestTimeout = Duration.ofMillis(astraConfig.getManagerConfig().getServerConfig().getRequestTimeoutMs()); @@ -333,9 +354,12 @@ private static Set getServices( replicaMetadataStore, snapshotMetadataStore, blobStore, managerConfig, meterRegistry); services.add(snapshotDeletionService); - CacheNodeMetadataStore cacheNodeMetadataStore = new CacheNodeMetadataStore(curatorFramework); + CacheNodeMetadataStore cacheNodeMetadataStore = + new CacheNodeMetadataStore( + curatorFramework, astraConfig.getMetadataStoreConfig().getZookeeperConfig()); CacheNodeAssignmentStore cacheNodeAssignmentStore = - new CacheNodeAssignmentStore(curatorFramework); + new CacheNodeAssignmentStore( + curatorFramework, astraConfig.getMetadataStoreConfig().getZookeeperConfig()); ClusterHpaMetricService clusterHpaMetricService = new ClusterHpaMetricService( @@ -400,7 +424,9 @@ private static Set getServices( } if (roles.contains(AstraConfigs.NodeRole.PREPROCESSOR)) { - DatasetMetadataStore datasetMetadataStore = new DatasetMetadataStore(curatorFramework, true); + DatasetMetadataStore datasetMetadataStore = + new DatasetMetadataStore( + curatorFramework, astraConfig.getMetadataStoreConfig().getZookeeperConfig(), true); final AstraConfigs.PreprocessorConfig preprocessorConfig = astraConfig.getPreprocessorConfig(); diff --git a/astra/src/main/java/com/slack/astra/server/AstraIndexer.java b/astra/src/main/java/com/slack/astra/server/AstraIndexer.java index 0b0ddb7589..5fad4253b0 100644 --- a/astra/src/main/java/com/slack/astra/server/AstraIndexer.java +++ b/astra/src/main/java/com/slack/astra/server/AstraIndexer.java @@ -28,6 +28,7 @@ public class AstraIndexer extends AbstractExecutionThreadService { private final AsyncCuratorFramework curatorFramework; private final MeterRegistry meterRegistry; + private final AstraConfigs.ZookeeperConfig zkConfig; private final AstraConfigs.IndexerConfig indexerConfig; private final AstraConfigs.KafkaConfig kafkaConfig; private final AstraKafkaConsumer kafkaConsumer; @@ -53,11 +54,13 @@ public class AstraIndexer extends AbstractExecutionThreadService { public AstraIndexer( IndexingChunkManager chunkManager, AsyncCuratorFramework curatorFramework, + AstraConfigs.ZookeeperConfig zkConfig, AstraConfigs.IndexerConfig indexerConfig, AstraConfigs.KafkaConfig kafkaConfig, MeterRegistry meterRegistry) { checkNotNull(chunkManager, "Chunk manager can't be null"); this.curatorFramework = curatorFramework; + this.zkConfig = zkConfig; this.indexerConfig = indexerConfig; this.kafkaConfig = kafkaConfig; this.meterRegistry = meterRegistry; @@ -88,9 +91,10 @@ protected void startUp() throws Exception { */ private long indexerPreStart() throws Exception { LOG.info("Starting Astra indexer pre start."); - SnapshotMetadataStore snapshotMetadataStore = new SnapshotMetadataStore(curatorFramework); + SnapshotMetadataStore snapshotMetadataStore = + new SnapshotMetadataStore(curatorFramework, zkConfig); RecoveryTaskMetadataStore recoveryTaskMetadataStore = - new RecoveryTaskMetadataStore(curatorFramework, true); + new RecoveryTaskMetadataStore(curatorFramework, zkConfig, true); String partitionId = kafkaConfig.getKafkaTopicPartition(); long maxOffsetDelay = indexerConfig.getMaxOffsetDelayMessages(); diff --git a/astra/src/main/proto/astra_configs.proto b/astra/src/main/proto/astra_configs.proto index d00b42a1be..51ddee46cd 100644 --- a/astra/src/main/proto/astra_configs.proto +++ b/astra/src/main/proto/astra_configs.proto @@ -62,6 +62,7 @@ message ZookeeperConfig { int32 zk_session_timeout_ms = 3; int32 zk_connection_timeout_ms = 4; int32 sleep_between_retries_ms = 5; + int32 zk_cache_init_timeout_ms = 6; } // S3 Configuration. diff --git a/astra/src/test/java/com/slack/astra/bulkIngestApi/BulkIngestKafkaProducerTest.java b/astra/src/test/java/com/slack/astra/bulkIngestApi/BulkIngestKafkaProducerTest.java index 5b78dc6434..c84d85036f 100644 --- a/astra/src/test/java/com/slack/astra/bulkIngestApi/BulkIngestKafkaProducerTest.java +++ b/astra/src/test/java/com/slack/astra/bulkIngestApi/BulkIngestKafkaProducerTest.java @@ -68,6 +68,7 @@ public void bootstrapCluster() throws Exception { .setZkSessionTimeoutMs(1000) .setZkConnectionTimeoutMs(1000) .setSleepBetweenRetriesMs(1000) + .setZkCacheInitTimeoutMs(1000) .build(); curatorFramework = CuratorBuilder.build(meterRegistry, zkConfig); @@ -92,7 +93,7 @@ public void bootstrapCluster() throws Exception { .setRateLimiterMaxBurstSeconds(1) .build(); - datasetMetadataStore = new DatasetMetadataStore(curatorFramework, true); + datasetMetadataStore = new DatasetMetadataStore(curatorFramework, zkConfig, true); DatasetMetadata datasetMetadata = new DatasetMetadata( INDEX_NAME, diff --git a/astra/src/test/java/com/slack/astra/chunk/IndexingChunkImplTest.java b/astra/src/test/java/com/slack/astra/chunk/IndexingChunkImplTest.java index 342b0be2f2..2a7d39e369 100644 --- a/astra/src/test/java/com/slack/astra/chunk/IndexingChunkImplTest.java +++ b/astra/src/test/java/com/slack/astra/chunk/IndexingChunkImplTest.java @@ -107,14 +107,17 @@ public void setUp() throws Exception { .setZkSessionTimeoutMs(1000) .setZkConnectionTimeoutMs(1000) .setSleepBetweenRetriesMs(1000) + .setZkCacheInitTimeoutMs(1000) .build(); registry = new SimpleMeterRegistry(); curatorFramework = CuratorBuilder.build(registry, zkConfig); - SnapshotMetadataStore snapshotMetadataStore = new SnapshotMetadataStore(curatorFramework); - SearchMetadataStore searchMetadataStore = new SearchMetadataStore(curatorFramework, true); + SnapshotMetadataStore snapshotMetadataStore = + new SnapshotMetadataStore(curatorFramework, zkConfig); + SearchMetadataStore searchMetadataStore = + new SearchMetadataStore(curatorFramework, zkConfig, true); final LuceneIndexStoreImpl logStore = LuceneIndexStoreImpl.makeLogStore( @@ -457,14 +460,17 @@ public void setUp() throws Exception { .setZkSessionTimeoutMs(1000) .setZkConnectionTimeoutMs(1000) .setSleepBetweenRetriesMs(1000) + .setZkCacheInitTimeoutMs(1000) .build(); registry = new SimpleMeterRegistry(); curatorFramework = CuratorBuilder.build(registry, zkConfig); - SnapshotMetadataStore snapshotMetadataStore = new SnapshotMetadataStore(curatorFramework); - SearchMetadataStore searchMetadataStore = new SearchMetadataStore(curatorFramework, true); + SnapshotMetadataStore snapshotMetadataStore = + new SnapshotMetadataStore(curatorFramework, zkConfig); + SearchMetadataStore searchMetadataStore = + new SearchMetadataStore(curatorFramework, zkConfig, true); final LuceneIndexStoreImpl logStore = LuceneIndexStoreImpl.makeLogStore( @@ -541,14 +547,15 @@ public void setUp() throws Exception { .setZkSessionTimeoutMs(1000) .setZkConnectionTimeoutMs(1000) .setSleepBetweenRetriesMs(1000) + .setZkCacheInitTimeoutMs(1000) .build(); registry = new SimpleMeterRegistry(); curatorFramework = CuratorBuilder.build(registry, zkConfig); - snapshotMetadataStore = new SnapshotMetadataStore(curatorFramework); - searchMetadataStore = new SearchMetadataStore(curatorFramework, true); + snapshotMetadataStore = new SnapshotMetadataStore(curatorFramework, zkConfig); + searchMetadataStore = new SearchMetadataStore(curatorFramework, zkConfig, true); final LuceneIndexStoreImpl logStore = LuceneIndexStoreImpl.makeLogStore( diff --git a/astra/src/test/java/com/slack/astra/chunk/ReadOnlyChunkImplTest.java b/astra/src/test/java/com/slack/astra/chunk/ReadOnlyChunkImplTest.java index 442e02f416..6d9dc1dc8e 100644 --- a/astra/src/test/java/com/slack/astra/chunk/ReadOnlyChunkImplTest.java +++ b/astra/src/test/java/com/slack/astra/chunk/ReadOnlyChunkImplTest.java @@ -107,20 +107,25 @@ public void shouldHandleChunkLivecycle() throws Exception { .setZkSessionTimeoutMs(1000) .setZkConnectionTimeoutMs(1000) .setSleepBetweenRetriesMs(1000) + .setZkCacheInitTimeoutMs(1000) .build(); AsyncCuratorFramework curatorFramework = CuratorBuilder.build(meterRegistry, zkConfig); - ReplicaMetadataStore replicaMetadataStore = new ReplicaMetadataStore(curatorFramework); - SnapshotMetadataStore snapshotMetadataStore = new SnapshotMetadataStore(curatorFramework); - SearchMetadataStore searchMetadataStore = new SearchMetadataStore(curatorFramework, true); - CacheSlotMetadataStore cacheSlotMetadataStore = new CacheSlotMetadataStore(curatorFramework); + ReplicaMetadataStore replicaMetadataStore = + new ReplicaMetadataStore(curatorFramework, zkConfig); + SnapshotMetadataStore snapshotMetadataStore = + new SnapshotMetadataStore(curatorFramework, zkConfig); + SearchMetadataStore searchMetadataStore = + new SearchMetadataStore(curatorFramework, zkConfig, true); + CacheSlotMetadataStore cacheSlotMetadataStore = + new CacheSlotMetadataStore(curatorFramework, zkConfig); String replicaId = "foo"; String snapshotId = "bar"; // setup Zk, BlobFs so data can be loaded - initializeZkReplica(curatorFramework, replicaId, snapshotId); - initializeZkSnapshot(curatorFramework, snapshotId, 0); + initializeZkReplica(curatorFramework, zkConfig, replicaId, snapshotId); + initializeZkSnapshot(curatorFramework, zkConfig, snapshotId, 0); initializeBlobStorageWithIndex(snapshotId); SearchContext searchContext = @@ -246,20 +251,25 @@ public void shouldHandleMissingS3Assets() throws Exception { .setZkSessionTimeoutMs(1000) .setZkConnectionTimeoutMs(1000) .setSleepBetweenRetriesMs(1000) + .setZkCacheInitTimeoutMs(1000) .build(); AsyncCuratorFramework curatorFramework = CuratorBuilder.build(meterRegistry, zkConfig); - ReplicaMetadataStore replicaMetadataStore = new ReplicaMetadataStore(curatorFramework); - SnapshotMetadataStore snapshotMetadataStore = new SnapshotMetadataStore(curatorFramework); - SearchMetadataStore searchMetadataStore = new SearchMetadataStore(curatorFramework, true); - CacheSlotMetadataStore cacheSlotMetadataStore = new CacheSlotMetadataStore(curatorFramework); + ReplicaMetadataStore replicaMetadataStore = + new ReplicaMetadataStore(curatorFramework, zkConfig); + SnapshotMetadataStore snapshotMetadataStore = + new SnapshotMetadataStore(curatorFramework, zkConfig); + SearchMetadataStore searchMetadataStore = + new SearchMetadataStore(curatorFramework, zkConfig, true); + CacheSlotMetadataStore cacheSlotMetadataStore = + new CacheSlotMetadataStore(curatorFramework, zkConfig); String replicaId = "foo"; String snapshotId = "bar"; // setup Zk, BlobFs so data can be loaded - initializeZkReplica(curatorFramework, replicaId, snapshotId); - initializeZkSnapshot(curatorFramework, snapshotId, 0); + initializeZkReplica(curatorFramework, zkConfig, replicaId, snapshotId); + initializeZkSnapshot(curatorFramework, zkConfig, snapshotId, 0); ReadOnlyChunkImpl readOnlyChunk = new ReadOnlyChunkImpl<>( @@ -312,19 +322,24 @@ public void shouldHandleMissingZkData() throws Exception { .setZkSessionTimeoutMs(1000) .setZkConnectionTimeoutMs(1000) .setSleepBetweenRetriesMs(1000) + .setZkCacheInitTimeoutMs(1000) .build(); AsyncCuratorFramework curatorFramework = CuratorBuilder.build(meterRegistry, zkConfig); - ReplicaMetadataStore replicaMetadataStore = new ReplicaMetadataStore(curatorFramework); - SnapshotMetadataStore snapshotMetadataStore = new SnapshotMetadataStore(curatorFramework); - SearchMetadataStore searchMetadataStore = new SearchMetadataStore(curatorFramework, true); - CacheSlotMetadataStore cacheSlotMetadataStore = new CacheSlotMetadataStore(curatorFramework); + ReplicaMetadataStore replicaMetadataStore = + new ReplicaMetadataStore(curatorFramework, zkConfig); + SnapshotMetadataStore snapshotMetadataStore = + new SnapshotMetadataStore(curatorFramework, zkConfig); + SearchMetadataStore searchMetadataStore = + new SearchMetadataStore(curatorFramework, zkConfig, true); + CacheSlotMetadataStore cacheSlotMetadataStore = + new CacheSlotMetadataStore(curatorFramework, zkConfig); String replicaId = "foo"; String snapshotId = "bar"; // setup Zk, BlobFs so data can be loaded - initializeZkReplica(curatorFramework, replicaId, snapshotId); + initializeZkReplica(curatorFramework, zkConfig, replicaId, snapshotId); // we intentionally do not initialize a Snapshot, so the lookup is expected to fail ReadOnlyChunkImpl readOnlyChunk = @@ -378,20 +393,25 @@ public void closeShouldCleanupLiveChunkCorrectly() throws Exception { .setZkSessionTimeoutMs(1000) .setZkConnectionTimeoutMs(1000) .setSleepBetweenRetriesMs(1000) + .setZkCacheInitTimeoutMs(1000) .build(); AsyncCuratorFramework curatorFramework = CuratorBuilder.build(meterRegistry, zkConfig); - ReplicaMetadataStore replicaMetadataStore = new ReplicaMetadataStore(curatorFramework); - SnapshotMetadataStore snapshotMetadataStore = new SnapshotMetadataStore(curatorFramework); - SearchMetadataStore searchMetadataStore = new SearchMetadataStore(curatorFramework, true); - CacheSlotMetadataStore cacheSlotMetadataStore = new CacheSlotMetadataStore(curatorFramework); + ReplicaMetadataStore replicaMetadataStore = + new ReplicaMetadataStore(curatorFramework, zkConfig); + SnapshotMetadataStore snapshotMetadataStore = + new SnapshotMetadataStore(curatorFramework, zkConfig); + SearchMetadataStore searchMetadataStore = + new SearchMetadataStore(curatorFramework, zkConfig, true); + CacheSlotMetadataStore cacheSlotMetadataStore = + new CacheSlotMetadataStore(curatorFramework, zkConfig); String replicaId = "foo"; String snapshotId = "bar"; // setup Zk, BlobFs so data can be loaded - initializeZkReplica(curatorFramework, replicaId, snapshotId); - initializeZkSnapshot(curatorFramework, snapshotId, 0); + initializeZkReplica(curatorFramework, zkConfig, replicaId, snapshotId); + initializeZkSnapshot(curatorFramework, zkConfig, snapshotId, 0); initializeBlobStorageWithIndex(snapshotId); ReadOnlyChunkImpl readOnlyChunk = @@ -481,15 +501,20 @@ public void shouldHandleDynamicChunkSizeLifecycle() throws Exception { .setZkSessionTimeoutMs(1000) .setZkConnectionTimeoutMs(1000) .setSleepBetweenRetriesMs(1000) + .setZkCacheInitTimeoutMs(1000) .build(); AsyncCuratorFramework curatorFramework = CuratorBuilder.build(meterRegistry, zkConfig); - ReplicaMetadataStore replicaMetadataStore = new ReplicaMetadataStore(curatorFramework); - SnapshotMetadataStore snapshotMetadataStore = new SnapshotMetadataStore(curatorFramework); - SearchMetadataStore searchMetadataStore = new SearchMetadataStore(curatorFramework, true); - CacheSlotMetadataStore cacheSlotMetadataStore = new CacheSlotMetadataStore(curatorFramework); + ReplicaMetadataStore replicaMetadataStore = + new ReplicaMetadataStore(curatorFramework, zkConfig); + SnapshotMetadataStore snapshotMetadataStore = + new SnapshotMetadataStore(curatorFramework, zkConfig); + SearchMetadataStore searchMetadataStore = + new SearchMetadataStore(curatorFramework, zkConfig, true); + CacheSlotMetadataStore cacheSlotMetadataStore = + new CacheSlotMetadataStore(curatorFramework, zkConfig); CacheNodeAssignmentStore cacheNodeAssignmentStore = - new CacheNodeAssignmentStore(curatorFramework); + new CacheNodeAssignmentStore(curatorFramework, zkConfig); String replicaId = "foo"; String snapshotId = "boo"; @@ -498,8 +523,8 @@ public void shouldHandleDynamicChunkSizeLifecycle() throws Exception { String replicaSet = "cat"; // setup Zk, BlobFs so data can be loaded - initializeZkReplica(curatorFramework, replicaId, snapshotId); - initializeZkSnapshot(curatorFramework, snapshotId, 29); + initializeZkReplica(curatorFramework, zkConfig, replicaId, snapshotId); + initializeZkSnapshot(curatorFramework, zkConfig, snapshotId, 29); initializeBlobStorageWithIndex(snapshotId); initializeCacheNodeAssignment( cacheNodeAssignmentStore, assignmentId, snapshotId, cacheNodeId, replicaSet, replicaId); @@ -599,9 +624,13 @@ private void assignReplicaToChunk( } private void initializeZkSnapshot( - AsyncCuratorFramework curatorFramework, String snapshotId, long sizeInBytesOnDisk) + AsyncCuratorFramework curatorFramework, + AstraConfigs.ZookeeperConfig zkConfig, + String snapshotId, + long sizeInBytesOnDisk) throws Exception { - SnapshotMetadataStore snapshotMetadataStore = new SnapshotMetadataStore(curatorFramework); + SnapshotMetadataStore snapshotMetadataStore = + new SnapshotMetadataStore(curatorFramework, zkConfig); snapshotMetadataStore.createSync( new SnapshotMetadata( snapshotId, @@ -613,9 +642,13 @@ private void initializeZkSnapshot( } private void initializeZkReplica( - AsyncCuratorFramework curatorFramework, String replicaId, String snapshotId) + AsyncCuratorFramework curatorFramework, + AstraConfigs.ZookeeperConfig zkConfig, + String replicaId, + String snapshotId) throws Exception { - ReplicaMetadataStore replicaMetadataStore = new ReplicaMetadataStore(curatorFramework); + ReplicaMetadataStore replicaMetadataStore = + new ReplicaMetadataStore(curatorFramework, zkConfig); replicaMetadataStore.createSync( new ReplicaMetadata( replicaId, diff --git a/astra/src/test/java/com/slack/astra/chunk/RecoveryChunkImplTest.java b/astra/src/test/java/com/slack/astra/chunk/RecoveryChunkImplTest.java index 8877666120..837429519f 100644 --- a/astra/src/test/java/com/slack/astra/chunk/RecoveryChunkImplTest.java +++ b/astra/src/test/java/com/slack/astra/chunk/RecoveryChunkImplTest.java @@ -88,11 +88,12 @@ public void setUp() throws Exception { .setZkSessionTimeoutMs(1000) .setZkConnectionTimeoutMs(1000) .setSleepBetweenRetriesMs(1000) + .setZkCacheInitTimeoutMs(1000) .build(); curatorFramework = CuratorBuilder.build(registry, zkConfig); - snapshotMetadataStore = new SnapshotMetadataStore(curatorFramework); - searchMetadataStore = new SearchMetadataStore(curatorFramework, false); + snapshotMetadataStore = new SnapshotMetadataStore(curatorFramework, zkConfig); + searchMetadataStore = new SearchMetadataStore(curatorFramework, zkConfig, false); final LuceneIndexStoreImpl logStore = LuceneIndexStoreImpl.makeLogStore( @@ -449,11 +450,12 @@ public void setUp() throws Exception { .setZkSessionTimeoutMs(1000) .setZkConnectionTimeoutMs(1000) .setSleepBetweenRetriesMs(1000) + .setZkCacheInitTimeoutMs(1000) .build(); curatorFramework = CuratorBuilder.build(registry, zkConfig); - snapshotMetadataStore = new SnapshotMetadataStore(curatorFramework); - searchMetadataStore = new SearchMetadataStore(curatorFramework, false); + snapshotMetadataStore = new SnapshotMetadataStore(curatorFramework, zkConfig); + searchMetadataStore = new SearchMetadataStore(curatorFramework, zkConfig, false); final LuceneIndexStoreImpl logStore = LuceneIndexStoreImpl.makeLogStore( @@ -531,14 +533,15 @@ public void setUp() throws Exception { .setZkSessionTimeoutMs(1000) .setZkConnectionTimeoutMs(1000) .setSleepBetweenRetriesMs(1000) + .setZkCacheInitTimeoutMs(1000) .build(); registry = new SimpleMeterRegistry(); curatorFramework = CuratorBuilder.build(registry, zkConfig); - snapshotMetadataStore = new SnapshotMetadataStore(curatorFramework); - searchMetadataStore = new SearchMetadataStore(curatorFramework, true); + snapshotMetadataStore = new SnapshotMetadataStore(curatorFramework, zkConfig); + searchMetadataStore = new SearchMetadataStore(curatorFramework, zkConfig, true); final LuceneIndexStoreImpl logStore = LuceneIndexStoreImpl.makeLogStore( diff --git a/astra/src/test/java/com/slack/astra/chunkManager/CachingChunkManagerTest.java b/astra/src/test/java/com/slack/astra/chunkManager/CachingChunkManagerTest.java index 6284e9aaa6..4179f6d807 100644 --- a/astra/src/test/java/com/slack/astra/chunkManager/CachingChunkManagerTest.java +++ b/astra/src/test/java/com/slack/astra/chunkManager/CachingChunkManagerTest.java @@ -72,6 +72,7 @@ public class CachingChunkManagerTest { .build(); private AsyncCuratorFramework curatorFramework; + private AstraConfigs.ZookeeperConfig zkConfig; private CachingChunkManager cachingChunkManager; private CacheNodeAssignmentStore cacheNodeAssignmentStore; private SnapshotMetadataStore snapshotMetadataStore; @@ -128,13 +129,14 @@ private CachingChunkManager initChunkManager() throws TimeoutExcepti .setS3Config(s3Config) .build(); - AstraConfigs.ZookeeperConfig zkConfig = + zkConfig = AstraConfigs.ZookeeperConfig.newBuilder() .setZkConnectString(testingServer.getConnectString()) .setZkPathPrefix("test") .setZkSessionTimeoutMs(1000) .setZkConnectionTimeoutMs(1000) .setSleepBetweenRetriesMs(1000) + .setZkCacheInitTimeoutMs(1000) .build(); curatorFramework = CuratorBuilder.build(meterRegistry, zkConfig); @@ -143,6 +145,7 @@ private CachingChunkManager initChunkManager() throws TimeoutExcepti new CachingChunkManager<>( meterRegistry, curatorFramework, + zkConfig, blobStore, SearchContext.fromConfig(AstraConfig.getCacheConfig().getServerConfig()), AstraConfig.getS3Config().getS3Bucket(), @@ -157,8 +160,8 @@ private CachingChunkManager initChunkManager() throws TimeoutExcepti } private CacheNodeAssignment initAssignment(String snapshotId) throws Exception { - cacheNodeAssignmentStore = new CacheNodeAssignmentStore(curatorFramework); - snapshotMetadataStore = new SnapshotMetadataStore(curatorFramework); + cacheNodeAssignmentStore = new CacheNodeAssignmentStore(curatorFramework, zkConfig); + snapshotMetadataStore = new SnapshotMetadataStore(curatorFramework, zkConfig); snapshotMetadataStore.createSync(new SnapshotMetadata(snapshotId, 1, 1, 0, "abcd", 29)); CacheNodeAssignment newAssignment = new CacheNodeAssignment( @@ -271,7 +274,8 @@ public void testChunkManagerRegistration() throws Exception { enableDynamicChunksFlag(); cachingChunkManager = initChunkManager(); - CacheNodeMetadataStore cacheNodeMetadataStore = new CacheNodeMetadataStore(curatorFramework); + CacheNodeMetadataStore cacheNodeMetadataStore = + new CacheNodeMetadataStore(curatorFramework, zkConfig); List cacheNodeMetadatas = cacheNodeMetadataStore.listSync(); assertThat(cachingChunkManager.getChunkList().size()).isEqualTo(0); diff --git a/astra/src/test/java/com/slack/astra/chunkManager/IndexingChunkManagerTest.java b/astra/src/test/java/com/slack/astra/chunkManager/IndexingChunkManagerTest.java index 63609dde3d..576ebb3ce1 100644 --- a/astra/src/test/java/com/slack/astra/chunkManager/IndexingChunkManagerTest.java +++ b/astra/src/test/java/com/slack/astra/chunkManager/IndexingChunkManagerTest.java @@ -118,6 +118,7 @@ public class IndexingChunkManagerTest { private BlobStore blobStore; private TestingServer localZkServer; private AsyncCuratorFramework curatorFramework; + private AstraConfigs.ZookeeperConfig zkConfig; private SnapshotMetadataStore snapshotMetadataStore; private SearchMetadataStore searchMetadataStore; @@ -132,18 +133,19 @@ public void setUp() throws Exception { localZkServer = new TestingServer(); localZkServer.start(); - AstraConfigs.ZookeeperConfig zkConfig = + zkConfig = AstraConfigs.ZookeeperConfig.newBuilder() .setZkConnectString(localZkServer.getConnectString()) .setZkPathPrefix(ZK_PATH_PREFIX) .setZkSessionTimeoutMs(15000) .setZkConnectionTimeoutMs(1500) .setSleepBetweenRetriesMs(1000) + .setZkCacheInitTimeoutMs(1000) .build(); curatorFramework = CuratorBuilder.build(metricsRegistry, zkConfig); - snapshotMetadataStore = new SnapshotMetadataStore(curatorFramework); - searchMetadataStore = new SearchMetadataStore(curatorFramework, false); + snapshotMetadataStore = new SnapshotMetadataStore(curatorFramework, zkConfig); + searchMetadataStore = new SearchMetadataStore(curatorFramework, zkConfig, false); } @AfterEach @@ -174,7 +176,8 @@ private void initChunkManager( listeningExecutorService, curatorFramework, searchContext, - AstraConfigUtil.makeIndexerConfig(TEST_PORT, 1000, 100)); + AstraConfigUtil.makeIndexerConfig(TEST_PORT, 1000, 100), + zkConfig); chunkManager.startAsync(); chunkManager.awaitRunning(DEFAULT_START_STOP_DURATION); } @@ -196,7 +199,8 @@ private void initChunkManager( listeningExecutorService, curatorFramework, searchContext, - indexerConfig); + indexerConfig, + zkConfig); chunkManager.startAsync(); chunkManager.awaitRunning(DEFAULT_START_STOP_DURATION); } @@ -1246,8 +1250,10 @@ public void testSuccessfulRollOverFinishesOnClose() throws Exception { assertThat(rollOverFuture.isDone()).isTrue(); // The stores are closed so temporarily re-create them so we can query the data in ZK. - SearchMetadataStore searchMetadataStore = new SearchMetadataStore(curatorFramework, false); - SnapshotMetadataStore snapshotMetadataStore = new SnapshotMetadataStore(curatorFramework); + SearchMetadataStore searchMetadataStore = + new SearchMetadataStore(curatorFramework, zkConfig, false); + SnapshotMetadataStore snapshotMetadataStore = + new SnapshotMetadataStore(curatorFramework, zkConfig); assertThat(AstraMetadataTestUtils.listSyncUncached(searchMetadataStore)).isEmpty(); List snapshots = AstraMetadataTestUtils.listSyncUncached(snapshotMetadataStore); @@ -1298,8 +1304,10 @@ public void testFailedRollOverFinishesOnClose() throws Exception { // The stores are closed so temporarily re-create them so we can query the data in ZK. // All ephemeral data is ZK is deleted and no data or metadata is persisted. - SearchMetadataStore searchMetadataStore = new SearchMetadataStore(curatorFramework, false); - SnapshotMetadataStore snapshotMetadataStore = new SnapshotMetadataStore(curatorFramework); + SearchMetadataStore searchMetadataStore = + new SearchMetadataStore(curatorFramework, zkConfig, false); + SnapshotMetadataStore snapshotMetadataStore = + new SnapshotMetadataStore(curatorFramework, zkConfig); assertThat(AstraMetadataTestUtils.listSyncUncached(searchMetadataStore)).isEmpty(); assertThat(AstraMetadataTestUtils.listSyncUncached(snapshotMetadataStore)).isEmpty(); searchMetadataStore.close(); diff --git a/astra/src/test/java/com/slack/astra/chunkManager/RecoveryChunkManagerTest.java b/astra/src/test/java/com/slack/astra/chunkManager/RecoveryChunkManagerTest.java index 0f841a1ae8..499c9d5ad5 100644 --- a/astra/src/test/java/com/slack/astra/chunkManager/RecoveryChunkManagerTest.java +++ b/astra/src/test/java/com/slack/astra/chunkManager/RecoveryChunkManagerTest.java @@ -110,11 +110,12 @@ public void setUp() throws Exception { .setZkSessionTimeoutMs(15000) .setZkConnectionTimeoutMs(1500) .setSleepBetweenRetriesMs(1000) + .setZkCacheInitTimeoutMs(1000) .build(); curatorFramework = CuratorBuilder.build(metricsRegistry, zkConfig); - searchMetadataStore = new SearchMetadataStore(curatorFramework, false); - snapshotMetadataStore = new SnapshotMetadataStore(curatorFramework); + searchMetadataStore = new SearchMetadataStore(curatorFramework, zkConfig, false); + snapshotMetadataStore = new SnapshotMetadataStore(curatorFramework, zkConfig); } @AfterEach diff --git a/astra/src/test/java/com/slack/astra/chunkrollover/DiskOrMessageCountBasedRolloverStrategyTest.java b/astra/src/test/java/com/slack/astra/chunkrollover/DiskOrMessageCountBasedRolloverStrategyTest.java index 229b3c42d6..5af1238386 100644 --- a/astra/src/test/java/com/slack/astra/chunkrollover/DiskOrMessageCountBasedRolloverStrategyTest.java +++ b/astra/src/test/java/com/slack/astra/chunkrollover/DiskOrMessageCountBasedRolloverStrategyTest.java @@ -72,6 +72,7 @@ public class DiskOrMessageCountBasedRolloverStrategyTest { private BlobStore blobStore; private TestingServer localZkServer; private AsyncCuratorFramework curatorFramework; + private AstraConfigs.ZookeeperConfig zkConfig; private static long MAX_BYTES_PER_CHUNK = 12000; @@ -95,13 +96,14 @@ public void setUp() throws Exception { localZkServer = new TestingServer(); localZkServer.start(); - AstraConfigs.ZookeeperConfig zkConfig = + zkConfig = AstraConfigs.ZookeeperConfig.newBuilder() .setZkConnectString(localZkServer.getConnectString()) .setZkPathPrefix(ZK_PATH_PREFIX) .setZkSessionTimeoutMs(15000) .setZkConnectionTimeoutMs(1500) .setSleepBetweenRetriesMs(1000) + .setZkCacheInitTimeoutMs(1000) .build(); curatorFramework = CuratorBuilder.build(metricsRegistry, zkConfig); @@ -140,7 +142,8 @@ private void initChunkManager( listeningExecutorService, curatorFramework, searchContext, - AstraConfigUtil.makeIndexerConfig(TEST_PORT, 1000, 100)); + AstraConfigUtil.makeIndexerConfig(TEST_PORT, 1000, 100), + zkConfig); chunkManager.startAsync(); chunkManager.awaitRunning(DEFAULT_START_STOP_DURATION); diff --git a/astra/src/test/java/com/slack/astra/clusterManager/CacheNodeAssignmentServiceTest.java b/astra/src/test/java/com/slack/astra/clusterManager/CacheNodeAssignmentServiceTest.java index c67e4c3b53..b42412ee6c 100644 --- a/astra/src/test/java/com/slack/astra/clusterManager/CacheNodeAssignmentServiceTest.java +++ b/astra/src/test/java/com/slack/astra/clusterManager/CacheNodeAssignmentServiceTest.java @@ -64,6 +64,7 @@ public void setUp() throws Exception { .setZkSessionTimeoutMs(1000) .setZkConnectionTimeoutMs(1000) .setSleepBetweenRetriesMs(1000) + .setZkCacheInitTimeoutMs(1000) .build(); AstraConfigs.ManagerConfig.CacheNodeAssignmentServiceConfig cacheNodeAssignmentServiceConfig = @@ -80,10 +81,10 @@ public void setUp() throws Exception { .build(); curatorFramework = CuratorBuilder.build(meterRegistry, zkConfig); - cacheNodeAssignmentStore = spy(new CacheNodeAssignmentStore(curatorFramework)); - cacheNodeMetadataStore = spy(new CacheNodeMetadataStore(curatorFramework)); - snapshotMetadataStore = spy(new SnapshotMetadataStore(curatorFramework)); - replicaMetadataStore = spy(new ReplicaMetadataStore(curatorFramework)); + cacheNodeAssignmentStore = spy(new CacheNodeAssignmentStore(curatorFramework, zkConfig)); + cacheNodeMetadataStore = spy(new CacheNodeMetadataStore(curatorFramework, zkConfig)); + snapshotMetadataStore = spy(new SnapshotMetadataStore(curatorFramework, zkConfig)); + replicaMetadataStore = spy(new ReplicaMetadataStore(curatorFramework, zkConfig)); } @AfterEach diff --git a/astra/src/test/java/com/slack/astra/clusterManager/ClusterHpaMetricServiceTest.java b/astra/src/test/java/com/slack/astra/clusterManager/ClusterHpaMetricServiceTest.java index def9aac1a6..6cc2c66bfd 100644 --- a/astra/src/test/java/com/slack/astra/clusterManager/ClusterHpaMetricServiceTest.java +++ b/astra/src/test/java/com/slack/astra/clusterManager/ClusterHpaMetricServiceTest.java @@ -62,16 +62,17 @@ public void setup() throws Exception { .setZkSessionTimeoutMs(1000) .setZkConnectionTimeoutMs(1000) .setSleepBetweenRetriesMs(1000) + .setZkCacheInitTimeoutMs(1000) .build(); curatorFramework = CuratorBuilder.build(meterRegistry, zkConfig); - replicaMetadataStore = spy(new ReplicaMetadataStore(curatorFramework)); - cacheSlotMetadataStore = spy(new CacheSlotMetadataStore(curatorFramework)); - cacheNodeAssignmentStore = spy(new CacheNodeAssignmentStore(curatorFramework)); - cacheNodeMetadataStore = spy(new CacheNodeMetadataStore(curatorFramework)); - snapshotMetadataStore = spy(new SnapshotMetadataStore(curatorFramework)); - hpaMetricMetadataStore = spy(new HpaMetricMetadataStore(curatorFramework, true)); + replicaMetadataStore = spy(new ReplicaMetadataStore(curatorFramework, zkConfig)); + cacheSlotMetadataStore = spy(new CacheSlotMetadataStore(curatorFramework, zkConfig)); + cacheNodeAssignmentStore = spy(new CacheNodeAssignmentStore(curatorFramework, zkConfig)); + cacheNodeMetadataStore = spy(new CacheNodeMetadataStore(curatorFramework, zkConfig)); + snapshotMetadataStore = spy(new SnapshotMetadataStore(curatorFramework, zkConfig)); + hpaMetricMetadataStore = spy(new HpaMetricMetadataStore(curatorFramework, zkConfig, true)); } @AfterEach diff --git a/astra/src/test/java/com/slack/astra/clusterManager/RecoveryTaskAssignmentServiceTest.java b/astra/src/test/java/com/slack/astra/clusterManager/RecoveryTaskAssignmentServiceTest.java index aac172732b..244b9619ae 100644 --- a/astra/src/test/java/com/slack/astra/clusterManager/RecoveryTaskAssignmentServiceTest.java +++ b/astra/src/test/java/com/slack/astra/clusterManager/RecoveryTaskAssignmentServiceTest.java @@ -60,11 +60,14 @@ public void setup() throws Exception { .setZkSessionTimeoutMs(1000) .setZkConnectionTimeoutMs(1000) .setSleepBetweenRetriesMs(1000) + .setZkCacheInitTimeoutMs(1000) .build(); curatorFramework = CuratorBuilder.build(meterRegistry, zkConfig); - recoveryTaskMetadataStore = spy(new RecoveryTaskMetadataStore(curatorFramework, true)); - recoveryNodeMetadataStore = spy(new RecoveryNodeMetadataStore(curatorFramework, true)); + recoveryTaskMetadataStore = + spy(new RecoveryTaskMetadataStore(curatorFramework, zkConfig, true)); + recoveryNodeMetadataStore = + spy(new RecoveryNodeMetadataStore(curatorFramework, zkConfig, true)); } @AfterEach diff --git a/astra/src/test/java/com/slack/astra/clusterManager/ReplicaAssignmentServiceTest.java b/astra/src/test/java/com/slack/astra/clusterManager/ReplicaAssignmentServiceTest.java index 13e5921f30..d700969bb4 100644 --- a/astra/src/test/java/com/slack/astra/clusterManager/ReplicaAssignmentServiceTest.java +++ b/astra/src/test/java/com/slack/astra/clusterManager/ReplicaAssignmentServiceTest.java @@ -65,11 +65,12 @@ public void setup() throws Exception { .setZkSessionTimeoutMs(1000) .setZkConnectionTimeoutMs(1000) .setSleepBetweenRetriesMs(1000) + .setZkCacheInitTimeoutMs(1000) .build(); curatorFramework = CuratorBuilder.build(meterRegistry, zkConfig); - cacheSlotMetadataStore = spy(new CacheSlotMetadataStore(curatorFramework)); - replicaMetadataStore = spy(new ReplicaMetadataStore(curatorFramework)); + cacheSlotMetadataStore = spy(new CacheSlotMetadataStore(curatorFramework, zkConfig)); + replicaMetadataStore = spy(new ReplicaMetadataStore(curatorFramework, zkConfig)); } @AfterEach diff --git a/astra/src/test/java/com/slack/astra/clusterManager/ReplicaCreationServiceTest.java b/astra/src/test/java/com/slack/astra/clusterManager/ReplicaCreationServiceTest.java index 43ea2bfe84..664b5f71b8 100644 --- a/astra/src/test/java/com/slack/astra/clusterManager/ReplicaCreationServiceTest.java +++ b/astra/src/test/java/com/slack/astra/clusterManager/ReplicaCreationServiceTest.java @@ -48,6 +48,7 @@ public class ReplicaCreationServiceTest { private MeterRegistry meterRegistry; private AsyncCuratorFramework curatorFramework; + private AstraConfigs.ZookeeperConfig zkConfig; private SnapshotMetadataStore snapshotMetadataStore; private ReplicaMetadataStore replicaMetadataStore; @@ -57,18 +58,19 @@ public void setup() throws Exception { meterRegistry = new SimpleMeterRegistry(); testingServer = new TestingServer(); - AstraConfigs.ZookeeperConfig zkConfig = + zkConfig = AstraConfigs.ZookeeperConfig.newBuilder() .setZkConnectString(testingServer.getConnectString()) .setZkPathPrefix("ReplicaCreatorServiceTest") .setZkSessionTimeoutMs(1000) .setZkConnectionTimeoutMs(1000) .setSleepBetweenRetriesMs(1000) + .setZkCacheInitTimeoutMs(1000) .build(); curatorFramework = CuratorBuilder.build(meterRegistry, zkConfig); - snapshotMetadataStore = spy(new SnapshotMetadataStore(curatorFramework)); - replicaMetadataStore = spy(new ReplicaMetadataStore(curatorFramework)); + snapshotMetadataStore = spy(new SnapshotMetadataStore(curatorFramework, zkConfig)); + replicaMetadataStore = spy(new ReplicaMetadataStore(curatorFramework, zkConfig)); } @AfterEach @@ -83,8 +85,10 @@ public void shutdown() throws IOException { @Test public void shouldDoNothingIfReplicasAlreadyExist() throws Exception { - ReplicaMetadataStore replicaMetadataStore = new ReplicaMetadataStore(curatorFramework); - SnapshotMetadataStore snapshotMetadataStore = new SnapshotMetadataStore(curatorFramework); + ReplicaMetadataStore replicaMetadataStore = + new ReplicaMetadataStore(curatorFramework, zkConfig); + SnapshotMetadataStore snapshotMetadataStore = + new SnapshotMetadataStore(curatorFramework, zkConfig); SnapshotMetadata snapshotA = new SnapshotMetadata( diff --git a/astra/src/test/java/com/slack/astra/clusterManager/ReplicaDeletionServiceTest.java b/astra/src/test/java/com/slack/astra/clusterManager/ReplicaDeletionServiceTest.java index eb13a32cf7..5e64944449 100644 --- a/astra/src/test/java/com/slack/astra/clusterManager/ReplicaDeletionServiceTest.java +++ b/astra/src/test/java/com/slack/astra/clusterManager/ReplicaDeletionServiceTest.java @@ -63,12 +63,13 @@ public void setup() throws Exception { .setZkSessionTimeoutMs(1000) .setZkConnectionTimeoutMs(1000) .setSleepBetweenRetriesMs(1000) + .setZkCacheInitTimeoutMs(1000) .build(); curatorFramework = CuratorBuilder.build(meterRegistry, zkConfig); - cacheSlotMetadataStore = spy(new CacheSlotMetadataStore(curatorFramework)); - replicaMetadataStore = spy(new ReplicaMetadataStore(curatorFramework)); - cacheNodeMetadataStore = spy(new CacheNodeAssignmentStore(curatorFramework)); + cacheSlotMetadataStore = spy(new CacheSlotMetadataStore(curatorFramework, zkConfig)); + replicaMetadataStore = spy(new ReplicaMetadataStore(curatorFramework, zkConfig)); + cacheNodeMetadataStore = spy(new CacheNodeAssignmentStore(curatorFramework, zkConfig)); } @AfterEach diff --git a/astra/src/test/java/com/slack/astra/clusterManager/ReplicaEvictionServiceTest.java b/astra/src/test/java/com/slack/astra/clusterManager/ReplicaEvictionServiceTest.java index 2b87db2ca6..4be0841761 100644 --- a/astra/src/test/java/com/slack/astra/clusterManager/ReplicaEvictionServiceTest.java +++ b/astra/src/test/java/com/slack/astra/clusterManager/ReplicaEvictionServiceTest.java @@ -62,11 +62,12 @@ public void setup() throws Exception { .setZkSessionTimeoutMs(1000) .setZkConnectionTimeoutMs(1000) .setSleepBetweenRetriesMs(1000) + .setZkCacheInitTimeoutMs(1000) .build(); curatorFramework = CuratorBuilder.build(meterRegistry, zkConfig); - cacheSlotMetadataStore = spy(new CacheSlotMetadataStore(curatorFramework)); - replicaMetadataStore = spy(new ReplicaMetadataStore(curatorFramework)); + cacheSlotMetadataStore = spy(new CacheSlotMetadataStore(curatorFramework, zkConfig)); + replicaMetadataStore = spy(new ReplicaMetadataStore(curatorFramework, zkConfig)); } @AfterEach diff --git a/astra/src/test/java/com/slack/astra/clusterManager/ReplicaRestoreServiceTest.java b/astra/src/test/java/com/slack/astra/clusterManager/ReplicaRestoreServiceTest.java index b3a6f793a2..c108207c21 100644 --- a/astra/src/test/java/com/slack/astra/clusterManager/ReplicaRestoreServiceTest.java +++ b/astra/src/test/java/com/slack/astra/clusterManager/ReplicaRestoreServiceTest.java @@ -52,6 +52,7 @@ public void setUp() throws Exception { .setZkSessionTimeoutMs(1000) .setZkConnectionTimeoutMs(1000) .setSleepBetweenRetriesMs(1000) + .setZkCacheInitTimeoutMs(1000) .build(); AstraConfigs.ManagerConfig.ReplicaRestoreServiceConfig replicaRecreationServiceConfig = @@ -69,7 +70,7 @@ public void setUp() throws Exception { .build(); curatorFramework = CuratorBuilder.build(meterRegistry, zkConfig); - replicaMetadataStore = spy(new ReplicaMetadataStore(curatorFramework)); + replicaMetadataStore = spy(new ReplicaMetadataStore(curatorFramework, zkConfig)); } @AfterEach diff --git a/astra/src/test/java/com/slack/astra/clusterManager/SnapshotDeletionServiceTest.java b/astra/src/test/java/com/slack/astra/clusterManager/SnapshotDeletionServiceTest.java index 8f31b86cfe..26ff9d4fe2 100644 --- a/astra/src/test/java/com/slack/astra/clusterManager/SnapshotDeletionServiceTest.java +++ b/astra/src/test/java/com/slack/astra/clusterManager/SnapshotDeletionServiceTest.java @@ -81,11 +81,12 @@ public void setup() throws Exception { .setZkSessionTimeoutMs(2500) .setZkConnectionTimeoutMs(1000) .setSleepBetweenRetriesMs(1000) + .setZkCacheInitTimeoutMs(1000) .build(); curatorFramework = CuratorBuilder.build(meterRegistry, zkConfig); - snapshotMetadataStore = spy(new SnapshotMetadataStore(curatorFramework)); - replicaMetadataStore = spy(new ReplicaMetadataStore(curatorFramework)); + snapshotMetadataStore = spy(new SnapshotMetadataStore(curatorFramework, zkConfig)); + replicaMetadataStore = spy(new ReplicaMetadataStore(curatorFramework, zkConfig)); s3AsyncClient = S3TestUtils.createS3CrtClient(S3_MOCK_EXTENSION.getServiceEndpoint()); blobStore = spy(new BlobStore(s3AsyncClient, S3_TEST_BUCKET)); diff --git a/astra/src/test/java/com/slack/astra/logstore/search/AstraDistributedQueryServiceTest.java b/astra/src/test/java/com/slack/astra/logstore/search/AstraDistributedQueryServiceTest.java index 9707cae7e8..65a56206f1 100644 --- a/astra/src/test/java/com/slack/astra/logstore/search/AstraDistributedQueryServiceTest.java +++ b/astra/src/test/java/com/slack/astra/logstore/search/AstraDistributedQueryServiceTest.java @@ -83,13 +83,14 @@ public void setUp() throws Exception { .setZkSessionTimeoutMs(1000) .setZkConnectionTimeoutMs(1000) .setSleepBetweenRetriesMs(1000) + .setZkCacheInitTimeoutMs(1000) .build(); curatorFramework = spy(CuratorBuilder.build(metricsRegistry, zkConfig)); - snapshotMetadataStore = spy(new SnapshotMetadataStore(curatorFramework)); - searchMetadataStore = spy(new SearchMetadataStore(curatorFramework, true)); - datasetMetadataStore = new DatasetMetadataStore(curatorFramework, true); + snapshotMetadataStore = spy(new SnapshotMetadataStore(curatorFramework, zkConfig)); + searchMetadataStore = spy(new SearchMetadataStore(curatorFramework, zkConfig, true)); + datasetMetadataStore = new DatasetMetadataStore(curatorFramework, zkConfig, true); indexer1SearchContext = new SearchContext("indexer_host1", 10000); indexer2SearchContext = new SearchContext("indexer_host2", 10001); diff --git a/astra/src/test/java/com/slack/astra/metadata/cache/CacheSlotMetadataStoreTest.java b/astra/src/test/java/com/slack/astra/metadata/cache/CacheSlotMetadataStoreTest.java index dc64dd5b0c..d193700138 100644 --- a/astra/src/test/java/com/slack/astra/metadata/cache/CacheSlotMetadataStoreTest.java +++ b/astra/src/test/java/com/slack/astra/metadata/cache/CacheSlotMetadataStoreTest.java @@ -34,16 +34,17 @@ public void setUp() throws Exception { // flaky. testingServer = new TestingServer(); - AstraConfigs.ZookeeperConfig zookeeperConfig = + AstraConfigs.ZookeeperConfig zkConfig = AstraConfigs.ZookeeperConfig.newBuilder() .setZkConnectString(testingServer.getConnectString()) .setZkPathPrefix("Test") .setZkSessionTimeoutMs(1000) .setZkConnectionTimeoutMs(1000) .setSleepBetweenRetriesMs(500) + .setZkCacheInitTimeoutMs(1000) .build(); - this.curatorFramework = CuratorBuilder.build(meterRegistry, zookeeperConfig); - this.store = new CacheSlotMetadataStore(curatorFramework); + this.curatorFramework = CuratorBuilder.build(meterRegistry, zkConfig); + this.store = new CacheSlotMetadataStore(curatorFramework, zkConfig); } @AfterEach diff --git a/astra/src/test/java/com/slack/astra/metadata/core/AstraMetadataStoreTest.java b/astra/src/test/java/com/slack/astra/metadata/core/AstraMetadataStoreTest.java index 6375645689..dffa675b24 100644 --- a/astra/src/test/java/com/slack/astra/metadata/core/AstraMetadataStoreTest.java +++ b/astra/src/test/java/com/slack/astra/metadata/core/AstraMetadataStoreTest.java @@ -29,7 +29,7 @@ public class AstraMetadataStoreTest { private AsyncCuratorFramework curatorFramework; - private AstraConfigs.ZookeeperConfig zookeeperConfig; + private AstraConfigs.ZookeeperConfig zkConfig; @BeforeEach public void setUp() throws Exception { @@ -38,15 +38,16 @@ public void setUp() throws Exception { // flaky. testingServer = new TestingServer(); - zookeeperConfig = + zkConfig = AstraConfigs.ZookeeperConfig.newBuilder() .setZkConnectString(testingServer.getConnectString()) .setZkPathPrefix("Test") .setZkSessionTimeoutMs(10000) .setZkConnectionTimeoutMs(1000) .setSleepBetweenRetriesMs(500) + .setZkCacheInitTimeoutMs(10000) .build(); - this.curatorFramework = CuratorBuilder.build(meterRegistry, zookeeperConfig); + this.curatorFramework = CuratorBuilder.build(meterRegistry, zkConfig); } @AfterEach @@ -100,6 +101,7 @@ class TestMetadataStore extends AstraMetadataStore { public TestMetadataStore() { super( curatorFramework, + zkConfig, CreateMode.PERSISTENT, true, new JacksonModelSerializer<>(TestMetadata.class), @@ -157,6 +159,7 @@ class TestMetadataStore extends AstraMetadataStore { public TestMetadataStore() { super( curatorFramework, + zkConfig, CreateMode.PERSISTENT, true, new JacksonModelSerializer<>(TestMetadata.class), @@ -178,6 +181,7 @@ class TestMetadataStore extends AstraMetadataStore { public TestMetadataStore() { super( curatorFramework, + zkConfig, CreateMode.PERSISTENT, false, new JacksonModelSerializer<>(TestMetadata.class), @@ -212,6 +216,7 @@ class PersistentMetadataStore extends AstraMetadataStore { public PersistentMetadataStore() { super( curatorFramework, + zkConfig, CreateMode.PERSISTENT, false, new JacksonModelSerializer<>(TestMetadata.class), @@ -223,6 +228,7 @@ class EphemeralMetadataStore extends AstraMetadataStore { public EphemeralMetadataStore() { super( curatorFramework, + zkConfig, CreateMode.EPHEMERAL, false, new JacksonModelSerializer<>(TestMetadata.class), @@ -253,7 +259,7 @@ public EphemeralMetadataStore() { // close curator, and then instantiate a new copy // This is because we cannot restart the closed curator. curatorFramework.unwrap().close(); - curatorFramework = CuratorBuilder.build(meterRegistry, zookeeperConfig); + curatorFramework = CuratorBuilder.build(meterRegistry, zkConfig); try (AstraMetadataStore persistentStore = new PersistentMetadataStore()) { assertThat(persistentStore.getSync("foo")).isEqualTo(metadata1); @@ -272,6 +278,7 @@ class TestMetadataStore extends AstraMetadataStore { public TestMetadataStore() { super( curatorFramework, + zkConfig, CreateMode.PERSISTENT, true, new JacksonModelSerializer<>(TestMetadata.class), @@ -311,6 +318,7 @@ class TestMetadataStore extends AstraMetadataStore { public TestMetadataStore() { super( curatorFramework, + zkConfig, CreateMode.PERSISTENT, true, new JacksonModelSerializer<>(TestMetadata.class), @@ -374,7 +382,12 @@ public TestMetadata deserialize(byte[] bytes) { class TestMetadataStore extends AstraMetadataStore { public TestMetadataStore() { super( - curatorFramework, CreateMode.PERSISTENT, true, new CountingSerializer(), STORE_FOLDER); + curatorFramework, + zkConfig, + CreateMode.PERSISTENT, + true, + new CountingSerializer(), + STORE_FOLDER); } } @@ -401,6 +414,7 @@ class FastMetadataStore extends AstraMetadataStore { public FastMetadataStore() { super( curatorFramework, + zkConfig, CreateMode.PERSISTENT, true, new JacksonModelSerializer<>(TestMetadata.class), @@ -430,7 +444,13 @@ public TestMetadata deserialize(byte[] bytes) { class SlowMetadataStore extends AstraMetadataStore { public SlowMetadataStore() { - super(curatorFramework, CreateMode.PERSISTENT, true, new SlowSerializer(), STORE_FOLDER); + super( + curatorFramework, + zkConfig, + CreateMode.PERSISTENT, + true, + new SlowSerializer(), + STORE_FOLDER); } } diff --git a/astra/src/test/java/com/slack/astra/metadata/core/AstraPartitioningMetadataStoreTest.java b/astra/src/test/java/com/slack/astra/metadata/core/AstraPartitioningMetadataStoreTest.java index 822c20cb1e..66d8afb7f7 100644 --- a/astra/src/test/java/com/slack/astra/metadata/core/AstraPartitioningMetadataStoreTest.java +++ b/astra/src/test/java/com/slack/astra/metadata/core/AstraPartitioningMetadataStoreTest.java @@ -41,7 +41,7 @@ class AstraPartitioningMetadataStoreTest { private TestingServer testingServer; private AsyncCuratorFramework curatorFramework; - private AstraConfigs.ZookeeperConfig zookeeperConfig; + private AstraConfigs.ZookeeperConfig zkConfig; private static final String ZNODE_CONTAINER_CHECK_INTERVAL_MS = "znode.container.checkIntervalMs"; private final Integer checkInterval = Integer.getInteger(ZNODE_CONTAINER_CHECK_INTERVAL_MS); @@ -189,7 +189,7 @@ public void setUp() throws Exception { meterRegistry = new SimpleMeterRegistry(); testingServer = new TestingServer(); - zookeeperConfig = + zkConfig = AstraConfigs.ZookeeperConfig.newBuilder() // .setZkConnectString("127.0.0.1:2181") .setZkConnectString(testingServer.getConnectString()) @@ -197,8 +197,9 @@ public void setUp() throws Exception { .setZkSessionTimeoutMs(1000) .setZkConnectionTimeoutMs(1000) .setSleepBetweenRetriesMs(500) + .setZkCacheInitTimeoutMs(1000) .build(); - this.curatorFramework = CuratorBuilder.build(meterRegistry, zookeeperConfig); + this.curatorFramework = CuratorBuilder.build(meterRegistry, zkConfig); } @AfterEach @@ -214,6 +215,7 @@ void testLargeNumberOfZNodes() throws IOException { try (AstraPartitioningMetadataStore partitionedMetadataStore = new AstraPartitioningMetadataStore<>( curatorFramework, + zkConfig, CreateMode.PERSISTENT, new ExampleMetadataSerializer().toModelSerializer(), "/partitioned_znodes")) { @@ -247,6 +249,7 @@ void testCreate() throws IOException { try (AstraPartitioningMetadataStore partitionedMetadataStore = new AstraPartitioningMetadataStore<>( curatorFramework, + zkConfig, CreateMode.PERSISTENT, new ExampleMetadataSerializer().toModelSerializer(), "/partitioned_create")) { @@ -270,6 +273,7 @@ void testUpdate() throws IOException { try (AstraPartitioningMetadataStore partitionedMetadataStore = new AstraPartitioningMetadataStore<>( curatorFramework, + zkConfig, CreateMode.PERSISTENT, new ExampleMetadataSerializer().toModelSerializer(), "/partitioned_update")) { @@ -302,6 +306,7 @@ void testDelete() throws IOException { try (AstraPartitioningMetadataStore partitionedMetadataStore = new AstraPartitioningMetadataStore<>( curatorFramework, + zkConfig, CreateMode.PERSISTENT, new ExampleMetadataSerializer().toModelSerializer(), "/partitioned_delete")) { @@ -328,6 +333,7 @@ void testFind() throws IOException { try (AstraPartitioningMetadataStore partitionedMetadataStore = new AstraPartitioningMetadataStore<>( curatorFramework, + zkConfig, CreateMode.PERSISTENT, new ExampleMetadataSerializer().toModelSerializer(), "/partitioned_find")) { @@ -350,6 +356,7 @@ void testFindMissingNode() throws IOException { try (AstraPartitioningMetadataStore partitionedMetadataStore = new AstraPartitioningMetadataStore<>( curatorFramework, + zkConfig, CreateMode.PERSISTENT, new ExampleMetadataSerializer().toModelSerializer(), "/partitioned_find_missing")) { @@ -364,6 +371,7 @@ void testDuplicateCreate() throws IOException { try (AstraPartitioningMetadataStore partitionedMetadataStore = new AstraPartitioningMetadataStore<>( curatorFramework, + zkConfig, CreateMode.PERSISTENT, new ExampleMetadataSerializer().toModelSerializer(), "/partitioned_snapshot_duplicate_create")) { @@ -382,6 +390,7 @@ class PersistentMetadataStore extends AstraPartitioningMetadataStore persistentStore = new PersistentMetadataStore()) { @@ -444,6 +454,7 @@ class TestMetadataStore extends AstraPartitioningMetadataStore public TestMetadataStore() { super( curatorFramework, + zkConfig, CreateMode.PERSISTENT, new ExampleMetadataSerializer().toModelSerializer(), "/partitioned_zk_reconnect"); @@ -482,6 +493,7 @@ void testListenersOnAddRemoveNodes() try (AstraPartitioningMetadataStore partitionedMetadataStore = new AstraPartitioningMetadataStore<>( curatorFramework, + zkConfig, CreateMode.PERSISTENT, new ExampleMetadataSerializer().toModelSerializer(), "/partitioned_snapshot_listeners")) { @@ -585,6 +597,7 @@ class TestMetadataStore extends AstraPartitioningMetadataStore public TestMetadataStore() { super( curatorFramework, + zkConfig, CreateMode.PERSISTENT, new ExampleMetadataSerializer().toModelSerializer(), "/partitioned_add_remove_listeners"); @@ -629,6 +642,7 @@ class TestMetadataStore extends AstraPartitioningMetadataStore store.updateAsync(searchMetadata)); assertThat(exAsync).isInstanceOf(UnsupportedOperationException.class); diff --git a/astra/src/test/java/com/slack/astra/recovery/RecoveryServiceTest.java b/astra/src/test/java/com/slack/astra/recovery/RecoveryServiceTest.java index d6c12cf603..3ab5b51e3b 100644 --- a/astra/src/test/java/com/slack/astra/recovery/RecoveryServiceTest.java +++ b/astra/src/test/java/com/slack/astra/recovery/RecoveryServiceTest.java @@ -157,7 +157,9 @@ public void testShouldHandleRecoveryTask() throws Exception { final Instant startTime = Instant.now(); produceMessagesToKafka(kafkaServer.getBroker(), startTime, TEST_KAFKA_TOPIC_1, 0); - SnapshotMetadataStore snapshotMetadataStore = new SnapshotMetadataStore(curatorFramework); + SnapshotMetadataStore snapshotMetadataStore = + new SnapshotMetadataStore( + curatorFramework, astraCfg.getMetadataStoreConfig().getZookeeperConfig()); assertThat(AstraMetadataTestUtils.listSyncUncached(snapshotMetadataStore).size()).isZero(); // Start recovery RecoveryTaskMetadata recoveryTask = @@ -227,7 +229,9 @@ public void testShouldHandleRecoveryTaskWithCompletelyUnavailableOffsets() throw await() .until(() -> localTestConsumer.getEndOffSetForPartition() == msgsToProduce + msgsToProduce); - SnapshotMetadataStore snapshotMetadataStore = new SnapshotMetadataStore(curatorFramework); + SnapshotMetadataStore snapshotMetadataStore = + new SnapshotMetadataStore( + curatorFramework, astraCfg.getMetadataStoreConfig().getZookeeperConfig()); assertThat(AstraMetadataTestUtils.listSyncUncached(snapshotMetadataStore).size()).isZero(); // Start recovery service @@ -309,7 +313,9 @@ public void testShouldHandleRecoveryTaskWithPartiallyUnavailableOffsets() throws await() .until(() -> localTestConsumer.getEndOffSetForPartition() == msgsToProduce + msgsToProduce); - SnapshotMetadataStore snapshotMetadataStore = new SnapshotMetadataStore(curatorFramework); + SnapshotMetadataStore snapshotMetadataStore = + new SnapshotMetadataStore( + curatorFramework, astraCfg.getMetadataStoreConfig().getZookeeperConfig()); assertThat(AstraMetadataTestUtils.listSyncUncached(snapshotMetadataStore).size()).isZero(); // Start recovery service @@ -363,7 +369,9 @@ public void testShouldHandleRecoveryTaskFailure() throws Exception { assertThat(s3AsyncClient.listBuckets().get().buckets().get(0).name()).isEqualTo(TEST_S3_BUCKET); assertThat(s3AsyncClient.listBuckets().get().buckets().get(0).name()) .isNotEqualTo(fakeS3Bucket); - SnapshotMetadataStore snapshotMetadataStore = new SnapshotMetadataStore(curatorFramework); + SnapshotMetadataStore snapshotMetadataStore = + new SnapshotMetadataStore( + curatorFramework, astraCfg.getMetadataStoreConfig().getZookeeperConfig()); assertThat(AstraMetadataTestUtils.listSyncUncached(snapshotMetadataStore).size()).isZero(); // Start recovery @@ -400,13 +408,16 @@ public void testShouldHandleRecoveryTaskAssignmentSuccess() throws Exception { assertThat(s3AsyncClient.listBuckets().get().buckets().size()).isEqualTo(1); assertThat(s3AsyncClient.listBuckets().get().buckets().get(0).name()).isEqualTo(TEST_S3_BUCKET); - SnapshotMetadataStore snapshotMetadataStore = new SnapshotMetadataStore(curatorFramework); + SnapshotMetadataStore snapshotMetadataStore = + new SnapshotMetadataStore( + curatorFramework, astraCfg.getMetadataStoreConfig().getZookeeperConfig()); assertThat(AstraMetadataTestUtils.listSyncUncached(snapshotMetadataStore).size()).isZero(); assertThat(AstraMetadataTestUtils.listSyncUncached(snapshotMetadataStore).size()).isZero(); // Create a recovery task RecoveryTaskMetadataStore recoveryTaskMetadataStore = - new RecoveryTaskMetadataStore(curatorFramework, false); + new RecoveryTaskMetadataStore( + curatorFramework, astraCfg.getMetadataStoreConfig().getZookeeperConfig(), false); assertThat(AstraMetadataTestUtils.listSyncUncached(recoveryTaskMetadataStore).size()).isZero(); RecoveryTaskMetadata recoveryTask = new RecoveryTaskMetadata("testRecoveryTask", "0", 30, 60, Instant.now().toEpochMilli()); @@ -418,7 +429,8 @@ public void testShouldHandleRecoveryTaskAssignmentSuccess() throws Exception { // Assign the recovery task to node. RecoveryNodeMetadataStore recoveryNodeMetadataStore = - new RecoveryNodeMetadataStore(curatorFramework, false); + new RecoveryNodeMetadataStore( + curatorFramework, astraCfg.getMetadataStoreConfig().getZookeeperConfig(), false); List recoveryNodes = AstraMetadataTestUtils.listSyncUncached(recoveryNodeMetadataStore); assertThat(recoveryNodes.size()).isEqualTo(1); @@ -484,13 +496,16 @@ public void testShouldHandleRecoveryTaskAssignmentFailure() throws Exception { // fakeS3Bucket is not present. assertThat(s3AsyncClient.listBuckets().get().buckets().size()).isEqualTo(1); assertThat(s3AsyncClient.listBuckets().get().buckets().get(0).name()).isEqualTo(TEST_S3_BUCKET); - SnapshotMetadataStore snapshotMetadataStore = new SnapshotMetadataStore(curatorFramework); + SnapshotMetadataStore snapshotMetadataStore = + new SnapshotMetadataStore( + curatorFramework, astraCfg.getMetadataStoreConfig().getZookeeperConfig()); assertThat(AstraMetadataTestUtils.listSyncUncached(snapshotMetadataStore).size()).isZero(); assertThat(AstraMetadataTestUtils.listSyncUncached(snapshotMetadataStore).size()).isZero(); // Create a recovery task RecoveryTaskMetadataStore recoveryTaskMetadataStore = - new RecoveryTaskMetadataStore(curatorFramework, false); + new RecoveryTaskMetadataStore( + curatorFramework, astraCfg.getMetadataStoreConfig().getZookeeperConfig(), false); assertThat(AstraMetadataTestUtils.listSyncUncached(recoveryTaskMetadataStore).size()).isZero(); RecoveryTaskMetadata recoveryTask = new RecoveryTaskMetadata("testRecoveryTask", "0", 30, 60, Instant.now().toEpochMilli()); @@ -502,7 +517,8 @@ public void testShouldHandleRecoveryTaskAssignmentFailure() throws Exception { // Assign the recovery task to node. RecoveryNodeMetadataStore recoveryNodeMetadataStore = - new RecoveryNodeMetadataStore(curatorFramework, false); + new RecoveryNodeMetadataStore( + curatorFramework, astraCfg.getMetadataStoreConfig().getZookeeperConfig(), false); List recoveryNodes = AstraMetadataTestUtils.listSyncUncached(recoveryNodeMetadataStore); assertThat(recoveryNodes.size()).isEqualTo(1); @@ -654,7 +670,8 @@ public void shouldHandleInvalidRecoveryTasks() throws Exception { // Create a recovery task RecoveryTaskMetadataStore recoveryTaskMetadataStore = - new RecoveryTaskMetadataStore(curatorFramework, false); + new RecoveryTaskMetadataStore( + curatorFramework, astraCfg.getMetadataStoreConfig().getZookeeperConfig(), false); assertThat(AstraMetadataTestUtils.listSyncUncached(recoveryTaskMetadataStore).size()).isZero(); RecoveryTaskMetadata recoveryTask = new RecoveryTaskMetadata("testRecoveryTask", "0", 0, 0, Instant.now().toEpochMilli()); @@ -666,7 +683,8 @@ public void shouldHandleInvalidRecoveryTasks() throws Exception { // Assign the recovery task to node. RecoveryNodeMetadataStore recoveryNodeMetadataStore = - new RecoveryNodeMetadataStore(curatorFramework, false); + new RecoveryNodeMetadataStore( + curatorFramework, astraCfg.getMetadataStoreConfig().getZookeeperConfig(), false); List recoveryNodes = AstraMetadataTestUtils.listSyncUncached(recoveryNodeMetadataStore); assertThat(recoveryNodes.size()).isEqualTo(1); diff --git a/astra/src/test/java/com/slack/astra/server/AstraIndexerTest.java b/astra/src/test/java/com/slack/astra/server/AstraIndexerTest.java index 5556424418..b9117f90e5 100644 --- a/astra/src/test/java/com/slack/astra/server/AstraIndexerTest.java +++ b/astra/src/test/java/com/slack/astra/server/AstraIndexerTest.java @@ -81,6 +81,7 @@ public class AstraIndexerTest { private TestKafkaServer kafkaServer; private TestingServer testZKServer; private AsyncCuratorFramework curatorFramework; + private AstraConfigs.ZookeeperConfig zkConfig; private SnapshotMetadataStore snapshotMetadataStore; private RecoveryTaskMetadataStore recoveryTaskStore; private SearchMetadataStore searchMetadataStore; @@ -93,13 +94,14 @@ public void setUp() throws Exception { testZKServer = new TestingServer(); // Metadata store - AstraConfigs.ZookeeperConfig zkConfig = + zkConfig = AstraConfigs.ZookeeperConfig.newBuilder() .setZkConnectString(testZKServer.getConnectString()) .setZkPathPrefix("indexerTest") .setZkSessionTimeoutMs(1000) .setZkConnectionTimeoutMs(1000) .setSleepBetweenRetriesMs(1000) + .setZkCacheInitTimeoutMs(1000) .build(); curatorFramework = spy(CuratorBuilder.build(metricsRegistry, zkConfig)); @@ -114,14 +116,15 @@ public void setUp() throws Exception { 100, new SearchContext(TEST_HOST, TEST_PORT), curatorFramework, - indexerConfig); + indexerConfig, + zkConfig); chunkManagerUtil.chunkManager.startAsync(); chunkManagerUtil.chunkManager.awaitRunning(DEFAULT_START_STOP_DURATION); - snapshotMetadataStore = spy(new SnapshotMetadataStore(curatorFramework)); - recoveryTaskStore = spy(new RecoveryTaskMetadataStore(curatorFramework, false)); - searchMetadataStore = spy(new SearchMetadataStore(curatorFramework, false)); + snapshotMetadataStore = spy(new SnapshotMetadataStore(curatorFramework, zkConfig)); + recoveryTaskStore = spy(new RecoveryTaskMetadataStore(curatorFramework, zkConfig, false)); + searchMetadataStore = spy(new SearchMetadataStore(curatorFramework, zkConfig, false)); kafkaServer = new TestKafkaServer(); } @@ -172,6 +175,7 @@ public void testIndexFreshConsumerKafkaSearchViaGrpcSearchApi() throws Exception new AstraIndexer( chunkManagerUtil.chunkManager, curatorFramework, + zkConfig, makeIndexerConfig(1000), getKafkaConfig(), metricsRegistry); @@ -209,6 +213,7 @@ public void testDeleteStaleSnapshotAndStartConsumerKafkaSearchViaGrpcSearchApi() new AstraIndexer( chunkManagerUtil.chunkManager, curatorFramework, + zkConfig, makeIndexerConfig(1000), getKafkaConfig(), metricsRegistry); @@ -255,6 +260,7 @@ public void testExceptionOnIndexerStartup() throws Exception { new AstraIndexer( chunkManagerUtil.chunkManager, curatorFramework, + zkConfig, makeIndexerConfig(1000), getKafkaConfig(), metricsRegistry); @@ -290,6 +296,7 @@ public void testWithMultipleLiveSnapshotsOnIndexerStart() throws Exception { new AstraIndexer( chunkManagerUtil.chunkManager, curatorFramework, + zkConfig, makeIndexerConfig(1000), getKafkaConfig(), metricsRegistry); @@ -338,6 +345,7 @@ public void testIndexerStartsWithPreviousOffset() throws Exception { new AstraIndexer( chunkManagerUtil.chunkManager, curatorFramework, + zkConfig, makeIndexerConfig(1000), getKafkaConfig(), metricsRegistry); @@ -388,6 +396,7 @@ public void testIndexerCreatesRecoveryTask() throws Exception { new AstraIndexer( chunkManagerUtil.chunkManager, curatorFramework, + zkConfig, makeIndexerConfig(50), getKafkaConfig(), metricsRegistry); @@ -446,6 +455,7 @@ public void testIndexerShutdownTwice() throws Exception { new AstraIndexer( chunkManagerUtil.chunkManager, curatorFramework, + zkConfig, makeIndexerConfig(50), getKafkaConfig(), metricsRegistry); @@ -508,6 +518,7 @@ public void testIndexerRestart() throws Exception { new AstraIndexer( chunkManagerUtil.chunkManager, curatorFramework, + zkConfig, makeIndexerConfig(1000), getKafkaConfig(), metricsRegistry); @@ -554,7 +565,8 @@ public void testIndexerRestart() throws Exception { 100, new SearchContext(TEST_HOST, TEST_PORT), curatorFramework, - makeIndexerConfig()); + makeIndexerConfig(), + zkConfig); chunkManagerUtil.chunkManager.startAsync(); chunkManagerUtil.chunkManager.awaitRunning(DEFAULT_START_STOP_DURATION); @@ -562,6 +574,7 @@ public void testIndexerRestart() throws Exception { new AstraIndexer( chunkManagerUtil.chunkManager, curatorFramework, + zkConfig, makeIndexerConfig(1000), getKafkaConfig(), metricsRegistry); diff --git a/astra/src/test/java/com/slack/astra/server/AstraTest.java b/astra/src/test/java/com/slack/astra/server/AstraTest.java index 12a7f0b85e..4d4136018a 100644 --- a/astra/src/test/java/com/slack/astra/server/AstraTest.java +++ b/astra/src/test/java/com/slack/astra/server/AstraTest.java @@ -122,9 +122,10 @@ public void setUp() throws Exception { .setZkSessionTimeoutMs(1000) .setZkConnectionTimeoutMs(1000) .setSleepBetweenRetriesMs(1000) + .setZkCacheInitTimeoutMs(1000) .build(); curatorFramework = CuratorBuilder.build(meterRegistry, zkConfig); - datasetMetadataStore = new DatasetMetadataStore(curatorFramework, true); + datasetMetadataStore = new DatasetMetadataStore(curatorFramework, zkConfig, true); final DatasetPartitionMetadata partition = new DatasetPartitionMetadata(1, Long.MAX_VALUE, List.of("0", "1")); final List partitionConfigs = Collections.singletonList(partition); diff --git a/astra/src/test/java/com/slack/astra/server/BulkIngestApiTest.java b/astra/src/test/java/com/slack/astra/server/BulkIngestApiTest.java index 64a64b09a7..8d60afe819 100644 --- a/astra/src/test/java/com/slack/astra/server/BulkIngestApiTest.java +++ b/astra/src/test/java/com/slack/astra/server/BulkIngestApiTest.java @@ -79,6 +79,7 @@ public void bootstrapCluster() throws Exception { .setZkSessionTimeoutMs(1000) .setZkConnectionTimeoutMs(1000) .setSleepBetweenRetriesMs(1000) + .setZkCacheInitTimeoutMs(1000) .build(); curatorFramework = CuratorBuilder.build(meterRegistry, zkConfig); @@ -103,7 +104,7 @@ public void bootstrapCluster() throws Exception { .setRateLimiterMaxBurstSeconds(1) .build(); - datasetMetadataStore = new DatasetMetadataStore(curatorFramework, true); + datasetMetadataStore = new DatasetMetadataStore(curatorFramework, zkConfig, true); DatasetMetadata datasetMetadata = new DatasetMetadata( INDEX_NAME, diff --git a/astra/src/test/java/com/slack/astra/server/HpaMetricPublisherServiceTest.java b/astra/src/test/java/com/slack/astra/server/HpaMetricPublisherServiceTest.java index 78f5425e74..5f4aa9bbc9 100644 --- a/astra/src/test/java/com/slack/astra/server/HpaMetricPublisherServiceTest.java +++ b/astra/src/test/java/com/slack/astra/server/HpaMetricPublisherServiceTest.java @@ -39,10 +39,11 @@ public void setup() throws Exception { .setZkSessionTimeoutMs(1000) .setZkConnectionTimeoutMs(1000) .setSleepBetweenRetriesMs(1000) + .setZkCacheInitTimeoutMs(1000) .build(); curatorFramework = CuratorBuilder.build(new SimpleMeterRegistry(), zkConfig); - hpaMetricMetadataStore = spy(new HpaMetricMetadataStore(curatorFramework, true)); + hpaMetricMetadataStore = spy(new HpaMetricMetadataStore(curatorFramework, zkConfig, true)); } @AfterEach diff --git a/astra/src/test/java/com/slack/astra/server/ManagerApiGrpcTest.java b/astra/src/test/java/com/slack/astra/server/ManagerApiGrpcTest.java index 5d9e2d90ba..a0c47bc897 100644 --- a/astra/src/test/java/com/slack/astra/server/ManagerApiGrpcTest.java +++ b/astra/src/test/java/com/slack/astra/server/ManagerApiGrpcTest.java @@ -74,12 +74,13 @@ public void setUp() throws Exception { .setZkSessionTimeoutMs(30000) .setZkConnectionTimeoutMs(30000) .setSleepBetweenRetriesMs(1000) + .setZkCacheInitTimeoutMs(1000) .build(); curatorFramework = CuratorBuilder.build(meterRegistry, zkConfig); - datasetMetadataStore = spy(new DatasetMetadataStore(curatorFramework, true)); - snapshotMetadataStore = spy(new SnapshotMetadataStore(curatorFramework)); - replicaMetadataStore = spy(new ReplicaMetadataStore(curatorFramework)); + datasetMetadataStore = spy(new DatasetMetadataStore(curatorFramework, zkConfig, true)); + snapshotMetadataStore = spy(new SnapshotMetadataStore(curatorFramework, zkConfig)); + replicaMetadataStore = spy(new ReplicaMetadataStore(curatorFramework, zkConfig)); AstraConfigs.ManagerConfig.ReplicaRestoreServiceConfig replicaRecreationServiceConfig = AstraConfigs.ManagerConfig.ReplicaRestoreServiceConfig.newBuilder() diff --git a/astra/src/test/java/com/slack/astra/server/RecoveryTaskCreatorTest.java b/astra/src/test/java/com/slack/astra/server/RecoveryTaskCreatorTest.java index de0d7bad41..1771f564a7 100644 --- a/astra/src/test/java/com/slack/astra/server/RecoveryTaskCreatorTest.java +++ b/astra/src/test/java/com/slack/astra/server/RecoveryTaskCreatorTest.java @@ -70,6 +70,7 @@ public void startup() throws Exception { .setZkSessionTimeoutMs(1000) .setZkConnectionTimeoutMs(1000) .setSleepBetweenRetriesMs(500) + .setZkCacheInitTimeoutMs(1000) .build(); // Default behavior @@ -82,8 +83,8 @@ public void startup() throws Exception { .build(); curatorFramework = CuratorBuilder.build(meterRegistry, zkConfig); - snapshotMetadataStore = spy(new SnapshotMetadataStore(curatorFramework)); - recoveryTaskStore = spy(new RecoveryTaskMetadataStore(curatorFramework, true)); + snapshotMetadataStore = spy(new SnapshotMetadataStore(curatorFramework, zkConfig)); + recoveryTaskStore = spy(new RecoveryTaskMetadataStore(curatorFramework, zkConfig, true)); } @AfterEach diff --git a/astra/src/test/java/com/slack/astra/testlib/AstraConfigUtil.java b/astra/src/test/java/com/slack/astra/testlib/AstraConfigUtil.java index 3324868c01..8df2261ac6 100644 --- a/astra/src/test/java/com/slack/astra/testlib/AstraConfigUtil.java +++ b/astra/src/test/java/com/slack/astra/testlib/AstraConfigUtil.java @@ -68,6 +68,7 @@ public static AstraConfigs.AstraConfig makeAstraConfig( .setZkSessionTimeoutMs(15000) .setZkConnectionTimeoutMs(15000) .setSleepBetweenRetriesMs(1000) + .setZkCacheInitTimeoutMs(1000) .build(); AstraConfigs.MetadataStoreConfig metadataStoreConfig = AstraConfigs.MetadataStoreConfig.newBuilder().setZookeeperConfig(zkConfig).build(); diff --git a/astra/src/test/java/com/slack/astra/testlib/ChunkManagerUtil.java b/astra/src/test/java/com/slack/astra/testlib/ChunkManagerUtil.java index 97e1ad8130..f7cefa09b7 100644 --- a/astra/src/test/java/com/slack/astra/testlib/ChunkManagerUtil.java +++ b/astra/src/test/java/com/slack/astra/testlib/ChunkManagerUtil.java @@ -60,6 +60,7 @@ public static ChunkManagerUtil makeChunkManagerUtil( .setZkSessionTimeoutMs(30000) .setZkConnectionTimeoutMs(30000) .setSleepBetweenRetriesMs(1000) + .setZkCacheInitTimeoutMs(1000) .build(); AsyncCuratorFramework curatorFramework = CuratorBuilder.build(meterRegistry, zkConfig); @@ -72,7 +73,8 @@ public static ChunkManagerUtil makeChunkManagerUtil( maxMessagesPerChunk, new SearchContext(TEST_HOST, TEST_PORT), curatorFramework, - indexerConfig); + indexerConfig, + zkConfig); } public ChunkManagerUtil( @@ -84,7 +86,8 @@ public ChunkManagerUtil( long maxMessagesPerChunk, SearchContext searchContext, AsyncCuratorFramework curatorFramework, - AstraConfigs.IndexerConfig indexerConfig) + AstraConfigs.IndexerConfig indexerConfig, + AstraConfigs.ZookeeperConfig zkConfig) throws Exception { tempFolder = Files.createTempDir(); // TODO: don't use beta func. @@ -111,7 +114,8 @@ public ChunkManagerUtil( MoreExecutors.newDirectExecutorService(), curatorFramework, searchContext, - indexerConfig); + indexerConfig, + zkConfig); } public void close() throws IOException, TimeoutException { diff --git a/config/config.yaml b/config/config.yaml index f2c9fb3a0a..f0de7e38e5 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -58,6 +58,7 @@ metadataStoreConfig: zkPathPrefix: ${ASTRA_ZK_PATH_PREFIX:-ASTRA} zkSessionTimeoutMs: ${ASTRA_ZK_SESSION_TIMEOUT_MS:-5000} zkConnectionTimeoutMs: ${ASTRA_ZK_CONNECT_TIMEOUT_MS:-500} + zkCacheInitTimeoutMs: ${ASTRA_ZK_CACHE_INIT_TIMEOUT_MS:-30000} sleepBetweenRetriesMs: ${ASTRA_ZK_SLEEP_RETRIES_MS:-100} cacheConfig: