Skip to content

Commit

Permalink
Modify pipelining handlers to require full requests (#31280)
Browse files Browse the repository at this point in the history
Currently the http pipelining handlers seem to support chunked http
content. However, this does not make sense. There is a content
aggregator in the pipeline before the pipelining handler. This means the
pipelining handler should only see full http messages. Additionally, the
request handler immediately after the pipelining handler only supports
full messages.

This commit modifies both nio and netty4 pipelining handlers to assert
that an inbound message is a full http message. Additionally it removes
the tests for chunked content.
  • Loading branch information
Tim-Brooks authored Jun 13, 2018
1 parent 0bfd18c commit 56ffe55
Show file tree
Hide file tree
Showing 4 changed files with 10 additions and 84 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http.LastHttpContent;
import io.netty.handler.codec.http.FullHttpRequest;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.http.HttpPipelinedRequest;
Expand Down Expand Up @@ -53,17 +53,14 @@ public Netty4HttpPipeliningHandler(Logger logger, final int maxEventsHeld) {

@Override
public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
if (msg instanceof LastHttpContent) {
HttpPipelinedRequest<LastHttpContent> pipelinedRequest = aggregator.read(((LastHttpContent) msg));
ctx.fireChannelRead(pipelinedRequest);
} else {
ctx.fireChannelRead(msg);
}
assert msg instanceof FullHttpRequest : "Invalid message type: " + msg.getClass();
HttpPipelinedRequest<FullHttpRequest> pipelinedRequest = aggregator.read(((FullHttpRequest) msg));
ctx.fireChannelRead(pipelinedRequest);
}

@Override
public void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise) {
assert msg instanceof Netty4HttpResponse : "Message must be type: " + Netty4HttpResponse.class;
assert msg instanceof Netty4HttpResponse : "Invalid message type: " + msg.getClass();;
Netty4HttpResponse response = (Netty4HttpResponse) msg;
boolean success = false;
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,38 +148,6 @@ public void testThatPipeliningWorksWhenSlowRequestsInDifferentOrder() throws Int
assertTrue(embeddedChannel.isOpen());
}

public void testThatPipeliningWorksWithChunkedRequests() throws InterruptedException {
final int numberOfRequests = randomIntBetween(2, 128);
final EmbeddedChannel embeddedChannel =
new EmbeddedChannel(
new AggregateUrisAndHeadersHandler(),
new Netty4HttpPipeliningHandler(logger, numberOfRequests),
new WorkEmulatorHandler());

for (int i = 0; i < numberOfRequests; i++) {
final DefaultHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/" + i);
embeddedChannel.writeInbound(request);
embeddedChannel.writeInbound(LastHttpContent.EMPTY_LAST_CONTENT);
}

final List<CountDownLatch> latches = new ArrayList<>();
for (int i = numberOfRequests - 1; i >= 0; i--) {
latches.add(finishRequest(Integer.toString(i)));
}

for (final CountDownLatch latch : latches) {
latch.await();
}

embeddedChannel.flush();

for (int i = 0; i < numberOfRequests; i++) {
assertReadHttpMessageHasContent(embeddedChannel, Integer.toString(i));
}

assertTrue(embeddedChannel.isOpen());
}

public void testThatPipeliningClosesConnectionWithTooManyEvents() throws InterruptedException {
final int numberOfRequests = randomIntBetween(2, 128);
final EmbeddedChannel embeddedChannel = new EmbeddedChannel(new Netty4HttpPipeliningHandler(logger, numberOfRequests),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,11 @@
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http.LastHttpContent;
import io.netty.handler.codec.http.FullHttpRequest;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.http.HttpPipelinedRequest;
import org.elasticsearch.http.HttpPipeliningAggregator;
import org.elasticsearch.http.nio.NettyListener;
import org.elasticsearch.http.nio.NioHttpResponse;

import java.nio.channels.ClosedChannelException;
import java.util.List;
Expand All @@ -55,17 +53,14 @@ public NioHttpPipeliningHandler(Logger logger, final int maxEventsHeld) {

@Override
public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
if (msg instanceof LastHttpContent) {
HttpPipelinedRequest<LastHttpContent> pipelinedRequest = aggregator.read(((LastHttpContent) msg));
ctx.fireChannelRead(pipelinedRequest);
} else {
ctx.fireChannelRead(msg);
}
assert msg instanceof FullHttpRequest : "Invalid message type: " + msg.getClass();
HttpPipelinedRequest<FullHttpRequest> pipelinedRequest = aggregator.read(((FullHttpRequest) msg));
ctx.fireChannelRead(pipelinedRequest);
}

@Override
public void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise) {
assert msg instanceof NioHttpResponse : "Message must be type: " + NioHttpResponse.class;
assert msg instanceof NioHttpResponse : "Invalid message type: " + msg.getClass();
NioHttpResponse response = (NioHttpResponse) msg;
boolean success = false;
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,10 @@
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.codec.http.DefaultFullHttpRequest;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.DefaultHttpRequest;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.LastHttpContent;
import io.netty.handler.codec.http.QueryStringDecoder;
import org.elasticsearch.common.Randomness;
Expand Down Expand Up @@ -147,38 +145,6 @@ public void testThatPipeliningWorksWhenSlowRequestsInDifferentOrder() throws Int
assertTrue(embeddedChannel.isOpen());
}

public void testThatPipeliningWorksWithChunkedRequests() throws InterruptedException {
final int numberOfRequests = randomIntBetween(2, 128);
final EmbeddedChannel embeddedChannel =
new EmbeddedChannel(
new AggregateUrisAndHeadersHandler(),
new NioHttpPipeliningHandler(logger, numberOfRequests),
new WorkEmulatorHandler());

for (int i = 0; i < numberOfRequests; i++) {
final DefaultHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/" + i);
embeddedChannel.writeInbound(request);
embeddedChannel.writeInbound(LastHttpContent.EMPTY_LAST_CONTENT);
}

final List<CountDownLatch> latches = new ArrayList<>();
for (int i = numberOfRequests - 1; i >= 0; i--) {
latches.add(finishRequest(Integer.toString(i)));
}

for (final CountDownLatch latch : latches) {
latch.await();
}

embeddedChannel.flush();

for (int i = 0; i < numberOfRequests; i++) {
assertReadHttpMessageHasContent(embeddedChannel, Integer.toString(i));
}

assertTrue(embeddedChannel.isOpen());
}

public void testThatPipeliningClosesConnectionWithTooManyEvents() throws InterruptedException {
final int numberOfRequests = randomIntBetween(2, 128);
final EmbeddedChannel embeddedChannel = new EmbeddedChannel(new NioHttpPipeliningHandler(logger, numberOfRequests),
Expand Down

0 comments on commit 56ffe55

Please sign in to comment.