Skip to content

Commit

Permalink
Revert limiting synchronization and Add maxChunkSize
Browse files Browse the repository at this point in the history
maxChunkSize to current MaxChunkSize, at least 8192
  • Loading branch information
fredericBregier committed Nov 25, 2023
1 parent c40a3be commit d34c42a
Show file tree
Hide file tree
Showing 8 changed files with 55 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ public int available() throws IOException {
return 0;
}
if (pooled != null && pooled.isReadable()) {
return pooled.readableBytes();
return pooled.readableBytes() + exchange.readBytesAvailable();
}

return exchange.readBytesAvailable();
Expand Down Expand Up @@ -235,7 +235,6 @@ public void handle(Throwable event) {

protected ByteBuf readBlocking() throws IOException {
long expire = System.currentTimeMillis() + timeout;
Buffer ret = null;
synchronized (request.connection()) {
while (input1 == null && !eof && readException == null) {
long rem = expire - System.currentTimeMillis();
Expand Down Expand Up @@ -263,16 +262,18 @@ protected ByteBuf readBlocking() throws IOException {
if (readException != null) {
throw new IOException(readException);
}
ret = input1;
Buffer ret = input1;
input1 = null;
if (inputOverflow != null) {
input1 = inputOverflow.poll();
if (input1 == null) {
request.fetch(1);
}
} else if (!eof) {
request.fetch(1);
}
return ret == null ? null : ret.getByteBuf();
}
if (!eof) {
request.fetch(1);
}
return ret == null ? null : ret.getByteBuf();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,35 +1,41 @@
package org.jboss.resteasy.reactive.client.handlers;

import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.atomic.AtomicBoolean;

import io.vertx.core.*;
import io.vertx.core.AsyncResult;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.streams.ReadStream;
import io.vertx.core.streams.impl.InboundBuffer;

import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.atomic.AtomicBoolean;

/**
* Adapt an InputStream to a ReadStream that can be used with a Pump in Vertx.
*/
public class AsyncInputStream implements ReadStream<Buffer>, AutoCloseable {
public static final String INPUTSTREAM_IS_CLOSED = "Inputstream is closed";
private static int BUF_SIZE = 8192;
// Based on the inputStream with the real data
private final InputStream in;
private final Context context;
private final InboundBuffer<Buffer> queue;
private final byte[] bytes = new byte[BUF_SIZE];
private final byte[] bytes;
private final AtomicBoolean closed = new AtomicBoolean(false);
private final AtomicBoolean readInProgress = new AtomicBoolean(false);
private Handler<Buffer> dataHandler;
private Handler<Void> endHandler;
private Handler<Throwable> exceptionHandler;
private final int maxChunkSize;

/**
* Create a new Async InputStream that can we used with a Pump
*/
public AsyncInputStream(final Vertx vertx, final InputStream in) {
public AsyncInputStream(final Vertx vertx, final InputStream in, final int maxChunkSize) {
this.maxChunkSize = Math.max(maxChunkSize, 8192);
bytes = new byte[this.maxChunkSize];
this.context = vertx.getOrCreateContext();
this.in = in;
queue = new InboundBuffer<>(context, 0);
Expand Down Expand Up @@ -128,7 +134,7 @@ public synchronized AsyncInputStream endHandler(final Handler<Void> endHandler)

private void doRead() {
checkClose();
doRead(BUF_SIZE);
doRead(maxChunkSize);
}

private void doRead(final int len) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -591,7 +591,7 @@ private ReadStream<Buffer> setRequestHeadersForSendingInputStream(HttpClientRequ
InputStream inputStream = (InputStream) entity.getEntity();
httpClientRequest.setChunked(true);
Vertx vertx = Vertx.currentContext().owner();
ReadStream<Buffer> readStream = new AsyncInputStream(vertx, inputStream);
ReadStream<Buffer> readStream = new AsyncInputStream(vertx, inputStream, maxChunkSize);
// set the Vertx headers after we've run the interceptors because they can modify them
setVertxHeaders(httpClientRequest, headerMap);
return readStream;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ public int available() throws IOException {
return 0;
}
if (pooled != null && pooled.isReadable()) {
return pooled.readableBytes();
return pooled.readableBytes() + exchange.readBytesAvailable();
}

return exchange.readBytesAvailable();
Expand Down Expand Up @@ -190,7 +190,6 @@ public void handle(Throwable event) {

protected ByteBuf readBlocking() throws IOException {
long expire = System.currentTimeMillis() + timeout;
Buffer ret = null;
synchronized (VertxBlockingInput.this) {
while (input1 == null && !eof && readException == null) {
long rem = expire - System.currentTimeMillis();
Expand Down Expand Up @@ -218,16 +217,18 @@ protected ByteBuf readBlocking() throws IOException {
if (readException != null) {
throw new IOException(readException);
}
ret = input1;
Buffer ret = input1;
input1 = null;
if (inputOverflow != null) {
input1 = inputOverflow.poll();
if (input1 == null) {
request.fetch(1);
}
} else if (!eof) {
request.fetch(1);
}
return ret == null ? null : ret.getByteBuf();
}
if (!eof) {
request.fetch(1);
}
return ret == null ? null : ret.getByteBuf();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ public int available() throws IOException {
return 0;
}
if (pooled != null && pooled.isReadable()) {
return pooled.readableBytes();
return pooled.readableBytes() + exchange.readBytesAvailable();
}

return exchange.readBytesAvailable();
Expand Down Expand Up @@ -236,7 +236,6 @@ public void handle(Throwable event) {

protected ByteBuf readBlocking() throws IOException {
long expire = System.currentTimeMillis() + timeout;
Buffer ret = null;
synchronized (request.connection()) {
while (input1 == null && !eof && readException == null) {
long rem = expire - System.currentTimeMillis();
Expand Down Expand Up @@ -264,16 +263,18 @@ protected ByteBuf readBlocking() throws IOException {
if (readException != null) {
throw new IOException(readException);
}
ret = input1;
Buffer ret = input1;
input1 = null;
if (inputOverflow != null) {
input1 = inputOverflow.poll();
if (input1 == null) {
request.fetch(1);
}
} else if (!eof) {
request.fetch(1);
}
return ret == null ? null : ret.getByteBuf();
}
if (!eof) {
request.fetch(1);
}
return ret == null ? null : ret.getByteBuf();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package org.jboss.resteasy.reactive.server.vertx.test.inputstream;


import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
package org.jboss.resteasy.reactive.server.vertx.test.inputstream;

import io.smallrye.common.annotation.Blocking;
import jakarta.ws.rs.*;
import jakarta.ws.rs.Consumes;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.PathParam;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;
import org.jboss.logging.Logger;

Expand Down Expand Up @@ -32,8 +37,9 @@ public String test(InputStream is) throws IOException {
@Produces(MediaType.APPLICATION_OCTET_STREAM)
@Blocking
public InputStream test(@PathParam("len") long len) throws IOException {
LOG.infof("To Write %d", len);
return new FakeInputStream(len);
long lenMb = len * 1024 * 1024L;
LOG.infof("To Write %d", lenMb);
return new FakeInputStream(lenMb);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,8 @@ private long getCurrentlyAllocatedMemory() {
*/
@ParameterizedTest
@ValueSource(ints = {1, 100, 1024})
@DisplayName("Test Get InputStream")
public void testGetInputStream(int mb) throws Exception {
@DisplayName("Test Post InputStream")
public void testPostInputStream(int mb) throws Exception {
WebTarget base = client.target(generateURL("/inputstreamtransfer/test"));
long size = mb * 1024 * 1024L;
InputStream is = new FakeInputStream(size);
Expand All @@ -101,10 +101,10 @@ public void testGetInputStream(int mb) throws Exception {
*/
@ParameterizedTest
@ValueSource(ints = {1, 100, 1024})
@DisplayName("Test Post InputStream")
public void testPostInputStream(int mb) throws Exception {
int size = mb * 1024 * 1024;
WebTarget base = client.target(generateURL("/inputstreamtransfer/test/" + size));
@DisplayName("Test Get InputStream")
public void testGetInputStream(int mb) throws Exception {
long size = mb * 1024 * 1024L;
WebTarget base = client.target(generateURL("/inputstreamtransfer/test/" + mb));
long before = getCurrentlyAllocatedMemory();
long start = System.nanoTime();
InputStream is = base.request().get(InputStream.class);
Expand Down

0 comments on commit d34c42a

Please sign in to comment.