Skip to content

Commit

Permalink
Cancel Uni responses in Quarkus REST when the connection is closed
Browse files Browse the repository at this point in the history
Relates to: quarkusio#41705
  • Loading branch information
geoand authored and holly-cummins committed Jul 31, 2024
1 parent e695207 commit f0172a2
Show file tree
Hide file tree
Showing 2 changed files with 119 additions and 4 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package io.quarkus.resteasy.reactive.server.test;

import static io.restassured.RestAssured.when;
import static org.awaitility.Awaitility.await;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.junit.jupiter.api.Assertions.*;

import java.net.URL;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;

import jakarta.inject.Inject;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.quarkus.test.QuarkusUnitTest;
import io.quarkus.test.common.http.TestHTTPResource;
import io.smallrye.mutiny.Uni;
import io.vertx.core.Vertx;
import io.vertx.ext.web.client.WebClient;

public class CancelableUniTest {

@RegisterExtension
static QuarkusUnitTest runner = new QuarkusUnitTest()
.withApplicationRoot(jar -> jar.addClasses(Resource.class));

@BeforeEach
void setUp() {
Resource.COUNT.set(0);
}

@Inject
Vertx vertx;

@TestHTTPResource
URL url;

@Test
public void testNormal() {
when().get("test")
.then()
.statusCode(200)
.body(equalTo("Hello, world"));
}

@Test
public void testCancel() {
WebClient client = WebClient.create(vertx);

client.get(url.getPort(), url.getHost(), "/test").send();

try {
// make sure we did make the proper request
await().atMost(Duration.ofSeconds(2)).untilAtomic(Resource.COUNT, equalTo(1));

// this will effectively cancel the request
client.close();

// make sure we wait until the request could have completed
Thread.sleep(7_000);

// if the count did not increase, it means that Uni was cancelled
assertEquals(1, Resource.COUNT.get());
} catch (InterruptedException ignored) {

} finally {
try {
client.close();
} catch (Exception ignored) {

}
}

}

@Path("test")
public static class Resource {

public static final AtomicInteger COUNT = new AtomicInteger(0);

@GET
@Produces(MediaType.TEXT_PLAIN)
public Uni<String> hello() {
COUNT.incrementAndGet();
return Uni.createFrom().item("Hello, world").onItem().delayIt().by(Duration.ofSeconds(5)).onItem().invoke(
COUNT::incrementAndGet);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,33 +1,52 @@
package org.jboss.resteasy.reactive.server.handlers;

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

import org.jboss.resteasy.reactive.server.core.ResteasyReactiveRequestContext;
import org.jboss.resteasy.reactive.server.spi.ServerRestHandler;

import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.subscription.Cancellable;

public class UniResponseHandler implements ServerRestHandler {

@Override
public void handle(ResteasyReactiveRequestContext requestContext) throws Exception {
// FIXME: handle Response with entity being a Uni
if (requestContext.getResult() instanceof Uni) {
Uni<?> result = (Uni<?>) requestContext.getResult();
if (requestContext.getResult() instanceof Uni<?> result) {
requestContext.suspend();

result.subscribe().with(new Consumer<Object>() {
AtomicBoolean done = new AtomicBoolean();
Cancellable cancellable = result.subscribe().with(new Consumer<Object>() {
@Override
public void accept(Object v) {
done.set(true);
requestContext.setResult(v);
requestContext.resume();
}
}, new Consumer<Throwable>() {
}, new Consumer<>() {
@Override
public void accept(Throwable t) {
done.set(true);
requestContext.resume(t, true);
}
});

requestContext.serverResponse().addCloseHandler(new Runnable() {
@Override
public void run() {
if (!done.get()) {
cancellable.cancel();
try {
// get rid of everything related to the request since the connection has already gone away
requestContext.close();
} catch (Exception ignored) {

}
}
}
});
}
}
}

0 comments on commit f0172a2

Please sign in to comment.