Skip to content

Commit

Permalink
Reduce the chance of the race condition where an http2 connection get…
Browse files Browse the repository at this point in the history
…s picked up at the same time it gets closed
  • Loading branch information
zoewangg committed Jan 24, 2020
1 parent 59219bb commit e9d93a4
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 4 deletions.
5 changes: 5 additions & 0 deletions .changes/next-release/bugfix-NettyNIOHTTPClient-037932d.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
{
"category": "Netty NIO HTTP Client",
"type": "bugfix",
"description": "Reduce the chances of the race condition where an HTTP2 connection gets reused at the same time it gets inactive and update it to throw `IOException` so that the request can be retried."
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import io.netty.util.concurrent.GenericFutureListener;
import io.netty.util.concurrent.Promise;
import io.netty.util.concurrent.ScheduledFuture;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
Expand Down Expand Up @@ -79,7 +80,7 @@ boolean acquireStream(Promise<Channel> promise) {
return false;
}

private void acquireClaimedStream(Promise<Channel> promise) {
void acquireClaimedStream(Promise<Channel> promise) {
doInEventLoop(connection.eventLoop(), () -> {
if (state != RecordState.OPEN) {
String message;
Expand All @@ -91,7 +92,7 @@ private void acquireClaimedStream(Promise<Channel> promise) {
message = String.format("Connection %s was closed while acquiring new stream.", connection);
}
log.warn(() -> message);
promise.setFailure(new IllegalStateException(message));
promise.setFailure(new IOException(message));
return;
}

Expand Down Expand Up @@ -201,7 +202,7 @@ private void closeAndExecuteOnChildChannels(Consumer<Channel> childChannelConsum
});
}

public void closeAndReleaseChild(Channel childChannel) {
void closeAndReleaseChild(Channel childChannel) {
childChannel.close();
doInEventLoop(connection.eventLoop(), () -> {
childChannels.remove(childChannel.id());
Expand Down Expand Up @@ -248,9 +249,13 @@ public Channel getConnection() {
return connection;
}

public boolean claimStream() {
private boolean claimStream() {
lastReserveAttemptTimeMillis = System.currentTimeMillis();
for (int attempt = 0; attempt < 5; ++attempt) {
if (!connection.isActive()) {
return false;
}

if (state != RecordState.OPEN) {
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
package software.amazon.awssdk.http.nio.netty.internal.http2;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.junit.Assert.assertFalse;

import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
Expand All @@ -26,8 +28,10 @@
import io.netty.handler.codec.http2.Http2MultiplexHandler;
import io.netty.util.concurrent.DefaultPromise;
import io.netty.util.concurrent.Promise;
import java.io.IOException;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
Expand Down Expand Up @@ -177,6 +181,30 @@ public void availableStream0_reusableShouldBeFalse() {
assertThat(record.acquireStream(null)).isFalse();
}

@Test
public void acquireStream_channelClosed_shouldReturnFalse() {
loopGroup.register(channel).awaitUninterruptibly();
Promise<Channel> channelPromise = new DefaultPromise<>(loopGroup.next());

MultiplexedChannelRecord record = new MultiplexedChannelRecord(channel, 1, Duration.ofSeconds(10));
channel.close();
assertFalse(record.acquireStream(channelPromise));
}

@Test
public void acquireClaimedConnection_channelClosed_shouldThrowIOException() {
loopGroup.register(channel).awaitUninterruptibly();
Promise<Channel> channelPromise = new DefaultPromise<>(loopGroup.next());

MultiplexedChannelRecord record = new MultiplexedChannelRecord(channel, 1, Duration.ofSeconds(10));

record.closeChildChannels();

record.acquireClaimedStream(channelPromise);

assertThatThrownBy(() -> channelPromise.get()).hasCauseInstanceOf(IOException.class);
}

private EmbeddedChannel newHttp2Channel() {
EmbeddedChannel channel = new EmbeddedChannel(Http2FrameCodecBuilder.forClient().build(),
new Http2MultiplexHandler(new NoOpHandler()));
Expand Down

0 comments on commit e9d93a4

Please sign in to comment.