diff --git a/extensions/resteasy-reactive/rest/deployment/src/test/java/io/quarkus/resteasy/reactive/server/test/CancelableCompletionStageTest.java b/extensions/resteasy-reactive/rest/deployment/src/test/java/io/quarkus/resteasy/reactive/server/test/CancelableCompletionStageTest.java new file mode 100644 index 00000000000000..286cb45d56877a --- /dev/null +++ b/extensions/resteasy-reactive/rest/deployment/src/test/java/io/quarkus/resteasy/reactive/server/test/CancelableCompletionStageTest.java @@ -0,0 +1,106 @@ +package io.quarkus.resteasy.reactive.server.test; + +import static io.restassured.RestAssured.when; +import static org.awaitility.Awaitility.await; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.junit.jupiter.api.Assertions.assertEquals; + +import java.net.URL; +import java.time.Duration; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Supplier; + +import jakarta.inject.Inject; +import jakarta.ws.rs.GET; +import jakarta.ws.rs.Path; +import jakarta.ws.rs.Produces; +import jakarta.ws.rs.core.MediaType; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.quarkus.test.QuarkusUnitTest; +import io.quarkus.test.common.http.TestHTTPResource; +import io.vertx.core.Vertx; +import io.vertx.ext.web.client.WebClient; + +public class CancelableCompletionStageTest { + + @RegisterExtension + static QuarkusUnitTest runner = new QuarkusUnitTest() + .withApplicationRoot(jar -> jar.addClasses(Resource.class)); + + @BeforeEach + void setUp() { + Resource.COUNT.set(0); + } + + @Inject + Vertx vertx; + + @TestHTTPResource + URL url; + + @Test + public void testNormal() { + when().get("test") + .then() + .statusCode(200) + .body(equalTo("Hello, world")); + } + + @Test + public void testCancel() { + WebClient client = WebClient.create(vertx); + + client.get(url.getPort(), url.getHost(), "/test").send(); + + try { + // make sure we did make the proper request + await().atMost(Duration.ofSeconds(2)).untilAtomic(Resource.COUNT, equalTo(1)); + + // this will effectively cancel the request + client.close(); + + // make sure we wait until the request could have completed + Thread.sleep(7_000); + + // if the count did not increase, it means that Uni was cancelled + assertEquals(1, Resource.COUNT.get()); + } catch (InterruptedException ignored) { + + } finally { + try { + client.close(); + } catch (Exception ignored) { + + } + } + + } + + @Path("test") + public static class Resource { + + public static final AtomicInteger COUNT = new AtomicInteger(0); + + @GET + @Produces(MediaType.TEXT_PLAIN) + public CompletionStage hello() { + COUNT.incrementAndGet(); + return CompletableFuture.supplyAsync( + new Supplier<>() { + @Override + public String get() { + COUNT.incrementAndGet(); + return "Hello, world"; + } + }, + CompletableFuture.delayedExecutor(5, TimeUnit.SECONDS)); + } + } +} diff --git a/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/handlers/CompletionStageResponseHandler.java b/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/handlers/CompletionStageResponseHandler.java index 502fece9d492ba..757b519eeaef66 100644 --- a/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/handlers/CompletionStageResponseHandler.java +++ b/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/handlers/CompletionStageResponseHandler.java @@ -1,6 +1,8 @@ package org.jboss.resteasy.reactive.server.handlers; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; +import java.util.concurrent.atomic.AtomicBoolean; import org.jboss.resteasy.reactive.server.core.ResteasyReactiveRequestContext; import org.jboss.resteasy.reactive.server.spi.ServerRestHandler; @@ -10,19 +12,42 @@ public class CompletionStageResponseHandler implements ServerRestHandler { @Override public void handle(ResteasyReactiveRequestContext requestContext) throws Exception { // FIXME: handle Response with entity being a CompletionStage - if (requestContext.getResult() instanceof CompletionStage) { - CompletionStage result = (CompletionStage) requestContext.getResult(); + if (requestContext.getResult() instanceof CompletionStage result) { requestContext.suspend(); + AtomicBoolean done = new AtomicBoolean(); + AtomicBoolean canceled = new AtomicBoolean(); result.handle((v, t) -> { - if (t != null) { - requestContext.handleException(t, true); + done.set(true); + if (canceled.get()) { + try { + // get rid of everything related to the request since the connection has already gone away + requestContext.close(); + } catch (Exception ignored) { + + } } else { - requestContext.setResult(v); + if (t != null) { + requestContext.handleException(t, true); + } else { + requestContext.setResult(v); + } + requestContext.resume(); } - requestContext.resume(); return null; }); + + requestContext.serverResponse().addCloseHandler(new Runnable() { + @Override + public void run() { + if (!done.get()) { + if (result instanceof CompletableFuture cf) { + canceled.set(true); + cf.cancel(true); + } + } + } + }); } } }