From 8491dd8b7877d71196bac44e5369f8acc11c5927 Mon Sep 17 00:00:00 2001 From: Mark Paluch Date: Fri, 6 Jun 2014 15:37:27 +0200 Subject: [PATCH] Reduced test load, fix for sync connect, changed shutdown order --- .../redis/AbstractRedisClient.java | 28 +++++---- .../redis/cluster/RedisClusterClientTest.java | 62 +++++++------------ 2 files changed, 41 insertions(+), 49 deletions(-) diff --git a/lettuce/src/main/java/com/lambdaworks/redis/AbstractRedisClient.java b/lettuce/src/main/java/com/lambdaworks/redis/AbstractRedisClient.java index 28f26b23c5..1cc3d5a452 100644 --- a/lettuce/src/main/java/com/lambdaworks/redis/AbstractRedisClient.java +++ b/lettuce/src/main/java/com/lambdaworks/redis/AbstractRedisClient.java @@ -1,17 +1,11 @@ package com.lambdaworks.redis; -import java.io.Closeable; -import java.net.SocketAddress; -import java.util.Set; -import java.util.concurrent.TimeUnit; - import com.google.common.base.Supplier; import com.google.common.collect.ImmutableList; import com.lambdaworks.redis.internal.ChannelGroupListener; import com.lambdaworks.redis.protocol.CommandHandler; import com.lambdaworks.redis.protocol.ConnectionWatchdog; import com.lambdaworks.redis.pubsub.PubSubCommandHandler; - import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelInitializer; @@ -24,11 +18,17 @@ import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.util.HashedWheelTimer; +import io.netty.util.concurrent.Future; import io.netty.util.concurrent.GlobalEventExecutor; import io.netty.util.internal.ConcurrentSet; import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; +import java.io.Closeable; +import java.net.SocketAddress; +import java.util.Set; +import java.util.concurrent.TimeUnit; + /** * @author Mark Paluch * @since 26.05.14 17:28 @@ -36,6 +36,7 @@ public abstract class AbstractRedisClient { protected static final InternalLogger logger = InternalLoggerFactory.getInstance(RedisClient.class); protected EventLoopGroup group; + protected HashedWheelTimer timer; protected ChannelGroup channels; protected long timeout; @@ -89,7 +90,7 @@ protected void initChannel(Channel ch) throws Exception { } }); - redisBootstrap.connect(redisAddress).sync(); + redisBootstrap.connect(redisAddress).get(); connection.addListener(new CloseEvents.CloseListener() { @Override @@ -100,7 +101,7 @@ public void resourceClosed(Object resource) { closeableResources.add(connection); return connection; - } catch (Throwable e) { + } catch (Exception e) { throw new RedisException("Unable to connect", e); } } @@ -133,9 +134,14 @@ public void shutdown() { psCommandHandler.close(); } } - ChannelGroupFuture future = channels.close(); - future.awaitUninterruptibly(); - group.shutdownGracefully().syncUninterruptibly(); + ChannelGroupFuture closeFuture = channels.close(); + Future groupCloseFuture = group.shutdownGracefully(); + try { + closeFuture.get(); + groupCloseFuture.get(); + } catch (Exception e) { + throw new RedisException(e); + } timer.stop(); } diff --git a/lettuce/src/test/java/com/lambdaworks/redis/cluster/RedisClusterClientTest.java b/lettuce/src/test/java/com/lambdaworks/redis/cluster/RedisClusterClientTest.java index cab1640af7..6256b3e4e8 100644 --- a/lettuce/src/test/java/com/lambdaworks/redis/cluster/RedisClusterClientTest.java +++ b/lettuce/src/test/java/com/lambdaworks/redis/cluster/RedisClusterClientTest.java @@ -7,17 +7,6 @@ import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; - -import java.util.List; - -import org.junit.After; -import org.junit.AfterClass; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.FixMethodOrder; -import org.junit.Test; -import org.junit.runners.MethodSorters; - import com.google.code.tempusfugit.temporal.Condition; import com.google.code.tempusfugit.temporal.Duration; import com.google.code.tempusfugit.temporal.ThreadSleep; @@ -33,6 +22,15 @@ import com.lambdaworks.redis.RedisClusterAsyncConnection; import com.lambdaworks.redis.RedisFuture; import com.lambdaworks.redis.RedisURI; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.FixMethodOrder; +import org.junit.Test; +import org.junit.runners.MethodSorters; + +import java.util.List; @FixMethodOrder(MethodSorters.NAME_ASCENDING) @SuppressWarnings("unchecked") @@ -107,7 +105,19 @@ public boolean isSatisfied() { try { String info = redis1.clusterInfo().get(); if (info != null && info.contains("cluster_state:ok")) { + + String s = redis1.clusterNodes().get(); + Partitions parse = ClusterPartitionParser.parse(s); + + for (RedisClusterNode redisClusterNode : parse) { + if (redisClusterNode.getFlags().contains(RedisClusterNode.NodeFlag.FAIL) + || redisClusterNode.getFlags().contains(RedisClusterNode.NodeFlag.EVENTUAL_FAIL)) { + return false; + } + } + return true; + } } catch (Exception e) { @@ -126,30 +136,6 @@ public void after() throws Exception { redis4.close(); } - public void cleanup() throws Exception { - - int slots[] = createSlots(1, 16385); - List> futures = Lists.newArrayList(); - - for (int i = 0; i < slots.length; i++) { - futures.add(redis1.clusterDelSlots(i)); - futures.add(redis2.clusterDelSlots(i)); - futures.add(redis3.clusterDelSlots(i)); - futures.add(redis4.clusterDelSlots(i)); - } - - for (int i = 0; i < slots.length; i++) { - futures.add(redis1.clusterDelSlots(i)); - futures.add(redis2.clusterDelSlots(i)); - futures.add(redis3.clusterDelSlots(i)); - futures.add(redis4.clusterDelSlots(i)); - } - - for (RedisFuture future : futures) { - future.get(); - } - } - @Test public void testClusterInfo() throws Exception { @@ -175,7 +161,7 @@ public void testClusterNodes() throws Exception { } @Test - public void clusterSlaves() throws Exception { + public void zzzLastClusterSlaves() throws Exception { Partitions partitions = ClusterPartitionParser.parse(redis1.clusterNodes().get()); final RedisClusterNode node1 = Iterables.find(partitions, new Predicate() { @@ -327,7 +313,7 @@ public void massiveClusteredAccess() throws Exception { RedisClusterAsyncConnection connection = clusterClient.connectClusterAsync(); List> futures = Lists.newArrayList(); - for (int i = 0; i < 10000; i++) { + for (int i = 0; i < 1000; i++) { futures.add(connection.set("a" + i, "myValue1" + i)); futures.add(connection.set("b" + i, "myValue2" + i)); futures.add(connection.set("d" + i, "myValue3" + i)); @@ -337,7 +323,7 @@ public void massiveClusteredAccess() throws Exception { future.get(); } - for (int i = 0; i < 10000; i++) { + for (int i = 0; i < 1000; i++) { RedisFuture setA = connection.get("a" + i); RedisFuture setB = connection.get("b" + i); RedisFuture setD = connection.get("d" + i);