diff --git a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpRequestBodyStreamTests.java b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpRequestBodyStreamTests.java index 0f35de483dc82..f268c3e6e744f 100644 --- a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpRequestBodyStreamTests.java +++ b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpRequestBodyStreamTests.java @@ -21,7 +21,6 @@ import org.elasticsearch.common.bytes.ReleasableBytesReference; import org.elasticsearch.http.HttpBody; import org.elasticsearch.test.ESTestCase; -import org.junit.Before; import java.util.ArrayList; import java.util.concurrent.atomic.AtomicBoolean; @@ -33,20 +32,26 @@ public class Netty4HttpRequestBodyStreamTests extends ESTestCase { Netty4HttpRequestBodyStream stream; static HttpBody.ChunkHandler discardHandler = (chunk, isLast) -> chunk.close(); - @Before - public void createStream() { + @Override + public void setUp() throws Exception { + super.setUp(); channel = new EmbeddedChannel(); stream = new Netty4HttpRequestBodyStream(channel); stream.setHandler(discardHandler); // set default handler, each test might override one - channel.pipeline().addLast(new SimpleChannelInboundHandler() { + channel.pipeline().addLast(new SimpleChannelInboundHandler(false) { @Override protected void channelRead0(ChannelHandlerContext ctx, HttpContent msg) { - msg.retain(); stream.handleNettyContent(msg); } }); } + @Override + public void tearDown() throws Exception { + super.tearDown(); + stream.close(); + } + // ensures that no chunks are sent downstream without request public void testEnqueueChunksBeforeRequest() { var totalChunks = randomIntBetween(1, 100); @@ -63,6 +68,7 @@ public void testFlushAllReceivedChunks() { stream.setHandler((chunk, isLast) -> { chunks.add(chunk); totalBytes.addAndGet(chunk.length()); + chunk.close(); }); var chunkSize = 1024; @@ -84,6 +90,7 @@ public void testReadFromChannel() { stream.setHandler((chunk, isLast) -> { gotChunks.add(chunk); gotLast.set(isLast); + chunk.close(); }); channel.pipeline().addFirst(new FlowControlHandler()); // block all incoming messages, need explicit channel.read() var chunkSize = 1024;