From ebd18c88397b82565b4727eb965d453b4cad7e60 Mon Sep 17 00:00:00 2001 From: Stuart Douglas Date: Wed, 3 Feb 2021 17:15:11 +1100 Subject: [PATCH] Minor Resteast Reactive Servlet fixes --- .../common/runtime/ArcThreadSetupAction.java | 9 ++ .../runtime/ServletRequestContext.java | 120 ++++++++++++------ .../runtime/ServletRequestContextFactory.java | 5 + .../server/runtime/BlockingInputHandler.java | 10 +- .../runtime/ResteasyReactiveRecorder.java | 1 + .../core/AbstractResteasyReactiveContext.java | 15 ++- .../reactive/spi/ThreadSetupAction.java | 7 + .../server/core/RequestContextFactory.java | 8 ++ .../core/ResteasyReactiveRequestContext.java | 11 +- .../startup/RuntimeDeploymentManager.java | 2 +- .../startup/RuntimeResourceDeployment.java | 36 ++++-- .../server/handlers/InputHandler.java | 3 +- .../server/spi/ServerHttpRequest.java | 2 + .../VertxResteasyReactiveRequestContext.java | 8 ++ 14 files changed, 169 insertions(+), 68 deletions(-) diff --git a/extensions/resteasy-reactive/quarkus-resteasy-reactive-common/runtime/src/main/java/io/quarkus/resteasy/reactive/common/runtime/ArcThreadSetupAction.java b/extensions/resteasy-reactive/quarkus-resteasy-reactive-common/runtime/src/main/java/io/quarkus/resteasy/reactive/common/runtime/ArcThreadSetupAction.java index f4024a5637185a..a64d3ae4d7901d 100644 --- a/extensions/resteasy-reactive/quarkus-resteasy-reactive-common/runtime/src/main/java/io/quarkus/resteasy/reactive/common/runtime/ArcThreadSetupAction.java +++ b/extensions/resteasy-reactive/quarkus-resteasy-reactive-common/runtime/src/main/java/io/quarkus/resteasy/reactive/common/runtime/ArcThreadSetupAction.java @@ -17,6 +17,10 @@ public ArcThreadSetupAction(ManagedContext managedContext) { public ThreadState activateInitial() { managedContext.activate(); InjectableContext.ContextState state = managedContext.getState(); + return toThreadState(state); + } + + private ThreadState toThreadState(InjectableContext.ContextState state) { return new ThreadState() { @Override public void close() { @@ -34,4 +38,9 @@ public void deactivate() { } }; } + + @Override + public ThreadState currentState() { + return toThreadState(managedContext.getState()); + } } diff --git a/extensions/resteasy-reactive/quarkus-resteasy-reactive-servlet/runtime/src/main/java/io/quarkus/resteasy/reactive/server/servlet/runtime/ServletRequestContext.java b/extensions/resteasy-reactive/quarkus-resteasy-reactive-servlet/runtime/src/main/java/io/quarkus/resteasy/reactive/server/servlet/runtime/ServletRequestContext.java index 37a67aa9a142c6..ab2a6725e428ea 100644 --- a/extensions/resteasy-reactive/quarkus-resteasy-reactive-servlet/runtime/src/main/java/io/quarkus/resteasy/reactive/server/servlet/runtime/ServletRequestContext.java +++ b/extensions/resteasy-reactive/quarkus-resteasy-reactive-servlet/runtime/src/main/java/io/quarkus/resteasy/reactive/server/servlet/runtime/ServletRequestContext.java @@ -38,6 +38,7 @@ import io.quarkus.arc.Arc; import io.quarkus.arc.impl.LazyValue; import io.quarkus.resteasy.reactive.server.runtime.ResteasyReactiveSecurityContext; +import io.quarkus.runtime.BlockingOperationControl; import io.quarkus.security.identity.SecurityIdentity; import io.quarkus.vertx.http.runtime.security.QuarkusHttpUser; import io.undertow.server.HttpServerExchange; @@ -58,6 +59,7 @@ public class ServletRequestContext extends ResteasyReactiveRequestContext AsyncContext asyncContext; ServletWriteListener writeListener; byte[] asyncWriteData; + boolean closed; Consumer asyncWriteHandler; protected Consumer preCommitTask; @@ -81,10 +83,14 @@ protected void beginAsyncProcessing() { } @Override - public void close() { - super.close(); - if (asyncContext != null) { - asyncContext.complete(); + public synchronized void close() { + if (asyncWriteData != null) { + closed = true; + } else { + super.close(); + if (asyncContext != null) { + asyncContext.complete(); + } } } @@ -197,7 +203,7 @@ public String getRequestNormalisedPath() { @Override public String getRequestAbsoluteUri() { - return request.getRequestURI(); + return request.getRequestURL().toString(); } @Override @@ -279,6 +285,15 @@ public InputStream createInputStream(ByteBuffer existingData) { return new ServletResteasyReactiveInputStream(existingData, request); } + @Override + public InputStream createInputStream() { + try { + return request.getInputStream(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + @Override public ServerHttpResponse pauseRequestInput() { //TODO @@ -346,23 +361,31 @@ public boolean headWritten() { @Override public ServerHttpResponse end(byte[] data) { - try { - response.getOutputStream().write(data); - response.getOutputStream().close(); - } catch (IOException e) { - log.debug("IoException writing response", e); + if (BlockingOperationControl.isBlockingAllowed()) { + try { + response.getOutputStream().write(data); + response.getOutputStream().close(); + } catch (IOException e) { + log.debug("IoException writing response", e); + } + } else { + write(data, new Consumer() { + @Override + public void accept(Throwable throwable) { + try { + response.getOutputStream().close(); + } catch (IOException e) { + log.debug("IoException writing response", e); + } + } + }); } return this; } @Override public ServerHttpResponse end(String data) { - try { - response.getOutputStream().write(data.getBytes(StandardCharsets.UTF_8)); - response.getOutputStream().close(); - } catch (IOException e) { - log.debug("IoException writing response", e); - } + end(data.getBytes(StandardCharsets.UTF_8)); return this; } @@ -422,17 +445,22 @@ public ServerHttpResponse write(byte[] data, Consumer asyncResultHand asyncResultHandler.accept(e); } } else { - if (writeListener == null) { - try { - ServletOutputStream outputStream = response.getOutputStream(); - outputStream.setWriteListener(writeListener = new ServletWriteListener(outputStream)); - } catch (IOException e) { - asyncResultHandler.accept(e); + synchronized (this) { + if (asyncWriteData != null) { + throw new IllegalStateException("Cannot write more than one piece of async data at a time"); } - } else { asyncWriteData = data; asyncWriteHandler = asyncResultHandler; - writeListener.onWritePossible(); + if (writeListener == null) { + try { + ServletOutputStream outputStream = response.getOutputStream(); + outputStream.setWriteListener(writeListener = new ServletWriteListener(outputStream)); + } catch (IOException e) { + asyncResultHandler.accept(e); + } + } else { + writeListener.onWritePossible(); + } } } return this; @@ -484,29 +512,37 @@ class ServletWriteListener implements WriteListener { } @Override - public synchronized void onWritePossible() { - if (!outputStream.isReady()) { - return; - } - Consumer ctx = asyncWriteHandler; - byte[] data = asyncWriteData; - asyncWriteHandler = null; - asyncWriteData = null; - try { - outputStream.write(data); - ctx.accept(null); - } catch (IOException e) { - ctx.accept(e); + public void onWritePossible() { + synchronized (ServletRequestContext.this) { + if (!outputStream.isReady()) { + return; + } + Consumer ctx = asyncWriteHandler; + byte[] data = asyncWriteData; + asyncWriteHandler = null; + asyncWriteData = null; + try { + outputStream.write(data); + ctx.accept(null); + } catch (IOException e) { + ctx.accept(e); + } + if (closed) { + close(); + } } } @Override public synchronized void onError(Throwable t) { - if (asyncWriteHandler != null) { - Consumer ctx = asyncWriteHandler; - asyncWriteHandler = null; - asyncWriteData = null; - ctx.accept(t); + synchronized (ServletRequestContext.this) { + if (asyncWriteHandler != null) { + Consumer ctx = asyncWriteHandler; + asyncWriteHandler = null; + asyncWriteData = null; + ctx.accept(t); + close(); + } } } } diff --git a/extensions/resteasy-reactive/quarkus-resteasy-reactive-servlet/runtime/src/main/java/io/quarkus/resteasy/reactive/server/servlet/runtime/ServletRequestContextFactory.java b/extensions/resteasy-reactive/quarkus-resteasy-reactive-servlet/runtime/src/main/java/io/quarkus/resteasy/reactive/server/servlet/runtime/ServletRequestContextFactory.java index 2cc20536b9e09c..25462aaa571731 100644 --- a/extensions/resteasy-reactive/quarkus-resteasy-reactive-servlet/runtime/src/main/java/io/quarkus/resteasy/reactive/server/servlet/runtime/ServletRequestContextFactory.java +++ b/extensions/resteasy-reactive/quarkus-resteasy-reactive-servlet/runtime/src/main/java/io/quarkus/resteasy/reactive/server/servlet/runtime/ServletRequestContextFactory.java @@ -26,4 +26,9 @@ public ResteasyReactiveRequestContext createContext(Deployment deployment, Provi (HttpServletResponse) src.getServletResponse(), requestContext, handlerChain, abortHandlerChain, (RoutingContext) ((VertxHttpExchange) src.getExchange().getDelegate()).getContext(), src.getExchange()); } + + @Override + public boolean isDefaultBlocking() { + return true; + } } diff --git a/extensions/resteasy-reactive/quarkus-resteasy-reactive/runtime/src/main/java/io/quarkus/resteasy/reactive/server/runtime/BlockingInputHandler.java b/extensions/resteasy-reactive/quarkus-resteasy-reactive/runtime/src/main/java/io/quarkus/resteasy/reactive/server/runtime/BlockingInputHandler.java index 13a4e844885c40..9b79f8fcbd2eaf 100644 --- a/extensions/resteasy-reactive/quarkus-resteasy-reactive/runtime/src/main/java/io/quarkus/resteasy/reactive/server/runtime/BlockingInputHandler.java +++ b/extensions/resteasy-reactive/quarkus-resteasy-reactive/runtime/src/main/java/io/quarkus/resteasy/reactive/server/runtime/BlockingInputHandler.java @@ -34,10 +34,12 @@ public void handle(ResteasyReactiveRequestContext context) throws Exception { context.serverRequest().getRequestMethod().equals(HttpMethod.HEAD)) { return; } - - VertxResteasyReactiveRequestContext vertxContext = (VertxResteasyReactiveRequestContext) context; - RoutingContext routingContext = vertxContext.getContext(); - vertxContext.setInputStream(new VertxInputStream(routingContext, timeout)); + if (context instanceof VertxResteasyReactiveRequestContext) { + //TODO: this should not be installed for servlet + VertxResteasyReactiveRequestContext vertxContext = (VertxResteasyReactiveRequestContext) context; + RoutingContext routingContext = vertxContext.getContext(); + vertxContext.setInputStream(new VertxInputStream(routingContext, timeout)); + } } } diff --git a/extensions/resteasy-reactive/quarkus-resteasy-reactive/runtime/src/main/java/io/quarkus/resteasy/reactive/server/runtime/ResteasyReactiveRecorder.java b/extensions/resteasy-reactive/quarkus-resteasy-reactive/runtime/src/main/java/io/quarkus/resteasy/reactive/server/runtime/ResteasyReactiveRecorder.java index fda225dad1c23d..f3c33092d750c1 100644 --- a/extensions/resteasy-reactive/quarkus-resteasy-reactive/runtime/src/main/java/io/quarkus/resteasy/reactive/server/runtime/ResteasyReactiveRecorder.java +++ b/extensions/resteasy-reactive/quarkus-resteasy-reactive/runtime/src/main/java/io/quarkus/resteasy/reactive/server/runtime/ResteasyReactiveRecorder.java @@ -102,6 +102,7 @@ public ResteasyReactiveRequestContext createContext(Deployment deployment, handlerChain, abortHandlerChain, currentIdentityAssociation); } + }; } diff --git a/independent-projects/resteasy-reactive/common/runtime/src/main/java/org/jboss/resteasy/reactive/common/core/AbstractResteasyReactiveContext.java b/independent-projects/resteasy-reactive/common/runtime/src/main/java/org/jboss/resteasy/reactive/common/core/AbstractResteasyReactiveContext.java index 8b37cd6f51bd93..2c88d396c0e906 100644 --- a/independent-projects/resteasy-reactive/common/runtime/src/main/java/org/jboss/resteasy/reactive/common/core/AbstractResteasyReactiveContext.java +++ b/independent-projects/resteasy-reactive/common/runtime/src/main/java/org/jboss/resteasy/reactive/common/core/AbstractResteasyReactiveContext.java @@ -116,6 +116,9 @@ public void run() { } requestScopeActivated = false; } + } else { + requestScopeActivated = false; + requestScopeDeactivated(); } if (this.executor != null) { //resume happened in the meantime @@ -179,12 +182,16 @@ public void requireCDIRequestScope() { return; } requestScopeActivated = true; - if (currentRequestScope == null) { - currentRequestScope = requestContext.activateInitial(); - handleRequestScopeActivation(); + if (isRequestScopeManagementRequired()) { + if (currentRequestScope == null) { + currentRequestScope = requestContext.activateInitial(); + } else { + currentRequestScope.activate(); + } } else { - currentRequestScope.activate(); + currentRequestScope = requestContext.currentState(); } + handleRequestScopeActivation(); } protected abstract void handleRequestScopeActivation(); diff --git a/independent-projects/resteasy-reactive/common/runtime/src/main/java/org/jboss/resteasy/reactive/spi/ThreadSetupAction.java b/independent-projects/resteasy-reactive/common/runtime/src/main/java/org/jboss/resteasy/reactive/spi/ThreadSetupAction.java index 8f5e78ede8f66b..912614f2c69390 100644 --- a/independent-projects/resteasy-reactive/common/runtime/src/main/java/org/jboss/resteasy/reactive/spi/ThreadSetupAction.java +++ b/independent-projects/resteasy-reactive/common/runtime/src/main/java/org/jboss/resteasy/reactive/spi/ThreadSetupAction.java @@ -4,6 +4,8 @@ public interface ThreadSetupAction { ThreadState activateInitial(); + ThreadState currentState(); + interface ThreadState { void close(); @@ -32,5 +34,10 @@ public void deactivate() { } }; } + + @Override + public ThreadState currentState() { + return activateInitial(); + } }; } diff --git a/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/core/RequestContextFactory.java b/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/core/RequestContextFactory.java index 6004982112270e..373c84a2ecfc64 100644 --- a/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/core/RequestContextFactory.java +++ b/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/core/RequestContextFactory.java @@ -9,4 +9,12 @@ public interface RequestContextFactory { ResteasyReactiveRequestContext createContext(Deployment deployment, ProvidersImpl providers, Object context, ThreadSetupAction requestContext, ServerRestHandler[] handlerChain, ServerRestHandler[] abortHandlerChain); + + /** + * @return true if requests default to blocking when created by this factory + */ + default boolean isDefaultBlocking() { + return false; + } + } diff --git a/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/core/ResteasyReactiveRequestContext.java b/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/core/ResteasyReactiveRequestContext.java index de76c13e1aacd5..0b875e61aeed19 100644 --- a/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/core/ResteasyReactiveRequestContext.java +++ b/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/core/ResteasyReactiveRequestContext.java @@ -25,7 +25,6 @@ import javax.ws.rs.ext.ReaderInterceptor; import javax.ws.rs.ext.WriterInterceptor; import org.jboss.resteasy.reactive.common.core.AbstractResteasyReactiveContext; -import org.jboss.resteasy.reactive.common.util.EmptyInputStream; import org.jboss.resteasy.reactive.common.util.Encode; import org.jboss.resteasy.reactive.common.util.PathSegmentImpl; import org.jboss.resteasy.reactive.server.core.serialization.EntityWriter; @@ -128,7 +127,7 @@ public abstract class ResteasyReactiveRequestContext /** * The input stream, if an entity is present. */ - private InputStream inputStream = EmptyInputStream.INSTANCE; + private InputStream inputStream; /** * used for {@link UriInfo#getMatchedURIs()} @@ -634,6 +633,7 @@ protected void handleRequestScopeActivation() { @Override protected void requestScopeDeactivated() { + CurrentRequestManager.set(null); } @Override @@ -698,7 +698,14 @@ public List getMatchedURIs() { return matchedURIs; } + public boolean hasInputStream() { + return inputStream != null; + } + public InputStream getInputStream() { + if (inputStream == null) { + inputStream = serverRequest().createInputStream(); + } return inputStream; } diff --git a/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/core/startup/RuntimeDeploymentManager.java b/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/core/startup/RuntimeDeploymentManager.java index 27279da12bf2b8..5f01c2d256cb78 100644 --- a/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/core/startup/RuntimeDeploymentManager.java +++ b/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/core/startup/RuntimeDeploymentManager.java @@ -96,7 +96,7 @@ public BeanFactory.BeanInstance apply(Class aClass) { }); RuntimeResourceDeployment runtimeResourceDeployment = new RuntimeResourceDeployment(info, executorSupplier, blockingInputHandlerSupplier, - interceptorDeployment, dynamicEntityWriter, resourceLocatorHandler); + interceptorDeployment, dynamicEntityWriter, resourceLocatorHandler, requestContextFactory.isDefaultBlocking()); List possibleSubResource = new ArrayList<>(locatableResourceClasses); possibleSubResource.addAll(resourceClasses); //the TCK uses normal resources also as sub resources for (ResourceClass clazz : possibleSubResource) { diff --git a/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/core/startup/RuntimeResourceDeployment.java b/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/core/startup/RuntimeResourceDeployment.java index 945aec6d173f40..7ff6bbb4b91ff5 100644 --- a/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/core/startup/RuntimeResourceDeployment.java +++ b/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/core/startup/RuntimeResourceDeployment.java @@ -94,11 +94,15 @@ public class RuntimeResourceDeployment { private final RuntimeInterceptorDeployment runtimeInterceptorDeployment; private final DynamicEntityWriter dynamicEntityWriter; private final ResourceLocatorHandler resourceLocatorHandler; + /** + * If the runtime will always default to blocking (e.g. Servlet) + */ + private final boolean defaultBlocking; public RuntimeResourceDeployment(DeploymentInfo info, Supplier executorSupplier, Supplier blockingInputHandlerSupplier, RuntimeInterceptorDeployment runtimeInterceptorDeployment, DynamicEntityWriter dynamicEntityWriter, - ResourceLocatorHandler resourceLocatorHandler) { + ResourceLocatorHandler resourceLocatorHandler, boolean defaultBlocking) { this.info = info; this.serialisers = info.getSerialisers(); this.quarkusRestConfig = info.getConfig(); @@ -107,6 +111,7 @@ public RuntimeResourceDeployment(DeploymentInfo info, Supplier executo this.runtimeInterceptorDeployment = runtimeInterceptorDeployment; this.dynamicEntityWriter = dynamicEntityWriter; this.resourceLocatorHandler = resourceLocatorHandler; + this.defaultBlocking = defaultBlocking; } public RuntimeResource buildResourceMethod(ResourceClass clazz, @@ -150,11 +155,14 @@ public RuntimeResource buildResourceMethod(ResourceClass clazz, // when a method is blocking, we also want all the request filters to run on the worker thread // because they can potentially set thread local variables - if (method.isBlocking()) { - handlers.add(new BlockingHandler(executorSupplier)); - score.add(ScoreSystem.Category.Execution, ScoreSystem.Diagnostic.ExecutionBlocking); - } else { - score.add(ScoreSystem.Category.Execution, ScoreSystem.Diagnostic.ExecutionNonBlocking); + //we don't need to run this for Servlet and other runtimes that default to blocking + if (!defaultBlocking) { + if (method.isBlocking()) { + handlers.add(new BlockingHandler(executorSupplier)); + score.add(ScoreSystem.Category.Execution, ScoreSystem.Diagnostic.ExecutionBlocking); + } else { + score.add(ScoreSystem.Category.Execution, ScoreSystem.Diagnostic.ExecutionNonBlocking); + } } //spec doesn't seem to test this, but RESTEasy does not run request filters again for sub resources (which makes sense) @@ -176,16 +184,18 @@ public RuntimeResource buildResourceMethod(ResourceClass clazz, } } // form params can be everywhere (field, beanparam, param) - if (method.isFormParamRequired()) { + if (method.isFormParamRequired() && !defaultBlocking) { // read the body as multipart in one go handlers.add(new ReadBodyHandler(bodyParameter != null)); } else if (bodyParameter != null) { - if (method.isBlocking() && (blockingInputHandlerSupplier != null)) { - // when the method is blocking, we will already be on a worker thread - handlers.add(blockingInputHandlerSupplier.get()); - } else { - // allow the body to be read by chunks - handlers.add(new InputHandler(quarkusRestConfig.getInputBufferSize(), executorSupplier)); + if (!defaultBlocking) { + if (method.isBlocking() && (blockingInputHandlerSupplier != null)) { + // when the method is blocking, we will already be on a worker thread + handlers.add(blockingInputHandlerSupplier.get()); + } else if (!method.isBlocking()) { + // allow the body to be read by chunks + handlers.add(new InputHandler(quarkusRestConfig.getInputBufferSize(), executorSupplier)); + } } } // if we need the body, let's deserialize it diff --git a/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/handlers/InputHandler.java b/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/handlers/InputHandler.java index 262ff6d77cfb37..a81a35031d86b2 100644 --- a/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/handlers/InputHandler.java +++ b/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/handlers/InputHandler.java @@ -7,7 +7,6 @@ import java.util.concurrent.Executor; import java.util.function.Supplier; import javax.ws.rs.HttpMethod; -import org.jboss.resteasy.reactive.common.util.EmptyInputStream; import org.jboss.resteasy.reactive.server.core.ResteasyReactiveRequestContext; import org.jboss.resteasy.reactive.server.spi.ServerHttpRequest; import org.jboss.resteasy.reactive.server.spi.ServerRestHandler; @@ -36,7 +35,7 @@ public InputHandler(long maxBufferSize, Supplier supplier) { public void handle(ResteasyReactiveRequestContext context) throws Exception { // in some cases, with sub-resource locators or via request filters, // it's possible we've already read the entity - if (context.getInputStream() != EmptyInputStream.INSTANCE) { + if (context.hasInputStream()) { // let's not set it twice return; } diff --git a/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/spi/ServerHttpRequest.java b/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/spi/ServerHttpRequest.java index 7bbf2f02e5aa9a..dab47b72d4bcff 100644 --- a/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/spi/ServerHttpRequest.java +++ b/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/spi/ServerHttpRequest.java @@ -48,6 +48,8 @@ public interface ServerHttpRequest { InputStream createInputStream(ByteBuffer existingData); + InputStream createInputStream(); + ServerHttpResponse pauseRequestInput(); ServerHttpResponse resumeRequestInput(); diff --git a/independent-projects/resteasy-reactive/server/vertx/src/main/java/org/jboss/resteasy/reactive/server/vertx/VertxResteasyReactiveRequestContext.java b/independent-projects/resteasy-reactive/server/vertx/src/main/java/org/jboss/resteasy/reactive/server/vertx/VertxResteasyReactiveRequestContext.java index 860f658a77c02d..ca4f2ebee350cb 100644 --- a/independent-projects/resteasy-reactive/server/vertx/src/main/java/org/jboss/resteasy/reactive/server/vertx/VertxResteasyReactiveRequestContext.java +++ b/independent-projects/resteasy-reactive/server/vertx/src/main/java/org/jboss/resteasy/reactive/server/vertx/VertxResteasyReactiveRequestContext.java @@ -188,9 +188,17 @@ public void setExpectMultipart(boolean expectMultipart) { @Override public InputStream createInputStream(ByteBuffer existingData) { + if (existingData == null) { + return createInputStream(); + } return new VertxInputStream(context, 10000, Unpooled.wrappedBuffer(existingData), this); } + @Override + public InputStream createInputStream() { + return new VertxInputStream(context, 10000, this); + } + @Override public ServerHttpResponse pauseRequestInput() { request.pause();