Skip to content

Commit

Permalink
Cluster operations on a selection/set of nodes
Browse files Browse the repository at this point in the history
  • Loading branch information
mp911de committed May 25, 2015
1 parent b34b091 commit bda69f3
Show file tree
Hide file tree
Showing 17 changed files with 613 additions and 64 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ public void setDefaultTimeout(long timeout, TimeUnit unit) {
}

@SuppressWarnings("unchecked")
protected <K, V, T extends RedisAsyncConnectionImpl<K, V>> T connectAsyncImpl(final CommandHandler<K, V> handler,
protected <K, V, T extends RedisChannelHandler<K, V>> T connectAsyncImpl(final CommandHandler<K, V> handler,
final T connection, final Supplier<SocketAddress> socketAddressSupplier) {

ConnectionBuilder connectionBuilder = ConnectionBuilder.connectionBuilder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,53 +3,52 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

import com.google.common.collect.Lists;
import com.lambdaworks.redis.RedisClusterAsyncConnection;
import com.lambdaworks.redis.RedisFuture;
import com.lambdaworks.redis.RedisURI;
import com.lambdaworks.redis.cluster.models.partitions.RedisClusterNode;
import io.netty.util.concurrent.CompleteFuture;

/**
* @author <a href="mailto:[email protected]">Mark Paluch</a>
*/
public class NodeSelectionImpl<K, V> implements NodeSelection<K, V> {
abstract class AbstractNodeSelection<K, V> implements NodeSelection<K, V> {

private List<RedisClusterNode> nodes = Lists.newArrayList();
private RedisAdvancedClusterConnectionImpl<K, V> globalConnection;
private ClusterDistributionChannelWriter writer;
protected RedisAdvancedClusterConnectionImpl<K, V> globalConnection;
protected ClusterDistributionChannelWriter writer;

public NodeSelectionImpl(List<RedisClusterNode> nodes, RedisAdvancedClusterConnectionImpl<K, V> globalConnection) {
this.nodes = nodes;
public AbstractNodeSelection(RedisAdvancedClusterConnectionImpl<K, V> globalConnection) {
this.globalConnection = globalConnection;
CompletableFuture cf1 = new CompletableFuture();

writer = (ClusterDistributionChannelWriter) globalConnection.getChannelWriter();
}

@Override
public RedisClusterAsyncConnection<K, V> node(int index) {
return null;

RedisClusterNode redisClusterNode = nodes().get(index);
return getConnection(redisClusterNode);
}

private RedisClusterAsyncConnection<K, V> getConnection(RedisClusterNode redisClusterNode) {

RedisURI uri = redisClusterNode.getUri();
return writer.getClusterConnectionProvider().getConnection(ClusterConnectionProvider.Intent.WRITE, uri.getHost(),
uri.getPort());
}

/**
* @return List of involved nodes
*/
protected abstract List<RedisClusterNode> nodes();

@Override
public int size() {
return nodes.size();
return nodes().size();
}

@Override
public Map<RedisClusterNode, RedisClusterAsyncConnection<K, V>> asMap() {
return nodes.stream().collect(
return nodes().stream().collect(
Collectors.toMap(redisClusterNode -> redisClusterNode, redisClusterNode1 -> getConnection(redisClusterNode1)));
}

Expand All @@ -63,7 +62,6 @@ public Map<RedisClusterNode, RedisFuture<String>> get(K key) {

@Override
public Iterator<RedisClusterAsyncConnection<K, V>> iterator() {
return nodes.stream().map(this::getConnection).iterator();
return nodes().stream().map(this::getConnection).iterator();
}

}
31 changes: 31 additions & 0 deletions src/main/java/com/lambdaworks/redis/cluster/AsyncExecutions.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package com.lambdaworks.redis.cluster;

import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

import com.lambdaworks.redis.cluster.models.partitions.RedisClusterNode;

/**
* Completes
*
* @author <a href="mailto:[email protected]">Mark Paluch</a>
*/
public interface AsyncExecutions<T> extends Iterable<CompletionStage<T>> {

Map<RedisClusterNode, CompletionStage<T>> asMap();

Collection<RedisClusterNode> nodes();

CompletionStage<T> get(RedisClusterNode redisClusterNode);

CompletableFuture<T>[] futures();

@Override
default Iterator<CompletionStage<T>> iterator() {
return asMap().values().iterator();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package com.lambdaworks.redis.cluster;

import java.util.Collection;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

import com.google.common.collect.ImmutableMap;
import com.lambdaworks.redis.cluster.models.partitions.RedisClusterNode;

/**
* @author <a href="mailto:[email protected]">Mark Paluch</a>
*/
public class AsyncExecutionsImpl<T> implements AsyncExecutions<T> {

private Map<RedisClusterNode, CompletionStage<T>> executions;

public AsyncExecutionsImpl(Map<RedisClusterNode, CompletionStage<T>> executions) {
this.executions = ImmutableMap.copyOf(executions);
}

@Override
public Map<RedisClusterNode, CompletionStage<T>> asMap() {
return executions;
}

@Override
public Collection<RedisClusterNode> nodes() {
return executions.keySet();
}

@Override
public CompletionStage<T> get(RedisClusterNode redisClusterNode) {
return executions.get(redisClusterNode);
}

@Override
public CompletableFuture<T>[] futures() {
return executions.values().toArray(new CompletableFuture[executions.size()]);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package com.lambdaworks.redis.cluster;

import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

import com.lambdaworks.redis.cluster.models.partitions.RedisClusterNode;

/**
* @author <a href="mailto:[email protected]">Mark Paluch</a>
*/
class DynamicNodeSelection<K, V> extends AbstractNodeSelection<K, V> {

private final Predicate<RedisClusterNode> selector;

public DynamicNodeSelection(RedisAdvancedClusterConnectionImpl<K, V> globalConnection, Predicate<RedisClusterNode> selector) {
super(globalConnection);

this.selector = selector;
}

@Override
protected List<RedisClusterNode> nodes() {
return globalConnection.getPartitions().getPartitions().stream().filter(selector).collect(Collectors.toList());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package com.lambdaworks.redis.cluster;

import java.util.List;

import com.lambdaworks.redis.ScriptOutputType;
import com.lambdaworks.redis.output.KeyStreamingChannel;

/**
* @author <a href="mailto:[email protected]">Mark Paluch</a>
*/
public interface ExampleForMultipleAsyncExecutions<K, V> {
/**
* Find all keys matching the given pattern.
*
* @param pattern the pattern type: patternkey (pattern)
* @return RedisFuture&lt;List&lt;K&gt;&gt; array-reply list of keys matching {@code pattern}.
*/
AsyncExecutions<List<K>> keys(K pattern);

/**
* Find all keys matching the given pattern.
*
* @param channel the channel
* @param pattern the pattern
*
* @return RedisFuture&lt;Long&gt; array-reply list of keys matching {@code pattern}.
*/
AsyncExecutions<Long> keys(KeyStreamingChannel<K> channel, K pattern);

/**
* Set a configuration parameter to the given value.
*
* @param parameter the parameter name
* @param value the parameter value
* @return RedisFuture&lt;String&gt; simple-string-reply: {@code OK} when the configuration was set properly. Otherwise an
* error is returned.
*/
AsyncExecutions<String> configSet(String parameter, String value);

/**
* Execute a Lua script server side.
*
* @param script Lua 5.1 script.
* @param type output type
* @param keys key names
* @param <T> expected return type
* @return script result
*/
<T> AsyncExecutions<T> eval(String script, ScriptOutputType type, K... keys);

/**
* Kill the script currently in execution.
*
* @return RedisFuture&lt;String&gt; simple-string-reply
*/
AsyncExecutions<String> scriptKill();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package com.lambdaworks.redis.cluster;

/**
* @author <a href="mailto:[email protected]">Mark Paluch</a>
*/
public interface NodeSelectionAsyncOperations<K, V> extends NodeSelection<K, V>, ExampleForMultipleAsyncExecutions<K, V> {

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package com.lambdaworks.redis.cluster;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.Map;
import java.util.WeakHashMap;
import java.util.concurrent.CompletionStage;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.google.common.reflect.AbstractInvocationHandler;
import com.lambdaworks.redis.RedisClusterAsyncConnection;
import com.lambdaworks.redis.cluster.models.partitions.RedisClusterNode;

/**
* @author <a href="mailto:[email protected]">Mark Paluch</a>
*/
class NodeSelectionInvocationHandler extends AbstractInvocationHandler {

private NodeSelection<?, ?> selection;
private Map<Method, Method> nodeSelectionMethods = new WeakHashMap<>();
private Map<Method, Method> connectionMethod = new WeakHashMap<>();

public NodeSelectionInvocationHandler(NodeSelection<?, ?> selection) {
this.selection = selection;
}

@Override
protected Object handleInvocation(Object proxy, Method method, Object[] args) throws Throwable {

try {
Method targetMethod = findMethod(RedisClusterAsyncConnection.class, method, connectionMethod);
if (targetMethod != null) {
Map<RedisClusterNode, ? extends RedisClusterAsyncConnection<?, ?>> connections = ImmutableMap.copyOf(selection
.asMap());
Map<RedisClusterNode, CompletionStage<?>> executions = Maps.newHashMap();

for (Map.Entry<RedisClusterNode, ? extends RedisClusterAsyncConnection<?, ?>> entry : connections.entrySet()) {

CompletionStage<?> result = (CompletionStage<?>) targetMethod.invoke(entry.getValue(), args);
executions.put(entry.getKey(), result);
}

return new AsyncExecutionsImpl<>((Map) executions);

}

targetMethod = findMethod(NodeSelection.class, method, nodeSelectionMethods);
return targetMethod.invoke(selection, args);
} catch (InvocationTargetException e) {
throw e.getTargetException();
}
}

private Method findMethod(Class<?> type, Method method, Map<Method, Method> cache) {
if (cache.containsKey(method)) {
return cache.get(method);
}

for (Method typeMethod : type.getMethods()) {
if (!typeMethod.getName().equals(method.getName())
|| !Arrays.equals(typeMethod.getParameterTypes(), method.getParameterTypes())) {
continue;
}

synchronized (cache) {
cache.put(method, typeMethod);
return typeMethod;
}
}

// Null-marker to avoid full class method scans.
cache.put(method, null);
return null;

}

private static class Tuple<K, V> {
public K key;
public V value;

public Tuple(K key, V value) {
this.key = key;
this.value = value;
}
}
}
Loading

0 comments on commit bda69f3

Please sign in to comment.