Skip to content

Commit

Permalink
Retain sent commands on requeue failure completion #734
Browse files Browse the repository at this point in the history
Lettuce now retains the actual sent commands during exceptional completion. Failures on requeue can occur if the connection is closed or queue bounds are exceeded. Previously, we lost command context because the listener was recycled hence command completion ran into NullPointerExceptions.
  • Loading branch information
mp911de committed Mar 17, 2018
1 parent 5ef6722 commit c2ea58a
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 4 deletions.
11 changes: 7 additions & 4 deletions src/main/java/io/lettuce/core/protocol/DefaultEndpoint.java
Original file line number Diff line number Diff line change
Expand Up @@ -722,7 +722,7 @@ public void operationComplete(ChannelFuture future) {

dequeue();

if (future.cause() != null) {
if (!future.isSuccess() && future.cause() != null) {
complete(future.cause());
}
} finally {
Expand Down Expand Up @@ -815,7 +815,6 @@ private void doComplete(Future<Void> future) {
RedisCommand<?, ?, ?> sentCommand = this.sentCommand;
Collection<? extends RedisCommand<?, ?, ?>> sentCommands = this.sentCommands;
DefaultEndpoint endpoint = this.endpoint;

channel.eventLoop().submit(() -> {
requeueCommands(sentCommand, sentCommands, endpoint);
});
Expand All @@ -836,18 +835,22 @@ private void doComplete(Future<Void> future) {
}
}

@SuppressWarnings("unchecked")
private void requeueCommands(RedisCommand<?, ?, ?> sentCommand, Collection sentCommands, DefaultEndpoint endpoint) {

if (sentCommand != null) {
try {
endpoint.write(sentCommand);
} catch (Exception e) {
complete(e);
sentCommand.completeExceptionally(e);
}
} else {
try {
endpoint.write(sentCommands);
} catch (Exception e) {
complete(e);
for (RedisCommand<?, ?, ?> command : (Collection<RedisCommand>) sentCommands) {
command.completeExceptionally(e);
}
}
}
}
Expand Down
31 changes: 31 additions & 0 deletions src/test/java/io/lettuce/core/protocol/DefaultEndpointTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.nio.channels.ClosedChannelException;
import java.util.Collection;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicLong;
Expand All @@ -36,6 +38,7 @@
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
import org.springframework.test.util.ReflectionTestUtils;
Expand All @@ -49,7 +52,9 @@
import io.lettuce.core.internal.LettuceFactories;
import io.lettuce.core.output.StatusOutput;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.DefaultChannelPromise;
import io.netty.channel.EventLoop;
import io.netty.handler.codec.EncoderException;
import io.netty.util.concurrent.ImmediateEventExecutor;

Expand Down Expand Up @@ -307,6 +312,32 @@ public void closeAllowsOnlyOneCall() {
verify(connectionWatchdog).prepareClose();
}

@Test
public void retryListenerCompletesSuccessfullyAfterDeferredRequeue() {

DefaultEndpoint.RetryListener listener = DefaultEndpoint.RetryListener.newInstance(sut, command);

ChannelFuture future = mock(ChannelFuture.class);
EventLoop eventLoopGroup = mock(EventLoop.class);

when(future.isSuccess()).thenReturn(false);
when(future.cause()).thenReturn(new ClosedChannelException());
when(channel.eventLoop()).thenReturn(eventLoopGroup);
when(channel.close()).thenReturn(mock(ChannelFuture.class));

sut.notifyChannelActive(channel);
sut.close();

listener.operationComplete(future);

ArgumentCaptor<Runnable> runnableCaptor = ArgumentCaptor.forClass(Runnable.class);
verify(eventLoopGroup).submit(runnableCaptor.capture());

runnableCaptor.getValue().run();

assertThat(command.exception).isInstanceOf(RedisException.class);
}

@Test
public void testMTCConcurrentConcurrentWrite() throws Throwable {
TestFramework.runOnce(new MTCConcurrentConcurrentWrite(command));
Expand Down

0 comments on commit c2ea58a

Please sign in to comment.