Skip to content

Commit

Permalink
Do not retry completed commands through RetryListener #767
Browse files Browse the repository at this point in the history
  • Loading branch information
mp911de committed Apr 23, 2018
1 parent faa6cc0 commit 273ca69
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 10 deletions.
53 changes: 43 additions & 10 deletions src/main/java/com/lambdaworks/redis/protocol/CommandHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -1265,7 +1265,7 @@ protected void complete(Throwable t) {
}
}

private class AtMostOnceWriteListener extends ListenerSupport implements ChannelFutureListener {
class AtMostOnceWriteListener extends ListenerSupport implements ChannelFutureListener {

AtMostOnceWriteListener(RedisCommand<K, V, ?> sentCommand) {
super(sentCommand);
Expand All @@ -1276,7 +1276,7 @@ private class AtMostOnceWriteListener extends ListenerSupport implements Channel
}

@Override
public void operationComplete(ChannelFuture future) throws Exception {
public void operationComplete(ChannelFuture future) {

dequeue();

Expand All @@ -1289,7 +1289,7 @@ public void operationComplete(ChannelFuture future) throws Exception {
/**
* A generic future listener which retries unsuccessful writes.
*/
private class RetryListener extends ListenerSupport implements GenericFutureListener<Future<Void>> {
class RetryListener extends ListenerSupport implements GenericFutureListener<Future<Void>> {

RetryListener(RedisCommand<K, V, ?> sentCommand) {
super(sentCommand);
Expand All @@ -1301,7 +1301,7 @@ private class RetryListener extends ListenerSupport implements GenericFutureList

@SuppressWarnings("unchecked")
@Override
public void operationComplete(Future<Void> future) throws Exception {
public void operationComplete(Future<Void> future) {

Throwable cause = future.cause();

Expand All @@ -1314,12 +1314,7 @@ public void operationComplete(Future<Void> future) throws Exception {
return;
}

Channel channel = CommandHandler.this.channel;
if (channel != null) {
channel.eventLoop().submit(this::requeueCommands);
} else {
CommandHandler.this.clientResources.eventExecutorGroup().submit(this::requeueCommands);
}
potentiallyRequeueCommands(channel, sentCommand, sentCommands);
}

if (!success && !(cause instanceof ClosedChannelException)) {
Expand All @@ -1335,7 +1330,45 @@ public void operationComplete(Future<Void> future) throws Exception {
}
}

/**
* Requeue command/commands
*
* @param channel
* @param sentCommand
* @param sentCommands
*/
private void potentiallyRequeueCommands(Channel channel, RedisCommand<?, ?, ?> sentCommand,
Collection<? extends RedisCommand<?, ?, ?>> sentCommands) {

if (sentCommand != null && sentCommand.isDone()) {
return;
}

if (sentCommands != null) {

boolean foundToSend = false;

for (RedisCommand<?, ?, ?> command : sentCommands) {
if (!command.isDone()) {
foundToSend = true;
break;
}
}

if (!foundToSend) {
return;
}
}

if (channel != null) {
channel.eventLoop().submit(this::requeueCommands);
} else {
CommandHandler.this.clientResources.eventExecutorGroup().submit(this::requeueCommands);
}
}

private void requeueCommands() {

if (sentCommand != null) {
try {
write(sentCommand);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -627,6 +627,19 @@ public void shouldRebuildHugeQueue() throws Exception {
assertThat(disconnectedBuffer).isEmpty();
}

@Test
public void retryListenerDoesNotRetryCompletedCommands() {

CommandHandler.RetryListener listener = sut.new RetryListener(command);

command.complete();
when(promise.cause()).thenReturn(new Exception());

listener.operationComplete(promise);

verify(channel, never()).writeAndFlush(command);
}

@Test
public void testMTCConcurrentWriteThenReset() throws Throwable {
TestFramework.runOnce(new MTCConcurrentWriteThenReset(clientResources, command));
Expand Down

0 comments on commit 273ca69

Please sign in to comment.