Skip to content

Commit

Permalink
Add commands to stack through write promise #616
Browse files Browse the repository at this point in the history
Lettuce now appends written commands to the command stack using the write promise. Appending the command with the promise does not require command dequeueing on write failures but increases GC pressure.
  • Loading branch information
mp911de committed Oct 9, 2017
1 parent 8852809 commit 5463f78
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 40 deletions.
16 changes: 3 additions & 13 deletions src/main/java/com/lambdaworks/redis/protocol/AsyncCommand.java
Original file line number Diff line number Diff line change
Expand Up @@ -206,26 +206,16 @@ public boolean equals(Object o) {
return false;
}

RedisCommand<?, ?, ?> left = command;
while (left instanceof DecoratedCommand) {
left = CommandWrapper.unwrap(left);
}

RedisCommand<?, ?, ?> right = (RedisCommand<?, ?, ?>) o;
while (right instanceof DecoratedCommand) {
right = CommandWrapper.unwrap(right);
}
RedisCommand<?, ?, ?> left = CommandWrapper.unwrap(command);
RedisCommand<?, ?, ?> right = CommandWrapper.unwrap((RedisCommand<?, ?, ?>) o);

return left == right;
}

@Override
public int hashCode() {

RedisCommand<?, ?, ?> toHash = command;
while (toHash instanceof DecoratedCommand) {
toHash = CommandWrapper.unwrap(toHash);
}
RedisCommand<?, ?, ?> toHash = CommandWrapper.unwrap(command);

return toHash != null ? toHash.hashCode() : 0;
}
Expand Down
29 changes: 14 additions & 15 deletions src/main/java/com/lambdaworks/redis/protocol/CommandHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -712,12 +712,20 @@ private void addToStack(RedisCommand<K, V, ?> command, ChannelPromise promise) {
+ ". Commands are not accepted until the stack size drops.");
}

command = potentiallyWrapLatencyCommand(command);
RedisCommand<K, V, ?> commandToUse = potentiallyWrapLatencyCommand(command);

if (stack.contains(command)) {
throw new RedisException("Attempting to write duplicate command that is already enqueued: " + command);
}

if (promise.isVoid()) {
stack.add(commandToUse);
} else {
stack.add(command);
promise.addListener(future -> {
if (future.isSuccess()) {
stack.add(commandToUse);
}
});
}
}
} catch (RuntimeException e) {
Expand Down Expand Up @@ -1169,23 +1177,14 @@ private class ListenerSupport {
this.sentCommands = sentCommands;
}

void dequeue(boolean success) {
void dequeue() {

if (sentCommand != null) {

QUEUE_SIZE.decrementAndGet(CommandHandler.this);
if (!success) {
CommandHandler.this.stack.remove(sentCommand);
CommandHandler.this.disconnectedBuffer.remove(sentCommand);
}
}

if (sentCommands != null) {
QUEUE_SIZE.addAndGet(CommandHandler.this, -sentCommands.size());
if (!success) {
CommandHandler.this.stack.removeAll(sentCommands);
CommandHandler.this.disconnectedBuffer.removeAll(sentCommands);
}
}
}

Expand Down Expand Up @@ -1217,7 +1216,7 @@ private class AtMostOnceWriteListener extends ListenerSupport implements Channel
@Override
public void operationComplete(ChannelFuture future) throws Exception {

dequeue(true);
dequeue();

if (future.cause() != null) {
complete(future.cause());
Expand Down Expand Up @@ -1245,7 +1244,7 @@ public void operationComplete(Future<Void> future) throws Exception {
Throwable cause = future.cause();

boolean success = future.isSuccess();
dequeue(success);
dequeue();

if (!success) {
Channel channel = CommandHandler.this.channel;
Expand All @@ -1256,7 +1255,7 @@ public void operationComplete(Future<Void> future) throws Exception {
}
}

if (!future.isSuccess() && !(cause instanceof ClosedChannelException)) {
if (!success && !(cause instanceof ClosedChannelException)) {

String message = "Unexpected exception during request: {}";
InternalLogLevel logLevel = InternalLogLevel.WARN;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Fail.fail;

import java.io.IOException;
import java.net.ConnectException;
import java.util.ArrayList;
import java.util.HashSet;
Expand Down Expand Up @@ -96,7 +97,7 @@ public void operateOnPartiallyDownCluster() throws Exception {
connection.sync().get(key_10439);
fail("Missing RedisException");
} catch (RedisException e) {
assertThat(e).hasRootCauseInstanceOf(ConnectException.class);
assertThat(e).hasRootCauseInstanceOf(IOException.class);
}

connection.close();
Expand Down Expand Up @@ -133,7 +134,7 @@ public void partitionNodesAreOffline() throws Exception {
redisClusterClient.connect();
fail("Missing RedisConnectionException");
} catch (RedisConnectionException e) {
assertThat(e).hasRootCauseInstanceOf(ConnectException.class);
assertThat(e).hasRootCauseInstanceOf(IOException.class);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@
import edu.umd.cs.mtc.TestFramework;
import io.netty.buffer.ByteBuf;
import io.netty.channel.*;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import io.netty.util.concurrent.ImmediateEventExecutor;

@RunWith(MockitoJUnitRunner.class)
Expand Down Expand Up @@ -87,6 +89,9 @@ public class CommandHandlerTest {
@Mock
private RedisChannelHandler channelHandler;

@Mock
private ChannelPromise promise;

@BeforeClass
public static void beforeClass() {
LoggerContext ctx = (LoggerContext) LogManager.getContext();
Expand Down Expand Up @@ -129,13 +134,21 @@ public void before() throws Exception {
stack.addAll((Collection) invocation.getArguments()[0]);
}

return new DefaultChannelPromise(channel);
return promise;
});

sut = new CommandHandler<>(ClientOptions.create(), clientResources);
sut.setRedisChannelHandler(channelHandler);
disconnectedBuffer = (Queue) ReflectionTestUtils.getField(sut, "disconnectedBuffer");
stack = (Queue) ReflectionTestUtils.getField(sut, "stack");

when(promise.addListener(any())).then(invocation -> {

GenericFutureListener<Future<Void>> listener = invocation.getArgument(0);
listener.operationComplete(promise);

return null;
});
}

@Test
Expand All @@ -145,7 +158,6 @@ public void testChannelActive() throws Exception {
sut.channelActive(context);

verify(pipeline).fireUserEventTriggered(any(ConnectionEvents.Activated.class));

}

@Test
Expand Down Expand Up @@ -173,6 +185,8 @@ public void testChannelActiveFailureShouldCancelCommands() throws Exception {
@Test
public void testChannelActiveWithBufferedAndQueuedCommands() throws Exception {

when(promise.isSuccess()).thenReturn(true);

Command<String, String, String> bufferedCommand = new Command<>(CommandType.GET, new StatusOutput<>(
new Utf8StringCodec()), null);

Expand Down Expand Up @@ -261,6 +275,8 @@ public void testChannelActiveReplayBufferedCommands() throws Exception {
disconnectedBuffer.add(bufferedCommand1);
disconnectedBuffer.add(bufferedCommand2);

when(promise.isSuccess()).thenReturn(true);

sut.channelRegistered(context);
sut.channelActive(context);

Expand Down Expand Up @@ -470,20 +486,33 @@ public void shouldCancelCommandOnQueueBatchFailure() throws Exception {
verify(commandMock).completeExceptionally(exception);
}

@Test
public void shouldWriteActiveCommandsForVoidPromise() throws Exception {

when(promise.isVoid()).thenReturn(true);

sut.write(context, command, promise);

verify(context).write(command, promise);
assertThat(stack).hasSize(1).allMatch(o -> o instanceof LatencyMeteredCommand);
}

@Test
public void shouldWriteActiveCommands() throws Exception {

sut.write(context, command, null);
when(promise.isSuccess()).thenReturn(true);

verify(context).write(command, null);
sut.write(context, command, promise);

verify(context).write(command, promise);
assertThat(stack).hasSize(1).allMatch(o -> o instanceof LatencyMeteredCommand);
}

@Test
public void shouldNotWriteCancelledCommandBatch() throws Exception {

command.cancel();
sut.write(context, Arrays.asList(command), null);
sut.write(context, Arrays.asList(command), promise);

verifyZeroInteractions(context);
assertThat(disconnectedBuffer).isEmpty();
Expand All @@ -492,23 +521,26 @@ public void shouldNotWriteCancelledCommandBatch() throws Exception {
@Test
public void shouldWriteActiveCommandsInBatch() throws Exception {

when(promise.isSuccess()).thenReturn(true);

List<Command<String, String, String>> commands = Arrays.asList(command);
sut.write(context, commands, null);
sut.write(context, commands, promise);

verify(context).write(commands, null);
verify(context).write(commands, promise);
assertThat(stack).hasSize(1);
}

@Test
@SuppressWarnings("unchecked")
public void shouldWriteActiveCommandsInMixedBatch() throws Exception {

when(promise.isSuccess()).thenReturn(true);

Command<String, String, String> command2 = new Command<>(CommandType.APPEND, new StatusOutput<>(new Utf8StringCodec()),
null);

command.cancel();

sut.write(context, Arrays.asList(command, command2), null);
sut.write(context, Arrays.asList(command, command2), promise);

ArgumentCaptor<List> captor = ArgumentCaptor.forClass(List.class);
verify(context).write(captor.capture(), any());
Expand All @@ -532,10 +564,11 @@ public void shouldIgnoreNonReadableBuffers() throws Exception {
@Test(timeout = 5000)
public void shouldRebuildHugeQueue() throws Exception {

when(promise.isSuccess()).thenReturn(true);

for (int i = 0; i < 500000; i++) {

Command<String, String, String> command = new Command<>(CommandType.SET, new StatusOutput<>(StringCodec.UTF8));

disconnectedBuffer.add(new AsyncCommand<>(command));
}

Expand Down

0 comments on commit 5463f78

Please sign in to comment.