diff --git a/webclient/http1/src/main/java/io/helidon/webclient/http1/Http1ClientResponseImpl.java b/webclient/http1/src/main/java/io/helidon/webclient/http1/Http1ClientResponseImpl.java index 580d28c4aec..3094a7e3d22 100644 --- a/webclient/http1/src/main/java/io/helidon/webclient/http1/Http1ClientResponseImpl.java +++ b/webclient/http1/src/main/java/io/helidon/webclient/http1/Http1ClientResponseImpl.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2022, 2023 Oracle and/or its affiliates. + * Copyright (c) 2022, 2024 Oracle and/or its affiliates. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,6 +17,7 @@ package io.helidon.webclient.http1; import java.io.InputStream; +import java.lang.System.Logger.Level; import java.util.List; import java.util.OptionalLong; import java.util.ServiceLoader; @@ -27,6 +28,7 @@ import io.helidon.common.HelidonServiceLoader; import io.helidon.common.LazyValue; import io.helidon.common.buffers.BufferData; +import io.helidon.common.buffers.DataReader; import io.helidon.common.media.type.ParserMode; import io.helidon.http.ClientRequestHeaders; import io.helidon.http.ClientResponseHeaders; @@ -153,7 +155,7 @@ public void close() { if (headers().contains(HeaderValues.CONNECTION_CLOSE)) { connection.closeResource(); } else { - if (entityFullyRead || entityLength == 0) { + if (entityFullyRead || entityLength == 0 || consumeUnreadEntity()) { connection.releaseResource(); } else { connection.closeResource(); @@ -186,6 +188,32 @@ ClientConnection connection() { return connection; } + /** + * Attempts to consume an unread entity for the purpose of re-using a cached connection. + * Only works for length-prefixed responses. + * + * @return {@code true} if consumed, {@code false} otherwise + */ + private boolean consumeUnreadEntity() { + if (entityLength == ENTITY_LENGTH_CHUNKED) { + return false; + } + DataReader reader = connection.reader(); + if (reader.available() != entityLength) { + return false; + } + try { + for (long i = 0; i < entityLength; i++) { + reader.read(); + } + entityFullyRead = true; + return true; + } catch (RuntimeException e) { + LOGGER.log(Level.DEBUG, "Exception while consuming entity", e); + return false; + } + } + private ReadableEntity entity(ClientRequestHeaders requestHeaders, ClientResponseHeaders responseHeaders) { if (inputStream == null) { diff --git a/webclient/tests/http1/src/test/java/io/helidon/webclient/http1/Http1ClientTest.java b/webclient/tests/http1/src/test/java/io/helidon/webclient/http1/Http1ClientTest.java index d23bb2d2bc5..4a82c5bb338 100644 --- a/webclient/tests/http1/src/test/java/io/helidon/webclient/http1/Http1ClientTest.java +++ b/webclient/tests/http1/src/test/java/io/helidon/webclient/http1/Http1ClientTest.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2023 Oracle and/or its affiliates. + * Copyright (c) 2023, 2024 Oracle and/or its affiliates. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -255,6 +255,37 @@ void testConnectionQueueDequeue() { } } + @Test + void testConnectionCachingUnreadEntity() { + ClientConnection connectionNow; + ClientConnection connectionPrior = null; + for (int i = 0; i < 5; ++i) { + HttpClientRequest request = injectedHttp1client.put("/test"); + // connection will be dequeued if queue is not empty + WebClient webClient = WebClient.create(); + Http1ClientConfig clientConfig = Http1ClientConfig.create(); + Http1ClientImpl http1Client = new Http1ClientImpl(webClient, clientConfig); + connectionNow = http1Client + .connectionCache() + .connection(http1Client, + Duration.ZERO, + injectedHttp1client.prototype().tls(), + Proxy.noProxy(), + request.resolvedUri(), + request.headers(), + true); + request.connection(connectionNow); + // submitted entity is echoed back but not consumed here + HttpClientResponse response = request.submit("this is an entity"); + // connection will be queued up + response.close(); + if (connectionPrior != null) { + assertThat(connectionNow, is(connectionPrior)); + } + connectionPrior = connectionNow; + } + } + @Test void testConnectionQueueSizeLimit() { int connectionQueueSize = injectedHttp1client.prototype().connectionCacheSize();