Skip to content

Commit

Permalink
Update Netty HealthCheckedChannelPool to check KEEP_ALIVE attribute o…
Browse files Browse the repository at this point in the history
…f a channel before picking it up in the pool. See #1380
  • Loading branch information
zoewangg committed Oct 17, 2019
1 parent 99a44ce commit 494f26d
Show file tree
Hide file tree
Showing 5 changed files with 106 additions and 9 deletions.
5 changes: 5 additions & 0 deletions .changes/next-release/bugfix-NettyNIOHttpClient-9894ee4.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
{
"category": "Netty NIO Http Client",
"type": "bugfix",
"description": "Update `HealthCheckedChannelPool` to check `KEEP_ALIVE` when acquiring a channel from the pool to avoid soon-to-be inactive channels being picked up by a new request. This should reduce the frequency of `IOException: Server failed to complete response` errors. See [#1380](https://github.com/aws/aws-sdk-java-v2/issues/1380), [#1466](https://github.com/aws/aws-sdk-java-v2/issues/1466)."
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,13 @@ public final class ChannelAttributeKey {
static final AttributeKey<Long> EXECUTION_ID_KEY = AttributeKey.newInstance(
"aws.http.nio.netty.async.executionId");

/**
* {@link AttributeKey} to keep track of whether we should close the connection after this request
* has completed.
*/
static final AttributeKey<Boolean> KEEP_ALIVE = AttributeKey.newInstance("aws.http.nio.netty.async.keepAlive");


/**
* Whether the channel is still in use
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@

package software.amazon.awssdk.http.nio.netty.internal;

import static software.amazon.awssdk.http.nio.netty.internal.ChannelAttributeKey.KEEP_ALIVE;

import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.pool.ChannelPool;
Expand Down Expand Up @@ -159,6 +161,12 @@ private void closeIfUnhealthy(Channel channel) {
* Determine whether the provided channel is 'healthy' enough to use.
*/
private boolean isHealthy(Channel channel) {
// There might be cases where the channel is not reusable but still active at the moment
// See https://github.com/aws/aws-sdk-java-v2/issues/1380
if (channel.attr(KEEP_ALIVE).get() != null && !channel.attr(KEEP_ALIVE).get()) {
return false;
}

return channel.isActive();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import static java.util.stream.Collectors.groupingBy;
import static java.util.stream.Collectors.mapping;
import static software.amazon.awssdk.http.nio.netty.internal.ChannelAttributeKey.EXECUTE_FUTURE_KEY;
import static software.amazon.awssdk.http.nio.netty.internal.ChannelAttributeKey.KEEP_ALIVE;
import static software.amazon.awssdk.http.nio.netty.internal.ChannelAttributeKey.REQUEST_CONTEXT_KEY;
import static software.amazon.awssdk.http.nio.netty.internal.ChannelAttributeKey.RESPONSE_COMPLETE_KEY;
import static software.amazon.awssdk.http.nio.netty.internal.utils.ExceptionHandlingUtils.tryCatch;
Expand All @@ -38,7 +39,6 @@
import io.netty.handler.codec.http.HttpUtil;
import io.netty.handler.timeout.ReadTimeoutException;
import io.netty.handler.timeout.WriteTimeoutException;
import io.netty.util.AttributeKey;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
Expand Down Expand Up @@ -66,12 +66,6 @@
@SdkInternalApi
public class ResponseHandler extends SimpleChannelInboundHandler<HttpObject> {

/**
* {@link AttributeKey} to keep track of whether we should close the connection after this request
* has completed.
*/
private static final AttributeKey<Boolean> KEEP_ALIVE = AttributeKey.newInstance("aws.http.nio.netty.async.keepAlive");

private static final Logger log = LoggerFactory.getLogger(ResponseHandler.class);

private static final ResponseHandler INSTANCE = new ResponseHandler();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,18 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.when;
import static org.mockito.internal.verification.VerificationModeFactory.times;
import static software.amazon.awssdk.http.SdkHttpConfigurationOption.CONNECTION_ACQUIRE_TIMEOUT;
import static software.amazon.awssdk.http.nio.netty.internal.ChannelAttributeKey.KEEP_ALIVE;

import io.netty.channel.Channel;
import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.pool.ChannelPool;
import io.netty.util.Attribute;
import io.netty.util.concurrent.DefaultPromise;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GlobalEventExecutor;
Expand All @@ -52,6 +56,7 @@ public class HealthCheckedChannelPoolTest {
private ChannelPool downstreamChannelPool = Mockito.mock(ChannelPool.class);
private List<Channel> channels = new ArrayList<>();
private ScheduledFuture<?> scheduledFuture = Mockito.mock(ScheduledFuture.class);
private Attribute<Boolean> attribute = mock(Attribute.class);

private static final NettyConfiguration NETTY_CONFIGURATION =
new NettyConfiguration(AttributeMap.builder()
Expand All @@ -64,7 +69,7 @@ public class HealthCheckedChannelPoolTest {

@Before
public void reset() {
Mockito.reset(eventLoopGroup, eventLoop, downstreamChannelPool, scheduledFuture);
Mockito.reset(eventLoopGroup, eventLoop, downstreamChannelPool, scheduledFuture, attribute);
channels.clear();

Mockito.when(eventLoopGroup.next()).thenReturn(eventLoop);
Expand Down Expand Up @@ -104,6 +109,39 @@ public void acquireCanMakeManyCalls() throws Exception {
Mockito.verify(downstreamChannelPool, Mockito.times(5)).acquire(any());
}

@Test
public void acquireActiveAndKeepAliveTrue_shouldAcquireOnce() throws Exception {
stubForIgnoredTimeout();
stubAcquireActiveAndKeepAlive();

Future<Channel> acquire = channelPool.acquire();

acquire.get(5, TimeUnit.SECONDS);

assertThat(acquire.isDone()).isTrue();
assertThat(acquire.isSuccess()).isTrue();
assertThat(acquire.getNow()).isEqualTo(channels.get(0));

Mockito.verify(downstreamChannelPool, Mockito.times(1)).acquire(any());
}


@Test
public void acquire_firstChannelKeepAliveFalse_shouldAcquireAnother() throws Exception {
stubForIgnoredTimeout();
stubAcquireTwiceFirstTimeNotKeepAlive();

Future<Channel> acquire = channelPool.acquire();

acquire.get(5, TimeUnit.SECONDS);

assertThat(acquire.isDone()).isTrue();
assertThat(acquire.isSuccess()).isTrue();
assertThat(acquire.getNow()).isEqualTo(channels.get(1));

Mockito.verify(downstreamChannelPool, Mockito.times(2)).acquire(any());
}

@Test
public void badDownstreamAcquiresCausesException() throws Exception {
stubForIgnoredTimeout();
Expand Down Expand Up @@ -154,6 +192,7 @@ public void slowAcquireTimesOut() throws Exception {
public void releaseHealthyDoesNotClose() {
Channel channel = Mockito.mock(Channel.class);
Mockito.when(channel.isActive()).thenReturn(true);
stubKeepAliveAttribute(channel, null);

channelPool.release(channel);

Expand All @@ -165,7 +204,7 @@ public void releaseHealthyDoesNotClose() {
public void releaseHealthyCloses() {
Channel channel = Mockito.mock(Channel.class);
Mockito.when(channel.isActive()).thenReturn(false);

stubKeepAliveAttribute(channel, null);
channelPool.release(channel);

Mockito.verify(channel, times(1)).close();
Expand All @@ -179,13 +218,34 @@ public void stubAcquireHealthySequence(Boolean... acquireHealthySequence) {
Promise<Channel> promise = invocation.getArgumentAt(0, Promise.class);
Channel channel = Mockito.mock(Channel.class);
Mockito.when(channel.isActive()).thenReturn(shouldAcquireBeHealthy);
stubKeepAliveAttribute(channel, null);
channels.add(channel);
promise.setSuccess(channel);
return promise;
});
}
}

private void stubAcquireActiveAndKeepAlive() {
OngoingStubbing<Future<Channel>> stubbing = Mockito.when(downstreamChannelPool.acquire(any()));
stubbing = stubbing.thenAnswer(invocation -> {
Promise<Channel> promise = invocation.getArgumentAt(0, Promise.class);
Channel channel = Mockito.mock(Channel.class);
Mockito.when(channel.isActive()).thenReturn(true);

stubKeepAliveAttribute(channel, true);

channels.add(channel);
promise.setSuccess(channel);
return promise;
});
}

private void stubKeepAliveAttribute(Channel channel, Boolean isKeepAlive) {
Mockito.when(channel.attr(KEEP_ALIVE)).thenReturn(attribute);
when(attribute.get()).thenReturn(isKeepAlive);
}

public void stubBadDownstreamAcquire() {
Mockito.when(downstreamChannelPool.acquire(any())).thenAnswer(invocation -> {
Promise<Channel> promise = invocation.getArgumentAt(0, Promise.class);
Expand All @@ -202,4 +262,27 @@ public void stubForIgnoredTimeout() {
Mockito.when(eventLoopGroup.schedule(any(Runnable.class), anyLong(), any()))
.thenAnswer(i -> scheduledFuture);
}

private void stubAcquireTwiceFirstTimeNotKeepAlive() {
OngoingStubbing<Future<Channel>> stubbing = Mockito.when(downstreamChannelPool.acquire(any()));
stubbing = stubbing.thenAnswer(invocation -> {
Promise<Channel> promise = invocation.getArgumentAt(0, Promise.class);
Channel channel = Mockito.mock(Channel.class);
stubKeepAliveAttribute(channel, false);
Mockito.when(channel.isActive()).thenReturn(true);
channels.add(channel);
promise.setSuccess(channel);
return promise;
});

stubbing.thenAnswer(invocation -> {
Promise<Channel> promise = invocation.getArgumentAt(0, Promise.class);
Channel channel = Mockito.mock(Channel.class);
Mockito.when(channel.isActive()).thenReturn(true);
channels.add(channel);
promise.setSuccess(channel);
stubKeepAliveAttribute(channel, true);
return promise;
});
}
}

0 comments on commit 494f26d

Please sign in to comment.