From af89f4fce15da08eb4fb36b789c60ffa4d33884c Mon Sep 17 00:00:00 2001 From: Clement Escoffier Date: Tue, 13 Sep 2022 09:03:49 +0200 Subject: [PATCH] Fix request context leak in the Funqy Knative runtime. --- .../funqy/test/RequestScopeLeakTest.java | 151 ++++++++++++++++++ .../quarkus/funqy/test/RequestScopeTest.java | 103 ++++++++++++ .../test/resources/greeting-uni.properties | 1 + .../knative/events/VertxRequestHandler.java | 32 ++-- .../funqy/runtime/FunctionInvoker.java | 8 +- 5 files changed, 278 insertions(+), 17 deletions(-) create mode 100644 extensions/funqy/funqy-knative-events/deployment/src/test/java/io/quarkus/funqy/test/RequestScopeLeakTest.java create mode 100644 extensions/funqy/funqy-knative-events/deployment/src/test/java/io/quarkus/funqy/test/RequestScopeTest.java create mode 100644 extensions/funqy/funqy-knative-events/deployment/src/test/resources/greeting-uni.properties diff --git a/extensions/funqy/funqy-knative-events/deployment/src/test/java/io/quarkus/funqy/test/RequestScopeLeakTest.java b/extensions/funqy/funqy-knative-events/deployment/src/test/java/io/quarkus/funqy/test/RequestScopeLeakTest.java new file mode 100644 index 0000000000000..2c2c8f62b7e8a --- /dev/null +++ b/extensions/funqy/funqy-knative-events/deployment/src/test/java/io/quarkus/funqy/test/RequestScopeLeakTest.java @@ -0,0 +1,151 @@ +package io.quarkus.funqy.test; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.nullValue; + +import java.util.concurrent.atomic.AtomicInteger; + +import javax.annotation.PreDestroy; +import javax.enterprise.context.RequestScoped; +import javax.inject.Inject; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.quarkus.funqy.Funq; +import io.quarkus.test.QuarkusUnitTest; +import io.restassured.RestAssured; +import io.smallrye.common.vertx.VertxContext; +import io.smallrye.mutiny.Uni; +import io.vertx.core.Context; +import io.vertx.core.Vertx; + +public class RequestScopeLeakTest { + + @RegisterExtension + static QuarkusUnitTest test = new QuarkusUnitTest() + .withApplicationRoot((jar) -> jar + .addClasses(MyBean.class, Identity.class, Greeting.class, MyFunction.class) + .addAsResource("greeting-uni.properties", "application.properties")); + + @BeforeEach + void cleanup() { + MyBean.DISPOSED.set(0); + } + + @Test + public void testRequestScope() { + RestAssured.given().contentType("application/json") + .body("{\"name\": \"Roxanne\"}") + .post("/") + .then().statusCode(200) + .header("ce-id", nullValue()) + .body("name", equalTo("Roxanne")) + .body("message", equalTo("Hello Roxanne!")); + + Assertions.assertEquals(1, MyBean.DISPOSED.get()); + } + + @Test + public void testRequestScopeWithSyncFailure() { + RestAssured.given().contentType("application/json") + .body("{\"name\": \"sync-failure\"}") + .post("/") + .then().statusCode(500); + Assertions.assertEquals(1, MyBean.DISPOSED.get()); + } + + @Test + public void testRequestScopeWithSyncFailureInPipeline() { + RestAssured.given().contentType("application/json") + .body("{\"name\": \"sync-failure-pipeline\"}") + .post("/") + .then().statusCode(500); + Assertions.assertEquals(1, MyBean.DISPOSED.get()); + } + + @Test + public void testRequestScopeWithASyncFailure() { + RestAssured.given().contentType("application/json") + .body("{\"name\": \"async-failure\"}") + .post("/") + .then().statusCode(500); + Assertions.assertEquals(1, MyBean.DISPOSED.get()); + } + + @RequestScoped + public static class MyBean { + public static AtomicInteger DISPOSED = new AtomicInteger(); + + private final AtomicInteger counter = new AtomicInteger(); + + public int inc() { + return counter.getAndIncrement(); + } + + public void get() { + counter.get(); + } + + @PreDestroy + public void destroy() { + DISPOSED.incrementAndGet(); + } + } + + public static class MyFunction { + + @Inject + MyBean bean; + @Inject + Vertx vertx; + + @Funq + public Uni greeting(Identity name) { + Context context = Vertx.currentContext(); + Assertions.assertTrue(VertxContext.isOnDuplicatedContext()); + + if (name.getName().equals("sync-failure")) { + Assertions.assertEquals(0, bean.inc()); + throw new IllegalArgumentException("expected sync-failure"); + } + + return Uni.createFrom().item("Hello " + name.getName() + "!") + .invoke(() -> { + Assertions.assertEquals(0, bean.inc()); + Assertions.assertSame(context, Vertx.currentContext()); + }) + .chain(this::nap) + .invoke(() -> { + Assertions.assertEquals(1, bean.inc()); + Assertions.assertSame(context, Vertx.currentContext()); + }) + .invoke(() -> { + if (name.getName().equals("sync-failure-pipeline")) { + throw new IllegalArgumentException("expected sync-failure-in-pipeline"); + } + }) + .map(s -> { + Greeting greeting = new Greeting(); + greeting.setName(name.getName()); + greeting.setMessage(s); + return greeting; + }) + .chain(greeting -> { + if (greeting.getName().equals("async-failure")) { + return Uni.createFrom().failure(() -> new IllegalArgumentException("expected async-failure")); + } + return Uni.createFrom().item(greeting); + }); + } + + public Uni nap(String s) { + return Uni.createFrom().emitter(e -> { + vertx.setTimer(100, x -> e.complete(s)); + }); + } + + } +} diff --git a/extensions/funqy/funqy-knative-events/deployment/src/test/java/io/quarkus/funqy/test/RequestScopeTest.java b/extensions/funqy/funqy-knative-events/deployment/src/test/java/io/quarkus/funqy/test/RequestScopeTest.java new file mode 100644 index 0000000000000..01d51b8b9b033 --- /dev/null +++ b/extensions/funqy/funqy-knative-events/deployment/src/test/java/io/quarkus/funqy/test/RequestScopeTest.java @@ -0,0 +1,103 @@ +package io.quarkus.funqy.test; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.nullValue; + +import java.util.concurrent.atomic.AtomicInteger; + +import javax.annotation.PreDestroy; +import javax.enterprise.context.RequestScoped; +import javax.inject.Inject; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.quarkus.funqy.Funq; +import io.quarkus.test.QuarkusUnitTest; +import io.restassured.RestAssured; +import io.smallrye.common.vertx.VertxContext; + +public class RequestScopeTest { + + @RegisterExtension + static QuarkusUnitTest test = new QuarkusUnitTest() + .withApplicationRoot((jar) -> jar + .addClasses(MyBean.class, Identity.class, Greeting.class, MyFunction.class) + .addAsResource("greeting.properties", "application.properties")); + + @BeforeEach + void cleanup() { + MyBean.DISPOSED.set(0); + } + + @Test + public void testRequestScope() { + RestAssured.given().contentType("application/json") + .body("{\"name\": \"Roxanne\"}") + .post("/") + .then().statusCode(200) + .header("ce-id", nullValue()) + .body("name", equalTo("Roxanne")) + .body("message", equalTo("Hello Roxanne!")); + + Assertions.assertEquals(1, MyBean.DISPOSED.get()); + } + + @Test + public void testRequestScopeTerminationWithSynchronousFailure() { + String body = RestAssured.given().contentType("application/json") + .body("{\"name\": \"failure\"}") + .post("/") + .then().statusCode(500).extract().asString(); + + Assertions.assertTrue(body.contains("expected failure")); + Assertions.assertEquals(1, MyBean.DISPOSED.get()); + } + + @RequestScoped + public static class MyBean { + + public static AtomicInteger DISPOSED = new AtomicInteger(); + + private final AtomicInteger counter = new AtomicInteger(); + + public int inc() { + return counter.getAndIncrement(); + } + + public void get() { + counter.get(); + } + + @PreDestroy + public void destroy() { + DISPOSED.incrementAndGet(); + } + } + + public static class MyFunction { + + @Inject + MyBean bean; + + @Funq + public Greeting greet(Identity name) { + Assertions.assertTrue(VertxContext.isOnDuplicatedContext()); + Assertions.assertEquals(0, bean.inc()); + + if (name.getName().equals("failure")) { + throw new IllegalArgumentException("expected failure"); + } + + Greeting greeting = new Greeting(); + greeting.setName(name.getName()); + greeting.setMessage("Hello " + name.getName() + "!"); + + Assertions.assertEquals(1, bean.inc()); + return greeting; + } + + } +} diff --git a/extensions/funqy/funqy-knative-events/deployment/src/test/resources/greeting-uni.properties b/extensions/funqy/funqy-knative-events/deployment/src/test/resources/greeting-uni.properties new file mode 100644 index 0000000000000..ed2792cb11d1d --- /dev/null +++ b/extensions/funqy/funqy-knative-events/deployment/src/test/resources/greeting-uni.properties @@ -0,0 +1 @@ +quarkus.funqy.export=greeting \ No newline at end of file diff --git a/extensions/funqy/funqy-knative-events/runtime/src/main/java/io/quarkus/funqy/runtime/bindings/knative/events/VertxRequestHandler.java b/extensions/funqy/funqy-knative-events/runtime/src/main/java/io/quarkus/funqy/runtime/bindings/knative/events/VertxRequestHandler.java index f9d87c003ef05..eb172090110f0 100644 --- a/extensions/funqy/funqy-knative-events/runtime/src/main/java/io/quarkus/funqy/runtime/bindings/knative/events/VertxRequestHandler.java +++ b/extensions/funqy/funqy-knative-events/runtime/src/main/java/io/quarkus/funqy/runtime/bindings/knative/events/VertxRequestHandler.java @@ -562,19 +562,25 @@ private FunqyServerResponse dispatch(CloudEvent event, RoutingContext routingCon } } currentVertxRequest.setCurrent(routingContext); - try { - RequestContextImpl funqContext = new RequestContextImpl(); - if (event != null) { - funqContext.setContextData(CloudEvent.class, event); - } - FunqyRequestImpl funqyRequest = new FunqyRequestImpl(funqContext, input); - FunqyResponseImpl funqyResponse = new FunqyResponseImpl(); - invoker.invoke(funqyRequest, funqyResponse); - return funqyResponse; - } finally { - if (requestContext.isActive()) { - requestContext.terminate(); - } + RequestContextImpl funqContext = new RequestContextImpl(); + if (event != null) { + funqContext.setContextData(CloudEvent.class, event); } + FunqyRequestImpl funqyRequest = new FunqyRequestImpl(funqContext, input); + FunqyResponseImpl funqyResponse = new FunqyResponseImpl(); + invoker.invoke(funqyRequest, funqyResponse); + + // The invoker set the output, but we need to extend that output (a Uni) with a termination block deactivating the + // request context if activated. + funqyResponse.setOutput(funqyResponse.getOutput() + .onTermination().invoke(new Runnable() { + @Override + public void run() { + if (requestContext.isActive()) { + requestContext.terminate(); + } + } + })); + return funqyResponse; } } diff --git a/extensions/funqy/funqy-server-common/runtime/src/main/java/io/quarkus/funqy/runtime/FunctionInvoker.java b/extensions/funqy/funqy-server-common/runtime/src/main/java/io/quarkus/funqy/runtime/FunctionInvoker.java index aeff7760bf343..e101cd63b93fc 100644 --- a/extensions/funqy/funqy-server-common/runtime/src/main/java/io/quarkus/funqy/runtime/FunctionInvoker.java +++ b/extensions/funqy/funqy-server-common/runtime/src/main/java/io/quarkus/funqy/runtime/FunctionInvoker.java @@ -122,22 +122,22 @@ public void invoke(FunqyServerRequest request, FunqyServerResponse response) { try { Object result = method.invoke(target, args); if (isAsync()) { - response.setOutput(((Uni) result).onFailure().transform(t -> new ApplicationException(t))); + response.setOutput(((Uni) result) + .onFailure().transform(t -> new ApplicationException(t))); } else { response.setOutput(Uni.createFrom().item(result)); } + // Catch the exception but do not rethrow the exception, + // The handler decorates the uni with a termination block to handle the request scope deactivation. } catch (IllegalAccessException e) { InternalError ex = new InternalError("Failed to invoke function", e); response.setOutput(Uni.createFrom().failure(ex)); - throw ex; } catch (InvocationTargetException e) { ApplicationException ex = new ApplicationException(e.getCause()); response.setOutput(Uni.createFrom().failure(ex)); - throw ex; } catch (Throwable t) { InternalError ex = new InternalError(t); response.setOutput(Uni.createFrom().failure(ex)); - throw ex; } }