Skip to content

Commit

Permalink
Create HttpRequest earlier in pipeline (#56393)
Browse files Browse the repository at this point in the history
Elasticsearch requires that a HttpRequest abstraction be implemented
by http modules before server processing. This abstraction controls when
underlying resources are released. This commit moves this abstraction to
be created immediately after content aggregation. This change will
enable follow-up work including moving Cors logic into the server
package and tracking bytes as they are aggregated from the network
level.
  • Loading branch information
Tim-Brooks committed May 18, 2020
1 parent 22f54ba commit 57c3a61
Show file tree
Hide file tree
Showing 29 changed files with 458 additions and 273 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http.FullHttpRequest;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.http.HttpPipelinedRequest;
import org.elasticsearch.http.HttpPipelinedResponse;
import org.elasticsearch.http.HttpPipeliningAggregator;

import java.nio.channels.ClosedChannelException;
Expand All @@ -37,7 +37,7 @@
public class Netty4HttpPipeliningHandler extends ChannelDuplexHandler {

private final Logger logger;
private final HttpPipeliningAggregator<Netty4HttpResponse, ChannelPromise> aggregator;
private final HttpPipeliningAggregator<ChannelPromise> aggregator;

/**
* Construct a new pipelining handler; this handler should be used downstream of HTTP decoding/aggregation.
Expand All @@ -53,20 +53,20 @@ public Netty4HttpPipeliningHandler(Logger logger, final int maxEventsHeld) {

@Override
public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
assert msg instanceof FullHttpRequest : "Invalid message type: " + msg.getClass();
HttpPipelinedRequest<FullHttpRequest> pipelinedRequest = aggregator.read(((FullHttpRequest) msg));
assert msg instanceof Netty4HttpRequest : "Invalid message type: " + msg.getClass();
HttpPipelinedRequest pipelinedRequest = aggregator.read(((Netty4HttpRequest) msg));
ctx.fireChannelRead(pipelinedRequest);
}

@Override
public void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise) {
assert msg instanceof Netty4HttpResponse : "Invalid message type: " + msg.getClass();
Netty4HttpResponse response = (Netty4HttpResponse) msg;
assert msg instanceof HttpPipelinedResponse : "Invalid message type: " + msg.getClass();
HttpPipelinedResponse response = (HttpPipelinedResponse) msg;
boolean success = false;
try {
List<Tuple<Netty4HttpResponse, ChannelPromise>> readyResponses = aggregator.write(response, promise);
for (Tuple<Netty4HttpResponse, ChannelPromise> readyResponse : readyResponses) {
ctx.write(readyResponse.v1(), readyResponse.v2());
List<Tuple<HttpPipelinedResponse, ChannelPromise>> readyResponses = aggregator.write(response, promise);
for (Tuple<HttpPipelinedResponse, ChannelPromise> readyResponse : readyResponses) {
ctx.write(readyResponse.v1().getDelegateRequest(), readyResponse.v2());
}
success = true;
} catch (IllegalStateException e) {
Expand All @@ -80,11 +80,11 @@ public void write(final ChannelHandlerContext ctx, final Object msg, final Chann

@Override
public void close(ChannelHandlerContext ctx, ChannelPromise promise) {
List<Tuple<Netty4HttpResponse, ChannelPromise>> inflightResponses = aggregator.removeAllInflightResponses();
List<Tuple<HttpPipelinedResponse, ChannelPromise>> inflightResponses = aggregator.removeAllInflightResponses();

if (inflightResponses.isEmpty() == false) {
ClosedChannelException closedChannelException = new ClosedChannelException();
for (Tuple<Netty4HttpResponse, ChannelPromise> inflightResponse : inflightResponses) {
for (Tuple<HttpPipelinedResponse, ChannelPromise> inflightResponse : inflightResponses) {
try {
inflightResponse.v2().setFailure(closedChannelException);
} catch (RuntimeException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,26 +46,37 @@
import java.util.stream.Collectors;

public class Netty4HttpRequest implements HttpRequest {

private final FullHttpRequest request;
private final BytesReference content;
private final HttpHeadersMap headers;
private final int sequence;
private final AtomicBoolean released;
private final FullHttpRequest request;
private final Exception inboundException;
private final boolean pooled;
private final BytesReference content;

Netty4HttpRequest(FullHttpRequest request, int sequence) {
this(request, new HttpHeadersMap(request.headers()), sequence, new AtomicBoolean(false), true,
Netty4HttpRequest(FullHttpRequest request) {
this(request, new HttpHeadersMap(request.headers()), new AtomicBoolean(false), true,
Netty4Utils.toBytesReference(request.content()));
}

private Netty4HttpRequest(FullHttpRequest request, HttpHeadersMap headers, int sequence, AtomicBoolean released, boolean pooled,
Netty4HttpRequest(FullHttpRequest request, Exception inboundException) {
this(request, new HttpHeadersMap(request.headers()), new AtomicBoolean(false), true,
Netty4Utils.toBytesReference(request.content()), inboundException);
}

private Netty4HttpRequest(FullHttpRequest request, HttpHeadersMap headers, AtomicBoolean released, boolean pooled,
BytesReference content) {
this(request, headers, released, pooled, content, null);
}

private Netty4HttpRequest(FullHttpRequest request, HttpHeadersMap headers, AtomicBoolean released, boolean pooled,
BytesReference content, Exception inboundException) {
this.request = request;
this.sequence = sequence;
this.headers = headers;
this.content = content;
this.pooled = pooled;
this.released = released;
this.inboundException = inboundException;
}

@Override
Expand Down Expand Up @@ -135,7 +146,7 @@ public HttpRequest releaseAndCopy() {
return new Netty4HttpRequest(
new DefaultFullHttpRequest(request.protocolVersion(), request.method(), request.uri(), copiedContent, request.headers(),
request.trailingHeaders()),
headers, sequence, new AtomicBoolean(false), false, Netty4Utils.toBytesReference(copiedContent));
headers, new AtomicBoolean(false), false, Netty4Utils.toBytesReference(copiedContent));
} finally {
release();
}
Expand Down Expand Up @@ -179,7 +190,7 @@ public HttpRequest removeHeader(String header) {
trailingHeaders.remove(header);
FullHttpRequest requestWithoutHeader = new DefaultFullHttpRequest(request.protocolVersion(), request.method(), request.uri(),
request.content(), headersWithoutContentTypeHeader, trailingHeaders);
return new Netty4HttpRequest(requestWithoutHeader, new HttpHeadersMap(requestWithoutHeader.headers()), sequence, released,
return new Netty4HttpRequest(requestWithoutHeader, new HttpHeadersMap(requestWithoutHeader.headers()), released,
pooled, content);
}

Expand All @@ -188,12 +199,13 @@ public Netty4HttpResponse createResponse(RestStatus status, BytesReference conte
return new Netty4HttpResponse(this, status, content);
}

public FullHttpRequest nettyRequest() {
return request;
@Override
public Exception getInboundException() {
return inboundException;
}

int sequence() {
return sequence;
public FullHttpRequest nettyRequest() {
return request;
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.http.netty4;

import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageDecoder;
import io.netty.handler.codec.http.FullHttpRequest;
import org.elasticsearch.ExceptionsHelper;

import java.util.List;

@ChannelHandler.Sharable
class Netty4HttpRequestCreator extends MessageToMessageDecoder<FullHttpRequest> {

@Override
protected void decode(ChannelHandlerContext ctx, FullHttpRequest msg, List<Object> out) {
if (msg.decoderResult().isFailure()) {
final Throwable cause = msg.decoderResult().cause();
final Exception nonError;
if (cause instanceof Error) {
ExceptionsHelper.maybeDieOnAnotherThread(cause);
nonError = new Exception(cause);
} else {
nonError = (Exception) cause;
}
out.add(new Netty4HttpRequest(msg.retain(), nonError));
} else {
out.add(new Netty4HttpRequest(msg.retain()));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,11 @@
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.FullHttpRequest;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.http.HttpPipelinedRequest;

@ChannelHandler.Sharable
class Netty4HttpRequestHandler extends SimpleChannelInboundHandler<HttpPipelinedRequest<FullHttpRequest>> {
class Netty4HttpRequestHandler extends SimpleChannelInboundHandler<HttpPipelinedRequest> {

private final Netty4HttpServerTransport serverTransport;

Expand All @@ -36,23 +35,11 @@ class Netty4HttpRequestHandler extends SimpleChannelInboundHandler<HttpPipelined
}

@Override
protected void channelRead0(ChannelHandlerContext ctx, HttpPipelinedRequest<FullHttpRequest> msg) {
Netty4HttpChannel channel = ctx.channel().attr(Netty4HttpServerTransport.HTTP_CHANNEL_KEY).get();
FullHttpRequest request = msg.getRequest();
protected void channelRead0(ChannelHandlerContext ctx, HttpPipelinedRequest httpRequest) {
final Netty4HttpChannel channel = ctx.channel().attr(Netty4HttpServerTransport.HTTP_CHANNEL_KEY).get();
boolean success = false;
Netty4HttpRequest httpRequest = new Netty4HttpRequest(request, msg.getSequence());
try {
if (request.decoderResult().isFailure()) {
Throwable cause = request.decoderResult().cause();
if (cause instanceof Error) {
ExceptionsHelper.maybeDieOnAnotherThread(cause);
serverTransport.incomingRequestError(httpRequest, channel, new Exception(cause));
} else {
serverTransport.incomingRequestError(httpRequest, channel, (Exception) cause);
}
} else {
serverTransport.incomingRequest(httpRequest, channel);
}
serverTransport.incomingRequest(httpRequest, channel);
success = true;
} finally {
if (success == false) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,16 @@
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.HttpResponseStatus;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.http.HttpPipelinedMessage;
import org.elasticsearch.http.HttpResponse;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.transport.netty4.Netty4Utils;

public class Netty4HttpResponse extends DefaultFullHttpResponse implements HttpResponse, HttpPipelinedMessage {
public class Netty4HttpResponse extends DefaultFullHttpResponse implements HttpResponse {

private final int sequence;
private final Netty4HttpRequest request;

Netty4HttpResponse(Netty4HttpRequest request, RestStatus status, BytesReference content) {
super(request.nettyRequest().protocolVersion(), HttpResponseStatus.valueOf(status.getStatus()), Netty4Utils.toByteBuf(content));
this.sequence = request.sequence();
this.request = request;
}

Expand All @@ -48,11 +45,6 @@ public boolean containsHeader(String name) {
return headers().contains(name);
}

@Override
public int getSequence() {
return sequence;
}

public Netty4HttpRequest getRequest() {
return request;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -283,12 +283,14 @@ public ChannelHandler configureServerChannelHandler() {
protected static class HttpChannelHandler extends ChannelInitializer<Channel> {

private final Netty4HttpServerTransport transport;
private final Netty4HttpRequestCreator requestCreator;
private final Netty4HttpRequestHandler requestHandler;
private final HttpHandlingSettings handlingSettings;

protected HttpChannelHandler(final Netty4HttpServerTransport transport, final HttpHandlingSettings handlingSettings) {
this.transport = transport;
this.handlingSettings = handlingSettings;
this.requestCreator = new Netty4HttpRequestCreator();
this.requestHandler = new Netty4HttpRequestHandler(transport);
}

Expand All @@ -311,6 +313,7 @@ protected void initChannel(Channel ch) throws Exception {
if (handlingSettings.isCompression()) {
ch.pipeline().addLast("encoder_compress", new HttpContentCompressor(handlingSettings.getCompressionLevel()));
}
ch.pipeline().addLast("request_creator", requestCreator);
if (handlingSettings.isCorsEnabled()) {
ch.pipeline().addLast("cors", new Netty4CorsHandler(transport.corsConfig));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpMethod;
Expand All @@ -33,6 +32,7 @@
import io.netty.handler.codec.http.HttpResponseStatus;
import org.elasticsearch.common.Strings;
import org.elasticsearch.http.CorsHandler;
import org.elasticsearch.http.netty4.Netty4HttpRequest;
import org.elasticsearch.http.netty4.Netty4HttpResponse;

import java.util.Date;
Expand All @@ -52,7 +52,7 @@ public class Netty4CorsHandler extends ChannelDuplexHandler {
private static Pattern SCHEME_PATTERN = Pattern.compile("^https?://");

private final CorsHandler.Config config;
private FullHttpRequest request;
private Netty4HttpRequest request;

/**
* Creates a new instance with the specified {@link CorsHandler.Config}.
Expand All @@ -66,20 +66,20 @@ public Netty4CorsHandler(final CorsHandler.Config config) {

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
assert msg instanceof FullHttpRequest : "Invalid message type: " + msg.getClass();
assert msg instanceof Netty4HttpRequest : "Invalid message type: " + msg.getClass();
if (config.isCorsSupportEnabled()) {
request = (FullHttpRequest) msg;
if (isPreflightRequest(request)) {
request = (Netty4HttpRequest) msg;
if (isPreflightRequest(request.nettyRequest())) {
try {
handlePreflight(ctx, request);
handlePreflight(ctx, request.nettyRequest());
return;
} finally {
releaseRequest();
}
}
if (!validateOrigin()) {
try {
forbidden(ctx, request);
forbidden(ctx, request.nettyRequest());
return;
} finally {
releaseRequest();
Expand Down Expand Up @@ -167,7 +167,7 @@ private void setPreflightHeaders(final HttpResponse response) {
}

private boolean setOrigin(final HttpResponse response) {
final String origin = request.headers().get(HttpHeaderNames.ORIGIN);
final String origin = request.nettyRequest().headers().get(HttpHeaderNames.ORIGIN);
if (!Strings.isNullOrEmpty(origin)) {
if (config.isAnyOriginSupported()) {
if (config.isCredentialsAllowed()) {
Expand All @@ -192,22 +192,22 @@ private boolean validateOrigin() {
return true;
}

final String origin = request.headers().get(HttpHeaderNames.ORIGIN);
final String origin = request.nettyRequest().headers().get(HttpHeaderNames.ORIGIN);
if (Strings.isNullOrEmpty(origin)) {
// Not a CORS request so we cannot validate it. It may be a non CORS request.
return true;
}

// if the origin is the same as the host of the request, then allow
if (isSameOrigin(origin, request.headers().get(HttpHeaderNames.HOST))) {
if (isSameOrigin(origin, request.nettyRequest().headers().get(HttpHeaderNames.HOST))) {
return true;
}

return config.isOriginAllowed(origin);
}

private void echoRequestOrigin(final HttpResponse response) {
setOrigin(response, request.headers().get(HttpHeaderNames.ORIGIN));
setOrigin(response, request.nettyRequest().headers().get(HttpHeaderNames.ORIGIN));
}

private static void setVaryHeader(final HttpResponse response) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ private FullHttpResponse executeRequest(final Settings settings, final String or
httpRequest.headers().add(HttpHeaderNames.HOST, host);
EmbeddedChannel embeddedChannel = new EmbeddedChannel();
embeddedChannel.pipeline().addLast(new Netty4CorsHandler(CorsHandler.fromSettings(settings)));
Netty4HttpRequest nettyRequest = new Netty4HttpRequest(httpRequest, 0);
Netty4HttpRequest nettyRequest = new Netty4HttpRequest(httpRequest);
embeddedChannel.writeOutbound(nettyRequest.createResponse(RestStatus.OK, new BytesArray("content")));
return embeddedChannel.readOutbound();
}
Expand Down
Loading

0 comments on commit 57c3a61

Please sign in to comment.