From fb05f2148f5fff6e941e2d08b472910e096d3667 Mon Sep 17 00:00:00 2001 From: Mark Paluch Date: Mon, 23 Apr 2018 14:07:34 +0200 Subject: [PATCH] Do not retry completed commands through RetryListener #767 --- .../core/protocol/DefaultEndpoint.java | 55 +++++++++++++++---- .../core/protocol/DefaultEndpointTest.java | 27 +++++++-- 2 files changed, 65 insertions(+), 17 deletions(-) diff --git a/src/main/java/io/lettuce/core/protocol/DefaultEndpoint.java b/src/main/java/io/lettuce/core/protocol/DefaultEndpoint.java index 7bbde3111d..5ac5e8c4b4 100644 --- a/src/main/java/io/lettuce/core/protocol/DefaultEndpoint.java +++ b/src/main/java/io/lettuce/core/protocol/DefaultEndpoint.java @@ -806,18 +806,11 @@ private void doComplete(Future future) { } Channel channel = endpoint.channel; - if (channel != null) { - // Capture values before recycler clears these. - RedisCommand sentCommand = this.sentCommand; - Collection> sentCommands = this.sentCommands; - DefaultEndpoint endpoint = this.endpoint; - channel.eventLoop().submit(() -> { - requeueCommands(sentCommand, sentCommands, endpoint); - }); - } else { - requeueCommands(sentCommand, sentCommands, endpoint); - } + // Capture values before recycler clears these. + RedisCommand sentCommand = this.sentCommand; + Collection> sentCommands = this.sentCommands; + potentiallyRequeueCommands(channel, sentCommand, sentCommands); if (!(cause instanceof ClosedChannelException)) { @@ -832,6 +825,46 @@ private void doComplete(Future future) { } } + /** + * Requeue command/commands + * + * @param channel + * @param sentCommand + * @param sentCommands + */ + private void potentiallyRequeueCommands(Channel channel, RedisCommand sentCommand, + Collection> sentCommands) { + + if (sentCommand != null && sentCommand.isDone()) { + return; + } + + if (sentCommands != null) { + + boolean foundToSend = false; + + for (RedisCommand command : sentCommands) { + if (!command.isDone()) { + foundToSend = true; + break; + } + } + + if (!foundToSend) { + return; + } + } + + if (channel != null) { + DefaultEndpoint endpoint = this.endpoint; + channel.eventLoop().submit(() -> { + requeueCommands(sentCommand, sentCommands, endpoint); + }); + } else { + requeueCommands(sentCommand, sentCommands, endpoint); + } + } + @SuppressWarnings("unchecked") private void requeueCommands(RedisCommand sentCommand, Collection sentCommands, DefaultEndpoint endpoint) { diff --git a/src/test/java/io/lettuce/core/protocol/DefaultEndpointTest.java b/src/test/java/io/lettuce/core/protocol/DefaultEndpointTest.java index 797c732a53..2c299f7d90 100644 --- a/src/test/java/io/lettuce/core/protocol/DefaultEndpointTest.java +++ b/src/test/java/io/lettuce/core/protocol/DefaultEndpointTest.java @@ -51,10 +51,7 @@ import io.lettuce.core.codec.Utf8StringCodec; import io.lettuce.core.internal.LettuceFactories; import io.lettuce.core.output.StatusOutput; -import io.netty.channel.Channel; -import io.netty.channel.ChannelFuture; -import io.netty.channel.DefaultChannelPromise; -import io.netty.channel.EventLoop; +import io.netty.channel.*; import io.netty.handler.codec.EncoderException; import io.netty.util.concurrent.ImmediateEventExecutor; @@ -77,6 +74,8 @@ public class DefaultEndpointTest { @Mock private ConnectionWatchdog connectionWatchdog; + private ChannelPromise promise; + @BeforeClass public static void beforeClass() { LoggerContext ctx = (LoggerContext) LogManager.getContext(); @@ -96,6 +95,7 @@ public static void afterClass() { @Before public void before() { + promise = new DefaultChannelPromise(channel); when(channel.writeAndFlush(any())).thenAnswer(invocation -> { if (invocation.getArguments()[0] instanceof RedisCommand) { queue.add((RedisCommand) invocation.getArguments()[0]); @@ -104,7 +104,7 @@ public void before() { if (invocation.getArguments()[0] instanceof Collection) { queue.addAll((Collection) invocation.getArguments()[0]); } - return new DefaultChannelPromise(channel); + return promise; }); when(channel.write(any())).thenAnswer(invocation -> { @@ -115,7 +115,7 @@ public void before() { if (invocation.getArguments()[0] instanceof Collection) { queue.addAll((Collection) invocation.getArguments()[0]); } - return new DefaultChannelPromise(channel); + return promise; }); sut = new DefaultEndpoint(ClientOptions.create()); @@ -338,6 +338,21 @@ public void retryListenerCompletesSuccessfullyAfterDeferredRequeue() { assertThat(command.exception).isInstanceOf(RedisException.class); } + @Test + public void retryListenerDoesNotRetryCompletedCommands() { + + DefaultEndpoint.RetryListener listener = DefaultEndpoint.RetryListener.newInstance(sut, command); + + when(channel.eventLoop()).thenReturn(mock(EventLoop.class)); + + command.complete(); + promise.tryFailure(new Exception()); + + listener.operationComplete(promise); + + verify(channel, never()).writeAndFlush(command); + } + @Test public void testMTCConcurrentConcurrentWrite() throws Throwable { TestFramework.runOnce(new MTCConcurrentConcurrentWrite(command));