Skip to content

Commit

Permalink
Use timeouts in async ops
Browse files Browse the repository at this point in the history
  • Loading branch information
mp911de committed Sep 21, 2015
1 parent 0377488 commit e5080c5
Showing 1 changed file with 24 additions and 8 deletions.
32 changes: 24 additions & 8 deletions src/test/java/com/lambdaworks/redis/cluster/ClusterRule.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import org.junit.rules.TestRule;
import org.junit.runner.Description;
Expand Down Expand Up @@ -50,9 +51,7 @@ public void evaluate() throws Throwable {
futures.add(connection.flushall());
}

for (Future<?> future : futures) {
future.get();
}
await(futures);
}
};

Expand All @@ -67,6 +66,13 @@ public void evaluate() throws Throwable {
};
}

private void await(List<Future<?>> futures) throws InterruptedException, java.util.concurrent.ExecutionException,
java.util.concurrent.TimeoutException {
for (Future<?> future : futures) {
future.get(10, TimeUnit.SECONDS);
}
}

public boolean isStable() {

for (int port : ports) {
Expand Down Expand Up @@ -102,7 +108,7 @@ public boolean isStable() {
public void flushdb() {
try {
for (RedisAsyncConnection<?, ?> connection : connectionCache.values()) {
connection.flushdb().get();
connection.flushdb().get(10, TimeUnit.SECONDS);
}
} catch (Exception e) {
throw new IllegalStateException(e);
Expand All @@ -113,19 +119,29 @@ public void clusterReset() {
try {

for (RedisAsyncConnectionImpl<?, ?> connection : connectionCache.values()) {
connection.clusterReset(false).get();
connection.clusterReset(true).get();
connection.clusterFlushslots().get();
connection.clusterReset(false).get(10, TimeUnit.SECONDS);
connection.clusterReset(true).get(10, TimeUnit.SECONDS);
connection.clusterFlushslots().get(10, TimeUnit.SECONDS);
}
} catch (Exception e) {
throw new IllegalStateException(e);
}
}

public void meet(String host, int port) {

List<Future<?>> futures = Lists.newArrayList();
for (RedisAsyncConnectionImpl<?, ?> redisAsyncConnection : connectionCache.values()) {
redisAsyncConnection.clusterMeet(host, port);
futures.add(redisAsyncConnection.clusterMeet(host, port));
}

for (Future<?> future : futures) {
try {
future.get(10, TimeUnit.SECONDS);
} catch (Exception ignore) {
}
}

}

public RedisClusterClient getClusterClient() {
Expand Down

0 comments on commit e5080c5

Please sign in to comment.