Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix request context leak in the Funqy Knative runtime #27885

Merged
merged 1 commit into from
Sep 13, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
package io.quarkus.funqy.test;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.nullValue;

import java.util.concurrent.atomic.AtomicInteger;

import javax.annotation.PreDestroy;
import javax.enterprise.context.RequestScoped;
import javax.inject.Inject;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.quarkus.funqy.Funq;
import io.quarkus.test.QuarkusUnitTest;
import io.restassured.RestAssured;
import io.smallrye.common.vertx.VertxContext;
import io.smallrye.mutiny.Uni;
import io.vertx.core.Context;
import io.vertx.core.Vertx;

public class RequestScopeLeakTest {

@RegisterExtension
static QuarkusUnitTest test = new QuarkusUnitTest()
.withApplicationRoot((jar) -> jar
.addClasses(MyBean.class, Identity.class, Greeting.class, MyFunction.class)
.addAsResource("greeting-uni.properties", "application.properties"));

@BeforeEach
void cleanup() {
MyBean.DISPOSED.set(0);
}

@Test
public void testRequestScope() {
RestAssured.given().contentType("application/json")
.body("{\"name\": \"Roxanne\"}")
.post("/")
.then().statusCode(200)
.header("ce-id", nullValue())
.body("name", equalTo("Roxanne"))
.body("message", equalTo("Hello Roxanne!"));

Assertions.assertEquals(1, MyBean.DISPOSED.get());
}

@Test
public void testRequestScopeWithSyncFailure() {
RestAssured.given().contentType("application/json")
.body("{\"name\": \"sync-failure\"}")
.post("/")
.then().statusCode(500);
Assertions.assertEquals(1, MyBean.DISPOSED.get());
}

@Test
public void testRequestScopeWithSyncFailureInPipeline() {
RestAssured.given().contentType("application/json")
.body("{\"name\": \"sync-failure-pipeline\"}")
.post("/")
.then().statusCode(500);
Assertions.assertEquals(1, MyBean.DISPOSED.get());
}

@Test
public void testRequestScopeWithASyncFailure() {
RestAssured.given().contentType("application/json")
.body("{\"name\": \"async-failure\"}")
.post("/")
.then().statusCode(500);
Assertions.assertEquals(1, MyBean.DISPOSED.get());
}

@RequestScoped
public static class MyBean {
public static AtomicInteger DISPOSED = new AtomicInteger();

private final AtomicInteger counter = new AtomicInteger();

public int inc() {
return counter.getAndIncrement();
}

public void get() {
counter.get();
}

@PreDestroy
public void destroy() {
DISPOSED.incrementAndGet();
}
}

public static class MyFunction {

@Inject
MyBean bean;
@Inject
Vertx vertx;

@Funq
public Uni<Greeting> greeting(Identity name) {
Context context = Vertx.currentContext();
Assertions.assertTrue(VertxContext.isOnDuplicatedContext());

if (name.getName().equals("sync-failure")) {
Assertions.assertEquals(0, bean.inc());
throw new IllegalArgumentException("expected sync-failure");
}

return Uni.createFrom().item("Hello " + name.getName() + "!")
.invoke(() -> {
Assertions.assertEquals(0, bean.inc());
Assertions.assertSame(context, Vertx.currentContext());
})
.chain(this::nap)
.invoke(() -> {
Assertions.assertEquals(1, bean.inc());
Assertions.assertSame(context, Vertx.currentContext());
})
.invoke(() -> {
if (name.getName().equals("sync-failure-pipeline")) {
throw new IllegalArgumentException("expected sync-failure-in-pipeline");
}
})
.map(s -> {
Greeting greeting = new Greeting();
greeting.setName(name.getName());
greeting.setMessage(s);
return greeting;
})
.chain(greeting -> {
if (greeting.getName().equals("async-failure")) {
return Uni.createFrom().failure(() -> new IllegalArgumentException("expected async-failure"));
}
return Uni.createFrom().item(greeting);
});
}

public Uni<String> nap(String s) {
return Uni.createFrom().emitter(e -> {
vertx.setTimer(100, x -> e.complete(s));
});
}

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package io.quarkus.funqy.test;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.nullValue;

import java.util.concurrent.atomic.AtomicInteger;

import javax.annotation.PreDestroy;
import javax.enterprise.context.RequestScoped;
import javax.inject.Inject;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.quarkus.funqy.Funq;
import io.quarkus.test.QuarkusUnitTest;
import io.restassured.RestAssured;
import io.smallrye.common.vertx.VertxContext;

public class RequestScopeTest {

@RegisterExtension
static QuarkusUnitTest test = new QuarkusUnitTest()
.withApplicationRoot((jar) -> jar
.addClasses(MyBean.class, Identity.class, Greeting.class, MyFunction.class)
.addAsResource("greeting.properties", "application.properties"));

@BeforeEach
void cleanup() {
MyBean.DISPOSED.set(0);
}

@Test
public void testRequestScope() {
RestAssured.given().contentType("application/json")
.body("{\"name\": \"Roxanne\"}")
.post("/")
.then().statusCode(200)
.header("ce-id", nullValue())
.body("name", equalTo("Roxanne"))
.body("message", equalTo("Hello Roxanne!"));

Assertions.assertEquals(1, MyBean.DISPOSED.get());
}

@Test
public void testRequestScopeTerminationWithSynchronousFailure() {
String body = RestAssured.given().contentType("application/json")
.body("{\"name\": \"failure\"}")
.post("/")
.then().statusCode(500).extract().asString();

Assertions.assertTrue(body.contains("expected failure"));
Assertions.assertEquals(1, MyBean.DISPOSED.get());
}

@RequestScoped
public static class MyBean {

public static AtomicInteger DISPOSED = new AtomicInteger();

private final AtomicInteger counter = new AtomicInteger();

public int inc() {
return counter.getAndIncrement();
}

public void get() {
counter.get();
}

@PreDestroy
public void destroy() {
DISPOSED.incrementAndGet();
}
}

public static class MyFunction {

@Inject
MyBean bean;

@Funq
public Greeting greet(Identity name) {
Assertions.assertTrue(VertxContext.isOnDuplicatedContext());
Assertions.assertEquals(0, bean.inc());

if (name.getName().equals("failure")) {
throw new IllegalArgumentException("expected failure");
}

Greeting greeting = new Greeting();
greeting.setName(name.getName());
greeting.setMessage("Hello " + name.getName() + "!");

Assertions.assertEquals(1, bean.inc());
return greeting;
}

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
quarkus.funqy.export=greeting
Original file line number Diff line number Diff line change
@@ -562,19 +562,25 @@ private FunqyServerResponse dispatch(CloudEvent event, RoutingContext routingCon
}
}
currentVertxRequest.setCurrent(routingContext);
try {
RequestContextImpl funqContext = new RequestContextImpl();
if (event != null) {
funqContext.setContextData(CloudEvent.class, event);
}
FunqyRequestImpl funqyRequest = new FunqyRequestImpl(funqContext, input);
FunqyResponseImpl funqyResponse = new FunqyResponseImpl();
invoker.invoke(funqyRequest, funqyResponse);
return funqyResponse;
} finally {
if (requestContext.isActive()) {
requestContext.terminate();
}
RequestContextImpl funqContext = new RequestContextImpl();
if (event != null) {
funqContext.setContextData(CloudEvent.class, event);
}
FunqyRequestImpl funqyRequest = new FunqyRequestImpl(funqContext, input);
FunqyResponseImpl funqyResponse = new FunqyResponseImpl();
invoker.invoke(funqyRequest, funqyResponse);

// The invoker set the output, but we need to extend that output (a Uni) with a termination block deactivating the
// request context if activated.
funqyResponse.setOutput(funqyResponse.getOutput()
.onTermination().invoke(new Runnable() {
@Override
public void run() {
if (requestContext.isActive()) {
requestContext.terminate();
}
}
}));
return funqyResponse;
}
}
Original file line number Diff line number Diff line change
@@ -122,22 +122,22 @@ public void invoke(FunqyServerRequest request, FunqyServerResponse response) {
try {
Object result = method.invoke(target, args);
if (isAsync()) {
response.setOutput(((Uni<?>) result).onFailure().transform(t -> new ApplicationException(t)));
response.setOutput(((Uni<?>) result)
.onFailure().transform(t -> new ApplicationException(t)));
} else {
response.setOutput(Uni.createFrom().item(result));
}
// Catch the exception but do not rethrow the exception,
// The handler decorates the uni with a termination block to handle the request scope deactivation.
} catch (IllegalAccessException e) {
InternalError ex = new InternalError("Failed to invoke function", e);
response.setOutput(Uni.createFrom().failure(ex));
throw ex;
} catch (InvocationTargetException e) {
ApplicationException ex = new ApplicationException(e.getCause());
response.setOutput(Uni.createFrom().failure(ex));
throw ex;
} catch (Throwable t) {
InternalError ex = new InternalError(t);
response.setOutput(Uni.createFrom().failure(ex));
throw ex;
}
}