From 6a7a0f7c9e0fc427ef99c1dbd978d31440487555 Mon Sep 17 00:00:00 2001 From: Stuart Douglas Date: Wed, 19 Jan 2022 16:33:17 +1100 Subject: [PATCH] Fix response truncation If the response is an exact multiple of the buffer size it is being truncated. Fixes #22973 --- .../resteasy/jackson/LargeResource.java | 39 +++++++++++++ .../resteasy/jackson/LargeResponseTest.java | 32 ++++++++++ .../runtime/standalone/VertxOutputStream.java | 58 ++++++++++--------- 3 files changed, 102 insertions(+), 27 deletions(-) create mode 100644 extensions/resteasy-classic/resteasy-jackson/deployment/src/test/java/io/quarkus/resteasy/jackson/LargeResource.java create mode 100644 extensions/resteasy-classic/resteasy-jackson/deployment/src/test/java/io/quarkus/resteasy/jackson/LargeResponseTest.java diff --git a/extensions/resteasy-classic/resteasy-jackson/deployment/src/test/java/io/quarkus/resteasy/jackson/LargeResource.java b/extensions/resteasy-classic/resteasy-jackson/deployment/src/test/java/io/quarkus/resteasy/jackson/LargeResource.java new file mode 100644 index 0000000000000..7a5849c51c618 --- /dev/null +++ b/extensions/resteasy-classic/resteasy-jackson/deployment/src/test/java/io/quarkus/resteasy/jackson/LargeResource.java @@ -0,0 +1,39 @@ +package io.quarkus.resteasy.jackson; + +import java.util.HashMap; +import java.util.Map; + +import javax.ws.rs.GET; +import javax.ws.rs.Path; +import javax.ws.rs.Produces; +import javax.ws.rs.core.MediaType; + +@Produces(MediaType.APPLICATION_JSON) +@Path("/large") +public class LargeResource { + + @GET + @Path("/bufmult") + public Map hello() { + Map ret = new HashMap<>(); + for (int i = 0; i < 830; ++i) { + if (i == 0) { + //hack to make this exactly 2 * 8191 bytes long, as tested by trial and error + ret.put("key00", "value" + i); + } else { + ret.put("key" + i, "value" + i); + } + } + return ret; + } + + @GET + @Path("/huge") + public Map huge() { + Map ret = new HashMap<>(); + for (int i = 0; i < 1280; ++i) { + ret.put("key" + i, "value" + i); + } + return ret; + } +} diff --git a/extensions/resteasy-classic/resteasy-jackson/deployment/src/test/java/io/quarkus/resteasy/jackson/LargeResponseTest.java b/extensions/resteasy-classic/resteasy-jackson/deployment/src/test/java/io/quarkus/resteasy/jackson/LargeResponseTest.java new file mode 100644 index 0000000000000..eba47b1ded7f4 --- /dev/null +++ b/extensions/resteasy-classic/resteasy-jackson/deployment/src/test/java/io/quarkus/resteasy/jackson/LargeResponseTest.java @@ -0,0 +1,32 @@ +package io.quarkus.resteasy.jackson; + +import static org.hamcrest.Matchers.equalTo; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.quarkus.test.QuarkusUnitTest; +import io.restassured.RestAssured; + +public class LargeResponseTest { + + @RegisterExtension + static QuarkusUnitTest TEST = new QuarkusUnitTest() + .withApplicationRoot((jar) -> jar + .addClasses(LargeResource.class)); + + @Test + public void testLargeResponseMultipleOfBuffer() { + RestAssured.get("/large/bufmult").then() + .statusCode(200) + .body("key500", equalTo("value500")); + } + + @Test + public void testLargeResponse() { + RestAssured.get("/large/huge").then() + .statusCode(200) + .body("key500", equalTo("value500")); + } + +} diff --git a/extensions/resteasy-classic/resteasy/runtime/src/main/java/io/quarkus/resteasy/runtime/standalone/VertxOutputStream.java b/extensions/resteasy-classic/resteasy/runtime/src/main/java/io/quarkus/resteasy/runtime/standalone/VertxOutputStream.java index 1b6e5ac0a53ff..208d0fc2a274a 100644 --- a/extensions/resteasy-classic/resteasy/runtime/src/main/java/io/quarkus/resteasy/runtime/standalone/VertxOutputStream.java +++ b/extensions/resteasy-classic/resteasy/runtime/src/main/java/io/quarkus/resteasy/runtime/standalone/VertxOutputStream.java @@ -3,12 +3,14 @@ import java.io.IOException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; +import java.util.function.BiConsumer; import javax.ws.rs.core.HttpHeaders; import org.jboss.resteasy.spi.AsyncOutputStream; import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; public class VertxOutputStream extends AsyncOutputStream { @@ -160,36 +162,38 @@ public CompletionStage asyncWrite(final byte[] b, final int off, final int return ret; } - CompletionStage ret = CompletableFuture.completedFuture(null); - int bufferSize = allocator.getBufferSize(); - int bufferCount = len / bufferSize; - int remainder = len % bufferSize; - if (remainder != 0) { - bufferCount = bufferCount + 1; - } + CompletableFuture ret = CompletableFuture.completedFuture(null); - if (bufferCount == 1) { - if (pooledBuffer == null) { - pooledBuffer = allocator.allocateBuffer(); - } - pooledBuffer.writeBytes(b, 0, len); - } else { - for (int i = 0; i < bufferCount - 1; i++) { - int bufferIndex = i; - ret = ret.thenCompose(v -> { - ByteBuf tmpBuf = null; - if ((bufferIndex == 0) && ((pooledBuffer != null))) { - tmpBuf = pooledBuffer; + ByteBuf wrappedBuffer = Unpooled.wrappedBuffer(b, off, len); + if (pooledBuffer == null) { + pooledBuffer = allocator.allocateBuffer(); + } + pooledBuffer.writeBytes(wrappedBuffer, Math.min(pooledBuffer.writableBytes(), wrappedBuffer.readableBytes())); + if (pooledBuffer.writableBytes() == 0) { + CompletableFuture cf = new CompletableFuture<>(); + ret = cf; + ByteBuf filled = pooledBuffer; + pooledBuffer = null; + response.writeNonBlocking(filled, false).whenComplete(new BiConsumer() { + @Override + public void accept(Void unused, Throwable throwable) { + if (throwable != null) { + cf.completeExceptionally(throwable); + return; } - if (tmpBuf == null) { - tmpBuf = allocator.allocateBuffer(); + pooledBuffer = allocator.allocateBuffer(); + pooledBuffer.writeBytes(wrappedBuffer, + Math.min(pooledBuffer.writableBytes(), wrappedBuffer.readableBytes())); + + if (pooledBuffer.writableBytes() == 0) { + ByteBuf filled = pooledBuffer; + pooledBuffer = null; + response.writeNonBlocking(filled, false).whenComplete(this); + } else { + cf.complete(null); } - tmpBuf.writeBytes(b, bufferIndex * bufferSize, bufferSize); - return response.writeNonBlocking(tmpBuf, false); - }); - } - pooledBuffer = allocator.allocateBuffer(); - pooledBuffer.writeBytes(b, (bufferCount - 1) * bufferSize, remainder); + } + }); } return ret.thenCompose(v -> asyncUpdateWritten(len));