Skip to content

Commit

Permalink
Prevents the connection from being used immediately when `WriteTimeou…
Browse files Browse the repository at this point in the history
…tException` occurs. (#5738)

Motivation:

I got a report from LINE internally where the connection was not
terminated but hung when the VM instance was shut down due to
maintenance of the underlying hypervisor.

`operationComplete()` was not called for `Channel.write()` for this
abnormal connection. There was neither a normal response nor a failure
response.

https://github.com/line/armeria/blob/f0ec7cb729d1fb33d238c6ea8fb9af41460ab37d/core/src/main/java/com/linecorp/armeria/client/AbstractHttpRequestHandler.java#L126-L126
As a result, `WriteTimeoutException` occurred and tried to reset the
connection.

https://github.com/line/armeria/blob/f0ec7cb729d1fb33d238c6ea8fb9af41460ab37d/core/src/main/java/com/linecorp/armeria/client/AbstractHttpRequestHandler.java#L331-L331

`failAndReset()` calls `Channel.write(Unpooled.EMPTY_BUFFER)` first and
then calls `Channel.close()`.

https://github.com/line/armeria/blob/f0ec7cb729d1fb33d238c6ea8fb9af41460ab37d/core/src/main/java/com/linecorp/armeria/internal/common/Http1ObjectEncoder.java#L371-L374
Since `channel.write()` does not respond, the connection cannot be
closed.

Modifications:

- Rename `HttpSession.deactivate()` to `markUnacquirable()` for clarity.
- Mark the session as unacquirable first before proceeding with the reset
procedure.
- Unhealthy connections will be cleaned up eventually with
`KeeyAliveHandler` by idle timeout.

Result:

Fixed a bug where a connection was reused after `WriteTimeoutException`
occurred.
  • Loading branch information
ikhoon authored Jun 10, 2024
1 parent b47d6c9 commit f3dbf7b
Show file tree
Hide file tree
Showing 9 changed files with 39 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ final boolean tryInitialize() {
"Can't send requests. ID: " + id + ", session active: " +
session.isAcquirable(responseDecoder.keepAliveHandler()));
}
session.deactivate();
session.markUnacquirable();
// No need to send RST because we didn't send any packet and this will be disconnected anyway.
fail(UnprocessedRequestException.of(exception));
return false;
Expand Down Expand Up @@ -223,7 +223,7 @@ final void writeHeaders(RequestHeaders headers) {
// connection by sending a GOAWAY frame that will be sent after receiving the corresponding
// response from the remote peer. The "Connection: close" header is stripped when it is converted to
// a Netty HTTP/2 header.
session.deactivate();
session.markUnacquirable();
}

final ChannelPromise promise = ch.newPromise();
Expand Down Expand Up @@ -329,6 +329,12 @@ private void fail(Throwable cause) {
}

final void failAndReset(Throwable cause) {
if (cause instanceof WriteTimeoutException) {
final HttpSession session = HttpSession.get(ch);
// Mark the session as unhealthy so that subsequent requests do not use it.
session.markUnacquirable();
}

if (cause instanceof ProxyConnectException || cause instanceof ResponseCompleteException) {
// - ProxyConnectException is handled by HttpSessionHandler.exceptionCaught().
// - ResponseCompleteException means the response is successfully received.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
}

if (!HttpUtil.isKeepAlive(nettyRes)) {
session().deactivate();
session().markUnacquirable();
}

final HttpResponseWrapper res = getResponse(resId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,14 +159,14 @@ public void onStreamRemoved(Http2Stream stream) {}

@Override
public void onGoAwaySent(int lastStreamId, long errorCode, ByteBuf debugData) {
session().deactivate();
session().markUnacquirable();
goAwayHandler.onGoAwaySent(channel(), lastStreamId, errorCode, debugData);
}

@Override
public void onGoAwayReceived(int lastStreamId, long errorCode, ByteBuf debugData) {
// Should not reuse a connection that received a GOAWAY frame.
session().deactivate();
session().markUnacquirable();
goAwayHandler.onGoAwayReceived(channel(), lastStreamId, errorCode, debugData);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -805,7 +805,7 @@ private static final class ReadSuppressingAndChannelDeactivatingHandler extends

@Override
public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
HttpSession.get(ctx.channel()).deactivate();
HttpSession.get(ctx.channel()).markUnacquirable();
super.close(ctx, promise);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,7 @@ public boolean isAcquirable(KeepAliveHandler keepAliveHandler) {
}

@Override
public void deactivate() {
public void markUnacquirable() {
isAcquirable = false;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
}

if (!HttpUtil.isKeepAlive(nettyRes)) {
session().deactivate();
session().markUnacquirable();
}

if (res == null && ArmeriaHttpUtil.isRequestTimeoutResponse(nettyRes)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1013,7 +1013,7 @@ public CompletableFuture<Void> initiateConnectionShutdown() {
});
// To deactivate the channel when initiateShutdown is called after the RequestHeaders is sent.
// The next request will trigger shutdown.
HttpSession.get(ch).deactivate();
HttpSession.get(ch).markUnacquirable();
}
});
return completableFuture;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package com.linecorp.armeria.internal.client;

import com.linecorp.armeria.client.ClientRequestContext;
import com.linecorp.armeria.client.WriteTimeoutException;
import com.linecorp.armeria.common.ClosedSessionException;
import com.linecorp.armeria.common.HttpRequest;
import com.linecorp.armeria.common.SerializationFormat;
Expand Down Expand Up @@ -90,7 +91,7 @@ public boolean isAcquirable(KeepAliveHandler keepAliveHandler) {
}

@Override
public void deactivate() {}
public void markUnacquirable() {}

@Override
public int incrementAndGetNumRequestsSent() {
Expand Down Expand Up @@ -137,9 +138,10 @@ static HttpSession get(Channel ch) {
* <li>A connection is closed.</li>
* <li>"Connection: close" header is sent or received.</li>
* <li>A GOAWAY frame is sent or received.</li>
* <li>A {@link WriteTimeoutException} is raised</li>
* </ul>
*/
void deactivate();
void markUnacquirable();

/**
* Returns {@code true} if a new request can be sent with this {@link HttpSession}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,11 @@

package com.linecorp.armeria.client;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

import java.util.concurrent.CompletionException;

import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
Expand All @@ -29,6 +32,8 @@
import com.linecorp.armeria.common.RequestHeaders;
import com.linecorp.armeria.common.RequestHeadersBuilder;
import com.linecorp.armeria.common.SessionProtocol;
import com.linecorp.armeria.common.logging.RequestLog;
import com.linecorp.armeria.internal.client.HttpSession;
import com.linecorp.armeria.server.Route;
import com.linecorp.armeria.server.ServerBuilder;
import com.linecorp.armeria.testing.junit5.server.ServerExtension;
Expand Down Expand Up @@ -63,12 +68,20 @@ void testWriteTimeout() {
headersBuilder.add("header1", Strings.repeat("a", 2048)); // set a header over 1KB

// using h1c since http2 compresses headers
assertThatThrownBy(() -> WebClient.builder(SessionProtocol.H1C, server.httpEndpoint())
.factory(clientFactory)
.writeTimeoutMillis(1000)
.build()
.blocking()
.execute(headersBuilder.build(), "content"))
.isInstanceOf(WriteTimeoutException.class);
try (ClientRequestContextCaptor captor = Clients.newContextCaptor()) {
final HttpResponse res = WebClient.builder(SessionProtocol.H1C, server.httpEndpoint())
.factory(clientFactory)
.writeTimeoutMillis(1000)
.build()
.execute(headersBuilder.build(), "content");
final ClientRequestContext ctx = captor.get();
assertThatThrownBy(() -> res.aggregate().join())
.isInstanceOf(CompletionException.class)
.hasCauseInstanceOf(WriteTimeoutException.class);

final RequestLog log = ctx.log().whenComplete().join();
// Make sure that the session is deactivated after the write timeout.
assertThat(HttpSession.get(log.channel()).isAcquirable()).isFalse();
}
}
}

0 comments on commit f3dbf7b

Please sign in to comment.