Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stop Copying Every Http Request in Message Handler (#44564) #49809

Merged
merged 1 commit into from
Dec 4, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

package org.elasticsearch.http.netty4;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.http.DefaultFullHttpRequest;
import io.netty.handler.codec.http.DefaultHttpHeaders;
import io.netty.handler.codec.http.FullHttpRequest;
Expand All @@ -28,7 +30,6 @@
import io.netty.handler.codec.http.cookie.Cookie;
import io.netty.handler.codec.http.cookie.ServerCookieDecoder;
import io.netty.handler.codec.http.cookie.ServerCookieEncoder;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.http.HttpRequest;
import org.elasticsearch.rest.RestRequest;
Expand All @@ -41,23 +42,30 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

public class Netty4HttpRequest implements HttpRequest {
private final FullHttpRequest request;
private final BytesReference content;
private final HttpHeadersMap headers;
private final int sequence;
private final AtomicBoolean released;
private final FullHttpRequest request;
private final boolean pooled;
private final BytesReference content;

Netty4HttpRequest(FullHttpRequest request, int sequence) {
this(request, new HttpHeadersMap(request.headers()), sequence, new AtomicBoolean(false), true,
Netty4Utils.toBytesReference(request.content()));
}

private Netty4HttpRequest(FullHttpRequest request, HttpHeadersMap headers, int sequence, AtomicBoolean released, boolean pooled,
BytesReference content) {
this.request = request;
headers = new HttpHeadersMap(request.headers());
this.sequence = sequence;
if (request.content().isReadable()) {
this.content = Netty4Utils.toBytesReference(request.content());
} else {
this.content = BytesArray.EMPTY;
}
this.headers = headers;
this.content = content;
this.pooled = pooled;
this.released = released;
}

@Override
Expand Down Expand Up @@ -105,9 +113,33 @@ public String uri() {

@Override
public BytesReference content() {
assert released.get() == false;
return content;
}

@Override
public void release() {
if (pooled && released.compareAndSet(false, true)) {
request.release();
}
}

@Override
public HttpRequest releaseAndCopy() {
assert released.get() == false;
if (pooled == false) {
return this;
}
try {
final ByteBuf copiedContent = Unpooled.copiedBuffer(request.content());
return new Netty4HttpRequest(
new DefaultFullHttpRequest(request.protocolVersion(), request.method(), request.uri(), copiedContent, request.headers(),
request.trailingHeaders()),
headers, sequence, new AtomicBoolean(false), false, Netty4Utils.toBytesReference(copiedContent));
} finally {
release();
}
}

@Override
public final Map<String, List<String>> getHeaders() {
Expand Down Expand Up @@ -147,7 +179,8 @@ public HttpRequest removeHeader(String header) {
trailingHeaders.remove(header);
FullHttpRequest requestWithoutHeader = new DefaultFullHttpRequest(request.protocolVersion(), request.method(), request.uri(),
request.content(), headersWithoutContentTypeHeader, trailingHeaders);
return new Netty4HttpRequest(requestWithoutHeader, sequence);
return new Netty4HttpRequest(requestWithoutHeader, new HttpHeadersMap(requestWithoutHeader.headers()), sequence, released,
pooled, content);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,9 @@

package org.elasticsearch.http.netty4;

import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.DefaultFullHttpRequest;
import io.netty.handler.codec.http.FullHttpRequest;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.http.HttpPipelinedRequest;
Expand All @@ -41,32 +39,25 @@ class Netty4HttpRequestHandler extends SimpleChannelInboundHandler<HttpPipelined
protected void channelRead0(ChannelHandlerContext ctx, HttpPipelinedRequest<FullHttpRequest> msg) {
Netty4HttpChannel channel = ctx.channel().attr(Netty4HttpServerTransport.HTTP_CHANNEL_KEY).get();
FullHttpRequest request = msg.getRequest();
final FullHttpRequest copiedRequest;
boolean success = false;
Netty4HttpRequest httpRequest = new Netty4HttpRequest(request, msg.getSequence());
try {
copiedRequest =
new DefaultFullHttpRequest(
request.protocolVersion(),
request.method(),
request.uri(),
Unpooled.copiedBuffer(request.content()),
request.headers(),
request.trailingHeaders());
} finally {
// As we have copied the buffer, we can release the request
request.release();
}
Netty4HttpRequest httpRequest = new Netty4HttpRequest(copiedRequest, msg.getSequence());

if (request.decoderResult().isFailure()) {
Throwable cause = request.decoderResult().cause();
if (cause instanceof Error) {
ExceptionsHelper.maybeDieOnAnotherThread(cause);
serverTransport.incomingRequestError(httpRequest, channel, new Exception(cause));
if (request.decoderResult().isFailure()) {
Throwable cause = request.decoderResult().cause();
if (cause instanceof Error) {
ExceptionsHelper.maybeDieOnAnotherThread(cause);
serverTransport.incomingRequestError(httpRequest, channel, new Exception(cause));
} else {
serverTransport.incomingRequestError(httpRequest, channel, (Exception) cause);
}
} else {
serverTransport.incomingRequestError(httpRequest, channel, (Exception) cause);
serverTransport.incomingRequest(httpRequest, channel);
}
success = true;
} finally {
if (success == false) {
httpRequest.release();
}
} else {
serverTransport.incomingRequest(httpRequest, channel);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,15 @@ public BytesReference content() {
return content;
}

@Override
public void release() {
// NioHttpRequest works from copied unpooled bytes no need to release anything
}

@Override
public HttpRequest releaseAndCopy() {
return this;
}

@Override
public final Map<String, List<String>> getHeaders() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ protected BytesStreamOutput newBytesOutput() {

@Override
public void sendResponse(RestResponse restResponse) {
final ArrayList<Releasable> toClose = new ArrayList<>(3);
final ArrayList<Releasable> toClose = new ArrayList<>(4);
toClose.add(httpRequest::release);
if (isCloseConnection()) {
toClose.add(() -> CloseableChannel.closeChannel(httpChannel));
}
Expand Down
12 changes: 12 additions & 0 deletions server/src/main/java/org/elasticsearch/http/HttpRequest.java
Original file line number Diff line number Diff line change
Expand Up @@ -68,4 +68,16 @@ enum HttpVersion {
*/
HttpResponse createResponse(RestStatus status, BytesReference content);

/**
* Release any resources associated with this request. Implementations should be idempotent. The behavior of {@link #content()}
* after this method has been invoked is undefined and implementation specific.
*/
void release();

/**
* If this instances uses any pooled resources, creates a copy of this instance that does not use any pooled resources and releases
* any resources associated with this instance. If the instance does not use any shared resources, returns itself.
* @return a safe unpooled http request
*/
HttpRequest releaseAndCopy();
}
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,10 @@ private void dispatchRequest(RestRequest request, RestChannel channel, RestHandl
}
// iff we could reserve bytes for the request we need to send the response also over this channel
responseChannel = new ResourceHandlingHttpChannel(channel, circuitBreakerService, contentLength);
// TODO: Count requests double in the circuit breaker if they need copying?
if (handler.allowsUnsafeBuffers() == false) {
request.ensureSafeBuffers();
}
handler.handleRequest(request, responseChannel, client);
} catch (Exception e) {
responseChannel.sendResponse(new BytesRestResponse(responseChannel, e));
Expand Down
12 changes: 12 additions & 0 deletions server/src/main/java/org/elasticsearch/rest/RestHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,4 +47,16 @@ default boolean canTripCircuitBreaker() {
default boolean supportsContentStream() {
return false;
}

/**
* Indicates if the RestHandler supports working with pooled buffers. If the request handler will not escape the return
* {@link RestRequest#content()} or any buffers extracted from it then there is no need to make a copies of any pooled buffers in the
* {@link RestRequest} instance before passing a request to this handler. If this instance does not support pooled/unsafe buffers
* {@link RestRequest#ensureSafeBuffers()} should be called on any request before passing it to {@link #handleRequest}.
*
* @return true iff the handler supports requests that make use of pooled buffers
*/
default boolean allowsUnsafeBuffers() {
return false;
}
}
12 changes: 11 additions & 1 deletion server/src/main/java/org/elasticsearch/rest/RestRequest.java
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,10 @@ public class RestRequest implements ToXContent.Params {
private final String rawPath;
private final Set<String> consumedParams = new HashSet<>();
private final SetOnce<XContentType> xContentType = new SetOnce<>();
private final HttpRequest httpRequest;
private final HttpChannel httpChannel;

private HttpRequest httpRequest;

private boolean contentConsumed = false;

public boolean isContentConsumed() {
Expand Down Expand Up @@ -97,6 +98,15 @@ protected RestRequest(RestRequest restRequest) {
restRequest.getHttpRequest(), restRequest.getHttpChannel());
}

/**
* Invoke {@link HttpRequest#releaseAndCopy()} on the http request in this instance and replace a pooled http request
* with an unpooled copy. This is supposed to be used before passing requests to {@link RestHandler} instances that can not safely
* handle http requests that use pooled buffers as determined by {@link RestHandler#allowsUnsafeBuffers()}.
*/
void ensureSafeBuffers() {
httpRequest = httpRequest.releaseAndCopy();
}

/**
* Creates a new REST request. This method will throw {@link BadParameterException} if the path cannot be
* decoded
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,4 +103,9 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
public boolean supportsContentStream() {
return true;
}

@Override
public boolean allowsUnsafeBuffers() {
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -324,4 +324,9 @@ public static void checkRestTotalHits(RestRequest restRequest, SearchRequest sea
protected Set<String> responseParams() {
return RESPONSE_PARAMS;
}

@Override
public boolean allowsUnsafeBuffers() {
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -460,6 +460,15 @@ public HttpRequest removeHeader(String header) {
public HttpResponse createResponse(RestStatus status, BytesReference content) {
return new TestResponse(status, content);
}

@Override
public void release() {
}

@Override
public HttpRequest releaseAndCopy() {
return this;
}
}

private static class TestResponse implements HttpResponse {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -575,6 +575,15 @@ public HttpRequest removeHeader(String header) {
public HttpResponse createResponse(RestStatus status, BytesReference content) {
return null;
}

@Override
public void release() {
}

@Override
public HttpRequest releaseAndCopy() {
return this;
}
}, null);

final AssertingChannel channel = new AssertingChannel(request, true, RestStatus.METHOD_NOT_ALLOWED);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,15 @@ public boolean containsHeader(String name) {
}
};
}

@Override
public void release() {
}

@Override
public HttpRequest releaseAndCopy() {
return this;
}
}

private static class FakeHttpChannel implements HttpChannel {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,11 @@ public boolean supportsContentStream() {
return restHandler.supportsContentStream();
}

@Override
public boolean allowsUnsafeBuffers() {
return restHandler.allowsUnsafeBuffers();
}

private RestRequest maybeWrapRestRequest(RestRequest restRequest) throws IOException {
if (restHandler instanceof RestRequestFilter) {
return ((RestRequestFilter)restHandler).getFilteredRequest(restRequest);
Expand Down