Skip to content

Commit

Permalink
Merge pull request quarkusio#36466 from mkouba/issue-36430
Browse files Browse the repository at this point in the history
Reactive routes: virtual threads support
  • Loading branch information
mkouba authored Oct 14, 2023
2 parents 87379b2 + 8638dcb commit ec4074a
Show file tree
Hide file tree
Showing 56 changed files with 350 additions and 866 deletions.
8 changes: 7 additions & 1 deletion docs/src/main/asciidoc/reactive-routes.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ public void blocking(RoutingContext rc) {
// ...
}
----
When `@Blocking` is used, it ignores the `type` attribute of `@Route`.
When `@Blocking` is used, the `type` attribute of the `@Route` is ignored.
====

The `@Route` annotation is repeatable and so you can declare several routes for a single method:
Expand All @@ -164,6 +164,12 @@ String person() {
----
<1> If the `accept` header matches `text/html`, we set the content type automatically to `text/html`.

=== Executing route on a virtual thread

You can annotate a route method with `@io.smallrye.common.annotation.RunOnVirtualThread` in order to execute it on a virtual thread.
However, keep in mind that not everything can run safely on virtual threads.
You should read the xref:virtual-threads.adoc#run-code-on-virtual-threads-using-runonvirtualthread[Virtual thread support reference] carefully and get acquainted with all the details.

=== Handling conflicting routes

You may end up with multiple routes matching a given path.
Expand Down
1 change: 1 addition & 0 deletions docs/src/main/asciidoc/virtual-threads.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ In this scenario, it is worse than useless to have thousands of threads if we ha
Even worse, when running a CPU-bound workload on a virtual thread, the virtual thread monopolizes the carrier thread on which it is mounted.
It will either reduce the chance for the other virtual thread to run or will start creating new carrier threads, leading to high memory usage.

[[run-code-on-virtual-threads-using-runonvirtualthread]]
== Run code on virtual threads using @RunOnVirtualThread

In Quarkus, the support of virtual thread is implemented using the link:{runonvthread}[@RunOnVirtualThread] annotation.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import org.jboss.jandex.Type.Kind;

import io.quarkus.hibernate.validator.spi.BeanValidationAnnotationsBuildItem;
import io.quarkus.vertx.http.runtime.HandlerType;

/**
* Describe a request handler.
Expand All @@ -15,15 +14,15 @@ class HandlerDescriptor {

private final MethodInfo method;
private final BeanValidationAnnotationsBuildItem validationAnnotations;
private final HandlerType handlerType;
private final boolean failureHandler;
private final Type payloadType;
private final String[] contentTypes;

HandlerDescriptor(MethodInfo method, BeanValidationAnnotationsBuildItem bvAnnotations, HandlerType handlerType,
HandlerDescriptor(MethodInfo method, BeanValidationAnnotationsBuildItem bvAnnotations, boolean failureHandler,
String[] producedTypes) {
this.method = method;
this.validationAnnotations = bvAnnotations;
this.handlerType = handlerType;
this.failureHandler = failureHandler;
Type returnType = method.returnType();
if (returnType.kind() == Kind.VOID) {
payloadType = null;
Expand Down Expand Up @@ -120,8 +119,8 @@ boolean isPayloadMutinyBuffer() {
return type.name().equals(DotNames.MUTINY_BUFFER);
}

HandlerType getHandlerType() {
return handlerType;
boolean isFailureHandler() {
return failureHandler;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -445,7 +445,7 @@ public boolean test(String name) {
if (routeHandler == null) {
String handlerClass = generateHandler(
new HandlerDescriptor(businessMethod.getMethod(), beanValidationAnnotations.orElse(null),
handlerType, produces),
handlerType == HandlerType.FAILURE, produces),
businessMethod.getBean(), businessMethod.getMethod(), classOutput, transformedAnnotations,
routeString, reflectiveHierarchy, produces.length > 0 ? produces[0] : null,
validatorAvailable, index);
Expand All @@ -458,6 +458,13 @@ public boolean test(String name) {
// Wrap the route handler if necessary
// Note that route annotations with the same values share a single handler implementation
routeHandler = recorder.compressRouteHandler(routeHandler, businessMethod.getCompression());
if (businessMethod.getMethod().hasDeclaredAnnotation(DotNames.RUN_ON_VIRTUAL_THREAD)) {
LOGGER.debugf("Route %s#%s() will be executed on a virtual thread",
businessMethod.getMethod().declaringClass().name(), businessMethod.getMethod().name());
routeHandler = recorder.runOnVirtualThread(routeHandler);
// The handler must be executed on the event loop
handlerType = HandlerType.NORMAL;
}

RouteMatcher matcher = new RouteMatcher(path, regex, produces, consumes, methods, order);
matchers.put(matcher, businessMethod.getMethod());
Expand Down Expand Up @@ -489,7 +496,7 @@ public boolean test(String name) {

for (AnnotatedRouteFilterBuildItem filterMethod : routeFilterBusinessMethods) {
String handlerClass = generateHandler(
new HandlerDescriptor(filterMethod.getMethod(), beanValidationAnnotations.orElse(null), HandlerType.NORMAL,
new HandlerDescriptor(filterMethod.getMethod(), beanValidationAnnotations.orElse(null), false,
new String[0]),
filterMethod.getBean(), filterMethod.getMethod(), classOutput, transformedAnnotations,
filterMethod.getRouteFilter().toString(true), reflectiveHierarchy, null, validatorAvailable, index);
Expand Down Expand Up @@ -785,7 +792,7 @@ void implementInvoke(HandlerDescriptor descriptor, BeanInfo bean, MethodInfo met
defaultProduces == null ? invoke.loadNull() : invoke.load(defaultProduces));

// For failure handlers attempt to match the failure type
if (descriptor.getHandlerType() == HandlerType.FAILURE) {
if (descriptor.isFailureHandler()) {
Type failureType = getFailureType(parameters, index);
if (failureType != null) {
ResultHandle failure = invoke.invokeInterfaceMethod(Methods.FAILURE, routingContext);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ public Handler<RoutingContext> createHandler(String handlerClassName) {
}
}

public Handler<RoutingContext> runOnVirtualThread(Handler<RoutingContext> routeHandler) {
return new VirtualThreadsRouteHandler(routeHandler);
}

public Handler<RoutingContext> compressRouteHandler(Handler<RoutingContext> routeHandler, HttpCompression compression) {
if (httpBuildTimeConfig.enableCompression) {
return new HttpCompressionHandler(routeHandler, compression,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package io.quarkus.vertx.web.runtime;

import io.quarkus.vertx.core.runtime.VertxCoreRecorder;
import io.quarkus.vertx.core.runtime.context.VertxContextSafetyToggle;
import io.quarkus.virtual.threads.VirtualThreadsRecorder;
import io.smallrye.common.vertx.VertxContext;
import io.vertx.core.Context;
import io.vertx.core.Handler;
import io.vertx.ext.web.RoutingContext;

public class VirtualThreadsRouteHandler implements Handler<RoutingContext> {

private final Handler<RoutingContext> routeHandler;

public VirtualThreadsRouteHandler(Handler<RoutingContext> routeHandler) {
this.routeHandler = routeHandler;
}

@Override
public void handle(RoutingContext context) {
Context vertxContext = VertxContext.getOrCreateDuplicatedContext(VertxCoreRecorder.getVertx().get());
VertxContextSafetyToggle.setContextSafe(vertxContext, true);
vertxContext.runOnContext(new Handler<Void>() {
@Override
public void handle(Void event) {
VirtualThreadsRecorder.getCurrent().execute(new Runnable() {
@Override
public void run() {
routeHandler.handle(context);
}
});
}
});
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,10 @@
<artifactId>assertj-core</artifactId>
<scope>test</scope>
</dependency>
<!-- Use the "compile" scope because we need to include the VirtualThreadsAssertions in the app -->
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-test-vertx</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
package io.quarkus.it.vthreads.amqp;

import static io.quarkus.it.vthreads.amqp.AssertHelper.assertThatItRunsOnADuplicatedContext;
import static io.quarkus.it.vthreads.amqp.AssertHelper.assertThatItRunsOnVirtualThread;

import java.util.Random;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicInteger;
Expand All @@ -14,6 +11,7 @@
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import org.eclipse.microprofile.rest.client.inject.RestClient;

import io.quarkus.test.vertx.VirtualThreadsAssertions;
import io.smallrye.common.annotation.RunOnVirtualThread;

@ApplicationScoped
Expand All @@ -25,12 +23,12 @@ public class PriceConsumer {
@Incoming("prices")
@RunOnVirtualThread
public CompletionStage<Void> consume(Message<Double> msg) {
assertThatItRunsOnVirtualThread();
assertThatItRunsOnADuplicatedContext();
VirtualThreadsAssertions.assertThatItRunsOnVirtualThread();
VirtualThreadsAssertions.assertThatItRunsOnADuplicatedContext();
double price = msg.getPayload();
alertService.alertMessage(price);
return msg.ack().thenAccept(x -> {
assertThatItRunsOnADuplicatedContext();
VirtualThreadsAssertions.assertThatItRunsOnADuplicatedContext();
// While the ack always runs on event loop thread
// the post-ack may run on the processing virtual-thread which executed the method.
});
Expand All @@ -39,8 +37,8 @@ public CompletionStage<Void> consume(Message<Double> msg) {
@Incoming("prices")
@RunOnVirtualThread
public void consume(double price) {
assertThatItRunsOnVirtualThread();
assertThatItRunsOnADuplicatedContext();
VirtualThreadsAssertions.assertThatItRunsOnVirtualThread();
VirtualThreadsAssertions.assertThatItRunsOnADuplicatedContext();
alertService.alert(price);
}

Expand All @@ -50,7 +48,7 @@ public void consume(double price) {
@Outgoing("prices-out")
@RunOnVirtualThread
public Message<Double> randomPriceGenerator() {
assertThatItRunsOnVirtualThread();
VirtualThreadsAssertions.assertThatItRunsOnVirtualThread();
return Message.of(r.nextDouble() * 10 * i.incrementAndGet());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,11 @@
<groupId>io.quarkus</groupId>
<artifactId>quarkus-grpc</artifactId>
</dependency>

<!-- Use the "compile" scope because we need to include the VirtualThreadsAssertions in the app -->
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-test-vertx</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-junit5</artifactId>
Expand Down

This file was deleted.

Loading

0 comments on commit ec4074a

Please sign in to comment.