From 82ddb31b62e2dccb754afb52e8acc0cd842f29df Mon Sep 17 00:00:00 2001 From: Mark Paluch Date: Tue, 14 Apr 2020 22:41:10 +0200 Subject: [PATCH] Add test for connection handshake failures #1262 --- .../ConnectionFailureIntegrationTests.java | 79 ++++++++++++++++++- 1 file changed, 75 insertions(+), 4 deletions(-) diff --git a/src/test/java/io/lettuce/core/protocol/ConnectionFailureIntegrationTests.java b/src/test/java/io/lettuce/core/protocol/ConnectionFailureIntegrationTests.java index 7b86b204ff..5110a5da9e 100644 --- a/src/test/java/io/lettuce/core/protocol/ConnectionFailureIntegrationTests.java +++ b/src/test/java/io/lettuce/core/protocol/ConnectionFailureIntegrationTests.java @@ -17,16 +17,15 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.assertj.core.api.Assertions.fail; import java.net.InetSocketAddress; import java.time.Duration; import java.util.Comparator; import java.util.List; import java.util.Queue; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.CancellationException; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import javax.inject.Inject; @@ -42,10 +41,12 @@ import io.lettuce.core.event.Event; import io.lettuce.core.event.connection.ReconnectFailedEvent; import io.lettuce.core.resource.ClientResources; +import io.lettuce.core.resource.NettyCustomizer; import io.lettuce.test.*; import io.lettuce.test.resource.FastShutdown; import io.lettuce.test.server.RandomResponseServer; import io.lettuce.test.settings.TestSettings; +import io.netty.channel.Channel; import io.netty.channel.local.LocalAddress; /** @@ -292,6 +293,76 @@ void emitEventOnReconnectFailure() throws Exception { } } + @Test + void pingOnConnectFailureShouldCloseConnection() throws Exception { + + AtomicReference ref = new AtomicReference<>(); + ClientResources clientResources = ClientResources.builder().nettyCustomizer(new NettyCustomizer() { + @Override + public void afterChannelInitialized(Channel channel) { + ref.set(channel); + } + }).build(); + + // Cluster node with auth + RedisURI redisUri = RedisURI.create(TestSettings.host(), 7385); + RedisClient client = RedisClient.create(clientResources); + + client.setOptions(ClientOptions.builder().pingBeforeActivateConnection(true).build()); + + try { + client.connect(redisUri); + fail("Missing Exception"); + } catch (Exception e) { + assertThat(ref.get().isOpen()).isFalse(); + assertThat(ref.get().isRegistered()).isFalse(); + } finally { + FastShutdown.shutdown(client); + FastShutdown.shutdown(clientResources); + } + } + + @Test + void pingOnConnectFailureShouldCloseConnectionOnReconnect() throws Exception { + + BlockingQueue ref = new LinkedBlockingQueue<>(); + ClientResources clientResources = ClientResources.builder().nettyCustomizer(new NettyCustomizer() { + @Override + public void afterChannelInitialized(Channel channel) { + ref.add(channel); + } + }).build(); + + RedisURI redisUri = RedisURI.create(TestSettings.host(), TestSettings.port()); + RedisClient client = RedisClient.create(clientResources, redisUri); + client.setOptions(ClientOptions.builder().pingBeforeActivateConnection(true).build()); + + StatefulRedisConnection connection = client.connect(); + + ConnectionWatchdog connectionWatchdog = ConnectionTestUtil.getConnectionWatchdog(connection); + connectionWatchdog.setListenOnChannelInactive(false); + connection.async().quit(); + + // Cluster node with auth + redisUri.setPort(7385); + + connectionWatchdog.setListenOnChannelInactive(true); + connectionWatchdog.scheduleReconnect(); + + Wait.untilTrue(() -> ref.size() > 1).waitOrTimeout(); + + redisUri.setPort(TestSettings.port()); + + Channel initial = ref.take(); + assertThat(initial.isOpen()).isFalse(); + + Channel reconnect = ref.take(); + assertThat(reconnect.isOpen()).isFalse(); + + FastShutdown.shutdown(client); + FastShutdown.shutdown(clientResources); + } + /** * Expect to disable {@link ConnectionWatchdog} when closing a broken connection. */