Skip to content

Commit

Permalink
The implementation of the InputStream to ReadStream adapter can somet…
Browse files Browse the repository at this point in the history
…imes do a stack overflow when the InputStream buffer extraction on a worker thread is faster than the event-loop. We should guard against this recursion and trampoline when the reads starts to pile (8).
  • Loading branch information
vietj committed Mar 16, 2023
1 parent 3710714 commit 3b8ee08
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,12 @@

import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.atomic.AtomicInteger;

class InputStreamReadStream implements ReadStream<Buffer> {

private static final int CHUNK_SIZE = 2048;
private static final int MAX_DEPTH = 8;

private final VertxHttpRequest vertxHttpRequest;
private final InputStream is;
Expand All @@ -51,6 +53,13 @@ public ReadStream<Buffer> exceptionHandler(Handler<Throwable> handler) {
return this;
}

final ThreadLocal<AtomicInteger> counter = new ThreadLocal<AtomicInteger>() {
@Override
protected AtomicInteger initialValue() {
return new AtomicInteger();
}
};

@Override
public ReadStream<Buffer> handler(Handler<Buffer> handler) {
boolean start = inboundBuffer == null && handler != null;
Expand Down Expand Up @@ -80,6 +89,20 @@ public ReadStream<Buffer> handler(Handler<Buffer> handler) {
}

private void readChunk() {
AtomicInteger atomicInteger = counter.get();
try {
int depth = atomicInteger.getAndIncrement();
if (depth < MAX_DEPTH) {
readChunk2();
return;
}
} finally {
atomicInteger.decrementAndGet();
}
vertxHttpRequest.vertx.runOnContext(v -> readChunk());
}

private void readChunk2() {
Future<Buffer> fut = vertxHttpRequest.vertx.executeBlocking(p -> {
if (bytes == null) {
bytes = new byte[CHUNK_SIZE];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,4 +133,48 @@ public int read() throws IOException {
assertEquals(StreamResetException.class, cause.getClass());
}
}

@Test
public void testStackOverflow() throws Exception {

requestHandler = req -> {
AtomicInteger size = new AtomicInteger();
req.handler(buff -> size.addAndGet(buff.length()));
req.endHandler(v -> {
req.response().end("" + size);
});
};
HttpClient.Builder builder = clientFactory.newBuilder();
HttpClient client = builder.build();

int contentLength = 128_000;
InputStream is = new InputStream() {
int remaining = contentLength;
@Override
public int read() {
if (remaining == 0) {
return -1;
}
return 'A';
}
@Override
public int read(byte[] b, int off, int len) {
if (remaining > 0) {
int size = Math.min(len, remaining);
remaining -= size;
for (int i = 0;i < size;i++) {
b[off++] = 'A';
}
return size;
} else {
return -1;
}
}
};

HttpRequest request = client.newHttpRequestBuilder().uri("http://localhost:8080").post("text/plain", is, -1).build();
HttpResponse<String> resp = client.sendAsync(request, String.class).get(10, TimeUnit.SECONDS);
int val = Integer.parseInt(resp.body());
assertEquals(contentLength, val);
}
}

0 comments on commit 3b8ee08

Please sign in to comment.