Skip to content

Commit

Permalink
Use commandBuffer for pipelining instead of channel write #92
Browse files Browse the repository at this point in the history
Use the commandBuffer when autoFlushCommands is disabled instead of writing commands to a channel and write the whole buffer when flushing. This change slightly improves the throughput of lettuce.

Motivation: netty maintains a promise for every written command and handles buffering on its own. Writing commands one by one but delaying the flush has less flavor of batching than buffering commands and writing them as batch.
  • Loading branch information
mp911de committed Aug 2, 2015
1 parent bb191e1 commit 8090a14
Show file tree
Hide file tree
Showing 3 changed files with 133 additions and 63 deletions.
26 changes: 21 additions & 5 deletions src/main/java/com/lambdaworks/redis/protocol/CommandEncoder.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
package com.lambdaworks.redis.protocol;

import java.nio.charset.Charset;
import java.util.Collection;

import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
Expand All @@ -18,7 +19,7 @@
* @author <a href="mailto:[email protected]">Mark Paluch</a>
*/
@ChannelHandler.Sharable
public class CommandEncoder extends MessageToByteEncoder<RedisCommand<?, ?, ?>> {
public class CommandEncoder extends MessageToByteEncoder<Object> {

private static final InternalLogger logger = InternalLoggerFactory.getInstance(CommandEncoder.class);

Expand All @@ -43,12 +44,28 @@ public CommandEncoder(boolean preferDirect) {
}

@Override
protected void encode(ChannelHandlerContext ctx, RedisCommand<?, ?, ?> msg, ByteBuf out) throws Exception {
protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {

msg.encode(out);
if (msg instanceof RedisCommand) {
RedisCommand<?, ?, ?> command = (RedisCommand<?, ?, ?>) msg;
encode(ctx, out, command);
}

if (msg instanceof Collection) {
Collection<RedisCommand<?, ?, ?>> commands = (Collection<RedisCommand<?, ?, ?>>) msg;
for (RedisCommand<?, ?, ?> command : commands) {
if (command.isCancelled()) {
continue;
}
encode(ctx, out, command);
}
}
}

private void encode(ChannelHandlerContext ctx, ByteBuf out, RedisCommand<?, ?, ?> command) {
command.encode(out);
if (debugEnabled) {
logger.debug("{} writing command {}", logPrefix(ctx.channel()), msg);
logger.debug("{} writing command {}", logPrefix(ctx.channel()), command);
if (traceEnabled) {
logger.trace("{} Sent: {}", logPrefix(ctx.channel()), out.toString(Charset.defaultCharset()).trim());
}
Expand All @@ -60,5 +77,4 @@ private String logPrefix(Channel channel) {
buffer.append('[').append(ChannelLogDescriptor.logDescriptor(channel)).append(']');
return buffer.toString();
}

}
150 changes: 96 additions & 54 deletions src/main/java/com/lambdaworks/redis/protocol/CommandHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@
import java.nio.charset.Charset;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.locks.ReentrantLock;

import com.google.common.collect.ImmutableList;
import com.lambdaworks.redis.ClientOptions;
import com.lambdaworks.redis.ConnectionEvents;
import com.lambdaworks.redis.RedisChannelHandler;
Expand Down Expand Up @@ -44,7 +46,10 @@ public class CommandHandler<K, V> extends ChannelDuplexHandler implements RedisC

protected ClientOptions clientOptions;
protected Queue<RedisCommand<K, V, ?>> queue;
protected Queue<RedisCommand<K, V, ?>> commandBuffer = new ArrayDeque<RedisCommand<K, V, ?>>();

// all access to the commandBuffer is synchronized
protected Queue<RedisCommand<K, V, ?>> commandBuffer = newCommandBuffer();

protected ByteBuf buffer;
protected RedisStateMachine<K, V> rsm;
protected Channel channel;
Expand Down Expand Up @@ -177,43 +182,42 @@ public <T, C extends RedisCommand<K, V, T>> C write(C command) {

writeLock.lock();
Channel channel = this.channel;
if (channel != null && isConnected() && channel.isActive()) {
if (debugEnabled) {
logger.debug("{} write() writeAndFlush Command {}", logPrefix(), command);
}
if (autoFlushCommands) {

if (reliability == Reliability.AT_MOST_ONCE) {
// cancel on exceptions and remove from queue, because there is no housekeeping
channel.write(command).addListener(new AtMostOnceWriteListener(command, queue));
}
if (channel != null && isConnected() && channel.isActive()) {
if (debugEnabled) {
logger.debug("{} write() writeAndFlush Command {}", logPrefix(), command);
}

if (reliability == Reliability.AT_LEAST_ONCE) {
// commands are ok to stay within the queue, reconnect will retrigger them
channel.write(command).addListener(WRITE_LOG_LISTENER);
}
if (reliability == Reliability.AT_MOST_ONCE) {
// cancel on exceptions and remove from queue, because there is no housekeeping
channel.writeAndFlush(command).addListener(new AtMostOnceWriteListener(command, queue));
}

if (autoFlushCommands) {
channel.flush();
}
if (reliability == Reliability.AT_LEAST_ONCE) {
// commands are ok to stay within the queue, reconnect will retrigger them
channel.writeAndFlush(command).addListener(WRITE_LOG_LISTENER);
}
} else {

} else {
if (commandBuffer.contains(command) || queue.contains(command)) {
return command;
}

if (commandBuffer.contains(command) || queue.contains(command)) {
return command;
}
if (connectionError != null) {
if (debugEnabled) {
logger.debug("{} write() completing Command {} due to connection error", logPrefix(), command);
}
command.completeExceptionally(connectionError);

if (connectionError != null) {
if (debugEnabled) {
logger.debug("{} write() completing Command {} due to connection error", logPrefix(), command);
return command;
}
command.completeExceptionally(connectionError);
bufferCommand(command);
return command;
}

if (debugEnabled) {
logger.debug("{} write() buffering Command {}", logPrefix(), command);
}
commandBuffer.add(command);
} else {
bufferCommand(command);
}
} finally {
writeLock.unlock();
Expand All @@ -225,6 +229,13 @@ public <T, C extends RedisCommand<K, V, T>> C write(C command) {
return command;
}

private <T> void bufferCommand(RedisCommand<K, V, T> command) {
if (debugEnabled) {
logger.debug("{} write() buffering Command {}", logPrefix(), command);
}
commandBuffer.add(command);
}

private boolean isConnected() {
synchronized (lifecycleState) {
return lifecycleState.ordinal() >= LifecycleState.CONNECTED.ordinal()
Expand All @@ -233,9 +244,27 @@ private boolean isConnected() {
}

@Override
@SuppressWarnings("rawtypes")
public void flushCommands() {
if (channel != null && isConnected() && channel.isActive()) {
channel.flush();
if (channel != null && isConnected()) {
Queue<RedisCommand<?, ?, ?>> queuedCommands;
try {
writeLock.lock();
queuedCommands = (Queue) commandBuffer;
commandBuffer = newCommandBuffer();
} finally {
writeLock.unlock();
}

if (reliability == Reliability.AT_MOST_ONCE) {
// cancel on exceptions and remove from queue, because there is no housekeeping
channel.writeAndFlush(queuedCommands).addListener(new AtMostOnceWriteListener(queuedCommands, this.queue));
}

if (reliability == Reliability.AT_LEAST_ONCE) {
// commands are ok to stay within the queue, reconnect will retrigger them
channel.writeAndFlush(queuedCommands).addListener(WRITE_LOG_LISTENER);
}
}
}

Expand All @@ -248,7 +277,24 @@ public void flushCommands() {
@SuppressWarnings("unchecked")
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {

final RedisCommand<K, V, ?> cmd = (RedisCommand<K, V, ?>) msg;
if (msg instanceof Collection) {
Collection<RedisCommand<K, V, ?>> commands = (Collection<RedisCommand<K, V, ?>>) msg;
for (RedisCommand<K, V, ?> command : commands) {
queueCommand(promise, command);
}
ctx.write(commands, promise);
return;
}

RedisCommand<K, V, ?> cmd = (RedisCommand<K, V, ?>) msg;
queueCommand(promise, cmd);
ctx.write(cmd, promise);
}

private void queueCommand(ChannelPromise promise, RedisCommand<K, V, ?> cmd) throws Exception {
if (cmd.isCancelled()) {
return;
}

try {
if (cmd.getOutput() == null) {
Expand All @@ -261,8 +307,6 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
promise.setFailure(e);
throw e;
}

ctx.write(cmd, promise);
}

@Override
Expand Down Expand Up @@ -299,7 +343,7 @@ public void run() {
}

protected void executeQueuedCommands(ChannelHandlerContext ctx) {
List<RedisCommand<K, V, ?>> tmp = new ArrayList<>(queue.size() + commandBuffer.size());
Queue<RedisCommand<K, V, ?>> tmp = newCommandBuffer();

try {
writeLock.lock();
Expand All @@ -309,7 +353,7 @@ protected void executeQueuedCommands(ChannelHandlerContext ctx) {
tmp.addAll(queue);

queue.clear();
commandBuffer.clear();
commandBuffer = tmp;

if (debugEnabled) {
logger.debug("{} executeQueuedCommands {} command(s) queued", logPrefix(), tmp.size());
Expand All @@ -328,23 +372,10 @@ protected void executeQueuedCommands(ChannelHandlerContext ctx) {
}
setStateIfNotClosed(LifecycleState.ACTIVE);

for (RedisCommand<K, V, ?> cmd : tmp) {
if (!cmd.isCancelled()) {

if (debugEnabled) {
logger.debug("{} channelActive() triggering command {}", logPrefix(), cmd);
}

write(cmd);
}
}

tmp.clear();

flushCommands();
} finally {
writeLock.unlock();
}

}

/**
Expand Down Expand Up @@ -525,6 +556,10 @@ protected String logPrefix() {
return logPrefix = buffer.toString();
}

private ArrayDeque<RedisCommand<K, V, ?>> newCommandBuffer() {
return new ArrayDeque<RedisCommand<K, V, ?>>(512);
}

public enum LifecycleState {
NOT_CONNECTED, REGISTERED, CONNECTED, ACTIVATING, ACTIVE, DISCONNECTED, DEACTIVATING, DEACTIVATED, CLOSED,
}
Expand All @@ -535,20 +570,28 @@ private enum Reliability {

private static class AtMostOnceWriteListener implements ChannelFutureListener {

private final RedisCommand<?, ?, ?> sentCommand;
private final Collection<RedisCommand<?, ?, ?>> sentCommands;
private final Queue<?> queue;

@SuppressWarnings("rawtypes")
public AtMostOnceWriteListener(RedisCommand<?, ?, ?> sentCommand, Queue<?> queue) {
this.sentCommand = sentCommand;
this.sentCommands = (Collection) ImmutableList.of(sentCommand);
this.queue = queue;
}

public AtMostOnceWriteListener(Collection<RedisCommand<?, ?, ?>> sentCommand, Queue<?> queue) {
this.sentCommands = sentCommand;
this.queue = queue;
}

@Override
public void operationComplete(ChannelFuture future) throws Exception {
future.await();
if (future.cause() != null) {
sentCommand.completeExceptionally(future.cause());
queue.remove(sentCommand);
for (RedisCommand<?, ?, ?> sentCommand : sentCommands) {
sentCommand.completeExceptionally(future.cause());
}
queue.removeAll(sentCommands);
}
}
}
Expand All @@ -558,7 +601,6 @@ public void operationComplete(ChannelFuture future) throws Exception {
*
*/
static class WriteLogListener implements GenericFutureListener<Future<Void>> {

@Override
public void operationComplete(Future<Void> future) throws Exception {
if (!future.isSuccess() && !(future.cause() instanceof ClosedChannelException))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@
import java.util.Queue;
import java.util.concurrent.Future;

import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.ChannelPromise;
import io.netty.channel.DefaultChannelPromise;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
Expand All @@ -34,11 +38,11 @@
@RunWith(MockitoJUnitRunner.class)
public class CommandHandlerTest {

private Queue<RedisCommand<String, String, ?>> q = new ArrayDeque<RedisCommand<String, String, ?>>(10);
private Queue<RedisCommand<String, String, ?>> q = new ArrayDeque<>(10);

private CommandHandler<String, String> sut = new CommandHandler<String, String>(new ClientOptions.Builder().build(), q);
private CommandHandler<String, String> sut = new CommandHandler<>(new ClientOptions.Builder().build(), q);

private Command<String, String, String> command = new Command<String, String, String>(CommandType.APPEND,
private Command<String, String, String> command = new Command<>(CommandType.APPEND,
new StatusOutput<String, String>(new Utf8StringCodec()), null);

@Mock
Expand All @@ -47,6 +51,9 @@ public class CommandHandlerTest {
@Mock
private Channel channel;

@Mock
private ByteBufAllocator byteBufAllocator;

@Mock
private ChannelPipeline pipeline;

Expand All @@ -56,6 +63,7 @@ public class CommandHandlerTest {
@Before
public void before() throws Exception {
when(context.channel()).thenReturn(channel);
when(context.alloc()).thenReturn(byteBufAllocator);
when(channel.pipeline()).thenReturn(pipeline);
when(channel.eventLoop()).thenReturn(eventLoop);
when(eventLoop.submit(any(Runnable.class))).thenAnswer(new Answer<Future>() {
Expand All @@ -66,11 +74,15 @@ public Future answer(InvocationOnMock invocation) throws Throwable {
return null;
}
});

when(channel.write(any())).thenAnswer(invocation -> new DefaultChannelPromise(channel));

when(channel.writeAndFlush(any())).thenAnswer(invocation -> new DefaultChannelPromise(channel));
}

@Test
public void testChannelActive() throws Exception {
sut.setState(CommandHandler.LifecycleState.REGISTERED);
sut.channelRegistered(context);

sut.channelActive(context);

Expand Down

0 comments on commit 8090a14

Please sign in to comment.