Skip to content

Commit

Permalink
Merge pipelining into 4.0 #92
Browse files Browse the repository at this point in the history
Allow explicit control over flushing when dispatching commands ("pipelining") on the async API
  • Loading branch information
mp911de committed Jul 9, 2015
1 parent c5411d4 commit 017e6ae
Show file tree
Hide file tree
Showing 16 changed files with 352 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1684,4 +1684,14 @@ public void reset() {
public StatefulConnection<K, V> getConnection() {
return connection;
}

@Override
public void setAutoFlushCommands(boolean autoFlush) {
connection.setAutoFlushCommands(autoFlush);
}

@Override
public void flushCommands() {
connection.flushCommands();
}
}
15 changes: 15 additions & 0 deletions src/main/java/com/lambdaworks/redis/BaseRedisAsyncConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -158,4 +158,19 @@ public interface BaseRedisAsyncConnection<K, V> extends Closeable, BaseRedisAsyn
*/
boolean isOpen();

/**
* Disable or enable auto-flush behavior. Default is {@literal true}. If autoFlushCommands is disabled, multiple commands
* can be issued without writing them actually to the transport. Commands are buffered until a {@link #flushCommands()} is
* issued. After calling {@link #flushCommands()} commands are sent to the transport and executed by Redis.
*
* @param autoFlush state of autoFlush.
*/
void setAutoFlushCommands(boolean autoFlush);

/**
* Flush pending commands. This commands forces a flush on the channel and can be used to buffer ("pipeline") commands to
* achieve batching. No-op if channel is not connected.
*/
void flushCommands();

}
8 changes: 8 additions & 0 deletions src/main/java/com/lambdaworks/redis/RedisChannelHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -209,4 +209,12 @@ protected <T> T syncHandler(Object asyncApi, Class<?>... interfaces) {
FutureSyncInvocationHandler<K, V> h = new FutureSyncInvocationHandler<>((StatefulConnection) this, asyncApi);
return (T) Proxy.newProxyInstance(AbstractRedisClient.class.getClassLoader(), interfaces, h);
}

public void setAutoFlushCommands(boolean autoFlush) {
getChannelWriter().setAutoFlushCommands(autoFlush);
}

public void flushCommands() {
getChannelWriter().flushCommands();
}
}
15 changes: 15 additions & 0 deletions src/main/java/com/lambdaworks/redis/RedisChannelWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,19 @@ public interface RedisChannelWriter<K, V> extends Closeable {
* @param redisChannelHandler the channel handler (external connection object)
*/
void setRedisChannelHandler(RedisChannelHandler<K, V> redisChannelHandler);

/**
* Disable or enable auto-flush behavior. Default is {@literal true}. If autoFlushCommands is disabled, multiple commands
* can be issued without writing them actually to the transport. Commands are buffered until a {@link #flushCommands()} is
* issued. After calling {@link #flushCommands()} commands are sent to the transport and executed by Redis.
*
* @param autoFlush state of autoFlush.
*/
void setAutoFlushCommands(boolean autoFlush);

/**
* Flush pending commands. This commands forces a flush on the channel and can be used to buffer ("pipeline") commands to
* achieve batching. No-op if channel is not connected.
*/
void flushCommands();
}
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ void addListener(CloseEvents.CloseListener listener) {
* @since 3.0
*/
static class PooledConnectionInvocationHandler<T> extends AbstractInvocationHandler {
public static final Set<String> DISABLED_METHODS = ImmutableSet.of("auth", "select", "quit", "getStatefulConnection");
public static final Set<String> DISABLED_METHODS = ImmutableSet.of("auth", "select", "quit", "getStatefulConnection", "setAutoFlushCommands");

private T connection;
private final RedisConnectionPool<T> pool;
Expand Down
15 changes: 15 additions & 0 deletions src/main/java/com/lambdaworks/redis/api/StatefulConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -65,4 +65,19 @@ public interface StatefulConnection<K, V> extends AutoCloseable {
* internal state machine gets out of sync with the connection.
*/
void reset();

/**
* Disable or enable auto-flush behavior. Default is {@literal true}. If autoFlushCommands is disabled, multiple commands
* can be issued without writing them actually to the transport. Commands are buffered until a {@link #flushCommands()} is
* issued. After calling {@link #flushCommands()} commands are sent to the transport and executed by Redis.
*
* @param autoFlush state of autoFlush.
*/
void setAutoFlushCommands(boolean autoFlush);

/**
* Flush pending commands. This commands forces a flush on the channel and can be used to buffer ("pipeline") commands to
* achieve batching. No-op if channel is not connected.
*/
void flushCommands();
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,21 @@ interface ClusterConnectionProvider extends Closeable {
*/
void setPartitions(Partitions partitions);

/**
* Disable or enable auto-flush behavior. Default is {@literal true}. If autoFlushCommands is disabled, multiple commands
* can be issued without writing them actually to the transport. Commands are buffered until a {@link #flushCommands()} is
* issued. After calling {@link #flushCommands()} commands are sent to the transport and executed by Redis.
*
* @param autoFlush state of autoFlush.
*/
void setAutoFlushCommands(boolean autoFlush);

/**
* Flush pending commands. This commands forces a flush on the channel and can be used to buffer ("pipeline") commands to
* achieve batching. No-op if channel is not connected.
*/
void flushCommands();

enum Intent {
READ, WRITE;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,16 @@ public void setRedisChannelHandler(RedisChannelHandler<K, V> redisChannelHandler
defaultWriter.setRedisChannelHandler(redisChannelHandler);
}

@Override
public void setAutoFlushCommands(boolean autoFlush) {
getClusterConnectionProvider().setAutoFlushCommands(autoFlush);
}

@Override
public void flushCommands() {
getClusterConnectionProvider().flushCommands();
}

public ClusterConnectionProvider getClusterConnectionProvider() {
return clusterConnectionProvider;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,10 @@
import com.google.common.cache.LoadingCache;
import com.google.common.collect.ImmutableMap;
import com.lambdaworks.redis.LettuceStrings;
import com.lambdaworks.redis.RedisAsyncConnection;
import com.lambdaworks.redis.RedisChannelHandler;
import com.lambdaworks.redis.RedisException;
import com.lambdaworks.redis.RedisURI;
import com.lambdaworks.redis.api.StatefulRedisConnection;
import com.lambdaworks.redis.cluster.api.StatefulRedisClusterConnection;
import com.lambdaworks.redis.cluster.models.partitions.Partitions;
import com.lambdaworks.redis.cluster.models.partitions.RedisClusterNode;
import com.lambdaworks.redis.codec.RedisCodec;
Expand All @@ -37,6 +36,9 @@ class PooledClusterConnectionProvider<K, V> implements ClusterConnectionProvider
private final StatefulRedisConnection<K, V> writers[] = new StatefulRedisConnection[SlotHash.SLOT_COUNT];
private Partitions partitions;

private boolean autoFlushCommands = true;
private Object stateLock = new Object();

public PooledClusterConnectionProvider(final RedisClusterClient redisClusterClient, final RedisCodec<K, V> redisCodec) {
this.debugEnabled = logger.isDebugEnabled();
this.connections = CacheBuilder.newBuilder().build(new CacheLoader<PoolKey, StatefulRedisConnection<K, V>>() {
Expand All @@ -48,6 +50,10 @@ public StatefulRedisConnection<K, V> load(PoolKey key) throws Exception {
connection.sync().readOnly();
}

synchronized (stateLock) {
connection.setAutoFlushCommands(autoFlushCommands);
}

return connection;
}
});
Expand Down Expand Up @@ -186,7 +192,30 @@ public void setPartitions(Partitions partitions) {
resetPartitions();
}

@Override
public void setAutoFlushCommands(boolean autoFlush) {
synchronized (stateLock) {
this.autoFlushCommands = autoFlush;
}

for (StatefulRedisConnection<K, V> connection : connections.asMap().values()) {
connection.setAutoFlushCommands(autoFlush);
}
}

@Override
public void flushCommands() {

for (StatefulRedisConnection<K, V> connection : connections.asMap().values()) {
connection.flushCommands();
}

}

protected void resetPartitions() {
Arrays.fill(writers, null);

synchronized (stateLock) {
Arrays.fill(writers, null);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,6 @@ public RedisFuture<String> mset(Map<K, V> map) {

return null;
});

}

@Override
Expand Down Expand Up @@ -201,7 +200,6 @@ private <T> T execute(Callable<T> function) {
} catch (Exception e) {
throw new RedisException(e);
}

}

@Override
Expand Down Expand Up @@ -253,5 +251,4 @@ protected AsyncNodeSelection<K, V> nodes(Predicate<RedisClusterNode> predicate,
return (AsyncNodeSelection<K, V>) Proxy.newProxyInstance(NodeSelection.class.getClassLoader(), new Class<?>[] {
NodeSelectionAsyncCommands.class, AsyncNodeSelection.class }, h);
}

}
36 changes: 32 additions & 4 deletions src/main/java/com/lambdaworks/redis/protocol/CommandHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,20 @@
import java.util.concurrent.locks.ReentrantLock;

import com.google.common.annotations.VisibleForTesting;
import com.lambdaworks.redis.*;
import com.lambdaworks.redis.ClientOptions;
import com.lambdaworks.redis.ConnectionEvents;
import com.lambdaworks.redis.RedisChannelHandler;
import com.lambdaworks.redis.RedisChannelWriter;
import com.lambdaworks.redis.RedisException;

import io.netty.buffer.ByteBuf;
import io.netty.channel.*;
import io.netty.channel.Channel;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import io.netty.util.internal.logging.InternalLogger;
Expand Down Expand Up @@ -58,6 +68,7 @@ public class CommandHandler<K, V> extends ChannelDuplexHandler implements RedisC
private RedisChannelHandler<K, V> redisChannelHandler;
private Throwable connectionError;
private String logPrefix;
private boolean autoFlushCommands = true;

/**
* Initialize a new instance that handles commands from the supplied queue.
Expand All @@ -80,7 +91,7 @@ public CommandHandler(ClientOptions clientOptions, Queue<RedisCommand<K, V, ?>>
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
setState(LifecycleState.REGISTERED);
buffer = ctx.alloc().heapBuffer();
buffer = ctx.alloc().directBuffer(8192 * 8);
rsm = new RedisStateMachine<K, V>();
synchronized (stateLock) {
channel = ctx.channel();
Expand Down Expand Up @@ -175,14 +186,17 @@ public <T, C extends RedisCommand<K, V, T>> C write(C command) {
if (reliability == Reliability.AT_MOST_ONCE) {
// cancel on exceptions and remove from queue, because there is no housekeeping
channel.write(command).addListener(new AtMostOnceWriteListener(command, queue));
channel.flush();
}

if (reliability == Reliability.AT_LEAST_ONCE) {
// commands are ok to stay within the queue, reconnect will retrigger them
channel.write(command).addListener(WRITE_LOG_LISTENER);
}

if (autoFlushCommands) {
channel.flush();
}

} else {

if (commandBuffer.contains(command) || queue.contains(command)) {
Expand Down Expand Up @@ -217,6 +231,13 @@ private boolean isConnected() {
&& lifecycleState.ordinal() <= LifecycleState.DISCONNECTED.ordinal();
}

@Override
public void flushCommands() {
if (channel != null && isConnected() && channel.isActive()) {
channel.flush();
}
}

/**
*
* @see io.netty.channel.ChannelDuplexHandler#write(io.netty.channel.ChannelHandlerContext, java.lang.Object,
Expand Down Expand Up @@ -482,6 +503,13 @@ public void setRedisChannelHandler(RedisChannelHandler<K, V> redisChannelHandler
this.redisChannelHandler = redisChannelHandler;
}

@Override
public void setAutoFlushCommands(boolean autoFlush) {
synchronized (stateLock) {
this.autoFlushCommands = autoFlush;
}
}

private String logPrefix() {
if (logPrefix != null) {
return logPrefix;
Expand Down
86 changes: 86 additions & 0 deletions src/test/java/com/lambdaworks/redis/PipeliningTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package com.lambdaworks.redis;

import static org.assertj.core.api.Assertions.assertThat;

import java.util.List;
import java.util.concurrent.TimeUnit;

import org.junit.Test;

import com.google.common.collect.Lists;

/**
* @author <a href="mailto:[email protected]">Mark Paluch</a>
*/
public class PipeliningTest extends AbstractRedisClientTest {

@Test
public void basic() throws Exception {

RedisAsyncConnection<String, String> connection = client.connectAsync();
connection.setAutoFlushCommands(false);

int iterations = 1000;
List<RedisFuture<?>> futures = triggerSet(connection, iterations);

verifyNotExecuted(iterations);

connection.flushCommands();

LettuceFutures.awaitAll(5, TimeUnit.SECONDS, futures.toArray(new RedisFuture[futures.size()]));

verifyExecuted(iterations);

connection.close();
}

protected void verifyExecuted(int iterations) {
for (int i = 0; i < iterations; i++) {
assertThat(redis.get(key(i))).as("Key " + key(i) + " must be " + value(i)).isEqualTo(value(i));
}
}

@Test
public void setAutoFlushTrueDoesNotFlush() throws Exception {

RedisAsyncConnection<String, String> connection = client.connectAsync();
connection.setAutoFlushCommands(false);

int iterations = 1000;
List<RedisFuture<?>> futures = triggerSet(connection, iterations);

verifyNotExecuted(iterations);

connection.setAutoFlushCommands(true);

verifyNotExecuted(iterations);

connection.flushCommands();
boolean result = LettuceFutures.awaitAll(5, TimeUnit.SECONDS, futures.toArray(new RedisFuture[futures.size()]));
assertThat(result).isTrue();

connection.close();
}

protected void verifyNotExecuted(int iterations) {
for (int i = 0; i < iterations; i++) {
assertThat(redis.get(key(i))).as("Key " + key(i) + " must be null").isNull();
}
}

protected List<RedisFuture<?>> triggerSet(RedisAsyncConnection<String, String> connection, int iterations) {
List<RedisFuture<?>> futures = Lists.newArrayList();
for (int i = 0; i < iterations; i++) {
futures.add(connection.set(key(i), value(i)));
}
return futures;
}

protected String value(int i) {
return value + "-" + i;
}

protected String key(int i) {
return key + "-" + i;
}
}
Loading

0 comments on commit 017e6ae

Please sign in to comment.