From e5080c57ede709c84c889965cae7688f327a05c0 Mon Sep 17 00:00:00 2001 From: Mark Paluch Date: Mon, 21 Sep 2015 12:42:57 +0200 Subject: [PATCH] Use timeouts in async ops --- .../redis/cluster/ClusterRule.java | 32 ++++++++++++++----- 1 file changed, 24 insertions(+), 8 deletions(-) diff --git a/src/test/java/com/lambdaworks/redis/cluster/ClusterRule.java b/src/test/java/com/lambdaworks/redis/cluster/ClusterRule.java index 63e70aa063..eff6dba4b4 100644 --- a/src/test/java/com/lambdaworks/redis/cluster/ClusterRule.java +++ b/src/test/java/com/lambdaworks/redis/cluster/ClusterRule.java @@ -4,6 +4,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import org.junit.rules.TestRule; import org.junit.runner.Description; @@ -50,9 +51,7 @@ public void evaluate() throws Throwable { futures.add(connection.flushall()); } - for (Future future : futures) { - future.get(); - } + await(futures); } }; @@ -67,6 +66,13 @@ public void evaluate() throws Throwable { }; } + private void await(List> futures) throws InterruptedException, java.util.concurrent.ExecutionException, + java.util.concurrent.TimeoutException { + for (Future future : futures) { + future.get(10, TimeUnit.SECONDS); + } + } + public boolean isStable() { for (int port : ports) { @@ -102,7 +108,7 @@ public boolean isStable() { public void flushdb() { try { for (RedisAsyncConnection connection : connectionCache.values()) { - connection.flushdb().get(); + connection.flushdb().get(10, TimeUnit.SECONDS); } } catch (Exception e) { throw new IllegalStateException(e); @@ -113,9 +119,9 @@ public void clusterReset() { try { for (RedisAsyncConnectionImpl connection : connectionCache.values()) { - connection.clusterReset(false).get(); - connection.clusterReset(true).get(); - connection.clusterFlushslots().get(); + connection.clusterReset(false).get(10, TimeUnit.SECONDS); + connection.clusterReset(true).get(10, TimeUnit.SECONDS); + connection.clusterFlushslots().get(10, TimeUnit.SECONDS); } } catch (Exception e) { throw new IllegalStateException(e); @@ -123,9 +129,19 @@ public void clusterReset() { } public void meet(String host, int port) { + + List> futures = Lists.newArrayList(); for (RedisAsyncConnectionImpl redisAsyncConnection : connectionCache.values()) { - redisAsyncConnection.clusterMeet(host, port); + futures.add(redisAsyncConnection.clusterMeet(host, port)); } + + for (Future future : futures) { + try { + future.get(10, TimeUnit.SECONDS); + } catch (Exception ignore) { + } + } + } public RedisClusterClient getClusterClient() {