Skip to content

Commit

Permalink
Release netty ByteBufs in Netty4HttpRequestBodyStreamTests (elastic#1…
Browse files Browse the repository at this point in the history
  • Loading branch information
mhl-b authored and Tim-Brooks committed Sep 19, 2024
1 parent 6aa1f72 commit e3424bd
Showing 1 changed file with 12 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.elasticsearch.common.bytes.ReleasableBytesReference;
import org.elasticsearch.http.HttpBody;
import org.elasticsearch.test.ESTestCase;
import org.junit.Before;

import java.util.ArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
Expand All @@ -33,20 +32,26 @@ public class Netty4HttpRequestBodyStreamTests extends ESTestCase {
Netty4HttpRequestBodyStream stream;
static HttpBody.ChunkHandler discardHandler = (chunk, isLast) -> chunk.close();

@Before
public void createStream() {
@Override
public void setUp() throws Exception {
super.setUp();
channel = new EmbeddedChannel();
stream = new Netty4HttpRequestBodyStream(channel);
stream.setHandler(discardHandler); // set default handler, each test might override one
channel.pipeline().addLast(new SimpleChannelInboundHandler<HttpContent>() {
channel.pipeline().addLast(new SimpleChannelInboundHandler<HttpContent>(false) {
@Override
protected void channelRead0(ChannelHandlerContext ctx, HttpContent msg) {
msg.retain();
stream.handleNettyContent(msg);
}
});
}

@Override
public void tearDown() throws Exception {
super.tearDown();
stream.close();
}

// ensures that no chunks are sent downstream without request
public void testEnqueueChunksBeforeRequest() {
var totalChunks = randomIntBetween(1, 100);
Expand All @@ -63,6 +68,7 @@ public void testFlushAllReceivedChunks() {
stream.setHandler((chunk, isLast) -> {
chunks.add(chunk);
totalBytes.addAndGet(chunk.length());
chunk.close();
});

var chunkSize = 1024;
Expand All @@ -84,6 +90,7 @@ public void testReadFromChannel() {
stream.setHandler((chunk, isLast) -> {
gotChunks.add(chunk);
gotLast.set(isLast);
chunk.close();
});
channel.pipeline().addFirst(new FlowControlHandler()); // block all incoming messages, need explicit channel.read()
var chunkSize = 1024;
Expand Down

0 comments on commit e3424bd

Please sign in to comment.