Skip to content

Commit

Permalink
Do not requeue commands on unrecoverable errors #680
Browse files Browse the repository at this point in the history
Commands are canceled now when the attempted write fails with an recoverable error (EncoderException, Errors).
  • Loading branch information
mp911de committed Jan 30, 2018
1 parent 9f47a2c commit cb1bda3
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 4 deletions.
7 changes: 7 additions & 0 deletions src/main/java/io/lettuce/core/protocol/DefaultEndpoint.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.handler.codec.EncoderException;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import io.netty.util.internal.logging.InternalLogLevel;
Expand Down Expand Up @@ -734,6 +735,12 @@ public void operationComplete(Future<Void> future) throws Exception {
dequeue();

if (!success) {

if (cause instanceof EncoderException || cause instanceof Error || cause.getCause() instanceof Error) {
complete(cause);
return;
}

Channel channel = DefaultEndpoint.this.channel;
if (channel != null) {
channel.eventLoop().submit(this::requeueCommands);
Expand Down
32 changes: 28 additions & 4 deletions src/test/java/io/lettuce/core/protocol/DefaultEndpointTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,10 @@
import io.lettuce.core.codec.Utf8StringCodec;
import io.lettuce.core.internal.LettuceFactories;
import io.lettuce.core.output.StatusOutput;
import io.lettuce.core.resource.ClientResources;
import io.netty.channel.Channel;
import io.netty.channel.DefaultChannelPromise;
import io.netty.handler.codec.EncoderException;
import io.netty.util.concurrent.ImmediateEventExecutor;

@RunWith(MockitoJUnitRunner.class)
public class DefaultEndpointTest {
Expand All @@ -70,9 +71,6 @@ public class DefaultEndpointTest {
@Mock
private ConnectionFacade connectionFacade;

@Mock
private ClientResources clientResources;

@Mock
private ConnectionWatchdog connectionWatchdog;

Expand Down Expand Up @@ -203,6 +201,32 @@ public void notifyDrainQueuedCommandsShouldWriteCommands() {
verify(channel).writeAndFlush(eq(Arrays.asList(command)));
}

@Test
public void shouldCancelCommandsOnEncoderException() {

when(channel.isActive()).thenReturn(true);
sut.notifyChannelActive(channel);

DefaultChannelPromise promise = new DefaultChannelPromise(channel, ImmediateEventExecutor.INSTANCE);

when(channel.writeAndFlush(any())).thenAnswer(invocation -> {
if (invocation.getArguments()[0] instanceof RedisCommand) {
queue.add((RedisCommand) invocation.getArguments()[0]);
}

if (invocation.getArguments()[0] instanceof Collection) {
queue.addAll((Collection) invocation.getArguments()[0]);
}
return promise;
});

promise.setFailure(new EncoderException("foo"));

sut.write(command);

assertThat(command.exception).isInstanceOf(EncoderException.class);
}

@Test
public void writeShouldRejectCommandsInDisconnectedState() {

Expand Down

0 comments on commit cb1bda3

Please sign in to comment.