Skip to content

Commit

Permalink
Fix request context leak in the Funqy Knative runtime.
Browse files Browse the repository at this point in the history
  • Loading branch information
cescoffier authored and Alasdair Preston committed Sep 14, 2022
1 parent a861e54 commit 7cb775e
Show file tree
Hide file tree
Showing 5 changed files with 278 additions and 17 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
package io.quarkus.funqy.test;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.nullValue;

import java.util.concurrent.atomic.AtomicInteger;

import javax.annotation.PreDestroy;
import javax.enterprise.context.RequestScoped;
import javax.inject.Inject;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.quarkus.funqy.Funq;
import io.quarkus.test.QuarkusUnitTest;
import io.restassured.RestAssured;
import io.smallrye.common.vertx.VertxContext;
import io.smallrye.mutiny.Uni;
import io.vertx.core.Context;
import io.vertx.core.Vertx;

public class RequestScopeLeakTest {

@RegisterExtension
static QuarkusUnitTest test = new QuarkusUnitTest()
.withApplicationRoot((jar) -> jar
.addClasses(MyBean.class, Identity.class, Greeting.class, MyFunction.class)
.addAsResource("greeting-uni.properties", "application.properties"));

@BeforeEach
void cleanup() {
MyBean.DISPOSED.set(0);
}

@Test
public void testRequestScope() {
RestAssured.given().contentType("application/json")
.body("{\"name\": \"Roxanne\"}")
.post("/")
.then().statusCode(200)
.header("ce-id", nullValue())
.body("name", equalTo("Roxanne"))
.body("message", equalTo("Hello Roxanne!"));

Assertions.assertEquals(1, MyBean.DISPOSED.get());
}

@Test
public void testRequestScopeWithSyncFailure() {
RestAssured.given().contentType("application/json")
.body("{\"name\": \"sync-failure\"}")
.post("/")
.then().statusCode(500);
Assertions.assertEquals(1, MyBean.DISPOSED.get());
}

@Test
public void testRequestScopeWithSyncFailureInPipeline() {
RestAssured.given().contentType("application/json")
.body("{\"name\": \"sync-failure-pipeline\"}")
.post("/")
.then().statusCode(500);
Assertions.assertEquals(1, MyBean.DISPOSED.get());
}

@Test
public void testRequestScopeWithASyncFailure() {
RestAssured.given().contentType("application/json")
.body("{\"name\": \"async-failure\"}")
.post("/")
.then().statusCode(500);
Assertions.assertEquals(1, MyBean.DISPOSED.get());
}

@RequestScoped
public static class MyBean {
public static AtomicInteger DISPOSED = new AtomicInteger();

private final AtomicInteger counter = new AtomicInteger();

public int inc() {
return counter.getAndIncrement();
}

public void get() {
counter.get();
}

@PreDestroy
public void destroy() {
DISPOSED.incrementAndGet();
}
}

public static class MyFunction {

@Inject
MyBean bean;
@Inject
Vertx vertx;

@Funq
public Uni<Greeting> greeting(Identity name) {
Context context = Vertx.currentContext();
Assertions.assertTrue(VertxContext.isOnDuplicatedContext());

if (name.getName().equals("sync-failure")) {
Assertions.assertEquals(0, bean.inc());
throw new IllegalArgumentException("expected sync-failure");
}

return Uni.createFrom().item("Hello " + name.getName() + "!")
.invoke(() -> {
Assertions.assertEquals(0, bean.inc());
Assertions.assertSame(context, Vertx.currentContext());
})
.chain(this::nap)
.invoke(() -> {
Assertions.assertEquals(1, bean.inc());
Assertions.assertSame(context, Vertx.currentContext());
})
.invoke(() -> {
if (name.getName().equals("sync-failure-pipeline")) {
throw new IllegalArgumentException("expected sync-failure-in-pipeline");
}
})
.map(s -> {
Greeting greeting = new Greeting();
greeting.setName(name.getName());
greeting.setMessage(s);
return greeting;
})
.chain(greeting -> {
if (greeting.getName().equals("async-failure")) {
return Uni.createFrom().failure(() -> new IllegalArgumentException("expected async-failure"));
}
return Uni.createFrom().item(greeting);
});
}

public Uni<String> nap(String s) {
return Uni.createFrom().emitter(e -> {
vertx.setTimer(100, x -> e.complete(s));
});
}

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package io.quarkus.funqy.test;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.nullValue;

import java.util.concurrent.atomic.AtomicInteger;

import javax.annotation.PreDestroy;
import javax.enterprise.context.RequestScoped;
import javax.inject.Inject;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.quarkus.funqy.Funq;
import io.quarkus.test.QuarkusUnitTest;
import io.restassured.RestAssured;
import io.smallrye.common.vertx.VertxContext;

public class RequestScopeTest {

@RegisterExtension
static QuarkusUnitTest test = new QuarkusUnitTest()
.withApplicationRoot((jar) -> jar
.addClasses(MyBean.class, Identity.class, Greeting.class, MyFunction.class)
.addAsResource("greeting.properties", "application.properties"));

@BeforeEach
void cleanup() {
MyBean.DISPOSED.set(0);
}

@Test
public void testRequestScope() {
RestAssured.given().contentType("application/json")
.body("{\"name\": \"Roxanne\"}")
.post("/")
.then().statusCode(200)
.header("ce-id", nullValue())
.body("name", equalTo("Roxanne"))
.body("message", equalTo("Hello Roxanne!"));

Assertions.assertEquals(1, MyBean.DISPOSED.get());
}

@Test
public void testRequestScopeTerminationWithSynchronousFailure() {
String body = RestAssured.given().contentType("application/json")
.body("{\"name\": \"failure\"}")
.post("/")
.then().statusCode(500).extract().asString();

Assertions.assertTrue(body.contains("expected failure"));
Assertions.assertEquals(1, MyBean.DISPOSED.get());
}

@RequestScoped
public static class MyBean {

public static AtomicInteger DISPOSED = new AtomicInteger();

private final AtomicInteger counter = new AtomicInteger();

public int inc() {
return counter.getAndIncrement();
}

public void get() {
counter.get();
}

@PreDestroy
public void destroy() {
DISPOSED.incrementAndGet();
}
}

public static class MyFunction {

@Inject
MyBean bean;

@Funq
public Greeting greet(Identity name) {
Assertions.assertTrue(VertxContext.isOnDuplicatedContext());
Assertions.assertEquals(0, bean.inc());

if (name.getName().equals("failure")) {
throw new IllegalArgumentException("expected failure");
}

Greeting greeting = new Greeting();
greeting.setName(name.getName());
greeting.setMessage("Hello " + name.getName() + "!");

Assertions.assertEquals(1, bean.inc());
return greeting;
}

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
quarkus.funqy.export=greeting
Original file line number Diff line number Diff line change
Expand Up @@ -562,19 +562,25 @@ private FunqyServerResponse dispatch(CloudEvent event, RoutingContext routingCon
}
}
currentVertxRequest.setCurrent(routingContext);
try {
RequestContextImpl funqContext = new RequestContextImpl();
if (event != null) {
funqContext.setContextData(CloudEvent.class, event);
}
FunqyRequestImpl funqyRequest = new FunqyRequestImpl(funqContext, input);
FunqyResponseImpl funqyResponse = new FunqyResponseImpl();
invoker.invoke(funqyRequest, funqyResponse);
return funqyResponse;
} finally {
if (requestContext.isActive()) {
requestContext.terminate();
}
RequestContextImpl funqContext = new RequestContextImpl();
if (event != null) {
funqContext.setContextData(CloudEvent.class, event);
}
FunqyRequestImpl funqyRequest = new FunqyRequestImpl(funqContext, input);
FunqyResponseImpl funqyResponse = new FunqyResponseImpl();
invoker.invoke(funqyRequest, funqyResponse);

// The invoker set the output, but we need to extend that output (a Uni) with a termination block deactivating the
// request context if activated.
funqyResponse.setOutput(funqyResponse.getOutput()
.onTermination().invoke(new Runnable() {
@Override
public void run() {
if (requestContext.isActive()) {
requestContext.terminate();
}
}
}));
return funqyResponse;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -122,22 +122,22 @@ public void invoke(FunqyServerRequest request, FunqyServerResponse response) {
try {
Object result = method.invoke(target, args);
if (isAsync()) {
response.setOutput(((Uni<?>) result).onFailure().transform(t -> new ApplicationException(t)));
response.setOutput(((Uni<?>) result)
.onFailure().transform(t -> new ApplicationException(t)));
} else {
response.setOutput(Uni.createFrom().item(result));
}
// Catch the exception but do not rethrow the exception,
// The handler decorates the uni with a termination block to handle the request scope deactivation.
} catch (IllegalAccessException e) {
InternalError ex = new InternalError("Failed to invoke function", e);
response.setOutput(Uni.createFrom().failure(ex));
throw ex;
} catch (InvocationTargetException e) {
ApplicationException ex = new ApplicationException(e.getCause());
response.setOutput(Uni.createFrom().failure(ex));
throw ex;
} catch (Throwable t) {
InternalError ex = new InternalError(t);
response.setOutput(Uni.createFrom().failure(ex));
throw ex;
}
}

Expand Down

0 comments on commit 7cb775e

Please sign in to comment.