Skip to content

Commit

Permalink
Integrate @RunOnVirtualThread with the @ConsumeEvent
Browse files Browse the repository at this point in the history
Allows @ConsumeEvent method to run on a virtual thread.
Also verify that sending and receiving from the event bus is not pinning the carrier thread.
  • Loading branch information
cescoffier committed Aug 28, 2023
1 parent 3dede5c commit 5661a27
Show file tree
Hide file tree
Showing 20 changed files with 435 additions and 43 deletions.
2 changes: 1 addition & 1 deletion .github/virtual-threads-tests.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
{
"category": "Main",
"timeout": 45,
"test-modules": "grpc-virtual-threads, mailer-virtual-threads, redis-virtual-threads, rest-client-reactive-virtual-threads, resteasy-reactive-virtual-threads",
"test-modules": "grpc-virtual-threads, mailer-virtual-threads, redis-virtual-threads, rest-client-reactive-virtual-threads, resteasy-reactive-virtual-threads, vertx-event-bus-virtual-threads",
"os-name": "ubuntu-latest"
},
{
Expand Down
9 changes: 9 additions & 0 deletions docs/src/main/asciidoc/vertx-reference.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -694,6 +694,15 @@ Uni<String> response = bus.<String>request("address", "hello, how are you?")
.onItem().transform(Message::body);
----

=== Process events on virtual threads

Methods annotated with `@ConsumeEvent` can also be annotated with `@RunOnVirtualThread`.
In this case, the method is invoked on a virtual thread.
Each event is invoked on a different virtual thread.

The method must be _blocking_ and your Java runtime must provide support for virtual threads.
Read xref:./virtual-threads.adoc[the virtual thread guide] for more details.

=== Use codecs

The https://vertx.io/docs/vertx-core/java/#event_bus[Vert.x Event Bus] uses codecs to _serialize_ and _deserialize_ objects.
Expand Down
4 changes: 4 additions & 0 deletions extensions/vertx/deployment/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@
<groupId>io.quarkus</groupId>
<artifactId>quarkus-vertx</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-virtual-threads-deployment</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-mutiny-deployment</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import io.quarkus.runtime.util.HashUtil;
import io.quarkus.vertx.runtime.EventConsumerInvoker;
import io.smallrye.common.annotation.Blocking;
import io.smallrye.common.annotation.RunOnVirtualThread;
import io.smallrye.mutiny.Uni;
import io.vertx.core.MultiMap;
import io.vertx.core.eventbus.Message;
Expand Down Expand Up @@ -78,6 +79,7 @@ class EventBusConsumer {
protected static final MethodDescriptor THROWABLE_TO_STRING = MethodDescriptor
.ofMethod(Throwable.class, "toString", String.class);
protected static final DotName BLOCKING = DotName.createSimple(Blocking.class.getName());
protected static final DotName RUN_ON_VIRTUAL_THREAD = DotName.createSimple(RunOnVirtualThread.class.getName());

static String generateInvoker(BeanInfo bean, MethodInfo method,
AnnotationInstance consumeEvent,
Expand All @@ -100,9 +102,10 @@ static String generateInvoker(BeanInfo bean, MethodInfo method,
String generatedName = targetPackage + baseName + INVOKER_SUFFIX + "_" + method.name() + "_"
+ HashUtil.sha1(sigBuilder.toString());

boolean blocking;
boolean blocking, runOnVirtualThread;
AnnotationValue blockingValue = consumeEvent.value("blocking");
blocking = method.hasAnnotation(BLOCKING) || (blockingValue != null && blockingValue.asBoolean());
runOnVirtualThread = method.hasAnnotation(RUN_ON_VIRTUAL_THREAD);

ClassCreator invokerCreator = ClassCreator.builder().classOutput(classOutput).className(generatedName)
.superClass(EventConsumerInvoker.class).build();
Expand All @@ -113,11 +116,16 @@ static String generateInvoker(BeanInfo bean, MethodInfo method,
FieldCreator containerField = invokerCreator.getFieldCreator("container", ArcContainer.class)
.setModifiers(ACC_PRIVATE | ACC_FINAL);

if (blocking) {
if (blocking || runOnVirtualThread) {
MethodCreator isBlocking = invokerCreator.getMethodCreator("isBlocking", boolean.class);
isBlocking.returnValue(isBlocking.load(true));
}

if (runOnVirtualThread) {
MethodCreator isRunOnVirtualThread = invokerCreator.getMethodCreator("isRunningOnVirtualThread", boolean.class);
isRunOnVirtualThread.returnValue(isRunOnVirtualThread.load(true));
}

AnnotationValue orderedValue = consumeEvent.value("ordered");
boolean ordered = orderedValue != null && orderedValue.asBoolean();
if (ordered) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,9 @@
import io.quarkus.gizmo.ClassOutput;
import io.quarkus.vertx.ConsumeEvent;
import io.quarkus.vertx.core.deployment.CoreVertxBuildItem;
import io.quarkus.vertx.runtime.VertxEventBusConsumerRecorder;
import io.quarkus.vertx.runtime.VertxProducer;
import io.quarkus.vertx.runtime.VertxRecorder;
import io.smallrye.common.annotation.RunOnVirtualThread;

class VertxProcessor {

Expand All @@ -68,7 +69,7 @@ AdditionalBeanBuildItem registerBean() {

@BuildStep
@Record(ExecutionTime.RUNTIME_INIT)
VertxBuildItem build(CoreVertxBuildItem vertx, VertxRecorder recorder,
VertxBuildItem build(CoreVertxBuildItem vertx, VertxEventBusConsumerRecorder recorder,
List<EventConsumerBusinessMethodItem> messageConsumerBusinessMethods,
BuildProducer<GeneratedClassBuildItem> generatedClass,
AnnotationProxyBuildItem annotationProxy, LaunchModeBuildItem launchMode, ShutdownContextBuildItem shutdown,
Expand Down Expand Up @@ -102,7 +103,7 @@ VertxBuildItem build(CoreVertxBuildItem vertx, VertxRecorder recorder,
@BuildStep
@Record(ExecutionTime.STATIC_INIT)
void currentContextFactory(BuildProducer<CurrentContextFactoryBuildItem> currentContextFactory,
VertxBuildConfig buildConfig, VertxRecorder recorder) {
VertxBuildConfig buildConfig, VertxEventBusConsumerRecorder recorder) {
if (buildConfig.customizeArcContext()) {
currentContextFactory.produce(new CurrentContextFactoryBuildItem(recorder.currentContextFactory()));
}
Expand Down Expand Up @@ -150,6 +151,12 @@ void collectEventConsumers(
"An event consumer business method that accepts io.vertx.core.eventbus.Message or io.vertx.mutiny.core.eventbus.Message must return void [method: %s, bean:%s]",
method, bean));
}
if (method.hasAnnotation(RunOnVirtualThread.class) && consumeEvent.value("ordered") != null
&& consumeEvent.value("ordered").asBoolean()) {
throw new IllegalStateException(String.format(
"An event consumer business method that cannot use @RunOnVirtualThread and set the ordered attribute to true [method: %s, bean:%s]",
method, bean));
}
messageConsumerBusinessMethods
.produce(new EventConsumerBusinessMethodItem(bean, method, consumeEvent));
LOGGER.debugf("Found event consumer business method %s declared on %s", method, bean);
Expand Down
4 changes: 4 additions & 0 deletions extensions/vertx/runtime/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@
<groupId>io.quarkus</groupId>
<artifactId>quarkus-mutiny</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-virtual-threads</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-vertx-latebound-mdc-provider</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ public boolean isBlocking() {
return false;
}

public boolean isRunningOnVirtualThread() {
return false;
}

public boolean isOrdered() {
return false;
}
Expand Down Expand Up @@ -77,7 +81,7 @@ public void accept(Object result, Throwable failure) {
if (failure != null) {
if (message.replyAddress() == null) {
// No reply handler
throw VertxRecorder.wrapIfNecessary(failure);
throw VertxEventBusConsumerRecorder.wrapIfNecessary(failure);
} else {
message.fail(ConsumeEvent.EXPLICIT_FAILURE_CODE, failure.getMessage());
}
Expand Down Expand Up @@ -105,12 +109,12 @@ public void accept(Object result, Throwable failure) {
try {
requestContext.destroy(endState);
} catch (Exception e) {
throw VertxRecorder.wrapIfNecessary(e);
throw VertxEventBusConsumerRecorder.wrapIfNecessary(e);
}
if (failure != null) {
if (message.replyAddress() == null) {
// No reply handler
throw VertxRecorder.wrapIfNecessary(failure);
throw VertxEventBusConsumerRecorder.wrapIfNecessary(failure);
} else {
message.fail(ConsumeEvent.EXPLICIT_FAILURE_CODE, failure.getMessage());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.quarkus.runtime.annotations.Recorder;
import io.quarkus.runtime.configuration.ProfileManager;
import io.quarkus.vertx.ConsumeEvent;
import io.quarkus.virtual.threads.VirtualThreadsRecorder;
import io.smallrye.common.vertx.VertxContext;
import io.vertx.core.AsyncResult;
import io.vertx.core.Context;
Expand All @@ -35,17 +36,17 @@
import io.vertx.core.impl.VertxInternal;

@Recorder
public class VertxRecorder {
public class VertxEventBusConsumerRecorder {

private static final Logger LOGGER = Logger.getLogger(VertxRecorder.class.getName());
private static final Logger LOGGER = Logger.getLogger(VertxEventBusConsumerRecorder.class.getName());

static volatile Vertx vertx;
static volatile List<MessageConsumer<?>> messageConsumers;

public void configureVertx(Supplier<Vertx> vertx, Map<String, ConsumeEvent> messageConsumerConfigurations,
LaunchMode launchMode, ShutdownContext shutdown, Map<Class<?>, Class<?>> codecByClass) {
VertxRecorder.vertx = vertx.get();
VertxRecorder.messageConsumers = new CopyOnWriteArrayList<>();
VertxEventBusConsumerRecorder.vertx = vertx.get();
VertxEventBusConsumerRecorder.messageConsumers = new CopyOnWriteArrayList<>();

registerMessageConsumers(messageConsumerConfigurations);
registerCodecs(codecByClass);
Expand Down Expand Up @@ -83,7 +84,7 @@ void destroy() {
void registerMessageConsumers(Map<String, ConsumeEvent> messageConsumerConfigurations) {
if (!messageConsumerConfigurations.isEmpty()) {
EventBus eventBus = vertx.eventBus();
VertxInternal vi = (VertxInternal) VertxRecorder.vertx;
VertxInternal vi = (VertxInternal) VertxEventBusConsumerRecorder.vertx;
CountDownLatch latch = new CountDownLatch(messageConsumerConfigurations.size());
final List<Throwable> registrationFailures = new ArrayList<>();
for (Entry<String, ConsumeEvent> entry : messageConsumerConfigurations.entrySet()) {
Expand All @@ -110,22 +111,47 @@ public void handle(Message<Object> m) {
// We need to create a duplicated context from the "context"
Context dup = VertxContext.getOrCreateDuplicatedContext(context);
setContextSafe(dup, true);
dup.executeBlocking(new Handler<Promise<Object>>() {
@Override
public void handle(Promise<Object> event) {
try {
invoker.invoke(m);
} catch (Exception e) {
if (m.replyAddress() == null) {
// No reply handler
throw wrapIfNecessary(e);
} else {
m.fail(ConsumeEvent.FAILURE_CODE, e.toString());

if (invoker.isRunningOnVirtualThread()) {
// Switch to a Vert.x context to capture it and use it during the invocation.
dup.runOnContext(new Handler<Void>() {
@Override
public void handle(Void event) {
VirtualThreadsRecorder.getCurrent().execute(new Runnable() {
@Override
public void run() {
try {
invoker.invoke(m);
} catch (Exception e) {
if (m.replyAddress() == null) {
// No reply handler
throw wrapIfNecessary(e);
} else {
m.fail(ConsumeEvent.FAILURE_CODE, e.toString());
}
}
}
});
}
});
} else {
dup.executeBlocking(new Handler<Promise<Object>>() {
@Override
public void handle(Promise<Object> event) {
try {
invoker.invoke(m);
} catch (Exception e) {
if (m.replyAddress() == null) {
// No reply handler
throw wrapIfNecessary(e);
} else {
m.fail(ConsumeEvent.FAILURE_CODE, e.toString());
}
}
event.complete();
}
event.complete();
}
}, invoker.isOrdered(), null);
}, invoker.isOrdered(), null);
}
} else {
// Will run on the context used for the consumer registration.
// It's a duplicated context, but we need to mark it as safe.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,13 @@

public class VertxProducerTest {

private VertxRecorder recorder;
private VertxEventBusConsumerRecorder recorder;
private VertxProducer producer;

@BeforeEach
public void setUp() {
producer = new VertxProducer();
recorder = new VertxRecorder();
recorder = new VertxEventBusConsumerRecorder();
}

@AfterEach
Expand Down
4 changes: 0 additions & 4 deletions extensions/virtual-threads/deployment/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,6 @@
<groupId>io.quarkus</groupId>
<artifactId>quarkus-core-deployment</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-vertx-deployment</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
Expand Down
19 changes: 10 additions & 9 deletions extensions/virtual-threads/runtime/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,9 @@
<artifactId>quarkus-arc</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-vertx</artifactId>
<!-- Do not depends on the extension as it creates a cycle -->
<groupId>io.vertx</groupId>
<artifactId>vertx-core</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
Expand Down Expand Up @@ -58,13 +59,13 @@
</annotationProcessorPaths>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<argLine>--enable-preview</argLine>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<argLine>--enable-preview</argLine>
</configuration>
</plugin>
</plugins>
</build>
</project>
1 change: 1 addition & 0 deletions integration-tests/virtual-threads/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
<module>kafka-virtual-threads</module>
<module>amqp-virtual-threads</module>
<module>jms-virtual-threads</module>
<module>vertx-event-bus-virtual-threads</module>
</modules>

<build>
Expand Down
Loading

0 comments on commit 5661a27

Please sign in to comment.