Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create HttpRequest earlier in pipeline #56393

Merged
merged 24 commits into from
May 18, 2020
Merged
Show file tree
Hide file tree
Changes from 22 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
e75cd4f
Add basic memory controller
Tim-Brooks Apr 13, 2020
27980d9
Merge remote-tracking branch 'upstream/master' into http_incremental_…
Tim-Brooks May 5, 2020
23489cd
WIP
Tim-Brooks May 5, 2020
5c880dd
Merge remote-tracking branch 'upstream/master' into http_incremental_…
Tim-Brooks May 6, 2020
f17aadb
Changes
Tim-Brooks May 6, 2020
9bffa33
WIP
Tim-Brooks May 6, 2020
daba26c
Chnages
Tim-Brooks May 6, 2020
49cb92e
Changes
Tim-Brooks May 7, 2020
d0fea01
WIP
Tim-Brooks May 7, 2020
e8f6724
Changes
Tim-Brooks May 7, 2020
e0e2fdb
Merge remote-tracking branch 'upstream/master' into http_move_request…
Tim-Brooks May 7, 2020
df9992f
Remove
Tim-Brooks May 7, 2020
e28520c
Add exceptions
Tim-Brooks May 8, 2020
6375585
Merge remote-tracking branch 'upstream/master' into http_move_request…
Tim-Brooks May 8, 2020
fcbc059
Changes
Tim-Brooks May 8, 2020
c1d54d1
Chnage
Tim-Brooks May 8, 2020
362bf7a
Merge remote-tracking branch 'upstream/master' into http_move_request…
Tim-Brooks May 11, 2020
c58edda
Fix test
Tim-Brooks May 11, 2020
52ce9cd
Merge remote-tracking branch 'upstream/master' into http_move_request…
Tim-Brooks May 12, 2020
9637d41
Change
Tim-Brooks May 13, 2020
efea586
Merge remote-tracking branch 'upstream/master' into http_move_request…
Tim-Brooks May 13, 2020
6b6ed06
Merge remote-tracking branch 'upstream/master' into http_move_request…
Tim-Brooks May 14, 2020
a663e34
Merge remote-tracking branch 'upstream/master' into http_move_request…
Tim-Brooks May 15, 2020
754d367
Changes
Tim-Brooks May 15, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http.FullHttpRequest;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.http.HttpPipelinedRequest;
import org.elasticsearch.http.HttpPipelinedResponse;
import org.elasticsearch.http.HttpPipeliningAggregator;

import java.nio.channels.ClosedChannelException;
Expand All @@ -37,7 +37,7 @@
public class Netty4HttpPipeliningHandler extends ChannelDuplexHandler {

private final Logger logger;
private final HttpPipeliningAggregator<Netty4HttpResponse, ChannelPromise> aggregator;
private final HttpPipeliningAggregator<ChannelPromise> aggregator;

/**
* Construct a new pipelining handler; this handler should be used downstream of HTTP decoding/aggregation.
Expand All @@ -53,20 +53,20 @@ public Netty4HttpPipeliningHandler(Logger logger, final int maxEventsHeld) {

@Override
public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
assert msg instanceof FullHttpRequest : "Invalid message type: " + msg.getClass();
HttpPipelinedRequest<FullHttpRequest> pipelinedRequest = aggregator.read(((FullHttpRequest) msg));
assert msg instanceof Netty4HttpRequest : "Invalid message type: " + msg.getClass();
HttpPipelinedRequest pipelinedRequest = aggregator.read(((Netty4HttpRequest) msg));
ctx.fireChannelRead(pipelinedRequest);
}

@Override
public void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise) {
assert msg instanceof Netty4HttpResponse : "Invalid message type: " + msg.getClass();
Netty4HttpResponse response = (Netty4HttpResponse) msg;
assert msg instanceof HttpPipelinedResponse : "Invalid message type: " + msg.getClass();
HttpPipelinedResponse response = (HttpPipelinedResponse) msg;
boolean success = false;
try {
List<Tuple<Netty4HttpResponse, ChannelPromise>> readyResponses = aggregator.write(response, promise);
for (Tuple<Netty4HttpResponse, ChannelPromise> readyResponse : readyResponses) {
ctx.write(readyResponse.v1(), readyResponse.v2());
List<Tuple<HttpPipelinedResponse, ChannelPromise>> readyResponses = aggregator.write(response, promise);
for (Tuple<HttpPipelinedResponse, ChannelPromise> readyResponse : readyResponses) {
ctx.write(readyResponse.v1().getDelegateRequest(), readyResponse.v2());
}
success = true;
} catch (IllegalStateException e) {
Expand All @@ -80,11 +80,11 @@ public void write(final ChannelHandlerContext ctx, final Object msg, final Chann

@Override
public void close(ChannelHandlerContext ctx, ChannelPromise promise) {
List<Tuple<Netty4HttpResponse, ChannelPromise>> inflightResponses = aggregator.removeAllInflightResponses();
List<Tuple<HttpPipelinedResponse, ChannelPromise>> inflightResponses = aggregator.removeAllInflightResponses();

if (inflightResponses.isEmpty() == false) {
ClosedChannelException closedChannelException = new ClosedChannelException();
for (Tuple<Netty4HttpResponse, ChannelPromise> inflightResponse : inflightResponses) {
for (Tuple<HttpPipelinedResponse, ChannelPromise> inflightResponse : inflightResponses) {
try {
inflightResponse.v2().setFailure(closedChannelException);
} catch (RuntimeException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,26 +46,37 @@
import java.util.stream.Collectors;

public class Netty4HttpRequest implements HttpRequest {

private final FullHttpRequest request;
private final BytesReference content;
private final HttpHeadersMap headers;
private final int sequence;
private final AtomicBoolean released;
private final FullHttpRequest request;
private final Exception inboundException;
private final boolean pooled;
private final BytesReference content;

Netty4HttpRequest(FullHttpRequest request, int sequence) {
this(request, new HttpHeadersMap(request.headers()), sequence, new AtomicBoolean(false), true,
Netty4HttpRequest(FullHttpRequest request) {
this(request, new HttpHeadersMap(request.headers()), new AtomicBoolean(false), true,
Netty4Utils.toBytesReference(request.content()));
}

private Netty4HttpRequest(FullHttpRequest request, HttpHeadersMap headers, int sequence, AtomicBoolean released, boolean pooled,
Netty4HttpRequest(FullHttpRequest request, Exception inboundException) {
this(request, new HttpHeadersMap(request.headers()), new AtomicBoolean(false), true,
Netty4Utils.toBytesReference(request.content()), inboundException);
}

private Netty4HttpRequest(FullHttpRequest request, HttpHeadersMap headers, AtomicBoolean released, boolean pooled,
BytesReference content) {
this(request, headers, released, pooled, content, null);
}

private Netty4HttpRequest(FullHttpRequest request, HttpHeadersMap headers, AtomicBoolean released, boolean pooled,
BytesReference content, Exception inboundException) {
this.request = request;
this.sequence = sequence;
this.headers = headers;
this.content = content;
this.pooled = pooled;
this.released = released;
this.inboundException = inboundException;
}

@Override
Expand Down Expand Up @@ -135,7 +146,7 @@ public HttpRequest releaseAndCopy() {
return new Netty4HttpRequest(
new DefaultFullHttpRequest(request.protocolVersion(), request.method(), request.uri(), copiedContent, request.headers(),
request.trailingHeaders()),
headers, sequence, new AtomicBoolean(false), false, Netty4Utils.toBytesReference(copiedContent));
headers, new AtomicBoolean(false), false, Netty4Utils.toBytesReference(copiedContent));
} finally {
release();
}
Expand Down Expand Up @@ -179,7 +190,7 @@ public HttpRequest removeHeader(String header) {
trailingHeaders.remove(header);
FullHttpRequest requestWithoutHeader = new DefaultFullHttpRequest(request.protocolVersion(), request.method(), request.uri(),
request.content(), headersWithoutContentTypeHeader, trailingHeaders);
return new Netty4HttpRequest(requestWithoutHeader, new HttpHeadersMap(requestWithoutHeader.headers()), sequence, released,
return new Netty4HttpRequest(requestWithoutHeader, new HttpHeadersMap(requestWithoutHeader.headers()), released,
pooled, content);
}

Expand All @@ -188,12 +199,18 @@ public Netty4HttpResponse createResponse(RestStatus status, BytesReference conte
return new Netty4HttpResponse(this, status, content);
}

public FullHttpRequest nettyRequest() {
return request;
@Override
public boolean hasInboundException() {
return inboundException != null;
}

@Override
public Exception getInboundException() {
return inboundException;
}

int sequence() {
return sequence;
public FullHttpRequest nettyRequest() {
return request;
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.http.netty4;

import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageDecoder;
import io.netty.handler.codec.http.FullHttpRequest;
import org.elasticsearch.ExceptionsHelper;

import java.util.List;

class Netty4HttpRequestCreator extends MessageToMessageDecoder<FullHttpRequest> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can make this @Sharable ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I made this change for Netty module. The path is more complicated on the Nio side and I think this PR is kind of an intermediate step. So I want to hold off for now on Nio side.


@Override
protected void decode(ChannelHandlerContext ctx, FullHttpRequest msg, List<Object> out) {
if (msg.decoderResult().isFailure()) {
final Throwable cause = msg.decoderResult().cause();
final Exception nonError;
if (cause instanceof Error) {
ExceptionsHelper.maybeDieOnAnotherThread(cause);
nonError = new Exception(cause);
} else {
nonError = (Exception) cause;
}
out.add(new Netty4HttpRequest(msg.retain(), nonError));
} else {
out.add(new Netty4HttpRequest(msg.retain()));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,11 @@
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.FullHttpRequest;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.http.HttpPipelinedRequest;

@ChannelHandler.Sharable
class Netty4HttpRequestHandler extends SimpleChannelInboundHandler<HttpPipelinedRequest<FullHttpRequest>> {
class Netty4HttpRequestHandler extends SimpleChannelInboundHandler<HttpPipelinedRequest> {

private final Netty4HttpServerTransport serverTransport;

Expand All @@ -36,23 +35,11 @@ class Netty4HttpRequestHandler extends SimpleChannelInboundHandler<HttpPipelined
}

@Override
protected void channelRead0(ChannelHandlerContext ctx, HttpPipelinedRequest<FullHttpRequest> msg) {
Netty4HttpChannel channel = ctx.channel().attr(Netty4HttpServerTransport.HTTP_CHANNEL_KEY).get();
FullHttpRequest request = msg.getRequest();
protected void channelRead0(ChannelHandlerContext ctx, HttpPipelinedRequest httpRequest) {
final Netty4HttpChannel channel = ctx.channel().attr(Netty4HttpServerTransport.HTTP_CHANNEL_KEY).get();
boolean success = false;
Netty4HttpRequest httpRequest = new Netty4HttpRequest(request, msg.getSequence());
try {
if (request.decoderResult().isFailure()) {
Throwable cause = request.decoderResult().cause();
if (cause instanceof Error) {
ExceptionsHelper.maybeDieOnAnotherThread(cause);
serverTransport.incomingRequestError(httpRequest, channel, new Exception(cause));
} else {
serverTransport.incomingRequestError(httpRequest, channel, (Exception) cause);
}
} else {
serverTransport.incomingRequest(httpRequest, channel);
}
serverTransport.incomingRequest(httpRequest, channel);
success = true;
} finally {
if (success == false) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,16 @@
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.HttpResponseStatus;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.http.HttpPipelinedMessage;
import org.elasticsearch.http.HttpResponse;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.transport.netty4.Netty4Utils;

public class Netty4HttpResponse extends DefaultFullHttpResponse implements HttpResponse, HttpPipelinedMessage {
public class Netty4HttpResponse extends DefaultFullHttpResponse implements HttpResponse {

private final int sequence;
private final Netty4HttpRequest request;

Netty4HttpResponse(Netty4HttpRequest request, RestStatus status, BytesReference content) {
super(request.nettyRequest().protocolVersion(), HttpResponseStatus.valueOf(status.getStatus()), Netty4Utils.toByteBuf(content));
this.sequence = request.sequence();
this.request = request;
}

Expand All @@ -48,11 +45,6 @@ public boolean containsHeader(String name) {
return headers().contains(name);
}

@Override
public int getSequence() {
return sequence;
}

public Netty4HttpRequest getRequest() {
return request;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,7 @@ protected void initChannel(Channel ch) throws Exception {
if (handlingSettings.isCompression()) {
ch.pipeline().addLast("encoder_compress", new HttpContentCompressor(handlingSettings.getCompressionLevel()));
}
ch.pipeline().addLast("request_creator", new Netty4HttpRequestCreator());
Tim-Brooks marked this conversation as resolved.
Show resolved Hide resolved
if (handlingSettings.isCorsEnabled()) {
ch.pipeline().addLast("cors", new Netty4CorsHandler(transport.corsConfig));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpMethod;
Expand All @@ -33,6 +32,7 @@
import io.netty.handler.codec.http.HttpResponseStatus;
import org.elasticsearch.common.Strings;
import org.elasticsearch.http.CorsHandler;
import org.elasticsearch.http.netty4.Netty4HttpRequest;
import org.elasticsearch.http.netty4.Netty4HttpResponse;

import java.util.Date;
Expand All @@ -52,7 +52,7 @@ public class Netty4CorsHandler extends ChannelDuplexHandler {
private static Pattern SCHEME_PATTERN = Pattern.compile("^https?://");

private final CorsHandler.Config config;
private FullHttpRequest request;
private Netty4HttpRequest request;

/**
* Creates a new instance with the specified {@link CorsHandler.Config}.
Expand All @@ -66,20 +66,20 @@ public Netty4CorsHandler(final CorsHandler.Config config) {

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
assert msg instanceof FullHttpRequest : "Invalid message type: " + msg.getClass();
assert msg instanceof Netty4HttpRequest : "Invalid message type: " + msg.getClass();
if (config.isCorsSupportEnabled()) {
request = (FullHttpRequest) msg;
if (isPreflightRequest(request)) {
request = (Netty4HttpRequest) msg;
if (isPreflightRequest(request.nettyRequest())) {
try {
handlePreflight(ctx, request);
handlePreflight(ctx, request.nettyRequest());
return;
} finally {
releaseRequest();
}
}
if (!validateOrigin()) {
try {
forbidden(ctx, request);
forbidden(ctx, request.nettyRequest());
return;
} finally {
releaseRequest();
Expand Down Expand Up @@ -167,7 +167,7 @@ private void setPreflightHeaders(final HttpResponse response) {
}

private boolean setOrigin(final HttpResponse response) {
final String origin = request.headers().get(HttpHeaderNames.ORIGIN);
final String origin = request.nettyRequest().headers().get(HttpHeaderNames.ORIGIN);
if (!Strings.isNullOrEmpty(origin)) {
if (config.isAnyOriginSupported()) {
if (config.isCredentialsAllowed()) {
Expand All @@ -192,22 +192,22 @@ private boolean validateOrigin() {
return true;
}

final String origin = request.headers().get(HttpHeaderNames.ORIGIN);
final String origin = request.nettyRequest().headers().get(HttpHeaderNames.ORIGIN);
if (Strings.isNullOrEmpty(origin)) {
// Not a CORS request so we cannot validate it. It may be a non CORS request.
return true;
}

// if the origin is the same as the host of the request, then allow
if (isSameOrigin(origin, request.headers().get(HttpHeaderNames.HOST))) {
if (isSameOrigin(origin, request.nettyRequest().headers().get(HttpHeaderNames.HOST))) {
return true;
}

return config.isOriginAllowed(origin);
}

private void echoRequestOrigin(final HttpResponse response) {
setOrigin(response, request.headers().get(HttpHeaderNames.ORIGIN));
setOrigin(response, request.nettyRequest().headers().get(HttpHeaderNames.ORIGIN));
}

private static void setVaryHeader(final HttpResponse response) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ private FullHttpResponse executeRequest(final Settings settings, final String or
httpRequest.headers().add(HttpHeaderNames.HOST, host);
EmbeddedChannel embeddedChannel = new EmbeddedChannel();
embeddedChannel.pipeline().addLast(new Netty4CorsHandler(CorsHandler.fromSettings(settings)));
Netty4HttpRequest nettyRequest = new Netty4HttpRequest(httpRequest, 0);
Netty4HttpRequest nettyRequest = new Netty4HttpRequest(httpRequest);
embeddedChannel.writeOutbound(nettyRequest.createResponse(RestStatus.OK, new BytesArray("content")));
return embeddedChannel.readOutbound();
}
Expand Down
Loading