Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Shutdown ClusterTopologyRefreshTask properly #2985

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,15 @@ public void suspendTopologyRefresh() {
}
}

/**
* Cancel any scheduled or running topology refresh tasks.
*/
public void cancelTopologyRefreshTask() {
if (clusterTopologyRefreshTask.get()) {
clusterTopologyRefreshTask.cancel();
}
}

public boolean isTopologyRefreshInProgress() {
return clusterTopologyRefreshTask.get();
}
Expand Down Expand Up @@ -317,12 +326,18 @@ private static class ClusterTopologyRefreshTask extends AtomicBoolean implements

private final Supplier<CompletionStage<?>> reloadTopologyAsync;

private final AtomicBoolean isCanceled = new AtomicBoolean(false);

ClusterTopologyRefreshTask(Supplier<CompletionStage<?>> reloadTopologyAsync) {
this.reloadTopologyAsync = reloadTopologyAsync;
}

public void run() {

if (isCanceled.get()) {
return;
}

if (compareAndSet(false, true)) {
doRun();
return;
Expand Down Expand Up @@ -352,6 +367,10 @@ void doRun() {
}
}

void cancel() {
isCanceled.set(true);
}

}

}
11 changes: 11 additions & 0 deletions src/main/java/io/lettuce/core/cluster/RedisClusterClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -920,6 +920,17 @@ public void suspendTopologyRefresh() {
topologyRefreshScheduler.suspendTopologyRefresh();
}

/**
* Cancel any running topology refresh tasks. This method calls
* {@link ClusterTopologyRefreshScheduler#cancelTopologyRefreshTask()} to ensure that any ongoing refresh tasks are properly
* stopped. It is typically called during the shutdown process to clean up resources and prevent any further topology
* refresh operations.
*
*/
public void cancelTopologyRefresh() {
topologyRefreshScheduler.cancelTopologyRefreshTask();
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is already a suspendTopologyRefresh method invoked on Client.shutdown which will disable periodic topology refresh.
To my understanding what we are missing is cancelation of already submitted TopologyRefreshTask (if any) and also a logic for preventing submission of new one after Client.shutdown is initiated. Brief look at the code shows that the other code path triggering TopologyRefreshTask is when certain cluster events happens.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi, thanks for review 🙇

I think we are missing to cancel the running task, so cancelTopologyRefresh method is to do that.

@tishun @mp911de Do you have any suggestions?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey @thachlp , I will try to spend some time for this issue on Friday

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey @thachlp ,

I think that, based on the comment in #2904 the request is to cancel any topology tasks automatically when we initiate shutdown:

When I close the RedisClusterClient by invoking RedisClusterClient.shutdown method, there is a chance that the ClusterTopologyRefreshTask is not stopped.
The issue was supposed to be resolved with #656 but it seems it was not completely working.

In #656 the idea of the fix was to drain all the existing cluster connections and cancel them upon shutdown. The user was never asked to call another method (and IMHO should not be asked)


/**
* Return whether a scheduled or adaptive topology refresh is in progress.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -712,4 +712,23 @@ public void clear() {

}

@Test
void shouldCancelTopologyRefreshTaskOnShutdown() {
ClusterTopologyRefreshOptions refreshOptions = ClusterTopologyRefreshOptions.builder()
.enablePeriodicRefresh(Duration.ofMillis(200)).build();
RedisClusterClient clusterClient = RedisClusterClient.create(TestClientResources.get(),
RedisURI.Builder.redis(TestSettings.host(), ClusterTestSettings.port1).build());

clusterClient.setOptions(ClusterClientOptions.builder().topologyRefreshOptions(refreshOptions).build());
clusterClient.connect().sync();
Delay.delay(Duration.ofMillis(1500));
Wait.untilTrue(clusterClient::isTopologyRefreshInProgress).during(Duration.ofSeconds(5)).waitOrTimeout();

clusterClient.shutdownAsync(0, 10, TimeUnit.SECONDS).join();
Wait.untilTrue(() -> !clusterClient.isTopologyRefreshInProgress()).during(Duration.ofSeconds(5)).waitOrTimeout();

assertThat(clusterClient.isTopologyRefreshInProgress()).isFalse();
FastShutdown.shutdown(clusterClient);
}

}
Loading