Skip to content

Commit

Permalink
Reduced test load, fix for sync connect, changed shutdown order
Browse files Browse the repository at this point in the history
  • Loading branch information
mp911de committed Jun 6, 2014
1 parent b8f8957 commit 8491dd8
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 49 deletions.
Original file line number Diff line number Diff line change
@@ -1,17 +1,11 @@
package com.lambdaworks.redis;

import java.io.Closeable;
import java.net.SocketAddress;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableList;
import com.lambdaworks.redis.internal.ChannelGroupListener;
import com.lambdaworks.redis.protocol.CommandHandler;
import com.lambdaworks.redis.protocol.ConnectionWatchdog;
import com.lambdaworks.redis.pubsub.PubSubCommandHandler;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
Expand All @@ -24,18 +18,25 @@
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.HashedWheelTimer;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GlobalEventExecutor;
import io.netty.util.internal.ConcurrentSet;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;

import java.io.Closeable;
import java.net.SocketAddress;
import java.util.Set;
import java.util.concurrent.TimeUnit;

/**
* @author <a href="mailto:[email protected]">Mark Paluch</a>
* @since 26.05.14 17:28
*/
public abstract class AbstractRedisClient {
protected static final InternalLogger logger = InternalLoggerFactory.getInstance(RedisClient.class);
protected EventLoopGroup group;

protected HashedWheelTimer timer;
protected ChannelGroup channels;
protected long timeout;
Expand Down Expand Up @@ -89,7 +90,7 @@ protected void initChannel(Channel ch) throws Exception {
}
});

redisBootstrap.connect(redisAddress).sync();
redisBootstrap.connect(redisAddress).get();

connection.addListener(new CloseEvents.CloseListener() {
@Override
Expand All @@ -100,7 +101,7 @@ public void resourceClosed(Object resource) {
closeableResources.add(connection);

return connection;
} catch (Throwable e) {
} catch (Exception e) {
throw new RedisException("Unable to connect", e);
}
}
Expand Down Expand Up @@ -133,9 +134,14 @@ public void shutdown() {
psCommandHandler.close();
}
}
ChannelGroupFuture future = channels.close();
future.awaitUninterruptibly();
group.shutdownGracefully().syncUninterruptibly();
ChannelGroupFuture closeFuture = channels.close();
Future<?> groupCloseFuture = group.shutdownGracefully();
try {
closeFuture.get();
groupCloseFuture.get();
} catch (Exception e) {
throw new RedisException(e);
}
timer.stop();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,6 @@
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;

import java.util.List;

import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.FixMethodOrder;
import org.junit.Test;
import org.junit.runners.MethodSorters;

import com.google.code.tempusfugit.temporal.Condition;
import com.google.code.tempusfugit.temporal.Duration;
import com.google.code.tempusfugit.temporal.ThreadSleep;
Expand All @@ -33,6 +22,15 @@
import com.lambdaworks.redis.RedisClusterAsyncConnection;
import com.lambdaworks.redis.RedisFuture;
import com.lambdaworks.redis.RedisURI;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.FixMethodOrder;
import org.junit.Test;
import org.junit.runners.MethodSorters;

import java.util.List;

@FixMethodOrder(MethodSorters.NAME_ASCENDING)
@SuppressWarnings("unchecked")
Expand Down Expand Up @@ -107,7 +105,19 @@ public boolean isSatisfied() {
try {
String info = redis1.clusterInfo().get();
if (info != null && info.contains("cluster_state:ok")) {

String s = redis1.clusterNodes().get();
Partitions parse = ClusterPartitionParser.parse(s);

for (RedisClusterNode redisClusterNode : parse) {
if (redisClusterNode.getFlags().contains(RedisClusterNode.NodeFlag.FAIL)
|| redisClusterNode.getFlags().contains(RedisClusterNode.NodeFlag.EVENTUAL_FAIL)) {
return false;
}
}

return true;

}
} catch (Exception e) {

Expand All @@ -126,30 +136,6 @@ public void after() throws Exception {
redis4.close();
}

public void cleanup() throws Exception {

int slots[] = createSlots(1, 16385);
List<RedisFuture<?>> futures = Lists.newArrayList();

for (int i = 0; i < slots.length; i++) {
futures.add(redis1.clusterDelSlots(i));
futures.add(redis2.clusterDelSlots(i));
futures.add(redis3.clusterDelSlots(i));
futures.add(redis4.clusterDelSlots(i));
}

for (int i = 0; i < slots.length; i++) {
futures.add(redis1.clusterDelSlots(i));
futures.add(redis2.clusterDelSlots(i));
futures.add(redis3.clusterDelSlots(i));
futures.add(redis4.clusterDelSlots(i));
}

for (RedisFuture<?> future : futures) {
future.get();
}
}

@Test
public void testClusterInfo() throws Exception {

Expand All @@ -175,7 +161,7 @@ public void testClusterNodes() throws Exception {
}

@Test
public void clusterSlaves() throws Exception {
public void zzzLastClusterSlaves() throws Exception {
Partitions partitions = ClusterPartitionParser.parse(redis1.clusterNodes().get());

final RedisClusterNode node1 = Iterables.find(partitions, new Predicate<RedisClusterNode>() {
Expand Down Expand Up @@ -327,7 +313,7 @@ public void massiveClusteredAccess() throws Exception {
RedisClusterAsyncConnection<String, String> connection = clusterClient.connectClusterAsync();

List<RedisFuture<?>> futures = Lists.newArrayList();
for (int i = 0; i < 10000; i++) {
for (int i = 0; i < 1000; i++) {
futures.add(connection.set("a" + i, "myValue1" + i));
futures.add(connection.set("b" + i, "myValue2" + i));
futures.add(connection.set("d" + i, "myValue3" + i));
Expand All @@ -337,7 +323,7 @@ public void massiveClusteredAccess() throws Exception {
future.get();
}

for (int i = 0; i < 10000; i++) {
for (int i = 0; i < 1000; i++) {
RedisFuture<String> setA = connection.get("a" + i);
RedisFuture<String> setB = connection.get("b" + i);
RedisFuture<String> setD = connection.get("d" + i);
Expand Down

0 comments on commit 8491dd8

Please sign in to comment.