Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Integrate @RunOnVirtualThread with the @ConsumeEvent
Browse files Browse the repository at this point in the history
Allows @ConsumeEvent method to run on a virtual thread.
Also verify that sending and receiving from the event bus is not pinning the carrier thread.
cescoffier committed Aug 28, 2023

Verified

This commit was signed with the committer’s verified signature.
cescoffier Clement Escoffier
1 parent 3dede5c commit c14639b
Showing 20 changed files with 432 additions and 43 deletions.
2 changes: 1 addition & 1 deletion .github/virtual-threads-tests.json
Original file line number Diff line number Diff line change
@@ -3,7 +3,7 @@
{
"category": "Main",
"timeout": 45,
"test-modules": "grpc-virtual-threads, mailer-virtual-threads, redis-virtual-threads, rest-client-reactive-virtual-threads, resteasy-reactive-virtual-threads",
"test-modules": "grpc-virtual-threads, mailer-virtual-threads, redis-virtual-threads, rest-client-reactive-virtual-threads, resteasy-reactive-virtual-threads, vertx-event-bus-virtual-threads",
"os-name": "ubuntu-latest"
},
{
9 changes: 9 additions & 0 deletions docs/src/main/asciidoc/vertx-reference.adoc
Original file line number Diff line number Diff line change
@@ -694,6 +694,15 @@ Uni<String> response = bus.<String>request("address", "hello, how are you?")
.onItem().transform(Message::body);
----

=== Process events on virtual threads

Methods annotated with `@ConsumeEvent` can also be annotated with `@RunOnVirtualThread`.
In this case, the method is invoked on a virtual thread.
Each event is invoked on a different virtual thread.

The method must be _blocking_ and your Java runtime must provide support for virtual threads.
Read xref:./virtual-threads.adoc[the virtual thread guide] for more details.

=== Use codecs

The https://vertx.io/docs/vertx-core/java/#event_bus[Vert.x Event Bus] uses codecs to _serialize_ and _deserialize_ objects.
4 changes: 4 additions & 0 deletions extensions/vertx/deployment/pom.xml
Original file line number Diff line number Diff line change
@@ -25,6 +25,10 @@
<groupId>io.quarkus</groupId>
<artifactId>quarkus-vertx</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-virtual-threads-deployment</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-mutiny-deployment</artifactId>
Original file line number Diff line number Diff line change
@@ -37,6 +37,7 @@
import io.quarkus.runtime.util.HashUtil;
import io.quarkus.vertx.runtime.EventConsumerInvoker;
import io.smallrye.common.annotation.Blocking;
import io.smallrye.common.annotation.RunOnVirtualThread;
import io.smallrye.mutiny.Uni;
import io.vertx.core.MultiMap;
import io.vertx.core.eventbus.Message;
@@ -78,6 +79,7 @@ class EventBusConsumer {
protected static final MethodDescriptor THROWABLE_TO_STRING = MethodDescriptor
.ofMethod(Throwable.class, "toString", String.class);
protected static final DotName BLOCKING = DotName.createSimple(Blocking.class.getName());
protected static final DotName RUN_ON_VIRTUAL_THREAD = DotName.createSimple(RunOnVirtualThread.class.getName());

static String generateInvoker(BeanInfo bean, MethodInfo method,
AnnotationInstance consumeEvent,
@@ -100,9 +102,10 @@ static String generateInvoker(BeanInfo bean, MethodInfo method,
String generatedName = targetPackage + baseName + INVOKER_SUFFIX + "_" + method.name() + "_"
+ HashUtil.sha1(sigBuilder.toString());

boolean blocking;
boolean blocking, runOnVirtualThread;
AnnotationValue blockingValue = consumeEvent.value("blocking");
blocking = method.hasAnnotation(BLOCKING) || (blockingValue != null && blockingValue.asBoolean());
runOnVirtualThread = method.hasAnnotation(RUN_ON_VIRTUAL_THREAD);

ClassCreator invokerCreator = ClassCreator.builder().classOutput(classOutput).className(generatedName)
.superClass(EventConsumerInvoker.class).build();
@@ -113,11 +116,16 @@ static String generateInvoker(BeanInfo bean, MethodInfo method,
FieldCreator containerField = invokerCreator.getFieldCreator("container", ArcContainer.class)
.setModifiers(ACC_PRIVATE | ACC_FINAL);

if (blocking) {
if (blocking || runOnVirtualThread) {
MethodCreator isBlocking = invokerCreator.getMethodCreator("isBlocking", boolean.class);
isBlocking.returnValue(isBlocking.load(true));
}

if (runOnVirtualThread) {
MethodCreator isRunOnVirtualThread = invokerCreator.getMethodCreator("isRunningOnVirtualThread", boolean.class);
isRunOnVirtualThread.returnValue(isRunOnVirtualThread.load(true));
}

AnnotationValue orderedValue = consumeEvent.value("ordered");
boolean ordered = orderedValue != null && orderedValue.asBoolean();
if (ordered) {
Original file line number Diff line number Diff line change
@@ -49,8 +49,9 @@
import io.quarkus.gizmo.ClassOutput;
import io.quarkus.vertx.ConsumeEvent;
import io.quarkus.vertx.core.deployment.CoreVertxBuildItem;
import io.quarkus.vertx.runtime.VertxEventBusConsumerRecorder;
import io.quarkus.vertx.runtime.VertxProducer;
import io.quarkus.vertx.runtime.VertxRecorder;
import io.smallrye.common.annotation.RunOnVirtualThread;

class VertxProcessor {

@@ -68,7 +69,7 @@ AdditionalBeanBuildItem registerBean() {

@BuildStep
@Record(ExecutionTime.RUNTIME_INIT)
VertxBuildItem build(CoreVertxBuildItem vertx, VertxRecorder recorder,
VertxBuildItem build(CoreVertxBuildItem vertx, VertxEventBusConsumerRecorder recorder,
List<EventConsumerBusinessMethodItem> messageConsumerBusinessMethods,
BuildProducer<GeneratedClassBuildItem> generatedClass,
AnnotationProxyBuildItem annotationProxy, LaunchModeBuildItem launchMode, ShutdownContextBuildItem shutdown,
@@ -102,7 +103,7 @@ VertxBuildItem build(CoreVertxBuildItem vertx, VertxRecorder recorder,
@BuildStep
@Record(ExecutionTime.STATIC_INIT)
void currentContextFactory(BuildProducer<CurrentContextFactoryBuildItem> currentContextFactory,
VertxBuildConfig buildConfig, VertxRecorder recorder) {
VertxBuildConfig buildConfig, VertxEventBusConsumerRecorder recorder) {
if (buildConfig.customizeArcContext()) {
currentContextFactory.produce(new CurrentContextFactoryBuildItem(recorder.currentContextFactory()));
}
@@ -150,6 +151,12 @@ void collectEventConsumers(
"An event consumer business method that accepts io.vertx.core.eventbus.Message or io.vertx.mutiny.core.eventbus.Message must return void [method: %s, bean:%s]",
method, bean));
}
if (method.hasAnnotation(RunOnVirtualThread.class) && consumeEvent.value("ordered") != null
&& consumeEvent.value("ordered").asBoolean()) {
throw new IllegalStateException(String.format(
"An event consumer business method that cannot use @RunOnVirtualThread and set the ordered attribute to true [method: %s, bean:%s]",
method, bean));
}
messageConsumerBusinessMethods
.produce(new EventConsumerBusinessMethodItem(bean, method, consumeEvent));
LOGGER.debugf("Found event consumer business method %s declared on %s", method, bean);
4 changes: 4 additions & 0 deletions extensions/vertx/runtime/pom.xml
Original file line number Diff line number Diff line change
@@ -42,6 +42,10 @@
<groupId>io.quarkus</groupId>
<artifactId>quarkus-mutiny</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-virtual-threads</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-vertx-latebound-mdc-provider</artifactId>
Original file line number Diff line number Diff line change
@@ -18,6 +18,10 @@ public boolean isBlocking() {
return false;
}

public boolean isRunningOnVirtualThread() {
return false;
}

public boolean isOrdered() {
return false;
}
@@ -77,7 +81,7 @@ public void accept(Object result, Throwable failure) {
if (failure != null) {
if (message.replyAddress() == null) {
// No reply handler
throw VertxRecorder.wrapIfNecessary(failure);
throw VertxEventBusConsumerRecorder.wrapIfNecessary(failure);
} else {
message.fail(ConsumeEvent.EXPLICIT_FAILURE_CODE, failure.getMessage());
}
@@ -105,12 +109,12 @@ public void accept(Object result, Throwable failure) {
try {
requestContext.destroy(endState);
} catch (Exception e) {
throw VertxRecorder.wrapIfNecessary(e);
throw VertxEventBusConsumerRecorder.wrapIfNecessary(e);
}
if (failure != null) {
if (message.replyAddress() == null) {
// No reply handler
throw VertxRecorder.wrapIfNecessary(failure);
throw VertxEventBusConsumerRecorder.wrapIfNecessary(failure);
} else {
message.fail(ConsumeEvent.EXPLICIT_FAILURE_CODE, failure.getMessage());
}
Original file line number Diff line number Diff line change
@@ -21,6 +21,7 @@
import io.quarkus.runtime.annotations.Recorder;
import io.quarkus.runtime.configuration.ProfileManager;
import io.quarkus.vertx.ConsumeEvent;
import io.quarkus.virtual.threads.VirtualThreadsRecorder;
import io.smallrye.common.vertx.VertxContext;
import io.vertx.core.AsyncResult;
import io.vertx.core.Context;
@@ -35,17 +36,17 @@
import io.vertx.core.impl.VertxInternal;

@Recorder
public class VertxRecorder {
public class VertxEventBusConsumerRecorder {

private static final Logger LOGGER = Logger.getLogger(VertxRecorder.class.getName());
private static final Logger LOGGER = Logger.getLogger(VertxEventBusConsumerRecorder.class.getName());

static volatile Vertx vertx;
static volatile List<MessageConsumer<?>> messageConsumers;

public void configureVertx(Supplier<Vertx> vertx, Map<String, ConsumeEvent> messageConsumerConfigurations,
LaunchMode launchMode, ShutdownContext shutdown, Map<Class<?>, Class<?>> codecByClass) {
VertxRecorder.vertx = vertx.get();
VertxRecorder.messageConsumers = new CopyOnWriteArrayList<>();
VertxEventBusConsumerRecorder.vertx = vertx.get();
VertxEventBusConsumerRecorder.messageConsumers = new CopyOnWriteArrayList<>();

registerMessageConsumers(messageConsumerConfigurations);
registerCodecs(codecByClass);
@@ -83,7 +84,7 @@ void destroy() {
void registerMessageConsumers(Map<String, ConsumeEvent> messageConsumerConfigurations) {
if (!messageConsumerConfigurations.isEmpty()) {
EventBus eventBus = vertx.eventBus();
VertxInternal vi = (VertxInternal) VertxRecorder.vertx;
VertxInternal vi = (VertxInternal) VertxEventBusConsumerRecorder.vertx;
CountDownLatch latch = new CountDownLatch(messageConsumerConfigurations.size());
final List<Throwable> registrationFailures = new ArrayList<>();
for (Entry<String, ConsumeEvent> entry : messageConsumerConfigurations.entrySet()) {
@@ -110,22 +111,47 @@ public void handle(Message<Object> m) {
// We need to create a duplicated context from the "context"
Context dup = VertxContext.getOrCreateDuplicatedContext(context);
setContextSafe(dup, true);
dup.executeBlocking(new Handler<Promise<Object>>() {
@Override
public void handle(Promise<Object> event) {
try {
invoker.invoke(m);
} catch (Exception e) {
if (m.replyAddress() == null) {
// No reply handler
throw wrapIfNecessary(e);
} else {
m.fail(ConsumeEvent.FAILURE_CODE, e.toString());

if (invoker.isRunningOnVirtualThread()) {
// Switch to a Vert.x context to capture it and use it during the invocation.
dup.runOnContext(new Handler<Void>() {
@Override
public void handle(Void event) {
VirtualThreadsRecorder.getCurrent().execute(new Runnable() {
@Override
public void run() {
try {
invoker.invoke(m);
} catch (Exception e) {
if (m.replyAddress() == null) {
// No reply handler
throw wrapIfNecessary(e);
} else {
m.fail(ConsumeEvent.FAILURE_CODE, e.toString());
}
}
}
});
}
});
} else {
dup.executeBlocking(new Handler<Promise<Object>>() {
@Override
public void handle(Promise<Object> event) {
try {
invoker.invoke(m);
} catch (Exception e) {
if (m.replyAddress() == null) {
// No reply handler
throw wrapIfNecessary(e);
} else {
m.fail(ConsumeEvent.FAILURE_CODE, e.toString());
}
}
event.complete();
}
event.complete();
}
}, invoker.isOrdered(), null);
}, invoker.isOrdered(), null);
}
} else {
// Will run on the context used for the consumer registration.
// It's a duplicated context, but we need to mark it as safe.
Original file line number Diff line number Diff line change
@@ -13,13 +13,13 @@

public class VertxProducerTest {

private VertxRecorder recorder;
private VertxEventBusConsumerRecorder recorder;
private VertxProducer producer;

@BeforeEach
public void setUp() {
producer = new VertxProducer();
recorder = new VertxRecorder();
recorder = new VertxEventBusConsumerRecorder();
}

@AfterEach
4 changes: 0 additions & 4 deletions extensions/virtual-threads/deployment/pom.xml
Original file line number Diff line number Diff line change
@@ -27,10 +27,6 @@
<groupId>io.quarkus</groupId>
<artifactId>quarkus-core-deployment</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-vertx-deployment</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
19 changes: 10 additions & 9 deletions extensions/virtual-threads/runtime/pom.xml
Original file line number Diff line number Diff line change
@@ -25,8 +25,9 @@
<artifactId>quarkus-arc</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-vertx</artifactId>
<!-- Do not depends on the extension as it creates a cycle -->
<groupId>io.vertx</groupId>
<artifactId>vertx-core</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
@@ -58,13 +59,13 @@
</annotationProcessorPaths>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<argLine>--enable-preview</argLine>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<argLine>--enable-preview</argLine>
</configuration>
</plugin>
</plugins>
</build>
</project>
1 change: 1 addition & 0 deletions integration-tests/virtual-threads/pom.xml
Original file line number Diff line number Diff line change
@@ -32,6 +32,7 @@
<module>kafka-virtual-threads</module>
<module>amqp-virtual-threads</module>
<module>jms-virtual-threads</module>
<module>vertx-event-bus-virtual-threads</module>
</modules>

<build>
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent>
<artifactId>quarkus-virtual-threads-integration-tests-parent</artifactId>
<groupId>io.quarkus</groupId>
<version>999-SNAPSHOT</version>
</parent>

<artifactId>quarkus-integration-test-virtual-threads-vertx-event-bus</artifactId>
<name>Quarkus - Integration Tests - Virtual Threads - Vert.x Event Bus</name>

<dependencies>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-resteasy-reactive-jackson</artifactId>
</dependency>

<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-junit5</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.rest-assured</groupId>
<artifactId>rest-assured</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<scope>test</scope>
</dependency>


<!-- Minimal test dependencies to *-deployment artifacts for consistent build order -->
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-resteasy-reactive-jackson-deployment</artifactId>
<version>${project.version}</version>
<type>pom</type>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>*</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-maven-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
</plugin>
</plugins>
</build>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package io.quarkus.virtual.vertx;

import java.lang.reflect.Method;

import io.quarkus.arc.Arc;
import io.smallrye.common.vertx.VertxContext;
import io.vertx.core.Vertx;

public class AssertHelper {

/**
* Asserts that the current method:
* - runs on a duplicated context
* - runs on a virtual thread
* - has the request scope activated
*/
public static void assertEverything() {
assertThatTheRequestScopeIsActive();
assertThatItRunsOnVirtualThread();
assertThatItRunsOnADuplicatedContext();
}

public static void assertThatTheRequestScopeIsActive() {
if (!Arc.container().requestContext().isActive()) {
throw new AssertionError(("Expected the request scope to be active"));
}
}

public static void assertThatItRunsOnADuplicatedContext() {
var context = Vertx.currentContext();
if (context == null) {
throw new AssertionError("The method does not run on a Vert.x context");
}
if (!VertxContext.isOnDuplicatedContext()) {
throw new AssertionError("The method does not run on a Vert.x **duplicated** context");
}
}

public static void assertThatItRunsOnVirtualThread() {
// We cannot depend on a Java 20.
try {
Method isVirtual = Thread.class.getMethod("isVirtual");
isVirtual.setAccessible(true);
boolean virtual = (Boolean) isVirtual.invoke(Thread.currentThread());
if (!virtual) {
throw new AssertionError("Thread " + Thread.currentThread() + " is not a virtual thread");
}
} catch (Exception e) {
throw new AssertionError(
"Thread " + Thread.currentThread() + " is not a virtual thread - cannot invoke Thread.isVirtual()", e);
}
}

public static void assertNotOnVirtualThread() {
// We cannot depend on a Java 20.
try {
Method isVirtual = Thread.class.getMethod("isVirtual");
isVirtual.setAccessible(true);
boolean virtual = (Boolean) isVirtual.invoke(Thread.currentThread());
if (virtual) {
throw new AssertionError("Thread " + Thread.currentThread() + " is a virtual thread");
}
} catch (Exception e) {
// Trying using Thread name.
var name = Thread.currentThread().toString();
if (name.toLowerCase().contains("virtual")) {
throw new AssertionError("Thread " + Thread.currentThread() + " seems to be a virtual thread");
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package io.quarkus.virtual.vertx;

import io.quarkus.vertx.ConsumeEvent;
import io.smallrye.common.annotation.RunOnVirtualThread;
import jakarta.enterprise.context.ApplicationScoped;

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

@ApplicationScoped
public class EventBusConsumer {

public static final List<String> ONE_WAY = new CopyOnWriteArrayList<>();

@ConsumeEvent("one-way")
@RunOnVirtualThread
void receive(String m) {
AssertHelper.assertEverything();
ONE_WAY.add(m);
}

@ConsumeEvent("request-reply")
@RunOnVirtualThread
String process(String m) {
AssertHelper.assertEverything();
return m.toUpperCase();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package io.quarkus.virtual.vertx;

import io.smallrye.common.annotation.RunOnVirtualThread;
import io.vertx.mutiny.core.eventbus.EventBus;
import jakarta.inject.Inject;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;

import java.time.Duration;
import java.util.List;

@RunOnVirtualThread
@Path("/")
public class VertxEventBusResource {

@Inject
EventBus bus;

@GET
@Path("/one-way")
public void sentOneWay() {
bus.send("one-way", "hello");
}

@GET
@Path("/one-way-verify")
public List<String> oneWayVerification() {
return EventBusConsumer.ONE_WAY;
}

@GET
@Path("/request-reply")
public String requestReply() {
return bus.<String> request("request-reply", "hello").map(m -> m.body()).await().atMost(Duration.ofSeconds(10));
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
quarkus.native.additional-build-args=--enable-preview
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package io.quarkus.virtual.vertx;

import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.parsers.ParserConfigurationException;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.w3c.dom.Document;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;
import org.xml.sax.SAXException;

/**
* An integration test reading the output of the unit test to verify that no tests where pinning the carrier thread.
* It reads the reports generated by surefire.
*/
public class NoPinningVerify {

@Test
void verify() throws IOException, ParserConfigurationException, SAXException {
var reports = new File("target", "surefire-reports");
Assertions.assertTrue(reports.isDirectory(),
"Unable to find " + reports.getAbsolutePath() + ", did you run the tests with Maven before?");
var list = reports.listFiles(new FilenameFilter() {
@Override
public boolean accept(File dir, String name) {
return name.startsWith("TEST") && name.endsWith("Test.xml");
}
});
Assertions.assertNotNull(list,
"Unable to find " + reports.getAbsolutePath() + ", did you run the tests with Maven before?");

for (File report : list) {
Document document = DocumentBuilderFactory.newInstance().newDocumentBuilder().parse(report);
var suite = document.getFirstChild();
var cases = getChildren(suite.getChildNodes(), "testcase");
for (Node c : cases) {
verify(report, c);
}
}

}

private void verify(File file, Node ca) {
var fullname = ca.getAttributes().getNamedItem("classname").getTextContent() + "."
+ ca.getAttributes().getNamedItem("name").getTextContent();
var output = getChildren(ca.getChildNodes(), "system-out");
if (output.isEmpty()) {
return;
}
var sout = output.get(0).getTextContent();
if (sout.contains("VThreadContinuation.onPinned")) {
throw new AssertionError("The test case " + fullname + " pinned the carrier thread, check " + file.getAbsolutePath()
+ " for details (or the log of the test)");
}

}

private List<Node> getChildren(NodeList nodes, String name) {
List<Node> list = new ArrayList<>();
for (int i = 0; i < nodes.getLength(); i++) {
var node = nodes.item(i);
if (node.getNodeName().equalsIgnoreCase(name)) {
list.add(node);
}
}
return list;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package io.quarkus.virtual.vertx;

import io.quarkus.test.junit.QuarkusIntegrationTest;

@QuarkusIntegrationTest
class RunOnVirtualThreadIT extends RunOnVirtualThreadTest {

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package io.quarkus.virtual.vertx;

import io.quarkus.test.junit.QuarkusTest;
import io.restassured.RestAssured;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

import static org.hamcrest.Matchers.is;

@QuarkusTest
class RunOnVirtualThreadTest {

@Test
@Timeout(10)
void testOneWay() {
RestAssured.get("/one-way").then()
.assertThat().statusCode(204);

Awaitility.await().untilAsserted(() -> {
RestAssured.get("/one-way-verify").then()
.assertThat().statusCode(200)
.body("size()", is(1));
});
}

@Test
@Timeout(10)
void testRequestReply() {
RestAssured.get("/request-reply").then()
.assertThat().statusCode(200)
.body(is("HELLO"));
}
}

0 comments on commit c14639b

Please sign in to comment.