Skip to content

Commit

Permalink
Add test for connection handshake failures #1262
Browse files Browse the repository at this point in the history
  • Loading branch information
mp911de committed Apr 14, 2020
1 parent 9691645 commit 82ddb31
Showing 1 changed file with 75 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,15 @@

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.assertj.core.api.Assertions.fail;

import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.Comparator;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

import javax.inject.Inject;
Expand All @@ -42,10 +41,12 @@
import io.lettuce.core.event.Event;
import io.lettuce.core.event.connection.ReconnectFailedEvent;
import io.lettuce.core.resource.ClientResources;
import io.lettuce.core.resource.NettyCustomizer;
import io.lettuce.test.*;
import io.lettuce.test.resource.FastShutdown;
import io.lettuce.test.server.RandomResponseServer;
import io.lettuce.test.settings.TestSettings;
import io.netty.channel.Channel;
import io.netty.channel.local.LocalAddress;

/**
Expand Down Expand Up @@ -292,6 +293,76 @@ void emitEventOnReconnectFailure() throws Exception {
}
}

@Test
void pingOnConnectFailureShouldCloseConnection() throws Exception {

AtomicReference<Channel> ref = new AtomicReference<>();
ClientResources clientResources = ClientResources.builder().nettyCustomizer(new NettyCustomizer() {
@Override
public void afterChannelInitialized(Channel channel) {
ref.set(channel);
}
}).build();

// Cluster node with auth
RedisURI redisUri = RedisURI.create(TestSettings.host(), 7385);
RedisClient client = RedisClient.create(clientResources);

client.setOptions(ClientOptions.builder().pingBeforeActivateConnection(true).build());

try {
client.connect(redisUri);
fail("Missing Exception");
} catch (Exception e) {
assertThat(ref.get().isOpen()).isFalse();
assertThat(ref.get().isRegistered()).isFalse();
} finally {
FastShutdown.shutdown(client);
FastShutdown.shutdown(clientResources);
}
}

@Test
void pingOnConnectFailureShouldCloseConnectionOnReconnect() throws Exception {

BlockingQueue<Channel> ref = new LinkedBlockingQueue<>();
ClientResources clientResources = ClientResources.builder().nettyCustomizer(new NettyCustomizer() {
@Override
public void afterChannelInitialized(Channel channel) {
ref.add(channel);
}
}).build();

RedisURI redisUri = RedisURI.create(TestSettings.host(), TestSettings.port());
RedisClient client = RedisClient.create(clientResources, redisUri);
client.setOptions(ClientOptions.builder().pingBeforeActivateConnection(true).build());

StatefulRedisConnection<String, String> connection = client.connect();

ConnectionWatchdog connectionWatchdog = ConnectionTestUtil.getConnectionWatchdog(connection);
connectionWatchdog.setListenOnChannelInactive(false);
connection.async().quit();

// Cluster node with auth
redisUri.setPort(7385);

connectionWatchdog.setListenOnChannelInactive(true);
connectionWatchdog.scheduleReconnect();

Wait.untilTrue(() -> ref.size() > 1).waitOrTimeout();

redisUri.setPort(TestSettings.port());

Channel initial = ref.take();
assertThat(initial.isOpen()).isFalse();

Channel reconnect = ref.take();
assertThat(reconnect.isOpen()).isFalse();

FastShutdown.shutdown(client);
FastShutdown.shutdown(clientResources);
}

/**
* Expect to disable {@link ConnectionWatchdog} when closing a broken connection.
*/
Expand Down

0 comments on commit 82ddb31

Please sign in to comment.