From bce33f4e2bc87bc1bafb31edaa25ff98d6ea95ed Mon Sep 17 00:00:00 2001 From: Ikhun Um Date: Fri, 7 Jun 2024 14:51:33 +0900 Subject: [PATCH 1/3] Prevents the connection from being used immediately when `WriteTimeoutException` occurs. Motivation: I got a report from LINE internally that the connection was not terminated but hung when the vm instance was shut down due to maintainance of the underlying hyperviser. `operationComplete()` was not called for `Channel.write()` for this abnormal connection. There was neither a normal response nor a failure response. https://github.com/line/armeria/blob/f0ec7cb729d1fb33d238c6ea8fb9af41460ab37d/core/src/main/java/com/linecorp/armeria/client/AbstractHttpRequestHandler.java#L126-L126 As a result, `WriteTimeoutException` occurred and try to reset the connection. https://github.com/line/armeria/blob/f0ec7cb729d1fb33d238c6ea8fb9af41460ab37d/core/src/main/java/com/linecorp/armeria/client/AbstractHttpRequestHandler.java#L331-L331 `failAndReset()` calls `Channel.write(Unpooled.EMPTY_BUFFER)` first and then calls `Channel.close()`. Since `channel.write()` does not respond, the connection cannot be closed. Modifications: - Deactivate `HttpSession` first before proceeding with the reset process. - Unhealthy connections will be cleaned up eventually with `KeeyAliveHandler` by idle timeout. Result: Fixed a bug where a connection was reused after `WriteTimeoutException` occurred. --- .../client/AbstractHttpRequestHandler.java | 4 +++ .../armeria/client/WriteTimeoutTest.java | 27 ++++++++++++++----- 2 files changed, 24 insertions(+), 7 deletions(-) diff --git a/core/src/main/java/com/linecorp/armeria/client/AbstractHttpRequestHandler.java b/core/src/main/java/com/linecorp/armeria/client/AbstractHttpRequestHandler.java index b613a2f9216..68853d353b8 100644 --- a/core/src/main/java/com/linecorp/armeria/client/AbstractHttpRequestHandler.java +++ b/core/src/main/java/com/linecorp/armeria/client/AbstractHttpRequestHandler.java @@ -329,6 +329,10 @@ private void fail(Throwable cause) { } final void failAndReset(Throwable cause) { + // Mark the session as unhealthy so that subsequent requests do not use it. + final HttpSession session = HttpSession.get(ch); + session.deactivate(); + if (cause instanceof ProxyConnectException || cause instanceof ResponseCompleteException) { // - ProxyConnectException is handled by HttpSessionHandler.exceptionCaught(). // - ResponseCompleteException means the response is successfully received. diff --git a/core/src/test/java/com/linecorp/armeria/client/WriteTimeoutTest.java b/core/src/test/java/com/linecorp/armeria/client/WriteTimeoutTest.java index ace7c823ff8..f41a2115bbf 100644 --- a/core/src/test/java/com/linecorp/armeria/client/WriteTimeoutTest.java +++ b/core/src/test/java/com/linecorp/armeria/client/WriteTimeoutTest.java @@ -16,8 +16,12 @@ package com.linecorp.armeria.client; +import static com.linecorp.armeria.internal.client.HttpSession.get; +import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import java.util.concurrent.CompletionException; + import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; @@ -29,6 +33,7 @@ import com.linecorp.armeria.common.RequestHeaders; import com.linecorp.armeria.common.RequestHeadersBuilder; import com.linecorp.armeria.common.SessionProtocol; +import com.linecorp.armeria.common.logging.RequestLog; import com.linecorp.armeria.server.Route; import com.linecorp.armeria.server.ServerBuilder; import com.linecorp.armeria.testing.junit5.server.ServerExtension; @@ -63,12 +68,20 @@ void testWriteTimeout() { headersBuilder.add("header1", Strings.repeat("a", 2048)); // set a header over 1KB // using h1c since http2 compresses headers - assertThatThrownBy(() -> WebClient.builder(SessionProtocol.H1C, server.httpEndpoint()) - .factory(clientFactory) - .writeTimeoutMillis(1000) - .build() - .blocking() - .execute(headersBuilder.build(), "content")) - .isInstanceOf(WriteTimeoutException.class); + try (ClientRequestContextCaptor captor = Clients.newContextCaptor()) { + final HttpResponse res = WebClient.builder(SessionProtocol.H1C, server.httpEndpoint()) + .factory(clientFactory) + .writeTimeoutMillis(1000) + .build() + .execute(headersBuilder.build(), "content"); + final ClientRequestContext ctx = captor.get(); + assertThatThrownBy(() -> res.aggregate().join()) + .isInstanceOf(CompletionException.class) + .hasCauseInstanceOf(WriteTimeoutException.class); + + final RequestLog log = ctx.log().whenComplete().join(); + // Make sure that the session is deactivated after the write timeout. + assertThat(get(log.channel()).isAcquirable()).isFalse(); + } } } From 422f30e2ca6b50916fc5ee220d2ef9bc48038985 Mon Sep 17 00:00:00 2001 From: Ikhun Um Date: Fri, 7 Jun 2024 15:45:23 +0900 Subject: [PATCH 2/3] remove unexpected static import in test --- .../java/com/linecorp/armeria/client/WriteTimeoutTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/test/java/com/linecorp/armeria/client/WriteTimeoutTest.java b/core/src/test/java/com/linecorp/armeria/client/WriteTimeoutTest.java index f41a2115bbf..3890ed8f262 100644 --- a/core/src/test/java/com/linecorp/armeria/client/WriteTimeoutTest.java +++ b/core/src/test/java/com/linecorp/armeria/client/WriteTimeoutTest.java @@ -16,7 +16,6 @@ package com.linecorp.armeria.client; -import static com.linecorp.armeria.internal.client.HttpSession.get; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -34,6 +33,7 @@ import com.linecorp.armeria.common.RequestHeadersBuilder; import com.linecorp.armeria.common.SessionProtocol; import com.linecorp.armeria.common.logging.RequestLog; +import com.linecorp.armeria.internal.client.HttpSession; import com.linecorp.armeria.server.Route; import com.linecorp.armeria.server.ServerBuilder; import com.linecorp.armeria.testing.junit5.server.ServerExtension; @@ -81,7 +81,7 @@ void testWriteTimeout() { final RequestLog log = ctx.log().whenComplete().join(); // Make sure that the session is deactivated after the write timeout. - assertThat(get(log.channel()).isAcquirable()).isFalse(); + assertThat(HttpSession.get(log.channel()).isAcquirable()).isFalse(); } } } From 8e34185e921ca057c69cd1dc362b89cb14f26ab1 Mon Sep 17 00:00:00 2001 From: Ikhun Um Date: Mon, 10 Jun 2024 12:54:04 +0900 Subject: [PATCH 3/3] Address comments --- .../armeria/client/AbstractHttpRequestHandler.java | 12 +++++++----- .../armeria/client/Http1ResponseDecoder.java | 2 +- .../armeria/client/Http2ResponseDecoder.java | 4 ++-- .../client/HttpClientPipelineConfigurator.java | 2 +- .../linecorp/armeria/client/HttpSessionHandler.java | 2 +- .../client/WebSocketHttp1ClientChannelHandler.java | 2 +- .../internal/client/DefaultClientRequestContext.java | 2 +- .../armeria/internal/client/HttpSession.java | 6 ++++-- 8 files changed, 18 insertions(+), 14 deletions(-) diff --git a/core/src/main/java/com/linecorp/armeria/client/AbstractHttpRequestHandler.java b/core/src/main/java/com/linecorp/armeria/client/AbstractHttpRequestHandler.java index 68853d353b8..0c41b11e7f8 100644 --- a/core/src/main/java/com/linecorp/armeria/client/AbstractHttpRequestHandler.java +++ b/core/src/main/java/com/linecorp/armeria/client/AbstractHttpRequestHandler.java @@ -169,7 +169,7 @@ final boolean tryInitialize() { "Can't send requests. ID: " + id + ", session active: " + session.isAcquirable(responseDecoder.keepAliveHandler())); } - session.deactivate(); + session.markUnacquirable(); // No need to send RST because we didn't send any packet and this will be disconnected anyway. fail(UnprocessedRequestException.of(exception)); return false; @@ -223,7 +223,7 @@ final void writeHeaders(RequestHeaders headers) { // connection by sending a GOAWAY frame that will be sent after receiving the corresponding // response from the remote peer. The "Connection: close" header is stripped when it is converted to // a Netty HTTP/2 header. - session.deactivate(); + session.markUnacquirable(); } final ChannelPromise promise = ch.newPromise(); @@ -329,9 +329,11 @@ private void fail(Throwable cause) { } final void failAndReset(Throwable cause) { - // Mark the session as unhealthy so that subsequent requests do not use it. - final HttpSession session = HttpSession.get(ch); - session.deactivate(); + if (cause instanceof WriteTimeoutException) { + final HttpSession session = HttpSession.get(ch); + // Mark the session as unhealthy so that subsequent requests do not use it. + session.markUnacquirable(); + } if (cause instanceof ProxyConnectException || cause instanceof ResponseCompleteException) { // - ProxyConnectException is handled by HttpSessionHandler.exceptionCaught(). diff --git a/core/src/main/java/com/linecorp/armeria/client/Http1ResponseDecoder.java b/core/src/main/java/com/linecorp/armeria/client/Http1ResponseDecoder.java index 3e0f571e4ae..d1304716dcc 100644 --- a/core/src/main/java/com/linecorp/armeria/client/Http1ResponseDecoder.java +++ b/core/src/main/java/com/linecorp/armeria/client/Http1ResponseDecoder.java @@ -183,7 +183,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception } if (!HttpUtil.isKeepAlive(nettyRes)) { - session().deactivate(); + session().markUnacquirable(); } final HttpResponseWrapper res = getResponse(resId); diff --git a/core/src/main/java/com/linecorp/armeria/client/Http2ResponseDecoder.java b/core/src/main/java/com/linecorp/armeria/client/Http2ResponseDecoder.java index 147e976cfe3..26e2d076716 100644 --- a/core/src/main/java/com/linecorp/armeria/client/Http2ResponseDecoder.java +++ b/core/src/main/java/com/linecorp/armeria/client/Http2ResponseDecoder.java @@ -159,14 +159,14 @@ public void onStreamRemoved(Http2Stream stream) {} @Override public void onGoAwaySent(int lastStreamId, long errorCode, ByteBuf debugData) { - session().deactivate(); + session().markUnacquirable(); goAwayHandler.onGoAwaySent(channel(), lastStreamId, errorCode, debugData); } @Override public void onGoAwayReceived(int lastStreamId, long errorCode, ByteBuf debugData) { // Should not reuse a connection that received a GOAWAY frame. - session().deactivate(); + session().markUnacquirable(); goAwayHandler.onGoAwayReceived(channel(), lastStreamId, errorCode, debugData); } diff --git a/core/src/main/java/com/linecorp/armeria/client/HttpClientPipelineConfigurator.java b/core/src/main/java/com/linecorp/armeria/client/HttpClientPipelineConfigurator.java index 6da8aea4a18..4937682bff0 100644 --- a/core/src/main/java/com/linecorp/armeria/client/HttpClientPipelineConfigurator.java +++ b/core/src/main/java/com/linecorp/armeria/client/HttpClientPipelineConfigurator.java @@ -805,7 +805,7 @@ private static final class ReadSuppressingAndChannelDeactivatingHandler extends @Override public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { - HttpSession.get(ctx.channel()).deactivate(); + HttpSession.get(ctx.channel()).markUnacquirable(); super.close(ctx, promise); } } diff --git a/core/src/main/java/com/linecorp/armeria/client/HttpSessionHandler.java b/core/src/main/java/com/linecorp/armeria/client/HttpSessionHandler.java index 6fcb5c129c5..01375220af8 100644 --- a/core/src/main/java/com/linecorp/armeria/client/HttpSessionHandler.java +++ b/core/src/main/java/com/linecorp/armeria/client/HttpSessionHandler.java @@ -327,7 +327,7 @@ public boolean isAcquirable(KeepAliveHandler keepAliveHandler) { } @Override - public void deactivate() { + public void markUnacquirable() { isAcquirable = false; } diff --git a/core/src/main/java/com/linecorp/armeria/client/WebSocketHttp1ClientChannelHandler.java b/core/src/main/java/com/linecorp/armeria/client/WebSocketHttp1ClientChannelHandler.java index baf4518234c..0b604248860 100644 --- a/core/src/main/java/com/linecorp/armeria/client/WebSocketHttp1ClientChannelHandler.java +++ b/core/src/main/java/com/linecorp/armeria/client/WebSocketHttp1ClientChannelHandler.java @@ -180,7 +180,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception } if (!HttpUtil.isKeepAlive(nettyRes)) { - session().deactivate(); + session().markUnacquirable(); } if (res == null && ArmeriaHttpUtil.isRequestTimeoutResponse(nettyRes)) { diff --git a/core/src/main/java/com/linecorp/armeria/internal/client/DefaultClientRequestContext.java b/core/src/main/java/com/linecorp/armeria/internal/client/DefaultClientRequestContext.java index 16bb7f4c01f..279b5d52218 100644 --- a/core/src/main/java/com/linecorp/armeria/internal/client/DefaultClientRequestContext.java +++ b/core/src/main/java/com/linecorp/armeria/internal/client/DefaultClientRequestContext.java @@ -1013,7 +1013,7 @@ public CompletableFuture initiateConnectionShutdown() { }); // To deactivate the channel when initiateShutdown is called after the RequestHeaders is sent. // The next request will trigger shutdown. - HttpSession.get(ch).deactivate(); + HttpSession.get(ch).markUnacquirable(); } }); return completableFuture; diff --git a/core/src/main/java/com/linecorp/armeria/internal/client/HttpSession.java b/core/src/main/java/com/linecorp/armeria/internal/client/HttpSession.java index 675400205ad..2b69160e015 100644 --- a/core/src/main/java/com/linecorp/armeria/internal/client/HttpSession.java +++ b/core/src/main/java/com/linecorp/armeria/internal/client/HttpSession.java @@ -17,6 +17,7 @@ package com.linecorp.armeria.internal.client; import com.linecorp.armeria.client.ClientRequestContext; +import com.linecorp.armeria.client.WriteTimeoutException; import com.linecorp.armeria.common.ClosedSessionException; import com.linecorp.armeria.common.HttpRequest; import com.linecorp.armeria.common.SerializationFormat; @@ -90,7 +91,7 @@ public boolean isAcquirable(KeepAliveHandler keepAliveHandler) { } @Override - public void deactivate() {} + public void markUnacquirable() {} @Override public int incrementAndGetNumRequestsSent() { @@ -137,9 +138,10 @@ static HttpSession get(Channel ch) { *
  • A connection is closed.
  • *
  • "Connection: close" header is sent or received.
  • *
  • A GOAWAY frame is sent or received.
  • + *
  • A {@link WriteTimeoutException} is raised
  • * */ - void deactivate(); + void markUnacquirable(); /** * Returns {@code true} if a new request can be sent with this {@link HttpSession}.