From fe782d26b24621f86e1dc9cd923bb8b45b2e4f56 Mon Sep 17 00:00:00 2001 From: Mark Paluch Date: Thu, 6 Jul 2023 12:30:50 +0200 Subject: [PATCH] Expose methods to suspend periodic topology refresh and to check whether a topology refresh is running #2428 --- .../ClusterTopologyRefreshScheduler.java | 8 +++-- .../core/cluster/RedisClusterClient.java | 28 +++++++++++++--- .../RedisClusterClientIntegrationTests.java | 32 ++++++++++++++++++- 3 files changed, 61 insertions(+), 7 deletions(-) diff --git a/src/main/java/io/lettuce/core/cluster/ClusterTopologyRefreshScheduler.java b/src/main/java/io/lettuce/core/cluster/ClusterTopologyRefreshScheduler.java index 960d36a383..c5c53708d3 100644 --- a/src/main/java/io/lettuce/core/cluster/ClusterTopologyRefreshScheduler.java +++ b/src/main/java/io/lettuce/core/cluster/ClusterTopologyRefreshScheduler.java @@ -87,9 +87,9 @@ protected void activateTopologyRefreshIfNeeded() { } /** - * Disable periodic topology refresh. + * Suspend (cancel) periodic topology refresh. */ - public void shutdown() { + public void suspendTopologyRefresh() { if (clusterTopologyRefreshActivated.compareAndSet(true, false)) { @@ -104,6 +104,10 @@ public void shutdown() { } } + public boolean isTopologyRefreshInProgress() { + return clusterTopologyRefreshTask.get(); + } + @Override public void run() { diff --git a/src/main/java/io/lettuce/core/cluster/RedisClusterClient.java b/src/main/java/io/lettuce/core/cluster/RedisClusterClient.java index 090db439a2..93c2958a23 100644 --- a/src/main/java/io/lettuce/core/cluster/RedisClusterClient.java +++ b/src/main/java/io/lettuce/core/cluster/RedisClusterClient.java @@ -34,7 +34,6 @@ import java.util.function.Predicate; import java.util.function.Supplier; -import reactor.core.publisher.Mono; import io.lettuce.core.*; import io.lettuce.core.api.StatefulRedisConnection; import io.lettuce.core.cluster.api.NodeSelectionSupport; @@ -68,6 +67,7 @@ import io.lettuce.core.resource.ClientResources; import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; +import reactor.core.publisher.Mono; /** * A scalable and thread-safe Redis cluster client supporting synchronous, asynchronous and @@ -177,8 +177,8 @@ protected RedisClusterClient() { * cluster. If any uri is successful for connection, the others are not tried anymore. The initial uri is needed to discover * the cluster structure for distributing the requests. * - * @param clientResources the client resources. If {@code null}, the client will create a new dedicated instance of - * client resources and keep track of them. + * @param clientResources the client resources. If {@code null}, the client will create a new dedicated instance of client + * resources and keep track of them. * @param redisURIs iterable of initial {@link RedisURI cluster URIs}. Must not be {@code null} and not empty. */ protected RedisClusterClient(ClientResources clientResources, Iterable redisURIs) { @@ -905,6 +905,26 @@ public CompletionStage refreshPartitionsAsync() { }).whenComplete((unused, throwable) -> event.record()); } + /** + * Suspend periodic topology refresh if it was activated previously. Suspending cancels the periodic schedule without + * interrupting any running topology refresh. Suspension is in place until obtaining a new {@link #connect connection}. + * + * @since 6.3 + */ + public void suspendTopologyRefresh() { + topologyRefreshScheduler.suspendTopologyRefresh(); + } + + /** + * Return whether a scheduled or adaptive topology refresh is in progress. + * + * @return {@code true} if a topology refresh is in progress. + * @since 6.3 + */ + public boolean isTopologyRefreshInProgress() { + return topologyRefreshScheduler.isTopologyRefreshInProgress(); + } + protected void updatePartitionsInConnections() { forEachClusterConnection(input -> { @@ -1074,7 +1094,7 @@ public void setPartitions(Partitions partitions) { @Override public CompletableFuture shutdownAsync(long quietPeriod, long timeout, TimeUnit timeUnit) { - topologyRefreshScheduler.shutdown(); + suspendTopologyRefresh(); return super.shutdownAsync(quietPeriod, timeout, timeUnit); } diff --git a/src/test/java/io/lettuce/core/cluster/RedisClusterClientIntegrationTests.java b/src/test/java/io/lettuce/core/cluster/RedisClusterClientIntegrationTests.java index e819d04fb0..851d8dd3c9 100644 --- a/src/test/java/io/lettuce/core/cluster/RedisClusterClientIntegrationTests.java +++ b/src/test/java/io/lettuce/core/cluster/RedisClusterClientIntegrationTests.java @@ -178,7 +178,7 @@ void clusterConnectionShouldSetClientName() { } @Test - void pubSubclusterConnectionShouldSetClientName() { + void pubSubClusterConnectionShouldSetClientName() { StatefulRedisClusterPubSubConnection connection = clusterClient.connectPubSub(); @@ -203,6 +203,36 @@ void reloadPartitions() { assertThat(clusterClient.getPartitions()).hasSize(4); } + @Test + void suspendedTopologyRefreshCanBeResumed() { + + RedisClusterClient client = RedisClusterClient.create(clusterClient.getResources(), + RedisURI.Builder.redis(host, ClusterTestSettings.port1).build()); + try { + + client.setOptions(ClusterClientOptions.builder().topologyRefreshOptions(ClusterTopologyRefreshOptions.builder() + .enablePeriodicRefresh(true).refreshPeriod(Duration.ofMillis(200)).build()).build()); + client.connect().close(); + + Wait.untilTrue(client::isTopologyRefreshInProgress).during(Duration.ofSeconds(5)).waitOrTimeout(); + + client.suspendTopologyRefresh(); + + Wait.untilTrue(() -> !client.isTopologyRefreshInProgress()).during(Duration.ofSeconds(5)).waitOrTimeout(); + + client.getPartitions().clear(); + client.getPartitions().updateCache(); + + client.connect().close(); + Wait.untilTrue(client::isTopologyRefreshInProgress).during(Duration.ofSeconds(5)).waitOrTimeout(); + Wait.untilTrue(() -> !client.isTopologyRefreshInProgress()).during(Duration.ofSeconds(5)).waitOrTimeout(); + + assertThat(client.getPartitions()).isNotEmpty(); + } finally { + FastShutdown.shutdown(client); + } + } + @Test void reloadPartitionsWithDynamicSourcesFallsBackToInitialSeedNodes() {