Skip to content

Commit

Permalink
Do not retry completed commands through RetryListener #767
Browse files Browse the repository at this point in the history
  • Loading branch information
mp911de committed Apr 23, 2018
1 parent 83eb1eb commit fb05f21
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 17 deletions.
55 changes: 44 additions & 11 deletions src/main/java/io/lettuce/core/protocol/DefaultEndpoint.java
Original file line number Diff line number Diff line change
Expand Up @@ -806,18 +806,11 @@ private void doComplete(Future<Void> future) {
}

Channel channel = endpoint.channel;
if (channel != null) {

// Capture values before recycler clears these.
RedisCommand<?, ?, ?> sentCommand = this.sentCommand;
Collection<? extends RedisCommand<?, ?, ?>> sentCommands = this.sentCommands;
DefaultEndpoint endpoint = this.endpoint;
channel.eventLoop().submit(() -> {
requeueCommands(sentCommand, sentCommands, endpoint);
});
} else {
requeueCommands(sentCommand, sentCommands, endpoint);
}
// Capture values before recycler clears these.
RedisCommand<?, ?, ?> sentCommand = this.sentCommand;
Collection<? extends RedisCommand<?, ?, ?>> sentCommands = this.sentCommands;
potentiallyRequeueCommands(channel, sentCommand, sentCommands);

if (!(cause instanceof ClosedChannelException)) {

Expand All @@ -832,6 +825,46 @@ private void doComplete(Future<Void> future) {
}
}

/**
* Requeue command/commands
*
* @param channel
* @param sentCommand
* @param sentCommands
*/
private void potentiallyRequeueCommands(Channel channel, RedisCommand<?, ?, ?> sentCommand,
Collection<? extends RedisCommand<?, ?, ?>> sentCommands) {

if (sentCommand != null && sentCommand.isDone()) {
return;
}

if (sentCommands != null) {

boolean foundToSend = false;

for (RedisCommand<?, ?, ?> command : sentCommands) {
if (!command.isDone()) {
foundToSend = true;
break;
}
}

if (!foundToSend) {
return;
}
}

if (channel != null) {
DefaultEndpoint endpoint = this.endpoint;
channel.eventLoop().submit(() -> {
requeueCommands(sentCommand, sentCommands, endpoint);
});
} else {
requeueCommands(sentCommand, sentCommands, endpoint);
}
}

@SuppressWarnings("unchecked")
private void requeueCommands(RedisCommand<?, ?, ?> sentCommand, Collection sentCommands, DefaultEndpoint endpoint) {

Expand Down
27 changes: 21 additions & 6 deletions src/test/java/io/lettuce/core/protocol/DefaultEndpointTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,7 @@
import io.lettuce.core.codec.Utf8StringCodec;
import io.lettuce.core.internal.LettuceFactories;
import io.lettuce.core.output.StatusOutput;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.DefaultChannelPromise;
import io.netty.channel.EventLoop;
import io.netty.channel.*;
import io.netty.handler.codec.EncoderException;
import io.netty.util.concurrent.ImmediateEventExecutor;

Expand All @@ -77,6 +74,8 @@ public class DefaultEndpointTest {
@Mock
private ConnectionWatchdog connectionWatchdog;

private ChannelPromise promise;

@BeforeClass
public static void beforeClass() {
LoggerContext ctx = (LoggerContext) LogManager.getContext();
Expand All @@ -96,6 +95,7 @@ public static void afterClass() {
@Before
public void before() {

promise = new DefaultChannelPromise(channel);
when(channel.writeAndFlush(any())).thenAnswer(invocation -> {
if (invocation.getArguments()[0] instanceof RedisCommand) {
queue.add((RedisCommand) invocation.getArguments()[0]);
Expand All @@ -104,7 +104,7 @@ public void before() {
if (invocation.getArguments()[0] instanceof Collection) {
queue.addAll((Collection) invocation.getArguments()[0]);
}
return new DefaultChannelPromise(channel);
return promise;
});

when(channel.write(any())).thenAnswer(invocation -> {
Expand All @@ -115,7 +115,7 @@ public void before() {
if (invocation.getArguments()[0] instanceof Collection) {
queue.addAll((Collection) invocation.getArguments()[0]);
}
return new DefaultChannelPromise(channel);
return promise;
});

sut = new DefaultEndpoint(ClientOptions.create());
Expand Down Expand Up @@ -338,6 +338,21 @@ public void retryListenerCompletesSuccessfullyAfterDeferredRequeue() {
assertThat(command.exception).isInstanceOf(RedisException.class);
}

@Test
public void retryListenerDoesNotRetryCompletedCommands() {

DefaultEndpoint.RetryListener listener = DefaultEndpoint.RetryListener.newInstance(sut, command);

when(channel.eventLoop()).thenReturn(mock(EventLoop.class));

command.complete();
promise.tryFailure(new Exception());

listener.operationComplete(promise);

verify(channel, never()).writeAndFlush(command);
}

@Test
public void testMTCConcurrentConcurrentWrite() throws Throwable {
TestFramework.runOnce(new MTCConcurrentConcurrentWrite(command));
Expand Down

0 comments on commit fb05f21

Please sign in to comment.