Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Basic pipelining for certain cluster commands #66 #73

Merged
merged 2 commits into from
Jun 4, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 22 additions & 5 deletions src/main/java/com/lambdaworks/redis/RedisAsyncConnectionImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

package com.lambdaworks.redis;

import static com.lambdaworks.redis.protocol.CommandType.*;
import static com.lambdaworks.redis.protocol.CommandType.EXEC;

import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
Expand All @@ -14,8 +14,18 @@

import com.lambdaworks.codec.Base16;
import com.lambdaworks.redis.codec.RedisCodec;
import com.lambdaworks.redis.output.*;
import com.lambdaworks.redis.protocol.*;
import com.lambdaworks.redis.output.KeyStreamingChannel;
import com.lambdaworks.redis.output.KeyValueStreamingChannel;
import com.lambdaworks.redis.output.MultiOutput;
import com.lambdaworks.redis.output.ScoredValueStreamingChannel;
import com.lambdaworks.redis.output.ValueStreamingChannel;
import com.lambdaworks.redis.protocol.Command;
import com.lambdaworks.redis.protocol.CommandArgs;
import com.lambdaworks.redis.protocol.CommandOutput;
import com.lambdaworks.redis.protocol.CommandType;
import com.lambdaworks.redis.protocol.ConnectionWatchdog;
import com.lambdaworks.redis.protocol.RedisCommand;
import com.lambdaworks.redis.protocol.SetArgs;
import io.netty.channel.ChannelHandler;

/**
Expand Down Expand Up @@ -245,6 +255,10 @@ public RedisFuture<Long> del(K... keys) {
return dispatch(commandBuilder.del(keys));
}

public RedisFuture<Long> del(Iterable<K> keys) {
return dispatch(commandBuilder.del(keys));
}

@Override
public RedisFuture<String> discard() {
if (multi != null) {
Expand Down Expand Up @@ -539,6 +553,10 @@ public RedisFuture<List<V>> mget(K... keys) {
return dispatch(commandBuilder.mget(keys));
}

public RedisFuture<List<V>> mget(Iterable<K> keys) {
return dispatch(commandBuilder.mget(keys));
}

@Override
public RedisFuture<Long> mget(ValueStreamingChannel<V> channel, K... keys) {
return dispatch(commandBuilder.mget(channel, keys));
Expand Down Expand Up @@ -1579,8 +1597,7 @@ protected <T> RedisCommand<K, V, T> dispatch(CommandType type, CommandOutput<K,
return dispatch(type, output, null);
}

protected <T> RedisCommand<K, V, T> dispatch(CommandType type, CommandOutput<K, V, T> output,
CommandArgs<K, V> args) {
protected <T> RedisCommand<K, V, T> dispatch(CommandType type, CommandOutput<K, V, T> output, CommandArgs<K, V> args) {
Command<K, V, T> cmd = new Command<K, V, T>(type, output, args, multi != null);
return dispatch(cmd);
}
Expand Down
12 changes: 11 additions & 1 deletion src/main/java/com/lambdaworks/redis/RedisCommandBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,11 @@ public Command<K, V, Long> del(K... keys) {
return createCommand(DEL, new IntegerOutput<K, V>(codec), args);
}

public Command<K, V, Long> del(Iterable<K> keys) {
CommandArgs<K, V> args = new CommandArgs<K, V>(codec).addKeys(keys);
return createCommand(DEL, new IntegerOutput<K, V>(codec), args);
}

public Command<K, V, String> discard() {
return createCommand(DISCARD, new StatusOutput<K, V>(codec));
}
Expand Down Expand Up @@ -468,6 +473,11 @@ public Command<K, V, List<V>> mget(K... keys) {
return createCommand(MGET, new ValueListOutput<K, V>(codec), args);
}

public Command<K, V, List<V>> mget(Iterable<K> keys) {
CommandArgs<K, V> args = new CommandArgs<K, V>(codec).addKeys(keys);
return createCommand(MGET, new ValueListOutput<K, V>(codec), args);
}

public Command<K, V, Long> mget(ValueStreamingChannel<V> channel, K... keys) {
CommandArgs<K, V> args = new CommandArgs<K, V>(codec).addKeys(keys);
return createCommand(MGET, new ValueStreamingOutput<K, V>(codec, channel), args);
Expand Down Expand Up @@ -524,6 +534,7 @@ public Command<K, V, Boolean> pexpireat(K key, long timestamp) {
public Command<K, V, String> ping() {
return createCommand(PING, new StatusOutput<K, V>(codec));
}

public Command<K, V, String> readOnly() {
return createCommand(READONLY, new StatusOutput<K, V>(codec));
}
Expand All @@ -532,7 +543,6 @@ public Command<K, V, String> readWrite() {
return createCommand(READWRITE, new StatusOutput<K, V>(codec));
}


public Command<K, V, Long> pttl(K key) {
CommandArgs<K, V> args = new CommandArgs<K, V>(codec).addKey(key);
return createCommand(PTTL, new IntegerOutput<K, V>(codec), args);
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package com.lambdaworks.redis.cluster;

import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

import com.lambdaworks.redis.RedisFuture;

/**
* @author <a href="mailto:[email protected]">Mark Paluch</a>
*/
public class PipelinedRedisFuture<V> extends CompletableFuture<V> implements RedisFuture<V> {

private CountDownLatch latch = new CountDownLatch(1);

public PipelinedRedisFuture(Map<Integer, ? extends RedisFuture<?>> executions,
Function<PipelinedRedisFuture<V>, V> converter) {

CompletableFuture.allOf(executions.values().toArray(new CompletableFuture[executions.size()]))
.thenRun(() -> complete(converter.apply(this))).exceptionally(throwable -> {
completeExceptionally(throwable);
return null;
});
}

@Override
public boolean complete(V value) {
boolean result = super.complete(value);
latch.countDown();
return result;
}

@Override
public boolean completeExceptionally(Throwable ex) {

boolean value = super.completeExceptionally(ex);
latch.countDown();
return value;
}

@Override
public String getError() {
return null;
}

@Override
public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
return latch.await(timeout, unit);
}

}
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package com.lambdaworks.redis.cluster;

import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

import com.lambdaworks.redis.RedisClusterAsyncConnection;
import com.lambdaworks.redis.RedisClusterConnection;
import com.lambdaworks.redis.RedisFuture;
import com.lambdaworks.redis.cluster.models.partitions.Partitions;
import com.lambdaworks.redis.cluster.models.partitions.RedisClusterNode;

Expand Down Expand Up @@ -63,4 +65,41 @@ default NodeSelectionAsyncOperations<K, V> all() {
* @return the Partitions/Cluster view.
*/
Partitions getPartitions();

/**
* Delete a key with pipelining. Cross-slot keys will result in multiple calls to the particular cluster nodes.
*
* @param keys the key
* @return RedisFuture&lt;Long&gt; integer-reply The number of keys that were removed.
*/
RedisFuture<Long> del(K... keys);

/**
* Get the values of all the given keys with pipelining. Cross-slot keys will result in multiple calls to the particular
* cluster nodes.
*
* @param keys the key
* @return RedisFuture&lt;List&lt;V&gt;&gt; array-reply list of values at the specified keys.
*/
RedisFuture<List<V>> mget(K... keys);

/**
* Set multiple keys to multiple values with pipelining. Cross-slot keys will result in multiple calls to the particular
* cluster nodes.
*
* @param map the null
* @return RedisFuture&lt;String&gt; simple-string-reply always {@code OK} since {@code MSET} can't fail.
*/
RedisFuture<String> mset(Map<K, V> map);

/**
* Set multiple keys to multiple values, only if none of the keys exist with pipelining. Cross-slot keys will result in
* multiple calls to the particular cluster nodes.
*
* @param map the null
* @return RedisFuture&lt;Boolean&gt; integer-reply specifically:
*
* {@code 1} if the all the keys were set. {@code 0} if no key was set (at least one key already existed).
*/
RedisFuture<Boolean> msetnx(Map<K, V> map);
}
Loading