diff --git a/extensions/resteasy-reactive/quarkus-resteasy-reactive/deployment/src/test/java/io/quarkus/resteasy/reactive/server/test/customexceptions/PerClassThrowableExceptionMapperTest.java b/extensions/resteasy-reactive/quarkus-resteasy-reactive/deployment/src/test/java/io/quarkus/resteasy/reactive/server/test/customexceptions/PerClassThrowableExceptionMapperTest.java index 3dbec8fb460c0..104605ee53882 100644 --- a/extensions/resteasy-reactive/quarkus-resteasy-reactive/deployment/src/test/java/io/quarkus/resteasy/reactive/server/test/customexceptions/PerClassThrowableExceptionMapperTest.java +++ b/extensions/resteasy-reactive/quarkus-resteasy-reactive/deployment/src/test/java/io/quarkus/resteasy/reactive/server/test/customexceptions/PerClassThrowableExceptionMapperTest.java @@ -15,6 +15,7 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; +import io.quarkus.resteasy.reactive.server.test.ExceptionUtil; import io.quarkus.test.QuarkusUnitTest; import io.restassured.RestAssured; @@ -26,7 +27,7 @@ public class PerClassThrowableExceptionMapperTest { @Override public JavaArchive get() { return ShrinkWrap.create(JavaArchive.class) - .addClasses(HasCustomThrowableHandlerResource.class); + .addClasses(HasCustomThrowableHandlerResource.class, ExceptionUtil.class); } }); diff --git a/extensions/resteasy-reactive/quarkus-resteasy-reactive/deployment/src/test/java/io/quarkus/resteasy/reactive/server/test/stress/SuspendResumeStressTest.java b/extensions/resteasy-reactive/quarkus-resteasy-reactive/deployment/src/test/java/io/quarkus/resteasy/reactive/server/test/stress/SuspendResumeStressTest.java new file mode 100644 index 0000000000000..8ee15d769d0c0 --- /dev/null +++ b/extensions/resteasy-reactive/quarkus-resteasy-reactive/deployment/src/test/java/io/quarkus/resteasy/reactive/server/test/stress/SuspendResumeStressTest.java @@ -0,0 +1,106 @@ +package io.quarkus.resteasy.reactive.server.test.stress; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.function.Consumer; + +import javax.ws.rs.GET; +import javax.ws.rs.Path; + +import org.hamcrest.Matchers; +import org.jboss.jandex.ClassInfo; +import org.jboss.jandex.MethodInfo; +import org.jboss.resteasy.reactive.common.model.ResourceClass; +import org.jboss.resteasy.reactive.server.core.ResteasyReactiveRequestContext; +import org.jboss.resteasy.reactive.server.model.HandlerChainCustomizer; +import org.jboss.resteasy.reactive.server.model.ServerResourceMethod; +import org.jboss.resteasy.reactive.server.processor.scanning.MethodScanner; +import org.jboss.resteasy.reactive.server.spi.ServerRestHandler; +import org.jboss.shrinkwrap.api.ShrinkWrap; +import org.jboss.shrinkwrap.api.spec.JavaArchive; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.quarkus.builder.BuildChainBuilder; +import io.quarkus.builder.BuildContext; +import io.quarkus.builder.BuildStep; +import io.quarkus.resteasy.reactive.server.spi.MethodScannerBuildItem; +import io.quarkus.test.QuarkusUnitTest; +import io.restassured.RestAssured; + +/** + * Tests lots of suspends/resumes per request + */ +public class SuspendResumeStressTest { + + private static volatile ExecutorService executorService; + + @RegisterExtension + static QuarkusUnitTest test = new QuarkusUnitTest() + .addBuildChainCustomizer(new Consumer() { + @Override + public void accept(BuildChainBuilder buildChainBuilder) { + buildChainBuilder.addBuildStep(new BuildStep() { + @Override + public void execute(BuildContext context) { + context.produce(new MethodScannerBuildItem(new MethodScanner() { + @Override + public List scan(MethodInfo method, ClassInfo actualEndpointClass, + Map methodContext) { + return Collections.singletonList(new Custom()); + } + })); + } + }).produces(MethodScannerBuildItem.class).build(); + } + }) + .setArchiveProducer(() -> ShrinkWrap.create(JavaArchive.class) + .addClass(HelloResource.class)); + + @Test + public void testSuspendResumeStressTest() { + executorService = Executors.newFixedThreadPool(10); + try { + for (int i = 0; i < 100; ++i) { + RestAssured.when().get("/hello").then().body(Matchers.is("hello")); + } + } finally { + executorService.shutdownNow(); + executorService = null; + } + } + + @Path("hello") + public static class HelloResource { + + @GET + public String hello() { + return "hello"; + } + + } + + public static class Custom implements HandlerChainCustomizer { + @Override + public List handlers(Phase phase, ResourceClass resourceClass, ServerResourceMethod resourceMethod) { + List handlers = new ArrayList<>(); + for (int i = 0; i < 100; ++i) { + handlers.add(new ResumeHandler()); + } + return handlers; + } + } + + public static class ResumeHandler implements ServerRestHandler { + + @Override + public void handle(ResteasyReactiveRequestContext requestContext) throws Exception { + requestContext.suspend(); + executorService.execute(requestContext::resume); + } + } +} diff --git a/independent-projects/resteasy-reactive/common/runtime/src/main/java/org/jboss/resteasy/reactive/common/core/AbstractResteasyReactiveContext.java b/independent-projects/resteasy-reactive/common/runtime/src/main/java/org/jboss/resteasy/reactive/common/core/AbstractResteasyReactiveContext.java index 8199390a09219..7cfe4c05513d3 100644 --- a/independent-projects/resteasy-reactive/common/runtime/src/main/java/org/jboss/resteasy/reactive/common/core/AbstractResteasyReactiveContext.java +++ b/independent-projects/resteasy-reactive/common/runtime/src/main/java/org/jboss/resteasy/reactive/common/core/AbstractResteasyReactiveContext.java @@ -132,7 +132,6 @@ public void run() { //unless there are pre-mapping filters as these may require CDI boolean disasociateRequestScope = false; boolean aborted = false; - Executor exec = null; try { while (position < handlers.length) { int pos = position; @@ -152,15 +151,7 @@ public void run() { requestScopeActivated = false; requestScopeDeactivated(); } - if (this.executor != null) { - //resume happened in the meantime - suspended = false; - exec = this.executor; - // prevent future suspensions from re-submitting the task - this.executor = null; - return; - } else if (suspended) { - running = false; + if (suspended) { processingSuspended = true; return; } @@ -184,7 +175,6 @@ public void run() { // we need to make sure we don't close the underlying stream in the event loop if the task // has been offloaded to the executor if ((position == handlers.length && !processingSuspended) || aborted) { - exec = null; close(); } else { if (disasociateRequestScope) { @@ -192,14 +182,27 @@ public void run() { currentRequestScope.deactivate(); } beginAsyncProcessing(); - } - if (exec != null) { - //outside sync block - exec.execute(this); - } else { + Executor exec = null; + boolean resumed = false; synchronized (this) { running = false; + if (this.executor != null) { + //resume happened in the meantime + suspended = false; + exec = this.executor; + // prevent future suspensions from re-submitting the task + this.executor = null; + } else if (!suspended) { + resumed = true; + } + } + if (exec != null) { + //outside sync block + exec.execute(this); + } else if (resumed) { + resume(); } + } } }