diff --git a/lettuce/src/main/java/com/lambdaworks/redis/AbstractRedisClient.java b/lettuce/src/main/java/com/lambdaworks/redis/AbstractRedisClient.java
index 28f26b23c5..1cc3d5a452 100644
--- a/lettuce/src/main/java/com/lambdaworks/redis/AbstractRedisClient.java
+++ b/lettuce/src/main/java/com/lambdaworks/redis/AbstractRedisClient.java
@@ -1,17 +1,11 @@
package com.lambdaworks.redis;
-import java.io.Closeable;
-import java.net.SocketAddress;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableList;
import com.lambdaworks.redis.internal.ChannelGroupListener;
import com.lambdaworks.redis.protocol.CommandHandler;
import com.lambdaworks.redis.protocol.ConnectionWatchdog;
import com.lambdaworks.redis.pubsub.PubSubCommandHandler;
-
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
@@ -24,11 +18,17 @@
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.HashedWheelTimer;
+import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GlobalEventExecutor;
import io.netty.util.internal.ConcurrentSet;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
+import java.io.Closeable;
+import java.net.SocketAddress;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
/**
* @author Mark Paluch
* @since 26.05.14 17:28
@@ -36,6 +36,7 @@
public abstract class AbstractRedisClient {
protected static final InternalLogger logger = InternalLoggerFactory.getInstance(RedisClient.class);
protected EventLoopGroup group;
+
protected HashedWheelTimer timer;
protected ChannelGroup channels;
protected long timeout;
@@ -89,7 +90,7 @@ protected void initChannel(Channel ch) throws Exception {
}
});
- redisBootstrap.connect(redisAddress).sync();
+ redisBootstrap.connect(redisAddress).get();
connection.addListener(new CloseEvents.CloseListener() {
@Override
@@ -100,7 +101,7 @@ public void resourceClosed(Object resource) {
closeableResources.add(connection);
return connection;
- } catch (Throwable e) {
+ } catch (Exception e) {
throw new RedisException("Unable to connect", e);
}
}
@@ -133,9 +134,14 @@ public void shutdown() {
psCommandHandler.close();
}
}
- ChannelGroupFuture future = channels.close();
- future.awaitUninterruptibly();
- group.shutdownGracefully().syncUninterruptibly();
+ ChannelGroupFuture closeFuture = channels.close();
+ Future> groupCloseFuture = group.shutdownGracefully();
+ try {
+ closeFuture.get();
+ groupCloseFuture.get();
+ } catch (Exception e) {
+ throw new RedisException(e);
+ }
timer.stop();
}
diff --git a/lettuce/src/test/java/com/lambdaworks/redis/cluster/RedisClusterClientTest.java b/lettuce/src/test/java/com/lambdaworks/redis/cluster/RedisClusterClientTest.java
index cab1640af7..6256b3e4e8 100644
--- a/lettuce/src/test/java/com/lambdaworks/redis/cluster/RedisClusterClientTest.java
+++ b/lettuce/src/test/java/com/lambdaworks/redis/cluster/RedisClusterClientTest.java
@@ -7,17 +7,6 @@
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
-
-import java.util.List;
-
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.FixMethodOrder;
-import org.junit.Test;
-import org.junit.runners.MethodSorters;
-
import com.google.code.tempusfugit.temporal.Condition;
import com.google.code.tempusfugit.temporal.Duration;
import com.google.code.tempusfugit.temporal.ThreadSleep;
@@ -33,6 +22,15 @@
import com.lambdaworks.redis.RedisClusterAsyncConnection;
import com.lambdaworks.redis.RedisFuture;
import com.lambdaworks.redis.RedisURI;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.FixMethodOrder;
+import org.junit.Test;
+import org.junit.runners.MethodSorters;
+
+import java.util.List;
@FixMethodOrder(MethodSorters.NAME_ASCENDING)
@SuppressWarnings("unchecked")
@@ -107,7 +105,19 @@ public boolean isSatisfied() {
try {
String info = redis1.clusterInfo().get();
if (info != null && info.contains("cluster_state:ok")) {
+
+ String s = redis1.clusterNodes().get();
+ Partitions parse = ClusterPartitionParser.parse(s);
+
+ for (RedisClusterNode redisClusterNode : parse) {
+ if (redisClusterNode.getFlags().contains(RedisClusterNode.NodeFlag.FAIL)
+ || redisClusterNode.getFlags().contains(RedisClusterNode.NodeFlag.EVENTUAL_FAIL)) {
+ return false;
+ }
+ }
+
return true;
+
}
} catch (Exception e) {
@@ -126,30 +136,6 @@ public void after() throws Exception {
redis4.close();
}
- public void cleanup() throws Exception {
-
- int slots[] = createSlots(1, 16385);
- List> futures = Lists.newArrayList();
-
- for (int i = 0; i < slots.length; i++) {
- futures.add(redis1.clusterDelSlots(i));
- futures.add(redis2.clusterDelSlots(i));
- futures.add(redis3.clusterDelSlots(i));
- futures.add(redis4.clusterDelSlots(i));
- }
-
- for (int i = 0; i < slots.length; i++) {
- futures.add(redis1.clusterDelSlots(i));
- futures.add(redis2.clusterDelSlots(i));
- futures.add(redis3.clusterDelSlots(i));
- futures.add(redis4.clusterDelSlots(i));
- }
-
- for (RedisFuture> future : futures) {
- future.get();
- }
- }
-
@Test
public void testClusterInfo() throws Exception {
@@ -175,7 +161,7 @@ public void testClusterNodes() throws Exception {
}
@Test
- public void clusterSlaves() throws Exception {
+ public void zzzLastClusterSlaves() throws Exception {
Partitions partitions = ClusterPartitionParser.parse(redis1.clusterNodes().get());
final RedisClusterNode node1 = Iterables.find(partitions, new Predicate() {
@@ -327,7 +313,7 @@ public void massiveClusteredAccess() throws Exception {
RedisClusterAsyncConnection connection = clusterClient.connectClusterAsync();
List> futures = Lists.newArrayList();
- for (int i = 0; i < 10000; i++) {
+ for (int i = 0; i < 1000; i++) {
futures.add(connection.set("a" + i, "myValue1" + i));
futures.add(connection.set("b" + i, "myValue2" + i));
futures.add(connection.set("d" + i, "myValue3" + i));
@@ -337,7 +323,7 @@ public void massiveClusteredAccess() throws Exception {
future.get();
}
- for (int i = 0; i < 10000; i++) {
+ for (int i = 0; i < 1000; i++) {
RedisFuture setA = connection.get("a" + i);
RedisFuture setB = connection.get("b" + i);
RedisFuture setD = connection.get("d" + i);