diff --git a/extensions/resteasy-reactive/rest-kotlin/runtime/src/main/kotlin/org/jboss/resteasy/reactive/server/runtime/kotlin/CoroutineInvocationHandler.kt b/extensions/resteasy-reactive/rest-kotlin/runtime/src/main/kotlin/org/jboss/resteasy/reactive/server/runtime/kotlin/CoroutineInvocationHandler.kt index fa7fc5a8ec64a..7b0824ab3f7a2 100644 --- a/extensions/resteasy-reactive/rest-kotlin/runtime/src/main/kotlin/org/jboss/resteasy/reactive/server/runtime/kotlin/CoroutineInvocationHandler.kt +++ b/extensions/resteasy-reactive/rest-kotlin/runtime/src/main/kotlin/org/jboss/resteasy/reactive/server/runtime/kotlin/CoroutineInvocationHandler.kt @@ -6,8 +6,8 @@ import kotlinx.coroutines.CoroutineDispatcher import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.launch import org.jboss.resteasy.reactive.server.core.ResteasyReactiveRequestContext +import org.jboss.resteasy.reactive.server.spi.AbstractCancellableServerRestHandler import org.jboss.resteasy.reactive.server.spi.EndpointInvoker -import org.jboss.resteasy.reactive.server.spi.ServerRestHandler import org.slf4j.LoggerFactory private val logger = LoggerFactory.getLogger(CoroutineInvocationHandler::class.java) @@ -15,7 +15,7 @@ private val logger = LoggerFactory.getLogger(CoroutineInvocationHandler::class.j class CoroutineInvocationHandler( private val invoker: EndpointInvoker, private val coroutineScope: CoroutineScope -) : ServerRestHandler { +) : AbstractCancellableServerRestHandler() { private val originalTCCL: ClassLoader = Thread.currentThread().contextClassLoader @@ -74,7 +74,7 @@ class CoroutineInvocationHandler( } requestContext.serverResponse().addCloseHandler { - if (!done.get()) { + if (isCancellable && !done.get()) { try { canceled.set(true) job.cancel(null) diff --git a/extensions/resteasy-reactive/rest/deployment/src/test/java/io/quarkus/resteasy/reactive/server/test/CancelableUniTest.java b/extensions/resteasy-reactive/rest/deployment/src/test/java/io/quarkus/resteasy/reactive/server/test/CancelableUniTest.java index 3b1f003566ff7..ca673fba96569 100644 --- a/extensions/resteasy-reactive/rest/deployment/src/test/java/io/quarkus/resteasy/reactive/server/test/CancelableUniTest.java +++ b/extensions/resteasy-reactive/rest/deployment/src/test/java/io/quarkus/resteasy/reactive/server/test/CancelableUniTest.java @@ -15,6 +15,7 @@ import jakarta.ws.rs.Produces; import jakarta.ws.rs.core.MediaType; +import org.jboss.resteasy.reactive.server.Cancellable; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; @@ -44,17 +45,35 @@ void setUp() { @Test public void testNormal() { - when().get("test") + doTestNormal("1"); + } + + @Test + public void testDefaultCancellable() { + doTestCancel("1", Resource.COUNT, 1); + } + + @Test + public void testUnCancellable() { + doTestCancel("2", Resource.COUNT, 2); + } + + @Test + public void testCancellable() { + doTestCancel("3", Resource.COUNT, 1); + } + + private void doTestNormal(String path) { + when().get("test/" + path) .then() .statusCode(200) .body(equalTo("Hello, world")); } - @Test - public void testCancel() { + private void doTestCancel(String path, AtomicInteger count, int expected) { WebClient client = WebClient.create(vertx); - client.get(url.getPort(), url.getHost(), "/test").send(); + client.get(url.getPort(), url.getHost(), "/test/" + path).send(); try { // make sure we did make the proper request @@ -67,7 +86,7 @@ public void testCancel() { Thread.sleep(7_000); // if the count did not increase, it means that Uni was cancelled - assertEquals(1, Resource.COUNT.get()); + assertEquals(expected, count.get()); } catch (InterruptedException ignored) { } finally { @@ -77,7 +96,6 @@ public void testCancel() { } } - } @Path("test") @@ -87,7 +105,28 @@ public static class Resource { @GET @Produces(MediaType.TEXT_PLAIN) - public Uni hello() { + @Path("1") + public Uni defaultCancelableHello() { + COUNT.incrementAndGet(); + return Uni.createFrom().item("Hello, world").onItem().delayIt().by(Duration.ofSeconds(5)).onItem().invoke( + COUNT::incrementAndGet); + } + + @GET + @Produces(MediaType.TEXT_PLAIN) + @Cancellable(false) + @Path("2") + public Uni uncancellableHello() { + COUNT.incrementAndGet(); + return Uni.createFrom().item("Hello, world").onItem().delayIt().by(Duration.ofSeconds(5)).onItem().invoke( + COUNT::incrementAndGet); + } + + @GET + @Produces(MediaType.TEXT_PLAIN) + @Cancellable + @Path("3") + public Uni cancellableHello() { COUNT.incrementAndGet(); return Uni.createFrom().item("Hello, world").onItem().delayIt().by(Duration.ofSeconds(5)).onItem().invoke( COUNT::incrementAndGet); diff --git a/independent-projects/resteasy-reactive/server/processor/src/main/java/org/jboss/resteasy/reactive/server/processor/scanning/AsyncReturnTypeScanner.java b/independent-projects/resteasy-reactive/server/processor/src/main/java/org/jboss/resteasy/reactive/server/processor/scanning/AsyncReturnTypeScanner.java index 53393c76317b6..94ebbea67ecb5 100644 --- a/independent-projects/resteasy-reactive/server/processor/src/main/java/org/jboss/resteasy/reactive/server/processor/scanning/AsyncReturnTypeScanner.java +++ b/independent-projects/resteasy-reactive/server/processor/src/main/java/org/jboss/resteasy/reactive/server/processor/scanning/AsyncReturnTypeScanner.java @@ -12,9 +12,14 @@ import java.util.List; import java.util.Map; +import org.jboss.jandex.AnnotationInstance; +import org.jboss.jandex.AnnotationValue; import org.jboss.jandex.ClassInfo; import org.jboss.jandex.DotName; import org.jboss.jandex.MethodInfo; +import org.jboss.resteasy.reactive.common.processor.EndpointIndexer; +import org.jboss.resteasy.reactive.common.processor.transformation.AnnotationStore; +import org.jboss.resteasy.reactive.server.Cancellable; import org.jboss.resteasy.reactive.server.handlers.CompletionStageResponseHandler; import org.jboss.resteasy.reactive.server.handlers.PublisherResponseHandler; import org.jboss.resteasy.reactive.server.handlers.UniResponseHandler; @@ -23,15 +28,24 @@ public class AsyncReturnTypeScanner implements MethodScanner { + private static final DotName CANCELLABLE = DotName.createSimple(Cancellable.class.getName()); + @Override public List scan(MethodInfo method, ClassInfo actualEndpointClass, Map methodContext) { DotName returnTypeName = method.returnType().name(); + AnnotationStore annotationStore = (AnnotationStore) methodContext + .get(EndpointIndexer.METHOD_CONTEXT_ANNOTATION_STORE); + boolean isCancelable = determineCancelable(method, actualEndpointClass, annotationStore); if (returnTypeName.equals(COMPLETION_STAGE) || returnTypeName.equals(COMPLETABLE_FUTURE)) { - return Collections.singletonList(new FixedHandlerChainCustomizer(new CompletionStageResponseHandler(), + CompletionStageResponseHandler handler = new CompletionStageResponseHandler(); + handler.setCancellable(isCancelable); + return Collections.singletonList(new FixedHandlerChainCustomizer(handler, HandlerChainCustomizer.Phase.AFTER_METHOD_INVOKE)); } else if (returnTypeName.equals(UNI)) { - return Collections.singletonList(new FixedHandlerChainCustomizer(new UniResponseHandler(), + UniResponseHandler handler = new UniResponseHandler(); + handler.setCancellable(isCancelable); + return Collections.singletonList(new FixedHandlerChainCustomizer(handler, HandlerChainCustomizer.Phase.AFTER_METHOD_INVOKE)); } if (returnTypeName.equals(MULTI) || returnTypeName.equals(REST_MULTI) || returnTypeName.equals(PUBLISHER) @@ -42,6 +56,23 @@ public List scan(MethodInfo method, ClassInfo actualEndp return Collections.emptyList(); } + private boolean determineCancelable(MethodInfo method, ClassInfo clazz, AnnotationStore annotationStore) { + AnnotationInstance instance = annotationStore.getAnnotation(method, CANCELLABLE); + if (instance == null) { + instance = annotationStore.getAnnotation(method.declaringClass(), CANCELLABLE); + if ((instance == null) && !clazz.equals(method.declaringClass())) { + instance = annotationStore.getAnnotation(clazz, CANCELLABLE); + } + } + if (instance != null) { + AnnotationValue value = instance.value(); + if (value != null) { + return value.asBoolean(); + } + } + return true; + } + @Override public boolean isMethodSignatureAsync(MethodInfo method) { DotName returnTypeName = method.returnType().name(); diff --git a/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/Cancellable.java b/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/Cancellable.java new file mode 100644 index 0000000000000..c07c104c051fb --- /dev/null +++ b/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/Cancellable.java @@ -0,0 +1,22 @@ +package org.jboss.resteasy.reactive.server; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; +import java.util.concurrent.CompletionStage; + +import io.smallrye.mutiny.Uni; + +/** + * Used on a method that returns a single item async return type (such as {@link Uni} or {@link CompletionStage or Kotlin + * suspend function}) + * to control whether to cancel the subscription to the result if the connection is closed before the result is ready. + * By default, Quarkus will cancel the subscription + */ +@Retention(RetentionPolicy.RUNTIME) +@Target({ ElementType.METHOD, ElementType.TYPE }) +public @interface Cancellable { + + boolean value() default true; +} diff --git a/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/handlers/CompletionStageResponseHandler.java b/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/handlers/CompletionStageResponseHandler.java index 6998220d74e02..1854676749c19 100644 --- a/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/handlers/CompletionStageResponseHandler.java +++ b/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/handlers/CompletionStageResponseHandler.java @@ -6,9 +6,9 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.jboss.resteasy.reactive.server.core.ResteasyReactiveRequestContext; -import org.jboss.resteasy.reactive.server.spi.ServerRestHandler; +import org.jboss.resteasy.reactive.server.spi.AbstractCancellableServerRestHandler; -public class CompletionStageResponseHandler implements ServerRestHandler { +public class CompletionStageResponseHandler extends AbstractCancellableServerRestHandler { @Override public void handle(ResteasyReactiveRequestContext requestContext) throws Exception { @@ -45,7 +45,7 @@ public void handle(ResteasyReactiveRequestContext requestContext) throws Excepti requestContext.serverResponse().addCloseHandler(new Runnable() { @Override public void run() { - if (!done.get()) { + if (isCancellable() && !done.get()) { if (result instanceof CompletableFuture cf) { canceled.set(true); cf.cancel(true); diff --git a/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/handlers/UniResponseHandler.java b/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/handlers/UniResponseHandler.java index dc89c7e4c6aae..11d283f7a032e 100644 --- a/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/handlers/UniResponseHandler.java +++ b/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/handlers/UniResponseHandler.java @@ -4,12 +4,12 @@ import java.util.function.Consumer; import org.jboss.resteasy.reactive.server.core.ResteasyReactiveRequestContext; -import org.jboss.resteasy.reactive.server.spi.ServerRestHandler; +import org.jboss.resteasy.reactive.server.spi.AbstractCancellableServerRestHandler; import io.smallrye.mutiny.Uni; import io.smallrye.mutiny.subscription.Cancellable; -public class UniResponseHandler implements ServerRestHandler { +public class UniResponseHandler extends AbstractCancellableServerRestHandler { @Override public void handle(ResteasyReactiveRequestContext requestContext) throws Exception { @@ -38,7 +38,7 @@ public void accept(Throwable t) { requestContext.serverResponse().addCloseHandler(new Runnable() { @Override public void run() { - if (done.compareAndSet(false, true)) { + if (isCancellable() && done.compareAndSet(false, true)) { cancellable.cancel(); try { // get rid of everything related to the request since the connection has already gone away diff --git a/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/spi/AbstractCancellableServerRestHandler.java b/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/spi/AbstractCancellableServerRestHandler.java new file mode 100644 index 0000000000000..0e527dd3db951 --- /dev/null +++ b/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/spi/AbstractCancellableServerRestHandler.java @@ -0,0 +1,15 @@ +package org.jboss.resteasy.reactive.server.spi; + +public abstract class AbstractCancellableServerRestHandler implements ServerRestHandler { + + // make mutable to allow for bytecode serialization + private boolean cancellable; + + public boolean isCancellable() { + return cancellable; + } + + public void setCancellable(boolean cancellable) { + this.cancellable = cancellable; + } +}