Skip to content

Commit

Permalink
Expose methods to suspend periodic topology refresh and to check whet…
Browse files Browse the repository at this point in the history
…her a topology refresh is running #2428
  • Loading branch information
mp911de committed Jul 6, 2023
1 parent 2ad862f commit fe782d2
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,9 @@ protected void activateTopologyRefreshIfNeeded() {
}

/**
* Disable periodic topology refresh.
* Suspend (cancel) periodic topology refresh.
*/
public void shutdown() {
public void suspendTopologyRefresh() {

if (clusterTopologyRefreshActivated.compareAndSet(true, false)) {

Expand All @@ -104,6 +104,10 @@ public void shutdown() {
}
}

public boolean isTopologyRefreshInProgress() {
return clusterTopologyRefreshTask.get();
}

@Override
public void run() {

Expand Down
28 changes: 24 additions & 4 deletions src/main/java/io/lettuce/core/cluster/RedisClusterClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import java.util.function.Predicate;
import java.util.function.Supplier;

import reactor.core.publisher.Mono;
import io.lettuce.core.*;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.cluster.api.NodeSelectionSupport;
Expand Down Expand Up @@ -68,6 +67,7 @@
import io.lettuce.core.resource.ClientResources;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
import reactor.core.publisher.Mono;

/**
* A scalable and thread-safe <a href="https://redis.io/">Redis</a> cluster client supporting synchronous, asynchronous and
Expand Down Expand Up @@ -177,8 +177,8 @@ protected RedisClusterClient() {
* cluster. If any uri is successful for connection, the others are not tried anymore. The initial uri is needed to discover
* the cluster structure for distributing the requests.
*
* @param clientResources the client resources. If {@code null}, the client will create a new dedicated instance of
* client resources and keep track of them.
* @param clientResources the client resources. If {@code null}, the client will create a new dedicated instance of client
* resources and keep track of them.
* @param redisURIs iterable of initial {@link RedisURI cluster URIs}. Must not be {@code null} and not empty.
*/
protected RedisClusterClient(ClientResources clientResources, Iterable<RedisURI> redisURIs) {
Expand Down Expand Up @@ -905,6 +905,26 @@ public CompletionStage<Void> refreshPartitionsAsync() {
}).whenComplete((unused, throwable) -> event.record());
}

/**
* Suspend periodic topology refresh if it was activated previously. Suspending cancels the periodic schedule without
* interrupting any running topology refresh. Suspension is in place until obtaining a new {@link #connect connection}.
*
* @since 6.3
*/
public void suspendTopologyRefresh() {
topologyRefreshScheduler.suspendTopologyRefresh();
}

/**
* Return whether a scheduled or adaptive topology refresh is in progress.
*
* @return {@code true} if a topology refresh is in progress.
* @since 6.3
*/
public boolean isTopologyRefreshInProgress() {
return topologyRefreshScheduler.isTopologyRefreshInProgress();
}

protected void updatePartitionsInConnections() {

forEachClusterConnection(input -> {
Expand Down Expand Up @@ -1074,7 +1094,7 @@ public void setPartitions(Partitions partitions) {
@Override
public CompletableFuture<Void> shutdownAsync(long quietPeriod, long timeout, TimeUnit timeUnit) {

topologyRefreshScheduler.shutdown();
suspendTopologyRefresh();

return super.shutdownAsync(quietPeriod, timeout, timeUnit);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ void clusterConnectionShouldSetClientName() {
}

@Test
void pubSubclusterConnectionShouldSetClientName() {
void pubSubClusterConnectionShouldSetClientName() {

StatefulRedisClusterPubSubConnection<String, String> connection = clusterClient.connectPubSub();

Expand All @@ -203,6 +203,36 @@ void reloadPartitions() {
assertThat(clusterClient.getPartitions()).hasSize(4);
}

@Test
void suspendedTopologyRefreshCanBeResumed() {

RedisClusterClient client = RedisClusterClient.create(clusterClient.getResources(),
RedisURI.Builder.redis(host, ClusterTestSettings.port1).build());
try {

client.setOptions(ClusterClientOptions.builder().topologyRefreshOptions(ClusterTopologyRefreshOptions.builder()
.enablePeriodicRefresh(true).refreshPeriod(Duration.ofMillis(200)).build()).build());
client.connect().close();

Wait.untilTrue(client::isTopologyRefreshInProgress).during(Duration.ofSeconds(5)).waitOrTimeout();

client.suspendTopologyRefresh();

Wait.untilTrue(() -> !client.isTopologyRefreshInProgress()).during(Duration.ofSeconds(5)).waitOrTimeout();

client.getPartitions().clear();
client.getPartitions().updateCache();

client.connect().close();
Wait.untilTrue(client::isTopologyRefreshInProgress).during(Duration.ofSeconds(5)).waitOrTimeout();
Wait.untilTrue(() -> !client.isTopologyRefreshInProgress()).during(Duration.ofSeconds(5)).waitOrTimeout();

assertThat(client.getPartitions()).isNotEmpty();
} finally {
FastShutdown.shutdown(client);
}
}

@Test
void reloadPartitionsWithDynamicSourcesFallsBackToInitialSeedNodes() {

Expand Down

0 comments on commit fe782d2

Please sign in to comment.