Skip to content

Commit

Permalink
Do not requeue commands on unrecoverable errors #680
Browse files Browse the repository at this point in the history
Commands are canceled now when the attempted write fails with an recoverable error (EncoderException, Errors).
  • Loading branch information
mp911de committed Jan 30, 2018
1 parent f914c8f commit 8ef0dfd
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.*;
import io.netty.channel.local.LocalAddress;
import io.netty.handler.codec.EncoderException;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import io.netty.util.internal.logging.InternalLogLevel;
Expand Down Expand Up @@ -1261,6 +1262,11 @@ public void operationComplete(Future<Void> future) throws Exception {
dequeue();

if (!success) {
if (cause instanceof EncoderException || cause instanceof Error || cause.getCause() instanceof Error) {
complete(cause);
return;
}

Channel channel = CommandHandler.this.channel;
if (channel != null) {
channel.eventLoop().submit(this::requeueCommands);
Expand Down Expand Up @@ -1300,7 +1306,6 @@ private void requeueCommands() {
complete(e);
}
}

}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import edu.umd.cs.mtc.TestFramework;
import io.netty.buffer.ByteBuf;
import io.netty.channel.*;
import io.netty.handler.codec.EncoderException;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import io.netty.util.concurrent.ImmediateEventExecutor;
Expand Down Expand Up @@ -430,12 +431,23 @@ public void isConnectedShouldReportFalseForDEACTIVATED() throws Exception {
}

@Test
public void isConnectedShouldReportFalseForCLOSED() throws Exception {
public void isConnectedShouldReportFalseForCLOSED() {

sut.setState(CommandHandler.LifecycleState.CLOSED);
assertThat(sut.isConnected()).isFalse();
}

@Test
public void shouldCancelCommandsOnEncoderException() throws Exception {

sut.channelActive(context);
when(promise.cause()).thenReturn(new EncoderException("foo"));

sut.write(command);

assertThat(command.exception).isInstanceOf(EncoderException.class);
}

@Test
public void shouldNotWriteCancelledCommands() throws Exception {

Expand Down

0 comments on commit 8ef0dfd

Please sign in to comment.