From 494f26d7299830b80c2db6c1b9ec4b5c44104694 Mon Sep 17 00:00:00 2001 From: Zoe Wang Date: Thu, 17 Oct 2019 14:43:12 -0700 Subject: [PATCH] Update Netty HealthCheckedChannelPool to check KEEP_ALIVE attribute of a channel before picking it up in the pool. See #1380 --- .../bugfix-NettyNIOHttpClient-9894ee4.json | 5 ++ .../netty/internal/ChannelAttributeKey.java | 7 ++ .../internal/HealthCheckedChannelPool.java | 8 ++ .../nio/netty/internal/ResponseHandler.java | 8 +- .../HealthCheckedChannelPoolTest.java | 87 ++++++++++++++++++- 5 files changed, 106 insertions(+), 9 deletions(-) create mode 100644 .changes/next-release/bugfix-NettyNIOHttpClient-9894ee4.json diff --git a/.changes/next-release/bugfix-NettyNIOHttpClient-9894ee4.json b/.changes/next-release/bugfix-NettyNIOHttpClient-9894ee4.json new file mode 100644 index 000000000000..e2b26da47816 --- /dev/null +++ b/.changes/next-release/bugfix-NettyNIOHttpClient-9894ee4.json @@ -0,0 +1,5 @@ +{ + "category": "Netty NIO Http Client", + "type": "bugfix", + "description": "Update `HealthCheckedChannelPool` to check `KEEP_ALIVE` when acquiring a channel from the pool to avoid soon-to-be inactive channels being picked up by a new request. This should reduce the frequency of `IOException: Server failed to complete response` errors. See [#1380](https://github.com/aws/aws-sdk-java-v2/issues/1380), [#1466](https://github.com/aws/aws-sdk-java-v2/issues/1466)." +} diff --git a/http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/ChannelAttributeKey.java b/http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/ChannelAttributeKey.java index 65468364a67c..f5e33f3d5f66 100644 --- a/http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/ChannelAttributeKey.java +++ b/http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/ChannelAttributeKey.java @@ -66,6 +66,13 @@ public final class ChannelAttributeKey { static final AttributeKey EXECUTION_ID_KEY = AttributeKey.newInstance( "aws.http.nio.netty.async.executionId"); + /** + * {@link AttributeKey} to keep track of whether we should close the connection after this request + * has completed. + */ + static final AttributeKey KEEP_ALIVE = AttributeKey.newInstance("aws.http.nio.netty.async.keepAlive"); + + /** * Whether the channel is still in use */ diff --git a/http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/HealthCheckedChannelPool.java b/http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/HealthCheckedChannelPool.java index 58945edef8b7..8172f91a5a5b 100644 --- a/http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/HealthCheckedChannelPool.java +++ b/http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/HealthCheckedChannelPool.java @@ -15,6 +15,8 @@ package software.amazon.awssdk.http.nio.netty.internal; +import static software.amazon.awssdk.http.nio.netty.internal.ChannelAttributeKey.KEEP_ALIVE; + import io.netty.channel.Channel; import io.netty.channel.EventLoopGroup; import io.netty.channel.pool.ChannelPool; @@ -159,6 +161,12 @@ private void closeIfUnhealthy(Channel channel) { * Determine whether the provided channel is 'healthy' enough to use. */ private boolean isHealthy(Channel channel) { + // There might be cases where the channel is not reusable but still active at the moment + // See https://github.com/aws/aws-sdk-java-v2/issues/1380 + if (channel.attr(KEEP_ALIVE).get() != null && !channel.attr(KEEP_ALIVE).get()) { + return false; + } + return channel.isActive(); } } diff --git a/http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/ResponseHandler.java b/http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/ResponseHandler.java index 541a30fc7a9d..ad9c2df247c4 100644 --- a/http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/ResponseHandler.java +++ b/http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/ResponseHandler.java @@ -18,6 +18,7 @@ import static java.util.stream.Collectors.groupingBy; import static java.util.stream.Collectors.mapping; import static software.amazon.awssdk.http.nio.netty.internal.ChannelAttributeKey.EXECUTE_FUTURE_KEY; +import static software.amazon.awssdk.http.nio.netty.internal.ChannelAttributeKey.KEEP_ALIVE; import static software.amazon.awssdk.http.nio.netty.internal.ChannelAttributeKey.REQUEST_CONTEXT_KEY; import static software.amazon.awssdk.http.nio.netty.internal.ChannelAttributeKey.RESPONSE_COMPLETE_KEY; import static software.amazon.awssdk.http.nio.netty.internal.utils.ExceptionHandlingUtils.tryCatch; @@ -38,7 +39,6 @@ import io.netty.handler.codec.http.HttpUtil; import io.netty.handler.timeout.ReadTimeoutException; import io.netty.handler.timeout.WriteTimeoutException; -import io.netty.util.AttributeKey; import java.io.IOException; import java.nio.ByteBuffer; import java.util.List; @@ -66,12 +66,6 @@ @SdkInternalApi public class ResponseHandler extends SimpleChannelInboundHandler { - /** - * {@link AttributeKey} to keep track of whether we should close the connection after this request - * has completed. - */ - private static final AttributeKey KEEP_ALIVE = AttributeKey.newInstance("aws.http.nio.netty.async.keepAlive"); - private static final Logger log = LoggerFactory.getLogger(ResponseHandler.class); private static final ResponseHandler INSTANCE = new ResponseHandler(); diff --git a/http-clients/netty-nio-client/src/test/java/software/amazon/awssdk/http/nio/netty/internal/HealthCheckedChannelPoolTest.java b/http-clients/netty-nio-client/src/test/java/software/amazon/awssdk/http/nio/netty/internal/HealthCheckedChannelPoolTest.java index 9a0ee5b3ad99..b3af40680ca6 100644 --- a/http-clients/netty-nio-client/src/test/java/software/amazon/awssdk/http/nio/netty/internal/HealthCheckedChannelPoolTest.java +++ b/http-clients/netty-nio-client/src/test/java/software/amazon/awssdk/http/nio/netty/internal/HealthCheckedChannelPoolTest.java @@ -18,14 +18,18 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyLong; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; +import static org.mockito.Mockito.when; import static org.mockito.internal.verification.VerificationModeFactory.times; import static software.amazon.awssdk.http.SdkHttpConfigurationOption.CONNECTION_ACQUIRE_TIMEOUT; +import static software.amazon.awssdk.http.nio.netty.internal.ChannelAttributeKey.KEEP_ALIVE; import io.netty.channel.Channel; import io.netty.channel.EventLoop; import io.netty.channel.EventLoopGroup; import io.netty.channel.pool.ChannelPool; +import io.netty.util.Attribute; import io.netty.util.concurrent.DefaultPromise; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.GlobalEventExecutor; @@ -52,6 +56,7 @@ public class HealthCheckedChannelPoolTest { private ChannelPool downstreamChannelPool = Mockito.mock(ChannelPool.class); private List channels = new ArrayList<>(); private ScheduledFuture scheduledFuture = Mockito.mock(ScheduledFuture.class); + private Attribute attribute = mock(Attribute.class); private static final NettyConfiguration NETTY_CONFIGURATION = new NettyConfiguration(AttributeMap.builder() @@ -64,7 +69,7 @@ public class HealthCheckedChannelPoolTest { @Before public void reset() { - Mockito.reset(eventLoopGroup, eventLoop, downstreamChannelPool, scheduledFuture); + Mockito.reset(eventLoopGroup, eventLoop, downstreamChannelPool, scheduledFuture, attribute); channels.clear(); Mockito.when(eventLoopGroup.next()).thenReturn(eventLoop); @@ -104,6 +109,39 @@ public void acquireCanMakeManyCalls() throws Exception { Mockito.verify(downstreamChannelPool, Mockito.times(5)).acquire(any()); } + @Test + public void acquireActiveAndKeepAliveTrue_shouldAcquireOnce() throws Exception { + stubForIgnoredTimeout(); + stubAcquireActiveAndKeepAlive(); + + Future acquire = channelPool.acquire(); + + acquire.get(5, TimeUnit.SECONDS); + + assertThat(acquire.isDone()).isTrue(); + assertThat(acquire.isSuccess()).isTrue(); + assertThat(acquire.getNow()).isEqualTo(channels.get(0)); + + Mockito.verify(downstreamChannelPool, Mockito.times(1)).acquire(any()); + } + + + @Test + public void acquire_firstChannelKeepAliveFalse_shouldAcquireAnother() throws Exception { + stubForIgnoredTimeout(); + stubAcquireTwiceFirstTimeNotKeepAlive(); + + Future acquire = channelPool.acquire(); + + acquire.get(5, TimeUnit.SECONDS); + + assertThat(acquire.isDone()).isTrue(); + assertThat(acquire.isSuccess()).isTrue(); + assertThat(acquire.getNow()).isEqualTo(channels.get(1)); + + Mockito.verify(downstreamChannelPool, Mockito.times(2)).acquire(any()); + } + @Test public void badDownstreamAcquiresCausesException() throws Exception { stubForIgnoredTimeout(); @@ -154,6 +192,7 @@ public void slowAcquireTimesOut() throws Exception { public void releaseHealthyDoesNotClose() { Channel channel = Mockito.mock(Channel.class); Mockito.when(channel.isActive()).thenReturn(true); + stubKeepAliveAttribute(channel, null); channelPool.release(channel); @@ -165,7 +204,7 @@ public void releaseHealthyDoesNotClose() { public void releaseHealthyCloses() { Channel channel = Mockito.mock(Channel.class); Mockito.when(channel.isActive()).thenReturn(false); - + stubKeepAliveAttribute(channel, null); channelPool.release(channel); Mockito.verify(channel, times(1)).close(); @@ -179,6 +218,7 @@ public void stubAcquireHealthySequence(Boolean... acquireHealthySequence) { Promise promise = invocation.getArgumentAt(0, Promise.class); Channel channel = Mockito.mock(Channel.class); Mockito.when(channel.isActive()).thenReturn(shouldAcquireBeHealthy); + stubKeepAliveAttribute(channel, null); channels.add(channel); promise.setSuccess(channel); return promise; @@ -186,6 +226,26 @@ public void stubAcquireHealthySequence(Boolean... acquireHealthySequence) { } } + private void stubAcquireActiveAndKeepAlive() { + OngoingStubbing> stubbing = Mockito.when(downstreamChannelPool.acquire(any())); + stubbing = stubbing.thenAnswer(invocation -> { + Promise promise = invocation.getArgumentAt(0, Promise.class); + Channel channel = Mockito.mock(Channel.class); + Mockito.when(channel.isActive()).thenReturn(true); + + stubKeepAliveAttribute(channel, true); + + channels.add(channel); + promise.setSuccess(channel); + return promise; + }); + } + + private void stubKeepAliveAttribute(Channel channel, Boolean isKeepAlive) { + Mockito.when(channel.attr(KEEP_ALIVE)).thenReturn(attribute); + when(attribute.get()).thenReturn(isKeepAlive); + } + public void stubBadDownstreamAcquire() { Mockito.when(downstreamChannelPool.acquire(any())).thenAnswer(invocation -> { Promise promise = invocation.getArgumentAt(0, Promise.class); @@ -202,4 +262,27 @@ public void stubForIgnoredTimeout() { Mockito.when(eventLoopGroup.schedule(any(Runnable.class), anyLong(), any())) .thenAnswer(i -> scheduledFuture); } + + private void stubAcquireTwiceFirstTimeNotKeepAlive() { + OngoingStubbing> stubbing = Mockito.when(downstreamChannelPool.acquire(any())); + stubbing = stubbing.thenAnswer(invocation -> { + Promise promise = invocation.getArgumentAt(0, Promise.class); + Channel channel = Mockito.mock(Channel.class); + stubKeepAliveAttribute(channel, false); + Mockito.when(channel.isActive()).thenReturn(true); + channels.add(channel); + promise.setSuccess(channel); + return promise; + }); + + stubbing.thenAnswer(invocation -> { + Promise promise = invocation.getArgumentAt(0, Promise.class); + Channel channel = Mockito.mock(Channel.class); + Mockito.when(channel.isActive()).thenReturn(true); + channels.add(channel); + promise.setSuccess(channel); + stubKeepAliveAttribute(channel, true); + return promise; + }); + } } \ No newline at end of file