Skip to content

Commit

Permalink
wrap chunked body and report size on close
Browse files Browse the repository at this point in the history
  • Loading branch information
ywangd committed Sep 27, 2023
1 parent 4504e52 commit 5c1fdb7
Show file tree
Hide file tree
Showing 4 changed files with 83 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;

@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.SUITE, supportsDedicatedMasters = false, numDataNodes = 0, numClientNodes = 0)
public class HttpStatsIT extends HttpSmokeTestCase {
Expand Down Expand Up @@ -86,25 +87,30 @@ private void assertHttpStats(XContentTestUtils.JsonMapView jsonMapView) {
final List<String> routes = List.of("/", "/_cat/nodes", "/{index}/_search", "/_cluster/state");

for (var route : routes) {
assertThat(jsonMapView.get("http.routes." + route), notNullValue());
assertThat(jsonMapView.get("http.routes." + route + ".requests.count"), equalTo(1));
assertThat(jsonMapView.get("http.routes." + route + ".requests.total_size_in_bytes"), greaterThanOrEqualTo(0));
assertThat(jsonMapView.get("http.routes." + route + ".responses.count"), equalTo(1));
assertThat(jsonMapView.get("http.routes." + route + ".responses.total_size_in_bytes"), greaterThan(1));
assertThat(jsonMapView.get("http.routes." + route + ".requests.size_histogram"), hasSize(1));
assertThat(jsonMapView.get("http.routes." + route + ".requests.size_histogram.0.count"), equalTo(1));
assertThat(jsonMapView.get("http.routes." + route + ".requests.size_histogram.0.lt_bytes"), notNullValue());
assertThat(route, jsonMapView.get("http.routes." + route), notNullValue());
assertThat(route, jsonMapView.get("http.routes." + route + ".requests.count"), equalTo(1));
assertThat(route, jsonMapView.get("http.routes." + route + ".requests.total_size_in_bytes"), greaterThanOrEqualTo(0));
assertThat(route, jsonMapView.get("http.routes." + route + ".responses.count"), equalTo(1));
assertThat(route, jsonMapView.get("http.routes." + route + ".responses.total_size_in_bytes"), greaterThan(1));
assertThat(route, jsonMapView.get("http.routes." + route + ".requests.size_histogram"), hasSize(1));
assertThat(route, jsonMapView.get("http.routes." + route + ".requests.size_histogram.0.count"), equalTo(1));
assertThat(route, jsonMapView.get("http.routes." + route + ".requests.size_histogram.0.lt_bytes"), notNullValue());
if (route.equals("/{index}/_search")) {
assertThat(jsonMapView.get("http.routes." + route + ".requests.size_histogram.0.ge_bytes"), notNullValue());
assertThat(route, jsonMapView.get("http.routes." + route + ".requests.size_histogram.0.ge_bytes"), notNullValue());
}
assertThat(jsonMapView.get("http.routes." + route + ".responses.size_histogram"), hasSize(1));
assertThat(jsonMapView.get("http.routes." + route + ".responses.size_histogram.0.count"), equalTo(1));
assertThat(jsonMapView.get("http.routes." + route + ".responses.size_histogram.0.lt_bytes"), notNullValue());
assertThat(jsonMapView.get("http.routes." + route + ".responses.size_histogram.0.ge_bytes"), notNullValue());
assertThat(jsonMapView.get("http.routes." + route + ".responses.handling_time_histogram"), hasSize(1));
assertThat(jsonMapView.get("http.routes." + route + ".responses.handling_time_histogram.0.count"), equalTo(1));
assertThat(jsonMapView.get("http.routes." + route + ".responses.handling_time_histogram.0.lt_millis"), notNullValue());
assertThat(jsonMapView.get("http.routes." + route + ".responses.handling_time_histogram.0.ge_millis"), notNullValue());
assertThat(route, jsonMapView.get("http.routes." + route + ".responses.size_histogram"), hasSize(1));
assertThat(route, jsonMapView.get("http.routes." + route + ".responses.size_histogram.0.count"), equalTo(1));
assertThat(route, jsonMapView.get("http.routes." + route + ".responses.size_histogram.0.lt_bytes"), notNullValue());
assertThat(route, jsonMapView.get("http.routes." + route + ".responses.size_histogram.0.ge_bytes"), notNullValue());
assertThat(route, jsonMapView.get("http.routes." + route + ".responses.handling_time_histogram"), hasSize(1));
assertThat(route, jsonMapView.get("http.routes." + route + ".responses.handling_time_histogram.0.count"), equalTo(1));
final int ltMillis = jsonMapView.get("http.routes." + route + ".responses.handling_time_histogram.0.lt_millis");
assertThat(route, ltMillis, notNullValue());
assertThat(
route,
jsonMapView.get("http.routes." + route + ".responses.handling_time_histogram.0.ge_millis"),
ltMillis > 1 ? notNullValue() : nullValue()
);
}
}
}
15 changes: 15 additions & 0 deletions server/src/main/java/org/elasticsearch/http/HttpRouteStats.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,21 @@
import java.util.Objects;
import java.util.stream.IntStream;

/**
* This class encapsulates the stats for a single HTTP route {@link org.elasticsearch.rest.MethodHandlers}
*
* @param requestCount the number of request handled by the HTTP route
* @param totalRequestSize the total body size (bytes) of requests handled by the HTTP route
* @param requestSizeHistogram an array of frequencies of request size (bytes) in buckets with upper bounds
* as returned by {@link HttpRouteStatsTracker#getBucketUpperBounds()}, plus
* an extra bucket for handling size larger than the largest upper bound (currently 64MB).
* @param responseCount the number of responses produced by the HTTP route
* @param totalResponseSize the total body size (bytes) of responses produced by the HTTP route
* @param responseSizeHistogram similar to {@code requestSizeHistogram} but for response size
* @param responseTimeHistogram an array of frequencies of response time (millis) in buckets with upper bounds
* as returned by {@link HandlingTimeTracker#getBucketUpperBounds()}, plus
* an extra bucket for handling response time larger than the longest upper bound (currently 65536ms).
*/
public record HttpRouteStats(
long requestCount,
long totalRequestSize,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.function.Consumer;

/**
* The body of a rest response that uses chunked HTTP encoding. Implementations are used to avoid materializing full responses on heap and
Expand Down Expand Up @@ -58,10 +57,6 @@ public interface ChunkedRestResponseBody extends Releasable {
*/
String getResponseContentTypeString();

default void setChunkedSizeListener(Consumer<Integer> sizeConsumer) {
throw new UnsupportedOperationException("not supported");
}

/**
* Create a chunked response body to be written to a specific {@link RestChannel} from a {@link ChunkedToXContent}.
*
Expand All @@ -80,9 +75,6 @@ static ChunkedRestResponseBody fromXContent(

return new ChunkedRestResponseBody() {

private int size = 0;
private Consumer<Integer> sizeConsumer = null;

private final OutputStream out = new OutputStream() {
@Override
public void write(int b) throws IOException {
Expand Down Expand Up @@ -110,12 +102,7 @@ public void write(byte[] b, int off, int len) throws IOException {

@Override
public boolean isDone() {
var result = serialization.hasNext() == false;
if (result && sizeConsumer != null) {
sizeConsumer.accept(size);
sizeConsumer = null;
}
return result;
return serialization.hasNext() == false;
}

@Override
Expand All @@ -138,7 +125,6 @@ public ReleasableBytesReference encodeChunk(int sizeHint, Recycler<BytesRef> rec
() -> Releasables.closeExpectNoException(chunkStream)
);
target = null;
size += result.length();
return result;
} finally {
if (target != null) {
Expand All @@ -155,10 +141,6 @@ public String getResponseContentTypeString() {
}

@Override
public void setChunkedSizeListener(Consumer<Integer> sizeConsumer) {
this.sizeConsumer = sizeConsumer;
}

public void close() {
Releasables.closeExpectNoException(releasable);
}
Expand All @@ -175,8 +157,6 @@ static ChunkedRestResponseBody fromTextChunks(
@Nullable Releasable releasable
) {
return new ChunkedRestResponseBody() {
private int size = 0;
private Consumer<Integer> sizeConsumer = null;
private RecyclerBytesStreamOutput currentOutput;
private final Writer writer = new OutputStreamWriter(new OutputStream() {
@Override
Expand Down Expand Up @@ -206,12 +186,7 @@ public void close() {

@Override
public boolean isDone() {
var result = chunkIterator.hasNext() == false;
if (result && sizeConsumer != null) {
sizeConsumer.accept(size);
sizeConsumer = null;
}
return result;
return chunkIterator.hasNext() == false;
}

@Override
Expand All @@ -236,7 +211,6 @@ public ReleasableBytesReference encodeChunk(int sizeHint, Recycler<BytesRef> rec
() -> Releasables.closeExpectNoException(chunkOutput)
);
currentOutput = null;
size += result.length();
return result;
} finally {
if (currentOutput != null) {
Expand All @@ -253,10 +227,6 @@ public String getResponseContentTypeString() {
}

@Override
public void setChunkedSizeListener(Consumer<Integer> sizeConsumer) {
this.sizeConsumer = sizeConsumer;
}

public void close() {
Releasables.closeExpectNoException(releasable);
}
Expand Down
44 changes: 43 additions & 1 deletion server/src/main/java/org/elasticsearch/rest/RestController.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,18 @@
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.client.internal.node.NodeClient;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.bytes.ReleasableBytesReference;
import org.elasticsearch.common.io.stream.BytesStream;
import org.elasticsearch.common.logging.DeprecationLogger;
import org.elasticsearch.common.path.PathTrie;
import org.elasticsearch.common.recycler.Recycler;
import org.elasticsearch.common.util.Maps;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.core.Nullable;
Expand Down Expand Up @@ -50,6 +53,7 @@
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.function.UnaryOperator;
import java.util.stream.StreamSupport;
Expand Down Expand Up @@ -779,7 +783,10 @@ public void sendResponse(RestResponse response) {
if (response.isChunked() == false) {
methodHandlers.addResponseStats(response.content().length());
} else {
response.chunkedContent().setChunkedSizeListener(methodHandlers::addResponseStats);
response = RestResponse.chunked(
response.status(),
new EncodeLengthTrackingChunkedRestResponseBody(response.chunkedContent(), methodHandlers::addResponseStats)
);
}
delegate.sendResponse(response);
success = true;
Expand All @@ -799,6 +806,41 @@ private void close() {
}
}

private static class EncodeLengthTrackingChunkedRestResponseBody implements ChunkedRestResponseBody {

private final ChunkedRestResponseBody delegate;
private final Consumer<Integer> encodedLengthConsumer;
private int encodedLength = 0;

private EncodeLengthTrackingChunkedRestResponseBody(ChunkedRestResponseBody delegate, Consumer<Integer> encodedLengthConsumer) {
this.delegate = delegate;
this.encodedLengthConsumer = encodedLengthConsumer;
}

@Override
public boolean isDone() {
return delegate.isDone();
}

@Override
public ReleasableBytesReference encodeChunk(int sizeHint, Recycler<BytesRef> recycler) throws IOException {
final ReleasableBytesReference bytesReference = delegate.encodeChunk(sizeHint, recycler);
encodedLength += bytesReference.length();
return bytesReference;
}

@Override
public String getResponseContentTypeString() {
return delegate.getResponseContentTypeString();
}

@Override
public void close() {
delegate.close();
encodedLengthConsumer.accept(encodedLength);
}
}

private static CircuitBreaker inFlightRequestsBreaker(CircuitBreakerService circuitBreakerService) {
// We always obtain a fresh breaker to reflect changes to the breaker configuration.
return circuitBreakerService.getBreaker(CircuitBreaker.IN_FLIGHT_REQUESTS);
Expand Down

0 comments on commit 5c1fdb7

Please sign in to comment.