Skip to content

Commit

Permalink
Stop Copying Every Http Request in Message Handler (elastic#44564)
Browse files Browse the repository at this point in the history
* Copying the request is not necessary here. We can simply release it once the response has been generated and a lot of `Unpooled` allocations that way
* Relates elastic#32228
   * I think the issue that preventet that PR  that PR from being merged was solved by elastic#39634 that moved the bulk index marker search to ByteBuf bulk access so the composite buffer shouldn't require many additional bounds checks  (I'd argue the bounds checks we add, we save when copying the composite buffer)
* I couldn't neccessarily reproduce much of a speedup from this change, but I could reproduce a very measureable reduction in GC time with e.g. Rally's PMC (4g heap node and bulk requests of size 5k saw a reduction in young GC time by ~10% for me)
  • Loading branch information
original-brownbear authored and SivagurunathanV committed Jan 21, 2020
1 parent a0bc152 commit f73bc2d
Show file tree
Hide file tree
Showing 14 changed files with 151 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

package org.elasticsearch.http.netty4;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.http.DefaultFullHttpRequest;
import io.netty.handler.codec.http.DefaultHttpHeaders;
import io.netty.handler.codec.http.FullHttpRequest;
Expand All @@ -28,7 +30,6 @@
import io.netty.handler.codec.http.cookie.Cookie;
import io.netty.handler.codec.http.cookie.ServerCookieDecoder;
import io.netty.handler.codec.http.cookie.ServerCookieEncoder;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.http.HttpRequest;
import org.elasticsearch.rest.RestRequest;
Expand All @@ -41,23 +42,30 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

public class Netty4HttpRequest implements HttpRequest {
private final FullHttpRequest request;
private final BytesReference content;
private final HttpHeadersMap headers;
private final int sequence;
private final AtomicBoolean released;
private final FullHttpRequest request;
private final boolean pooled;
private final BytesReference content;

Netty4HttpRequest(FullHttpRequest request, int sequence) {
this(request, new HttpHeadersMap(request.headers()), sequence, new AtomicBoolean(false), true,
Netty4Utils.toBytesReference(request.content()));
}

private Netty4HttpRequest(FullHttpRequest request, HttpHeadersMap headers, int sequence, AtomicBoolean released, boolean pooled,
BytesReference content) {
this.request = request;
headers = new HttpHeadersMap(request.headers());
this.sequence = sequence;
if (request.content().isReadable()) {
this.content = Netty4Utils.toBytesReference(request.content());
} else {
this.content = BytesArray.EMPTY;
}
this.headers = headers;
this.content = content;
this.pooled = pooled;
this.released = released;
}

@Override
Expand Down Expand Up @@ -105,9 +113,33 @@ public String uri() {

@Override
public BytesReference content() {
assert released.get() == false;
return content;
}

@Override
public void release() {
if (pooled && released.compareAndSet(false, true)) {
request.release();
}
}

@Override
public HttpRequest releaseAndCopy() {
assert released.get() == false;
if (pooled == false) {
return this;
}
try {
final ByteBuf copiedContent = Unpooled.copiedBuffer(request.content());
return new Netty4HttpRequest(
new DefaultFullHttpRequest(request.protocolVersion(), request.method(), request.uri(), copiedContent, request.headers(),
request.trailingHeaders()),
headers, sequence, new AtomicBoolean(false), false, Netty4Utils.toBytesReference(copiedContent));
} finally {
release();
}
}

@Override
public final Map<String, List<String>> getHeaders() {
Expand Down Expand Up @@ -147,7 +179,8 @@ public HttpRequest removeHeader(String header) {
trailingHeaders.remove(header);
FullHttpRequest requestWithoutHeader = new DefaultFullHttpRequest(request.protocolVersion(), request.method(), request.uri(),
request.content(), headersWithoutContentTypeHeader, trailingHeaders);
return new Netty4HttpRequest(requestWithoutHeader, sequence);
return new Netty4HttpRequest(requestWithoutHeader, new HttpHeadersMap(requestWithoutHeader.headers()), sequence, released,
pooled, content);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,9 @@

package org.elasticsearch.http.netty4;

import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.DefaultFullHttpRequest;
import io.netty.handler.codec.http.FullHttpRequest;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.http.HttpPipelinedRequest;
Expand All @@ -41,32 +39,25 @@ class Netty4HttpRequestHandler extends SimpleChannelInboundHandler<HttpPipelined
protected void channelRead0(ChannelHandlerContext ctx, HttpPipelinedRequest<FullHttpRequest> msg) {
Netty4HttpChannel channel = ctx.channel().attr(Netty4HttpServerTransport.HTTP_CHANNEL_KEY).get();
FullHttpRequest request = msg.getRequest();
final FullHttpRequest copiedRequest;
boolean success = false;
Netty4HttpRequest httpRequest = new Netty4HttpRequest(request, msg.getSequence());
try {
copiedRequest =
new DefaultFullHttpRequest(
request.protocolVersion(),
request.method(),
request.uri(),
Unpooled.copiedBuffer(request.content()),
request.headers(),
request.trailingHeaders());
} finally {
// As we have copied the buffer, we can release the request
request.release();
}
Netty4HttpRequest httpRequest = new Netty4HttpRequest(copiedRequest, msg.getSequence());

if (request.decoderResult().isFailure()) {
Throwable cause = request.decoderResult().cause();
if (cause instanceof Error) {
ExceptionsHelper.maybeDieOnAnotherThread(cause);
serverTransport.incomingRequestError(httpRequest, channel, new Exception(cause));
if (request.decoderResult().isFailure()) {
Throwable cause = request.decoderResult().cause();
if (cause instanceof Error) {
ExceptionsHelper.maybeDieOnAnotherThread(cause);
serverTransport.incomingRequestError(httpRequest, channel, new Exception(cause));
} else {
serverTransport.incomingRequestError(httpRequest, channel, (Exception) cause);
}
} else {
serverTransport.incomingRequestError(httpRequest, channel, (Exception) cause);
serverTransport.incomingRequest(httpRequest, channel);
}
success = true;
} finally {
if (success == false) {
httpRequest.release();
}
} else {
serverTransport.incomingRequest(httpRequest, channel);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,15 @@ public BytesReference content() {
return content;
}

@Override
public void release() {
// NioHttpRequest works from copied unpooled bytes no need to release anything
}

@Override
public HttpRequest releaseAndCopy() {
return this;
}

@Override
public final Map<String, List<String>> getHeaders() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ protected BytesStreamOutput newBytesOutput() {

@Override
public void sendResponse(RestResponse restResponse) {
final ArrayList<Releasable> toClose = new ArrayList<>(3);
final ArrayList<Releasable> toClose = new ArrayList<>(4);
toClose.add(httpRequest::release);
if (isCloseConnection()) {
toClose.add(() -> CloseableChannel.closeChannel(httpChannel));
}
Expand Down
12 changes: 12 additions & 0 deletions server/src/main/java/org/elasticsearch/http/HttpRequest.java
Original file line number Diff line number Diff line change
Expand Up @@ -68,4 +68,16 @@ enum HttpVersion {
*/
HttpResponse createResponse(RestStatus status, BytesReference content);

/**
* Release any resources associated with this request. Implementations should be idempotent. The behavior of {@link #content()}
* after this method has been invoked is undefined and implementation specific.
*/
void release();

/**
* If this instances uses any pooled resources, creates a copy of this instance that does not use any pooled resources and releases
* any resources associated with this instance. If the instance does not use any shared resources, returns itself.
* @return a safe unpooled http request
*/
HttpRequest releaseAndCopy();
}
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,10 @@ private void dispatchRequest(RestRequest request, RestChannel channel, RestHandl
}
// iff we could reserve bytes for the request we need to send the response also over this channel
responseChannel = new ResourceHandlingHttpChannel(channel, circuitBreakerService, contentLength);
// TODO: Count requests double in the circuit breaker if they need copying?
if (handler.allowsUnsafeBuffers() == false) {
request.ensureSafeBuffers();
}
handler.handleRequest(request, responseChannel, client);
} catch (Exception e) {
responseChannel.sendResponse(new BytesRestResponse(responseChannel, e));
Expand Down
12 changes: 12 additions & 0 deletions server/src/main/java/org/elasticsearch/rest/RestHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,4 +47,16 @@ default boolean canTripCircuitBreaker() {
default boolean supportsContentStream() {
return false;
}

/**
* Indicates if the RestHandler supports working with pooled buffers. If the request handler will not escape the return
* {@link RestRequest#content()} or any buffers extracted from it then there is no need to make a copies of any pooled buffers in the
* {@link RestRequest} instance before passing a request to this handler. If this instance does not support pooled/unsafe buffers
* {@link RestRequest#ensureSafeBuffers()} should be called on any request before passing it to {@link #handleRequest}.
*
* @return true iff the handler supports requests that make use of pooled buffers
*/
default boolean allowsUnsafeBuffers() {
return false;
}
}
12 changes: 11 additions & 1 deletion server/src/main/java/org/elasticsearch/rest/RestRequest.java
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,10 @@ public class RestRequest implements ToXContent.Params {
private final String rawPath;
private final Set<String> consumedParams = new HashSet<>();
private final SetOnce<XContentType> xContentType = new SetOnce<>();
private final HttpRequest httpRequest;
private final HttpChannel httpChannel;

private HttpRequest httpRequest;

private boolean contentConsumed = false;

public boolean isContentConsumed() {
Expand Down Expand Up @@ -97,6 +98,15 @@ protected RestRequest(RestRequest restRequest) {
restRequest.getHttpRequest(), restRequest.getHttpChannel());
}

/**
* Invoke {@link HttpRequest#releaseAndCopy()} on the http request in this instance and replace a pooled http request
* with an unpooled copy. This is supposed to be used before passing requests to {@link RestHandler} instances that can not safely
* handle http requests that use pooled buffers as determined by {@link RestHandler#allowsUnsafeBuffers()}.
*/
void ensureSafeBuffers() {
httpRequest = httpRequest.releaseAndCopy();
}

/**
* Creates a new REST request. This method will throw {@link BadParameterException} if the path cannot be
* decoded
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,4 +86,9 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
public boolean supportsContentStream() {
return true;
}

@Override
public boolean allowsUnsafeBuffers() {
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -310,4 +310,9 @@ public static void checkRestTotalHits(RestRequest restRequest, SearchRequest sea
protected Set<String> responseParams() {
return RESPONSE_PARAMS;
}

@Override
public boolean allowsUnsafeBuffers() {
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -460,6 +460,15 @@ public HttpRequest removeHeader(String header) {
public HttpResponse createResponse(RestStatus status, BytesReference content) {
return new TestResponse(status, content);
}

@Override
public void release() {
}

@Override
public HttpRequest releaseAndCopy() {
return this;
}
}

private static class TestResponse implements HttpResponse {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -608,6 +608,15 @@ public HttpRequest removeHeader(String header) {
public HttpResponse createResponse(RestStatus status, BytesReference content) {
return null;
}

@Override
public void release() {
}

@Override
public HttpRequest releaseAndCopy() {
return this;
}
}, null);

final AssertingChannel channel = new AssertingChannel(request, true, RestStatus.METHOD_NOT_ALLOWED);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,15 @@ public boolean containsHeader(String name) {
}
};
}

@Override
public void release() {
}

@Override
public HttpRequest releaseAndCopy() {
return this;
}
}

private static class FakeHttpChannel implements HttpChannel {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,11 @@ public boolean supportsContentStream() {
return restHandler.supportsContentStream();
}

@Override
public boolean allowsUnsafeBuffers() {
return restHandler.allowsUnsafeBuffers();
}

private RestRequest maybeWrapRestRequest(RestRequest restRequest) throws IOException {
if (restHandler instanceof RestRequestFilter) {
return ((RestRequestFilter)restHandler).getFilteredRequest(restRequest);
Expand Down

0 comments on commit f73bc2d

Please sign in to comment.