Skip to content

Commit

Permalink
Use commandBuffer for pipelining instead of channel write #92
Browse files Browse the repository at this point in the history
Use the commandBuffer when autoFlushCommands is disabled instead of writing commands to a channel and write the whole buffer when flushing. This change slightly improves the throughput of lettuce.

Motivation: netty maintains a promise for every written command and handles buffering on its own. Writing commands one by one but delaying the flush has less flavor of batching than buffering commands and writing them as batch.
  • Loading branch information
mp911de committed Aug 1, 2015
1 parent 80905a0 commit 4187fd9
Show file tree
Hide file tree
Showing 4 changed files with 178 additions and 69 deletions.
26 changes: 21 additions & 5 deletions src/main/java/com/lambdaworks/redis/protocol/CommandEncoder.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
package com.lambdaworks.redis.protocol;

import java.nio.charset.Charset;
import java.util.Collection;

import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
Expand All @@ -18,7 +19,7 @@
* @author <a href="mailto:[email protected]">Mark Paluch</a>
*/
@ChannelHandler.Sharable
public class CommandEncoder extends MessageToByteEncoder<RedisCommand<?, ?, ?>> {
public class CommandEncoder extends MessageToByteEncoder<Object> {

private static final InternalLogger logger = InternalLoggerFactory.getInstance(CommandEncoder.class);

Expand All @@ -43,12 +44,28 @@ public CommandEncoder(boolean preferDirect) {
}

@Override
protected void encode(ChannelHandlerContext ctx, RedisCommand<?, ?, ?> msg, ByteBuf out) throws Exception {
protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {

msg.encode(out);
if (msg instanceof RedisCommand) {
RedisCommand<?, ?, ?> command = (RedisCommand<?, ?, ?>) msg;
encode(ctx, out, command);
}

if (msg instanceof Collection) {
Collection<RedisCommand<?, ?, ?>> commands = (Collection<RedisCommand<?, ?, ?>>) msg;
for (RedisCommand<?, ?, ?> command : commands) {
if (command.isCancelled()) {
continue;
}
encode(ctx, out, command);
}
}
}

private void encode(ChannelHandlerContext ctx, ByteBuf out, RedisCommand<?, ?, ?> command) {
command.encode(out);
if (debugEnabled) {
logger.debug("{} writing command {}", logPrefix(ctx.channel()), msg);
logger.debug("{} writing command {}", logPrefix(ctx.channel()), command);
if (traceEnabled) {
logger.trace("{} Sent: {}", logPrefix(ctx.channel()), out.toString(Charset.defaultCharset()).trim());
}
Expand All @@ -60,5 +77,4 @@ private String logPrefix(Channel channel) {
buffer.append('[').append(ChannelLogDescriptor.logDescriptor(channel)).append(']');
return buffer.toString();
}

}
162 changes: 101 additions & 61 deletions src/main/java/com/lambdaworks/redis/protocol/CommandHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@
import java.nio.charset.Charset;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.locks.ReentrantLock;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.lambdaworks.redis.ClientOptions;
import com.lambdaworks.redis.ConnectionEvents;
import com.lambdaworks.redis.RedisChannelHandler;
Expand Down Expand Up @@ -45,7 +46,10 @@ public class CommandHandler<K, V> extends ChannelDuplexHandler implements RedisC

protected ClientOptions clientOptions;
protected Queue<RedisCommand<K, V, ?>> queue;
protected Queue<RedisCommand<K, V, ?>> commandBuffer = new ArrayDeque<RedisCommand<K, V, ?>>();

// all access to the commandBuffer is synchronized
protected Queue<RedisCommand<K, V, ?>> commandBuffer = newCommandBuffer();

protected ByteBuf buffer;
protected RedisStateMachine<K, V> rsm;
protected Channel channel;
Expand Down Expand Up @@ -179,44 +183,41 @@ public <T> RedisCommand<K, V, T> write(RedisCommand<K, V, T> command) {

writeLock.lock();
Channel channel = this.channel;
if (channel != null && isConnected() && channel.isActive()) {
if (debugEnabled) {
logger.debug("{} write() writeAndFlush Command {}", logPrefix(), command);
}
if (autoFlushCommands) {

if (reliability == Reliability.AT_MOST_ONCE) {
// cancel on exceptions and remove from queue, because there is no housekeeping
channel.write(command).addListener(new AtMostOnceWriteListener(command, queue));
}

if (reliability == Reliability.AT_LEAST_ONCE) {
// commands are ok to stay within the queue, reconnect will retrigger them
channel.write(command).addListener(WRITE_LOG_LISTENER);
}
if (channel != null && isConnected() && channel.isActive()) {
if (debugEnabled) {
logger.debug("{} write() writeAndFlush Command {}", logPrefix(), command);
}

if (autoFlushCommands) {
channel.flush();
}
if (reliability == Reliability.AT_MOST_ONCE) {
// cancel on exceptions and remove from queue, because there is no housekeeping
channel.writeAndFlush(command).addListener(new AtMostOnceWriteListener(command, queue));
}

} else {
if (reliability == Reliability.AT_LEAST_ONCE) {
// commands are ok to stay within the queue, reconnect will retrigger them
channel.writeAndFlush(command).addListener(WRITE_LOG_LISTENER);
}
} else {

if (commandBuffer.contains(command) || queue.contains(command)) {
return command;
}
if (commandBuffer.contains(command) || queue.contains(command)) {
return command;
}

if (connectionError != null) {
if (debugEnabled) {
logger.debug("{} write() completing Command {} due to connection error", logPrefix(), command);
if (connectionError != null) {
if (debugEnabled) {
logger.debug("{} write() completing Command {} due to connection error", logPrefix(), command);
}
command.setException(connectionError);
command.complete();
return command;
}
command.setException(connectionError);
command.complete();
return command;
}

if (debugEnabled) {
logger.debug("{} write() buffering Command {}", logPrefix(), command);
bufferCommand(command);
}
commandBuffer.add(command);
} else {
bufferCommand(command);
}
} finally {
writeLock.unlock();
Expand All @@ -228,17 +229,42 @@ public <T> RedisCommand<K, V, T> write(RedisCommand<K, V, T> command) {
return command;
}

private <T> void bufferCommand(RedisCommand<K, V, T> command) {
if (debugEnabled) {
logger.debug("{} write() buffering Command {}", logPrefix(), command);
}
commandBuffer.add(command);
}

private boolean isConnected() {
synchronized (lifecycleState) {
return lifecycleState.ordinal() >= LifecycleState.CONNECTED.ordinal()
&& lifecycleState.ordinal() <= LifecycleState.DISCONNECTED.ordinal();
return lifecycleState.ordinal() >= LifecycleState.CONNECTED.ordinal()
&& lifecycleState.ordinal() <= LifecycleState.DISCONNECTED.ordinal();
}
}

@Override
@SuppressWarnings("rawtypes")
public void flushCommands() {
if (channel != null && isConnected() && channel.isActive()) {
channel.flush();
if (channel != null && isConnected()) {
Queue<RedisCommand<?, ?, ?>> queuedCommands;
try {
writeLock.lock();
queuedCommands = (Queue) commandBuffer;
commandBuffer = newCommandBuffer();
} finally {
writeLock.unlock();
}

if (reliability == Reliability.AT_MOST_ONCE) {
// cancel on exceptions and remove from queue, because there is no housekeeping
channel.writeAndFlush(queuedCommands).addListener(new AtMostOnceWriteListener(queuedCommands, this.queue));
}

if (reliability == Reliability.AT_LEAST_ONCE) {
// commands are ok to stay within the queue, reconnect will retrigger them
channel.writeAndFlush(queuedCommands).addListener(WRITE_LOG_LISTENER);
}
}
}

Expand All @@ -251,7 +277,25 @@ public void flushCommands() {
@SuppressWarnings("unchecked")
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {

final RedisCommand<K, V, ?> cmd = (RedisCommand<K, V, ?>) msg;
if (msg instanceof Collection) {
Collection<RedisCommand<K, V, ?>> commands = (Collection<RedisCommand<K, V, ?>>) msg;
for (RedisCommand<K, V, ?> command : commands) {
queueCommand(promise, command);
}
ctx.write(commands, promise);
return;
}

RedisCommand<K, V, ?> cmd = (RedisCommand<K, V, ?>) msg;
queueCommand(promise, cmd);
ctx.write(cmd, promise);
}

private void queueCommand(ChannelPromise promise, RedisCommand<K, V, ?> cmd) throws Exception {
if (cmd.isCancelled()) {
System.out.println("cancelled");
return;
}

try {
if (cmd.getOutput() == null) {
Expand All @@ -265,8 +309,6 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
promise.setFailure(e);
throw e;
}

ctx.write(cmd, promise);
}

@Override
Expand Down Expand Up @@ -303,7 +345,7 @@ public void run() {
}

protected void executeQueuedCommands(ChannelHandlerContext ctx) {
List<RedisCommand<K, V, ?>> tmp = new ArrayList<RedisCommand<K, V, ?>>(queue.size() + commandBuffer.size());
Queue<RedisCommand<K, V, ?>> tmp = newCommandBuffer();

try {
writeLock.lock();
Expand All @@ -313,7 +355,7 @@ protected void executeQueuedCommands(ChannelHandlerContext ctx) {
tmp.addAll(queue);

queue.clear();
commandBuffer.clear();
commandBuffer = tmp;

if (debugEnabled) {
logger.debug("{} executeQueuedCommands {} command(s) queued", logPrefix(), tmp.size());
Expand All @@ -332,23 +374,10 @@ protected void executeQueuedCommands(ChannelHandlerContext ctx) {
}
setStateIfNotClosed(LifecycleState.ACTIVE);

for (RedisCommand<K, V, ?> cmd : tmp) {
if (!cmd.isCancelled()) {

if (debugEnabled) {
logger.debug("{} channelActive() triggering command {}", logPrefix(), cmd);
}

write(cmd);
}
}

tmp.clear();

flushCommands();
} finally {
writeLock.unlock();
}

}

/**
Expand Down Expand Up @@ -530,6 +559,10 @@ protected String logPrefix() {
return logPrefix = buffer.toString();
}

private ArrayDeque<RedisCommand<K, V, ?>> newCommandBuffer() {
return new ArrayDeque<RedisCommand<K, V, ?>>(512);
}

public enum LifecycleState {
NOT_CONNECTED, REGISTERED, CONNECTED, ACTIVATING, ACTIVE, DISCONNECTED, DEACTIVATING, DEACTIVATED, CLOSED,
}
Expand All @@ -540,21 +573,29 @@ private enum Reliability {

private static class AtMostOnceWriteListener implements ChannelFutureListener {

private final RedisCommand<?, ?, ?> sentCommand;
private final Collection<RedisCommand<?, ?, ?>> sentCommands;
private final Queue<?> queue;

@SuppressWarnings("rawtypes")
public AtMostOnceWriteListener(RedisCommand<?, ?, ?> sentCommand, Queue<?> queue) {
this.sentCommand = sentCommand;
this.sentCommands = (Collection) ImmutableList.of(sentCommand);
this.queue = queue;
}

public AtMostOnceWriteListener(Collection<RedisCommand<?, ?, ?>> sentCommand, Queue<?> queue) {
this.sentCommands = sentCommand;
this.queue = queue;
}

@Override
public void operationComplete(ChannelFuture future) throws Exception {
future.await();
if (future.cause() != null) {
sentCommand.setException(future.cause());
sentCommand.cancel(true);
queue.remove(sentCommand);
for (RedisCommand<?, ?, ?> sentCommand : sentCommands) {
sentCommand.setException(future.cause());
sentCommand.cancel(true);
}
queue.removeAll(sentCommands);
}
}
}
Expand All @@ -564,7 +605,6 @@ public void operationComplete(ChannelFuture future) throws Exception {
*
*/
static class WriteLogListener implements GenericFutureListener<Future<Void>> {

@Override
public void operationComplete(Future<Void> future) throws Exception {
if (!future.isSuccess() && !(future.cause() instanceof ClosedChannelException))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,30 @@ protected void shiftAllSlotsToNode1() throws InterruptedException, TimeoutExcept

waitForSlots(redis2, 0);

final RedisClusterNode redis2Partition = getOwnPartition(redis2);
WaitFor.waitOrTimeout(new Condition() {
@Override
public boolean isSatisfied() {
Partitions partitions = ClusterPartitionParser.parse(redis1.clusterNodes());
RedisClusterNode partition = partitions.getPartitionByNodeId(redis2Partition.getNodeId());

if (!partition.getSlots().isEmpty()) {
removeRemaining(partition);
}

return partition.getSlots().size() == 0;
}

private void removeRemaining(RedisClusterNode partition) {
try {
int[] ints = toIntArray(partition.getSlots());
redis1.clusterDelSlots(ints);
} catch (Exception e) {

}
}
}, timeout(seconds(10)));

redis1.clusterAddSlots(RedisClusterClientTest.createSlots(12000, 16384));
waitForSlots(redis1, 16384);

Expand All @@ -377,6 +401,14 @@ public boolean isSatisfied() {
}, timeout(seconds(6)));
}

private int[] toIntArray(List<Integer> source) {
int[] result = new int[source.size()];
for (int i = 0; i < source.size(); i++) {
result[i] = source.get(i);
}
return result;
}

@Test
public void expireStaleNodeIdConnections() throws Exception {

Expand Down Expand Up @@ -422,8 +454,8 @@ public boolean isSatisfied() {
@Test
public void doNotExpireStaleNodeIdConnections() throws Exception {

clusterClient.setOptions(new ClusterClientOptions.Builder().refreshClusterView(true).closeStaleConnections(false).refreshPeriod(1, TimeUnit.SECONDS)
.build());
clusterClient.setOptions(new ClusterClientOptions.Builder().refreshClusterView(true).closeStaleConnections(false)
.refreshPeriod(1, TimeUnit.SECONDS).build());
RedisAdvancedClusterAsyncConnection<String, String> clusterConnection = clusterClient.connectClusterAsync();

setup2Masters();
Expand Down
Loading

0 comments on commit 4187fd9

Please sign in to comment.