Skip to content

Commit

Permalink
Move command write deduplication to batch write #709
Browse files Browse the repository at this point in the history
Lettuce now checks for command stack duplicates in CommandHandler.writeBatch(…) by using LinkedHashSet. Duplicates occur usually in batch submissions and the check in the single command path causes additional cost that isn't necessary in the majority of cases.

Using LinkedHashSet as intermediate collection reduces contains cost from O(N^2) to constant time.
  • Loading branch information
mp911de committed Feb 27, 2018
1 parent e503915 commit 3ba91ca
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 36 deletions.
62 changes: 29 additions & 33 deletions src/main/java/com/lambdaworks/redis/protocol/CommandHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -647,8 +647,25 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
return;
}

if (msg instanceof List) {

List<RedisCommand<K, V, ?>> batch = (List<RedisCommand<K, V, ?>>) msg;

if (batch.size() == 1) {

writeSingleCommand(ctx, batch.get(0), promise);
return;
}

writeBatch(ctx, batch, promise);
return;
}

if (msg instanceof Collection) {
writeBatch(ctx, (Collection<RedisCommand<K, V, ?>>) msg, promise);

Collection<RedisCommand<K, V, ?>> batch = (Collection<RedisCommand<K, V, ?>>) msg;

writeBatch(ctx, batch, promise);
}
}

Expand All @@ -666,52 +683,35 @@ private void writeSingleCommand(ChannelHandlerContext ctx, RedisCommand<K, V, ?>
private void writeBatch(ChannelHandlerContext ctx, Collection<RedisCommand<K, V, ?>> batch, ChannelPromise promise)
throws Exception {

Collection<RedisCommand<K, V, ?>> toWrite = batch;
int commandsToWrite = 0;
Collection<RedisCommand<K, V, ?>> deduplicated = new LinkedHashSet<>(batch.size(), 1);

boolean cancelledCommands = false;
for (RedisCommand<?, ?, ?> command : batch) {
for (RedisCommand<K, V, ?> command : batch) {

if (!isWriteable(command)) {
cancelledCommands = true;
break;
if (isWriteable(command) && !deduplicated.add(command)) {
deduplicated.remove(command);
command.completeExceptionally(new RedisException(
"Attempting to write duplicate command that is already enqueued: " + command));
}

commandsToWrite++;
}

try {
validateWrite(commandsToWrite);
validateWrite(deduplicated.size());
} catch (Exception e) {

for (RedisCommand<?, ?, ?> redisCommand : toWrite) {
for (RedisCommand<?, ?, ?> redisCommand : deduplicated) {
redisCommand.completeExceptionally(e);
}

promise.setFailure(e);
return;
}

if (cancelledCommands) {

toWrite = new ArrayList<>(batch.size());

for (RedisCommand<K, V, ?> command : batch) {

if (!isWriteable(command)) {
continue;
}

toWrite.add(command);
}
}

for (RedisCommand<K, V, ?> command : toWrite) {
for (RedisCommand<K, V, ?> command : deduplicated) {
addToStack(command, promise);
}

if (!toWrite.isEmpty()) {
ctx.write(toWrite, promise);
if (!deduplicated.isEmpty()) {
ctx.write(deduplicated, promise);
}
}

Expand All @@ -732,10 +732,6 @@ private void addToStack(RedisCommand<K, V, ?> command, ChannelPromise promise) {

RedisCommand<K, V, ?> commandToUse = potentiallyWrapLatencyCommand(command);

if (stack.contains(command)) {
throw new RedisException("Attempting to write duplicate command that is already enqueued: " + command);
}

if (promise.getClass() == VOID_PROMISE_CLASS) {
stack.add(commandToUse);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -498,6 +498,18 @@ public void shouldCancelCommandOnQueueBatchFailure() throws Exception {
verify(commandMock).completeExceptionally(exception);
}

@Test
public void shouldFailOnDuplicateCommands() throws Exception {

Command<String, String, String> commandMock = mock(Command.class);

ChannelPromise channelPromise = new DefaultChannelPromise(channel, ImmediateEventExecutor.INSTANCE);
sut.write(context, Arrays.asList(commandMock, commandMock), channelPromise);

assertThat(stack).isEmpty();
verify(commandMock).completeExceptionally(any(RedisException.class));
}

@Test
public void shouldWriteActiveCommands() throws Exception {

Expand All @@ -520,17 +532,29 @@ public void shouldNotWriteCancelledCommandBatch() throws Exception {
}

@Test
public void shouldWriteActiveCommandsInBatch() throws Exception {
public void shouldWriteSingleActiveCommandsInBatch() throws Exception {

when(promise.isSuccess()).thenReturn(true);

List<Command<String, String, String>> commands = Arrays.asList(command);
sut.write(context, commands, promise);

verify(context).write(commands, promise);
verify(context).write(command, promise);
assertThat(stack).hasSize(1);
}

@Test
public void shouldWriteActiveCommandsInBatch() throws Exception {

Command<String, String, String> anotherCommand = new Command<>(CommandType.APPEND,
new StatusOutput<>(StringCodec.UTF8), null);

List<Command<String, String, String>> commands = Arrays.asList(command, anotherCommand);
sut.write(context, commands, promise);

verify(context).write(any(Set.class), eq(promise));
}

@Test
@SuppressWarnings("unchecked")
public void shouldWriteActiveCommandsInMixedBatch() throws Exception {
Expand All @@ -543,7 +567,7 @@ public void shouldWriteActiveCommandsInMixedBatch() throws Exception {

sut.write(context, Arrays.asList(command, command2), promise);

ArgumentCaptor<List> captor = ArgumentCaptor.forClass(List.class);
ArgumentCaptor<Collection> captor = ArgumentCaptor.forClass(Collection.class);
verify(context).write(captor.capture(), any());

assertThat(captor.getValue()).containsOnly(command2);
Expand Down

0 comments on commit 3ba91ca

Please sign in to comment.