Skip to content

Commit

Permalink
Reactive routes - if needed propagate req. context activated by filter
Browse files Browse the repository at this point in the history
- resolves #13073
  • Loading branch information
mkouba committed Nov 24, 2020
1 parent 2a1883b commit 0e8efd2
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 4 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package io.quarkus.vertx.web.filter;

import static org.hamcrest.Matchers.is;

import java.util.concurrent.atomic.AtomicInteger;

import javax.enterprise.context.RequestScoped;
import javax.inject.Inject;

import org.jboss.shrinkwrap.api.ShrinkWrap;
import org.jboss.shrinkwrap.api.spec.JavaArchive;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.quarkus.test.QuarkusUnitTest;
import io.quarkus.vertx.web.Route;
import io.quarkus.vertx.web.RouteFilter;
import io.restassured.RestAssured;
import io.vertx.ext.web.RoutingContext;

public class UserFilterRequestContextPropagationTest {

@RegisterExtension
static final QuarkusUnitTest config = new QuarkusUnitTest()
.setArchiveProducer(() -> ShrinkWrap.create(JavaArchive.class)
.addClasses(FilterAndRoute.class, RequestFoo.class));

@Test
public void test() {
RestAssured.post("/hello").then().statusCode(200)
.body(is("11"));
}

public static class FilterAndRoute {

@Inject
RequestFoo foo;

@RouteFilter
void filter1(RoutingContext rc) {
foo.setState(11);
rc.next();
}

@Route(path = "hello")
void hello(RoutingContext ctx) {
ctx.response().end("" + foo.getState());
}

}

@RequestScoped
static class RequestFoo {

private AtomicInteger state = new AtomicInteger(-1);

void setState(int value) {
this.state.set(value);
}

public int getState() {
return state.get();
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import javax.enterprise.event.Event;

import io.quarkus.arc.Arc;
import io.quarkus.arc.InjectableContext;
import io.quarkus.arc.InjectableContext.ContextState;
import io.quarkus.arc.ManagedContext;
import io.quarkus.security.identity.SecurityIdentity;
import io.quarkus.vertx.http.runtime.CurrentVertxRequest;
Expand All @@ -20,6 +20,8 @@
*/
public abstract class RouteHandler implements Handler<RoutingContext> {

private static final String REQUEST_CONTEXT_STATE = "__cdi_req_ctx_state";

private final Event<SecurityIdentity> securityIdentityEvent;
private final CurrentVertxRequest currentVertxRequest;

Expand Down Expand Up @@ -47,20 +49,24 @@ public void handle(RoutingContext context) {
invoke(context);
} else {
try {
// The context could be activated by a filter - we need to use the same context
// Note that for requests that require a BodyHandler the route method is always invoked asynchronously and so if a filter activates the request context we must propagat it manually
ContextState state = context.get(REQUEST_CONTEXT_STATE);
// Activate the context, i.e. set the thread locals
requestContext.activate();
requestContext.activate(state);
currentVertxRequest.setCurrent(context);
if (user != null) {
securityIdentityEvent.fire(user.getSecurityIdentity());
}
// Reactive routes can use async processing (e.g. mutiny Uni/Multi) and context propagation
// 1. Store the state (which is basically a shared Map instance)
// 2. Terminate the context correcly when the response is disposed or an exception is thrown
InjectableContext.ContextState state = requestContext.getState();
final ContextState endState = state != null ? state : requestContext.getState();
context.put(REQUEST_CONTEXT_STATE, endState);
context.addEndHandler(new Handler<AsyncResult<Void>>() {
@Override
public void handle(AsyncResult<Void> result) {
requestContext.destroy(state);
requestContext.destroy(endState);
}
});
invoke(context);
Expand Down

0 comments on commit 0e8efd2

Please sign in to comment.