From 84e6922dc2d3b89494c104b20021d9adf05fa73d Mon Sep 17 00:00:00 2001 From: Stuart Douglas Date: Wed, 3 Feb 2021 17:15:11 +1100 Subject: [PATCH] Minor Resteast Reactive Servlet fixes --- .../runtime/ServletRequestContext.java | 114 +++++++++++------- .../server/runtime/BlockingInputHandler.java | 10 +- 2 files changed, 79 insertions(+), 45 deletions(-) diff --git a/extensions/resteasy-reactive/quarkus-resteasy-reactive-servlet/runtime/src/main/java/io/quarkus/resteasy/reactive/server/servlet/runtime/ServletRequestContext.java b/extensions/resteasy-reactive/quarkus-resteasy-reactive-servlet/runtime/src/main/java/io/quarkus/resteasy/reactive/server/servlet/runtime/ServletRequestContext.java index 37a67aa9a142c6..fbbc947225507c 100644 --- a/extensions/resteasy-reactive/quarkus-resteasy-reactive-servlet/runtime/src/main/java/io/quarkus/resteasy/reactive/server/servlet/runtime/ServletRequestContext.java +++ b/extensions/resteasy-reactive/quarkus-resteasy-reactive-servlet/runtime/src/main/java/io/quarkus/resteasy/reactive/server/servlet/runtime/ServletRequestContext.java @@ -38,6 +38,7 @@ import io.quarkus.arc.Arc; import io.quarkus.arc.impl.LazyValue; import io.quarkus.resteasy.reactive.server.runtime.ResteasyReactiveSecurityContext; +import io.quarkus.runtime.BlockingOperationControl; import io.quarkus.security.identity.SecurityIdentity; import io.quarkus.vertx.http.runtime.security.QuarkusHttpUser; import io.undertow.server.HttpServerExchange; @@ -58,6 +59,7 @@ public class ServletRequestContext extends ResteasyReactiveRequestContext AsyncContext asyncContext; ServletWriteListener writeListener; byte[] asyncWriteData; + boolean closed; Consumer asyncWriteHandler; protected Consumer preCommitTask; @@ -69,6 +71,11 @@ public ServletRequestContext(Deployment deployment, ProvidersImpl providers, this.request = request; this.response = response; this.context = context; + try { + setInputStream(request.getInputStream()); + } catch (IOException e) { + throw new RuntimeException(e); + } exchange.addResponseCommitListener(this); } @@ -81,10 +88,14 @@ protected void beginAsyncProcessing() { } @Override - public void close() { - super.close(); - if (asyncContext != null) { - asyncContext.complete(); + public synchronized void close() { + if (asyncWriteData != null) { + closed = true; + } else { + super.close(); + if (asyncContext != null) { + asyncContext.complete(); + } } } @@ -346,23 +357,31 @@ public boolean headWritten() { @Override public ServerHttpResponse end(byte[] data) { - try { - response.getOutputStream().write(data); - response.getOutputStream().close(); - } catch (IOException e) { - log.debug("IoException writing response", e); + if (BlockingOperationControl.isBlockingAllowed()) { + try { + response.getOutputStream().write(data); + response.getOutputStream().close(); + } catch (IOException e) { + log.debug("IoException writing response", e); + } + } else { + write(data, new Consumer() { + @Override + public void accept(Throwable throwable) { + try { + response.getOutputStream().close(); + } catch (IOException e) { + log.debug("IoException writing response", e); + } + } + }); } return this; } @Override public ServerHttpResponse end(String data) { - try { - response.getOutputStream().write(data.getBytes(StandardCharsets.UTF_8)); - response.getOutputStream().close(); - } catch (IOException e) { - log.debug("IoException writing response", e); - } + end(data.getBytes(StandardCharsets.UTF_8)); return this; } @@ -422,17 +441,22 @@ public ServerHttpResponse write(byte[] data, Consumer asyncResultHand asyncResultHandler.accept(e); } } else { - if (writeListener == null) { - try { - ServletOutputStream outputStream = response.getOutputStream(); - outputStream.setWriteListener(writeListener = new ServletWriteListener(outputStream)); - } catch (IOException e) { - asyncResultHandler.accept(e); + synchronized (this) { + if (asyncWriteData != null) { + throw new IllegalStateException("Cannot write more than one piece of async data at a time"); } - } else { asyncWriteData = data; asyncWriteHandler = asyncResultHandler; - writeListener.onWritePossible(); + if (writeListener == null) { + try { + ServletOutputStream outputStream = response.getOutputStream(); + outputStream.setWriteListener(writeListener = new ServletWriteListener(outputStream)); + } catch (IOException e) { + asyncResultHandler.accept(e); + } + } else { + writeListener.onWritePossible(); + } } } return this; @@ -484,29 +508,37 @@ class ServletWriteListener implements WriteListener { } @Override - public synchronized void onWritePossible() { - if (!outputStream.isReady()) { - return; - } - Consumer ctx = asyncWriteHandler; - byte[] data = asyncWriteData; - asyncWriteHandler = null; - asyncWriteData = null; - try { - outputStream.write(data); - ctx.accept(null); - } catch (IOException e) { - ctx.accept(e); + public void onWritePossible() { + synchronized (ServletRequestContext.this) { + if (!outputStream.isReady()) { + return; + } + Consumer ctx = asyncWriteHandler; + byte[] data = asyncWriteData; + asyncWriteHandler = null; + asyncWriteData = null; + try { + outputStream.write(data); + ctx.accept(null); + } catch (IOException e) { + ctx.accept(e); + } + if (closed) { + close(); + } } } @Override public synchronized void onError(Throwable t) { - if (asyncWriteHandler != null) { - Consumer ctx = asyncWriteHandler; - asyncWriteHandler = null; - asyncWriteData = null; - ctx.accept(t); + synchronized (ServletRequestContext.this) { + if (asyncWriteHandler != null) { + Consumer ctx = asyncWriteHandler; + asyncWriteHandler = null; + asyncWriteData = null; + ctx.accept(t); + close(); + } } } } diff --git a/extensions/resteasy-reactive/quarkus-resteasy-reactive/runtime/src/main/java/io/quarkus/resteasy/reactive/server/runtime/BlockingInputHandler.java b/extensions/resteasy-reactive/quarkus-resteasy-reactive/runtime/src/main/java/io/quarkus/resteasy/reactive/server/runtime/BlockingInputHandler.java index 13a4e844885c40..9b79f8fcbd2eaf 100644 --- a/extensions/resteasy-reactive/quarkus-resteasy-reactive/runtime/src/main/java/io/quarkus/resteasy/reactive/server/runtime/BlockingInputHandler.java +++ b/extensions/resteasy-reactive/quarkus-resteasy-reactive/runtime/src/main/java/io/quarkus/resteasy/reactive/server/runtime/BlockingInputHandler.java @@ -34,10 +34,12 @@ public void handle(ResteasyReactiveRequestContext context) throws Exception { context.serverRequest().getRequestMethod().equals(HttpMethod.HEAD)) { return; } - - VertxResteasyReactiveRequestContext vertxContext = (VertxResteasyReactiveRequestContext) context; - RoutingContext routingContext = vertxContext.getContext(); - vertxContext.setInputStream(new VertxInputStream(routingContext, timeout)); + if (context instanceof VertxResteasyReactiveRequestContext) { + //TODO: this should not be installed for servlet + VertxResteasyReactiveRequestContext vertxContext = (VertxResteasyReactiveRequestContext) context; + RoutingContext routingContext = vertxContext.getContext(); + vertxContext.setInputStream(new VertxInputStream(routingContext, timeout)); + } } }