Skip to content

Commit

Permalink
Ensure that a large response does not cause OOM in RESTEasy Classic
Browse files Browse the repository at this point in the history
The idea of this change is to make the allocation of buffers only happen
when the previous write has been completed, as opposed to the previous
behavior where all the buffers where allocated upfront and could never
be deallocated if one of the writes caused an error

Fixes: quarkusio#20822
  • Loading branch information
geoand committed Oct 20, 2021
1 parent 30ceff4 commit 627e127
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,9 @@ public void write(ByteBuf data, boolean last) throws IOException {
return;
}
if (throwable != null) {
if (data != null && data.refCnt() > 0) {
data.release();
}
throw new IOException(throwable);
}
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,24 +158,30 @@ public CompletionStage<Void> asyncWrite(final byte[] b, final int off, final int
return ret;
}

int rem = len;
int idx = off;
ByteBuf buffer = pooledBuffer;
CompletionStage<Void> ret = CompletableFuture.completedFuture(null);
if (buffer == null) {
pooledBuffer = buffer = allocator.allocateBuffer();
int bufferSize = allocator.getBufferSize();
int bufferCount = len / bufferSize;
int remainder = len % bufferSize;
if (remainder != 0) {
bufferCount = bufferCount + 1;
}
while (rem > 0) {
int toWrite = Math.min(rem, buffer.writableBytes());
buffer.writeBytes(b, idx, toWrite);
rem -= toWrite;
idx += toWrite;
if (!buffer.isWritable()) {
ByteBuf tmpBuf = buffer;
this.pooledBuffer = buffer = allocator.allocateBuffer();
ret = ret.thenCompose(v -> response.writeNonBlocking(tmpBuf, false));

if (bufferCount == 1) {
pooledBuffer = allocator.allocateBuffer();
pooledBuffer.writeBytes(b);
} else {
for (int i = 0; i < bufferCount - 1; i++) {
int bufferIndex = i;
ret = ret.thenCompose(v -> {
ByteBuf tmpBuf = allocator.allocateBuffer();
tmpBuf.writeBytes(b, bufferIndex * bufferSize, bufferSize);
return response.writeNonBlocking(tmpBuf, false);
});
}
pooledBuffer = allocator.allocateBuffer();
pooledBuffer.writeBytes(b, (bufferCount - 1) * bufferSize, remainder);
}

return ret.thenCompose(v -> asyncUpdateWritten(len));
}

Expand Down

0 comments on commit 627e127

Please sign in to comment.