Skip to content

Commit

Permalink
Initial advanced client
Browse files Browse the repository at this point in the history
  • Loading branch information
mp911de committed May 24, 2015
1 parent cc220ce commit b34b091
Show file tree
Hide file tree
Showing 6 changed files with 185 additions and 5 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package com.lambdaworks.redis.cluster;

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletionStage;

import com.lambdaworks.redis.cluster.models.partitions.RedisClusterNode;

/**
* Completes
*
* @author <a href="mailto:[email protected]">Mark Paluch</a>
*/
public interface ClusterCompletionStage<T> {

Map<RedisClusterNode, CompletionStage<T>> asMap();

Collection<RedisClusterNode> nodes();

Collection<CompletionStage<T>> stages();

CompletionStage<T> get(RedisClusterNode redisClusterNode);

CompletionStage<T> any();

CompletionStage<List<T>> all();

}
22 changes: 22 additions & 0 deletions src/main/java/com/lambdaworks/redis/cluster/NodeSelection.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package com.lambdaworks.redis.cluster;

import java.util.Iterator;
import java.util.Map;

import com.lambdaworks.redis.RedisClusterAsyncConnection;
import com.lambdaworks.redis.RedisFuture;
import com.lambdaworks.redis.cluster.models.partitions.RedisClusterNode;

/**
* @author <a href="mailto:[email protected]">Mark Paluch</a>
*/
public interface NodeSelection<K, V> extends Iterable<RedisClusterAsyncConnection<K, V>> {

RedisClusterAsyncConnection<K, V> node(int index);

int size();

Map<RedisClusterNode, RedisClusterAsyncConnection<K, V>> asMap();

Map<RedisClusterNode, RedisFuture<String>> get(K key);
}
69 changes: 69 additions & 0 deletions src/main/java/com/lambdaworks/redis/cluster/NodeSelectionImpl.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package com.lambdaworks.redis.cluster;

import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

import com.google.common.collect.Lists;
import com.lambdaworks.redis.RedisClusterAsyncConnection;
import com.lambdaworks.redis.RedisFuture;
import com.lambdaworks.redis.RedisURI;
import com.lambdaworks.redis.cluster.models.partitions.RedisClusterNode;
import io.netty.util.concurrent.CompleteFuture;

/**
* @author <a href="mailto:[email protected]">Mark Paluch</a>
*/
public class NodeSelectionImpl<K, V> implements NodeSelection<K, V> {

private List<RedisClusterNode> nodes = Lists.newArrayList();
private RedisAdvancedClusterConnectionImpl<K, V> globalConnection;
private ClusterDistributionChannelWriter writer;

public NodeSelectionImpl(List<RedisClusterNode> nodes, RedisAdvancedClusterConnectionImpl<K, V> globalConnection) {
this.nodes = nodes;
this.globalConnection = globalConnection;
CompletableFuture cf1 = new CompletableFuture();

writer = (ClusterDistributionChannelWriter) globalConnection.getChannelWriter();
}

@Override
public RedisClusterAsyncConnection<K, V> node(int index) {
return null;
}

private RedisClusterAsyncConnection<K, V> getConnection(RedisClusterNode redisClusterNode) {

RedisURI uri = redisClusterNode.getUri();
return writer.getClusterConnectionProvider().getConnection(ClusterConnectionProvider.Intent.WRITE, uri.getHost(),
uri.getPort());
}

@Override
public int size() {
return nodes.size();
}

@Override
public Map<RedisClusterNode, RedisClusterAsyncConnection<K, V>> asMap() {
return nodes.stream().collect(
Collectors.toMap(redisClusterNode -> redisClusterNode, redisClusterNode1 -> getConnection(redisClusterNode1)));
}

@Override
public Map<RedisClusterNode, RedisFuture<String>> get(K key) {
for (RedisClusterAsyncConnection<K, V> kvRedisClusterAsyncConnection : this) {

}
return null;
}

@Override
public Iterator<RedisClusterAsyncConnection<K, V>> iterator() {
return nodes.stream().map(this::getConnection).iterator();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package com.lambdaworks.redis.cluster;

import java.util.function.Predicate;

import com.lambdaworks.redis.RedisClusterConnection;
import com.lambdaworks.redis.cluster.models.partitions.RedisClusterNode;

/**
* @author <a href="mailto:[email protected]">Mark Paluch</a>
*/
public interface RedisAdvancedClusterConnection<K, V> extends RedisClusterConnection<K, V> {

/**
* Select all masters.
*
* @return
*/
default NodeSelection<K, V> masters() {
return nodes(redisClusterNode -> redisClusterNode.getFlags().contains(RedisClusterNode.NodeFlag.MASTER));
}

/**
* Select all slaves.
*
* @return
*/
default NodeSelection<K, V> slaves() {
return nodes(redisClusterNode -> redisClusterNode.getFlags().contains(RedisClusterNode.NodeFlag.SLAVE));
}

/**
* Select all known cluster nodes.
*
* @return
*/
default NodeSelection<K, V> all() {
return nodes(redisClusterNode -> true);
}

/**
* Select nodes by a predicate
*
* @param predicate
* @return
*/
NodeSelection<K, V> nodes(Predicate<RedisClusterNode> predicate);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package com.lambdaworks.redis.cluster;

import com.lambdaworks.redis.RedisAsyncConnectionImpl;

/**
* @author <a href="mailto:[email protected]">Mark Paluch</a>
*/
public class RedisAdvancedClusterConnectionImpl<K, V> extends RedisAsyncConnectionImpl<K, V> implements
RedisAdvancedClusterConnection<K, V> {

}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,12 @@ public RedisClusterNode(RedisURI uri, String nodeId, boolean connected, String s
this.flags = flags;
}

public static RedisClusterNode of(String nodeId) {
RedisClusterNode redisClusterNode = new RedisClusterNode();
redisClusterNode.setNodeId(nodeId);
return redisClusterNode;
}

public RedisURI getUri() {
return uri;
}
Expand Down Expand Up @@ -126,9 +132,6 @@ public boolean equals(Object o) {

RedisClusterNode that = (RedisClusterNode) o;

if (uri != null ? !uri.equals(that.uri) : that.uri != null) {
return false;
}
if (nodeId != null ? !nodeId.equals(that.nodeId) : that.nodeId != null) {
return false;
}
Expand All @@ -138,8 +141,7 @@ public boolean equals(Object o) {

@Override
public int hashCode() {
int result = uri != null ? uri.hashCode() : 0;
result = 31 * result + (nodeId != null ? nodeId.hashCode() : 0);
int result = 31 * (nodeId != null ? nodeId.hashCode() : 0);
return result;
}

Expand Down

0 comments on commit b34b091

Please sign in to comment.