Skip to content

Commit

Permalink
Cancel CompletionStage in Quarkus REST when the connection is closed
Browse files Browse the repository at this point in the history
Relates to: quarkusio#41705
  • Loading branch information
geoand authored and danielsoro committed Sep 20, 2024
1 parent f8f6395 commit 737889d
Show file tree
Hide file tree
Showing 2 changed files with 137 additions and 6 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package io.quarkus.resteasy.reactive.server.test;

import static io.restassured.RestAssured.when;
import static org.awaitility.Awaitility.await;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.junit.jupiter.api.Assertions.assertEquals;

import java.net.URL;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

import jakarta.inject.Inject;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.quarkus.test.QuarkusUnitTest;
import io.quarkus.test.common.http.TestHTTPResource;
import io.vertx.core.Vertx;
import io.vertx.ext.web.client.WebClient;

public class CancelableCompletionStageTest {

@RegisterExtension
static QuarkusUnitTest runner = new QuarkusUnitTest()
.withApplicationRoot(jar -> jar.addClasses(Resource.class));

@BeforeEach
void setUp() {
Resource.COUNT.set(0);
}

@Inject
Vertx vertx;

@TestHTTPResource
URL url;

@Test
public void testNormal() {
when().get("test")
.then()
.statusCode(200)
.body(equalTo("Hello, world"));
}

@Test
public void testCancel() {
WebClient client = WebClient.create(vertx);

client.get(url.getPort(), url.getHost(), "/test").send();

try {
// make sure we did make the proper request
await().atMost(Duration.ofSeconds(2)).untilAtomic(Resource.COUNT, equalTo(1));

// this will effectively cancel the request
client.close();

// make sure we wait until the request could have completed
Thread.sleep(7_000);

// if the count did not increase, it means that Uni was cancelled
assertEquals(1, Resource.COUNT.get());
} catch (InterruptedException ignored) {

} finally {
try {
client.close();
} catch (Exception ignored) {

}
}

}

@Path("test")
public static class Resource {

public static final AtomicInteger COUNT = new AtomicInteger(0);

@GET
@Produces(MediaType.TEXT_PLAIN)
public CompletionStage<String> hello() {
COUNT.incrementAndGet();
return CompletableFuture.supplyAsync(
new Supplier<>() {
@Override
public String get() {
COUNT.incrementAndGet();
return "Hello, world";
}
},
CompletableFuture.delayedExecutor(5, TimeUnit.SECONDS));
}
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package org.jboss.resteasy.reactive.server.handlers;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicBoolean;

import org.jboss.resteasy.reactive.server.core.ResteasyReactiveRequestContext;
import org.jboss.resteasy.reactive.server.spi.ServerRestHandler;
Expand All @@ -10,19 +12,42 @@ public class CompletionStageResponseHandler implements ServerRestHandler {
@Override
public void handle(ResteasyReactiveRequestContext requestContext) throws Exception {
// FIXME: handle Response with entity being a CompletionStage
if (requestContext.getResult() instanceof CompletionStage) {
CompletionStage<?> result = (CompletionStage<?>) requestContext.getResult();
if (requestContext.getResult() instanceof CompletionStage<?> result) {
requestContext.suspend();

AtomicBoolean done = new AtomicBoolean();
AtomicBoolean canceled = new AtomicBoolean();
result.handle((v, t) -> {
if (t != null) {
requestContext.handleException(t, true);
done.set(true);
if (canceled.get()) {
try {
// get rid of everything related to the request since the connection has already gone away
requestContext.close();
} catch (Exception ignored) {

}
} else {
requestContext.setResult(v);
if (t != null) {
requestContext.handleException(t, true);
} else {
requestContext.setResult(v);
}
requestContext.resume();
}
requestContext.resume();
return null;
});

requestContext.serverResponse().addCloseHandler(new Runnable() {
@Override
public void run() {
if (!done.get()) {
if (result instanceof CompletableFuture<?> cf) {
canceled.set(true);
cf.cancel(true);
}
}
}
});
}
}
}

0 comments on commit 737889d

Please sign in to comment.