diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandler.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandler.java index 827963a2332c9..ef8a7b4d96343 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandler.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandler.java @@ -22,10 +22,10 @@ import io.netty.channel.ChannelDuplexHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPromise; -import io.netty.handler.codec.http.FullHttpRequest; import org.apache.logging.log4j.Logger; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.http.HttpPipelinedRequest; +import org.elasticsearch.http.HttpPipelinedResponse; import org.elasticsearch.http.HttpPipeliningAggregator; import java.nio.channels.ClosedChannelException; @@ -37,7 +37,7 @@ public class Netty4HttpPipeliningHandler extends ChannelDuplexHandler { private final Logger logger; - private final HttpPipeliningAggregator aggregator; + private final HttpPipeliningAggregator aggregator; /** * Construct a new pipelining handler; this handler should be used downstream of HTTP decoding/aggregation. @@ -53,20 +53,20 @@ public Netty4HttpPipeliningHandler(Logger logger, final int maxEventsHeld) { @Override public void channelRead(final ChannelHandlerContext ctx, final Object msg) { - assert msg instanceof FullHttpRequest : "Invalid message type: " + msg.getClass(); - HttpPipelinedRequest pipelinedRequest = aggregator.read(((FullHttpRequest) msg)); + assert msg instanceof Netty4HttpRequest : "Invalid message type: " + msg.getClass(); + HttpPipelinedRequest pipelinedRequest = aggregator.read(((Netty4HttpRequest) msg)); ctx.fireChannelRead(pipelinedRequest); } @Override public void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise) { - assert msg instanceof Netty4HttpResponse : "Invalid message type: " + msg.getClass(); - Netty4HttpResponse response = (Netty4HttpResponse) msg; + assert msg instanceof HttpPipelinedResponse : "Invalid message type: " + msg.getClass(); + HttpPipelinedResponse response = (HttpPipelinedResponse) msg; boolean success = false; try { - List> readyResponses = aggregator.write(response, promise); - for (Tuple readyResponse : readyResponses) { - ctx.write(readyResponse.v1(), readyResponse.v2()); + List> readyResponses = aggregator.write(response, promise); + for (Tuple readyResponse : readyResponses) { + ctx.write(readyResponse.v1().getDelegateRequest(), readyResponse.v2()); } success = true; } catch (IllegalStateException e) { @@ -80,11 +80,11 @@ public void write(final ChannelHandlerContext ctx, final Object msg, final Chann @Override public void close(ChannelHandlerContext ctx, ChannelPromise promise) { - List> inflightResponses = aggregator.removeAllInflightResponses(); + List> inflightResponses = aggregator.removeAllInflightResponses(); if (inflightResponses.isEmpty() == false) { ClosedChannelException closedChannelException = new ClosedChannelException(); - for (Tuple inflightResponse : inflightResponses) { + for (Tuple inflightResponse : inflightResponses) { try { inflightResponse.v2().setFailure(closedChannelException); } catch (RuntimeException e) { diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequest.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequest.java index e0ad3007c98a9..f988f353db0c1 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequest.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequest.java @@ -46,26 +46,37 @@ import java.util.stream.Collectors; public class Netty4HttpRequest implements HttpRequest { + + private final FullHttpRequest request; + private final BytesReference content; private final HttpHeadersMap headers; - private final int sequence; private final AtomicBoolean released; - private final FullHttpRequest request; + private final Exception inboundException; private final boolean pooled; - private final BytesReference content; - Netty4HttpRequest(FullHttpRequest request, int sequence) { - this(request, new HttpHeadersMap(request.headers()), sequence, new AtomicBoolean(false), true, + Netty4HttpRequest(FullHttpRequest request) { + this(request, new HttpHeadersMap(request.headers()), new AtomicBoolean(false), true, Netty4Utils.toBytesReference(request.content())); } - private Netty4HttpRequest(FullHttpRequest request, HttpHeadersMap headers, int sequence, AtomicBoolean released, boolean pooled, + Netty4HttpRequest(FullHttpRequest request, Exception inboundException) { + this(request, new HttpHeadersMap(request.headers()), new AtomicBoolean(false), true, + Netty4Utils.toBytesReference(request.content()), inboundException); + } + + private Netty4HttpRequest(FullHttpRequest request, HttpHeadersMap headers, AtomicBoolean released, boolean pooled, BytesReference content) { + this(request, headers, released, pooled, content, null); + } + + private Netty4HttpRequest(FullHttpRequest request, HttpHeadersMap headers, AtomicBoolean released, boolean pooled, + BytesReference content, Exception inboundException) { this.request = request; - this.sequence = sequence; this.headers = headers; this.content = content; this.pooled = pooled; this.released = released; + this.inboundException = inboundException; } @Override @@ -135,7 +146,7 @@ public HttpRequest releaseAndCopy() { return new Netty4HttpRequest( new DefaultFullHttpRequest(request.protocolVersion(), request.method(), request.uri(), copiedContent, request.headers(), request.trailingHeaders()), - headers, sequence, new AtomicBoolean(false), false, Netty4Utils.toBytesReference(copiedContent)); + headers, new AtomicBoolean(false), false, Netty4Utils.toBytesReference(copiedContent)); } finally { release(); } @@ -179,7 +190,7 @@ public HttpRequest removeHeader(String header) { trailingHeaders.remove(header); FullHttpRequest requestWithoutHeader = new DefaultFullHttpRequest(request.protocolVersion(), request.method(), request.uri(), request.content(), headersWithoutContentTypeHeader, trailingHeaders); - return new Netty4HttpRequest(requestWithoutHeader, new HttpHeadersMap(requestWithoutHeader.headers()), sequence, released, + return new Netty4HttpRequest(requestWithoutHeader, new HttpHeadersMap(requestWithoutHeader.headers()), released, pooled, content); } @@ -188,12 +199,13 @@ public Netty4HttpResponse createResponse(RestStatus status, BytesReference conte return new Netty4HttpResponse(this, status, content); } - public FullHttpRequest nettyRequest() { - return request; + @Override + public Exception getInboundException() { + return inboundException; } - int sequence() { - return sequence; + public FullHttpRequest nettyRequest() { + return request; } /** diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestCreator.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestCreator.java new file mode 100644 index 0000000000000..2960d75367fe4 --- /dev/null +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestCreator.java @@ -0,0 +1,49 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.http.netty4; + +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.MessageToMessageDecoder; +import io.netty.handler.codec.http.FullHttpRequest; +import org.elasticsearch.ExceptionsHelper; + +import java.util.List; + +@ChannelHandler.Sharable +class Netty4HttpRequestCreator extends MessageToMessageDecoder { + + @Override + protected void decode(ChannelHandlerContext ctx, FullHttpRequest msg, List out) { + if (msg.decoderResult().isFailure()) { + final Throwable cause = msg.decoderResult().cause(); + final Exception nonError; + if (cause instanceof Error) { + ExceptionsHelper.maybeDieOnAnotherThread(cause); + nonError = new Exception(cause); + } else { + nonError = (Exception) cause; + } + out.add(new Netty4HttpRequest(msg.retain(), nonError)); + } else { + out.add(new Netty4HttpRequest(msg.retain())); + } + } +} diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestHandler.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestHandler.java index 7e7f45ef92e2e..033ac21ad2fca 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestHandler.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestHandler.java @@ -22,12 +22,11 @@ import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; -import io.netty.handler.codec.http.FullHttpRequest; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.http.HttpPipelinedRequest; @ChannelHandler.Sharable -class Netty4HttpRequestHandler extends SimpleChannelInboundHandler> { +class Netty4HttpRequestHandler extends SimpleChannelInboundHandler { private final Netty4HttpServerTransport serverTransport; @@ -36,23 +35,11 @@ class Netty4HttpRequestHandler extends SimpleChannelInboundHandler msg) { - Netty4HttpChannel channel = ctx.channel().attr(Netty4HttpServerTransport.HTTP_CHANNEL_KEY).get(); - FullHttpRequest request = msg.getRequest(); + protected void channelRead0(ChannelHandlerContext ctx, HttpPipelinedRequest httpRequest) { + final Netty4HttpChannel channel = ctx.channel().attr(Netty4HttpServerTransport.HTTP_CHANNEL_KEY).get(); boolean success = false; - Netty4HttpRequest httpRequest = new Netty4HttpRequest(request, msg.getSequence()); try { - if (request.decoderResult().isFailure()) { - Throwable cause = request.decoderResult().cause(); - if (cause instanceof Error) { - ExceptionsHelper.maybeDieOnAnotherThread(cause); - serverTransport.incomingRequestError(httpRequest, channel, new Exception(cause)); - } else { - serverTransport.incomingRequestError(httpRequest, channel, (Exception) cause); - } - } else { - serverTransport.incomingRequest(httpRequest, channel); - } + serverTransport.incomingRequest(httpRequest, channel); success = true; } finally { if (success == false) { diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpResponse.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpResponse.java index 68c1d31278091..fec796cf69012 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpResponse.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpResponse.java @@ -22,19 +22,16 @@ import io.netty.handler.codec.http.DefaultFullHttpResponse; import io.netty.handler.codec.http.HttpResponseStatus; import org.elasticsearch.common.bytes.BytesReference; -import org.elasticsearch.http.HttpPipelinedMessage; import org.elasticsearch.http.HttpResponse; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.transport.netty4.Netty4Utils; -public class Netty4HttpResponse extends DefaultFullHttpResponse implements HttpResponse, HttpPipelinedMessage { +public class Netty4HttpResponse extends DefaultFullHttpResponse implements HttpResponse { - private final int sequence; private final Netty4HttpRequest request; Netty4HttpResponse(Netty4HttpRequest request, RestStatus status, BytesReference content) { super(request.nettyRequest().protocolVersion(), HttpResponseStatus.valueOf(status.getStatus()), Netty4Utils.toByteBuf(content)); - this.sequence = request.sequence(); this.request = request; } @@ -48,11 +45,6 @@ public boolean containsHeader(String name) { return headers().contains(name); } - @Override - public int getSequence() { - return sequence; - } - public Netty4HttpRequest getRequest() { return request; } diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java index e5c98b8cfb461..137ff735141d2 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java @@ -283,12 +283,14 @@ public ChannelHandler configureServerChannelHandler() { protected static class HttpChannelHandler extends ChannelInitializer { private final Netty4HttpServerTransport transport; + private final Netty4HttpRequestCreator requestCreator; private final Netty4HttpRequestHandler requestHandler; private final HttpHandlingSettings handlingSettings; protected HttpChannelHandler(final Netty4HttpServerTransport transport, final HttpHandlingSettings handlingSettings) { this.transport = transport; this.handlingSettings = handlingSettings; + this.requestCreator = new Netty4HttpRequestCreator(); this.requestHandler = new Netty4HttpRequestHandler(transport); } @@ -311,6 +313,7 @@ protected void initChannel(Channel ch) throws Exception { if (handlingSettings.isCompression()) { ch.pipeline().addLast("encoder_compress", new HttpContentCompressor(handlingSettings.getCompressionLevel())); } + ch.pipeline().addLast("request_creator", requestCreator); if (handlingSettings.isCorsEnabled()) { ch.pipeline().addLast("cors", new Netty4CorsHandler(transport.corsConfig)); } diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/cors/Netty4CorsHandler.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/cors/Netty4CorsHandler.java index 1855d7c8b757e..abf945bd934a4 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/cors/Netty4CorsHandler.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/cors/Netty4CorsHandler.java @@ -24,7 +24,6 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPromise; import io.netty.handler.codec.http.DefaultFullHttpResponse; -import io.netty.handler.codec.http.FullHttpRequest; import io.netty.handler.codec.http.HttpHeaderNames; import io.netty.handler.codec.http.HttpHeaders; import io.netty.handler.codec.http.HttpMethod; @@ -33,6 +32,7 @@ import io.netty.handler.codec.http.HttpResponseStatus; import org.elasticsearch.common.Strings; import org.elasticsearch.http.CorsHandler; +import org.elasticsearch.http.netty4.Netty4HttpRequest; import org.elasticsearch.http.netty4.Netty4HttpResponse; import java.util.Date; @@ -52,7 +52,7 @@ public class Netty4CorsHandler extends ChannelDuplexHandler { private static Pattern SCHEME_PATTERN = Pattern.compile("^https?://"); private final CorsHandler.Config config; - private FullHttpRequest request; + private Netty4HttpRequest request; /** * Creates a new instance with the specified {@link CorsHandler.Config}. @@ -66,12 +66,12 @@ public Netty4CorsHandler(final CorsHandler.Config config) { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { - assert msg instanceof FullHttpRequest : "Invalid message type: " + msg.getClass(); + assert msg instanceof Netty4HttpRequest : "Invalid message type: " + msg.getClass(); if (config.isCorsSupportEnabled()) { - request = (FullHttpRequest) msg; - if (isPreflightRequest(request)) { + request = (Netty4HttpRequest) msg; + if (isPreflightRequest(request.nettyRequest())) { try { - handlePreflight(ctx, request); + handlePreflight(ctx, request.nettyRequest()); return; } finally { releaseRequest(); @@ -79,7 +79,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception } if (!validateOrigin()) { try { - forbidden(ctx, request); + forbidden(ctx, request.nettyRequest()); return; } finally { releaseRequest(); @@ -167,7 +167,7 @@ private void setPreflightHeaders(final HttpResponse response) { } private boolean setOrigin(final HttpResponse response) { - final String origin = request.headers().get(HttpHeaderNames.ORIGIN); + final String origin = request.nettyRequest().headers().get(HttpHeaderNames.ORIGIN); if (!Strings.isNullOrEmpty(origin)) { if (config.isAnyOriginSupported()) { if (config.isCredentialsAllowed()) { @@ -192,14 +192,14 @@ private boolean validateOrigin() { return true; } - final String origin = request.headers().get(HttpHeaderNames.ORIGIN); + final String origin = request.nettyRequest().headers().get(HttpHeaderNames.ORIGIN); if (Strings.isNullOrEmpty(origin)) { // Not a CORS request so we cannot validate it. It may be a non CORS request. return true; } // if the origin is the same as the host of the request, then allow - if (isSameOrigin(origin, request.headers().get(HttpHeaderNames.HOST))) { + if (isSameOrigin(origin, request.nettyRequest().headers().get(HttpHeaderNames.HOST))) { return true; } @@ -207,7 +207,7 @@ private boolean validateOrigin() { } private void echoRequestOrigin(final HttpResponse response) { - setOrigin(response, request.headers().get(HttpHeaderNames.ORIGIN)); + setOrigin(response, request.nettyRequest().headers().get(HttpHeaderNames.ORIGIN)); } private static void setVaryHeader(final HttpResponse response) { diff --git a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4CorsTests.java b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4CorsTests.java index 0ed380d68f75e..8a6b405fe9c8b 100644 --- a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4CorsTests.java +++ b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4CorsTests.java @@ -142,7 +142,7 @@ private FullHttpResponse executeRequest(final Settings settings, final String or httpRequest.headers().add(HttpHeaderNames.HOST, host); EmbeddedChannel embeddedChannel = new EmbeddedChannel(); embeddedChannel.pipeline().addLast(new Netty4CorsHandler(CorsHandler.fromSettings(settings))); - Netty4HttpRequest nettyRequest = new Netty4HttpRequest(httpRequest, 0); + Netty4HttpRequest nettyRequest = new Netty4HttpRequest(httpRequest); embeddedChannel.writeOutbound(nettyRequest.createResponse(RestStatus.OK, new BytesArray("content"))); return embeddedChannel.readOutbound(); } diff --git a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandlerTests.java b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandlerTests.java index 8b3ba19fe0144..26ae2cfa7e321 100644 --- a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandlerTests.java +++ b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandlerTests.java @@ -25,16 +25,15 @@ import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.embedded.EmbeddedChannel; import io.netty.handler.codec.http.DefaultFullHttpRequest; -import io.netty.handler.codec.http.FullHttpRequest; import io.netty.handler.codec.http.FullHttpResponse; import io.netty.handler.codec.http.HttpMethod; -import io.netty.handler.codec.http.HttpRequest; -import io.netty.handler.codec.http.LastHttpContent; import io.netty.handler.codec.http.QueryStringDecoder; import org.elasticsearch.common.Randomness; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.http.HttpPipelinedRequest; +import org.elasticsearch.http.HttpPipelinedResponse; +import org.elasticsearch.http.HttpResponse; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.test.ESTestCase; import org.junit.After; @@ -44,12 +43,10 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; -import java.util.Queue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.LinkedTransferQueue; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -182,8 +179,8 @@ public void testPipeliningRequestsAreReleased() throws InterruptedException { embeddedChannel.writeInbound(createHttpRequest("/" + i)); } - HttpPipelinedRequest inbound; - ArrayList> requests = new ArrayList<>(); + HttpPipelinedRequest inbound; + ArrayList requests = new ArrayList<>(); while ((inbound = embeddedChannel.readInbound()) != null) { requests.add(inbound); } @@ -192,9 +189,8 @@ public void testPipeliningRequestsAreReleased() throws InterruptedException { for (int i = 1; i < requests.size(); ++i) { ChannelPromise promise = embeddedChannel.newPromise(); promises.add(promise); - HttpPipelinedRequest pipelinedRequest = requests.get(i); - Netty4HttpRequest nioHttpRequest = new Netty4HttpRequest(pipelinedRequest.getRequest(), pipelinedRequest.getSequence()); - Netty4HttpResponse resp = nioHttpRequest.createResponse(RestStatus.OK, BytesArray.EMPTY); + HttpPipelinedRequest pipelinedRequest = requests.get(i); + HttpPipelinedResponse resp = pipelinedRequest.createResponse(RestStatus.OK, BytesArray.EMPTY); embeddedChannel.writeAndFlush(resp, promise); } @@ -217,37 +213,20 @@ private void assertReadHttpMessageHasContent(EmbeddedChannel embeddedChannel, St assertThat(data, is(expectedContent)); } - private FullHttpRequest createHttpRequest(String uri) { - return new DefaultFullHttpRequest(HTTP_1_1, HttpMethod.GET, uri); + private Netty4HttpRequest createHttpRequest(String uri) { + return new Netty4HttpRequest(new DefaultFullHttpRequest(HTTP_1_1, HttpMethod.GET, uri)); } - private static class AggregateUrisAndHeadersHandler extends SimpleChannelInboundHandler { - - static final Queue QUEUE_URI = new LinkedTransferQueue<>(); - - @Override - protected void channelRead0(ChannelHandlerContext ctx, HttpRequest request) throws Exception { - QUEUE_URI.add(request.uri()); - } - - } - - private class WorkEmulatorHandler extends SimpleChannelInboundHandler> { + private class WorkEmulatorHandler extends SimpleChannelInboundHandler { @Override - protected void channelRead0(final ChannelHandlerContext ctx, HttpPipelinedRequest pipelinedRequest) { - LastHttpContent request = pipelinedRequest.getRequest(); - final QueryStringDecoder decoder; - if (request instanceof FullHttpRequest) { - decoder = new QueryStringDecoder(((FullHttpRequest)request).uri()); - } else { - decoder = new QueryStringDecoder(AggregateUrisAndHeadersHandler.QUEUE_URI.poll()); - } + protected void channelRead0(final ChannelHandlerContext ctx, HttpPipelinedRequest pipelinedRequest) { + final org.elasticsearch.http.HttpRequest request = pipelinedRequest.getDelegateRequest(); + final QueryStringDecoder decoder = new QueryStringDecoder(request.uri()); final String uri = decoder.path().replace("/", ""); final BytesReference content = new BytesArray(uri.getBytes(StandardCharsets.UTF_8)); - Netty4HttpRequest nioHttpRequest = new Netty4HttpRequest(pipelinedRequest.getRequest(), pipelinedRequest.getSequence()); - Netty4HttpResponse httpResponse = nioHttpRequest.createResponse(RestStatus.OK, content); + HttpResponse httpResponse = pipelinedRequest.createResponse(RestStatus.OK, content); httpResponse.addHeader(CONTENT_LENGTH.toString(), Integer.toString(content.length())); final CountDownLatch waitingLatch = new CountDownLatch(1); diff --git a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpServerPipeliningTests.java b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpServerPipeliningTests.java index a81415d7165ad..a873293ab5b9f 100644 --- a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpServerPipeliningTests.java +++ b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpServerPipeliningTests.java @@ -26,9 +26,7 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPromise; import io.netty.channel.SimpleChannelInboundHandler; -import io.netty.handler.codec.http.FullHttpRequest; import io.netty.handler.codec.http.FullHttpResponse; -import io.netty.handler.codec.http.HttpHeaderNames; import io.netty.util.ReferenceCounted; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.network.NetworkService; @@ -38,6 +36,7 @@ import org.elasticsearch.common.util.MockBigArrays; import org.elasticsearch.common.util.MockPageCacheRecycler; import org.elasticsearch.http.HttpPipelinedRequest; +import org.elasticsearch.http.HttpResponse; import org.elasticsearch.http.HttpServerTransport; import org.elasticsearch.http.NullDispatcher; import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; @@ -154,7 +153,7 @@ protected void initChannel(Channel ch) throws Exception { } - class PossiblySlowUpstreamHandler extends SimpleChannelInboundHandler> { + class PossiblySlowUpstreamHandler extends SimpleChannelInboundHandler { private final ExecutorService executorService; @@ -163,7 +162,7 @@ class PossiblySlowUpstreamHandler extends SimpleChannelInboundHandler msg) throws Exception { + protected void channelRead0(ChannelHandlerContext ctx, HttpPipelinedRequest msg) throws Exception { executorService.submit(new PossiblySlowRunnable(ctx, msg)); } @@ -178,26 +177,23 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E class PossiblySlowRunnable implements Runnable { private ChannelHandlerContext ctx; - private HttpPipelinedRequest pipelinedRequest; - private FullHttpRequest fullHttpRequest; + private HttpPipelinedRequest pipelinedRequest; - PossiblySlowRunnable(ChannelHandlerContext ctx, HttpPipelinedRequest msg) { + PossiblySlowRunnable(ChannelHandlerContext ctx, HttpPipelinedRequest msg) { this.ctx = ctx; this.pipelinedRequest = msg; - this.fullHttpRequest = pipelinedRequest.getRequest(); } @Override public void run() { try { - final String uri = fullHttpRequest.uri(); + final String uri = pipelinedRequest.uri(); final ByteBuf buffer = Unpooled.copiedBuffer(uri, StandardCharsets.UTF_8); - Netty4HttpRequest httpRequest = new Netty4HttpRequest(fullHttpRequest, pipelinedRequest.getSequence()); - Netty4HttpResponse response = - httpRequest.createResponse(RestStatus.OK, new BytesArray(uri.getBytes(StandardCharsets.UTF_8))); - response.headers().add(HttpHeaderNames.CONTENT_LENGTH, buffer.readableBytes()); + HttpResponse response = + pipelinedRequest.createResponse(RestStatus.OK, new BytesArray(uri.getBytes(StandardCharsets.UTF_8))); + response.addHeader("content-length", Integer.toString(buffer.readableBytes())); final boolean slow = uri.matches("/slow/\\d+"); if (slow) { @@ -213,7 +209,7 @@ public void run() { final ChannelPromise promise = ctx.newPromise(); ctx.writeAndFlush(response, promise); } finally { - fullHttpRequest.release(); + pipelinedRequest.release(); } } diff --git a/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/HttpReadWriteHandler.java b/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/HttpReadWriteHandler.java index 26205e3f36943..6d7e8f3ed8a2e 100644 --- a/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/HttpReadWriteHandler.java +++ b/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/HttpReadWriteHandler.java @@ -21,17 +21,16 @@ import io.netty.channel.ChannelHandler; import io.netty.handler.codec.ByteToMessageDecoder; -import io.netty.handler.codec.http.FullHttpRequest; import io.netty.handler.codec.http.HttpContentCompressor; import io.netty.handler.codec.http.HttpContentDecompressor; import io.netty.handler.codec.http.HttpObjectAggregator; import io.netty.handler.codec.http.HttpRequestDecoder; import io.netty.handler.codec.http.HttpResponseEncoder; -import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.http.CorsHandler; import org.elasticsearch.http.HttpHandlingSettings; import org.elasticsearch.http.HttpPipelinedRequest; +import org.elasticsearch.http.HttpPipelinedResponse; import org.elasticsearch.http.HttpReadTimeoutException; import org.elasticsearch.http.nio.cors.NioCorsHandler; import org.elasticsearch.nio.FlushOperation; @@ -68,7 +67,7 @@ public HttpReadWriteHandler(NioHttpChannel nioHttpChannel, NioHttpServerTranspor this.nanoClock = nanoClock; this.readTimeoutNanos = TimeUnit.MILLISECONDS.toNanos(settings.getReadTimeoutMillis()); - List handlers = new ArrayList<>(5); + List handlers = new ArrayList<>(8); HttpRequestDecoder decoder = new HttpRequestDecoder(settings.getMaxInitialLineLength(), settings.getMaxHeaderSize(), settings.getMaxChunkSize()); decoder.setCumulator(ByteToMessageDecoder.COMPOSITE_CUMULATOR); @@ -79,6 +78,7 @@ public HttpReadWriteHandler(NioHttpChannel nioHttpChannel, NioHttpServerTranspor if (settings.isCompression()) { handlers.add(new HttpContentCompressor(settings.getCompressionLevel())); } + handlers.add(new NioHttpRequestCreator()); if (settings.isCorsEnabled()) { handlers.add(new NioCorsHandler(corsConfig)); } @@ -112,15 +112,13 @@ public int consumeReads(InboundChannelBuffer channelBuffer) { @Override public WriteOperation createWriteOperation(SocketChannelContext context, Object message, BiConsumer listener) { - assert message instanceof NioHttpResponse : "This channel only supports messages that are of type: " - + NioHttpResponse.class + ". Found type: " + message.getClass() + "."; - return new HttpWriteOperation(context, (NioHttpResponse) message, listener); + assert assertMessageTypes(message); + return new HttpWriteOperation(context, (HttpPipelinedResponse) message, listener); } @Override public List writeToBytes(WriteOperation writeOperation) { - assert writeOperation.getObject() instanceof NioHttpResponse : "This channel only supports messages that are of type: " - + NioHttpResponse.class + ". Found type: " + writeOperation.getObject().getClass() + "."; + assert assertMessageTypes(writeOperation.getObject()); assert channelActive : "channelActive should have been called"; --inFlightRequests; assert inFlightRequests >= 0 : "Inflight requests should never drop below zero, found: " + inFlightRequests; @@ -154,26 +152,14 @@ public void close() throws IOException { @SuppressWarnings("unchecked") private void handleRequest(Object msg) { - final HttpPipelinedRequest pipelinedRequest = (HttpPipelinedRequest) msg; - FullHttpRequest request = pipelinedRequest.getRequest(); + final HttpPipelinedRequest pipelinedRequest = (HttpPipelinedRequest) msg; boolean success = false; - NioHttpRequest httpRequest = new NioHttpRequest(request, pipelinedRequest.getSequence()); try { - if (request.decoderResult().isFailure()) { - Throwable cause = request.decoderResult().cause(); - if (cause instanceof Error) { - ExceptionsHelper.maybeDieOnAnotherThread(cause); - transport.incomingRequestError(httpRequest, nioHttpChannel, new Exception(cause)); - } else { - transport.incomingRequestError(httpRequest, nioHttpChannel, (Exception) cause); - } - } else { - transport.incomingRequest(httpRequest, nioHttpChannel); - } + transport.incomingRequest(pipelinedRequest, nioHttpChannel); success = true; } finally { if (success == false) { - request.release(); + pipelinedRequest.release(); } } } @@ -190,4 +176,13 @@ private void maybeReadTimeout() { private void scheduleReadTimeout() { taskScheduler.scheduleAtRelativeTime(this::maybeReadTimeout, nanoClock.getAsLong() + readTimeoutNanos); } + + private static boolean assertMessageTypes(Object message) { + assert message instanceof HttpPipelinedResponse : "This channel only supports messages that are of type: " + + HttpPipelinedResponse.class + ". Found type: " + message.getClass() + "."; + assert ((HttpPipelinedResponse) message).getDelegateRequest() instanceof NioHttpResponse : + "This channel only pipelined responses with a delegate of type: " + NioHttpResponse.class + + ". Found type: " + ((HttpPipelinedResponse) message).getDelegateRequest().getClass() + "."; + return true; + } } diff --git a/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/HttpWriteOperation.java b/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/HttpWriteOperation.java index 207843bfe396a..bbb5bf5194cc9 100644 --- a/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/HttpWriteOperation.java +++ b/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/HttpWriteOperation.java @@ -19,6 +19,7 @@ package org.elasticsearch.http.nio; +import org.elasticsearch.http.HttpPipelinedResponse; import org.elasticsearch.nio.SocketChannelContext; import org.elasticsearch.nio.WriteOperation; @@ -27,10 +28,10 @@ public class HttpWriteOperation implements WriteOperation { private final SocketChannelContext channelContext; - private final NioHttpResponse response; + private final HttpPipelinedResponse response; private final BiConsumer listener; - HttpWriteOperation(SocketChannelContext channelContext, NioHttpResponse response, BiConsumer listener) { + HttpWriteOperation(SocketChannelContext channelContext, HttpPipelinedResponse response, BiConsumer listener) { this.channelContext = channelContext; this.response = response; this.listener = listener; @@ -47,7 +48,7 @@ public SocketChannelContext getChannel() { } @Override - public NioHttpResponse getObject() { + public HttpPipelinedResponse getObject() { return response; } } diff --git a/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NioHttpPipeliningHandler.java b/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NioHttpPipeliningHandler.java index 977092ddac0aa..e2a610ab80c57 100644 --- a/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NioHttpPipeliningHandler.java +++ b/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NioHttpPipeliningHandler.java @@ -22,11 +22,12 @@ import io.netty.channel.ChannelDuplexHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPromise; -import io.netty.handler.codec.http.FullHttpRequest; import org.apache.logging.log4j.Logger; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.http.HttpPipelinedRequest; +import org.elasticsearch.http.HttpPipelinedResponse; import org.elasticsearch.http.HttpPipeliningAggregator; +import org.elasticsearch.http.HttpRequest; import java.nio.channels.ClosedChannelException; import java.util.List; @@ -37,7 +38,7 @@ public class NioHttpPipeliningHandler extends ChannelDuplexHandler { private final Logger logger; - private final HttpPipeliningAggregator aggregator; + private final HttpPipeliningAggregator aggregator; /** * Construct a new pipelining handler; this handler should be used downstream of HTTP decoding/aggregation. @@ -53,22 +54,22 @@ public NioHttpPipeliningHandler(Logger logger, final int maxEventsHeld) { @Override public void channelRead(final ChannelHandlerContext ctx, final Object msg) { - assert msg instanceof FullHttpRequest : "Invalid message type: " + msg.getClass(); - HttpPipelinedRequest pipelinedRequest = aggregator.read(((FullHttpRequest) msg)); + assert msg instanceof HttpRequest : "Invalid message type: " + msg.getClass(); + HttpPipelinedRequest pipelinedRequest = aggregator.read((HttpRequest) msg); ctx.fireChannelRead(pipelinedRequest); } @Override public void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise) { - assert msg instanceof NioHttpResponse : "Invalid message type: " + msg.getClass(); - NioHttpResponse response = (NioHttpResponse) msg; + assert msg instanceof HttpPipelinedResponse : "Invalid message type: " + msg.getClass(); + HttpPipelinedResponse response = (HttpPipelinedResponse) msg; boolean success = false; try { NettyListener listener = NettyListener.fromChannelPromise(promise); - List> readyResponses = aggregator.write(response, listener); + List> readyResponses = aggregator.write(response, listener); success = true; - for (Tuple responseToWrite : readyResponses) { - ctx.write(responseToWrite.v1(), responseToWrite.v2()); + for (Tuple responseToWrite : readyResponses) { + ctx.write(responseToWrite.v1().getDelegateRequest(), responseToWrite.v2()); } } catch (IllegalStateException e) { ctx.channel().close(); @@ -81,11 +82,11 @@ public void write(final ChannelHandlerContext ctx, final Object msg, final Chann @Override public void close(ChannelHandlerContext ctx, ChannelPromise promise) { - List> inflightResponses = aggregator.removeAllInflightResponses(); + List> inflightResponses = aggregator.removeAllInflightResponses(); if (inflightResponses.isEmpty() == false) { ClosedChannelException closedChannelException = new ClosedChannelException(); - for (Tuple inflightResponse : inflightResponses) { + for (Tuple inflightResponse : inflightResponses) { try { inflightResponse.v2().setFailure(closedChannelException); } catch (RuntimeException e) { diff --git a/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NioHttpRequest.java b/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NioHttpRequest.java index 442ae42f0b932..738c0fbf8a6a0 100644 --- a/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NioHttpRequest.java +++ b/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NioHttpRequest.java @@ -49,23 +49,33 @@ public class NioHttpRequest implements HttpRequest { private final FullHttpRequest request; private final BytesReference content; private final HttpHeadersMap headers; - private final int sequence; private final AtomicBoolean released; + private final Exception inboundException; private final boolean pooled; - NioHttpRequest(FullHttpRequest request, int sequence) { - this(request, new HttpHeadersMap(request.headers()), sequence, new AtomicBoolean(false), true, + NioHttpRequest(FullHttpRequest request) { + this(request, new HttpHeadersMap(request.headers()), new AtomicBoolean(false), true, ByteBufUtils.toBytesReference(request.content())); } - private NioHttpRequest(FullHttpRequest request, HttpHeadersMap headers, int sequence, AtomicBoolean released, boolean pooled, - BytesReference content) { + NioHttpRequest(FullHttpRequest request, Exception inboundException) { + this(request, new HttpHeadersMap(request.headers()), new AtomicBoolean(false), true, + ByteBufUtils.toBytesReference(request.content()), inboundException); + } + + private NioHttpRequest(FullHttpRequest request, HttpHeadersMap headers, AtomicBoolean released, boolean pooled, + BytesReference content) { + this(request, headers, released, pooled, content, null); + } + + private NioHttpRequest(FullHttpRequest request, HttpHeadersMap headers, AtomicBoolean released, boolean pooled, + BytesReference content, Exception inboundException) { this.request = request; - this.sequence = sequence; this.headers = headers; this.content = content; this.pooled = pooled; this.released = released; + this.inboundException = inboundException; } @Override @@ -135,7 +145,7 @@ public HttpRequest releaseAndCopy() { return new NioHttpRequest( new DefaultFullHttpRequest(request.protocolVersion(), request.method(), request.uri(), copiedContent, request.headers(), request.trailingHeaders()), - headers, sequence, new AtomicBoolean(false), false, ByteBufUtils.toBytesReference(copiedContent)); + headers, new AtomicBoolean(false), false, ByteBufUtils.toBytesReference(copiedContent)); } finally { release(); } @@ -179,8 +189,7 @@ public HttpRequest removeHeader(String header) { trailingHeaders.remove(header); FullHttpRequest requestWithoutHeader = new DefaultFullHttpRequest(request.protocolVersion(), request.method(), request.uri(), request.content(), headersWithoutContentTypeHeader, trailingHeaders); - return new NioHttpRequest(requestWithoutHeader, new HttpHeadersMap(requestWithoutHeader.headers()), sequence, released, - pooled, content); + return new NioHttpRequest(requestWithoutHeader, new HttpHeadersMap(requestWithoutHeader.headers()), released, pooled, content); } @Override @@ -188,12 +197,13 @@ public NioHttpResponse createResponse(RestStatus status, BytesReference content) return new NioHttpResponse(this, status, content); } - public FullHttpRequest nettyRequest() { - return request; + @Override + public Exception getInboundException() { + return inboundException; } - int sequence() { - return sequence; + public FullHttpRequest nettyRequest() { + return request; } /** diff --git a/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NioHttpRequestCreator.java b/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NioHttpRequestCreator.java new file mode 100644 index 0000000000000..f11e376e76e53 --- /dev/null +++ b/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NioHttpRequestCreator.java @@ -0,0 +1,47 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.http.nio; + +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.MessageToMessageDecoder; +import io.netty.handler.codec.http.FullHttpRequest; +import org.elasticsearch.ExceptionsHelper; + +import java.util.List; + +class NioHttpRequestCreator extends MessageToMessageDecoder { + + @Override + protected void decode(ChannelHandlerContext ctx, FullHttpRequest msg, List out) { + if (msg.decoderResult().isFailure()) { + final Throwable cause = msg.decoderResult().cause(); + final Exception nonError; + if (cause instanceof Error) { + ExceptionsHelper.maybeDieOnAnotherThread(cause); + nonError = new Exception(cause); + } else { + nonError = (Exception) cause; + } + out.add(new NioHttpRequest(msg.retain(), nonError)); + } else { + out.add(new NioHttpRequest(msg.retain())); + } + } +} diff --git a/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NioHttpResponse.java b/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NioHttpResponse.java index d67494667384a..a6826a4885222 100644 --- a/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NioHttpResponse.java +++ b/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NioHttpResponse.java @@ -22,18 +22,15 @@ import io.netty.handler.codec.http.DefaultFullHttpResponse; import io.netty.handler.codec.http.HttpResponseStatus; import org.elasticsearch.common.bytes.BytesReference; -import org.elasticsearch.http.HttpPipelinedMessage; import org.elasticsearch.http.HttpResponse; import org.elasticsearch.rest.RestStatus; -public class NioHttpResponse extends DefaultFullHttpResponse implements HttpResponse, HttpPipelinedMessage { +public class NioHttpResponse extends DefaultFullHttpResponse implements HttpResponse { - private final int sequence; private final NioHttpRequest request; NioHttpResponse(NioHttpRequest request, RestStatus status, BytesReference content) { super(request.nettyRequest().protocolVersion(), HttpResponseStatus.valueOf(status.getStatus()), ByteBufUtils.toByteBuf(content)); - this.sequence = request.sequence(); this.request = request; } @@ -47,11 +44,6 @@ public boolean containsHeader(String name) { return headers().contains(name); } - @Override - public int getSequence() { - return sequence; - } - public NioHttpRequest getRequest() { return request; } diff --git a/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/cors/NioCorsHandler.java b/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/cors/NioCorsHandler.java index bdeffdfa1c715..3131128ee7c65 100644 --- a/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/cors/NioCorsHandler.java +++ b/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/cors/NioCorsHandler.java @@ -24,7 +24,6 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPromise; import io.netty.handler.codec.http.DefaultFullHttpResponse; -import io.netty.handler.codec.http.FullHttpRequest; import io.netty.handler.codec.http.HttpHeaderNames; import io.netty.handler.codec.http.HttpHeaders; import io.netty.handler.codec.http.HttpMethod; @@ -33,6 +32,7 @@ import io.netty.handler.codec.http.HttpResponseStatus; import org.elasticsearch.common.Strings; import org.elasticsearch.http.CorsHandler; +import org.elasticsearch.http.nio.NioHttpRequest; import org.elasticsearch.http.nio.NioHttpResponse; import java.util.Date; @@ -53,7 +53,7 @@ public class NioCorsHandler extends ChannelDuplexHandler { private static Pattern SCHEME_PATTERN = Pattern.compile("^https?://"); private final CorsHandler.Config config; - private FullHttpRequest request; + private NioHttpRequest request; /** * Creates a new instance with the specified {@link CorsHandler.Config}. @@ -67,12 +67,12 @@ public NioCorsHandler(final CorsHandler.Config config) { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { - assert msg instanceof FullHttpRequest : "Invalid message type: " + msg.getClass(); + assert msg instanceof NioHttpRequest : "Invalid message type: " + msg.getClass(); if (config.isCorsSupportEnabled()) { - request = (FullHttpRequest) msg; - if (isPreflightRequest(request)) { + request = (NioHttpRequest) msg; + if (isPreflightRequest(request.nettyRequest())) { try { - handlePreflight(ctx, request); + handlePreflight(ctx, request.nettyRequest()); return; } finally { releaseRequest(); @@ -80,7 +80,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception } if (!validateOrigin()) { try { - forbidden(ctx, request); + forbidden(ctx, request.nettyRequest()); return; } finally { releaseRequest(); @@ -168,7 +168,7 @@ private void setPreflightHeaders(final HttpResponse response) { } private boolean setOrigin(final HttpResponse response) { - final String origin = request.headers().get(HttpHeaderNames.ORIGIN); + final String origin = request.nettyRequest().headers().get(HttpHeaderNames.ORIGIN); if (!Strings.isNullOrEmpty(origin)) { if (config.isAnyOriginSupported()) { if (config.isCredentialsAllowed()) { @@ -193,14 +193,14 @@ private boolean validateOrigin() { return true; } - final String origin = request.headers().get(HttpHeaderNames.ORIGIN); + final String origin = request.nettyRequest().headers().get(HttpHeaderNames.ORIGIN); if (Strings.isNullOrEmpty(origin)) { // Not a CORS request so we cannot validate it. It may be a non CORS request. return true; } // if the origin is the same as the host of the request, then allow - if (isSameOrigin(origin, request.headers().get(HttpHeaderNames.HOST))) { + if (isSameOrigin(origin, request.nettyRequest().headers().get(HttpHeaderNames.HOST))) { return true; } @@ -208,7 +208,7 @@ private boolean validateOrigin() { } private void echoRequestOrigin(final HttpResponse response) { - setOrigin(response, request.headers().get(HttpHeaderNames.ORIGIN)); + setOrigin(response, request.nettyRequest().headers().get(HttpHeaderNames.ORIGIN)); } private static void setVaryHeader(final HttpResponse response) { diff --git a/plugins/transport-nio/src/test/java/org/elasticsearch/http/nio/HttpReadWriteHandlerTests.java b/plugins/transport-nio/src/test/java/org/elasticsearch/http/nio/HttpReadWriteHandlerTests.java index ca9354058d2e6..e5974b53b95e4 100644 --- a/plugins/transport-nio/src/test/java/org/elasticsearch/http/nio/HttpReadWriteHandlerTests.java +++ b/plugins/transport-nio/src/test/java/org/elasticsearch/http/nio/HttpReadWriteHandlerTests.java @@ -40,6 +40,8 @@ import org.elasticsearch.http.CorsHandler; import org.elasticsearch.http.HttpChannel; import org.elasticsearch.http.HttpHandlingSettings; +import org.elasticsearch.http.HttpPipelinedRequest; +import org.elasticsearch.http.HttpPipelinedResponse; import org.elasticsearch.http.HttpReadTimeoutException; import org.elasticsearch.http.HttpRequest; import org.elasticsearch.http.HttpResponse; @@ -114,7 +116,7 @@ public void testSuccessfulDecodeHttpRequest() throws IOException { ByteBuf buf = requestEncoder.encode(httpRequest); int slicePoint = randomInt(buf.writerIndex() - 1); ByteBuf slicedBuf = buf.retainedSlice(0, slicePoint); - ByteBuf slicedBuf2 = buf.retainedSlice(slicePoint, buf.writerIndex()); + ByteBuf slicedBuf2 = buf.retainedSlice(slicePoint, buf.writerIndex() - slicePoint); try { handler.consumeReads(toChannelBuffer(slicedBuf)); @@ -148,10 +150,11 @@ public void testDecodeHttpRequestError() throws IOException { handler.consumeReads(toChannelBuffer(buf)); - ArgumentCaptor exceptionCaptor = ArgumentCaptor.forClass(Exception.class); - verify(transport).incomingRequestError(any(HttpRequest.class), any(NioHttpChannel.class), exceptionCaptor.capture()); + ArgumentCaptor requestCaptor = ArgumentCaptor.forClass(HttpRequest.class); + verify(transport).incomingRequest(requestCaptor.capture(), any(NioHttpChannel.class)); - assertTrue(exceptionCaptor.getValue() instanceof IllegalArgumentException); + assertNotNull(requestCaptor.getValue().getInboundException()); + assertTrue(requestCaptor.getValue().getInboundException() instanceof IllegalArgumentException); } finally { buf.release(); } @@ -169,7 +172,6 @@ public void testDecodeHttpRequestContentLengthToLongGeneratesOutboundMessage() t } finally { buf.release(); } - verify(transport, times(0)).incomingRequestError(any(), any(), any()); verify(transport, times(0)).incomingRequest(any(), any()); List flushOperations = handler.pollFlushOperations(); @@ -193,7 +195,7 @@ public void testDecodeHttpRequestContentLengthToLongGeneratesOutboundMessage() t @SuppressWarnings("unchecked") public void testEncodeHttpResponse() throws IOException { prepareHandlerForResponse(handler); - NioHttpResponse httpResponse = emptyGetResponse(0); + HttpPipelinedResponse httpResponse = emptyGetResponse(0); SocketChannelContext context = mock(SocketChannelContext.class); HttpWriteOperation writeOperation = new HttpWriteOperation(context, httpResponse, mock(BiConsumer.class)); @@ -372,10 +374,10 @@ public void testReadTimeout() throws IOException { assertNull(taskScheduler.pollTask(timeValue.getNanos() + 9)); } - private static NioHttpResponse emptyGetResponse(int sequenceNumber) { + private static HttpPipelinedResponse emptyGetResponse(int sequence) { DefaultFullHttpRequest nettyRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/"); - NioHttpRequest nioHttpRequest = new NioHttpRequest(nettyRequest, sequenceNumber); - NioHttpResponse httpResponse = nioHttpRequest.createResponse(RestStatus.OK, BytesArray.EMPTY); + HttpPipelinedRequest httpRequest = new HttpPipelinedRequest(sequence, new NioHttpRequest(nettyRequest)); + HttpPipelinedResponse httpResponse = httpRequest.createResponse(RestStatus.OK, BytesArray.EMPTY); httpResponse.addHeader(HttpHeaderNames.CONTENT_LENGTH.toString(), "0"); return httpResponse; } @@ -392,9 +394,9 @@ private FullHttpResponse executeCorsRequest(final Settings settings, final Strin httpRequest.headers().add(HttpHeaderNames.ORIGIN, originValue); } httpRequest.headers().add(HttpHeaderNames.HOST, host); - NioHttpRequest nioHttpRequest = new NioHttpRequest(httpRequest, 0); + HttpPipelinedRequest pipelinedRequest = new HttpPipelinedRequest(0, new NioHttpRequest(httpRequest)); BytesArray content = new BytesArray("content"); - HttpResponse response = nioHttpRequest.createResponse(RestStatus.OK, content); + HttpResponse response = pipelinedRequest.createResponse(RestStatus.OK, content); response.addHeader("Content-Length", Integer.toString(content.length())); SocketChannelContext context = mock(SocketChannelContext.class); @@ -420,18 +422,18 @@ private void prepareHandlerForResponse(HttpReadWriteHandler handler) throws IOEx buf.release(); } - ArgumentCaptor requestCaptor = ArgumentCaptor.forClass(NioHttpRequest.class); + ArgumentCaptor requestCaptor = ArgumentCaptor.forClass(HttpPipelinedRequest.class); verify(transport, atLeastOnce()).incomingRequest(requestCaptor.capture(), any(HttpChannel.class)); - NioHttpRequest nioHttpRequest = requestCaptor.getValue(); - assertNotNull(nioHttpRequest); - assertEquals(method.name(), nioHttpRequest.method().name()); + HttpRequest httpRequest = requestCaptor.getValue(); + assertNotNull(httpRequest); + assertEquals(method.name(), httpRequest.method().name()); if (version == HttpVersion.HTTP_1_1) { - assertEquals(HttpRequest.HttpVersion.HTTP_1_1, nioHttpRequest.protocolVersion()); + assertEquals(HttpRequest.HttpVersion.HTTP_1_1, httpRequest.protocolVersion()); } else { - assertEquals(HttpRequest.HttpVersion.HTTP_1_0, nioHttpRequest.protocolVersion()); + assertEquals(HttpRequest.HttpVersion.HTTP_1_0, httpRequest.protocolVersion()); } - assertEquals(nioHttpRequest.uri(), uri); + assertEquals(httpRequest.uri(), uri); } private InboundChannelBuffer toChannelBuffer(ByteBuf buf) { diff --git a/plugins/transport-nio/src/test/java/org/elasticsearch/http/nio/NioHttpPipeliningHandlerTests.java b/plugins/transport-nio/src/test/java/org/elasticsearch/http/nio/NioHttpPipeliningHandlerTests.java index 5f2784a356714..5ae0851c6b839 100644 --- a/plugins/transport-nio/src/test/java/org/elasticsearch/http/nio/NioHttpPipeliningHandlerTests.java +++ b/plugins/transport-nio/src/test/java/org/elasticsearch/http/nio/NioHttpPipeliningHandlerTests.java @@ -25,16 +25,16 @@ import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.embedded.EmbeddedChannel; import io.netty.handler.codec.http.DefaultFullHttpRequest; -import io.netty.handler.codec.http.FullHttpRequest; import io.netty.handler.codec.http.FullHttpResponse; import io.netty.handler.codec.http.HttpMethod; -import io.netty.handler.codec.http.HttpRequest; -import io.netty.handler.codec.http.LastHttpContent; import io.netty.handler.codec.http.QueryStringDecoder; import org.elasticsearch.common.Randomness; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.http.HttpPipelinedRequest; +import org.elasticsearch.http.HttpPipelinedResponse; +import org.elasticsearch.http.HttpRequest; +import org.elasticsearch.http.HttpResponse; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.test.ESTestCase; import org.junit.After; @@ -44,12 +44,10 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; -import java.util.Queue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.LinkedTransferQueue; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -172,7 +170,7 @@ public void testThatPipeliningClosesConnectionWithTooManyEvents() throws Interru assertFalse(embeddedChannel.isOpen()); } - public void testPipeliningRequestsAreReleased() throws InterruptedException { + public void testPipeliningRequestsAreReleased() { final int numberOfRequests = 10; final EmbeddedChannel embeddedChannel = new EmbeddedChannel(new NioHttpPipeliningHandler(logger, numberOfRequests + 1)); @@ -181,8 +179,8 @@ public void testPipeliningRequestsAreReleased() throws InterruptedException { embeddedChannel.writeInbound(createHttpRequest("/" + i)); } - HttpPipelinedRequest inbound; - ArrayList> requests = new ArrayList<>(); + HttpPipelinedRequest inbound; + ArrayList requests = new ArrayList<>(); while ((inbound = embeddedChannel.readInbound()) != null) { requests.add(inbound); } @@ -191,9 +189,8 @@ public void testPipeliningRequestsAreReleased() throws InterruptedException { for (int i = 1; i < requests.size(); ++i) { ChannelPromise promise = embeddedChannel.newPromise(); promises.add(promise); - HttpPipelinedRequest pipelinedRequest = requests.get(i); - NioHttpRequest nioHttpRequest = new NioHttpRequest(pipelinedRequest.getRequest(), pipelinedRequest.getSequence()); - NioHttpResponse resp = nioHttpRequest.createResponse(RestStatus.OK, BytesArray.EMPTY); + HttpPipelinedRequest pipelinedRequest = requests.get(i); + HttpPipelinedResponse resp = pipelinedRequest.createResponse(RestStatus.OK, BytesArray.EMPTY); embeddedChannel.writeAndFlush(resp, promise); } @@ -215,37 +212,20 @@ private void assertReadHttpMessageHasContent(EmbeddedChannel embeddedChannel, St assertThat(data, is(expectedContent)); } - private FullHttpRequest createHttpRequest(String uri) { - return new DefaultFullHttpRequest(HTTP_1_1, HttpMethod.GET, uri); + private NioHttpRequest createHttpRequest(String uri) { + return new NioHttpRequest(new DefaultFullHttpRequest(HTTP_1_1, HttpMethod.GET, uri)); } - private static class AggregateUrisAndHeadersHandler extends SimpleChannelInboundHandler { - - static final Queue QUEUE_URI = new LinkedTransferQueue<>(); - - @Override - protected void channelRead0(ChannelHandlerContext ctx, HttpRequest request) throws Exception { - QUEUE_URI.add(request.uri()); - } - - } - - private class WorkEmulatorHandler extends SimpleChannelInboundHandler> { + private class WorkEmulatorHandler extends SimpleChannelInboundHandler { @Override - protected void channelRead0(final ChannelHandlerContext ctx, HttpPipelinedRequest pipelinedRequest) { - LastHttpContent request = pipelinedRequest.getRequest(); - final QueryStringDecoder decoder; - if (request instanceof FullHttpRequest) { - decoder = new QueryStringDecoder(((FullHttpRequest)request).uri()); - } else { - decoder = new QueryStringDecoder(AggregateUrisAndHeadersHandler.QUEUE_URI.poll()); - } + protected void channelRead0(final ChannelHandlerContext ctx, HttpPipelinedRequest pipelinedRequest) { + final HttpRequest request = pipelinedRequest.getDelegateRequest(); + final QueryStringDecoder decoder = new QueryStringDecoder(request.uri()); final String uri = decoder.path().replace("/", ""); final BytesReference content = new BytesArray(uri.getBytes(StandardCharsets.UTF_8)); - NioHttpRequest nioHttpRequest = new NioHttpRequest(pipelinedRequest.getRequest(), pipelinedRequest.getSequence()); - NioHttpResponse httpResponse = nioHttpRequest.createResponse(RestStatus.OK, content); + HttpResponse httpResponse = pipelinedRequest.createResponse(RestStatus.OK, content); httpResponse.addHeader(CONTENT_LENGTH.toString(), Integer.toString(content.length())); final CountDownLatch waitingLatch = new CountDownLatch(1); diff --git a/server/src/main/java/org/elasticsearch/http/AbstractHttpServerTransport.java b/server/src/main/java/org/elasticsearch/http/AbstractHttpServerTransport.java index 8d02eac5b8b13..204c9ad0365b6 100644 --- a/server/src/main/java/org/elasticsearch/http/AbstractHttpServerTransport.java +++ b/server/src/main/java/org/elasticsearch/http/AbstractHttpServerTransport.java @@ -305,18 +305,7 @@ protected void serverAcceptedChannel(HttpChannel httpChannel) { * @param httpChannel that received the http request */ public void incomingRequest(final HttpRequest httpRequest, final HttpChannel httpChannel) { - handleIncomingRequest(httpRequest, httpChannel, null); - } - - /** - * This method handles an incoming http request that has encountered an error. - * - * @param httpRequest that is incoming - * @param httpChannel that received the http request - * @param exception that was encountered - */ - public void incomingRequestError(final HttpRequest httpRequest, final HttpChannel httpChannel, final Exception exception) { - handleIncomingRequest(httpRequest, httpChannel, exception); + handleIncomingRequest(httpRequest, httpChannel, httpRequest.getInboundException()); } // Visible for testing diff --git a/server/src/main/java/org/elasticsearch/http/DefaultRestChannel.java b/server/src/main/java/org/elasticsearch/http/DefaultRestChannel.java index f14219d8c9a51..7acd381de6ebc 100644 --- a/server/src/main/java/org/elasticsearch/http/DefaultRestChannel.java +++ b/server/src/main/java/org/elasticsearch/http/DefaultRestChannel.java @@ -68,6 +68,7 @@ public class DefaultRestChannel extends AbstractRestChannel implements RestChann HttpHandlingSettings settings, ThreadContext threadContext, @Nullable HttpTracer tracerLog) { super(request, settings.getDetailedErrorsEnabled()); this.httpChannel = httpChannel; + // TODO: Fix this.httpRequest = httpRequest; this.bigArrays = bigArrays; this.settings = settings; diff --git a/server/src/main/java/org/elasticsearch/http/HttpPipelinedRequest.java b/server/src/main/java/org/elasticsearch/http/HttpPipelinedRequest.java index db3a2bae16714..d98dcb8bde632 100644 --- a/server/src/main/java/org/elasticsearch/http/HttpPipelinedRequest.java +++ b/server/src/main/java/org/elasticsearch/http/HttpPipelinedRequest.java @@ -16,16 +16,79 @@ * specific language governing permissions and limitations * under the License. */ + package org.elasticsearch.http; -public class HttpPipelinedRequest implements HttpPipelinedMessage { +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.RestStatus; + +import java.util.List; +import java.util.Map; + +public class HttpPipelinedRequest implements HttpRequest, HttpPipelinedMessage { - private final R request; private final int sequence; + private final HttpRequest delegate; - HttpPipelinedRequest(int sequence, R request) { + public HttpPipelinedRequest(int sequence, HttpRequest delegate) { this.sequence = sequence; - this.request = request; + this.delegate = delegate; + } + + @Override + public RestRequest.Method method() { + return delegate.method(); + } + + @Override + public String uri() { + return delegate.uri(); + } + + @Override + public BytesReference content() { + return delegate.content(); + } + + @Override + public Map> getHeaders() { + return delegate.getHeaders(); + } + + @Override + public List strictCookies() { + return delegate.strictCookies(); + } + + @Override + public HttpVersion protocolVersion() { + return delegate.protocolVersion(); + } + + @Override + public HttpRequest removeHeader(String header) { + return delegate.removeHeader(header); + } + + @Override + public HttpPipelinedResponse createResponse(RestStatus status, BytesReference content) { + return new HttpPipelinedResponse(sequence, delegate.createResponse(status, content)); + } + + @Override + public void release() { + delegate.release(); + } + + @Override + public HttpRequest releaseAndCopy() { + return delegate.releaseAndCopy(); + } + + @Override + public Exception getInboundException() { + return delegate.getInboundException(); } @Override @@ -33,7 +96,7 @@ public int getSequence() { return sequence; } - public R getRequest() { - return request; + public HttpRequest getDelegateRequest() { + return delegate; } } diff --git a/server/src/main/java/org/elasticsearch/http/HttpPipelinedResponse.java b/server/src/main/java/org/elasticsearch/http/HttpPipelinedResponse.java new file mode 100644 index 0000000000000..568e6e67d8a9a --- /dev/null +++ b/server/src/main/java/org/elasticsearch/http/HttpPipelinedResponse.java @@ -0,0 +1,49 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.http; + +public class HttpPipelinedResponse implements HttpPipelinedMessage, HttpResponse { + + private final int sequence; + private final HttpResponse delegate; + + public HttpPipelinedResponse(int sequence, HttpResponse delegate) { + this.sequence = sequence; + this.delegate = delegate; + } + + @Override + public int getSequence() { + return sequence; + } + + @Override + public void addHeader(String name, String value) { + delegate.addHeader(name, value); + } + + @Override + public boolean containsHeader(String name) { + return delegate.containsHeader(name); + } + + public HttpResponse getDelegateRequest() { + return delegate; + } +} diff --git a/server/src/main/java/org/elasticsearch/http/HttpPipeliningAggregator.java b/server/src/main/java/org/elasticsearch/http/HttpPipeliningAggregator.java index f38e9677979db..266386c494356 100644 --- a/server/src/main/java/org/elasticsearch/http/HttpPipeliningAggregator.java +++ b/server/src/main/java/org/elasticsearch/http/HttpPipeliningAggregator.java @@ -25,10 +25,10 @@ import java.util.List; import java.util.PriorityQueue; -public class HttpPipeliningAggregator { +public class HttpPipeliningAggregator { private final int maxEventsHeld; - private final PriorityQueue> outboundHoldingQueue; + private final PriorityQueue> outboundHoldingQueue; /* * The current read and write sequence numbers. Read sequence numbers are attached to requests in the order they are read from the * channel, and then transferred to responses. A response is not written to the channel context until its sequence number matches the @@ -42,20 +42,20 @@ public HttpPipeliningAggregator(int maxEventsHeld) { this.outboundHoldingQueue = new PriorityQueue<>(1, Comparator.comparing(Tuple::v1)); } - public HttpPipelinedRequest read(final Request request) { - return new HttpPipelinedRequest<>(readSequence++, request); + public HttpPipelinedRequest read(final HttpRequest request) { + return new HttpPipelinedRequest(readSequence++, request); } - public List> write(final Response response, Listener listener) { + public List> write(final HttpPipelinedResponse response, Listener listener) { if (outboundHoldingQueue.size() < maxEventsHeld) { - ArrayList> readyResponses = new ArrayList<>(); + ArrayList> readyResponses = new ArrayList<>(); outboundHoldingQueue.add(new Tuple<>(response, listener)); while (!outboundHoldingQueue.isEmpty()) { /* * Since the response with the lowest sequence number is the top of the priority queue, we know if its sequence * number does not match the current write sequence number then we have not processed all preceding responses yet. */ - final Tuple top = outboundHoldingQueue.peek(); + final Tuple top = outboundHoldingQueue.peek(); if (top.v1().getSequence() != writeSequence) { break; @@ -73,8 +73,8 @@ public List> write(final Response response, Listener l } } - public List> removeAllInflightResponses() { - ArrayList> responses = new ArrayList<>(outboundHoldingQueue); + public List> removeAllInflightResponses() { + ArrayList> responses = new ArrayList<>(outboundHoldingQueue); outboundHoldingQueue.clear(); return responses; } diff --git a/server/src/main/java/org/elasticsearch/http/HttpRequest.java b/server/src/main/java/org/elasticsearch/http/HttpRequest.java index 4d67078fe571a..80b854fbe7d55 100644 --- a/server/src/main/java/org/elasticsearch/http/HttpRequest.java +++ b/server/src/main/java/org/elasticsearch/http/HttpRequest.java @@ -19,6 +19,7 @@ package org.elasticsearch.http; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.rest.RestRequest; import org.elasticsearch.rest.RestStatus; @@ -68,6 +69,9 @@ enum HttpVersion { */ HttpResponse createResponse(RestStatus status, BytesReference content); + @Nullable + Exception getInboundException(); + /** * Release any resources associated with this request. Implementations should be idempotent. The behavior of {@link #content()} * after this method has been invoked is undefined and implementation specific. diff --git a/server/src/test/java/org/elasticsearch/http/AbstractHttpServerTransportTests.java b/server/src/test/java/org/elasticsearch/http/AbstractHttpServerTransportTests.java index daed871e29bcd..348e8a83af731 100644 --- a/server/src/test/java/org/elasticsearch/http/AbstractHttpServerTransportTests.java +++ b/server/src/test/java/org/elasticsearch/http/AbstractHttpServerTransportTests.java @@ -250,33 +250,37 @@ public HttpStats stats() { "received other request", traceLoggerName, Level.TRACE, "\\[\\d+\\]\\[" + opaqueId + "\\]\\[OPTIONS\\]\\[/internal/testNotSeen\\] received request from \\[.*")); + final Exception inboundException; + if (badRequest) { + inboundException = new RuntimeException(); + } else { + inboundException = null; + } + final FakeRestRequest fakeRestRequest = new FakeRestRequest.Builder(NamedXContentRegistry.EMPTY) .withMethod(RestRequest.Method.OPTIONS) .withPath("/internal/test") .withHeaders(Collections.singletonMap(Task.X_OPAQUE_ID, Collections.singletonList(opaqueId))) + .withInboundException(inboundException) .build(); - if (badRequest) { - transport.incomingRequestError(fakeRestRequest.getHttpRequest(), fakeRestRequest.getHttpChannel(), - new RuntimeException()); + transport.incomingRequest(fakeRestRequest.getHttpRequest(), fakeRestRequest.getHttpChannel()); + + final Exception inboundExceptionExcludedPath; + if (randomBoolean()) { + inboundExceptionExcludedPath = new RuntimeException(); } else { - transport.incomingRequest(fakeRestRequest.getHttpRequest(), fakeRestRequest.getHttpChannel()); + inboundExceptionExcludedPath = null; } final FakeRestRequest fakeRestRequestExcludedPath = new FakeRestRequest.Builder(NamedXContentRegistry.EMPTY) .withMethod(RestRequest.Method.OPTIONS) .withPath("/internal/testNotSeen") .withHeaders(Collections.singletonMap(Task.X_OPAQUE_ID, Collections.singletonList(opaqueId))) + .withInboundException(inboundExceptionExcludedPath) .build(); - if (randomBoolean()) { - transport.incomingRequest(fakeRestRequestExcludedPath.getHttpRequest(), fakeRestRequestExcludedPath.getHttpChannel()); - } else { - transport.incomingRequestError( - fakeRestRequestExcludedPath.getHttpRequest(), fakeRestRequestExcludedPath.getHttpChannel(), - new RuntimeException()); - } - + transport.incomingRequest(fakeRestRequestExcludedPath.getHttpRequest(), fakeRestRequestExcludedPath.getHttpChannel()); appender.assertAllExpectationsMatched(); } finally { Loggers.removeAppender(LogManager.getLogger(traceLoggerName), appender); diff --git a/server/src/test/java/org/elasticsearch/http/DefaultRestChannelTests.java b/server/src/test/java/org/elasticsearch/http/DefaultRestChannelTests.java index 0f82be7f23b02..14ec4f296f447 100644 --- a/server/src/test/java/org/elasticsearch/http/DefaultRestChannelTests.java +++ b/server/src/test/java/org/elasticsearch/http/DefaultRestChannelTests.java @@ -469,6 +469,11 @@ public void release() { public HttpRequest releaseAndCopy() { return this; } + + @Override + public Exception getInboundException() { + return null; + } } private static class TestResponse implements HttpResponse { diff --git a/server/src/test/java/org/elasticsearch/rest/RestControllerTests.java b/server/src/test/java/org/elasticsearch/rest/RestControllerTests.java index 146ded11b4299..484ba127d1d10 100644 --- a/server/src/test/java/org/elasticsearch/rest/RestControllerTests.java +++ b/server/src/test/java/org/elasticsearch/rest/RestControllerTests.java @@ -560,6 +560,11 @@ public void release() { public HttpRequest releaseAndCopy() { return this; } + + @Override + public Exception getInboundException() { + return null; + } }, null); final AssertingChannel channel = new AssertingChannel(request, true, RestStatus.METHOD_NOT_ALLOWED); diff --git a/test/framework/src/main/java/org/elasticsearch/test/rest/FakeRestRequest.java b/test/framework/src/main/java/org/elasticsearch/test/rest/FakeRestRequest.java index 2f2f5fb76bfe7..bab268540154d 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/rest/FakeRestRequest.java +++ b/test/framework/src/main/java/org/elasticsearch/test/rest/FakeRestRequest.java @@ -54,12 +54,19 @@ private static class FakeHttpRequest implements HttpRequest { private final String uri; private final BytesReference content; private final Map> headers; + private final Exception inboundException; private FakeHttpRequest(Method method, String uri, BytesReference content, Map> headers) { + this(method, uri, content, headers, null); + } + + private FakeHttpRequest(Method method, String uri, BytesReference content, Map> headers, + Exception inboundException) { this.method = method; this.uri = uri; this.content = content; this.headers = headers; + this.inboundException = inboundException; } @Override @@ -122,6 +129,11 @@ public void release() { public HttpRequest releaseAndCopy() { return this; } + + @Override + public Exception getInboundException() { + return inboundException; + } } private static class FakeHttpChannel implements HttpChannel { @@ -178,6 +190,8 @@ public static class Builder { private InetSocketAddress address = null; + private Exception inboundException; + public Builder(NamedXContentRegistry xContentRegistry) { this.xContentRegistry = xContentRegistry; } @@ -215,8 +229,13 @@ public Builder withRemoteAddress(InetSocketAddress address) { return this; } + public Builder withInboundException(Exception exception) { + this.inboundException = exception; + return this; + } + public FakeRestRequest build() { - FakeHttpRequest fakeHttpRequest = new FakeHttpRequest(method, path, content, headers); + FakeHttpRequest fakeHttpRequest = new FakeHttpRequest(method, path, content, headers, inboundException); return new FakeRestRequest(xContentRegistry, fakeHttpRequest, params, new FakeHttpChannel(address)); } }