Skip to content

Commit

Permalink
Unmute channel when flush last http stream chunk (elastic#113222)
Browse files Browse the repository at this point in the history
  • Loading branch information
mhl-b authored and Tim-Brooks committed Oct 29, 2024
1 parent 003bea2 commit ab61817
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,6 @@ public void handleNettyContent(HttpContent httpContent) {
send();
}
}
if (hasLast) {
channel.config().setAutoRead(true);
channel.closeFuture().removeListener(closeListener);
}
}

// adds chunk to current buffer, will allocate composite buffer when need to hold more than 1 chunk
Expand Down Expand Up @@ -133,6 +129,10 @@ private void send() {
tracer.onNext(bytesRef, hasLast);
}
handler.onNext(bytesRef, hasLast);
if (hasLast) {
channel.config().setAutoRead(true);
channel.closeFuture().removeListener(closeListener);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,15 @@ public void testFlushAllReceivedChunks() {
assertEquals(chunkSize * totalChunks, totalBytes.get());
}

// ensures that channel.setAutoRead(true) only when we flush last chunk
public void testSetAutoReadOnLastFlush() {
channel.writeInbound(randomLastContent(10));
assertFalse("should not auto-read on last content reception", channel.config().isAutoRead());
stream.next();
channel.runPendingTasks();
assertTrue("should set auto-read once last content is flushed", channel.config().isAutoRead());
}

// ensures that we read from channel when no current chunks available
// and pass next chunk downstream without holding
public void testReadFromChannel() {
Expand Down

0 comments on commit ab61817

Please sign in to comment.