From f0172a26c656c75d32c00d2c49e88cfc7f111297 Mon Sep 17 00:00:00 2001 From: Georgios Andrianakis Date: Fri, 5 Jul 2024 13:52:15 +0300 Subject: [PATCH] Cancel Uni responses in Quarkus REST when the connection is closed Relates to: #41705 --- .../server/test/CancelableUniTest.java | 96 +++++++++++++++++++ .../server/handlers/UniResponseHandler.java | 27 +++++- 2 files changed, 119 insertions(+), 4 deletions(-) create mode 100644 extensions/resteasy-reactive/rest/deployment/src/test/java/io/quarkus/resteasy/reactive/server/test/CancelableUniTest.java diff --git a/extensions/resteasy-reactive/rest/deployment/src/test/java/io/quarkus/resteasy/reactive/server/test/CancelableUniTest.java b/extensions/resteasy-reactive/rest/deployment/src/test/java/io/quarkus/resteasy/reactive/server/test/CancelableUniTest.java new file mode 100644 index 00000000000000..3b1f003566ff7d --- /dev/null +++ b/extensions/resteasy-reactive/rest/deployment/src/test/java/io/quarkus/resteasy/reactive/server/test/CancelableUniTest.java @@ -0,0 +1,96 @@ +package io.quarkus.resteasy.reactive.server.test; + +import static io.restassured.RestAssured.when; +import static org.awaitility.Awaitility.await; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.junit.jupiter.api.Assertions.*; + +import java.net.URL; +import java.time.Duration; +import java.util.concurrent.atomic.AtomicInteger; + +import jakarta.inject.Inject; +import jakarta.ws.rs.GET; +import jakarta.ws.rs.Path; +import jakarta.ws.rs.Produces; +import jakarta.ws.rs.core.MediaType; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.quarkus.test.QuarkusUnitTest; +import io.quarkus.test.common.http.TestHTTPResource; +import io.smallrye.mutiny.Uni; +import io.vertx.core.Vertx; +import io.vertx.ext.web.client.WebClient; + +public class CancelableUniTest { + + @RegisterExtension + static QuarkusUnitTest runner = new QuarkusUnitTest() + .withApplicationRoot(jar -> jar.addClasses(Resource.class)); + + @BeforeEach + void setUp() { + Resource.COUNT.set(0); + } + + @Inject + Vertx vertx; + + @TestHTTPResource + URL url; + + @Test + public void testNormal() { + when().get("test") + .then() + .statusCode(200) + .body(equalTo("Hello, world")); + } + + @Test + public void testCancel() { + WebClient client = WebClient.create(vertx); + + client.get(url.getPort(), url.getHost(), "/test").send(); + + try { + // make sure we did make the proper request + await().atMost(Duration.ofSeconds(2)).untilAtomic(Resource.COUNT, equalTo(1)); + + // this will effectively cancel the request + client.close(); + + // make sure we wait until the request could have completed + Thread.sleep(7_000); + + // if the count did not increase, it means that Uni was cancelled + assertEquals(1, Resource.COUNT.get()); + } catch (InterruptedException ignored) { + + } finally { + try { + client.close(); + } catch (Exception ignored) { + + } + } + + } + + @Path("test") + public static class Resource { + + public static final AtomicInteger COUNT = new AtomicInteger(0); + + @GET + @Produces(MediaType.TEXT_PLAIN) + public Uni hello() { + COUNT.incrementAndGet(); + return Uni.createFrom().item("Hello, world").onItem().delayIt().by(Duration.ofSeconds(5)).onItem().invoke( + COUNT::incrementAndGet); + } + } +} diff --git a/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/handlers/UniResponseHandler.java b/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/handlers/UniResponseHandler.java index 027bb09290b7a0..8e983e0e51b406 100644 --- a/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/handlers/UniResponseHandler.java +++ b/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/handlers/UniResponseHandler.java @@ -1,33 +1,52 @@ package org.jboss.resteasy.reactive.server.handlers; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; import org.jboss.resteasy.reactive.server.core.ResteasyReactiveRequestContext; import org.jboss.resteasy.reactive.server.spi.ServerRestHandler; import io.smallrye.mutiny.Uni; +import io.smallrye.mutiny.subscription.Cancellable; public class UniResponseHandler implements ServerRestHandler { @Override public void handle(ResteasyReactiveRequestContext requestContext) throws Exception { // FIXME: handle Response with entity being a Uni - if (requestContext.getResult() instanceof Uni) { - Uni result = (Uni) requestContext.getResult(); + if (requestContext.getResult() instanceof Uni result) { requestContext.suspend(); - result.subscribe().with(new Consumer() { + AtomicBoolean done = new AtomicBoolean(); + Cancellable cancellable = result.subscribe().with(new Consumer() { @Override public void accept(Object v) { + done.set(true); requestContext.setResult(v); requestContext.resume(); } - }, new Consumer() { + }, new Consumer<>() { @Override public void accept(Throwable t) { + done.set(true); requestContext.resume(t, true); } }); + + requestContext.serverResponse().addCloseHandler(new Runnable() { + @Override + public void run() { + if (!done.get()) { + cancellable.cancel(); + try { + // get rid of everything related to the request since the connection has already gone away + requestContext.close(); + } catch (Exception ignored) { + + } + } + } + }); } } }