Skip to content

Commit

Permalink
Revert limiting synchronization and Add maxChunkSize
Browse files Browse the repository at this point in the history
maxChunkSize to current MaxChunkSize, at least 8192
  • Loading branch information
fredericBregier committed Nov 25, 2023
1 parent d18bf3f commit c0ea8f1
Show file tree
Hide file tree
Showing 6 changed files with 40 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ public int available() throws IOException {
return 0;
}
if (pooled != null && pooled.isReadable()) {
return pooled.readableBytes();
return pooled.readableBytes() + exchange.readBytesAvailable();
}

return exchange.readBytesAvailable();
Expand Down Expand Up @@ -235,7 +235,6 @@ public void handle(Throwable event) {

protected ByteBuf readBlocking() throws IOException {
long expire = System.currentTimeMillis() + timeout;
Buffer ret = null;
synchronized (request.connection()) {
while (input1 == null && !eof && readException == null) {
long rem = expire - System.currentTimeMillis();
Expand Down Expand Up @@ -263,16 +262,18 @@ protected ByteBuf readBlocking() throws IOException {
if (readException != null) {
throw new IOException(readException);
}
ret = input1;
Buffer ret = input1;
input1 = null;
if (inputOverflow != null) {
input1 = inputOverflow.poll();
if (input1 == null) {
request.fetch(1);
}
} else if (!eof) {
request.fetch(1);
}
return ret == null ? null : ret.getByteBuf();
}
if (!eof) {
request.fetch(1);
}
return ret == null ? null : ret.getByteBuf();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,35 +1,41 @@
package org.jboss.resteasy.reactive.client.handlers;

import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.atomic.AtomicBoolean;

import io.vertx.core.*;
import io.vertx.core.AsyncResult;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.streams.ReadStream;
import io.vertx.core.streams.impl.InboundBuffer;

import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.atomic.AtomicBoolean;

/**
* Adapt an InputStream to a ReadStream that can be used with a Pump in Vertx.
*/
public class AsyncInputStream implements ReadStream<Buffer>, AutoCloseable {
public static final String INPUTSTREAM_IS_CLOSED = "Inputstream is closed";
private static int BUF_SIZE = 8192;
// Based on the inputStream with the real data
private final InputStream in;
private final Context context;
private final InboundBuffer<Buffer> queue;
private final byte[] bytes = new byte[BUF_SIZE];
private final byte[] bytes;
private final AtomicBoolean closed = new AtomicBoolean(false);
private final AtomicBoolean readInProgress = new AtomicBoolean(false);
private Handler<Buffer> dataHandler;
private Handler<Void> endHandler;
private Handler<Throwable> exceptionHandler;
private final int maxChunkSize;

/**
* Create a new Async InputStream that can we used with a Pump
*/
public AsyncInputStream(final Vertx vertx, final InputStream in) {
public AsyncInputStream(final Vertx vertx, final InputStream in, final int maxChunkSize) {
this.maxChunkSize = Math.max(maxChunkSize, 8192);
bytes = new byte[this.maxChunkSize];
this.context = vertx.getOrCreateContext();
this.in = in;
queue = new InboundBuffer<>(context, 0);
Expand Down Expand Up @@ -128,7 +134,7 @@ public synchronized AsyncInputStream endHandler(final Handler<Void> endHandler)

private void doRead() {
checkClose();
doRead(BUF_SIZE);
doRead(maxChunkSize);
}

private void doRead(final int len) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -591,7 +591,7 @@ private ReadStream<Buffer> setRequestHeadersForSendingInputStream(HttpClientRequ
InputStream inputStream = (InputStream) entity.getEntity();
httpClientRequest.setChunked(true);
Vertx vertx = Vertx.currentContext().owner();
ReadStream<Buffer> readStream = new AsyncInputStream(vertx, inputStream);
ReadStream<Buffer> readStream = new AsyncInputStream(vertx, inputStream, maxChunkSize);
// set the Vertx headers after we've run the interceptors because they can modify them
setVertxHeaders(httpClientRequest, headerMap);
return readStream;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ public int available() throws IOException {
return 0;
}
if (pooled != null && pooled.isReadable()) {
return pooled.readableBytes();
return pooled.readableBytes() + exchange.readBytesAvailable();
}

return exchange.readBytesAvailable();
Expand Down Expand Up @@ -190,7 +190,6 @@ public void handle(Throwable event) {

protected ByteBuf readBlocking() throws IOException {
long expire = System.currentTimeMillis() + timeout;
Buffer ret = null;
synchronized (VertxBlockingInput.this) {
while (input1 == null && !eof && readException == null) {
long rem = expire - System.currentTimeMillis();
Expand Down Expand Up @@ -218,16 +217,18 @@ protected ByteBuf readBlocking() throws IOException {
if (readException != null) {
throw new IOException(readException);
}
ret = input1;
Buffer ret = input1;
input1 = null;
if (inputOverflow != null) {
input1 = inputOverflow.poll();
if (input1 == null) {
request.fetch(1);
}
} else if (!eof) {
request.fetch(1);
}
return ret == null ? null : ret.getByteBuf();
}
if (!eof) {
request.fetch(1);
}
return ret == null ? null : ret.getByteBuf();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ public int available() throws IOException {
return 0;
}
if (pooled != null && pooled.isReadable()) {
return pooled.readableBytes();
return pooled.readableBytes() + exchange.readBytesAvailable();
}

return exchange.readBytesAvailable();
Expand Down Expand Up @@ -236,7 +236,6 @@ public void handle(Throwable event) {

protected ByteBuf readBlocking() throws IOException {
long expire = System.currentTimeMillis() + timeout;
Buffer ret = null;
synchronized (request.connection()) {
while (input1 == null && !eof && readException == null) {
long rem = expire - System.currentTimeMillis();
Expand Down Expand Up @@ -264,16 +263,18 @@ protected ByteBuf readBlocking() throws IOException {
if (readException != null) {
throw new IOException(readException);
}
ret = input1;
Buffer ret = input1;
input1 = null;
if (inputOverflow != null) {
input1 = inputOverflow.poll();
if (input1 == null) {
request.fetch(1);
}
} else if (!eof) {
request.fetch(1);
}
return ret == null ? null : ret.getByteBuf();
}
if (!eof) {
request.fetch(1);
}
return ret == null ? null : ret.getByteBuf();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package org.jboss.resteasy.reactive.server.vertx.test.inputstream;


import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
Expand Down

0 comments on commit c0ea8f1

Please sign in to comment.