From f73bc2d307cea851b407a0e8310d58321df53266 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Tue, 3 Dec 2019 22:44:20 +0100 Subject: [PATCH] Stop Copying Every Http Request in Message Handler (#44564) * Copying the request is not necessary here. We can simply release it once the response has been generated and a lot of `Unpooled` allocations that way * Relates #32228 * I think the issue that preventet that PR that PR from being merged was solved by #39634 that moved the bulk index marker search to ByteBuf bulk access so the composite buffer shouldn't require many additional bounds checks (I'd argue the bounds checks we add, we save when copying the composite buffer) * I couldn't neccessarily reproduce much of a speedup from this change, but I could reproduce a very measureable reduction in GC time with e.g. Rally's PMC (4g heap node and bulk requests of size 5k saw a reduction in young GC time by ~10% for me) --- .../http/netty4/Netty4HttpRequest.java | 53 +++++++++++++++---- .../http/netty4/Netty4HttpRequestHandler.java | 41 ++++++-------- .../http/nio/NioHttpRequest.java | 9 ++++ .../http/DefaultRestChannel.java | 3 +- .../org/elasticsearch/http/HttpRequest.java | 12 +++++ .../elasticsearch/rest/RestController.java | 4 ++ .../org/elasticsearch/rest/RestHandler.java | 12 +++++ .../org/elasticsearch/rest/RestRequest.java | 12 ++++- .../rest/action/document/RestBulkAction.java | 5 ++ .../rest/action/search/RestSearchAction.java | 5 ++ .../http/DefaultRestChannelTests.java | 9 ++++ .../rest/RestControllerTests.java | 9 ++++ .../test/rest/FakeRestRequest.java | 9 ++++ .../security/rest/SecurityRestFilter.java | 5 ++ 14 files changed, 151 insertions(+), 37 deletions(-) diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequest.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequest.java index ffabe5cbbe224..e0ad3007c98a9 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequest.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequest.java @@ -19,6 +19,8 @@ package org.elasticsearch.http.netty4; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; import io.netty.handler.codec.http.DefaultFullHttpRequest; import io.netty.handler.codec.http.DefaultHttpHeaders; import io.netty.handler.codec.http.FullHttpRequest; @@ -28,7 +30,6 @@ import io.netty.handler.codec.http.cookie.Cookie; import io.netty.handler.codec.http.cookie.ServerCookieDecoder; import io.netty.handler.codec.http.cookie.ServerCookieEncoder; -import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.http.HttpRequest; import org.elasticsearch.rest.RestRequest; @@ -41,23 +42,30 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; public class Netty4HttpRequest implements HttpRequest { - private final FullHttpRequest request; - private final BytesReference content; private final HttpHeadersMap headers; private final int sequence; + private final AtomicBoolean released; + private final FullHttpRequest request; + private final boolean pooled; + private final BytesReference content; Netty4HttpRequest(FullHttpRequest request, int sequence) { + this(request, new HttpHeadersMap(request.headers()), sequence, new AtomicBoolean(false), true, + Netty4Utils.toBytesReference(request.content())); + } + + private Netty4HttpRequest(FullHttpRequest request, HttpHeadersMap headers, int sequence, AtomicBoolean released, boolean pooled, + BytesReference content) { this.request = request; - headers = new HttpHeadersMap(request.headers()); this.sequence = sequence; - if (request.content().isReadable()) { - this.content = Netty4Utils.toBytesReference(request.content()); - } else { - this.content = BytesArray.EMPTY; - } + this.headers = headers; + this.content = content; + this.pooled = pooled; + this.released = released; } @Override @@ -105,9 +113,33 @@ public String uri() { @Override public BytesReference content() { + assert released.get() == false; return content; } + @Override + public void release() { + if (pooled && released.compareAndSet(false, true)) { + request.release(); + } + } + + @Override + public HttpRequest releaseAndCopy() { + assert released.get() == false; + if (pooled == false) { + return this; + } + try { + final ByteBuf copiedContent = Unpooled.copiedBuffer(request.content()); + return new Netty4HttpRequest( + new DefaultFullHttpRequest(request.protocolVersion(), request.method(), request.uri(), copiedContent, request.headers(), + request.trailingHeaders()), + headers, sequence, new AtomicBoolean(false), false, Netty4Utils.toBytesReference(copiedContent)); + } finally { + release(); + } + } @Override public final Map> getHeaders() { @@ -147,7 +179,8 @@ public HttpRequest removeHeader(String header) { trailingHeaders.remove(header); FullHttpRequest requestWithoutHeader = new DefaultFullHttpRequest(request.protocolVersion(), request.method(), request.uri(), request.content(), headersWithoutContentTypeHeader, trailingHeaders); - return new Netty4HttpRequest(requestWithoutHeader, sequence); + return new Netty4HttpRequest(requestWithoutHeader, new HttpHeadersMap(requestWithoutHeader.headers()), sequence, released, + pooled, content); } @Override diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestHandler.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestHandler.java index ad6d84dfcb499..7e7f45ef92e2e 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestHandler.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestHandler.java @@ -19,11 +19,9 @@ package org.elasticsearch.http.netty4; -import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; -import io.netty.handler.codec.http.DefaultFullHttpRequest; import io.netty.handler.codec.http.FullHttpRequest; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.http.HttpPipelinedRequest; @@ -41,32 +39,25 @@ class Netty4HttpRequestHandler extends SimpleChannelInboundHandler msg) { Netty4HttpChannel channel = ctx.channel().attr(Netty4HttpServerTransport.HTTP_CHANNEL_KEY).get(); FullHttpRequest request = msg.getRequest(); - final FullHttpRequest copiedRequest; + boolean success = false; + Netty4HttpRequest httpRequest = new Netty4HttpRequest(request, msg.getSequence()); try { - copiedRequest = - new DefaultFullHttpRequest( - request.protocolVersion(), - request.method(), - request.uri(), - Unpooled.copiedBuffer(request.content()), - request.headers(), - request.trailingHeaders()); - } finally { - // As we have copied the buffer, we can release the request - request.release(); - } - Netty4HttpRequest httpRequest = new Netty4HttpRequest(copiedRequest, msg.getSequence()); - - if (request.decoderResult().isFailure()) { - Throwable cause = request.decoderResult().cause(); - if (cause instanceof Error) { - ExceptionsHelper.maybeDieOnAnotherThread(cause); - serverTransport.incomingRequestError(httpRequest, channel, new Exception(cause)); + if (request.decoderResult().isFailure()) { + Throwable cause = request.decoderResult().cause(); + if (cause instanceof Error) { + ExceptionsHelper.maybeDieOnAnotherThread(cause); + serverTransport.incomingRequestError(httpRequest, channel, new Exception(cause)); + } else { + serverTransport.incomingRequestError(httpRequest, channel, (Exception) cause); + } } else { - serverTransport.incomingRequestError(httpRequest, channel, (Exception) cause); + serverTransport.incomingRequest(httpRequest, channel); + } + success = true; + } finally { + if (success == false) { + httpRequest.release(); } - } else { - serverTransport.incomingRequest(httpRequest, channel); } } diff --git a/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NioHttpRequest.java b/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NioHttpRequest.java index 08937593f3ba6..8e17d37699cdd 100644 --- a/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NioHttpRequest.java +++ b/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NioHttpRequest.java @@ -108,6 +108,15 @@ public BytesReference content() { return content; } + @Override + public void release() { + // NioHttpRequest works from copied unpooled bytes no need to release anything + } + + @Override + public HttpRequest releaseAndCopy() { + return this; + } @Override public final Map> getHeaders() { diff --git a/server/src/main/java/org/elasticsearch/http/DefaultRestChannel.java b/server/src/main/java/org/elasticsearch/http/DefaultRestChannel.java index 098a01410897c..98f578bb3412c 100644 --- a/server/src/main/java/org/elasticsearch/http/DefaultRestChannel.java +++ b/server/src/main/java/org/elasticsearch/http/DefaultRestChannel.java @@ -77,7 +77,8 @@ protected BytesStreamOutput newBytesOutput() { @Override public void sendResponse(RestResponse restResponse) { - final ArrayList toClose = new ArrayList<>(3); + final ArrayList toClose = new ArrayList<>(4); + toClose.add(httpRequest::release); if (isCloseConnection()) { toClose.add(() -> CloseableChannel.closeChannel(httpChannel)); } diff --git a/server/src/main/java/org/elasticsearch/http/HttpRequest.java b/server/src/main/java/org/elasticsearch/http/HttpRequest.java index 02a3a58d1702d..4d67078fe571a 100644 --- a/server/src/main/java/org/elasticsearch/http/HttpRequest.java +++ b/server/src/main/java/org/elasticsearch/http/HttpRequest.java @@ -68,4 +68,16 @@ enum HttpVersion { */ HttpResponse createResponse(RestStatus status, BytesReference content); + /** + * Release any resources associated with this request. Implementations should be idempotent. The behavior of {@link #content()} + * after this method has been invoked is undefined and implementation specific. + */ + void release(); + + /** + * If this instances uses any pooled resources, creates a copy of this instance that does not use any pooled resources and releases + * any resources associated with this instance. If the instance does not use any shared resources, returns itself. + * @return a safe unpooled http request + */ + HttpRequest releaseAndCopy(); } diff --git a/server/src/main/java/org/elasticsearch/rest/RestController.java b/server/src/main/java/org/elasticsearch/rest/RestController.java index 1c174c89df608..aa39ccdc4659c 100644 --- a/server/src/main/java/org/elasticsearch/rest/RestController.java +++ b/server/src/main/java/org/elasticsearch/rest/RestController.java @@ -218,6 +218,10 @@ private void dispatchRequest(RestRequest request, RestChannel channel, RestHandl } // iff we could reserve bytes for the request we need to send the response also over this channel responseChannel = new ResourceHandlingHttpChannel(channel, circuitBreakerService, contentLength); + // TODO: Count requests double in the circuit breaker if they need copying? + if (handler.allowsUnsafeBuffers() == false) { + request.ensureSafeBuffers(); + } handler.handleRequest(request, responseChannel, client); } catch (Exception e) { responseChannel.sendResponse(new BytesRestResponse(responseChannel, e)); diff --git a/server/src/main/java/org/elasticsearch/rest/RestHandler.java b/server/src/main/java/org/elasticsearch/rest/RestHandler.java index 1ebc7a7fd1bd2..605dd41078a54 100644 --- a/server/src/main/java/org/elasticsearch/rest/RestHandler.java +++ b/server/src/main/java/org/elasticsearch/rest/RestHandler.java @@ -47,4 +47,16 @@ default boolean canTripCircuitBreaker() { default boolean supportsContentStream() { return false; } + + /** + * Indicates if the RestHandler supports working with pooled buffers. If the request handler will not escape the return + * {@link RestRequest#content()} or any buffers extracted from it then there is no need to make a copies of any pooled buffers in the + * {@link RestRequest} instance before passing a request to this handler. If this instance does not support pooled/unsafe buffers + * {@link RestRequest#ensureSafeBuffers()} should be called on any request before passing it to {@link #handleRequest}. + * + * @return true iff the handler supports requests that make use of pooled buffers + */ + default boolean allowsUnsafeBuffers() { + return false; + } } diff --git a/server/src/main/java/org/elasticsearch/rest/RestRequest.java b/server/src/main/java/org/elasticsearch/rest/RestRequest.java index 23e72d0c1f1d5..4a8fb44fc4299 100644 --- a/server/src/main/java/org/elasticsearch/rest/RestRequest.java +++ b/server/src/main/java/org/elasticsearch/rest/RestRequest.java @@ -64,9 +64,10 @@ public class RestRequest implements ToXContent.Params { private final String rawPath; private final Set consumedParams = new HashSet<>(); private final SetOnce xContentType = new SetOnce<>(); - private final HttpRequest httpRequest; private final HttpChannel httpChannel; + private HttpRequest httpRequest; + private boolean contentConsumed = false; public boolean isContentConsumed() { @@ -97,6 +98,15 @@ protected RestRequest(RestRequest restRequest) { restRequest.getHttpRequest(), restRequest.getHttpChannel()); } + /** + * Invoke {@link HttpRequest#releaseAndCopy()} on the http request in this instance and replace a pooled http request + * with an unpooled copy. This is supposed to be used before passing requests to {@link RestHandler} instances that can not safely + * handle http requests that use pooled buffers as determined by {@link RestHandler#allowsUnsafeBuffers()}. + */ + void ensureSafeBuffers() { + httpRequest = httpRequest.releaseAndCopy(); + } + /** * Creates a new REST request. This method will throw {@link BadParameterException} if the path cannot be * decoded diff --git a/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java b/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java index 43cd684bd47a7..db925cd663ff9 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java @@ -86,4 +86,9 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC public boolean supportsContentStream() { return true; } + + @Override + public boolean allowsUnsafeBuffers() { + return true; + } } diff --git a/server/src/main/java/org/elasticsearch/rest/action/search/RestSearchAction.java b/server/src/main/java/org/elasticsearch/rest/action/search/RestSearchAction.java index c68c31473d1bc..11dc9f89de532 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/search/RestSearchAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/search/RestSearchAction.java @@ -310,4 +310,9 @@ public static void checkRestTotalHits(RestRequest restRequest, SearchRequest sea protected Set responseParams() { return RESPONSE_PARAMS; } + + @Override + public boolean allowsUnsafeBuffers() { + return true; + } } diff --git a/server/src/test/java/org/elasticsearch/http/DefaultRestChannelTests.java b/server/src/test/java/org/elasticsearch/http/DefaultRestChannelTests.java index 85670e893b970..d671b81a09bc9 100644 --- a/server/src/test/java/org/elasticsearch/http/DefaultRestChannelTests.java +++ b/server/src/test/java/org/elasticsearch/http/DefaultRestChannelTests.java @@ -460,6 +460,15 @@ public HttpRequest removeHeader(String header) { public HttpResponse createResponse(RestStatus status, BytesReference content) { return new TestResponse(status, content); } + + @Override + public void release() { + } + + @Override + public HttpRequest releaseAndCopy() { + return this; + } } private static class TestResponse implements HttpResponse { diff --git a/server/src/test/java/org/elasticsearch/rest/RestControllerTests.java b/server/src/test/java/org/elasticsearch/rest/RestControllerTests.java index eb3d76bc82ae2..8413864d2ea26 100644 --- a/server/src/test/java/org/elasticsearch/rest/RestControllerTests.java +++ b/server/src/test/java/org/elasticsearch/rest/RestControllerTests.java @@ -608,6 +608,15 @@ public HttpRequest removeHeader(String header) { public HttpResponse createResponse(RestStatus status, BytesReference content) { return null; } + + @Override + public void release() { + } + + @Override + public HttpRequest releaseAndCopy() { + return this; + } }, null); final AssertingChannel channel = new AssertingChannel(request, true, RestStatus.METHOD_NOT_ALLOWED); diff --git a/test/framework/src/main/java/org/elasticsearch/test/rest/FakeRestRequest.java b/test/framework/src/main/java/org/elasticsearch/test/rest/FakeRestRequest.java index a659d6af5c6aa..2f2f5fb76bfe7 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/rest/FakeRestRequest.java +++ b/test/framework/src/main/java/org/elasticsearch/test/rest/FakeRestRequest.java @@ -113,6 +113,15 @@ public boolean containsHeader(String name) { } }; } + + @Override + public void release() { + } + + @Override + public HttpRequest releaseAndCopy() { + return this; + } } private static class FakeHttpChannel implements HttpChannel { diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/rest/SecurityRestFilter.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/rest/SecurityRestFilter.java index df678f9c63ba4..4131d1e735883 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/rest/SecurityRestFilter.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/rest/SecurityRestFilter.java @@ -86,6 +86,11 @@ public boolean supportsContentStream() { return restHandler.supportsContentStream(); } + @Override + public boolean allowsUnsafeBuffers() { + return restHandler.allowsUnsafeBuffers(); + } + private RestRequest maybeWrapRestRequest(RestRequest restRequest) throws IOException { if (restHandler instanceof RestRequestFilter) { return ((RestRequestFilter)restHandler).getFilteredRequest(restRequest);