diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpResponse.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4FullHttpResponse.java similarity index 81% rename from modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpResponse.java rename to modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4FullHttpResponse.java index e7a88fac4fc67..2a5a1fb6e05d8 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpResponse.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4FullHttpResponse.java @@ -16,11 +16,11 @@ import org.elasticsearch.rest.RestStatus; import org.elasticsearch.transport.netty4.Netty4Utils; -public class Netty4HttpResponse extends DefaultFullHttpResponse implements Netty4RestResponse { +public final class Netty4FullHttpResponse extends DefaultFullHttpResponse implements Netty4RestResponse { private final int sequence; - Netty4HttpResponse(int sequence, HttpVersion version, RestStatus status, BytesReference content) { + Netty4FullHttpResponse(int sequence, HttpVersion version, RestStatus status, BytesReference content) { super(version, HttpResponseStatus.valueOf(status.getStatus()), Netty4Utils.toByteBuf(content)); this.sequence = sequence; } diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandler.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandler.java index f5a32a0ec768c..669c9714dd8a1 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandler.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandler.java @@ -57,7 +57,7 @@ public class Netty4HttpPipeliningHandler extends ChannelDuplexHandler { private final int maxEventsHeld; private final PriorityQueue> outboundHoldingQueue; - private record ChunkedWrite(PromiseCombiner combiner, ChannelPromise onDone, Netty4ChunkedHttpResponse response) {} + private record ChunkedWrite(PromiseCombiner combiner, ChannelPromise onDone, ChunkedRestResponseBody responseBody) {} /** * The current {@link ChunkedWrite} if a chunked write is executed at the moment. @@ -150,6 +150,8 @@ public void write(final ChannelHandlerContext ctx, final Object msg, final Chann ); } // response is not at the current sequence number so we add it to the outbound queue and return + assert outboundHoldingQueue.stream().noneMatch(t -> t.v1().getSequence() == writeSequence) + : "duplicate outbound entries for seqno " + writeSequence; outboundHoldingQueue.add(new Tuple<>(restResponse, promise)); success = true; return; @@ -191,17 +193,22 @@ private void doWriteQueued(ChannelHandlerContext ctx) throws IOException { private void doWrite(ChannelHandlerContext ctx, Netty4RestResponse readyResponse, ChannelPromise promise) throws IOException { assert currentChunkedWrite == null : "unexpected existing write [" + currentChunkedWrite + "]"; - if (readyResponse instanceof Netty4HttpResponse) { - doWrite(ctx, (Netty4HttpResponse) readyResponse, promise); + assert readyResponse != null : "cannot write null response"; + assert readyResponse.getSequence() == writeSequence; + if (readyResponse instanceof Netty4FullHttpResponse fullResponse) { + doWriteFullResponse(ctx, fullResponse, promise); + } else if (readyResponse instanceof Netty4ChunkedHttpResponse chunkedResponse) { + doWriteChunkedResponse(ctx, chunkedResponse, promise); } else { - doWrite(ctx, (Netty4ChunkedHttpResponse) readyResponse, promise); + assert false : readyResponse.getClass().getCanonicalName(); + throw new IllegalStateException("illegal message type: " + readyResponse.getClass().getCanonicalName()); } } /** * Split up large responses to prevent batch compression {@link JdkZlibEncoder} down the pipeline. */ - private void doWrite(ChannelHandlerContext ctx, Netty4HttpResponse readyResponse, ChannelPromise promise) { + private void doWriteFullResponse(ChannelHandlerContext ctx, Netty4FullHttpResponse readyResponse, ChannelPromise promise) { if (DO_NOT_SPLIT_HTTP_RESPONSES || readyResponse.content().readableBytes() <= SPLIT_THRESHOLD) { enqueueWrite(ctx, readyResponse, promise); } else { @@ -210,16 +217,19 @@ private void doWrite(ChannelHandlerContext ctx, Netty4HttpResponse readyResponse writeSequence++; } - private void doWrite(ChannelHandlerContext ctx, Netty4ChunkedHttpResponse readyResponse, ChannelPromise promise) throws IOException { + private void doWriteChunkedResponse(ChannelHandlerContext ctx, Netty4ChunkedHttpResponse readyResponse, ChannelPromise promise) + throws IOException { final PromiseCombiner combiner = new PromiseCombiner(ctx.executor()); final ChannelPromise first = ctx.newPromise(); combiner.add((Future) first); - currentChunkedWrite = new ChunkedWrite(combiner, promise, readyResponse); + final var responseBody = readyResponse.body(); + assert currentChunkedWrite == null; + currentChunkedWrite = new ChunkedWrite(combiner, promise, responseBody); if (enqueueWrite(ctx, readyResponse, first)) { // We were able to write out the first chunk directly, try writing out subsequent chunks until the channel becomes unwritable. // NB "writable" means there's space in the downstream ChannelOutboundBuffer, we aren't trying to saturate the physical channel. while (ctx.channel().isWritable()) { - if (writeChunk(ctx, combiner, readyResponse.body())) { + if (writeChunk(ctx, combiner, responseBody)) { finishChunkedWrite(); return; } @@ -228,15 +238,15 @@ private void doWrite(ChannelHandlerContext ctx, Netty4ChunkedHttpResponse readyR } private void finishChunkedWrite() { - try { - currentChunkedWrite.combiner.finish(currentChunkedWrite.onDone); - } finally { - currentChunkedWrite = null; - writeSequence++; - } + assert currentChunkedWrite != null; + assert currentChunkedWrite.responseBody().isDone(); + final var finishingWrite = currentChunkedWrite; + currentChunkedWrite = null; + writeSequence++; + finishingWrite.combiner.finish(finishingWrite.onDone()); } - private void splitAndWrite(ChannelHandlerContext ctx, Netty4HttpResponse msg, ChannelPromise promise) { + private void splitAndWrite(ChannelHandlerContext ctx, Netty4FullHttpResponse msg, ChannelPromise promise) { final PromiseCombiner combiner = new PromiseCombiner(ctx.executor()); HttpResponse response = new DefaultHttpResponse(msg.protocolVersion(), msg.status(), msg.headers()); combiner.add(enqueueWrite(ctx, response)); @@ -293,7 +303,7 @@ private boolean doFlush(ChannelHandlerContext ctx) throws IOException { if (currentWrite == null) { // no bytes were found queued, check if a chunked message might have become writable if (currentChunkedWrite != null) { - if (writeChunk(ctx, currentChunkedWrite.combiner, currentChunkedWrite.response.body())) { + if (writeChunk(ctx, currentChunkedWrite.combiner, currentChunkedWrite.responseBody())) { finishChunkedWrite(); } continue; diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequest.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequest.java index 37b6794238b81..0e1bb527fed9d 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequest.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequest.java @@ -171,8 +171,8 @@ public HttpRequest removeHeader(String header) { } @Override - public Netty4HttpResponse createResponse(RestStatus status, BytesReference contentRef) { - return new Netty4HttpResponse(sequence, request.protocolVersion(), status, contentRef); + public Netty4FullHttpResponse createResponse(RestStatus status, BytesReference contentRef) { + return new Netty4FullHttpResponse(sequence, request.protocolVersion(), status, contentRef); } @Override diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java index 8eb1a5789102c..4bee7cd52a214 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java @@ -419,8 +419,8 @@ protected HttpMessage createMessage(String[] initialLine) throws Exception { protected boolean isContentAlwaysEmpty(HttpResponse msg) { // non-chunked responses (Netty4HttpResponse extends Netty's DefaultFullHttpResponse) with chunked transfer // encoding are only sent by us in response to HEAD requests and must always have an empty body - if (msg instanceof Netty4HttpResponse netty4HttpResponse && HttpUtil.isTransferEncodingChunked(msg)) { - assert netty4HttpResponse.content().isReadable() == false; + if (msg instanceof Netty4FullHttpResponse netty4FullHttpResponse && HttpUtil.isTransferEncodingChunked(msg)) { + assert netty4FullHttpResponse.content().isReadable() == false; return true; } return super.isContentAlwaysEmpty(msg); diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4RestResponse.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4RestResponse.java index 1a3ec4bdc4b75..eacdaa765add4 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4RestResponse.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4RestResponse.java @@ -12,7 +12,7 @@ import org.elasticsearch.http.HttpResponse; -public interface Netty4RestResponse extends HttpResponse, HttpMessage { +public sealed interface Netty4RestResponse extends HttpResponse, HttpMessage permits Netty4FullHttpResponse, Netty4ChunkedHttpResponse { int getSequence(); diff --git a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandlerTests.java b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandlerTests.java index aa6e51cf9fb00..db59da8076b0a 100644 --- a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandlerTests.java +++ b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandlerTests.java @@ -18,6 +18,8 @@ import io.netty.handler.codec.DecoderResult; import io.netty.handler.codec.http.DefaultFullHttpRequest; import io.netty.handler.codec.http.DefaultHttpContent; +import io.netty.handler.codec.http.DefaultHttpResponse; +import io.netty.handler.codec.http.DefaultLastHttpContent; import io.netty.handler.codec.http.FullHttpResponse; import io.netty.handler.codec.http.HttpMethod; import io.netty.handler.codec.http.HttpVersion; @@ -30,12 +32,14 @@ import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.ReleasableBytesReference; +import org.elasticsearch.common.bytes.ZeroBytesReference; import org.elasticsearch.common.recycler.Recycler; import org.elasticsearch.http.HttpResponse; import org.elasticsearch.rest.ChunkedRestResponseBody; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.transport.netty4.Netty4Utils; +import org.elasticsearch.transport.netty4.NettyAllocator; import org.junit.After; import java.nio.channels.ClosedChannelException; @@ -56,6 +60,7 @@ import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.sameInstance; import static org.hamcrest.core.Is.is; import static org.mockito.Mockito.mock; @@ -70,18 +75,18 @@ public class Netty4HttpPipeliningHandlerTests extends ESTestCase { @After public void tearDown() throws Exception { waitingRequests.keySet().forEach(this::finishRequest); - // shutdown the Executor Service - if (handlerService.isShutdown() == false) { - handlerService.shutdown(); - handlerService.awaitTermination(10, TimeUnit.SECONDS); - } - if (eventLoopService.isShutdown() == false) { - eventLoopService.shutdown(); - eventLoopService.awaitTermination(10, TimeUnit.SECONDS); - } + terminateExecutorService(handlerService); + terminateExecutorService(eventLoopService); super.tearDown(); } + private void terminateExecutorService(ExecutorService executorService) throws InterruptedException { + if (executorService.isShutdown() == false) { + executorService.shutdown(); + assertTrue(executorService.awaitTermination(10, TimeUnit.SECONDS)); + } + } + private CountDownLatch finishRequest(String url) { waitingRequests.get(url).countDown(); return finishingRequests.get(url); @@ -92,7 +97,7 @@ public void testThatPipeliningWorksWithFastSerializedRequests() throws Interrupt final EmbeddedChannel embeddedChannel = makeEmbeddedChannelWithSimulatedWork(numberOfRequests); for (int i = 0; i < numberOfRequests; i++) { - embeddedChannel.writeInbound(createHttpRequest("/" + String.valueOf(i))); + embeddedChannel.writeInbound(createHttpRequest("/" + i)); } final List latches = new ArrayList<>(); @@ -127,7 +132,7 @@ public void testThatPipeliningWorksWhenSlowRequestsInDifferentOrder() throws Int final EmbeddedChannel embeddedChannel = makeEmbeddedChannelWithSimulatedWork(numberOfRequests); for (int i = 0; i < numberOfRequests; i++) { - embeddedChannel.writeInbound(createHttpRequest("/" + String.valueOf(i))); + embeddedChannel.writeInbound(createHttpRequest("/" + i)); } // random order execution @@ -156,7 +161,7 @@ public void testThatPipeliningClosesConnectionWithTooManyEvents() throws Interru final EmbeddedChannel embeddedChannel = makeEmbeddedChannelWithSimulatedWork(numberOfRequests); for (int i = 0; i < 1 + numberOfRequests + 1; i++) { - embeddedChannel.writeInbound(createHttpRequest("/" + Integer.toString(i))); + embeddedChannel.writeInbound(createHttpRequest("/" + i)); } final List latches = new ArrayList<>(); @@ -178,7 +183,7 @@ public void testThatPipeliningClosesConnectionWithTooManyEvents() throws Interru assertFalse(embeddedChannel.isOpen()); } - public void testPipeliningRequestsAreReleased() throws InterruptedException { + public void testPipeliningRequestsAreReleased() { final int numberOfRequests = 10; final EmbeddedChannel embeddedChannel = new EmbeddedChannel(new Netty4HttpPipeliningHandler(logger, numberOfRequests + 1, null)); @@ -197,7 +202,7 @@ public void testPipeliningRequestsAreReleased() throws InterruptedException { ChannelPromise promise = embeddedChannel.newPromise(); promises.add(promise); Netty4HttpRequest pipelinedRequest = requests.get(i); - Netty4HttpResponse resp = pipelinedRequest.createResponse(RestStatus.OK, BytesArray.EMPTY); + Netty4FullHttpResponse resp = pipelinedRequest.createResponse(RestStatus.OK, BytesArray.EMPTY); embeddedChannel.writeAndFlush(resp, promise); } @@ -211,6 +216,45 @@ public void testPipeliningRequestsAreReleased() throws InterruptedException { } } + public void testSmallFullResponsesAreSentDirectly() { + final List messagesSeen = new ArrayList<>(); + final var embeddedChannel = new EmbeddedChannel(capturingHandler(messagesSeen), getTestHttpHandler()); + embeddedChannel.writeInbound(createHttpRequest("/test")); + final Netty4HttpRequest request = embeddedChannel.readInbound(); + final var maxSize = (int) NettyAllocator.suggestedMaxAllocationSize() / 2; + final var content = new ZeroBytesReference(between(0, maxSize)); + final var response = request.createResponse(RestStatus.OK, content); + assertThat(response, instanceOf(FullHttpResponse.class)); + final var promise = embeddedChannel.newPromise(); + embeddedChannel.writeAndFlush(response, promise); + assertTrue(promise.isDone()); + assertThat(messagesSeen, hasSize(1)); + assertSame(response, messagesSeen.get(0)); + } + + public void testLargeFullResponsesAreSplit() { + final List messagesSeen = new ArrayList<>(); + final var embeddedChannel = new EmbeddedChannel(capturingHandler(messagesSeen), getTestHttpHandler()); + embeddedChannel.writeInbound(createHttpRequest("/test")); + final Netty4HttpRequest request = embeddedChannel.readInbound(); + final var minSize = (int) NettyAllocator.suggestedMaxAllocationSize(); + final var content = new ZeroBytesReference(between(minSize, minSize * 2)); + final var response = request.createResponse(RestStatus.OK, content); + assertThat(response, instanceOf(FullHttpResponse.class)); + final var promise = embeddedChannel.newPromise(); + embeddedChannel.writeAndFlush(response, promise); + assertTrue(promise.isDone()); + assertThat(messagesSeen, hasSize(3)); + final var headersMessage = asInstanceOf(DefaultHttpResponse.class, messagesSeen.get(0)); + assertEquals(RestStatus.OK.getStatus(), headersMessage.status().code()); + assertThat(headersMessage, not(instanceOf(FullHttpResponse.class))); + final var chunk1 = asInstanceOf(DefaultHttpContent.class, messagesSeen.get(1)); + final var chunk2 = asInstanceOf(DefaultLastHttpContent.class, messagesSeen.get(2)); + assertEquals(content.length(), chunk1.content().readableBytes() + chunk2.content().readableBytes()); + assertThat(chunk1, not(instanceOf(FullHttpResponse.class))); + assertThat(chunk2, not(instanceOf(FullHttpResponse.class))); + } + public void testDecoderErrorSurfacedAsNettyInboundError() { final EmbeddedChannel embeddedChannel = new EmbeddedChannel(getTestHttpHandler()); // a request with a decoder error @@ -304,7 +348,7 @@ public void testResumesSingleAfterChunkedMessage() { assertTrue(promise1.isDone()); assertThat(messagesSeen, hasSize(chunks1 + 1 + 1)); assertChunkedMessageAtIndex(messagesSeen, 0, chunks1, chunk); - assertThat(messagesSeen.get(chunks1 + 1), instanceOf(Netty4HttpResponse.class)); + assertThat(messagesSeen.get(chunks1 + 1), instanceOf(Netty4FullHttpResponse.class)); assertContentAtIndexEquals(messagesSeen, chunks1 + 1, single); assertTrue(promise2.isDone()); } @@ -339,7 +383,7 @@ public void testChunkedResumesAfterSingleMessage() { embeddedChannel.flush(); assertTrue(promise1.isDone()); assertThat(messagesSeen, hasSize(chunks2 + 2)); - assertThat(messagesSeen.get(0), instanceOf(Netty4HttpResponse.class)); + assertThat(messagesSeen.get(0), instanceOf(Netty4FullHttpResponse.class)); assertChunkedMessageAtIndex(messagesSeen, 1, chunks2, chunk); assertTrue(promise2.isDone()); } @@ -377,7 +421,7 @@ public void testChunkedWithSmallChunksResumesAfterSingleMessage() { embeddedChannel.flush(); assertTrue(promise1.isDone()); assertThat(messagesSeen, hasSize(chunks2 + 2)); - assertThat(messagesSeen.get(0), instanceOf(Netty4HttpResponse.class)); + assertThat(messagesSeen.get(0), instanceOf(Netty4FullHttpResponse.class)); assertChunkedMessageAtIndex(messagesSeen, 1, chunks2, chunk); assertTrue(promise2.isDone()); } @@ -410,7 +454,7 @@ public void testPipeliningRequestsAreReleasedAfterFailureOnChunked() { for (Netty4HttpRequest request : requests) { ChannelPromise promise = embeddedChannel.newPromise(); promises.add(promise); - Netty4HttpResponse resp = request.createResponse(RestStatus.OK, BytesArray.EMPTY); + Netty4FullHttpResponse resp = request.createResponse(RestStatus.OK, BytesArray.EMPTY); embeddedChannel.write(resp, promise); } assertFalse(chunkedWritePromise.isDone()); @@ -525,7 +569,7 @@ protected void channelRead0(final ChannelHandlerContext ctx, Netty4HttpRequest r handlerService.submit(() -> { try { - waitingLatch.await(1000, TimeUnit.SECONDS); + assertTrue(waitingLatch.await(1000, TimeUnit.SECONDS)); final ChannelPromise promise = ctx.newPromise(); eventLoopService.submit(() -> { ctx.write(httpResponse, promise); diff --git a/server/src/test/java/org/elasticsearch/common/bytes/ZeroBytesReference.java b/test/framework/src/main/java/org/elasticsearch/common/bytes/ZeroBytesReference.java similarity index 100% rename from server/src/test/java/org/elasticsearch/common/bytes/ZeroBytesReference.java rename to test/framework/src/main/java/org/elasticsearch/common/bytes/ZeroBytesReference.java diff --git a/server/src/test/java/org/elasticsearch/common/bytes/ZeroBytesReferenceTests.java b/test/framework/src/main/java/org/elasticsearch/common/bytes/ZeroBytesReferenceTests.java similarity index 79% rename from server/src/test/java/org/elasticsearch/common/bytes/ZeroBytesReferenceTests.java rename to test/framework/src/main/java/org/elasticsearch/common/bytes/ZeroBytesReferenceTests.java index f90cb870ea22a..c3f4510cd144e 100644 --- a/server/src/test/java/org/elasticsearch/common/bytes/ZeroBytesReferenceTests.java +++ b/test/framework/src/main/java/org/elasticsearch/common/bytes/ZeroBytesReferenceTests.java @@ -8,8 +8,6 @@ package org.elasticsearch.common.bytes; -import java.io.IOException; - import static org.hamcrest.Matchers.containsString; public class ZeroBytesReferenceTests extends AbstractBytesReferenceTestCase { @@ -39,9 +37,11 @@ public void testSliceToBytesRef() { // ZeroBytesReference shifts offsets } - public void testWriteWithIterator() throws IOException { - AssertionError error = expectThrows(AssertionError.class, () -> super.testWriteWithIterator()); - assertThat(error.getMessage(), containsString("Internal pages from ZeroBytesReference must be zero")); + public void testWriteWithIterator() { + assertThat( + expectThrows(AssertionError.class, super::testWriteWithIterator).getMessage(), + containsString("Internal pages from ZeroBytesReference must be zero") + ); } } diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/netty4/SecurityNetty4HttpServerTransportTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/netty4/SecurityNetty4HttpServerTransportTests.java index 4b180548bfba4..a9d8c1dfc8d9e 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/netty4/SecurityNetty4HttpServerTransportTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/netty4/SecurityNetty4HttpServerTransportTests.java @@ -36,7 +36,7 @@ import org.elasticsearch.http.HttpServerTransport; import org.elasticsearch.http.HttpTransportSettings; import org.elasticsearch.http.NullDispatcher; -import org.elasticsearch.http.netty4.Netty4HttpResponse; +import org.elasticsearch.http.netty4.Netty4FullHttpResponse; import org.elasticsearch.http.netty4.Netty4HttpServerTransport; import org.elasticsearch.http.netty4.internal.HttpHeadersAuthenticatorUtils; import org.elasticsearch.http.netty4.internal.HttpHeadersWithAuthenticationContext; @@ -398,7 +398,7 @@ public void testHttpHeaderAuthnBypassHeaderValidator() throws Exception { }); writeFuture.get(); ch.flushOutbound(); - Netty4HttpResponse response = ch.readOutbound(); + Netty4FullHttpResponse response = ch.readOutbound(); assertThat(response.status(), is(HttpResponseStatus.INTERNAL_SERVER_ERROR)); String responseContentString = new String(ByteBufUtil.getBytes(response.content()), StandardCharsets.UTF_8); assertThat( @@ -501,7 +501,7 @@ public void dispatchBadRequest(final RestChannel channel, final ThreadContext th ch.flushInbound(); }).get(); ch.flushOutbound(); - Netty4HttpResponse response = ch.readOutbound(); + Netty4FullHttpResponse response = ch.readOutbound(); assertThat(response.status(), is(HttpResponseStatus.INTERNAL_SERVER_ERROR)); var responseContentString = new String(ByteBufUtil.getBytes(response.content()), StandardCharsets.UTF_8); assertThat(