From 34d1ae68fbfed9298185f738ff5f6ecbaac86445 Mon Sep 17 00:00:00 2001 From: Thach Le Date: Thu, 12 Sep 2024 11:59:59 +0700 Subject: [PATCH 1/6] Shutdown ClusterTopologyRefreshTask when RedisClusterClient is shutdown --- .../ClusterTopologyRefreshScheduler.java | 19 +++++++++++++++++++ .../core/cluster/RedisClusterClient.java | 11 +++++++++++ .../RedisClusterClientIntegrationTests.java | 17 +++++++++++++++++ 3 files changed, 47 insertions(+) diff --git a/src/main/java/io/lettuce/core/cluster/ClusterTopologyRefreshScheduler.java b/src/main/java/io/lettuce/core/cluster/ClusterTopologyRefreshScheduler.java index b47b386d2c..7c7e86a085 100644 --- a/src/main/java/io/lettuce/core/cluster/ClusterTopologyRefreshScheduler.java +++ b/src/main/java/io/lettuce/core/cluster/ClusterTopologyRefreshScheduler.java @@ -108,6 +108,15 @@ public void suspendTopologyRefresh() { } } + /** + * Cancel any scheduled or running topology refresh tasks. + */ + public void cancelTopologyRefreshTask() { + if (clusterTopologyRefreshTask.get()) { + clusterTopologyRefreshTask.cancel(); + } + } + public boolean isTopologyRefreshInProgress() { return clusterTopologyRefreshTask.get(); } @@ -317,12 +326,18 @@ private static class ClusterTopologyRefreshTask extends AtomicBoolean implements private final Supplier> reloadTopologyAsync; + private final AtomicBoolean isCanceled = new AtomicBoolean(false); + ClusterTopologyRefreshTask(Supplier> reloadTopologyAsync) { this.reloadTopologyAsync = reloadTopologyAsync; } public void run() { + if (isCanceled.get()) { + return; + } + if (compareAndSet(false, true)) { doRun(); return; @@ -352,6 +367,10 @@ void doRun() { } } + void cancel() { + isCanceled.set(true); + } + } } diff --git a/src/main/java/io/lettuce/core/cluster/RedisClusterClient.java b/src/main/java/io/lettuce/core/cluster/RedisClusterClient.java index 31125d93c7..cfd0a7a80f 100644 --- a/src/main/java/io/lettuce/core/cluster/RedisClusterClient.java +++ b/src/main/java/io/lettuce/core/cluster/RedisClusterClient.java @@ -972,6 +972,17 @@ public void suspendTopologyRefresh() { topologyRefreshScheduler.suspendTopologyRefresh(); } + /** + * Cancel any running topology refresh tasks. This method calls + * {@link ClusterTopologyRefreshScheduler#cancelTopologyRefreshTask()} to ensure that any ongoing refresh tasks are properly + * stopped. It is typically called during the shutdown process to clean up resources and prevent any further topology + * refresh operations. + * + */ + public void cancelTopologyRefresh() { + topologyRefreshScheduler.cancelTopologyRefreshTask(); + } + /** * Return whether a scheduled or adaptive topology refresh is in progress. * diff --git a/src/test/java/io/lettuce/core/cluster/RedisClusterClientIntegrationTests.java b/src/test/java/io/lettuce/core/cluster/RedisClusterClientIntegrationTests.java index 00b87784f6..b30fbdefcd 100644 --- a/src/test/java/io/lettuce/core/cluster/RedisClusterClientIntegrationTests.java +++ b/src/test/java/io/lettuce/core/cluster/RedisClusterClientIntegrationTests.java @@ -715,4 +715,21 @@ public void clear() { } + @Test + void shouldCancelTopologyRefreshTaskOnShutdown() { + ClusterTopologyRefreshOptions refreshOptions = ClusterTopologyRefreshOptions.builder() + .enablePeriodicRefresh(Duration.ofSeconds(1)).build(); + RedisClusterClient clusterClient = RedisClusterClient.create(TestClientResources.get(), + RedisURI.Builder.redis(TestSettings.host(), ClusterTestSettings.port1).build()); + clusterClient.setOptions(ClusterClientOptions.builder().topologyRefreshOptions(refreshOptions).build()); + clusterClient.connect().sync(); + Delay.delay(Duration.ofMillis(1500)); + assertThat(clusterClient.isTopologyRefreshInProgress()).isTrue(); + + clusterClient.shutdownAsync(0, 10, TimeUnit.SECONDS).join(); + + assertThat(clusterClient.isTopologyRefreshInProgress()).isFalse(); + FastShutdown.shutdown(clusterClient); + } + } From d8507a5825879b47355f68e391698fce43fed299 Mon Sep 17 00:00:00 2001 From: Thach Le Date: Thu, 12 Sep 2024 13:17:45 +0700 Subject: [PATCH 2/6] Update test --- .../core/cluster/RedisClusterClientIntegrationTests.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/test/java/io/lettuce/core/cluster/RedisClusterClientIntegrationTests.java b/src/test/java/io/lettuce/core/cluster/RedisClusterClientIntegrationTests.java index b30fbdefcd..b6a7916734 100644 --- a/src/test/java/io/lettuce/core/cluster/RedisClusterClientIntegrationTests.java +++ b/src/test/java/io/lettuce/core/cluster/RedisClusterClientIntegrationTests.java @@ -718,15 +718,17 @@ public void clear() { @Test void shouldCancelTopologyRefreshTaskOnShutdown() { ClusterTopologyRefreshOptions refreshOptions = ClusterTopologyRefreshOptions.builder() - .enablePeriodicRefresh(Duration.ofSeconds(1)).build(); + .enablePeriodicRefresh(Duration.ofMillis(200)).build(); RedisClusterClient clusterClient = RedisClusterClient.create(TestClientResources.get(), RedisURI.Builder.redis(TestSettings.host(), ClusterTestSettings.port1).build()); + clusterClient.setOptions(ClusterClientOptions.builder().topologyRefreshOptions(refreshOptions).build()); clusterClient.connect().sync(); Delay.delay(Duration.ofMillis(1500)); - assertThat(clusterClient.isTopologyRefreshInProgress()).isTrue(); + Wait.untilTrue(clusterClient::isTopologyRefreshInProgress).during(Duration.ofSeconds(5)).waitOrTimeout(); clusterClient.shutdownAsync(0, 10, TimeUnit.SECONDS).join(); + Wait.untilTrue(() -> !clusterClient.isTopologyRefreshInProgress()).during(Duration.ofSeconds(5)).waitOrTimeout(); assertThat(clusterClient.isTopologyRefreshInProgress()).isFalse(); FastShutdown.shutdown(clusterClient); From 5952b0bfb2bc4448b1a17b0483a312217c52f23f Mon Sep 17 00:00:00 2001 From: Thach Le Date: Tue, 31 Dec 2024 11:37:43 +0700 Subject: [PATCH 3/6] Revert change by comment --- .../ClusterTopologyRefreshScheduler.java | 19 ------------------- .../core/cluster/RedisClusterClient.java | 11 ----------- .../RedisClusterClientIntegrationTests.java | 19 ------------------- 3 files changed, 49 deletions(-) diff --git a/src/main/java/io/lettuce/core/cluster/ClusterTopologyRefreshScheduler.java b/src/main/java/io/lettuce/core/cluster/ClusterTopologyRefreshScheduler.java index 7c7e86a085..b47b386d2c 100644 --- a/src/main/java/io/lettuce/core/cluster/ClusterTopologyRefreshScheduler.java +++ b/src/main/java/io/lettuce/core/cluster/ClusterTopologyRefreshScheduler.java @@ -108,15 +108,6 @@ public void suspendTopologyRefresh() { } } - /** - * Cancel any scheduled or running topology refresh tasks. - */ - public void cancelTopologyRefreshTask() { - if (clusterTopologyRefreshTask.get()) { - clusterTopologyRefreshTask.cancel(); - } - } - public boolean isTopologyRefreshInProgress() { return clusterTopologyRefreshTask.get(); } @@ -326,18 +317,12 @@ private static class ClusterTopologyRefreshTask extends AtomicBoolean implements private final Supplier> reloadTopologyAsync; - private final AtomicBoolean isCanceled = new AtomicBoolean(false); - ClusterTopologyRefreshTask(Supplier> reloadTopologyAsync) { this.reloadTopologyAsync = reloadTopologyAsync; } public void run() { - if (isCanceled.get()) { - return; - } - if (compareAndSet(false, true)) { doRun(); return; @@ -367,10 +352,6 @@ void doRun() { } } - void cancel() { - isCanceled.set(true); - } - } } diff --git a/src/main/java/io/lettuce/core/cluster/RedisClusterClient.java b/src/main/java/io/lettuce/core/cluster/RedisClusterClient.java index cfd0a7a80f..31125d93c7 100644 --- a/src/main/java/io/lettuce/core/cluster/RedisClusterClient.java +++ b/src/main/java/io/lettuce/core/cluster/RedisClusterClient.java @@ -972,17 +972,6 @@ public void suspendTopologyRefresh() { topologyRefreshScheduler.suspendTopologyRefresh(); } - /** - * Cancel any running topology refresh tasks. This method calls - * {@link ClusterTopologyRefreshScheduler#cancelTopologyRefreshTask()} to ensure that any ongoing refresh tasks are properly - * stopped. It is typically called during the shutdown process to clean up resources and prevent any further topology - * refresh operations. - * - */ - public void cancelTopologyRefresh() { - topologyRefreshScheduler.cancelTopologyRefreshTask(); - } - /** * Return whether a scheduled or adaptive topology refresh is in progress. * diff --git a/src/test/java/io/lettuce/core/cluster/RedisClusterClientIntegrationTests.java b/src/test/java/io/lettuce/core/cluster/RedisClusterClientIntegrationTests.java index b6a7916734..00b87784f6 100644 --- a/src/test/java/io/lettuce/core/cluster/RedisClusterClientIntegrationTests.java +++ b/src/test/java/io/lettuce/core/cluster/RedisClusterClientIntegrationTests.java @@ -715,23 +715,4 @@ public void clear() { } - @Test - void shouldCancelTopologyRefreshTaskOnShutdown() { - ClusterTopologyRefreshOptions refreshOptions = ClusterTopologyRefreshOptions.builder() - .enablePeriodicRefresh(Duration.ofMillis(200)).build(); - RedisClusterClient clusterClient = RedisClusterClient.create(TestClientResources.get(), - RedisURI.Builder.redis(TestSettings.host(), ClusterTestSettings.port1).build()); - - clusterClient.setOptions(ClusterClientOptions.builder().topologyRefreshOptions(refreshOptions).build()); - clusterClient.connect().sync(); - Delay.delay(Duration.ofMillis(1500)); - Wait.untilTrue(clusterClient::isTopologyRefreshInProgress).during(Duration.ofSeconds(5)).waitOrTimeout(); - - clusterClient.shutdownAsync(0, 10, TimeUnit.SECONDS).join(); - Wait.untilTrue(() -> !clusterClient.isTopologyRefreshInProgress()).during(Duration.ofSeconds(5)).waitOrTimeout(); - - assertThat(clusterClient.isTopologyRefreshInProgress()).isFalse(); - FastShutdown.shutdown(clusterClient); - } - } From ed56fa4bd7423505321decff7bdf33ecbca4ec54 Mon Sep 17 00:00:00 2001 From: Thach Le Date: Tue, 31 Dec 2024 15:47:22 +0700 Subject: [PATCH 4/6] Add the track of the completion of the topology refresh --- .../cluster/ClusterTopologyRefreshScheduler.java | 14 ++++++++++++++ .../lettuce/core/cluster/RedisClusterClient.java | 8 +++++++- 2 files changed, 21 insertions(+), 1 deletion(-) diff --git a/src/main/java/io/lettuce/core/cluster/ClusterTopologyRefreshScheduler.java b/src/main/java/io/lettuce/core/cluster/ClusterTopologyRefreshScheduler.java index b47b386d2c..9751bc0302 100644 --- a/src/main/java/io/lettuce/core/cluster/ClusterTopologyRefreshScheduler.java +++ b/src/main/java/io/lettuce/core/cluster/ClusterTopologyRefreshScheduler.java @@ -22,6 +22,7 @@ import static io.lettuce.core.event.cluster.AdaptiveRefreshTriggeredEvent.*; import java.time.Duration; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -112,6 +113,10 @@ public boolean isTopologyRefreshInProgress() { return clusterTopologyRefreshTask.get(); } + public CompletableFuture getTopologyRefreshCompletionFuture() { + return clusterTopologyRefreshTask.getCompletionFuture(); + } + @Override public void run() { @@ -316,11 +321,16 @@ private static class ClusterTopologyRefreshTask extends AtomicBoolean implements private static final long serialVersionUID = -1337731371220365694L; private final Supplier> reloadTopologyAsync; + private final CompletableFuture completionFuture = new CompletableFuture<>(); ClusterTopologyRefreshTask(Supplier> reloadTopologyAsync) { this.reloadTopologyAsync = reloadTopologyAsync; } + public CompletableFuture getCompletionFuture() { + return completionFuture; + } + public void run() { if (compareAndSet(false, true)) { @@ -343,12 +353,16 @@ void doRun() { if (throwable != null) { logger.warn("Cannot refresh Redis Cluster topology", throwable); + completionFuture.completeExceptionally(throwable); + } else { + completionFuture.complete(null); } set(false); }); } catch (Exception e) { logger.warn("Cannot refresh Redis Cluster topology", e); + completionFuture.completeExceptionally(e); } } diff --git a/src/main/java/io/lettuce/core/cluster/RedisClusterClient.java b/src/main/java/io/lettuce/core/cluster/RedisClusterClient.java index 31125d93c7..c660e52e72 100644 --- a/src/main/java/io/lettuce/core/cluster/RedisClusterClient.java +++ b/src/main/java/io/lettuce/core/cluster/RedisClusterClient.java @@ -1153,7 +1153,13 @@ public CompletableFuture shutdownAsync(long quietPeriod, long timeout, Tim suspendTopologyRefresh(); - return super.shutdownAsync(quietPeriod, timeout, timeUnit); + CompletableFuture topologyRefreshFuture = topologyRefreshScheduler.getTopologyRefreshCompletionFuture(); + + return topologyRefreshFuture.thenCompose(ignore -> super.shutdownAsync(quietPeriod, timeout, timeUnit)) + .exceptionally(ex -> { + System.err.println("Error during topology refresh or shutdown: " + ex.getMessage()); + return null; + }); } // ------------------------------------------------------------------------- From 83a470e550efed3bb33060b3a87352f8894f5747 Mon Sep 17 00:00:00 2001 From: Thach Le Date: Tue, 31 Dec 2024 15:48:52 +0700 Subject: [PATCH 5/6] Format --- .../lettuce/core/cluster/ClusterTopologyRefreshScheduler.java | 1 + src/main/java/io/lettuce/core/cluster/RedisClusterClient.java | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/src/main/java/io/lettuce/core/cluster/ClusterTopologyRefreshScheduler.java b/src/main/java/io/lettuce/core/cluster/ClusterTopologyRefreshScheduler.java index 9751bc0302..8fbcd0072c 100644 --- a/src/main/java/io/lettuce/core/cluster/ClusterTopologyRefreshScheduler.java +++ b/src/main/java/io/lettuce/core/cluster/ClusterTopologyRefreshScheduler.java @@ -321,6 +321,7 @@ private static class ClusterTopologyRefreshTask extends AtomicBoolean implements private static final long serialVersionUID = -1337731371220365694L; private final Supplier> reloadTopologyAsync; + private final CompletableFuture completionFuture = new CompletableFuture<>(); ClusterTopologyRefreshTask(Supplier> reloadTopologyAsync) { diff --git a/src/main/java/io/lettuce/core/cluster/RedisClusterClient.java b/src/main/java/io/lettuce/core/cluster/RedisClusterClient.java index c660e52e72..00d7272cca 100644 --- a/src/main/java/io/lettuce/core/cluster/RedisClusterClient.java +++ b/src/main/java/io/lettuce/core/cluster/RedisClusterClient.java @@ -1159,7 +1159,7 @@ public CompletableFuture shutdownAsync(long quietPeriod, long timeout, Tim .exceptionally(ex -> { System.err.println("Error during topology refresh or shutdown: " + ex.getMessage()); return null; - }); + }); } // ------------------------------------------------------------------------- From 2f38c3c319cfc2c2b248657030e3bed4150f2705 Mon Sep 17 00:00:00 2001 From: Thach Le Date: Thu, 2 Jan 2025 15:21:39 +0700 Subject: [PATCH 6/6] Add the track of the completion of the topology refresh cancel --- .../ClusterTopologyRefreshScheduler.java | 30 ++++++++----------- .../core/cluster/RedisClusterClient.java | 14 ++------- 2 files changed, 15 insertions(+), 29 deletions(-) diff --git a/src/main/java/io/lettuce/core/cluster/ClusterTopologyRefreshScheduler.java b/src/main/java/io/lettuce/core/cluster/ClusterTopologyRefreshScheduler.java index 8fbcd0072c..f1ca924b79 100644 --- a/src/main/java/io/lettuce/core/cluster/ClusterTopologyRefreshScheduler.java +++ b/src/main/java/io/lettuce/core/cluster/ClusterTopologyRefreshScheduler.java @@ -94,29 +94,33 @@ protected void activateTopologyRefreshIfNeeded() { /** * Suspend (cancel) periodic topology refresh. */ - public void suspendTopologyRefresh() { + public CompletableFuture suspendTopologyRefresh() { + CompletableFuture completionFuture = new CompletableFuture<>(); if (clusterTopologyRefreshActivated.compareAndSet(true, false)) { - ScheduledFuture scheduledFuture = clusterTopologyRefreshFuture.get(); try { - scheduledFuture.cancel(false); - clusterTopologyRefreshFuture.set(null); + if (scheduledFuture != null) { + scheduledFuture.cancel(false); + clusterTopologyRefreshFuture.set(null); + } + completionFuture.complete(null); } catch (Exception e) { logger.debug("Could not cancel Cluster topology refresh", e); + completionFuture.completeExceptionally(e); } + } else { + completionFuture.complete(null); } + + return completionFuture; } public boolean isTopologyRefreshInProgress() { return clusterTopologyRefreshTask.get(); } - public CompletableFuture getTopologyRefreshCompletionFuture() { - return clusterTopologyRefreshTask.getCompletionFuture(); - } - @Override public void run() { @@ -322,16 +326,10 @@ private static class ClusterTopologyRefreshTask extends AtomicBoolean implements private final Supplier> reloadTopologyAsync; - private final CompletableFuture completionFuture = new CompletableFuture<>(); - ClusterTopologyRefreshTask(Supplier> reloadTopologyAsync) { this.reloadTopologyAsync = reloadTopologyAsync; } - public CompletableFuture getCompletionFuture() { - return completionFuture; - } - public void run() { if (compareAndSet(false, true)) { @@ -354,16 +352,12 @@ void doRun() { if (throwable != null) { logger.warn("Cannot refresh Redis Cluster topology", throwable); - completionFuture.completeExceptionally(throwable); - } else { - completionFuture.complete(null); } set(false); }); } catch (Exception e) { logger.warn("Cannot refresh Redis Cluster topology", e); - completionFuture.completeExceptionally(e); } } diff --git a/src/main/java/io/lettuce/core/cluster/RedisClusterClient.java b/src/main/java/io/lettuce/core/cluster/RedisClusterClient.java index 00d7272cca..e047717bc9 100644 --- a/src/main/java/io/lettuce/core/cluster/RedisClusterClient.java +++ b/src/main/java/io/lettuce/core/cluster/RedisClusterClient.java @@ -968,8 +968,8 @@ public CompletionStage refreshPartitionsAsync() { * * @since 6.3 */ - public void suspendTopologyRefresh() { - topologyRefreshScheduler.suspendTopologyRefresh(); + public CompletableFuture suspendTopologyRefresh() { + return topologyRefreshScheduler.suspendTopologyRefresh(); } /** @@ -1151,15 +1151,7 @@ public void setPartitions(Partitions partitions) { @Override public CompletableFuture shutdownAsync(long quietPeriod, long timeout, TimeUnit timeUnit) { - suspendTopologyRefresh(); - - CompletableFuture topologyRefreshFuture = topologyRefreshScheduler.getTopologyRefreshCompletionFuture(); - - return topologyRefreshFuture.thenCompose(ignore -> super.shutdownAsync(quietPeriod, timeout, timeUnit)) - .exceptionally(ex -> { - System.err.println("Error during topology refresh or shutdown: " + ex.getMessage()); - return null; - }); + return suspendTopologyRefresh().thenCompose(voidResult -> super.shutdownAsync(quietPeriod, timeout, timeUnit)); } // -------------------------------------------------------------------------