Skip to content

Commit

Permalink
Move command write deduplication to batch write #709
Browse files Browse the repository at this point in the history
Lettuce now checks for command stack duplicates in CommandHandler.writeBatch(…) by using LinkedHashSet. Duplicates occur usually in batch submissions and the check in the single command path causes additional cost that isn't necessary in the majority of cases.

Using LinkedHashSet as intermediate collection reduces contains cost from O(N^2) to constant time.
  • Loading branch information
mp911de committed Feb 25, 2018
1 parent 08faf45 commit 9354727
Show file tree
Hide file tree
Showing 4 changed files with 126 additions and 46 deletions.
61 changes: 28 additions & 33 deletions src/main/java/io/lettuce/core/protocol/CommandHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -334,8 +334,25 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
return;
}

if (msg instanceof List) {

List<RedisCommand<?, ?, ?>> batch = (List<RedisCommand<?, ?, ?>>) msg;

if (batch.size() == 1) {

writeSingleCommand(ctx, batch.get(0), promise);
return;
}

writeBatch(ctx, batch, promise);
return;
}

if (msg instanceof Collection) {
writeBatch(ctx, (Collection<RedisCommand<?, ?, ?>>) msg, promise);

Collection<RedisCommand<?, ?, ?>> batch = (Collection<RedisCommand<?, ?, ?>>) msg;

writeBatch(ctx, batch, promise);
}
}

Expand All @@ -353,52 +370,35 @@ private void writeSingleCommand(ChannelHandlerContext ctx, RedisCommand<?, ?, ?>
private void writeBatch(ChannelHandlerContext ctx, Collection<RedisCommand<?, ?, ?>> batch, ChannelPromise promise)
throws Exception {

Collection<RedisCommand<?, ?, ?>> toWrite = batch;
int commandsToWrite = 0;
Collection<RedisCommand<?, ?, ?>> deduplicated = new LinkedHashSet<>(batch.size(), 1);

boolean cancelledCommands = false;
for (RedisCommand<?, ?, ?> command : batch) {

if (!isWriteable(command)) {
cancelledCommands = true;
break;
if (isWriteable(command) && !deduplicated.add(command)) {
deduplicated.remove(command);
command.completeExceptionally(new RedisException(
"Attempting to write duplicate command that is already enqueued: " + command));
}

commandsToWrite++;
}

try {
validateWrite(commandsToWrite);
validateWrite(deduplicated.size());
} catch (Exception e) {

for (RedisCommand<?, ?, ?> redisCommand : toWrite) {
for (RedisCommand<?, ?, ?> redisCommand : deduplicated) {
redisCommand.completeExceptionally(e);
}

promise.setFailure(e);
return;
}

if (cancelledCommands) {

toWrite = new ArrayList<>(batch.size());

for (RedisCommand<?, ?, ?> command : batch) {

if (!isWriteable(command)) {
continue;
}

toWrite.add(command);
}
}

for (RedisCommand<?, ?, ?> command : toWrite) {
for (RedisCommand<?, ?, ?> command : deduplicated) {
addToStack(command, promise);
}

if (!toWrite.isEmpty()) {
ctx.write(toWrite, promise);
if (!deduplicated.isEmpty()) {
ctx.write(deduplicated, promise);
}
}

Expand All @@ -411,11 +411,6 @@ private void addToStack(RedisCommand<?, ?, ?> command, ChannelPromise promise) t
if (command.getOutput() == null) {
// fire&forget commands are excluded from metrics
command.complete();
} else {

if (stack.contains(command)) {
throw new RedisException("Attempting to write duplicate command that is already enqueued: " + command);
}
}

RedisCommand<?, ?, ?> redisCommand = potentiallyWrapLatencyCommand(command);
Expand Down
45 changes: 34 additions & 11 deletions src/test/java/io/lettuce/core/protocol/CommandHandlerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,7 @@
import static org.mockito.Mockito.*;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Queue;
import java.util.*;

import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
Expand All @@ -43,7 +40,8 @@

import io.lettuce.core.ClientOptions;
import io.lettuce.core.ConnectionEvents;
import io.lettuce.core.codec.Utf8StringCodec;
import io.lettuce.core.RedisException;
import io.lettuce.core.codec.StringCodec;
import io.lettuce.core.metrics.DefaultCommandLatencyCollector;
import io.lettuce.core.metrics.DefaultCommandLatencyCollectorOptions;
import io.lettuce.core.output.StatusOutput;
Expand All @@ -61,7 +59,7 @@ public class CommandHandlerTest {
private CommandHandler sut;

private final Command<String, String, String> command = new Command<>(CommandType.APPEND, new StatusOutput<>(
new Utf8StringCodec()), null);
StringCodec.UTF8), null);

@Mock
private ChannelHandlerContext context;
Expand Down Expand Up @@ -293,6 +291,18 @@ public void shouldCancelCommandOnQueueBatchFailure() throws Exception {
verify(commandMock).completeExceptionally(exception);
}

@Test
public void shouldFailOnDuplicateCommands() throws Exception {

Command<String, String, String> commandMock = mock(Command.class);

ChannelPromise channelPromise = new DefaultChannelPromise(channel, ImmediateEventExecutor.INSTANCE);
sut.write(context, Arrays.asList(commandMock, commandMock), channelPromise);

assertThat(stack).isEmpty();
verify(commandMock).completeExceptionally(any(RedisException.class));
}

@Test
public void shouldWriteActiveCommands() throws Exception {

Expand All @@ -315,28 +325,41 @@ public void shouldNotWriteCancelledCommandBatch() throws Exception {
}

@Test
public void shouldWriteActiveCommandsInBatch() throws Exception {
public void shouldWriteSingleActiveCommandsInBatch() throws Exception {

List<Command<String, String, String>> commands = Arrays.asList(command);
when(promise.isVoid()).thenReturn(true);
sut.write(context, commands, promise);

verify(context).write(commands, promise);
verify(context).write(command, promise);
assertThat(stack).hasSize(1);
}

@Test
public void shouldWriteActiveCommandsInBatch() throws Exception {

Command<String, String, String> anotherCommand = new Command<>(CommandType.APPEND,
new StatusOutput<>(StringCodec.UTF8), null);

List<Command<String, String, String>> commands = Arrays.asList(command, anotherCommand);
when(promise.isVoid()).thenReturn(true);
sut.write(context, commands, promise);

verify(context).write(any(Set.class), eq(promise));
assertThat(stack).hasSize(2);
}

@Test
@SuppressWarnings("unchecked")
public void shouldWriteActiveCommandsInMixedBatch() throws Exception {

Command<String, String, String> command2 = new Command<>(CommandType.APPEND, new StatusOutput<>(new Utf8StringCodec()),
null);
Command<String, String, String> command2 = new Command<>(CommandType.APPEND, new StatusOutput<>(StringCodec.UTF8), null);
command.cancel();
when(promise.isVoid()).thenReturn(true);

sut.write(context, Arrays.asList(command, command2), promise);

ArgumentCaptor<List> captor = ArgumentCaptor.forClass(List.class);
ArgumentCaptor<Collection> captor = ArgumentCaptor.forClass(Collection.class);
verify(context).write(captor.capture(), any());

assertThat(captor.getValue()).containsOnly(command2);
Expand Down
62 changes: 62 additions & 0 deletions src/test/jmh/io/lettuce/core/protocol/CommandHandlerBenchmark.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,12 @@
*/
package io.lettuce.core.protocol;

import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
Expand All @@ -28,6 +34,7 @@
* Benchmark for {@link CommandHandler}. Test cases:
* <ul>
* <li>netty (in-eventloop) writes</li>
* <li>netty (in-eventloop) batch writes</li>
* </ul>
*
* @author Mark Paluch
Expand All @@ -43,14 +50,33 @@ public class CommandHandlerBenchmark {

private CommandHandler commandHandler;
private Command command;
private Command batchCommand;
private Collection<Command> commands1;
private List<Command> commands10;
private List<Command> commands100;
private List<Command> commands1000;

@Setup
public void setup() {

commandHandler = new CommandHandler(CLIENT_OPTIONS, EmptyClientResources.INSTANCE, new DefaultEndpoint(CLIENT_OPTIONS));
command = new Command(CommandType.GET, new ValueOutput<>(CODEC), new CommandArgs(CODEC).addKey(KEY));


commandHandler.setState(CommandHandler.LifecycleState.CONNECTED);

commands1 = Arrays.asList(command);
commands10 = IntStream.range(0, 10)
.mapToObj(i -> new Command(CommandType.GET, new ValueOutput<>(CODEC), new CommandArgs(CODEC).addKey(KEY)))
.collect(Collectors.toList());

commands100 = IntStream.range(0, 100)
.mapToObj(i -> new Command(CommandType.GET, new ValueOutput<>(CODEC), new CommandArgs(CODEC).addKey(KEY)))
.collect(Collectors.toList());

commands1000 = IntStream.range(0, 1000)
.mapToObj(i -> new Command(CommandType.GET, new ValueOutput<>(CODEC), new CommandArgs(CODEC).addKey(KEY)))
.collect(Collectors.toList());
}

@Benchmark
Expand All @@ -61,4 +87,40 @@ public void measureNettyWrite() throws Exception {
// Prevent OOME
commandHandler.getStack().clear();
}

@Benchmark
public void measureNettyWriteBatch1() throws Exception {

commandHandler.write(CHANNEL_HANDLER_CONTEXT, commands1, PROMISE);

// Prevent OOME
commandHandler.getStack().clear();
}

@Benchmark
public void measureNettyWriteBatch10() throws Exception {

commandHandler.write(CHANNEL_HANDLER_CONTEXT, commands10, PROMISE);

// Prevent OOME
commandHandler.getStack().clear();
}

@Benchmark
public void measureNettyWriteBatch100() throws Exception {

commandHandler.write(CHANNEL_HANDLER_CONTEXT, commands100, PROMISE);

// Prevent OOME
commandHandler.getStack().clear();
}

@Benchmark
public void measureNettyWriteBatch1000() throws Exception {

commandHandler.write(CHANNEL_HANDLER_CONTEXT, commands1000, PROMISE);

// Prevent OOME
commandHandler.getStack().clear();
}
}
4 changes: 2 additions & 2 deletions src/test/jmh/io/lettuce/core/protocol/JmhMain.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,12 @@ public static void main(String... args) throws IOException, RunnerException {
// run selectively
// runCommandBenchmark();
runCommandHandlerBenchmark();
runRedisEndpointBenchmark();
// runRedisEndpointBenchmark();
// runRedisStateMachineBenchmark();
// runCommandEncoderBenchmark();

// or all
//runBenchmarks();
// runBenchmarks();
}

private static void runBenchmarks() throws RunnerException {
Expand Down

0 comments on commit 9354727

Please sign in to comment.