Update HealthCheckedChannelPool to check KEEP_ALIVE attribute #1476

Merged
merged 1 commit into from
Oct 18, 2019
@@ -0,0 +1,5 @@
{
"category": "Netty NIO Http Client",
"type": "bugfix",
"description": "Update `HealthCheckedChannelPool` to check `KEEP_ALIVE` when acquiring a channel from the pool to avoid soon-to-be inactive channels being picked up by a new request. This should reduce the frequency of `IOException: Server failed to complete response` errors. See [#1380](https://github.com/aws/aws-sdk-java-v2/issues/1380), [#1466](https://github.com/aws/aws-sdk-java-v2/issues/1466)."
}
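
The behavior described in the changelog entry can be sketched without Netty. This is a minimal model, not the SDK's implementation: `AcquireSketch`, `FakeChannel`, and the `idle` queue are hypothetical names, and closing a rejected channel on acquire is an assumption of the sketch. It shows a channel that is still active but marked not keep-alive being skipped in favor of the next one.

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class AcquireSketch {

    /** Hypothetical stand-in for a pooled channel; not a Netty or SDK type. */
    static final class FakeChannel {
        final boolean active;
        final Boolean keepAlive; // null means no response has set the attribute yet
        boolean closed;

        FakeChannel(boolean active, Boolean keepAlive) {
            this.active = active;
            this.keepAlive = keepAlive;
        }
    }

    /** Unhealthy if explicitly marked not keep-alive, otherwise healthy only while active. */
    static boolean isHealthy(FakeChannel channel) {
        if (Boolean.FALSE.equals(channel.keepAlive)) {
            return false;
        }
        return channel.active;
    }

    /** Polls idle channels until a healthy one turns up; rejected channels are marked closed. */
    static FakeChannel acquire(Deque<FakeChannel> idle) {
        FakeChannel candidate;
        while ((candidate = idle.poll()) != null) {
            if (isHealthy(candidate)) {
                return candidate;
            }
            candidate.closed = true; // assumption: unhealthy channels are closed, not reused
        }
        return null; // a real pool would open a new connection here
    }

    public static void main(String[] args) {
        Deque<FakeChannel> idle = new ArrayDeque<>();
        FakeChannel closing = new FakeChannel(true, false);  // active, but the server asked to close
        FakeChannel reusable = new FakeChannel(true, true);
        idle.add(closing);
        idle.add(reusable);

        FakeChannel acquired = acquire(idle);
        System.out.println("picked reusable channel: " + (acquired == reusable));
        System.out.println("closing channel was closed: " + closing.closed);
    }
}
```

The same shape is what the new test `acquire_firstChannelKeepAliveFalse_shouldAcquireAnother` exercises: the first channel is rejected and a second acquire is made.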
@@ -66,6 +66,13 @@ public final class ChannelAttributeKey {
static final AttributeKey<Long> EXECUTION_ID_KEY = AttributeKey.newInstance(
"aws.http.nio.netty.async.executionId");

/**
* {@link AttributeKey} to keep track of whether we should close the connection after this request
* has completed.
*/
static final AttributeKey<Boolean> KEEP_ALIVE = AttributeKey.newInstance("aws.http.nio.netty.async.keepAlive");


/**
* Whether the channel is still in use
*/
@@ -15,6 +15,8 @@

package software.amazon.awssdk.http.nio.netty.internal;

import static software.amazon.awssdk.http.nio.netty.internal.ChannelAttributeKey.KEEP_ALIVE;

import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.pool.ChannelPool;
@@ -159,6 +161,12 @@ private void closeIfUnhealthy(Channel channel) {
* Determine whether the provided channel is 'healthy' enough to use.
*/
private boolean isHealthy(Channel channel) {
// There might be cases where the channel is not reusable but still active at the moment
// See https://github.com/aws/aws-sdk-java-v2/issues/1380
if (channel.attr(KEEP_ALIVE).get() != null && !channel.attr(KEEP_ALIVE).get()) {
return false;
}

Comment on lines 163 to +169
Contributor

I think this is fine, but is there any reason we don't close the channel after the response is consumed instead?

Contributor Author

@zoewangg Oct 18, 2019

We do invoke ChannelHandlerContext#close in ResponseHandler for this case, but the close may not have completed yet, so the channel can still be active at this point. There is no guarantee that the channel is closed before it is released into the pool.

Also, because the underlying HttpStreamsClientHandler from netty-reactive-streams does not close the channel while there are still in-flight messages, channel.close might not be initiated at all. (This is what we discussed this morning; we may need to figure out why there are still in-flight messages when the response is complete in some cases.)

return channel.isActive();
}
}
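
The review thread above turns on one point: a channel can be marked for closing while `isActive()` still returns true. A minimal sketch of the resulting tri-state check follows, assuming a hypothetical `ChannelState` class in place of a Netty `Channel` (no Netty or SDK types are used). `Boolean.FALSE.equals` is a spelling chosen for this sketch so the value is read once; the PR itself reads the attribute twice with an explicit null check.

```java
public class KeepAliveHealthSketch {

    /** Hypothetical model of the two facts the health check reads; not a Netty type. */
    static final class ChannelState {
        boolean active = true;
        Boolean keepAlive; // tri-state: null (never set), TRUE, or FALSE

        /** Models the point at which an asynchronous close has actually finished. */
        void closeCompleted() {
            active = false;
        }
    }

    static boolean isHealthy(ChannelState state) {
        // Boolean.FALSE.equals(null) is false, so an unset attribute falls through
        // to the liveness check instead of throwing a NullPointerException.
        if (Boolean.FALSE.equals(state.keepAlive)) {
            return false;
        }
        return state.active;
    }

    public static void main(String[] args) {
        ChannelState fresh = new ChannelState();
        System.out.println("unset attribute, active: " + isHealthy(fresh));

        ChannelState closing = new ChannelState();
        closing.keepAlive = false; // close requested, but not yet completed
        System.out.println("keepAlive=false, still active: " + isHealthy(closing));

        closing.closeCompleted();
        System.out.println("keepAlive=false, closed: " + isHealthy(closing));

        ChannelState reusable = new ChannelState();
        reusable.keepAlive = true;
        System.out.println("keepAlive=true, active: " + isHealthy(reusable));
    }
}
```

With a real Netty channel, the equivalent single-read form would presumably be `Boolean.FALSE.equals(channel.attr(KEEP_ALIVE).get())`; treat that as a suggestion rather than what was merged.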
@@ -18,6 +18,7 @@
import static java.util.stream.Collectors.groupingBy;
import static java.util.stream.Collectors.mapping;
import static software.amazon.awssdk.http.nio.netty.internal.ChannelAttributeKey.EXECUTE_FUTURE_KEY;
import static software.amazon.awssdk.http.nio.netty.internal.ChannelAttributeKey.KEEP_ALIVE;
import static software.amazon.awssdk.http.nio.netty.internal.ChannelAttributeKey.REQUEST_CONTEXT_KEY;
import static software.amazon.awssdk.http.nio.netty.internal.ChannelAttributeKey.RESPONSE_COMPLETE_KEY;
import static software.amazon.awssdk.http.nio.netty.internal.utils.ExceptionHandlingUtils.tryCatch;
@@ -38,7 +39,6 @@
import io.netty.handler.codec.http.HttpUtil;
import io.netty.handler.timeout.ReadTimeoutException;
import io.netty.handler.timeout.WriteTimeoutException;
import io.netty.util.AttributeKey;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
@@ -66,12 +66,6 @@
@SdkInternalApi
public class ResponseHandler extends SimpleChannelInboundHandler<HttpObject> {

/**
* {@link AttributeKey} to keep track of whether we should close the connection after this request
* has completed.
*/
private static final AttributeKey<Boolean> KEEP_ALIVE = AttributeKey.newInstance("aws.http.nio.netty.async.keepAlive");

private static final Logger log = LoggerFactory.getLogger(ResponseHandler.class);

private static final ResponseHandler INSTANCE = new ResponseHandler();
@@ -18,14 +18,18 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.when;
import static org.mockito.internal.verification.VerificationModeFactory.times;
import static software.amazon.awssdk.http.SdkHttpConfigurationOption.CONNECTION_ACQUIRE_TIMEOUT;
import static software.amazon.awssdk.http.nio.netty.internal.ChannelAttributeKey.KEEP_ALIVE;

import io.netty.channel.Channel;
import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.pool.ChannelPool;
import io.netty.util.Attribute;
import io.netty.util.concurrent.DefaultPromise;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GlobalEventExecutor;
@@ -52,6 +56,7 @@ public class HealthCheckedChannelPoolTest {
private ChannelPool downstreamChannelPool = Mockito.mock(ChannelPool.class);
private List<Channel> channels = new ArrayList<>();
private ScheduledFuture<?> scheduledFuture = Mockito.mock(ScheduledFuture.class);
private Attribute<Boolean> attribute = mock(Attribute.class);

private static final NettyConfiguration NETTY_CONFIGURATION =
new NettyConfiguration(AttributeMap.builder()
@@ -64,7 +69,7 @@ public class HealthCheckedChannelPoolTest {

@Before
public void reset() {
- Mockito.reset(eventLoopGroup, eventLoop, downstreamChannelPool, scheduledFuture);
+ Mockito.reset(eventLoopGroup, eventLoop, downstreamChannelPool, scheduledFuture, attribute);
channels.clear();

Mockito.when(eventLoopGroup.next()).thenReturn(eventLoop);
@@ -104,6 +109,39 @@ public void acquireCanMakeManyCalls() throws Exception {
Mockito.verify(downstreamChannelPool, Mockito.times(5)).acquire(any());
}

@Test
public void acquireActiveAndKeepAliveTrue_shouldAcquireOnce() throws Exception {
stubForIgnoredTimeout();
stubAcquireActiveAndKeepAlive();

Future<Channel> acquire = channelPool.acquire();

acquire.get(5, TimeUnit.SECONDS);

assertThat(acquire.isDone()).isTrue();
assertThat(acquire.isSuccess()).isTrue();
assertThat(acquire.getNow()).isEqualTo(channels.get(0));

Mockito.verify(downstreamChannelPool, Mockito.times(1)).acquire(any());
}


@Test
public void acquire_firstChannelKeepAliveFalse_shouldAcquireAnother() throws Exception {
stubForIgnoredTimeout();
stubAcquireTwiceFirstTimeNotKeepAlive();

Future<Channel> acquire = channelPool.acquire();

acquire.get(5, TimeUnit.SECONDS);

assertThat(acquire.isDone()).isTrue();
assertThat(acquire.isSuccess()).isTrue();
assertThat(acquire.getNow()).isEqualTo(channels.get(1));

Mockito.verify(downstreamChannelPool, Mockito.times(2)).acquire(any());
}

@Test
public void badDownstreamAcquiresCausesException() throws Exception {
stubForIgnoredTimeout();
@@ -154,6 +192,7 @@ public void slowAcquireTimesOut() throws Exception {
public void releaseHealthyDoesNotClose() {
Channel channel = Mockito.mock(Channel.class);
Mockito.when(channel.isActive()).thenReturn(true);
stubKeepAliveAttribute(channel, null);

channelPool.release(channel);

@@ -165,7 +204,7 @@ public void releaseHealthyDoesNotClose() {
public void releaseHealthyCloses() {
Channel channel = Mockito.mock(Channel.class);
Mockito.when(channel.isActive()).thenReturn(false);

stubKeepAliveAttribute(channel, null);
channelPool.release(channel);

Mockito.verify(channel, times(1)).close();
@@ -179,13 +218,34 @@ public void stubAcquireHealthySequence(Boolean... acquireHealthySequence) {
Promise<Channel> promise = invocation.getArgumentAt(0, Promise.class);
Channel channel = Mockito.mock(Channel.class);
Mockito.when(channel.isActive()).thenReturn(shouldAcquireBeHealthy);
stubKeepAliveAttribute(channel, null);
channels.add(channel);
promise.setSuccess(channel);
return promise;
});
}
}

private void stubAcquireActiveAndKeepAlive() {
OngoingStubbing<Future<Channel>> stubbing = Mockito.when(downstreamChannelPool.acquire(any()));
stubbing = stubbing.thenAnswer(invocation -> {
Promise<Channel> promise = invocation.getArgumentAt(0, Promise.class);
Channel channel = Mockito.mock(Channel.class);
Mockito.when(channel.isActive()).thenReturn(true);

stubKeepAliveAttribute(channel, true);

channels.add(channel);
promise.setSuccess(channel);
return promise;
});
}

private void stubKeepAliveAttribute(Channel channel, Boolean isKeepAlive) {
Mockito.when(channel.attr(KEEP_ALIVE)).thenReturn(attribute);
when(attribute.get()).thenReturn(isKeepAlive);
}

public void stubBadDownstreamAcquire() {
Mockito.when(downstreamChannelPool.acquire(any())).thenAnswer(invocation -> {
Promise<Channel> promise = invocation.getArgumentAt(0, Promise.class);
@@ -202,4 +262,27 @@ public void stubForIgnoredTimeout() {
Mockito.when(eventLoopGroup.schedule(any(Runnable.class), anyLong(), any()))
.thenAnswer(i -> scheduledFuture);
}

private void stubAcquireTwiceFirstTimeNotKeepAlive() {
OngoingStubbing<Future<Channel>> stubbing = Mockito.when(downstreamChannelPool.acquire(any()));
stubbing = stubbing.thenAnswer(invocation -> {
Promise<Channel> promise = invocation.getArgumentAt(0, Promise.class);
Channel channel = Mockito.mock(Channel.class);
stubKeepAliveAttribute(channel, false);
Mockito.when(channel.isActive()).thenReturn(true);
channels.add(channel);
promise.setSuccess(channel);
return promise;
});

stubbing.thenAnswer(invocation -> {
Promise<Channel> promise = invocation.getArgumentAt(0, Promise.class);
Channel channel = Mockito.mock(Channel.class);
Mockito.when(channel.isActive()).thenReturn(true);
channels.add(channel);
promise.setSuccess(channel);
stubKeepAliveAttribute(channel, true);
return promise;
});
}
}