From 627e1275714573298dec458ae01a7b33e8b28e01 Mon Sep 17 00:00:00 2001 From: Georgios Andrianakis Date: Wed, 20 Oct 2021 16:49:49 +0300 Subject: [PATCH] Ensure that a large response does not cause OOM in RESTEasy Classic The idea of this change is to make the allocation of buffers only happen when the previous write has been completed, as opposed to the previous behavior where all the buffers where allocated upfront and could never be deallocated if one of the writes caused an error Fixes: #20822 --- .../standalone/VertxBlockingOutput.java | 3 ++ .../runtime/standalone/VertxOutputStream.java | 34 +++++++++++-------- 2 files changed, 23 insertions(+), 14 deletions(-) diff --git a/extensions/resteasy-classic/resteasy/runtime/src/main/java/io/quarkus/resteasy/runtime/standalone/VertxBlockingOutput.java b/extensions/resteasy-classic/resteasy/runtime/src/main/java/io/quarkus/resteasy/runtime/standalone/VertxBlockingOutput.java index b8801e241e02e..4a84d8dd80c36 100644 --- a/extensions/resteasy-classic/resteasy/runtime/src/main/java/io/quarkus/resteasy/runtime/standalone/VertxBlockingOutput.java +++ b/extensions/resteasy-classic/resteasy/runtime/src/main/java/io/quarkus/resteasy/runtime/standalone/VertxBlockingOutput.java @@ -76,6 +76,9 @@ public void write(ByteBuf data, boolean last) throws IOException { return; } if (throwable != null) { + if (data != null && data.refCnt() > 0) { + data.release(); + } throw new IOException(throwable); } try { diff --git a/extensions/resteasy-classic/resteasy/runtime/src/main/java/io/quarkus/resteasy/runtime/standalone/VertxOutputStream.java b/extensions/resteasy-classic/resteasy/runtime/src/main/java/io/quarkus/resteasy/runtime/standalone/VertxOutputStream.java index 3dd4255d70d08..18140d473e712 100644 --- a/extensions/resteasy-classic/resteasy/runtime/src/main/java/io/quarkus/resteasy/runtime/standalone/VertxOutputStream.java +++ b/extensions/resteasy-classic/resteasy/runtime/src/main/java/io/quarkus/resteasy/runtime/standalone/VertxOutputStream.java @@ -158,24 +158,30 @@ public CompletionStage asyncWrite(final byte[] b, final int off, final int return ret; } - int rem = len; - int idx = off; - ByteBuf buffer = pooledBuffer; CompletionStage ret = CompletableFuture.completedFuture(null); - if (buffer == null) { - pooledBuffer = buffer = allocator.allocateBuffer(); + int bufferSize = allocator.getBufferSize(); + int bufferCount = len / bufferSize; + int remainder = len % bufferSize; + if (remainder != 0) { + bufferCount = bufferCount + 1; } - while (rem > 0) { - int toWrite = Math.min(rem, buffer.writableBytes()); - buffer.writeBytes(b, idx, toWrite); - rem -= toWrite; - idx += toWrite; - if (!buffer.isWritable()) { - ByteBuf tmpBuf = buffer; - this.pooledBuffer = buffer = allocator.allocateBuffer(); - ret = ret.thenCompose(v -> response.writeNonBlocking(tmpBuf, false)); + + if (bufferCount == 1) { + pooledBuffer = allocator.allocateBuffer(); + pooledBuffer.writeBytes(b); + } else { + for (int i = 0; i < bufferCount - 1; i++) { + int bufferIndex = i; + ret = ret.thenCompose(v -> { + ByteBuf tmpBuf = allocator.allocateBuffer(); + tmpBuf.writeBytes(b, bufferIndex * bufferSize, bufferSize); + return response.writeNonBlocking(tmpBuf, false); + }); } + pooledBuffer = allocator.allocateBuffer(); + pooledBuffer.writeBytes(b, (bufferCount - 1) * bufferSize, remainder); } + return ret.thenCompose(v -> asyncUpdateWritten(len)); }