From 5c1fdb781c4513377b779c7c3f6ef0d7586e7b42 Mon Sep 17 00:00:00 2001 From: Yang Wang Date: Wed, 27 Sep 2023 12:50:38 +1000 Subject: [PATCH] wrap chunked body and report size on close --- .../org/elasticsearch/http/HttpStatsIT.java | 40 ++++++++++------- .../elasticsearch/http/HttpRouteStats.java | 15 +++++++ .../rest/ChunkedRestResponseBody.java | 34 +------------- .../elasticsearch/rest/RestController.java | 44 ++++++++++++++++++- 4 files changed, 83 insertions(+), 50 deletions(-) diff --git a/qa/smoke-test-http/src/javaRestTest/java/org/elasticsearch/http/HttpStatsIT.java b/qa/smoke-test-http/src/javaRestTest/java/org/elasticsearch/http/HttpStatsIT.java index 5be12958bd17e..c582191c085f4 100644 --- a/qa/smoke-test-http/src/javaRestTest/java/org/elasticsearch/http/HttpStatsIT.java +++ b/qa/smoke-test-http/src/javaRestTest/java/org/elasticsearch/http/HttpStatsIT.java @@ -26,6 +26,7 @@ import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.nullValue; @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.SUITE, supportsDedicatedMasters = false, numDataNodes = 0, numClientNodes = 0) public class HttpStatsIT extends HttpSmokeTestCase { @@ -86,25 +87,30 @@ private void assertHttpStats(XContentTestUtils.JsonMapView jsonMapView) { final List routes = List.of("/", "/_cat/nodes", "/{index}/_search", "/_cluster/state"); for (var route : routes) { - assertThat(jsonMapView.get("http.routes." + route), notNullValue()); - assertThat(jsonMapView.get("http.routes." + route + ".requests.count"), equalTo(1)); - assertThat(jsonMapView.get("http.routes." + route + ".requests.total_size_in_bytes"), greaterThanOrEqualTo(0)); - assertThat(jsonMapView.get("http.routes." + route + ".responses.count"), equalTo(1)); - assertThat(jsonMapView.get("http.routes." + route + ".responses.total_size_in_bytes"), greaterThan(1)); - assertThat(jsonMapView.get("http.routes." + route + ".requests.size_histogram"), hasSize(1)); - assertThat(jsonMapView.get("http.routes." + route + ".requests.size_histogram.0.count"), equalTo(1)); - assertThat(jsonMapView.get("http.routes." + route + ".requests.size_histogram.0.lt_bytes"), notNullValue()); + assertThat(route, jsonMapView.get("http.routes." + route), notNullValue()); + assertThat(route, jsonMapView.get("http.routes." + route + ".requests.count"), equalTo(1)); + assertThat(route, jsonMapView.get("http.routes." + route + ".requests.total_size_in_bytes"), greaterThanOrEqualTo(0)); + assertThat(route, jsonMapView.get("http.routes." + route + ".responses.count"), equalTo(1)); + assertThat(route, jsonMapView.get("http.routes." + route + ".responses.total_size_in_bytes"), greaterThan(1)); + assertThat(route, jsonMapView.get("http.routes." + route + ".requests.size_histogram"), hasSize(1)); + assertThat(route, jsonMapView.get("http.routes." + route + ".requests.size_histogram.0.count"), equalTo(1)); + assertThat(route, jsonMapView.get("http.routes." + route + ".requests.size_histogram.0.lt_bytes"), notNullValue()); if (route.equals("/{index}/_search")) { - assertThat(jsonMapView.get("http.routes." + route + ".requests.size_histogram.0.ge_bytes"), notNullValue()); + assertThat(route, jsonMapView.get("http.routes." + route + ".requests.size_histogram.0.ge_bytes"), notNullValue()); } - assertThat(jsonMapView.get("http.routes." + route + ".responses.size_histogram"), hasSize(1)); - assertThat(jsonMapView.get("http.routes." + route + ".responses.size_histogram.0.count"), equalTo(1)); - assertThat(jsonMapView.get("http.routes." + route + ".responses.size_histogram.0.lt_bytes"), notNullValue()); - assertThat(jsonMapView.get("http.routes." + route + ".responses.size_histogram.0.ge_bytes"), notNullValue()); - assertThat(jsonMapView.get("http.routes." + route + ".responses.handling_time_histogram"), hasSize(1)); - assertThat(jsonMapView.get("http.routes." + route + ".responses.handling_time_histogram.0.count"), equalTo(1)); - assertThat(jsonMapView.get("http.routes." + route + ".responses.handling_time_histogram.0.lt_millis"), notNullValue()); - assertThat(jsonMapView.get("http.routes." + route + ".responses.handling_time_histogram.0.ge_millis"), notNullValue()); + assertThat(route, jsonMapView.get("http.routes." + route + ".responses.size_histogram"), hasSize(1)); + assertThat(route, jsonMapView.get("http.routes." + route + ".responses.size_histogram.0.count"), equalTo(1)); + assertThat(route, jsonMapView.get("http.routes." + route + ".responses.size_histogram.0.lt_bytes"), notNullValue()); + assertThat(route, jsonMapView.get("http.routes." + route + ".responses.size_histogram.0.ge_bytes"), notNullValue()); + assertThat(route, jsonMapView.get("http.routes." + route + ".responses.handling_time_histogram"), hasSize(1)); + assertThat(route, jsonMapView.get("http.routes." + route + ".responses.handling_time_histogram.0.count"), equalTo(1)); + final int ltMillis = jsonMapView.get("http.routes." + route + ".responses.handling_time_histogram.0.lt_millis"); + assertThat(route, ltMillis, notNullValue()); + assertThat( + route, + jsonMapView.get("http.routes." + route + ".responses.handling_time_histogram.0.ge_millis"), + ltMillis > 1 ? notNullValue() : nullValue() + ); } } } diff --git a/server/src/main/java/org/elasticsearch/http/HttpRouteStats.java b/server/src/main/java/org/elasticsearch/http/HttpRouteStats.java index f91547c55816b..41a134923e136 100644 --- a/server/src/main/java/org/elasticsearch/http/HttpRouteStats.java +++ b/server/src/main/java/org/elasticsearch/http/HttpRouteStats.java @@ -21,6 +21,21 @@ import java.util.Objects; import java.util.stream.IntStream; +/** + * This class encapsulates the stats for a single HTTP route {@link org.elasticsearch.rest.MethodHandlers} + * + * @param requestCount the number of request handled by the HTTP route + * @param totalRequestSize the total body size (bytes) of requests handled by the HTTP route + * @param requestSizeHistogram an array of frequencies of request size (bytes) in buckets with upper bounds + * as returned by {@link HttpRouteStatsTracker#getBucketUpperBounds()}, plus + * an extra bucket for handling size larger than the largest upper bound (currently 64MB). + * @param responseCount the number of responses produced by the HTTP route + * @param totalResponseSize the total body size (bytes) of responses produced by the HTTP route + * @param responseSizeHistogram similar to {@code requestSizeHistogram} but for response size + * @param responseTimeHistogram an array of frequencies of response time (millis) in buckets with upper bounds + * as returned by {@link HandlingTimeTracker#getBucketUpperBounds()}, plus + * an extra bucket for handling response time larger than the longest upper bound (currently 65536ms). + */ public record HttpRouteStats( long requestCount, long totalRequestSize, diff --git a/server/src/main/java/org/elasticsearch/rest/ChunkedRestResponseBody.java b/server/src/main/java/org/elasticsearch/rest/ChunkedRestResponseBody.java index 45b05757c7bcd..9cfe7b84577db 100644 --- a/server/src/main/java/org/elasticsearch/rest/ChunkedRestResponseBody.java +++ b/server/src/main/java/org/elasticsearch/rest/ChunkedRestResponseBody.java @@ -29,7 +29,6 @@ import java.io.Writer; import java.nio.charset.StandardCharsets; import java.util.Iterator; -import java.util.function.Consumer; /** * The body of a rest response that uses chunked HTTP encoding. Implementations are used to avoid materializing full responses on heap and @@ -58,10 +57,6 @@ public interface ChunkedRestResponseBody extends Releasable { */ String getResponseContentTypeString(); - default void setChunkedSizeListener(Consumer sizeConsumer) { - throw new UnsupportedOperationException("not supported"); - } - /** * Create a chunked response body to be written to a specific {@link RestChannel} from a {@link ChunkedToXContent}. * @@ -80,9 +75,6 @@ static ChunkedRestResponseBody fromXContent( return new ChunkedRestResponseBody() { - private int size = 0; - private Consumer sizeConsumer = null; - private final OutputStream out = new OutputStream() { @Override public void write(int b) throws IOException { @@ -110,12 +102,7 @@ public void write(byte[] b, int off, int len) throws IOException { @Override public boolean isDone() { - var result = serialization.hasNext() == false; - if (result && sizeConsumer != null) { - sizeConsumer.accept(size); - sizeConsumer = null; - } - return result; + return serialization.hasNext() == false; } @Override @@ -138,7 +125,6 @@ public ReleasableBytesReference encodeChunk(int sizeHint, Recycler rec () -> Releasables.closeExpectNoException(chunkStream) ); target = null; - size += result.length(); return result; } finally { if (target != null) { @@ -155,10 +141,6 @@ public String getResponseContentTypeString() { } @Override - public void setChunkedSizeListener(Consumer sizeConsumer) { - this.sizeConsumer = sizeConsumer; - } - public void close() { Releasables.closeExpectNoException(releasable); } @@ -175,8 +157,6 @@ static ChunkedRestResponseBody fromTextChunks( @Nullable Releasable releasable ) { return new ChunkedRestResponseBody() { - private int size = 0; - private Consumer sizeConsumer = null; private RecyclerBytesStreamOutput currentOutput; private final Writer writer = new OutputStreamWriter(new OutputStream() { @Override @@ -206,12 +186,7 @@ public void close() { @Override public boolean isDone() { - var result = chunkIterator.hasNext() == false; - if (result && sizeConsumer != null) { - sizeConsumer.accept(size); - sizeConsumer = null; - } - return result; + return chunkIterator.hasNext() == false; } @Override @@ -236,7 +211,6 @@ public ReleasableBytesReference encodeChunk(int sizeHint, Recycler rec () -> Releasables.closeExpectNoException(chunkOutput) ); currentOutput = null; - size += result.length(); return result; } finally { if (currentOutput != null) { @@ -253,10 +227,6 @@ public String getResponseContentTypeString() { } @Override - public void setChunkedSizeListener(Consumer sizeConsumer) { - this.sizeConsumer = sizeConsumer; - } - public void close() { Releasables.closeExpectNoException(releasable); } diff --git a/server/src/main/java/org/elasticsearch/rest/RestController.java b/server/src/main/java/org/elasticsearch/rest/RestController.java index e44cd3ea116ab..a376b15f2f243 100644 --- a/server/src/main/java/org/elasticsearch/rest/RestController.java +++ b/server/src/main/java/org/elasticsearch/rest/RestController.java @@ -11,15 +11,18 @@ import org.apache.logging.log4j.Level; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.lucene.util.BytesRef; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.client.internal.node.NodeClient; import org.elasticsearch.common.Strings; import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.bytes.ReleasableBytesReference; import org.elasticsearch.common.io.stream.BytesStream; import org.elasticsearch.common.logging.DeprecationLogger; import org.elasticsearch.common.path.PathTrie; +import org.elasticsearch.common.recycler.Recycler; import org.elasticsearch.common.util.Maps; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.core.Nullable; @@ -50,6 +53,7 @@ import java.util.Spliterator; import java.util.Spliterators; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Consumer; import java.util.function.Supplier; import java.util.function.UnaryOperator; import java.util.stream.StreamSupport; @@ -779,7 +783,10 @@ public void sendResponse(RestResponse response) { if (response.isChunked() == false) { methodHandlers.addResponseStats(response.content().length()); } else { - response.chunkedContent().setChunkedSizeListener(methodHandlers::addResponseStats); + response = RestResponse.chunked( + response.status(), + new EncodeLengthTrackingChunkedRestResponseBody(response.chunkedContent(), methodHandlers::addResponseStats) + ); } delegate.sendResponse(response); success = true; @@ -799,6 +806,41 @@ private void close() { } } + private static class EncodeLengthTrackingChunkedRestResponseBody implements ChunkedRestResponseBody { + + private final ChunkedRestResponseBody delegate; + private final Consumer encodedLengthConsumer; + private int encodedLength = 0; + + private EncodeLengthTrackingChunkedRestResponseBody(ChunkedRestResponseBody delegate, Consumer encodedLengthConsumer) { + this.delegate = delegate; + this.encodedLengthConsumer = encodedLengthConsumer; + } + + @Override + public boolean isDone() { + return delegate.isDone(); + } + + @Override + public ReleasableBytesReference encodeChunk(int sizeHint, Recycler recycler) throws IOException { + final ReleasableBytesReference bytesReference = delegate.encodeChunk(sizeHint, recycler); + encodedLength += bytesReference.length(); + return bytesReference; + } + + @Override + public String getResponseContentTypeString() { + return delegate.getResponseContentTypeString(); + } + + @Override + public void close() { + delegate.close(); + encodedLengthConsumer.accept(encodedLength); + } + } + private static CircuitBreaker inFlightRequestsBreaker(CircuitBreakerService circuitBreakerService) { // We always obtain a fresh breaker to reflect changes to the breaker configuration. return circuitBreakerService.getBreaker(CircuitBreaker.IN_FLIGHT_REQUESTS);