Skip to content

Commit

Permalink
fix fabric8io#5022: adds a small amount of additional buffering to ex…
Browse files Browse the repository at this point in the history
…ec streams
  • Loading branch information
shawkins committed Apr 17, 2023
1 parent 588d1eb commit ee07151
Showing 3 changed files with 30 additions and 2 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -17,6 +17,7 @@
* Fix #4477 exposing LeaderElector.release to force an elector to give up the lease
* Fix #4935: improve HTTP client implementation selection messages
* Fix #4975: exposing scale operations for all Resources
* Fix #5022: adding additional buffering to ExecWatchInputStream
* Fix #4992: Optimize Quantity parsing to avoid regex overhead
* Fix #4998: removing the internal usage of the Serialization yaml mapper

Original file line number Diff line number Diff line change
@@ -31,16 +31,24 @@
*/
public class ExecWatchInputStream extends InputStream {

private static final int BUFFER_SIZE = 1 << 15;

private final LinkedList<ByteBuffer> buffers = new LinkedList<>();
private boolean complete;
private boolean closed;
private Throwable failed;
private ByteBuffer currentBuffer;

private Runnable request;
private final Runnable request;
private final int bufferSize;

public ExecWatchInputStream(Runnable request) {
this(request, BUFFER_SIZE);
}

public ExecWatchInputStream(Runnable request, int bufferSize) {
this.request = request;
this.bufferSize = bufferSize;
}

void onExit(Integer exitCode, Throwable t) {
@@ -69,6 +77,10 @@ void consume(List<ByteBuffer> value) {
assert !complete || failed == null;
buffers.addAll(value);
buffers.notifyAll();
if ((currentBuffer != null ? currentBuffer.remaining() : 0)
+ buffers.stream().mapToInt(ByteBuffer::remaining).sum() < bufferSize) {
request.run();
}
}
}

Original file line number Diff line number Diff line change
@@ -72,7 +72,7 @@ void testConsumerAfterClose() throws IOException {
@Test
void testConsume() throws IOException {
AtomicInteger count = new AtomicInteger();
ExecWatchInputStream is = new ExecWatchInputStream(() -> count.getAndIncrement());
ExecWatchInputStream is = new ExecWatchInputStream(() -> count.getAndIncrement(), 0);
is.consume(Collections.singletonList(ByteBuffer.allocate(1)));

assertEquals(0, is.read());
@@ -94,6 +94,21 @@ void testConsume() throws IOException {
readFuture.join();
}

@Test
void testConsumeBuffering() throws IOException {
AtomicInteger count = new AtomicInteger();
ExecWatchInputStream is = new ExecWatchInputStream(() -> count.getAndIncrement(), 2);
is.consume(Collections.singletonList(ByteBuffer.allocate(1)));

// should keep going as the amount is less than the buffer size
assertEquals(1, count.get());

is.consume(Collections.singletonList(ByteBuffer.allocate(1)));

// should not request as we're at the buffer limit
assertEquals(1, count.get());
}

@Test
void testCompleteInlineWithRequestMore() throws IOException {
AtomicReference<ExecWatchInputStream> is = new AtomicReference<>();

0 comments on commit ee07151

Please sign in to comment.