diff --git a/src/main/java/com/lambdaworks/redis/protocol/CommandHandler.java b/src/main/java/com/lambdaworks/redis/protocol/CommandHandler.java index c93b7223cd..505c9ad1f4 100644 --- a/src/main/java/com/lambdaworks/redis/protocol/CommandHandler.java +++ b/src/main/java/com/lambdaworks/redis/protocol/CommandHandler.java @@ -35,6 +35,7 @@ import io.netty.buffer.ByteBufAllocator; import io.netty.channel.*; import io.netty.channel.local.LocalAddress; +import io.netty.handler.codec.EncoderException; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.GenericFutureListener; import io.netty.util.internal.logging.InternalLogLevel; @@ -1261,6 +1262,11 @@ public void operationComplete(Future future) throws Exception { dequeue(); if (!success) { + if (cause instanceof EncoderException || cause instanceof Error || cause.getCause() instanceof Error) { + complete(cause); + return; + } + Channel channel = CommandHandler.this.channel; if (channel != null) { channel.eventLoop().submit(this::requeueCommands); @@ -1300,7 +1306,6 @@ private void requeueCommands() { complete(e); } } - } } } diff --git a/src/test/java/com/lambdaworks/redis/protocol/CommandHandlerTest.java b/src/test/java/com/lambdaworks/redis/protocol/CommandHandlerTest.java index 5258c63a63..8dae9773da 100644 --- a/src/test/java/com/lambdaworks/redis/protocol/CommandHandlerTest.java +++ b/src/test/java/com/lambdaworks/redis/protocol/CommandHandlerTest.java @@ -56,6 +56,7 @@ import edu.umd.cs.mtc.TestFramework; import io.netty.buffer.ByteBuf; import io.netty.channel.*; +import io.netty.handler.codec.EncoderException; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.GenericFutureListener; import io.netty.util.concurrent.ImmediateEventExecutor; @@ -430,12 +431,23 @@ public void isConnectedShouldReportFalseForDEACTIVATED() throws Exception { } @Test - public void isConnectedShouldReportFalseForCLOSED() throws Exception { + public void isConnectedShouldReportFalseForCLOSED() { sut.setState(CommandHandler.LifecycleState.CLOSED); assertThat(sut.isConnected()).isFalse(); } + @Test + public void shouldCancelCommandsOnEncoderException() throws Exception { + + sut.channelActive(context); + when(promise.cause()).thenReturn(new EncoderException("foo")); + + sut.write(command); + + assertThat(command.exception).isInstanceOf(EncoderException.class); + } + @Test public void shouldNotWriteCancelledCommands() throws Exception {