Skip to content

Commit

Permalink
Guard AbstractRedisClient.shutdown(…) against races in closing resour…
Browse files Browse the repository at this point in the history
…ces #1800

shutdown now uses copies of mutable collections to process shutdown signals to avoid concurrent modification exceptions.
  • Loading branch information
mp911de committed Jul 13, 2021
1 parent bb101b6 commit 2185357
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 4 deletions.
12 changes: 8 additions & 4 deletions src/main/java/io/lettuce/core/AbstractRedisClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -546,9 +546,9 @@ public CompletableFuture<Void> shutdownAsync(long quietPeriod, long timeout, Tim
private CompletableFuture<Void> closeResources() {

List<CompletionStage<Void>> closeFutures = new ArrayList<>();
List<Closeable> closeableResources = new ArrayList<>(this.closeableResources);

while (!closeableResources.isEmpty()) {
Closeable closeableResource = closeableResources.iterator().next();
for (Closeable closeableResource : closeableResources) {

if (closeableResource instanceof AsyncCloseable) {

Expand All @@ -560,10 +560,14 @@ private CompletableFuture<Void> closeResources() {
logger.debug("Exception on Close: " + e.getMessage(), e);
}
}
closeableResources.remove(closeableResource);
this.closeableResources.remove(closeableResource);
}

for (Channel c : channels) {
for (Channel c : channels.toArray(new Channel[0])) {

if (c == null) {
continue;
}

ChannelPipeline pipeline = c.pipeline();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,22 @@ void clusterAuth() {
FastShutdown.shutdown(clusterClient);
}

@Test
void shutdownWithOpenConnectionShouldCloseCorrectly() {

ClusterTopologyRefreshOptions refreshOptions = ClusterTopologyRefreshOptions.builder()
.enableAllAdaptiveRefreshTriggers().enablePeriodicRefresh(Duration.ofSeconds(1)).build();

RedisClusterClient clusterClient = RedisClusterClient.create(TestClientResources.get(),
RedisURI.Builder.redis(TestSettings.host(), ClusterTestSettings.port1).build());

clusterClient.setOptions(ClusterClientOptions.builder().topologyRefreshOptions(refreshOptions).build());

clusterClient.connect().sync();

FastShutdown.shutdown(clusterClient);
}

@Test
void partitionRetrievalShouldFail() {

Expand Down

0 comments on commit 2185357

Please sign in to comment.