From b34b0914fe91dd23aa591aee57411574f882e394 Mon Sep 17 00:00:00 2001 From: Mark Paluch Date: Sun, 24 May 2015 20:16:47 +0200 Subject: [PATCH] Initial advanced client --- .../redis/cluster/ClusterCompletionStage.java | 29 ++++++++ .../redis/cluster/NodeSelection.java | 22 ++++++ .../redis/cluster/NodeSelectionImpl.java | 69 +++++++++++++++++++ .../RedisAdvancedClusterConnection.java | 47 +++++++++++++ .../RedisAdvancedClusterConnectionImpl.java | 11 +++ .../models/partitions/RedisClusterNode.java | 12 ++-- 6 files changed, 185 insertions(+), 5 deletions(-) create mode 100644 src/main/java/com/lambdaworks/redis/cluster/ClusterCompletionStage.java create mode 100644 src/main/java/com/lambdaworks/redis/cluster/NodeSelection.java create mode 100644 src/main/java/com/lambdaworks/redis/cluster/NodeSelectionImpl.java create mode 100644 src/main/java/com/lambdaworks/redis/cluster/RedisAdvancedClusterConnection.java create mode 100644 src/main/java/com/lambdaworks/redis/cluster/RedisAdvancedClusterConnectionImpl.java diff --git a/src/main/java/com/lambdaworks/redis/cluster/ClusterCompletionStage.java b/src/main/java/com/lambdaworks/redis/cluster/ClusterCompletionStage.java new file mode 100644 index 0000000000..7da6f6855a --- /dev/null +++ b/src/main/java/com/lambdaworks/redis/cluster/ClusterCompletionStage.java @@ -0,0 +1,29 @@ +package com.lambdaworks.redis.cluster; + +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletionStage; + +import com.lambdaworks.redis.cluster.models.partitions.RedisClusterNode; + +/** + * Completes + * + * @author Mark Paluch + */ +public interface ClusterCompletionStage { + + Map> asMap(); + + Collection nodes(); + + Collection> stages(); + + CompletionStage get(RedisClusterNode redisClusterNode); + + CompletionStage any(); + + CompletionStage> all(); + +} diff --git a/src/main/java/com/lambdaworks/redis/cluster/NodeSelection.java b/src/main/java/com/lambdaworks/redis/cluster/NodeSelection.java new file mode 100644 index 0000000000..30844ebf96 --- /dev/null +++ b/src/main/java/com/lambdaworks/redis/cluster/NodeSelection.java @@ -0,0 +1,22 @@ +package com.lambdaworks.redis.cluster; + +import java.util.Iterator; +import java.util.Map; + +import com.lambdaworks.redis.RedisClusterAsyncConnection; +import com.lambdaworks.redis.RedisFuture; +import com.lambdaworks.redis.cluster.models.partitions.RedisClusterNode; + +/** + * @author Mark Paluch + */ +public interface NodeSelection extends Iterable> { + + RedisClusterAsyncConnection node(int index); + + int size(); + + Map> asMap(); + + Map> get(K key); +} diff --git a/src/main/java/com/lambdaworks/redis/cluster/NodeSelectionImpl.java b/src/main/java/com/lambdaworks/redis/cluster/NodeSelectionImpl.java new file mode 100644 index 0000000000..0d50485def --- /dev/null +++ b/src/main/java/com/lambdaworks/redis/cluster/NodeSelectionImpl.java @@ -0,0 +1,69 @@ +package com.lambdaworks.redis.cluster; + +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; + +import com.google.common.collect.Lists; +import com.lambdaworks.redis.RedisClusterAsyncConnection; +import com.lambdaworks.redis.RedisFuture; +import com.lambdaworks.redis.RedisURI; +import com.lambdaworks.redis.cluster.models.partitions.RedisClusterNode; +import io.netty.util.concurrent.CompleteFuture; + +/** + * @author Mark Paluch + */ +public class NodeSelectionImpl implements NodeSelection { + + private List nodes = Lists.newArrayList(); + private RedisAdvancedClusterConnectionImpl globalConnection; + private ClusterDistributionChannelWriter writer; + + public NodeSelectionImpl(List nodes, RedisAdvancedClusterConnectionImpl globalConnection) { + this.nodes = nodes; + this.globalConnection = globalConnection; + CompletableFuture cf1 = new CompletableFuture(); + + writer = (ClusterDistributionChannelWriter) globalConnection.getChannelWriter(); + } + + @Override + public RedisClusterAsyncConnection node(int index) { + return null; + } + + private RedisClusterAsyncConnection getConnection(RedisClusterNode redisClusterNode) { + + RedisURI uri = redisClusterNode.getUri(); + return writer.getClusterConnectionProvider().getConnection(ClusterConnectionProvider.Intent.WRITE, uri.getHost(), + uri.getPort()); + } + + @Override + public int size() { + return nodes.size(); + } + + @Override + public Map> asMap() { + return nodes.stream().collect( + Collectors.toMap(redisClusterNode -> redisClusterNode, redisClusterNode1 -> getConnection(redisClusterNode1))); + } + + @Override + public Map> get(K key) { + for (RedisClusterAsyncConnection kvRedisClusterAsyncConnection : this) { + + } + return null; + } + + @Override + public Iterator> iterator() { + return nodes.stream().map(this::getConnection).iterator(); + } + +} diff --git a/src/main/java/com/lambdaworks/redis/cluster/RedisAdvancedClusterConnection.java b/src/main/java/com/lambdaworks/redis/cluster/RedisAdvancedClusterConnection.java new file mode 100644 index 0000000000..43485d590f --- /dev/null +++ b/src/main/java/com/lambdaworks/redis/cluster/RedisAdvancedClusterConnection.java @@ -0,0 +1,47 @@ +package com.lambdaworks.redis.cluster; + +import java.util.function.Predicate; + +import com.lambdaworks.redis.RedisClusterConnection; +import com.lambdaworks.redis.cluster.models.partitions.RedisClusterNode; + +/** + * @author Mark Paluch + */ +public interface RedisAdvancedClusterConnection extends RedisClusterConnection { + + /** + * Select all masters. + * + * @return + */ + default NodeSelection masters() { + return nodes(redisClusterNode -> redisClusterNode.getFlags().contains(RedisClusterNode.NodeFlag.MASTER)); + } + + /** + * Select all slaves. + * + * @return + */ + default NodeSelection slaves() { + return nodes(redisClusterNode -> redisClusterNode.getFlags().contains(RedisClusterNode.NodeFlag.SLAVE)); + } + + /** + * Select all known cluster nodes. + * + * @return + */ + default NodeSelection all() { + return nodes(redisClusterNode -> true); + } + + /** + * Select nodes by a predicate + * + * @param predicate + * @return + */ + NodeSelection nodes(Predicate predicate); +} diff --git a/src/main/java/com/lambdaworks/redis/cluster/RedisAdvancedClusterConnectionImpl.java b/src/main/java/com/lambdaworks/redis/cluster/RedisAdvancedClusterConnectionImpl.java new file mode 100644 index 0000000000..04833b383e --- /dev/null +++ b/src/main/java/com/lambdaworks/redis/cluster/RedisAdvancedClusterConnectionImpl.java @@ -0,0 +1,11 @@ +package com.lambdaworks.redis.cluster; + +import com.lambdaworks.redis.RedisAsyncConnectionImpl; + +/** + * @author Mark Paluch + */ +public class RedisAdvancedClusterConnectionImpl extends RedisAsyncConnectionImpl implements + RedisAdvancedClusterConnection { + +} diff --git a/src/main/java/com/lambdaworks/redis/cluster/models/partitions/RedisClusterNode.java b/src/main/java/com/lambdaworks/redis/cluster/models/partitions/RedisClusterNode.java index 4a5b901dc4..61b1038815 100644 --- a/src/main/java/com/lambdaworks/redis/cluster/models/partitions/RedisClusterNode.java +++ b/src/main/java/com/lambdaworks/redis/cluster/models/partitions/RedisClusterNode.java @@ -43,6 +43,12 @@ public RedisClusterNode(RedisURI uri, String nodeId, boolean connected, String s this.flags = flags; } + public static RedisClusterNode of(String nodeId) { + RedisClusterNode redisClusterNode = new RedisClusterNode(); + redisClusterNode.setNodeId(nodeId); + return redisClusterNode; + } + public RedisURI getUri() { return uri; } @@ -126,9 +132,6 @@ public boolean equals(Object o) { RedisClusterNode that = (RedisClusterNode) o; - if (uri != null ? !uri.equals(that.uri) : that.uri != null) { - return false; - } if (nodeId != null ? !nodeId.equals(that.nodeId) : that.nodeId != null) { return false; } @@ -138,8 +141,7 @@ public boolean equals(Object o) { @Override public int hashCode() { - int result = uri != null ? uri.hashCode() : 0; - result = 31 * result + (nodeId != null ? nodeId.hashCode() : 0); + int result = 31 * (nodeId != null ? nodeId.hashCode() : 0); return result; }