diff --git a/extensions/vertx-http/runtime/src/main/java/io/quarkus/vertx/http/runtime/VertxInputStream.java b/extensions/vertx-http/runtime/src/main/java/io/quarkus/vertx/http/runtime/VertxInputStream.java index 1b2f11f5d1337e..83b8ebe93d1612 100644 --- a/extensions/vertx-http/runtime/src/main/java/io/quarkus/vertx/http/runtime/VertxInputStream.java +++ b/extensions/vertx-http/runtime/src/main/java/io/quarkus/vertx/http/runtime/VertxInputStream.java @@ -139,7 +139,7 @@ public int available() throws IOException { return 0; } if (pooled != null && pooled.isReadable()) { - return pooled.readableBytes(); + return pooled.readableBytes() + exchange.readBytesAvailable(); } return exchange.readBytesAvailable(); @@ -235,7 +235,6 @@ public void handle(Throwable event) { protected ByteBuf readBlocking() throws IOException { long expire = System.currentTimeMillis() + timeout; - Buffer ret = null; synchronized (request.connection()) { while (input1 == null && !eof && readException == null) { long rem = expire - System.currentTimeMillis(); @@ -263,16 +262,18 @@ protected ByteBuf readBlocking() throws IOException { if (readException != null) { throw new IOException(readException); } - ret = input1; + Buffer ret = input1; input1 = null; if (inputOverflow != null) { input1 = inputOverflow.poll(); + if (input1 == null) { + request.fetch(1); + } + } else if (!eof) { + request.fetch(1); } + return ret == null ? null : ret.getByteBuf(); } - if (!eof) { - request.fetch(1); - } - return ret == null ? null : ret.getByteBuf(); } @Override diff --git a/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/handlers/AsyncInputStream.java b/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/handlers/AsyncInputStream.java index cbabe5a0cbc0e9..3bcf58ce6824c5 100644 --- a/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/handlers/AsyncInputStream.java +++ b/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/handlers/AsyncInputStream.java @@ -1,35 +1,41 @@ package org.jboss.resteasy.reactive.client.handlers; -import java.io.IOException; -import java.io.InputStream; -import java.util.concurrent.atomic.AtomicBoolean; - -import io.vertx.core.*; +import io.vertx.core.AsyncResult; +import io.vertx.core.Context; +import io.vertx.core.Future; +import io.vertx.core.Handler; +import io.vertx.core.Vertx; import io.vertx.core.buffer.Buffer; import io.vertx.core.streams.ReadStream; import io.vertx.core.streams.impl.InboundBuffer; +import java.io.IOException; +import java.io.InputStream; +import java.util.concurrent.atomic.AtomicBoolean; + /** * Adapt an InputStream to a ReadStream that can be used with a Pump in Vertx. */ public class AsyncInputStream implements ReadStream, AutoCloseable { public static final String INPUTSTREAM_IS_CLOSED = "Inputstream is closed"; - private static int BUF_SIZE = 8192; // Based on the inputStream with the real data private final InputStream in; private final Context context; private final InboundBuffer queue; - private final byte[] bytes = new byte[BUF_SIZE]; + private final byte[] bytes; private final AtomicBoolean closed = new AtomicBoolean(false); private final AtomicBoolean readInProgress = new AtomicBoolean(false); private Handler dataHandler; private Handler endHandler; private Handler exceptionHandler; + private final int maxChunkSize; /** * Create a new Async InputStream that can we used with a Pump */ - public AsyncInputStream(final Vertx vertx, final InputStream in) { + public AsyncInputStream(final Vertx vertx, final InputStream in, final int maxChunkSize) { + this.maxChunkSize = Math.max(maxChunkSize, 8192); + bytes = new byte[this.maxChunkSize]; this.context = vertx.getOrCreateContext(); this.in = in; queue = new InboundBuffer<>(context, 0); @@ -128,7 +134,7 @@ public synchronized AsyncInputStream endHandler(final Handler endHandler) private void doRead() { checkClose(); - doRead(BUF_SIZE); + doRead(maxChunkSize); } private void doRead(final int len) { diff --git a/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/handlers/ClientSendRequestHandler.java b/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/handlers/ClientSendRequestHandler.java index fe836017398849..de3a6df1816717 100644 --- a/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/handlers/ClientSendRequestHandler.java +++ b/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/handlers/ClientSendRequestHandler.java @@ -591,7 +591,7 @@ private ReadStream setRequestHeadersForSendingInputStream(HttpClientRequ InputStream inputStream = (InputStream) entity.getEntity(); httpClientRequest.setChunked(true); Vertx vertx = Vertx.currentContext().owner(); - ReadStream readStream = new AsyncInputStream(vertx, inputStream); + ReadStream readStream = new AsyncInputStream(vertx, inputStream, maxChunkSize); // set the Vertx headers after we've run the interceptors because they can modify them setVertxHeaders(httpClientRequest, headerMap); return readStream; diff --git a/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/handlers/VertxClientInputStream.java b/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/handlers/VertxClientInputStream.java index 146efa1d1ad14d..a471ed3199fd67 100644 --- a/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/handlers/VertxClientInputStream.java +++ b/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/handlers/VertxClientInputStream.java @@ -98,7 +98,7 @@ public int available() throws IOException { return 0; } if (pooled != null && pooled.isReadable()) { - return pooled.readableBytes(); + return pooled.readableBytes() + exchange.readBytesAvailable(); } return exchange.readBytesAvailable(); @@ -190,7 +190,6 @@ public void handle(Throwable event) { protected ByteBuf readBlocking() throws IOException { long expire = System.currentTimeMillis() + timeout; - Buffer ret = null; synchronized (VertxBlockingInput.this) { while (input1 == null && !eof && readException == null) { long rem = expire - System.currentTimeMillis(); @@ -218,16 +217,18 @@ protected ByteBuf readBlocking() throws IOException { if (readException != null) { throw new IOException(readException); } - ret = input1; + Buffer ret = input1; input1 = null; if (inputOverflow != null) { input1 = inputOverflow.poll(); + if (input1 == null) { + request.fetch(1); + } + } else if (!eof) { + request.fetch(1); } + return ret == null ? null : ret.getByteBuf(); } - if (!eof) { - request.fetch(1); - } - return ret == null ? null : ret.getByteBuf(); } @Override diff --git a/independent-projects/resteasy-reactive/server/vertx/src/main/java/org/jboss/resteasy/reactive/server/vertx/VertxInputStream.java b/independent-projects/resteasy-reactive/server/vertx/src/main/java/org/jboss/resteasy/reactive/server/vertx/VertxInputStream.java index 25e99642ef029d..3002f19b36025a 100644 --- a/independent-projects/resteasy-reactive/server/vertx/src/main/java/org/jboss/resteasy/reactive/server/vertx/VertxInputStream.java +++ b/independent-projects/resteasy-reactive/server/vertx/src/main/java/org/jboss/resteasy/reactive/server/vertx/VertxInputStream.java @@ -140,7 +140,7 @@ public int available() throws IOException { return 0; } if (pooled != null && pooled.isReadable()) { - return pooled.readableBytes(); + return pooled.readableBytes() + exchange.readBytesAvailable(); } return exchange.readBytesAvailable(); @@ -236,7 +236,6 @@ public void handle(Throwable event) { protected ByteBuf readBlocking() throws IOException { long expire = System.currentTimeMillis() + timeout; - Buffer ret = null; synchronized (request.connection()) { while (input1 == null && !eof && readException == null) { long rem = expire - System.currentTimeMillis(); @@ -264,16 +263,18 @@ protected ByteBuf readBlocking() throws IOException { if (readException != null) { throw new IOException(readException); } - ret = input1; + Buffer ret = input1; input1 = null; if (inputOverflow != null) { input1 = inputOverflow.poll(); + if (input1 == null) { + request.fetch(1); + } + } else if (!eof) { + request.fetch(1); } + return ret == null ? null : ret.getByteBuf(); } - if (!eof) { - request.fetch(1); - } - return ret == null ? null : ret.getByteBuf(); } @Override diff --git a/independent-projects/resteasy-reactive/server/vertx/src/test/java/org/jboss/resteasy/reactive/server/vertx/test/inputstream/FakeInputStream.java b/independent-projects/resteasy-reactive/server/vertx/src/test/java/org/jboss/resteasy/reactive/server/vertx/test/inputstream/FakeInputStream.java index 56e2a76e280926..4d1e9e7e0c8eb6 100644 --- a/independent-projects/resteasy-reactive/server/vertx/src/test/java/org/jboss/resteasy/reactive/server/vertx/test/inputstream/FakeInputStream.java +++ b/independent-projects/resteasy-reactive/server/vertx/src/test/java/org/jboss/resteasy/reactive/server/vertx/test/inputstream/FakeInputStream.java @@ -1,6 +1,5 @@ package org.jboss.resteasy.reactive.server.vertx.test.inputstream; - import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; diff --git a/independent-projects/resteasy-reactive/server/vertx/src/test/java/org/jboss/resteasy/reactive/server/vertx/test/inputstream/InputStreamPostGetResource.java b/independent-projects/resteasy-reactive/server/vertx/src/test/java/org/jboss/resteasy/reactive/server/vertx/test/inputstream/InputStreamPostGetResource.java index b1112ac15ebc74..dc299314f86e66 100644 --- a/independent-projects/resteasy-reactive/server/vertx/src/test/java/org/jboss/resteasy/reactive/server/vertx/test/inputstream/InputStreamPostGetResource.java +++ b/independent-projects/resteasy-reactive/server/vertx/src/test/java/org/jboss/resteasy/reactive/server/vertx/test/inputstream/InputStreamPostGetResource.java @@ -1,7 +1,12 @@ package org.jboss.resteasy.reactive.server.vertx.test.inputstream; import io.smallrye.common.annotation.Blocking; -import jakarta.ws.rs.*; +import jakarta.ws.rs.Consumes; +import jakarta.ws.rs.GET; +import jakarta.ws.rs.POST; +import jakarta.ws.rs.Path; +import jakarta.ws.rs.PathParam; +import jakarta.ws.rs.Produces; import jakarta.ws.rs.core.MediaType; import org.jboss.logging.Logger; @@ -32,8 +37,9 @@ public String test(InputStream is) throws IOException { @Produces(MediaType.APPLICATION_OCTET_STREAM) @Blocking public InputStream test(@PathParam("len") long len) throws IOException { - LOG.infof("To Write %d", len); - return new FakeInputStream(len); + long lenMb = len * 1024 * 1024L; + LOG.infof("To Write %d", lenMb); + return new FakeInputStream(lenMb); } } diff --git a/independent-projects/resteasy-reactive/server/vertx/src/test/java/org/jboss/resteasy/reactive/server/vertx/test/inputstream/InputStreamPostGetResourceTest.java b/independent-projects/resteasy-reactive/server/vertx/src/test/java/org/jboss/resteasy/reactive/server/vertx/test/inputstream/InputStreamPostGetResourceTest.java index 53ed61ed5c4815..32381faaf27cc7 100644 --- a/independent-projects/resteasy-reactive/server/vertx/src/test/java/org/jboss/resteasy/reactive/server/vertx/test/inputstream/InputStreamPostGetResourceTest.java +++ b/independent-projects/resteasy-reactive/server/vertx/src/test/java/org/jboss/resteasy/reactive/server/vertx/test/inputstream/InputStreamPostGetResourceTest.java @@ -77,8 +77,8 @@ private long getCurrentlyAllocatedMemory() { */ @ParameterizedTest @ValueSource(ints = {1, 100, 1024}) - @DisplayName("Test Get InputStream") - public void testGetInputStream(int mb) throws Exception { + @DisplayName("Test Post InputStream") + public void testPostInputStream(int mb) throws Exception { WebTarget base = client.target(generateURL("/inputstreamtransfer/test")); long size = mb * 1024 * 1024L; InputStream is = new FakeInputStream(size); @@ -101,10 +101,10 @@ public void testGetInputStream(int mb) throws Exception { */ @ParameterizedTest @ValueSource(ints = {1, 100, 1024}) - @DisplayName("Test Post InputStream") - public void testPostInputStream(int mb) throws Exception { - int size = mb * 1024 * 1024; - WebTarget base = client.target(generateURL("/inputstreamtransfer/test/" + size)); + @DisplayName("Test Get InputStream") + public void testGetInputStream(int mb) throws Exception { + long size = mb * 1024 * 1024L; + WebTarget base = client.target(generateURL("/inputstreamtransfer/test/" + mb)); long before = getCurrentlyAllocatedMemory(); long start = System.nanoTime(); InputStream is = base.request().get(InputStream.class);