From c5411d4a7aed4f2d48d23153a43c0f8ef4f1efb3 Mon Sep 17 00:00:00 2001 From: Mark Paluch Date: Wed, 8 Jul 2015 12:44:27 +0200 Subject: [PATCH] Merge Decouple ConnectionWatchdog reconnect from timer thread #100 Motivation: Netty's HashedWheelTimer runs single threaded and processes, therefore, one task at a time. lettuce's ConnectionWatchdog uses a blocking call (without a timeout) to the initializing future that may block the timer thread indefinitely. The blocking call stops the reconnection for all connections because the processing thread is blocked. --- .../redis/PlainChannelInitializer.java | 5 -- .../redis/RedisChannelHandler.java | 8 +-- .../redis/protocol/ChannelLogDescriptor.java | 32 +++++++++++ .../redis/protocol/CommandHandler.java | 30 ++++++---- .../redis/protocol/ConnectionWatchdog.java | 56 +++++++++++++++---- .../MyExtendedRedisClientTest.java | 11 ++-- src/test/java/com/lambdaworks/SslTest.java | 2 +- .../redis/AbstractRedisClientTest.java | 2 +- .../com/lambdaworks/redis/AllTheAPIsTest.java | 2 +- .../com/lambdaworks/redis/ClientTest.java | 8 +-- .../redis/ConnectionCommandTest.java | 2 +- .../lambdaworks/redis/DefaultRedisClient.java | 2 +- .../com/lambdaworks/redis/FastShutdown.java | 19 +++++++ .../lambdaworks/redis/PoolConnectionTest.java | 2 +- .../redis/UnixDomainSocketTest.java | 8 +-- .../redis/cluster/AbstractClusterTest.java | 3 +- .../redis/cluster/ClusterCommandTest.java | 6 +- .../cluster/ClusterReactiveCommandTest.java | 5 +- .../redis/cluster/ClusterRule.java | 3 +- .../redis/cluster/RedisClusterClientTest.java | 8 +-- .../redis/cluster/RedisClusterSetupTest.java | 33 +++++------ .../RedisClusterStressScenariosTest.java | 2 +- .../commands/GeoClusterCommandTest.java | 3 +- .../commands/HashClusterCommandTest.java | 3 +- .../commands/ListClusterCommandTest.java | 3 +- .../commands/StringClusterCommandTest.java | 3 +- .../commands/rx/HashClusterRxCommandTest.java | 3 +- .../commands/rx/ListClusterRxCommandTest.java | 3 +- .../rx/StringClusterRxCommandTest.java | 3 +- .../redis/issue42/BreakClusterClientTest.java | 3 +- .../redis/pubsub/PubSubCommandTest.java | 19 ++++--- .../redis/pubsub/PubSubRxTest.java | 3 +- .../redis/sentinel/AbstractSentinelTest.java | 10 ++-- .../redis/sentinel/SentinelCommandTest.java | 16 ++---- .../redis/sentinel/SentinelFailoverTest.java | 3 +- .../redis/server/RandomResponseServer.java | 6 +- src/test/resources/log4j.properties | 2 +- 37 files changed, 215 insertions(+), 117 deletions(-) create mode 100644 src/main/java/com/lambdaworks/redis/protocol/ChannelLogDescriptor.java create mode 100644 src/test/java/com/lambdaworks/redis/FastShutdown.java diff --git a/src/main/java/com/lambdaworks/redis/PlainChannelInitializer.java b/src/main/java/com/lambdaworks/redis/PlainChannelInitializer.java index 143c137dc7..382ea082ab 100644 --- a/src/main/java/com/lambdaworks/redis/PlainChannelInitializer.java +++ b/src/main/java/com/lambdaworks/redis/PlainChannelInitializer.java @@ -50,11 +50,6 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception { super.channelInactive(ctx); } - @Override - public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { - super.channelRead(ctx, msg); - } - @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof ConnectionEvents.Close) { diff --git a/src/main/java/com/lambdaworks/redis/RedisChannelHandler.java b/src/main/java/com/lambdaworks/redis/RedisChannelHandler.java index c5f34d266f..872d705fd5 100644 --- a/src/main/java/com/lambdaworks/redis/RedisChannelHandler.java +++ b/src/main/java/com/lambdaworks/redis/RedisChannelHandler.java @@ -197,14 +197,14 @@ public void setOptions(ClientOptions clientOptions) { this.clientOptions = clientOptions; } - public TimeUnit getTimeoutUnit() { - return unit; - } - public long getTimeout() { return timeout; } + public TimeUnit getTimeoutUnit() { + return unit; + } + protected T syncHandler(Object asyncApi, Class... interfaces) { FutureSyncInvocationHandler h = new FutureSyncInvocationHandler<>((StatefulConnection) this, asyncApi); return (T) Proxy.newProxyInstance(AbstractRedisClient.class.getClassLoader(), interfaces, h); diff --git a/src/main/java/com/lambdaworks/redis/protocol/ChannelLogDescriptor.java b/src/main/java/com/lambdaworks/redis/protocol/ChannelLogDescriptor.java new file mode 100644 index 0000000000..72d78792cd --- /dev/null +++ b/src/main/java/com/lambdaworks/redis/protocol/ChannelLogDescriptor.java @@ -0,0 +1,32 @@ +package com.lambdaworks.redis.protocol; + +import io.netty.channel.Channel; + +/** + * @author Mark Paluch + * @since 08.07.15 09:43 + */ +class ChannelLogDescriptor { + + static String logDescriptor(Channel channel) { + + if (channel == null) { + return "unknown"; + } + + StringBuffer buffer = new StringBuffer(64); + + if (channel.localAddress() != null) { + buffer.append(channel.localAddress()).append(" -> "); + } + if (channel.remoteAddress() != null) { + buffer.append(channel.remoteAddress()); + } + + if (!channel.isActive()) { + buffer.append(" (inactive)"); + } + + return buffer.toString(); + } +} diff --git a/src/main/java/com/lambdaworks/redis/protocol/CommandHandler.java b/src/main/java/com/lambdaworks/redis/protocol/CommandHandler.java index 3383974705..2ea11a4ccf 100644 --- a/src/main/java/com/lambdaworks/redis/protocol/CommandHandler.java +++ b/src/main/java/com/lambdaworks/redis/protocol/CommandHandler.java @@ -2,6 +2,7 @@ package com.lambdaworks.redis.protocol; +import java.nio.channels.ClosedChannelException; import java.nio.charset.Charset; import java.util.ArrayDeque; import java.util.ArrayList; @@ -14,6 +15,8 @@ import io.netty.buffer.ByteBuf; import io.netty.channel.*; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.GenericFutureListener; import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; @@ -28,6 +31,7 @@ public class CommandHandler extends ChannelDuplexHandler implements RedisChannelWriter { private static final InternalLogger logger = InternalLoggerFactory.getInstance(CommandHandler.class); + private static final WriteLogListener WRITE_LOG_LISTENER = new WriteLogListener(); protected ClientOptions clientOptions; protected Queue> queue; @@ -176,7 +180,7 @@ public > C write(C command) { if (reliability == Reliability.AT_LEAST_ONCE) { // commands are ok to stay within the queue, reconnect will retrigger them - channel.write(command, channel.voidPromise()); + channel.write(command).addListener(WRITE_LOG_LISTENER); channel.flush(); } } else { @@ -232,7 +236,6 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) } } catch (Exception e) { cmd.completeExceptionally(e); - cmd.cancel(); promise.setFailure(e); throw e; } @@ -483,14 +486,8 @@ private String logPrefix() { if (logPrefix != null) { return logPrefix; } - StringBuffer buffer = new StringBuffer(16); - buffer.append('['); - if (channel != null) { - buffer.append(channel.remoteAddress()); - } else { - buffer.append("not connected"); - } - buffer.append(']'); + StringBuffer buffer = new StringBuffer(64); + buffer.append('[').append(ChannelLogDescriptor.logDescriptor(channel)).append(']'); return logPrefix = buffer.toString(); } @@ -524,4 +521,17 @@ public void operationComplete(ChannelFuture future) throws Exception { } } + /** + * A generic future listener which logs unsuccessful writes. + * + */ + static class WriteLogListener implements GenericFutureListener> { + + @Override + public void operationComplete(Future future) throws Exception { + if (!future.isSuccess() && !(future.cause() instanceof ClosedChannelException)) + logger.warn(future.cause().getMessage(), future.cause()); + } + + } } diff --git a/src/main/java/com/lambdaworks/redis/protocol/ConnectionWatchdog.java b/src/main/java/com/lambdaworks/redis/protocol/ConnectionWatchdog.java index d00d1a85af..924774d0da 100644 --- a/src/main/java/com/lambdaworks/redis/protocol/ConnectionWatchdog.java +++ b/src/main/java/com/lambdaworks/redis/protocol/ConnectionWatchdog.java @@ -3,11 +3,13 @@ package com.lambdaworks.redis.protocol; import java.net.SocketAddress; +import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; import com.google.common.base.Supplier; import com.lambdaworks.redis.ClientOptions; import com.lambdaworks.redis.ConnectionEvents; +import com.lambdaworks.redis.RedisChannelHandler; import com.lambdaworks.redis.RedisChannelInitializer; import io.netty.bootstrap.Bootstrap; @@ -50,6 +52,7 @@ public class ConnectionWatchdog extends ChannelInboundHandlerAdapter implements private SocketAddress remoteAddress; private int attempts; private long lastReconnectionLogging = -1; + private String logPrefix; /** * Create a new watchdog that adds to new connections to the supplied {@link ChannelGroup} and establishes a new @@ -82,7 +85,7 @@ public ConnectionWatchdog(ClientOptions clientOptions, Bootstrap bootstrap, Time @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { - logger.debug("userEventTriggered(" + ctx + ", " + evt + ")"); + logger.debug("{} userEventTriggered({}, {})", logPrefix(), ctx, evt); if (evt instanceof ConnectionEvents.PrepareClose) { ConnectionEvents.PrepareClose prepareClose = (ConnectionEvents.PrepareClose) evt; setListenOnChannelInactive(false); @@ -94,7 +97,7 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exc @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { - logger.debug("channelActive(" + ctx + ")"); + logger.debug("{} channelActive({})", logPrefix(), ctx); channel = ctx.channel(); attempts = 0; remoteAddress = channel.remoteAddress(); @@ -104,10 +107,13 @@ public void channelActive(ChannelHandlerContext ctx) throws Exception { @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { - logger.debug("channelInactive(" + ctx + ")"); + logger.debug("{} channelInactive({})", logPrefix(), ctx); channel = null; if (listenOnChannelInactive && !reconnectSuspended) { scheduleReconnect(); + } else { + logger.debug("{} Reconnect scheduling disabled", logPrefix(), ctx); + logger.debug(""); } super.channelInactive(ctx); } @@ -116,9 +122,10 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception { * Schedule reconnect if channel is not available/not active. */ public void scheduleReconnect() { - logger.debug("scheduleReconnect()"); + logger.debug("{} scheduleReconnect()", logPrefix()); if (!isEventLoopGroupActive()) { + logger.debug("isEventLoopGroupActive() == false"); return; } @@ -127,9 +134,26 @@ public void scheduleReconnect() { attempts++; } int timeout = 2 << attempts; - timer.newTimeout(this, timeout, TimeUnit.MILLISECONDS); + timer.newTimeout(new TimerTask() { + @Override + public void run(Timeout timeout) throws Exception { + + if (!isEventLoopGroupActive()) { + logger.debug("isEventLoopGroupActive() == false"); + return; + } + + bootstrap.group().submit(new Callable() { + @Override + public Object call() throws Exception { + ConnectionWatchdog.this.run(null); + return null; + } + }); + } + }, timeout, TimeUnit.MILLISECONDS); } else { - logger.debug("Skipping scheduleReconnect() because I have an active channel"); + logger.debug("{} Skipping scheduleReconnect() because I have an active channel", logPrefix()); } } @@ -145,6 +169,7 @@ public void scheduleReconnect() { public void run(Timeout timeout) throws Exception { if (!isEventLoopGroupActive()) { + logger.debug("isEventLoopGroupActive() == false"); return; } @@ -163,7 +188,7 @@ public void run(Timeout timeout) throws Exception { try { reconnect(infoLevel, warnLevel); } catch (Exception e) { - logger.log(warnLevel, "Cannot connect: " + e.toString()); + logger.log(warnLevel, "Cannot connect: {}", e.toString()); scheduleReconnect(); } } @@ -184,10 +209,11 @@ private void reconnect(InternalLogLevel infoLevel, InternalLogLevel warnLevel) t connect.sync().await(); RedisChannelInitializer channelInitializer = connect.channel().pipeline().get(RedisChannelInitializer.class); - CommandHandler commandHandler = connect.channel().pipeline().get(CommandHandler.class); - try { + CommandHandler commandHandler = connect.channel().pipeline().get(CommandHandler.class); + RedisChannelHandler channelHandler = connect.channel().pipeline().get(RedisChannelHandler.class); - channelInitializer.channelInitialized().get(); + try { + channelInitializer.channelInitialized().get(channelHandler.getTimeout(), channelHandler.getTimeoutUnit()); logger.log(infoLevel, "Reconnected to " + remoteAddress); } catch (Exception e) { @@ -248,4 +274,14 @@ public boolean isReconnectSuspended() { public void setReconnectSuspended(boolean reconnectSuspended) { this.reconnectSuspended = reconnectSuspended; } + + private String logPrefix() { + if (logPrefix != null) { + return logPrefix; + } + StringBuffer buffer = new StringBuffer(64); + buffer.append('[').append(ChannelLogDescriptor.logDescriptor(channel)).append(']'); + return logPrefix = buffer.toString(); + } + } diff --git a/src/test/java/biz/paluch/redis/extensibility/MyExtendedRedisClientTest.java b/src/test/java/biz/paluch/redis/extensibility/MyExtendedRedisClientTest.java index 5e2fa791fe..a6f2259412 100644 --- a/src/test/java/biz/paluch/redis/extensibility/MyExtendedRedisClientTest.java +++ b/src/test/java/biz/paluch/redis/extensibility/MyExtendedRedisClientTest.java @@ -1,18 +1,17 @@ package biz.paluch.redis.extensibility; -import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.*; -import java.util.concurrent.TimeUnit; - -import com.lambdaworks.redis.pubsub.RedisPubSubAsyncCommandsImpl; -import com.lambdaworks.redis.pubsub.api.async.RedisPubSubAsyncCommands; import org.apache.log4j.Logger; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; +import com.lambdaworks.redis.FastShutdown; import com.lambdaworks.redis.RedisConnection; import com.lambdaworks.redis.TestSettings; +import com.lambdaworks.redis.pubsub.RedisPubSubAsyncCommandsImpl; +import com.lambdaworks.redis.pubsub.api.async.RedisPubSubAsyncCommands; /** * Test for override/extensability of RedisClient @@ -38,7 +37,7 @@ protected static MyExtendedRedisClient getRedisClient() { @AfterClass public static void shutdownClient() { - client.shutdown(0, 0, TimeUnit.MILLISECONDS); + FastShutdown.shutdown(client); } @Test diff --git a/src/test/java/com/lambdaworks/SslTest.java b/src/test/java/com/lambdaworks/SslTest.java index 9747926e3d..ed90fc54b0 100644 --- a/src/test/java/com/lambdaworks/SslTest.java +++ b/src/test/java/com/lambdaworks/SslTest.java @@ -35,7 +35,7 @@ public void before() throws Exception { @AfterClass public static void afterClass() { - redisClient.shutdown(); + FastShutdown.shutdown(redisClient); } @Test diff --git a/src/test/java/com/lambdaworks/redis/AbstractRedisClientTest.java b/src/test/java/com/lambdaworks/redis/AbstractRedisClientTest.java index 644996ce97..54fcba4382 100644 --- a/src/test/java/com/lambdaworks/redis/AbstractRedisClientTest.java +++ b/src/test/java/com/lambdaworks/redis/AbstractRedisClientTest.java @@ -56,7 +56,7 @@ public WithPasswordRequired() throws Exception { try { run(client); } finally { - client.shutdown(0, 0, TimeUnit.MILLISECONDS); + FastShutdown.shutdown(client); } } finally { diff --git a/src/test/java/com/lambdaworks/redis/AllTheAPIsTest.java b/src/test/java/com/lambdaworks/redis/AllTheAPIsTest.java index 6f503c6917..cd6d1448ee 100644 --- a/src/test/java/com/lambdaworks/redis/AllTheAPIsTest.java +++ b/src/test/java/com/lambdaworks/redis/AllTheAPIsTest.java @@ -25,7 +25,7 @@ public static void beforeClass() throws Exception { @BeforeClass public static void afterClass() throws Exception { if (clusterClient != null) { - clusterClient.shutdown(); + FastShutdown.shutdown(clusterClient); } } diff --git a/src/test/java/com/lambdaworks/redis/ClientTest.java b/src/test/java/com/lambdaworks/redis/ClientTest.java index 4d761a4078..cde67df8a7 100644 --- a/src/test/java/com/lambdaworks/redis/ClientTest.java +++ b/src/test/java/com/lambdaworks/redis/ClientTest.java @@ -232,7 +232,7 @@ public void cancelCommandsOnReconnectFailure() throws Exception { assertThat(connection.isOpen()).isFalse(); connectionWatchdog.setReconnectSuspended(false); - connectionWatchdog.scheduleReconnect(); + connectionWatchdog.run(null); Thread.sleep(500); assertThat(connection.isOpen()).isFalse(); @@ -309,7 +309,7 @@ public boolean isSatisfied() { assertThat(listener.onConnected).isEqualTo(statefulRedisConnection); assertThat(listener.onDisconnected).isEqualTo(statefulRedisConnection); - client.shutdown(); + FastShutdown.shutdown(client); } @Test @@ -338,7 +338,7 @@ public boolean isSatisfied() { assertThat(removedListener.onDisconnected).isNull(); assertThat(removedListener.onException).isNull(); - client.shutdown(); + FastShutdown.shutdown(client); } @@ -436,7 +436,7 @@ public void emptyClient() throws Exception { } catch (IllegalArgumentException e) { assertThat(e).hasMessageContaining("RedisURI"); } - client.shutdown(); + FastShutdown.shutdown(client); } @Test diff --git a/src/test/java/com/lambdaworks/redis/ConnectionCommandTest.java b/src/test/java/com/lambdaworks/redis/ConnectionCommandTest.java index 97cb95420a..1abf6b4ca7 100644 --- a/src/test/java/com/lambdaworks/redis/ConnectionCommandTest.java +++ b/src/test/java/com/lambdaworks/redis/ConnectionCommandTest.java @@ -42,7 +42,7 @@ public void run(RedisClient client) { RedisConnection authConnection = redisClient.connect().sync(); authConnection.ping(); authConnection.close(); - redisClient.shutdown(); + FastShutdown.shutdown(redisClient); } }; } diff --git a/src/test/java/com/lambdaworks/redis/DefaultRedisClient.java b/src/test/java/com/lambdaworks/redis/DefaultRedisClient.java index 686fd18257..b58587f7b9 100644 --- a/src/test/java/com/lambdaworks/redis/DefaultRedisClient.java +++ b/src/test/java/com/lambdaworks/redis/DefaultRedisClient.java @@ -17,7 +17,7 @@ public DefaultRedisClient() { Runtime.getRuntime().addShutdownHook(new Thread() { @Override public void run() { - redisClient.shutdown(0, 0, TimeUnit.MILLISECONDS); + FastShutdown.shutdown(redisClient); } }); } diff --git a/src/test/java/com/lambdaworks/redis/FastShutdown.java b/src/test/java/com/lambdaworks/redis/FastShutdown.java new file mode 100644 index 0000000000..38f04df3c8 --- /dev/null +++ b/src/test/java/com/lambdaworks/redis/FastShutdown.java @@ -0,0 +1,19 @@ +package com.lambdaworks.redis; + +import java.util.concurrent.TimeUnit; + +/** + * @author Mark Paluch + * @since 08.07.15 11:02 + */ +public class FastShutdown { + + /** + * Shut down the redis client with a timeout of 10ms. + * + * @param redisClient + */ + public static void shutdown(AbstractRedisClient redisClient) { + redisClient.shutdown(10, 10, TimeUnit.MILLISECONDS); + } +} diff --git a/src/test/java/com/lambdaworks/redis/PoolConnectionTest.java b/src/test/java/com/lambdaworks/redis/PoolConnectionTest.java index fa561f1d31..a26560ae75 100644 --- a/src/test/java/com/lambdaworks/redis/PoolConnectionTest.java +++ b/src/test/java/com/lambdaworks/redis/PoolConnectionTest.java @@ -176,7 +176,7 @@ public void testResourceCleaning() throws Exception { redisClient.pool().close(); assertThat(redisClient.getResourceCount()).isEqualTo(6); - redisClient.shutdown(); + FastShutdown.shutdown(redisClient); assertThat(redisClient.getChannelCount()).isEqualTo(0); assertThat(redisClient.getResourceCount()).isEqualTo(0); diff --git a/src/test/java/com/lambdaworks/redis/UnixDomainSocketTest.java b/src/test/java/com/lambdaworks/redis/UnixDomainSocketTest.java index b50663979d..afa604ffc6 100644 --- a/src/test/java/com/lambdaworks/redis/UnixDomainSocketTest.java +++ b/src/test/java/com/lambdaworks/redis/UnixDomainSocketTest.java @@ -40,7 +40,7 @@ public static void setupClient() { @AfterClass public static void shutdownClient() { - sentinelClient.shutdown(0, 0, TimeUnit.MILLISECONDS); + FastShutdown.shutdown(sentinelClient); } @Test @@ -57,7 +57,7 @@ public void standalone_Linux_x86_64_socket() throws Exception { someRedisAction(connection.sync()); connection.close(); - redisClient.shutdown(); + FastShutdown.shutdown(redisClient); } private void linuxOnly() { @@ -97,7 +97,7 @@ public void sentinel_Linux_x86_64_socket() throws Exception { assertThat(sentinelConnection.ping().get()).isEqualTo("PONG"); sentinelConnection.close(); - redisClient.shutdown(); + FastShutdown.shutdown(redisClient); } @Test @@ -123,7 +123,7 @@ public void sentinel_Linux_x86_64_socket_and_inet() throws Exception { } catch (RedisConnectionException e) { assertThat(e).hasMessageContaining("You cannot mix unix domain socket and IP socket URI's"); } finally { - redisClient.shutdown(); + FastShutdown.shutdown(redisClient); } } diff --git a/src/test/java/com/lambdaworks/redis/cluster/AbstractClusterTest.java b/src/test/java/com/lambdaworks/redis/cluster/AbstractClusterTest.java index 9891bdb4b6..39c735d3bf 100644 --- a/src/test/java/com/lambdaworks/redis/cluster/AbstractClusterTest.java +++ b/src/test/java/com/lambdaworks/redis/cluster/AbstractClusterTest.java @@ -2,6 +2,7 @@ import java.util.concurrent.TimeUnit; +import com.lambdaworks.redis.FastShutdown; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Rule; @@ -43,7 +44,7 @@ public static void setupClusterClient() throws Exception { @AfterClass public static void shutdownClusterClient() { - clusterClient.shutdown(0, 0, TimeUnit.MILLISECONDS); + FastShutdown.shutdown(clusterClient); } public static int[] createSlots(int from, int to) { diff --git a/src/test/java/com/lambdaworks/redis/cluster/ClusterCommandTest.java b/src/test/java/com/lambdaworks/redis/cluster/ClusterCommandTest.java index 803248c610..8715af9faa 100644 --- a/src/test/java/com/lambdaworks/redis/cluster/ClusterCommandTest.java +++ b/src/test/java/com/lambdaworks/redis/cluster/ClusterCommandTest.java @@ -46,7 +46,6 @@ public class ClusterCommandTest extends AbstractClusterTest { @BeforeClass public static void setupClient() throws Exception { - setupClusterClient(); client = new RedisClient(host, port1); clusterClient = new RedisClusterClient(ImmutableList.of(RedisURI.Builder.redis(host, port1).build())); @@ -54,9 +53,8 @@ public static void setupClient() throws Exception { @AfterClass public static void shutdownClient() { - shutdownClusterClient(); - client.shutdown(0, 0, TimeUnit.MILLISECONDS); - clusterClient.shutdown(); + FastShutdown.shutdown(client); + FastShutdown.shutdown(clusterClient); } @Before diff --git a/src/test/java/com/lambdaworks/redis/cluster/ClusterReactiveCommandTest.java b/src/test/java/com/lambdaworks/redis/cluster/ClusterReactiveCommandTest.java index 2889bc8531..3893305880 100644 --- a/src/test/java/com/lambdaworks/redis/cluster/ClusterReactiveCommandTest.java +++ b/src/test/java/com/lambdaworks/redis/cluster/ClusterReactiveCommandTest.java @@ -6,6 +6,7 @@ import java.util.List; import java.util.concurrent.TimeUnit; +import com.lambdaworks.redis.FastShutdown; import org.junit.After; import org.junit.AfterClass; import org.junit.Before; @@ -44,8 +45,8 @@ public static void setupClient() throws Exception { @AfterClass public static void shutdownClient() { shutdownClusterClient(); - client.shutdown(0, 0, TimeUnit.MILLISECONDS); - clusterClient.shutdown(); + FastShutdown.shutdown(client); + FastShutdown.shutdown(clusterClient); } @Before diff --git a/src/test/java/com/lambdaworks/redis/cluster/ClusterRule.java b/src/test/java/com/lambdaworks/redis/cluster/ClusterRule.java index c66fcf0347..2c9792fc8a 100644 --- a/src/test/java/com/lambdaworks/redis/cluster/ClusterRule.java +++ b/src/test/java/com/lambdaworks/redis/cluster/ClusterRule.java @@ -82,7 +82,8 @@ public boolean isStable() { for (RedisClusterNode redisClusterNode : parse) { if (redisClusterNode.getFlags().contains(RedisClusterNode.NodeFlag.FAIL) - || redisClusterNode.getFlags().contains(RedisClusterNode.NodeFlag.EVENTUAL_FAIL)) { + || redisClusterNode.getFlags().contains(RedisClusterNode.NodeFlag.EVENTUAL_FAIL) + || redisClusterNode.getFlags().contains(RedisClusterNode.NodeFlag.HANDSHAKE)) { return false; } } diff --git a/src/test/java/com/lambdaworks/redis/cluster/RedisClusterClientTest.java b/src/test/java/com/lambdaworks/redis/cluster/RedisClusterClientTest.java index 6d3b85f36d..d20ee194c9 100644 --- a/src/test/java/com/lambdaworks/redis/cluster/RedisClusterClientTest.java +++ b/src/test/java/com/lambdaworks/redis/cluster/RedisClusterClientTest.java @@ -57,8 +57,8 @@ public static void setupClient() throws Exception { @AfterClass public static void shutdownClient() { shutdownClusterClient(); - client.shutdown(0, 0, TimeUnit.MILLISECONDS); - clusterClient.shutdown(); + FastShutdown.shutdown(client); + FastShutdown.shutdown(clusterClient); } @Before @@ -263,7 +263,7 @@ public void clusterAuth() throws Exception { char[] password = (char[]) ReflectionTestUtils.getField(connection.getStatefulConnection(), "password"); assertThat(new String(password)).isEqualTo("foobared"); } finally { - clusterClient.shutdown(); + FastShutdown.shutdown(clusterClient); } } @@ -278,7 +278,7 @@ public void clusterNeedsAuthButNotSupplied() throws Exception { List time = connection.time(); assertThat(time).hasSize(2); } finally { - clusterClient.shutdown(); + FastShutdown.shutdown(clusterClient); } } diff --git a/src/test/java/com/lambdaworks/redis/cluster/RedisClusterSetupTest.java b/src/test/java/com/lambdaworks/redis/cluster/RedisClusterSetupTest.java index 424ae7d0f6..c7433eb303 100644 --- a/src/test/java/com/lambdaworks/redis/cluster/RedisClusterSetupTest.java +++ b/src/test/java/com/lambdaworks/redis/cluster/RedisClusterSetupTest.java @@ -1,25 +1,15 @@ package com.lambdaworks.redis.cluster; -import static com.lambdaworks.redis.cluster.ClusterTestUtil.getNodeId; -import static com.lambdaworks.redis.cluster.ClusterTestUtil.getOwnPartition; -import static org.assertj.core.api.Assertions.assertThat; +import static com.lambdaworks.redis.cluster.ClusterTestUtil.*; +import static org.assertj.core.api.Assertions.*; -import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import org.junit.After; -import org.junit.AfterClass; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Rule; -import org.junit.Test; +import org.junit.*; import com.lambdaworks.Wait; import com.lambdaworks.category.SlowTests; -import com.lambdaworks.redis.DefaultRedisClient; -import com.lambdaworks.redis.RedisClient; -import com.lambdaworks.redis.RedisURI; -import com.lambdaworks.redis.TestSettings; +import com.lambdaworks.redis.*; import com.lambdaworks.redis.cluster.api.sync.RedisClusterCommands; import com.lambdaworks.redis.cluster.models.partitions.ClusterPartitionParser; import com.lambdaworks.redis.cluster.models.partitions.Partitions; @@ -51,7 +41,7 @@ public static void setupClient() { @AfterClass public static void shutdownClient() { - clusterClient.shutdown(0, 0, TimeUnit.MILLISECONDS); + FastShutdown.shutdown(clusterClient); } @Before @@ -91,9 +81,20 @@ public void clusterForget() throws Exception { String result = redis1.clusterMeet(host, AbstractClusterTest.port6); assertThat(result).isEqualTo("OK"); - Wait.untilEquals(2, () -> ClusterPartitionParser.parse(redis1.clusterNodes()).size()).waitOrTimeout(); Wait.untilTrue(() -> redis1.clusterNodes().contains(redis2.clusterMyId())).waitOrTimeout(); Wait.untilTrue(() -> redis2.clusterNodes().contains(redis1.clusterMyId())).waitOrTimeout(); + Wait.untilTrue(() -> { + Partitions partitions = ClusterPartitionParser.parse(redis1.clusterNodes()); + if (partitions.size() != 2) { + return false; + } + for (RedisClusterNode redisClusterNode : partitions) { + if (redisClusterNode.is(RedisClusterNode.NodeFlag.HANDSHAKE)) { + return false; + } + } + return true; + }).waitOrTimeout(); redis1.clusterForget(redis2.clusterMyId()); diff --git a/src/test/java/com/lambdaworks/redis/cluster/RedisClusterStressScenariosTest.java b/src/test/java/com/lambdaworks/redis/cluster/RedisClusterStressScenariosTest.java index 1ab8ed6c24..570694aa3f 100644 --- a/src/test/java/com/lambdaworks/redis/cluster/RedisClusterStressScenariosTest.java +++ b/src/test/java/com/lambdaworks/redis/cluster/RedisClusterStressScenariosTest.java @@ -58,7 +58,7 @@ public static void setupClient() throws Exception { @AfterClass public static void shutdownClient() { - client.shutdown(0, 0, TimeUnit.MILLISECONDS); + FastShutdown.shutdown(client); } @Before diff --git a/src/test/java/com/lambdaworks/redis/cluster/commands/GeoClusterCommandTest.java b/src/test/java/com/lambdaworks/redis/cluster/commands/GeoClusterCommandTest.java index e3fde7cb98..ada8577cf3 100644 --- a/src/test/java/com/lambdaworks/redis/cluster/commands/GeoClusterCommandTest.java +++ b/src/test/java/com/lambdaworks/redis/cluster/commands/GeoClusterCommandTest.java @@ -4,6 +4,7 @@ import java.util.concurrent.TimeUnit; +import com.lambdaworks.redis.FastShutdown; import com.lambdaworks.redis.cluster.api.StatefulRedisClusterConnection; import org.junit.AfterClass; import org.junit.Before; @@ -30,7 +31,7 @@ public static void setupClient() { @AfterClass public static void closeClient() { - redisClusterClient.shutdown(0, 0, TimeUnit.SECONDS); + FastShutdown.shutdown(redisClusterClient); } @Before diff --git a/src/test/java/com/lambdaworks/redis/cluster/commands/HashClusterCommandTest.java b/src/test/java/com/lambdaworks/redis/cluster/commands/HashClusterCommandTest.java index 4ed77f4554..b3cfafbd97 100644 --- a/src/test/java/com/lambdaworks/redis/cluster/commands/HashClusterCommandTest.java +++ b/src/test/java/com/lambdaworks/redis/cluster/commands/HashClusterCommandTest.java @@ -2,6 +2,7 @@ import java.util.concurrent.TimeUnit; +import com.lambdaworks.redis.FastShutdown; import org.junit.AfterClass; import org.junit.Before; import org.junit.BeforeClass; @@ -29,7 +30,7 @@ public static void setupClient() { @AfterClass public static void closeClient() { - redisClusterClient.shutdown(0, 0, TimeUnit.SECONDS); + FastShutdown.shutdown(redisClusterClient); } @Before diff --git a/src/test/java/com/lambdaworks/redis/cluster/commands/ListClusterCommandTest.java b/src/test/java/com/lambdaworks/redis/cluster/commands/ListClusterCommandTest.java index f11a442a79..3f7a7cc76b 100644 --- a/src/test/java/com/lambdaworks/redis/cluster/commands/ListClusterCommandTest.java +++ b/src/test/java/com/lambdaworks/redis/cluster/commands/ListClusterCommandTest.java @@ -2,6 +2,7 @@ import java.util.concurrent.TimeUnit; +import com.lambdaworks.redis.FastShutdown; import org.assertj.core.api.Assertions; import org.junit.AfterClass; import org.junit.Before; @@ -30,7 +31,7 @@ public static void setupClient() { @AfterClass public static void closeClient() { - redisClusterClient.shutdown(0, 0, TimeUnit.SECONDS); + FastShutdown.shutdown(redisClusterClient); } @Before diff --git a/src/test/java/com/lambdaworks/redis/cluster/commands/StringClusterCommandTest.java b/src/test/java/com/lambdaworks/redis/cluster/commands/StringClusterCommandTest.java index a9a4b8c096..38129b4385 100644 --- a/src/test/java/com/lambdaworks/redis/cluster/commands/StringClusterCommandTest.java +++ b/src/test/java/com/lambdaworks/redis/cluster/commands/StringClusterCommandTest.java @@ -8,6 +8,7 @@ import java.util.Map; import java.util.concurrent.TimeUnit; +import com.lambdaworks.redis.FastShutdown; import com.lambdaworks.redis.cluster.api.StatefulRedisClusterConnection; import org.junit.AfterClass; import org.junit.Assert; @@ -38,7 +39,7 @@ public static void setupClient() { @AfterClass public static void closeClient() { - redisClusterClient.shutdown(0, 0, TimeUnit.SECONDS); + FastShutdown.shutdown(redisClusterClient); } @Before diff --git a/src/test/java/com/lambdaworks/redis/cluster/commands/rx/HashClusterRxCommandTest.java b/src/test/java/com/lambdaworks/redis/cluster/commands/rx/HashClusterRxCommandTest.java index a8a8993a1b..8500079a84 100644 --- a/src/test/java/com/lambdaworks/redis/cluster/commands/rx/HashClusterRxCommandTest.java +++ b/src/test/java/com/lambdaworks/redis/cluster/commands/rx/HashClusterRxCommandTest.java @@ -2,6 +2,7 @@ import java.util.concurrent.TimeUnit; +import com.lambdaworks.redis.FastShutdown; import com.lambdaworks.redis.cluster.ClusterTestUtil; import org.junit.AfterClass; import org.junit.Before; @@ -31,7 +32,7 @@ public static void setupClient() { @AfterClass public static void closeClient() { - redisClusterClient.shutdown(0, 0, TimeUnit.SECONDS); + FastShutdown.shutdown(redisClusterClient); } @Before diff --git a/src/test/java/com/lambdaworks/redis/cluster/commands/rx/ListClusterRxCommandTest.java b/src/test/java/com/lambdaworks/redis/cluster/commands/rx/ListClusterRxCommandTest.java index 35e048ea67..09f495e898 100644 --- a/src/test/java/com/lambdaworks/redis/cluster/commands/rx/ListClusterRxCommandTest.java +++ b/src/test/java/com/lambdaworks/redis/cluster/commands/rx/ListClusterRxCommandTest.java @@ -2,6 +2,7 @@ import java.util.concurrent.TimeUnit; +import com.lambdaworks.redis.FastShutdown; import org.assertj.core.api.Assertions; import org.junit.AfterClass; import org.junit.Before; @@ -31,7 +32,7 @@ public static void setupClient() { @AfterClass public static void closeClient() { - redisClusterClient.shutdown(0, 0, TimeUnit.SECONDS); + FastShutdown.shutdown(redisClusterClient); } @Before diff --git a/src/test/java/com/lambdaworks/redis/cluster/commands/rx/StringClusterRxCommandTest.java b/src/test/java/com/lambdaworks/redis/cluster/commands/rx/StringClusterRxCommandTest.java index eb506e44cf..2a2a57a6ed 100644 --- a/src/test/java/com/lambdaworks/redis/cluster/commands/rx/StringClusterRxCommandTest.java +++ b/src/test/java/com/lambdaworks/redis/cluster/commands/rx/StringClusterRxCommandTest.java @@ -6,6 +6,7 @@ import java.util.Map; import java.util.concurrent.TimeUnit; +import com.lambdaworks.redis.FastShutdown; import org.junit.AfterClass; import org.junit.Assert; import org.junit.Before; @@ -35,7 +36,7 @@ public static void setupClient() { @AfterClass public static void closeClient() { - redisClusterClient.shutdown(0, 0, TimeUnit.SECONDS); + FastShutdown.shutdown(redisClusterClient); } @Before diff --git a/src/test/java/com/lambdaworks/redis/issue42/BreakClusterClientTest.java b/src/test/java/com/lambdaworks/redis/issue42/BreakClusterClientTest.java index db872089b6..d1685cb2a7 100644 --- a/src/test/java/com/lambdaworks/redis/issue42/BreakClusterClientTest.java +++ b/src/test/java/com/lambdaworks/redis/issue42/BreakClusterClientTest.java @@ -6,6 +6,7 @@ import java.util.concurrent.TimeUnit; import com.lambdaworks.category.SlowTests; +import com.lambdaworks.redis.FastShutdown; import org.junit.*; import com.google.code.tempusfugit.temporal.Condition; @@ -41,7 +42,7 @@ public static void setupClient() { @AfterClass public static void shutdownClient() { - clusterClient.shutdown(0, 0, TimeUnit.MILLISECONDS); + FastShutdown.shutdown(clusterClient); } @Before diff --git a/src/test/java/com/lambdaworks/redis/pubsub/PubSubCommandTest.java b/src/test/java/com/lambdaworks/redis/pubsub/PubSubCommandTest.java index 50939bc024..59fb698d82 100644 --- a/src/test/java/com/lambdaworks/redis/pubsub/PubSubCommandTest.java +++ b/src/test/java/com/lambdaworks/redis/pubsub/PubSubCommandTest.java @@ -14,6 +14,7 @@ import com.lambdaworks.Wait; import com.lambdaworks.redis.AbstractRedisClientTest; +import com.lambdaworks.redis.FastShutdown; import com.lambdaworks.redis.RedisClient; import com.lambdaworks.redis.RedisFuture; import com.lambdaworks.redis.pubsub.api.async.RedisPubSubAsyncCommands; @@ -81,7 +82,7 @@ protected void run(RedisClient client) throws Exception { }; } - @Test(timeout = 200) + @Test(timeout = 2000) public void message() throws Exception { pubsub.subscribe(channel); assertThat(channels.take()).isEqualTo(channel); @@ -91,7 +92,7 @@ public void message() throws Exception { assertThat(messages.take()).isEqualTo(message); } - @Test(timeout = 200) + @Test(timeout = 2000) public void pmessage() throws Exception { pubsub.psubscribe(pattern).await(1, TimeUnit.MINUTES); assertThat(patterns.take()).isEqualTo(pattern); @@ -107,7 +108,7 @@ public void pmessage() throws Exception { assertThat(messages.take()).isEqualTo("msg 2!"); } - @Test(timeout = 200) + @Test(timeout = 2000) public void psubscribe() throws Exception { RedisFuture psubscribe = pubsub.psubscribe(pattern); assertThat(psubscribe.get()).isNull(); @@ -119,7 +120,7 @@ public void psubscribe() throws Exception { assertThat((long) counts.take()).isEqualTo(1); } - @Test(timeout = 200) + @Test(timeout = 2000) public void psubscribeWithListener() throws Exception { RedisFuture psubscribe = pubsub.psubscribe(pattern); final List listener = Lists.newArrayList(); @@ -195,14 +196,14 @@ public void punsubscribe() throws Exception { } - @Test(timeout = 200) + @Test(timeout = 2000) public void subscribe() throws Exception { pubsub.subscribe(channel); assertThat(channels.take()).isEqualTo(channel); assertThat((long) counts.take()).isEqualTo(1); } - @Test(timeout = 200) + @Test(timeout = 2000) public void unsubscribe() throws Exception { pubsub.unsubscribe(channel).get(); assertThat(channels.take()).isEqualTo(channel); @@ -225,12 +226,12 @@ public void pubsubCloseOnClientShutdown() throws Exception { RedisPubSubAsyncCommands connection = redisClient.connectPubSub().async(); - redisClient.shutdown(); + FastShutdown.shutdown(redisClient); assertThat(connection.isOpen()).isFalse(); } - @Test(timeout = 200) + @Test(timeout = 2000) public void utf8Channel() throws Exception { String channel = "channelλ"; String message = "αβγ"; @@ -279,7 +280,7 @@ public void resubscribePatternsOnReconnect() throws Exception { assertThat(messages.take()).isEqualTo(message); } - @Test(timeout = 200) + @Test(timeout = 2000) public void adapter() throws Exception { final BlockingQueue localCounts = new LinkedBlockingQueue(); diff --git a/src/test/java/com/lambdaworks/redis/pubsub/PubSubRxTest.java b/src/test/java/com/lambdaworks/redis/pubsub/PubSubRxTest.java index 07ae9f1ca8..a026077452 100644 --- a/src/test/java/com/lambdaworks/redis/pubsub/PubSubRxTest.java +++ b/src/test/java/com/lambdaworks/redis/pubsub/PubSubRxTest.java @@ -10,6 +10,7 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; +import com.lambdaworks.redis.FastShutdown; import com.lambdaworks.redis.pubsub.api.sync.RedisPubSubCommands; import org.junit.After; import org.junit.Before; @@ -279,7 +280,7 @@ public void pubsubCloseOnClientShutdown() throws Exception { RedisPubSubCommands connection = redisClient.connectPubSub().sync(); - redisClient.shutdown(0, 0, TimeUnit.SECONDS); + FastShutdown.shutdown(redisClient); assertThat(connection.isOpen()).isFalse(); } diff --git a/src/test/java/com/lambdaworks/redis/sentinel/AbstractSentinelTest.java b/src/test/java/com/lambdaworks/redis/sentinel/AbstractSentinelTest.java index 5b15c770ff..bab5c7f5a9 100644 --- a/src/test/java/com/lambdaworks/redis/sentinel/AbstractSentinelTest.java +++ b/src/test/java/com/lambdaworks/redis/sentinel/AbstractSentinelTest.java @@ -1,15 +1,13 @@ package com.lambdaworks.redis.sentinel; -import java.util.concurrent.TimeUnit; - -import com.lambdaworks.redis.AbstractTest; -import com.lambdaworks.redis.sentinel.api.sync.RedisSentinelCommands; import org.junit.After; import org.junit.AfterClass; import org.junit.Before; -import org.junit.Rule; +import com.lambdaworks.redis.AbstractTest; +import com.lambdaworks.redis.FastShutdown; import com.lambdaworks.redis.RedisClient; +import com.lambdaworks.redis.sentinel.api.sync.RedisSentinelCommands; public abstract class AbstractSentinelTest extends AbstractTest { @@ -20,7 +18,7 @@ public abstract class AbstractSentinelTest extends AbstractTest { @AfterClass public static void shutdownClient() { - sentinelClient.shutdown(0, 0, TimeUnit.MILLISECONDS); + FastShutdown.shutdown(sentinelClient); } @Before diff --git a/src/test/java/com/lambdaworks/redis/sentinel/SentinelCommandTest.java b/src/test/java/com/lambdaworks/redis/sentinel/SentinelCommandTest.java index c66f22ba04..56fe41eb6f 100644 --- a/src/test/java/com/lambdaworks/redis/sentinel/SentinelCommandTest.java +++ b/src/test/java/com/lambdaworks/redis/sentinel/SentinelCommandTest.java @@ -12,19 +12,13 @@ import java.util.concurrent.TimeUnit; import com.lambdaworks.Wait; +import com.lambdaworks.redis.*; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; import com.lambdaworks.Delay; -import com.lambdaworks.redis.RedisAsyncConnection; -import com.lambdaworks.redis.RedisClient; -import com.lambdaworks.redis.RedisConnection; -import com.lambdaworks.redis.RedisConnectionException; -import com.lambdaworks.redis.RedisFuture; -import com.lambdaworks.redis.RedisURI; -import com.lambdaworks.redis.TestSettings; import com.lambdaworks.redis.sentinel.api.async.RedisSentinelAsyncCommands; public class SentinelCommandTest extends AbstractSentinelTest { @@ -99,7 +93,7 @@ public void sentinelConnectWith() throws Exception { assertThat(connection2.ping()).isEqualTo("PONG"); connection2.close(); - client.shutdown(0, 0, TimeUnit.SECONDS); + FastShutdown.shutdown(client); } @Test @@ -113,7 +107,7 @@ public void sentinelConnectWrongMaster() throws Exception { } catch (RedisConnectionException e) { } - client.shutdown(0, 0, TimeUnit.SECONDS); + FastShutdown.shutdown(client); } @Test @@ -125,7 +119,7 @@ public void sentinelConnect() throws Exception { assertThat(connection.ping().get()).isEqualTo("PONG"); connection.close(); - client.shutdown(0, 0, TimeUnit.SECONDS); + FastShutdown.shutdown(client); } @Test @@ -152,7 +146,7 @@ public void role() throws Exception { assertThat(objects.get(1).toString()).isEqualTo("[" + MASTER_ID + "]"); } finally { connection.close(); - redisClient.shutdown(0, 0, TimeUnit.MILLISECONDS); + FastShutdown.shutdown(redisClient); } } diff --git a/src/test/java/com/lambdaworks/redis/sentinel/SentinelFailoverTest.java b/src/test/java/com/lambdaworks/redis/sentinel/SentinelFailoverTest.java index 8f9831c986..b5f1f10c27 100644 --- a/src/test/java/com/lambdaworks/redis/sentinel/SentinelFailoverTest.java +++ b/src/test/java/com/lambdaworks/redis/sentinel/SentinelFailoverTest.java @@ -8,6 +8,7 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; +import com.lambdaworks.redis.FastShutdown; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Ignore; @@ -61,7 +62,7 @@ public void failover() throws Exception { String tcpPort2 = connectUsingSentinelAndGetPort(); assertThat(tcpPort1).isNotEqualTo(tcpPort2); - redisClient.shutdown(); + FastShutdown.shutdown(redisClient); } protected String connectUsingSentinelAndGetPort() { diff --git a/src/test/java/com/lambdaworks/redis/server/RandomResponseServer.java b/src/test/java/com/lambdaworks/redis/server/RandomResponseServer.java index 93e5e4c7a4..2510f7a005 100644 --- a/src/test/java/com/lambdaworks/redis/server/RandomResponseServer.java +++ b/src/test/java/com/lambdaworks/redis/server/RandomResponseServer.java @@ -1,5 +1,7 @@ package com.lambdaworks.redis.server; +import java.util.concurrent.TimeUnit; + import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; @@ -41,7 +43,7 @@ public void initChannel(SocketChannel ch) throws Exception { public void shutdown() { channel.close(); - bossGroup.shutdownGracefully(); - workerGroup.shutdownGracefully(); + bossGroup.shutdownGracefully(100, 100, TimeUnit.MILLISECONDS); + workerGroup.shutdownGracefully(100, 100, TimeUnit.MILLISECONDS); } } diff --git a/src/test/resources/log4j.properties b/src/test/resources/log4j.properties index 3bd0976744..4b875ffc81 100644 --- a/src/test/resources/log4j.properties +++ b/src/test/resources/log4j.properties @@ -21,7 +21,7 @@ log4j.appender.capture.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} [%-5p] [ log4j.logger.com.lambdaworks=INFO log4j.logger.io.netty=INFO log4j.logger.com.lambdaworks.redis.cluster=INFO -log4j.logger.com.lambdaworks.redis.protocol=INFO +log4j.logger.com.lambdaworks.redis.protocol=DEBUG log4j.logger.com.lambdaworks.redis.RedisClient=INFO log4j.logger.com.lambdaworks.redis.protocol.ConnectionWatchdog=INFO