Skip to content

Commit

Permalink
Minor Resteast Reactive Servlet fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
stuartwdouglas committed Feb 3, 2021
1 parent 9d1070b commit 84e6922
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import io.quarkus.arc.Arc;
import io.quarkus.arc.impl.LazyValue;
import io.quarkus.resteasy.reactive.server.runtime.ResteasyReactiveSecurityContext;
import io.quarkus.runtime.BlockingOperationControl;
import io.quarkus.security.identity.SecurityIdentity;
import io.quarkus.vertx.http.runtime.security.QuarkusHttpUser;
import io.undertow.server.HttpServerExchange;
Expand All @@ -58,6 +59,7 @@ public class ServletRequestContext extends ResteasyReactiveRequestContext
AsyncContext asyncContext;
ServletWriteListener writeListener;
byte[] asyncWriteData;
boolean closed;
Consumer<Throwable> asyncWriteHandler;
protected Consumer<ResteasyReactiveRequestContext> preCommitTask;

Expand All @@ -69,6 +71,11 @@ public ServletRequestContext(Deployment deployment, ProvidersImpl providers,
this.request = request;
this.response = response;
this.context = context;
try {
setInputStream(request.getInputStream());
} catch (IOException e) {
throw new RuntimeException(e);
}
exchange.addResponseCommitListener(this);
}

Expand All @@ -81,10 +88,14 @@ protected void beginAsyncProcessing() {
}

@Override
public void close() {
super.close();
if (asyncContext != null) {
asyncContext.complete();
public synchronized void close() {
if (asyncWriteData != null) {
closed = true;
} else {
super.close();
if (asyncContext != null) {
asyncContext.complete();
}
}
}

Expand Down Expand Up @@ -346,23 +357,31 @@ public boolean headWritten() {

@Override
public ServerHttpResponse end(byte[] data) {
try {
response.getOutputStream().write(data);
response.getOutputStream().close();
} catch (IOException e) {
log.debug("IoException writing response", e);
if (BlockingOperationControl.isBlockingAllowed()) {
try {
response.getOutputStream().write(data);
response.getOutputStream().close();
} catch (IOException e) {
log.debug("IoException writing response", e);
}
} else {
write(data, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) {
try {
response.getOutputStream().close();
} catch (IOException e) {
log.debug("IoException writing response", e);
}
}
});
}
return this;
}

@Override
public ServerHttpResponse end(String data) {
try {
response.getOutputStream().write(data.getBytes(StandardCharsets.UTF_8));
response.getOutputStream().close();
} catch (IOException e) {
log.debug("IoException writing response", e);
}
end(data.getBytes(StandardCharsets.UTF_8));
return this;
}

Expand Down Expand Up @@ -422,17 +441,22 @@ public ServerHttpResponse write(byte[] data, Consumer<Throwable> asyncResultHand
asyncResultHandler.accept(e);
}
} else {
if (writeListener == null) {
try {
ServletOutputStream outputStream = response.getOutputStream();
outputStream.setWriteListener(writeListener = new ServletWriteListener(outputStream));
} catch (IOException e) {
asyncResultHandler.accept(e);
synchronized (this) {
if (asyncWriteData != null) {
throw new IllegalStateException("Cannot write more than one piece of async data at a time");
}
} else {
asyncWriteData = data;
asyncWriteHandler = asyncResultHandler;
writeListener.onWritePossible();
if (writeListener == null) {
try {
ServletOutputStream outputStream = response.getOutputStream();
outputStream.setWriteListener(writeListener = new ServletWriteListener(outputStream));
} catch (IOException e) {
asyncResultHandler.accept(e);
}
} else {
writeListener.onWritePossible();
}
}
}
return this;
Expand Down Expand Up @@ -484,29 +508,37 @@ class ServletWriteListener implements WriteListener {
}

@Override
public synchronized void onWritePossible() {
if (!outputStream.isReady()) {
return;
}
Consumer<Throwable> ctx = asyncWriteHandler;
byte[] data = asyncWriteData;
asyncWriteHandler = null;
asyncWriteData = null;
try {
outputStream.write(data);
ctx.accept(null);
} catch (IOException e) {
ctx.accept(e);
public void onWritePossible() {
synchronized (ServletRequestContext.this) {
if (!outputStream.isReady()) {
return;
}
Consumer<Throwable> ctx = asyncWriteHandler;
byte[] data = asyncWriteData;
asyncWriteHandler = null;
asyncWriteData = null;
try {
outputStream.write(data);
ctx.accept(null);
} catch (IOException e) {
ctx.accept(e);
}
if (closed) {
close();
}
}
}

@Override
public synchronized void onError(Throwable t) {
if (asyncWriteHandler != null) {
Consumer<Throwable> ctx = asyncWriteHandler;
asyncWriteHandler = null;
asyncWriteData = null;
ctx.accept(t);
synchronized (ServletRequestContext.this) {
if (asyncWriteHandler != null) {
Consumer<Throwable> ctx = asyncWriteHandler;
asyncWriteHandler = null;
asyncWriteData = null;
ctx.accept(t);
close();
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,12 @@ public void handle(ResteasyReactiveRequestContext context) throws Exception {
context.serverRequest().getRequestMethod().equals(HttpMethod.HEAD)) {
return;
}

VertxResteasyReactiveRequestContext vertxContext = (VertxResteasyReactiveRequestContext) context;
RoutingContext routingContext = vertxContext.getContext();
vertxContext.setInputStream(new VertxInputStream(routingContext, timeout));
if (context instanceof VertxResteasyReactiveRequestContext) {
//TODO: this should not be installed for servlet
VertxResteasyReactiveRequestContext vertxContext = (VertxResteasyReactiveRequestContext) context;
RoutingContext routingContext = vertxContext.getContext();
vertxContext.setInputStream(new VertxInputStream(routingContext, timeout));
}
}

}

0 comments on commit 84e6922

Please sign in to comment.