diff --git a/src/main/java/com/lambdaworks/redis/AbstractRedisClient.java b/src/main/java/com/lambdaworks/redis/AbstractRedisClient.java index 486cf007f2..0a41d21d48 100644 --- a/src/main/java/com/lambdaworks/redis/AbstractRedisClient.java +++ b/src/main/java/com/lambdaworks/redis/AbstractRedisClient.java @@ -86,7 +86,7 @@ public void setDefaultTimeout(long timeout, TimeUnit unit) { } @SuppressWarnings("unchecked") - protected > T connectAsyncImpl(final CommandHandler handler, + protected > T connectAsyncImpl(final CommandHandler handler, final T connection, final Supplier socketAddressSupplier) { ConnectionBuilder connectionBuilder = ConnectionBuilder.connectionBuilder(); diff --git a/src/main/java/com/lambdaworks/redis/cluster/NodeSelectionImpl.java b/src/main/java/com/lambdaworks/redis/cluster/AbstractNodeSelection.java similarity index 67% rename from src/main/java/com/lambdaworks/redis/cluster/NodeSelectionImpl.java rename to src/main/java/com/lambdaworks/redis/cluster/AbstractNodeSelection.java index 0d50485def..c6bc6e1a3e 100644 --- a/src/main/java/com/lambdaworks/redis/cluster/NodeSelectionImpl.java +++ b/src/main/java/com/lambdaworks/redis/cluster/AbstractNodeSelection.java @@ -3,53 +3,52 @@ import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.concurrent.CompletableFuture; import java.util.stream.Collectors; -import com.google.common.collect.Lists; import com.lambdaworks.redis.RedisClusterAsyncConnection; import com.lambdaworks.redis.RedisFuture; import com.lambdaworks.redis.RedisURI; import com.lambdaworks.redis.cluster.models.partitions.RedisClusterNode; -import io.netty.util.concurrent.CompleteFuture; /** * @author Mark Paluch */ -public class NodeSelectionImpl implements NodeSelection { +abstract class AbstractNodeSelection implements NodeSelection { - private List nodes = Lists.newArrayList(); - private RedisAdvancedClusterConnectionImpl globalConnection; - private ClusterDistributionChannelWriter writer; + protected RedisAdvancedClusterConnectionImpl globalConnection; + protected ClusterDistributionChannelWriter writer; - public NodeSelectionImpl(List nodes, RedisAdvancedClusterConnectionImpl globalConnection) { - this.nodes = nodes; + public AbstractNodeSelection(RedisAdvancedClusterConnectionImpl globalConnection) { this.globalConnection = globalConnection; - CompletableFuture cf1 = new CompletableFuture(); - writer = (ClusterDistributionChannelWriter) globalConnection.getChannelWriter(); } @Override public RedisClusterAsyncConnection node(int index) { - return null; + + RedisClusterNode redisClusterNode = nodes().get(index); + return getConnection(redisClusterNode); } private RedisClusterAsyncConnection getConnection(RedisClusterNode redisClusterNode) { - RedisURI uri = redisClusterNode.getUri(); return writer.getClusterConnectionProvider().getConnection(ClusterConnectionProvider.Intent.WRITE, uri.getHost(), uri.getPort()); } + /** + * @return List of involved nodes + */ + protected abstract List nodes(); + @Override public int size() { - return nodes.size(); + return nodes().size(); } @Override public Map> asMap() { - return nodes.stream().collect( + return nodes().stream().collect( Collectors.toMap(redisClusterNode -> redisClusterNode, redisClusterNode1 -> getConnection(redisClusterNode1))); } @@ -63,7 +62,6 @@ public Map> get(K key) { @Override public Iterator> iterator() { - return nodes.stream().map(this::getConnection).iterator(); + return nodes().stream().map(this::getConnection).iterator(); } - } diff --git a/src/main/java/com/lambdaworks/redis/cluster/AsyncExecutions.java b/src/main/java/com/lambdaworks/redis/cluster/AsyncExecutions.java new file mode 100644 index 0000000000..ca2ccf7123 --- /dev/null +++ b/src/main/java/com/lambdaworks/redis/cluster/AsyncExecutions.java @@ -0,0 +1,31 @@ +package com.lambdaworks.redis.cluster; + +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; + +import com.lambdaworks.redis.cluster.models.partitions.RedisClusterNode; + +/** + * Completes + * + * @author Mark Paluch + */ +public interface AsyncExecutions extends Iterable> { + + Map> asMap(); + + Collection nodes(); + + CompletionStage get(RedisClusterNode redisClusterNode); + + CompletableFuture[] futures(); + + @Override + default Iterator> iterator() { + return asMap().values().iterator(); + } +} diff --git a/src/main/java/com/lambdaworks/redis/cluster/AsyncExecutionsImpl.java b/src/main/java/com/lambdaworks/redis/cluster/AsyncExecutionsImpl.java new file mode 100644 index 0000000000..ed71748940 --- /dev/null +++ b/src/main/java/com/lambdaworks/redis/cluster/AsyncExecutionsImpl.java @@ -0,0 +1,42 @@ +package com.lambdaworks.redis.cluster; + +import java.util.Collection; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; + +import com.google.common.collect.ImmutableMap; +import com.lambdaworks.redis.cluster.models.partitions.RedisClusterNode; + +/** + * @author Mark Paluch + */ +public class AsyncExecutionsImpl implements AsyncExecutions { + + private Map> executions; + + public AsyncExecutionsImpl(Map> executions) { + this.executions = ImmutableMap.copyOf(executions); + } + + @Override + public Map> asMap() { + return executions; + } + + @Override + public Collection nodes() { + return executions.keySet(); + } + + @Override + public CompletionStage get(RedisClusterNode redisClusterNode) { + return executions.get(redisClusterNode); + } + + @Override + public CompletableFuture[] futures() { + return executions.values().toArray(new CompletableFuture[executions.size()]); + } + +} diff --git a/src/main/java/com/lambdaworks/redis/cluster/DynamicNodeSelection.java b/src/main/java/com/lambdaworks/redis/cluster/DynamicNodeSelection.java new file mode 100644 index 0000000000..3e2b9f0d86 --- /dev/null +++ b/src/main/java/com/lambdaworks/redis/cluster/DynamicNodeSelection.java @@ -0,0 +1,26 @@ +package com.lambdaworks.redis.cluster; + +import java.util.List; +import java.util.function.Predicate; +import java.util.stream.Collectors; + +import com.lambdaworks.redis.cluster.models.partitions.RedisClusterNode; + +/** + * @author Mark Paluch + */ +class DynamicNodeSelection extends AbstractNodeSelection { + + private final Predicate selector; + + public DynamicNodeSelection(RedisAdvancedClusterConnectionImpl globalConnection, Predicate selector) { + super(globalConnection); + + this.selector = selector; + } + + @Override + protected List nodes() { + return globalConnection.getPartitions().getPartitions().stream().filter(selector).collect(Collectors.toList()); + } +} diff --git a/src/main/java/com/lambdaworks/redis/cluster/ExampleForMultipleAsyncExecutions.java b/src/main/java/com/lambdaworks/redis/cluster/ExampleForMultipleAsyncExecutions.java new file mode 100644 index 0000000000..ab616a66bb --- /dev/null +++ b/src/main/java/com/lambdaworks/redis/cluster/ExampleForMultipleAsyncExecutions.java @@ -0,0 +1,57 @@ +package com.lambdaworks.redis.cluster; + +import java.util.List; + +import com.lambdaworks.redis.ScriptOutputType; +import com.lambdaworks.redis.output.KeyStreamingChannel; + +/** + * @author Mark Paluch + */ +public interface ExampleForMultipleAsyncExecutions { + /** + * Find all keys matching the given pattern. + * + * @param pattern the pattern type: patternkey (pattern) + * @return RedisFuture<List<K>> array-reply list of keys matching {@code pattern}. + */ + AsyncExecutions> keys(K pattern); + + /** + * Find all keys matching the given pattern. + * + * @param channel the channel + * @param pattern the pattern + * + * @return RedisFuture<Long> array-reply list of keys matching {@code pattern}. + */ + AsyncExecutions keys(KeyStreamingChannel channel, K pattern); + + /** + * Set a configuration parameter to the given value. + * + * @param parameter the parameter name + * @param value the parameter value + * @return RedisFuture<String> simple-string-reply: {@code OK} when the configuration was set properly. Otherwise an + * error is returned. + */ + AsyncExecutions configSet(String parameter, String value); + + /** + * Execute a Lua script server side. + * + * @param script Lua 5.1 script. + * @param type output type + * @param keys key names + * @param expected return type + * @return script result + */ + AsyncExecutions eval(String script, ScriptOutputType type, K... keys); + + /** + * Kill the script currently in execution. + * + * @return RedisFuture<String> simple-string-reply + */ + AsyncExecutions scriptKill(); +} diff --git a/src/main/java/com/lambdaworks/redis/cluster/NodeSelectionAsyncOperations.java b/src/main/java/com/lambdaworks/redis/cluster/NodeSelectionAsyncOperations.java new file mode 100644 index 0000000000..c29e8c921a --- /dev/null +++ b/src/main/java/com/lambdaworks/redis/cluster/NodeSelectionAsyncOperations.java @@ -0,0 +1,8 @@ +package com.lambdaworks.redis.cluster; + +/** + * @author Mark Paluch + */ +public interface NodeSelectionAsyncOperations extends NodeSelection, ExampleForMultipleAsyncExecutions { + +} diff --git a/src/main/java/com/lambdaworks/redis/cluster/NodeSelectionInvocationHandler.java b/src/main/java/com/lambdaworks/redis/cluster/NodeSelectionInvocationHandler.java new file mode 100644 index 0000000000..612d2857c4 --- /dev/null +++ b/src/main/java/com/lambdaworks/redis/cluster/NodeSelectionInvocationHandler.java @@ -0,0 +1,88 @@ +package com.lambdaworks.redis.cluster; + +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.Arrays; +import java.util.Map; +import java.util.WeakHashMap; +import java.util.concurrent.CompletionStage; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Maps; +import com.google.common.reflect.AbstractInvocationHandler; +import com.lambdaworks.redis.RedisClusterAsyncConnection; +import com.lambdaworks.redis.cluster.models.partitions.RedisClusterNode; + +/** + * @author Mark Paluch + */ +class NodeSelectionInvocationHandler extends AbstractInvocationHandler { + + private NodeSelection selection; + private Map nodeSelectionMethods = new WeakHashMap<>(); + private Map connectionMethod = new WeakHashMap<>(); + + public NodeSelectionInvocationHandler(NodeSelection selection) { + this.selection = selection; + } + + @Override + protected Object handleInvocation(Object proxy, Method method, Object[] args) throws Throwable { + + try { + Method targetMethod = findMethod(RedisClusterAsyncConnection.class, method, connectionMethod); + if (targetMethod != null) { + Map> connections = ImmutableMap.copyOf(selection + .asMap()); + Map> executions = Maps.newHashMap(); + + for (Map.Entry> entry : connections.entrySet()) { + + CompletionStage result = (CompletionStage) targetMethod.invoke(entry.getValue(), args); + executions.put(entry.getKey(), result); + } + + return new AsyncExecutionsImpl<>((Map) executions); + + } + + targetMethod = findMethod(NodeSelection.class, method, nodeSelectionMethods); + return targetMethod.invoke(selection, args); + } catch (InvocationTargetException e) { + throw e.getTargetException(); + } + } + + private Method findMethod(Class type, Method method, Map cache) { + if (cache.containsKey(method)) { + return cache.get(method); + } + + for (Method typeMethod : type.getMethods()) { + if (!typeMethod.getName().equals(method.getName()) + || !Arrays.equals(typeMethod.getParameterTypes(), method.getParameterTypes())) { + continue; + } + + synchronized (cache) { + cache.put(method, typeMethod); + return typeMethod; + } + } + + // Null-marker to avoid full class method scans. + cache.put(method, null); + return null; + + } + + private static class Tuple { + public K key; + public V value; + + public Tuple(K key, V value) { + this.key = key; + this.value = value; + } + } +} diff --git a/src/main/java/com/lambdaworks/redis/cluster/RedisAdvancedClusterConnection.java b/src/main/java/com/lambdaworks/redis/cluster/RedisAdvancedClusterConnection.java index 43485d590f..10aeacc208 100644 --- a/src/main/java/com/lambdaworks/redis/cluster/RedisAdvancedClusterConnection.java +++ b/src/main/java/com/lambdaworks/redis/cluster/RedisAdvancedClusterConnection.java @@ -2,20 +2,22 @@ import java.util.function.Predicate; +import com.lambdaworks.redis.RedisClusterAsyncConnection; import com.lambdaworks.redis.RedisClusterConnection; +import com.lambdaworks.redis.cluster.models.partitions.Partitions; import com.lambdaworks.redis.cluster.models.partitions.RedisClusterNode; /** * @author Mark Paluch */ -public interface RedisAdvancedClusterConnection extends RedisClusterConnection { +public interface RedisAdvancedClusterConnection extends RedisClusterAsyncConnection { /** * Select all masters. * * @return */ - default NodeSelection masters() { + default NodeSelectionAsyncOperations masters() { return nodes(redisClusterNode -> redisClusterNode.getFlags().contains(RedisClusterNode.NodeFlag.MASTER)); } @@ -24,7 +26,7 @@ default NodeSelection masters() { * * @return */ - default NodeSelection slaves() { + default NodeSelectionAsyncOperations slaves() { return nodes(redisClusterNode -> redisClusterNode.getFlags().contains(RedisClusterNode.NodeFlag.SLAVE)); } @@ -33,15 +35,32 @@ default NodeSelection slaves() { * * @return */ - default NodeSelection all() { + default NodeSelectionAsyncOperations all() { return nodes(redisClusterNode -> true); } /** - * Select nodes by a predicate + * Select nodes by a predicate and keeps a static selection. The set of nodes within the {@link NodeSelection} does not + * change when the cluster view changes. * * @param predicate - * @return + * @return a {@linkplain NodeSelectionAsyncOperations} matching {@code predicate} + */ + NodeSelectionAsyncOperations nodes(Predicate predicate); + + /** + * Select nodes by a predicate + * + * @param predicate + * @param dynamic Defines, whether the set of nodes within the {@link NodeSelection} can change when the cluster view + * changes. + * @return a {@linkplain NodeSelection} matching {@code predicate} + */ + NodeSelectionAsyncOperations nodes(Predicate predicate, boolean dynamic); + + /** + * + * @return the Partitions/Cluster view. */ - NodeSelection nodes(Predicate predicate); + Partitions getPartitions(); } diff --git a/src/main/java/com/lambdaworks/redis/cluster/RedisAdvancedClusterConnectionImpl.java b/src/main/java/com/lambdaworks/redis/cluster/RedisAdvancedClusterConnectionImpl.java index 04833b383e..92cd9a297d 100644 --- a/src/main/java/com/lambdaworks/redis/cluster/RedisAdvancedClusterConnectionImpl.java +++ b/src/main/java/com/lambdaworks/redis/cluster/RedisAdvancedClusterConnectionImpl.java @@ -1,11 +1,58 @@ package com.lambdaworks.redis.cluster; +import java.lang.reflect.Proxy; +import java.util.concurrent.TimeUnit; +import java.util.function.Predicate; + import com.lambdaworks.redis.RedisAsyncConnectionImpl; +import com.lambdaworks.redis.RedisChannelWriter; +import com.lambdaworks.redis.cluster.models.partitions.Partitions; +import com.lambdaworks.redis.cluster.models.partitions.RedisClusterNode; +import com.lambdaworks.redis.codec.RedisCodec; +import io.netty.channel.ChannelHandler; /** * @author Mark Paluch */ +@ChannelHandler.Sharable public class RedisAdvancedClusterConnectionImpl extends RedisAsyncConnectionImpl implements RedisAdvancedClusterConnection { + private Partitions partitions; + + /** + * Initialize a new connection. + * + * @param writer the channel writer + * @param codec Codec used to encode/decode keys and values. + * @param timeout Maximum time to wait for a response. + * @param unit Unit of time for the timeout. + */ + public RedisAdvancedClusterConnectionImpl(RedisChannelWriter writer, RedisCodec codec, long timeout, + TimeUnit unit) { + super(writer, codec, timeout, unit); + } + + @Override + public NodeSelectionAsyncOperations nodes(Predicate predicate) { + return nodes(predicate, false); + } + + @Override + public NodeSelectionAsyncOperations nodes(Predicate predicate, boolean dynamic) { + + NodeSelection selection = new StaticNodeSelection<>(this, predicate); + NodeSelectionInvocationHandler h = new NodeSelectionInvocationHandler(selection); + return (NodeSelectionAsyncOperations) Proxy.newProxyInstance(NodeSelection.class.getClassLoader(), + new Class[] { NodeSelectionAsyncOperations.class }, h); + } + + @Override + public Partitions getPartitions() { + return partitions; + } + + public void setPartitions(Partitions partitions) { + this.partitions = partitions; + } } diff --git a/src/main/java/com/lambdaworks/redis/cluster/RedisClusterClient.java b/src/main/java/com/lambdaworks/redis/cluster/RedisClusterClient.java index 746b1e0c81..dafb9ef55a 100644 --- a/src/main/java/com/lambdaworks/redis/cluster/RedisClusterClient.java +++ b/src/main/java/com/lambdaworks/redis/cluster/RedisClusterClient.java @@ -1,6 +1,8 @@ package com.lambdaworks.redis.cluster; -import static com.google.common.base.Preconditions.*; +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Preconditions.checkState; import java.net.SocketAddress; import java.util.Collections; @@ -11,7 +13,11 @@ import com.google.common.base.Supplier; import com.google.common.collect.Lists; -import com.lambdaworks.redis.*; +import com.lambdaworks.redis.AbstractRedisClient; +import com.lambdaworks.redis.RedisChannelWriter; +import com.lambdaworks.redis.RedisClusterConnection; +import com.lambdaworks.redis.RedisException; +import com.lambdaworks.redis.RedisURI; import com.lambdaworks.redis.cluster.models.partitions.ClusterPartitionParser; import com.lambdaworks.redis.cluster.models.partitions.Partitions; import com.lambdaworks.redis.cluster.models.partitions.RedisClusterNode; @@ -95,7 +101,7 @@ public RedisClusterConnection connectCluster(RedisCodec codec * * @return A new connection. */ - public RedisClusterAsyncConnection connectClusterAsync() { + public RedisAdvancedClusterConnection connectClusterAsync() { return connectClusterAsyncImpl(codec, getSocketAddressSupplier()); } @@ -107,11 +113,11 @@ public RedisClusterAsyncConnection connectClusterAsync() { * @param Value type. * @return A new connection. */ - public RedisClusterAsyncConnection connectClusterAsync(RedisCodec codec) { + public RedisAdvancedClusterConnection connectClusterAsync(RedisCodec codec) { return connectClusterAsyncImpl(codec, getSocketAddressSupplier()); } - protected RedisAsyncConnectionImpl connectAsyncImpl(SocketAddress socketAddress) { + protected RedisAdvancedClusterConnectionImpl connectAsyncImpl(SocketAddress socketAddress) { return connectAsyncImpl(codec, socketAddress); } @@ -123,13 +129,13 @@ protected RedisAsyncConnectionImpl connectAsyncImpl(SocketAddres * @param Value type. * @return a new connection */ - RedisAsyncConnectionImpl connectAsyncImpl(RedisCodec codec, final SocketAddress socketAddress) { + RedisAdvancedClusterConnectionImpl connectAsyncImpl(RedisCodec codec, final SocketAddress socketAddress) { logger.debug("connectAsyncImpl(" + socketAddress + ")"); BlockingQueue> queue = new LinkedBlockingQueue>(); CommandHandler handler = new CommandHandler(clientOptions, queue); - RedisAsyncConnectionImpl connection = newRedisAsyncConnectionImpl(handler, codec, timeout, unit); + RedisAdvancedClusterConnectionImpl connection = newRedisAsyncConnectionImpl(handler, codec, timeout, unit); connectAsyncImpl(handler, connection, new Supplier() { @Override @@ -143,7 +149,7 @@ public SocketAddress get() { return connection; } - RedisAsyncConnectionImpl connectClusterAsyncImpl(RedisCodec codec) { + RedisAdvancedClusterConnectionImpl connectClusterAsyncImpl(RedisCodec codec) { return connectClusterAsyncImpl(codec, getSocketAddressSupplier()); } @@ -156,7 +162,7 @@ RedisAsyncConnectionImpl connectClusterAsyncImpl(RedisCodec c * @param Value type. * @return a new connection */ - RedisAsyncConnectionImpl connectClusterAsyncImpl(RedisCodec codec, + RedisAdvancedClusterConnectionImpl connectClusterAsyncImpl(RedisCodec codec, final Supplier socketAddressSupplier) { if (partitions == null) { @@ -173,7 +179,11 @@ RedisAsyncConnectionImpl connectClusterAsyncImpl(RedisCodec c final ClusterDistributionChannelWriter clusterWriter = new ClusterDistributionChannelWriter(handler, pooledClusterConnectionProvider); - RedisAsyncConnectionImpl connection = newRedisAsyncConnectionImpl(clusterWriter, codec, timeout, unit); + RedisAdvancedClusterConnectionImpl connection = newRedisAsyncConnectionImpl(clusterWriter, codec, timeout, unit); + if (getPartitions() == null) { + reloadPartitions(); + } + connection.setPartitions(partitions); connectAsyncImpl(handler, connection, socketAddressSupplier); @@ -222,7 +232,8 @@ protected Partitions loadPartitions() { for (RedisURI initialUri : initialUris) { try { - RedisAsyncConnectionImpl connection = connectAsyncImpl(initialUri.getResolvedAddress()); + RedisAdvancedClusterConnectionImpl connection = connectAsyncImpl(initialUri + .getResolvedAddress()); nodeUri = initialUri; clusterNodes = connection.clusterNodes().get(); connection.close(); @@ -257,8 +268,8 @@ protected Partitions loadPartitions() { } /** - * Construct a new {@link RedisAsyncConnectionImpl}. Can be overridden in order to construct a subclass of - * {@link RedisAsyncConnectionImpl} + * Construct a new {@link RedisAdvancedClusterConnectionImpl}. Can be overridden in order to construct a subclass of + * {@link RedisAdvancedClusterConnectionImpl} * * @param channelWriter the channel writer * @param codec the codec to use @@ -268,9 +279,9 @@ protected Partitions loadPartitions() { * @param Value type. * @return RedisAsyncConnectionImpl<K, V> instance */ - protected RedisAsyncConnectionImpl newRedisAsyncConnectionImpl(RedisChannelWriter channelWriter, - RedisCodec codec, long timeout, TimeUnit unit) { - return new RedisAsyncConnectionImpl(channelWriter, codec, timeout, unit); + protected RedisAdvancedClusterConnectionImpl newRedisAsyncConnectionImpl( + RedisChannelWriter channelWriter, RedisCodec codec, long timeout, TimeUnit unit) { + return new RedisAdvancedClusterConnectionImpl(channelWriter, codec, timeout, unit); } protected RedisURI getFirstUri() { diff --git a/src/main/java/com/lambdaworks/redis/cluster/StaticNodeSelection.java b/src/main/java/com/lambdaworks/redis/cluster/StaticNodeSelection.java new file mode 100644 index 0000000000..d55fdf35e0 --- /dev/null +++ b/src/main/java/com/lambdaworks/redis/cluster/StaticNodeSelection.java @@ -0,0 +1,27 @@ +package com.lambdaworks.redis.cluster; + +import java.util.List; +import java.util.function.Predicate; +import java.util.stream.Collectors; + +import com.lambdaworks.redis.cluster.models.partitions.RedisClusterNode; + +/** + * @author Mark Paluch + */ +class StaticNodeSelection extends AbstractNodeSelection { + + private final List redisClusterNodes; + + public StaticNodeSelection(RedisAdvancedClusterConnectionImpl globalConnection, Predicate selector) { + super(globalConnection); + + this.redisClusterNodes = globalConnection.getPartitions().getPartitions().stream().filter(selector) + .collect(Collectors.toList()); + } + + @Override + protected List nodes() { + return redisClusterNodes; + } +} diff --git a/src/main/java/com/lambdaworks/redis/cluster/models/partitions/RedisClusterNode.java b/src/main/java/com/lambdaworks/redis/cluster/models/partitions/RedisClusterNode.java index 61b1038815..a743c5586c 100644 --- a/src/main/java/com/lambdaworks/redis/cluster/models/partitions/RedisClusterNode.java +++ b/src/main/java/com/lambdaworks/redis/cluster/models/partitions/RedisClusterNode.java @@ -164,6 +164,24 @@ public String toString() { return sb.toString(); } + /** + * + * @param nodeFlag + * @return true if the {@linkplain NodeFlag} is contained within the flags. + */ + public boolean is(NodeFlag nodeFlag) { + return getFlags().contains(nodeFlag); + } + + /** + * + * @param slot + * @return true if the slot is contained within the handled slots. + */ + public boolean hasSlot(int slot) { + return getSlots().contains(slot); + } + public enum NodeFlag { NOFLAGS, MYSELF, SLAVE, MASTER, EVENTUAL_FAIL, FAIL, HANDSHAKE, NOADDR; } diff --git a/src/test/java/com/lambdaworks/redis/cluster/AbstractClusterTest.java b/src/test/java/com/lambdaworks/redis/cluster/AbstractClusterTest.java new file mode 100644 index 0000000000..7ec18f33c0 --- /dev/null +++ b/src/test/java/com/lambdaworks/redis/cluster/AbstractClusterTest.java @@ -0,0 +1,58 @@ +package com.lambdaworks.redis.cluster; + +import java.util.concurrent.TimeUnit; + +import org.apache.log4j.Logger; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Rule; + +import com.google.common.collect.ImmutableList; +import com.lambdaworks.redis.RedisClusterAsyncConnection; +import com.lambdaworks.redis.RedisURI; +import com.lambdaworks.redis.TestSettings; + +/** + * @author Mark Paluch + */ +public class AbstractClusterTest { + + public static final String host = TestSettings.hostAddr(); + public static final int port1 = 7379; + public static final int port2 = 7380; + public static final int port3 = 7381; + public static final int port4 = 7382; + + protected static RedisClusterClient clusterClient; + + protected Logger log = Logger.getLogger(getClass()); + + protected RedisClusterAsyncConnection redis1; + + protected String key = "key"; + protected String value = "value"; + + @Rule + public ClusterRule clusterRule = new ClusterRule(clusterClient, port1, port2, port3, port4); + + @BeforeClass + public static void setupClusterClient() throws Exception { + clusterClient = new RedisClusterClient(ImmutableList.of(RedisURI.Builder.redis(host, port1).build())); + } + + @AfterClass + public static void shutdownClusterClient() { + clusterClient.shutdown(0, 0, TimeUnit.MILLISECONDS); + } + + public static int[] createSlots(int from, int to) { + int[] result = new int[to - from]; + int counter = 0; + for (int i = from; i < to; i++) { + result[counter++] = i; + + } + return result; + } + +} diff --git a/src/test/java/com/lambdaworks/redis/cluster/AdvancedClusterClientTest.java b/src/test/java/com/lambdaworks/redis/cluster/AdvancedClusterClientTest.java new file mode 100644 index 0000000000..6807392b88 --- /dev/null +++ b/src/test/java/com/lambdaworks/redis/cluster/AdvancedClusterClientTest.java @@ -0,0 +1,133 @@ +package com.lambdaworks.redis.cluster; + +import static com.google.code.tempusfugit.temporal.Duration.seconds; +import static com.google.code.tempusfugit.temporal.Timeout.timeout; +import static com.lambdaworks.redis.ScriptOutputType.STATUS; +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Vector; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import com.google.code.tempusfugit.temporal.Duration; +import com.google.code.tempusfugit.temporal.ThreadSleep; +import com.google.code.tempusfugit.temporal.WaitFor; +import com.google.common.collect.Lists; +import com.lambdaworks.redis.RedisClusterAsyncConnection; +import com.lambdaworks.redis.cluster.models.partitions.Partitions; +import com.lambdaworks.redis.cluster.models.partitions.RedisClusterNode; + +/** + * @author Mark Paluch + */ +public class AdvancedClusterClientTest extends AbstractClusterTest { + + protected RedisAdvancedClusterConnection connection; + + @Before + public void before() throws Exception { + + WaitFor.waitOrTimeout(() -> { + return clusterRule.isStable(); + }, timeout(seconds(5)), new ThreadSleep(Duration.millis(500))); + + connection = clusterClient.connectClusterAsync(); + } + + @After + public void after() throws Exception { + connection.close(); + } + + @Test + public void partitions() throws Exception { + + Partitions partitions = connection.getPartitions(); + assertThat(partitions).hasSize(4); + + partitions.iterator().forEachRemaining( + redisClusterNode -> System.out.println(redisClusterNode.getNodeId() + ": " + redisClusterNode.getFlags() + " " + + redisClusterNode.getUri())); + } + + @Test + public void testNodeSelectionCount() throws Exception { + assertThat(connection.all().size()).isEqualTo(4); + assertThat(connection.slaves().size()).isEqualTo(0); + assertThat(connection.masters().size()).isEqualTo(4); + + assertThat(connection.nodes(redisClusterNode -> redisClusterNode.is(RedisClusterNode.NodeFlag.MYSELF)).size()) + .isEqualTo(1); + } + + @Test + public void testNodeSelection() throws Exception { + + NodeSelection onlyMe = connection.nodes(redisClusterNode -> redisClusterNode.getFlags().contains( + RedisClusterNode.NodeFlag.MYSELF)); + + Map> map = onlyMe.asMap(); + + assertThat(map).hasSize(1); + + RedisClusterAsyncConnection node = onlyMe.node(0); + assertThat(node).isNotNull(); + } + + @Test + public void testMultiNodeOperations() throws Exception { + + List expectation = Lists.newArrayList(); + for (char c = 'a'; c < 'z'; c++) { + String key = new String(new char[] { c, c, c }); + expectation.add(key); + connection.set(key, value); + } + + List result = new Vector<>(); + + CompletableFuture.allOf(connection.masters().keys(result::add, "*").futures()).get(); + + assertThat(result).hasSize(expectation.size()); + + Collections.sort(expectation); + Collections.sort(result); + + assertThat(result).isEqualTo(expectation); + } + + @Test + public void testAsynchronicityOfMultiNodeExeccution() throws Exception { + + RedisAdvancedClusterConnection connection2 = clusterClient.connectClusterAsync(); + + NodeSelectionAsyncOperations masters = connection2.masters(); + CompletableFuture.allOf(masters.configSet("lua-time-limit", "10").futures()).get(); + AsyncExecutions eval = masters.eval("while true do end", STATUS, new String[0]); + assertThat(eval.nodes()).hasSize(4); + + for (CompletableFuture future : eval.futures()) { + + assertThat(future.isDone()).isFalse(); + assertThat(future.isCancelled()).isFalse(); + } + + AsyncExecutions kill = connection.masters().scriptKill(); + CompletableFuture.allOf(kill.futures()).get(); + + for (CompletionStage execution : kill) { + assertThat(execution.toCompletableFuture().get()).isEqualTo("OK"); + } + + for (CompletableFuture future : eval.futures()) { + assertThat(future.isDone()).isTrue(); + } + } +} diff --git a/src/test/java/com/lambdaworks/redis/cluster/ClusterRule.java b/src/test/java/com/lambdaworks/redis/cluster/ClusterRule.java index 6459685fda..1610febec4 100644 --- a/src/test/java/com/lambdaworks/redis/cluster/ClusterRule.java +++ b/src/test/java/com/lambdaworks/redis/cluster/ClusterRule.java @@ -121,4 +121,10 @@ public void clusterReset() { throw new IllegalStateException(e); } } + + public void meet(String host, int port) { + for (RedisAsyncConnectionImpl redisAsyncConnection : connectionCache.values()) { + redisAsyncConnection.clusterMeet(host, port); + } + } } diff --git a/src/test/java/com/lambdaworks/redis/cluster/RedisClusterClientTest.java b/src/test/java/com/lambdaworks/redis/cluster/RedisClusterClientTest.java index 765157100d..c8bcdc01bf 100644 --- a/src/test/java/com/lambdaworks/redis/cluster/RedisClusterClientTest.java +++ b/src/test/java/com/lambdaworks/redis/cluster/RedisClusterClientTest.java @@ -41,7 +41,6 @@ import com.lambdaworks.redis.RedisException; import com.lambdaworks.redis.RedisFuture; import com.lambdaworks.redis.RedisURI; -import com.lambdaworks.redis.TestSettings; import com.lambdaworks.redis.cluster.models.partitions.ClusterPartitionParser; import com.lambdaworks.redis.cluster.models.partitions.Partitions; import com.lambdaworks.redis.cluster.models.partitions.RedisClusterNode; @@ -50,16 +49,9 @@ @FixMethodOrder(MethodSorters.NAME_ASCENDING) @SuppressWarnings("unchecked") -public class RedisClusterClientTest { - - public static final String host = TestSettings.hostAddr(); - public static final int port1 = 7379; - public static final int port2 = 7380; - public static final int port3 = 7381; - public static final int port4 = 7382; +public class RedisClusterClientTest extends AbstractClusterTest { protected static RedisClient client; - protected static RedisClusterClient clusterClient; protected Logger log = Logger.getLogger(getClass()); @@ -78,25 +70,13 @@ public class RedisClusterClientTest { @BeforeClass public static void setupClient() throws Exception { + setupClusterClient(); client = new RedisClient(host, port1); - - clusterClient = new RedisClusterClient(ImmutableList.of(RedisURI.Builder.redis(host, port1).build())); - - } - - private static int[] createSlots(int from, int to) { - int[] result = new int[to - from]; - int counter = 0; - for (int i = from; i < to; i++) { - result[counter++] = i; - - } - return result; } @AfterClass public static void shutdownClient() { - + shutdownClusterClient(); client.shutdown(0, 0, TimeUnit.MILLISECONDS); }