diff --git a/src/main/java/io/lettuce/core/cluster/topology/Connections.java b/src/main/java/io/lettuce/core/cluster/topology/Connections.java index 35d4e4ac1b..4dddcce32c 100644 --- a/src/main/java/io/lettuce/core/cluster/topology/Connections.java +++ b/src/main/java/io/lettuce/core/cluster/topology/Connections.java @@ -15,10 +15,8 @@ */ package io.lettuce.core.cluster.topology; -import java.util.Collection; -import java.util.HashMap; -import java.util.Map; -import java.util.TreeMap; +import java.util.*; +import java.util.concurrent.CompletableFuture; import io.lettuce.core.RedisURI; import io.lettuce.core.api.StatefulRedisConnection; @@ -31,6 +29,7 @@ /** * @author Mark Paluch + * @author Christian Weitendorf */ class Connections { @@ -53,19 +52,19 @@ private Connections(Map> conne */ public void addConnection(RedisURI redisURI, StatefulRedisConnection connection) { - if (closed) { // fastpath - connection.close(); + if (this.closed) { // fastpath + connection.closeAsync(); return; } - synchronized (connections) { + synchronized (this.connections) { - if (closed) { - connection.close(); + if (this.closed) { + connection.closeAsync(); return; } - connections.put(redisURI, connection); + this.connections.put(redisURI, connection); } } @@ -73,8 +72,8 @@ public void addConnection(RedisURI redisURI, StatefulRedisConnection> entry : connections.entrySet()) { + for (Map.Entry> entry : this.connections.entrySet()) { CommandArgs args = new CommandArgs<>(StringCodec.UTF8).add(CommandKeyword.NODES); Command command = new Command<>(CommandType.CLUSTER, new StatusOutput<>(StringCodec.UTF8), @@ -110,7 +109,7 @@ public Requests requestClients() { Requests requests = new Requests(); - for (Map.Entry> entry : connections.entrySet()) { + for (Map.Entry> entry : this.connections.entrySet()) { CommandArgs args = new CommandArgs<>(StringCodec.UTF8).add(CommandKeyword.LIST); Command command = new Command<>(CommandType.CLIENT, new StatusOutput<>(StringCodec.UTF8), @@ -131,16 +130,19 @@ public void close() { this.closed = true; + List> closeFutures = new ArrayList<>(); while (hasConnections()) { - for (StatefulRedisConnection connection : drainConnections()) { - connection.closeAsync(); + closeFutures.add(connection.closeAsync()); } } + + CompletableFuture.allOf(closeFutures.toArray(new CompletableFuture[0])).join(); } private boolean hasConnections() { - synchronized (connections) { + + synchronized (this.connections) { return !this.connections.isEmpty(); } } @@ -148,6 +150,7 @@ private boolean hasConnections() { private Collection> drainConnections() { Map> drainedConnections; + synchronized (this.connections) { drainedConnections = new HashMap<>(this.connections); diff --git a/src/test/java/io/lettuce/core/cluster/topology/ClusterTopologyRefreshTest.java b/src/test/java/io/lettuce/core/cluster/topology/ClusterTopologyRefreshTest.java index 7b2207d79b..e082575aba 100644 --- a/src/test/java/io/lettuce/core/cluster/topology/ClusterTopologyRefreshTest.java +++ b/src/test/java/io/lettuce/core/cluster/topology/ClusterTopologyRefreshTest.java @@ -53,7 +53,9 @@ /** * @author Mark Paluch + * @author Christian Weitendorf */ +@SuppressWarnings("unchecked") @RunWith(MockitoJUnitRunner.class) public class ClusterTopologyRefreshTest { @@ -96,6 +98,8 @@ public void before() { when(clientResources.dnsResolver()).thenReturn(DnsResolvers.JVM_DEFAULT); when(connection1.async()).thenReturn(asyncCommands1); when(connection2.async()).thenReturn(asyncCommands2); + when(connection1.closeAsync()).thenReturn(CompletableFuture.completedFuture(null)); + when(connection2.closeAsync()).thenReturn(CompletableFuture.completedFuture(null)); when(connection1.dispatch(any(RedisCommand.class))).thenAnswer(invocation -> { @@ -277,7 +281,7 @@ public void shouldFailIfNoNodeConnects() { @Test public void shouldShouldDiscoverNodes() { - List seed = Arrays.asList(RedisURI.create("127.0.0.1", 7380)); + List seed = Collections.singletonList(RedisURI.create("127.0.0.1", 7380)); when(nodeConnectionFactory.connectToNodeAsync(any(RedisCodec.class), eq(new InetSocketAddress("127.0.0.1", 7380)))) .thenReturn(completedFuture((StatefulRedisConnection) connection1)); @@ -293,7 +297,7 @@ public void shouldShouldDiscoverNodes() { @Test public void shouldShouldNotDiscoverNodes() { - List seed = Arrays.asList(RedisURI.create("127.0.0.1", 7380)); + List seed = Collections.singletonList(RedisURI.create("127.0.0.1", 7380)); when(nodeConnectionFactory.connectToNodeAsync(any(RedisCodec.class), eq(new InetSocketAddress("127.0.0.1", 7380)))) .thenReturn(completedFuture((StatefulRedisConnection) connection1)); @@ -334,14 +338,14 @@ public void shouldCloseConnections() { sut.loadViews(seed, true); - verify(connection1).close(); - verify(connection2).close(); + verify(connection1).closeAsync(); + verify(connection2).closeAsync(); } @Test public void undiscoveredAdditionalNodesShouldBeLastUsingClientCount() { - List seed = Arrays.asList(RedisURI.create("127.0.0.1", 7380)); + List seed = Collections.singletonList(RedisURI.create("127.0.0.1", 7380)); when(nodeConnectionFactory.connectToNodeAsync(any(RedisCodec.class), eq(new InetSocketAddress("127.0.0.1", 7380)))) .thenReturn(completedFuture((StatefulRedisConnection) connection1)); @@ -359,7 +363,7 @@ public void undiscoveredAdditionalNodesShouldBeLastUsingClientCount() { @Test public void discoveredAdditionalNodesShouldBeOrderedUsingClientCount() { - List seed = Arrays.asList(RedisURI.create("127.0.0.1", 7380)); + List seed = Collections.singletonList(RedisURI.create("127.0.0.1", 7380)); when(nodeConnectionFactory.connectToNodeAsync(any(RedisCodec.class), eq(new InetSocketAddress("127.0.0.1", 7380)))) .thenReturn(completedFuture((StatefulRedisConnection) connection1)); @@ -379,7 +383,7 @@ public void discoveredAdditionalNodesShouldBeOrderedUsingClientCount() { @Test public void undiscoveredAdditionalNodesShouldBeLastUsingLatency() { - List seed = Arrays.asList(RedisURI.create("127.0.0.1", 7380)); + List seed = Collections.singletonList(RedisURI.create("127.0.0.1", 7380)); when(nodeConnectionFactory.connectToNodeAsync(any(RedisCodec.class), eq(new InetSocketAddress("127.0.0.1", 7380)))) .thenReturn(completedFuture((StatefulRedisConnection) connection1)); @@ -397,7 +401,7 @@ public void undiscoveredAdditionalNodesShouldBeLastUsingLatency() { @Test public void discoveredAdditionalNodesShouldBeOrderedUsingLatency() { - List seed = Arrays.asList(RedisURI.create("127.0.0.1", 7380)); + List seed = Collections.singletonList(RedisURI.create("127.0.0.1", 7380)); when(nodeConnectionFactory.connectToNodeAsync(any(RedisCodec.class), eq(new InetSocketAddress("127.0.0.1", 7380)))) .thenReturn(completedFuture((StatefulRedisConnection) connection1)); diff --git a/src/test/java/io/lettuce/core/cluster/topology/ConnectionsTest.java b/src/test/java/io/lettuce/core/cluster/topology/ConnectionsTest.java index 1e63e38c58..6be149a1f7 100644 --- a/src/test/java/io/lettuce/core/cluster/topology/ConnectionsTest.java +++ b/src/test/java/io/lettuce/core/cluster/topology/ConnectionsTest.java @@ -1,14 +1,39 @@ +/* + * Copyright 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package io.lettuce.core.cluster.topology; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyZeroInteractions; +import static org.mockito.Mockito.when; -import io.lettuce.core.RedisURI; -import io.lettuce.core.api.StatefulRedisConnection; +import java.util.concurrent.CompletableFuture; + +import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.Mock; import org.mockito.junit.MockitoJUnitRunner; +import io.lettuce.core.RedisURI; +import io.lettuce.core.api.StatefulRedisConnection; + +/** + * @author Christian Weitendorf + * @author Mark Paluch + */ @RunWith(MockitoJUnitRunner.class) public class ConnectionsTest { @@ -18,15 +43,34 @@ public class ConnectionsTest { @Mock private StatefulRedisConnection connection2; + @Before + public void before() throws Exception { + + when(connection1.closeAsync()).thenReturn(CompletableFuture.completedFuture(null)); + when(connection2.closeAsync()).thenReturn(CompletableFuture.completedFuture(null)); + } + @Test public void shouldCloseAllConnections() { - final Connections iut = new Connections(); - iut.addConnection(RedisURI.create("127.0.0.1", 7380), connection1); - iut.addConnection(RedisURI.create("127.0.0.1", 7381), connection2); - iut.close(); + Connections sut = new Connections(); + sut.addConnection(RedisURI.create("127.0.0.1", 7380), connection1); + sut.addConnection(RedisURI.create("127.0.0.1", 7381), connection2); + + sut.close(); + + verify(connection1).closeAsync(); + verify(connection2).closeAsync(); + } + + @Test + public void shouldCloseAllConnectionsAfterCloseSignal() { + + Connections sut = new Connections(); + sut.close(); + verifyZeroInteractions(connection1); - verify(connection1).close(); - verify(connection2).close(); + sut.addConnection(RedisURI.create("127.0.0.1", 7381), connection1); + verify(connection1).closeAsync(); } -} \ No newline at end of file +}