Skip to content

Commit

Permalink
Introduce @Cancellable annotation for Quarkus REST
Browse files Browse the repository at this point in the history
This annotation can be placed on REST methods
that return a single result async type
and allows the user to configure whether the
subscription should be cancelled if the
connection is closed before the result is available

Closes: quarkusio#45141
  • Loading branch information
geoand committed Dec 16, 2024
1 parent 283566b commit 7bd308a
Show file tree
Hide file tree
Showing 6 changed files with 122 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;

import org.jboss.resteasy.reactive.server.Cancellable;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
Expand Down Expand Up @@ -44,17 +45,35 @@ void setUp() {

@Test
public void testNormal() {
when().get("test")
doTestNormal("1");
}

@Test
public void testDefaultCancellable() {
doTestCancel("1", Resource.COUNT, 1);
}

@Test
public void testUnCancellable() {
doTestCancel("2", Resource.COUNT, 2);
}

@Test
public void testCancellable() {
doTestCancel("3", Resource.COUNT, 1);
}

private void doTestNormal(String path) {
when().get("test/" + path)
.then()
.statusCode(200)
.body(equalTo("Hello, world"));
}

@Test
public void testCancel() {
private void doTestCancel(String path, AtomicInteger count, int expected) {
WebClient client = WebClient.create(vertx);

client.get(url.getPort(), url.getHost(), "/test").send();
client.get(url.getPort(), url.getHost(), "/test/" + path).send();

try {
// make sure we did make the proper request
Expand All @@ -67,7 +86,7 @@ public void testCancel() {
Thread.sleep(7_000);

// if the count did not increase, it means that Uni was cancelled
assertEquals(1, Resource.COUNT.get());
assertEquals(expected, count.get());
} catch (InterruptedException ignored) {

} finally {
Expand All @@ -77,7 +96,6 @@ public void testCancel() {

}
}

}

@Path("test")
Expand All @@ -87,7 +105,28 @@ public static class Resource {

@GET
@Produces(MediaType.TEXT_PLAIN)
public Uni<String> hello() {
@Path("1")
public Uni<String> defaultCancelableHello() {
COUNT.incrementAndGet();
return Uni.createFrom().item("Hello, world").onItem().delayIt().by(Duration.ofSeconds(5)).onItem().invoke(
COUNT::incrementAndGet);
}

@GET
@Produces(MediaType.TEXT_PLAIN)
@Cancellable(false)
@Path("2")
public Uni<String> uncancellableHello() {
COUNT.incrementAndGet();
return Uni.createFrom().item("Hello, world").onItem().delayIt().by(Duration.ofSeconds(5)).onItem().invoke(
COUNT::incrementAndGet);
}

@GET
@Produces(MediaType.TEXT_PLAIN)
@Cancellable
@Path("3")
public Uni<String> cancellableHello() {
COUNT.incrementAndGet();
return Uni.createFrom().item("Hello, world").onItem().delayIt().by(Duration.ofSeconds(5)).onItem().invoke(
COUNT::incrementAndGet);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,14 @@
import java.util.List;
import java.util.Map;

import org.jboss.jandex.AnnotationInstance;
import org.jboss.jandex.AnnotationValue;
import org.jboss.jandex.ClassInfo;
import org.jboss.jandex.DotName;
import org.jboss.jandex.MethodInfo;
import org.jboss.resteasy.reactive.common.processor.EndpointIndexer;
import org.jboss.resteasy.reactive.common.processor.transformation.AnnotationStore;
import org.jboss.resteasy.reactive.server.Cancellable;
import org.jboss.resteasy.reactive.server.handlers.CompletionStageResponseHandler;
import org.jboss.resteasy.reactive.server.handlers.PublisherResponseHandler;
import org.jboss.resteasy.reactive.server.handlers.UniResponseHandler;
Expand All @@ -23,15 +28,24 @@

public class AsyncReturnTypeScanner implements MethodScanner {

private static final DotName CANCELLABLE = DotName.createSimple(Cancellable.class.getName());

@Override
public List<HandlerChainCustomizer> scan(MethodInfo method, ClassInfo actualEndpointClass,
Map<String, Object> methodContext) {
DotName returnTypeName = method.returnType().name();
AnnotationStore annotationStore = (AnnotationStore) methodContext
.get(EndpointIndexer.METHOD_CONTEXT_ANNOTATION_STORE);
boolean isCancelable = determineCancelable(method, actualEndpointClass, annotationStore);
if (returnTypeName.equals(COMPLETION_STAGE) || returnTypeName.equals(COMPLETABLE_FUTURE)) {
return Collections.singletonList(new FixedHandlerChainCustomizer(new CompletionStageResponseHandler(),
CompletionStageResponseHandler handler = new CompletionStageResponseHandler();
handler.setCancellable(isCancelable);
return Collections.singletonList(new FixedHandlerChainCustomizer(handler,
HandlerChainCustomizer.Phase.AFTER_METHOD_INVOKE));
} else if (returnTypeName.equals(UNI)) {
return Collections.singletonList(new FixedHandlerChainCustomizer(new UniResponseHandler(),
UniResponseHandler handler = new UniResponseHandler();
handler.setCancellable(isCancelable);
return Collections.singletonList(new FixedHandlerChainCustomizer(handler,
HandlerChainCustomizer.Phase.AFTER_METHOD_INVOKE));
}
if (returnTypeName.equals(MULTI) || returnTypeName.equals(REST_MULTI) || returnTypeName.equals(PUBLISHER)
Expand All @@ -42,6 +56,23 @@ public List<HandlerChainCustomizer> scan(MethodInfo method, ClassInfo actualEndp
return Collections.emptyList();
}

private boolean determineCancelable(MethodInfo method, ClassInfo clazz, AnnotationStore annotationStore) {
AnnotationInstance instance = annotationStore.getAnnotation(method, CANCELLABLE);
if (instance == null) {
instance = annotationStore.getAnnotation(method.declaringClass(), CANCELLABLE);
if ((instance == null) && !clazz.equals(method.declaringClass())) {
instance = annotationStore.getAnnotation(clazz, CANCELLABLE);
}
}
if (instance != null) {
AnnotationValue value = instance.value();
if (value != null) {
return value.asBoolean();
}
}
return true;
}

@Override
public boolean isMethodSignatureAsync(MethodInfo method) {
DotName returnTypeName = method.returnType().name();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package org.jboss.resteasy.reactive.server;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.concurrent.CompletionStage;

import io.smallrye.mutiny.Uni;

/**
* Used on a method that returns a single item async return type (such as {@link Uni} or {@link CompletionStage or Kotlin
* suspend function})
* to control whether to cancel the subscription to the result if the connection is closed before the result is ready.
* By default, Quarkus will cancel the subscription
*/
@Retention(RetentionPolicy.RUNTIME)
@Target({ ElementType.METHOD, ElementType.TYPE })
public @interface Cancellable {

boolean value() default true;
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@
import java.util.concurrent.atomic.AtomicBoolean;

import org.jboss.resteasy.reactive.server.core.ResteasyReactiveRequestContext;
import org.jboss.resteasy.reactive.server.spi.ServerRestHandler;
import org.jboss.resteasy.reactive.server.spi.AbstractCancellableServerRestHandler;

public class CompletionStageResponseHandler implements ServerRestHandler {
public class CompletionStageResponseHandler extends AbstractCancellableServerRestHandler {

@Override
public void handle(ResteasyReactiveRequestContext requestContext) throws Exception {
Expand Down Expand Up @@ -45,7 +45,7 @@ public void handle(ResteasyReactiveRequestContext requestContext) throws Excepti
requestContext.serverResponse().addCloseHandler(new Runnable() {
@Override
public void run() {
if (!done.get()) {
if (isCancellable() && !done.get()) {
if (result instanceof CompletableFuture<?> cf) {
canceled.set(true);
cf.cancel(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@
import java.util.function.Consumer;

import org.jboss.resteasy.reactive.server.core.ResteasyReactiveRequestContext;
import org.jboss.resteasy.reactive.server.spi.ServerRestHandler;
import org.jboss.resteasy.reactive.server.spi.AbstractCancellableServerRestHandler;

import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.subscription.Cancellable;

public class UniResponseHandler implements ServerRestHandler {
public class UniResponseHandler extends AbstractCancellableServerRestHandler {

@Override
public void handle(ResteasyReactiveRequestContext requestContext) throws Exception {
Expand Down Expand Up @@ -38,7 +38,7 @@ public void accept(Throwable t) {
requestContext.serverResponse().addCloseHandler(new Runnable() {
@Override
public void run() {
if (done.compareAndSet(false, true)) {
if (isCancellable() && done.compareAndSet(false, true)) {
cancellable.cancel();
try {
// get rid of everything related to the request since the connection has already gone away
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package org.jboss.resteasy.reactive.server.spi;

public abstract class AbstractCancellableServerRestHandler implements ServerRestHandler {

// make mutable to allow for bytecode serialization
private boolean cancellable;

public boolean isCancellable() {
return cancellable;
}

public void setCancellable(boolean cancellable) {
this.cancellable = cancellable;
}
}

0 comments on commit 7bd308a

Please sign in to comment.