() {
- Executor current = null;
-
- /**
- * This method is used to specify a custom executor to dispatch virtual threads on carrier threads
- * We need reflection for both ease of use (see {@link #get() Get} method) but also because we call methods
- * of private classes from the java.lang package.
- *
- * It is used for testing purposes only for now
- */
- private Executor setVirtualThreadCustomScheduler(Executor executor) throws ClassNotFoundException,
- InvocationTargetException, InstantiationException, IllegalAccessException, NoSuchMethodException {
- var vtf = Class.forName("java.lang.ThreadBuilders").getDeclaredClasses()[0];
- Constructor constructor = vtf.getDeclaredConstructors()[0];
- constructor.setAccessible(true);
- ThreadFactory tf = (ThreadFactory) constructor.newInstance(
- new Object[] { executor, "quarkus-virtual-factory-", 0, 0,
- null });
-
- return (Executor) Executors.class.getMethod("newThreadPerTaskExecutor", ThreadFactory.class)
- .invoke(this, tf);
- }
-
- /**
- * This method uses reflection in order to allow developers to quickly test quarkus-loom without needing to
- * change --release, --source, --target flags and to enable previews.
- * Since we try to load the "Loom-preview" classes/methods at runtime, the application can even be compiled
- * using java 11 and executed with a loom-compliant JDK.
- *
- * IMPORTANT: we still need to use a duplicated context to have all the propagation working.
- * Thus, the context is captured and applied/terminated in the virtual thread.
- */
- @Override
- public Executor get() {
- if (current == null) {
- try {
- var virtual = (Executor) Executors.class.getMethod("newVirtualThreadPerTaskExecutor")
- .invoke(this);
- current = new Executor() {
- @Override
- public void execute(Runnable command) {
- var context = Vertx.currentContext();
- if (!(context instanceof ContextInternal)) {
- virtual.execute(command);
- } else {
- virtual.execute(new Runnable() {
- @Override
- public void run() {
- final var previousContext = ((ContextInternal) context).beginDispatch();
- try {
- command.run();
- } finally {
- ((ContextInternal) context).endDispatch(previousContext);
- }
- }
- });
- }
- }
- };
- } catch (InvocationTargetException | IllegalAccessException | NoSuchMethodException e) {
- logger.debug("Unable to invoke java.util.concurrent.Executors#newVirtualThreadPerTaskExecutor", e);
- //quite ugly but works
- logger.warnf("You weren't able to create an executor that spawns virtual threads, the default" +
- " blocking executor will be used, please check that your JDK is compatible with " +
- "virtual threads");
- //if for some reason a class/method can't be loaded or invoked we return the traditional EXECUTOR
- current = EXECUTOR_SUPPLIER.get();
- }
- }
- return current;
- }
- };
static volatile Deployment currentDeployment;
@@ -205,7 +129,7 @@ public ResteasyReactiveRequestContext createContext(Deployment deployment,
}
RuntimeDeploymentManager runtimeDeploymentManager = new RuntimeDeploymentManager(info, EXECUTOR_SUPPLIER,
- VIRTUAL_EXECUTOR_SUPPLIER,
+ VirtualThreadsRecorder::getCurrent,
closeTaskHandler, contextFactory, new ArcThreadSetupAction(beanContainer.requestContext()),
vertxConfig.rootPath);
Deployment deployment = runtimeDeploymentManager.deploy();
diff --git a/extensions/smallrye-reactive-messaging/deployment/pom.xml b/extensions/smallrye-reactive-messaging/deployment/pom.xml
index 885efec6591ce..c99b6e63e260d 100644
--- a/extensions/smallrye-reactive-messaging/deployment/pom.xml
+++ b/extensions/smallrye-reactive-messaging/deployment/pom.xml
@@ -38,6 +38,10 @@
io.quarkus
quarkus-vertx-deployment
+
+ io.quarkus
+ quarkus-virtual-threads-deployment
+
org.commonmark
commonmark
diff --git a/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/SmallRyeReactiveMessagingProcessor.java b/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/SmallRyeReactiveMessagingProcessor.java
index a5b0955d7ff07..d2ebac1e6fa28 100644
--- a/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/SmallRyeReactiveMessagingProcessor.java
+++ b/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/SmallRyeReactiveMessagingProcessor.java
@@ -52,7 +52,6 @@
import io.quarkus.deployment.builditem.GeneratedClassBuildItem;
import io.quarkus.deployment.builditem.RunTimeConfigurationDefaultBuildItem;
import io.quarkus.deployment.builditem.nativeimage.ReflectiveClassBuildItem;
-import io.quarkus.deployment.builditem.nativeimage.RuntimeInitializedClassBuildItem;
import io.quarkus.deployment.metrics.MetricsCapabilityBuildItem;
import io.quarkus.deployment.recording.RecorderContext;
import io.quarkus.gizmo.ClassCreator;
@@ -114,12 +113,6 @@ AdditionalBeanBuildItem beans() {
QuarkusWorkerPoolRegistry.class);
}
- @BuildStep
- void nativeRuntimeInitClasses(BuildProducer runtimeInitClasses) {
- runtimeInitClasses.produce(new RuntimeInitializedClassBuildItem(
- "io.quarkus.smallrye.reactivemessaging.runtime.QuarkusWorkerPoolRegistry$VirtualExecutorSupplier"));
- }
-
@BuildStep
AnnotationsTransformerBuildItem transformBeanScope(BeanArchiveIndexBuildItem index,
CustomScopeAnnotationsBuildItem scopes) {
diff --git a/extensions/smallrye-reactive-messaging/runtime/pom.xml b/extensions/smallrye-reactive-messaging/runtime/pom.xml
index fb63c1dce84a7..30cc9bd512573 100644
--- a/extensions/smallrye-reactive-messaging/runtime/pom.xml
+++ b/extensions/smallrye-reactive-messaging/runtime/pom.xml
@@ -28,6 +28,10 @@
io.quarkus
quarkus-vertx
+
+ io.quarkus
+ quarkus-virtual-threads
+
io.smallrye.reactive
smallrye-reactive-messaging-provider
diff --git a/extensions/smallrye-reactive-messaging/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/runtime/QuarkusWorkerPoolRegistry.java b/extensions/smallrye-reactive-messaging/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/runtime/QuarkusWorkerPoolRegistry.java
index a8b0e0f000b7f..e93767d8aba6d 100644
--- a/extensions/smallrye-reactive-messaging/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/runtime/QuarkusWorkerPoolRegistry.java
+++ b/extensions/smallrye-reactive-messaging/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/runtime/QuarkusWorkerPoolRegistry.java
@@ -1,15 +1,11 @@
package io.quarkus.smallrye.reactivemessaging.runtime;
-import java.lang.reflect.InvocationTargetException;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executor;
-import java.util.concurrent.Executors;
-import java.util.function.Supplier;
import jakarta.annotation.Priority;
import jakarta.enterprise.context.ApplicationScoped;
@@ -21,17 +17,14 @@
import org.eclipse.microprofile.config.ConfigProvider;
import org.jboss.logging.Logger;
-import org.slf4j.LoggerFactory;
-import io.quarkus.runtime.ExecutorRecorder;
+import io.quarkus.virtual.threads.VirtualThreadsRecorder;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.annotations.Blocking;
import io.smallrye.reactive.messaging.providers.connectors.ExecutionHolder;
import io.smallrye.reactive.messaging.providers.connectors.WorkerPoolRegistry;
import io.smallrye.reactive.messaging.providers.helpers.Validation;
-import io.vertx.core.Vertx;
import io.vertx.core.impl.ConcurrentHashSet;
-import io.vertx.core.impl.ContextInternal;
import io.vertx.mutiny.core.Context;
import io.vertx.mutiny.core.WorkerExecutor;
@@ -41,7 +34,8 @@
// TODO: create a different entry for WorkerPoolRegistry than `analyzeWorker` and drop this class
public class QuarkusWorkerPoolRegistry extends WorkerPoolRegistry {
- private static final Logger logger = Logger.getLogger(QuarkusWorkerPoolRegistry.class);
+ private static final Logger log = Logger.getLogger(WorkerPoolRegistry.class);
+
private static final String WORKER_CONFIG_PREFIX = "smallrye.messaging.worker";
private static final String WORKER_CONCURRENCY = "max-concurrency";
public static final String DEFAULT_VIRTUAL_THREAD_WORKER = "";
@@ -59,61 +53,6 @@ private static Set initVirtualThreadWorkers() {
return set;
}
- private enum VirtualExecutorSupplier implements Supplier {
- Instance;
-
- private final Executor executor;
-
- /**
- * This method uses reflection in order to allow developers to quickly test quarkus-loom without needing to
- * change --release, --source, --target flags and to enable previews.
- * Since we try to load the "Loom-preview" classes/methods at runtime, the application can even be compiled
- * using java 11 and executed with a loom-compliant JDK.
- */
- VirtualExecutorSupplier() {
- Executor actual;
- try {
- var virtual = (Executor) Executors.class.getMethod("newVirtualThreadPerTaskExecutor")
- .invoke(this);
- actual = new Executor() {
- @Override
- public void execute(Runnable command) {
- var context = Vertx.currentContext();
- if (!(context instanceof ContextInternal)) {
- virtual.execute(command);
- } else {
- ContextInternal contextInternal = (ContextInternal) context;
- virtual.execute(new Runnable() {
- @Override
- public void run() {
- final var previousContext = contextInternal.beginDispatch();
- try {
- command.run();
- } finally {
- contextInternal.endDispatch(previousContext);
- }
- }
- });
- }
- }
- };
- } catch (InvocationTargetException | IllegalAccessException | NoSuchMethodException e) {
- //quite ugly but works
- logger.warnf(e, "You weren't able to create an executor that spawns virtual threads, the default" +
- " blocking executor will be used, please check that your JDK is compatible with " +
- "virtual threads");
- //if for some reason a class/method can't be loaded or invoked we return the traditional EXECUTOR
- actual = ExecutorRecorder.getCurrent();
- }
- this.executor = actual;
- }
-
- @Override
- public Executor get() {
- return this.executor;
- }
- }
-
public void terminate(
@Observes(notifyObserver = Reception.IF_EXISTS) @Priority(100) @BeforeDestroyed(ApplicationScoped.class) Object event) {
if (!workerExecutors.isEmpty()) {
@@ -151,7 +90,7 @@ public Uni executeWork(Context currentContext, Uni uni, String workerN
}
private Uni runOnVirtualThread(Context currentContext, Uni uni) {
- return uni.runSubscriptionOn(VirtualExecutorSupplier.Instance.get())
+ return uni.runSubscriptionOn(VirtualThreadsRecorder.getCurrent())
.onItemOrFailure().transformToUni((item, failure) -> {
return Uni.createFrom().emitter(emitter -> {
if (currentContext != null) {
@@ -186,9 +125,8 @@ public WorkerExecutor getWorker(String workerName) {
if (executor == null) {
executor = executionHolder.vertx().createSharedWorkerExecutor(workerName,
workerConcurrency.get(workerName));
- LoggerFactory.getLogger(WorkerPoolRegistry.class)
- .info("Created worker pool named " + workerName + " with concurrency of "
- + workerConcurrency.get(workerName));
+ log.info("Created worker pool named " + workerName + " with concurrency of "
+ + workerConcurrency.get(workerName));
workerExecutors.put(workerName, executor);
}
}
diff --git a/extensions/virtual-threads/deployment/pom.xml b/extensions/virtual-threads/deployment/pom.xml
new file mode 100644
index 0000000000000..67a18d2b61479
--- /dev/null
+++ b/extensions/virtual-threads/deployment/pom.xml
@@ -0,0 +1,51 @@
+
+
+ 4.0.0
+
+
+ io.quarkus
+ quarkus-virtual-threads-parent
+ 999-SNAPSHOT
+ ../pom.xml
+
+
+ quarkus-virtual-threads-deployment
+ Quarkus - Virtual Threads - Deployment
+
+
+
+ io.quarkus
+ quarkus-arc-deployment
+
+
+ io.quarkus
+ quarkus-virtual-threads
+
+
+ io.quarkus
+ quarkus-core-deployment
+
+
+ io.quarkus
+ quarkus-vertx-deployment
+
+
+
+
+
+ maven-compiler-plugin
+
+
+
+ io.quarkus
+ quarkus-extension-processor
+ ${project.version}
+
+
+
+
+
+
+
diff --git a/extensions/virtual-threads/deployment/src/main/java/io/quarkus/virtual/threads/VirtualThreadsProcessor.java b/extensions/virtual-threads/deployment/src/main/java/io/quarkus/virtual/threads/VirtualThreadsProcessor.java
new file mode 100644
index 0000000000000..136bfa1f2bff8
--- /dev/null
+++ b/extensions/virtual-threads/deployment/src/main/java/io/quarkus/virtual/threads/VirtualThreadsProcessor.java
@@ -0,0 +1,19 @@
+package io.quarkus.virtual.threads;
+
+import io.quarkus.deployment.annotations.BuildStep;
+import io.quarkus.deployment.annotations.ExecutionTime;
+import io.quarkus.deployment.annotations.Record;
+import io.quarkus.deployment.builditem.LaunchModeBuildItem;
+import io.quarkus.deployment.builditem.ShutdownContextBuildItem;
+
+public class VirtualThreadsProcessor {
+
+ @BuildStep
+ @Record(ExecutionTime.STATIC_INIT)
+ public void setup(VirtualThreadsConfig config, VirtualThreadsRecorder recorder,
+ ShutdownContextBuildItem shutdownContextBuildItem,
+ LaunchModeBuildItem launchModeBuildItem) {
+ recorder.setupVirtualThreads(config, shutdownContextBuildItem, launchModeBuildItem.getLaunchMode());
+ }
+
+}
diff --git a/extensions/virtual-threads/pom.xml b/extensions/virtual-threads/pom.xml
new file mode 100644
index 0000000000000..e7a87fb197424
--- /dev/null
+++ b/extensions/virtual-threads/pom.xml
@@ -0,0 +1,23 @@
+
+
+ 4.0.0
+
+
+ quarkus-extensions-parent
+ io.quarkus
+ 999-SNAPSHOT
+ ../pom.xml
+
+
+ quarkus-virtual-threads-parent
+ Quarkus - Virtual Threads
+ pom
+
+
+ runtime
+ deployment
+
+
+
\ No newline at end of file
diff --git a/extensions/virtual-threads/runtime/pom.xml b/extensions/virtual-threads/runtime/pom.xml
new file mode 100644
index 0000000000000..948c7ce99de64
--- /dev/null
+++ b/extensions/virtual-threads/runtime/pom.xml
@@ -0,0 +1,70 @@
+
+
+ 4.0.0
+
+
+ io.quarkus
+ quarkus-virtual-threads-parent
+ 999-SNAPSHOT
+ ../pom.xml
+
+
+ quarkus-virtual-threads
+ Quarkus - Virtual Threads - Runtime
+ Virtual Threads Executor
+
+
+
+ io.quarkus
+ quarkus-core
+
+
+ io.quarkus
+ quarkus-arc
+
+
+ io.quarkus
+ quarkus-vertx
+
+
+ io.quarkus
+ quarkus-junit5-internal
+ test
+
+
+ org.assertj
+ assertj-core
+ test
+
+
+
+
+
+
+ io.quarkus
+ quarkus-extension-maven-plugin
+
+
+ maven-compiler-plugin
+
+
+
+ io.quarkus
+ quarkus-extension-processor
+ ${project.version}
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-surefire-plugin
+
+ --enable-preview
+
+
+
+
+
diff --git a/extensions/virtual-threads/runtime/src/main/java/io/quarkus/virtual/threads/ContextPreservingExecutorService.java b/extensions/virtual-threads/runtime/src/main/java/io/quarkus/virtual/threads/ContextPreservingExecutorService.java
new file mode 100644
index 0000000000000..09499bb96f67f
--- /dev/null
+++ b/extensions/virtual-threads/runtime/src/main/java/io/quarkus/virtual/threads/ContextPreservingExecutorService.java
@@ -0,0 +1,105 @@
+package io.quarkus.virtual.threads;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import io.vertx.core.Vertx;
+import io.vertx.core.impl.ContextInternal;
+
+/**
+ * Delegating executor service implementation preserving the Vert.x context on {@link #execute(Runnable)}
+ */
+class ContextPreservingExecutorService implements ExecutorService {
+ private final ExecutorService delegate;
+
+ ContextPreservingExecutorService(final ExecutorService delegate) {
+ this.delegate = delegate;
+ }
+
+ public void execute(final Runnable command) {
+ var context = Vertx.currentContext();
+ if (!(context instanceof ContextInternal)) {
+ delegate.execute(command);
+ } else {
+ ContextInternal contextInternal = (ContextInternal) context;
+ delegate.execute(new Runnable() {
+ @Override
+ public void run() {
+ final var previousContext = contextInternal.beginDispatch();
+ try {
+ command.run();
+ } finally {
+ contextInternal.endDispatch(previousContext);
+ }
+ }
+ });
+ }
+ }
+
+ public boolean isShutdown() {
+ return delegate.isShutdown();
+ }
+
+ public boolean isTerminated() {
+ return delegate.isTerminated();
+ }
+
+ public boolean awaitTermination(final long timeout, final TimeUnit unit) throws InterruptedException {
+ return delegate.awaitTermination(timeout, unit);
+ }
+
+ @Override
+ public Future submit(Callable task) {
+ return delegate.submit(task);
+ }
+
+ @Override
+ public Future submit(Runnable task, T result) {
+ return delegate.submit(task, result);
+ }
+
+ @Override
+ public Future> submit(Runnable task) {
+ return delegate.submit(task);
+ }
+
+ @Override
+ public List> invokeAll(Collection extends Callable> tasks) throws InterruptedException {
+ return delegate.invokeAll(tasks);
+ }
+
+ @Override
+ public List> invokeAll(Collection extends Callable> tasks, long timeout, TimeUnit unit)
+ throws InterruptedException {
+ return delegate.invokeAll(tasks, timeout, unit);
+ }
+
+ @Override
+ public T invokeAny(Collection extends Callable> tasks) throws InterruptedException, ExecutionException {
+ return delegate.invokeAny(tasks);
+ }
+
+ @Override
+ public T invokeAny(Collection extends Callable> tasks, long timeout, TimeUnit unit)
+ throws InterruptedException, ExecutionException, TimeoutException {
+ return delegate.invokeAny(tasks, timeout, unit);
+ }
+
+ public void shutdown() {
+ delegate.shutdown();
+ }
+
+ public List shutdownNow() {
+ return delegate.shutdownNow();
+ }
+
+ public String toString() {
+ return delegate.toString();
+ }
+}
diff --git a/extensions/virtual-threads/runtime/src/main/java/io/quarkus/virtual/threads/VirtualThreadsConfig.java b/extensions/virtual-threads/runtime/src/main/java/io/quarkus/virtual/threads/VirtualThreadsConfig.java
new file mode 100644
index 0000000000000..1e1b3aa24b87d
--- /dev/null
+++ b/extensions/virtual-threads/runtime/src/main/java/io/quarkus/virtual/threads/VirtualThreadsConfig.java
@@ -0,0 +1,32 @@
+package io.quarkus.virtual.threads;
+
+import java.time.Duration;
+import java.util.Optional;
+
+import io.quarkus.runtime.annotations.ConfigItem;
+import io.quarkus.runtime.annotations.ConfigPhase;
+import io.quarkus.runtime.annotations.ConfigRoot;
+
+@ConfigRoot(phase = ConfigPhase.BUILD_AND_RUN_TIME_FIXED)
+public class VirtualThreadsConfig {
+
+ /**
+ * Virtual thread name prefix. If left blank virtual threads will be unnamed.
+ */
+ @ConfigItem(defaultValue = "quarkus-virtual-thread-")
+ Optional namePrefix;
+
+ /**
+ * The shutdown timeout. If all pending work has not been completed by this time
+ * then any pending tasks will be interrupted, and the shutdown process will continue
+ */
+ @ConfigItem(defaultValue = "1M")
+ public Duration shutdownTimeout;
+
+ /**
+ * The frequency at which the status of the executor service should be checked during shutdown.
+ * Setting this key to an empty value disables the shutdown check interval.
+ */
+ @ConfigItem(defaultValue = "5s")
+ public Optional shutdownCheckInterval;
+}
diff --git a/extensions/virtual-threads/runtime/src/main/java/io/quarkus/virtual/threads/VirtualThreadsRecorder.java b/extensions/virtual-threads/runtime/src/main/java/io/quarkus/virtual/threads/VirtualThreadsRecorder.java
new file mode 100644
index 0000000000000..c326cd433d1b2
--- /dev/null
+++ b/extensions/virtual-threads/runtime/src/main/java/io/quarkus/virtual/threads/VirtualThreadsRecorder.java
@@ -0,0 +1,166 @@
+package io.quarkus.virtual.threads;
+
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.Optional;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+
+import org.jboss.logging.Logger;
+
+import io.quarkus.runtime.LaunchMode;
+import io.quarkus.runtime.ShutdownContext;
+import io.quarkus.runtime.annotations.Recorder;
+import io.smallrye.mutiny.infrastructure.Infrastructure;
+import io.vertx.core.Vertx;
+import io.vertx.core.impl.ContextInternal;
+
+@Recorder
+public class VirtualThreadsRecorder {
+
+ private static final Logger logger = Logger.getLogger("io.quarkus.virtual-threads");
+
+ static VirtualThreadsConfig config = new VirtualThreadsConfig();
+
+ private static volatile Executor current;
+ private static final Object lock = new Object();
+
+ public void setupVirtualThreads(VirtualThreadsConfig c, ShutdownContext shutdownContext, LaunchMode launchMode) {
+ config = c;
+ if (launchMode == LaunchMode.DEVELOPMENT) {
+ shutdownContext.addLastShutdownTask(new Runnable() {
+ @Override
+ public void run() {
+ Executor executor = current;
+ if (executor instanceof ExecutorService) {
+ ((ExecutorService) executor).shutdownNow();
+ }
+ current = null;
+ }
+ });
+ } else {
+ shutdownContext.addLastShutdownTask(new Runnable() {
+ @Override
+ public void run() {
+ Executor executor = current;
+ if (executor instanceof ExecutorService) {
+ ExecutorService service = (ExecutorService) executor;
+ service.shutdown();
+
+ final long timeout = config.shutdownTimeout.toNanos();
+ final long interval = config.shutdownCheckInterval.orElse(config.shutdownTimeout).toNanos();
+
+ long start = System.nanoTime();
+ int loop = 1;
+ long elapsed = 0;
+ for (;;) {
+ // This log can be very useful when debugging problems
+ logger.debugf("Await termination loop: %s, remaining: %s", loop++, timeout - elapsed);
+ try {
+ if (!service.awaitTermination(Math.min(timeout, interval), NANOSECONDS)) {
+ elapsed = System.nanoTime() - start;
+ if (elapsed >= timeout) {
+ service.shutdownNow();
+ break;
+ }
+ } else {
+ return;
+ }
+ } catch (InterruptedException ignored) {
+ }
+ }
+ }
+ }
+ });
+ }
+ }
+
+ public static Executor getCurrent() {
+ Executor executor = current;
+ if (executor != null) {
+ return executor;
+ }
+ synchronized (lock) {
+ if (current == null) {
+ current = createExecutor();
+ }
+ return current;
+ }
+ }
+
+ static ExecutorService newVirtualThreadPerTaskExecutorWithName(String prefix)
+ throws NoSuchMethodException, InvocationTargetException, IllegalAccessException, ClassNotFoundException {
+ Method ofVirtual = Thread.class.getMethod("ofVirtual");
+ Object vtb = ofVirtual.invoke(VirtualThreadsRecorder.class);
+ Class> vtbClass = Class.forName("java.lang.Thread$Builder$OfVirtual");
+ Method name = vtbClass.getMethod("name", String.class, long.class);
+ vtb = name.invoke(vtb, prefix, 0);
+ Method factory = vtbClass.getMethod("factory");
+ ThreadFactory tf = (ThreadFactory) factory.invoke(vtb);
+
+ return (ExecutorService) Executors.class.getMethod("newThreadPerTaskExecutor", ThreadFactory.class)
+ .invoke(VirtualThreadsRecorder.class, tf);
+ }
+
+ static ExecutorService newVirtualThreadPerTaskExecutor()
+ throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
+ return (ExecutorService) Executors.class.getMethod("newVirtualThreadPerTaskExecutor")
+ .invoke(VirtualThreadsRecorder.class);
+ }
+
+ static ExecutorService newVirtualThreadExecutor()
+ throws InvocationTargetException, NoSuchMethodException, IllegalAccessException {
+ try {
+ Optional namePrefix = config.namePrefix;
+ return namePrefix.isPresent() ? newVirtualThreadPerTaskExecutorWithName(namePrefix.get())
+ : newVirtualThreadPerTaskExecutor();
+ } catch (ClassNotFoundException e) {
+ logger.warn("Unable to invoke java.util.concurrent.Executors#newThreadPerTaskExecutor" +
+ " with VirtualThreadFactory, falling back to unnamed virtual threads", e);
+ return newVirtualThreadPerTaskExecutor();
+ }
+ }
+
+ /**
+ * This method uses reflection in order to allow developers to quickly test quarkus-loom without needing to
+ * change --release, --source, --target flags and to enable previews.
+ * Since we try to load the "Loom-preview" classes/methods at runtime, the application can even be compiled
+ * using java 11 and executed with a loom-compliant JDK.
+ */
+ private static Executor createExecutor() {
+ try {
+ return new ContextPreservingExecutorService(newVirtualThreadExecutor());
+ } catch (InvocationTargetException | IllegalAccessException | NoSuchMethodException e) {
+ logger.debug("Unable to invoke java.util.concurrent.Executors#newVirtualThreadPerTaskExecutor", e);
+ //quite ugly but works
+ logger.warn("You weren't able to create an executor that spawns virtual threads, the default" +
+ " blocking executor will be used, please check that your JDK is compatible with " +
+ "virtual threads");
+ //if for some reason a class/method can't be loaded or invoked we return the traditional executor,
+ // wrapping executeBlocking.
+ return new Executor() {
+ @Override
+ public void execute(Runnable command) {
+ var context = Vertx.currentContext();
+ if (!(context instanceof ContextInternal)) {
+ Infrastructure.getDefaultWorkerPool().execute(command);
+ } else {
+ context.executeBlocking(fut -> {
+ try {
+ command.run();
+ fut.complete(null);
+ } catch (Exception e) {
+ fut.fail(e);
+ }
+ }, false);
+ }
+ }
+ };
+ }
+ }
+
+}
diff --git a/extensions/virtual-threads/runtime/src/main/resources/META-INF/quarkus-extension.yaml b/extensions/virtual-threads/runtime/src/main/resources/META-INF/quarkus-extension.yaml
new file mode 100644
index 0000000000000..06b2a96598fff
--- /dev/null
+++ b/extensions/virtual-threads/runtime/src/main/resources/META-INF/quarkus-extension.yaml
@@ -0,0 +1,10 @@
+---
+artifact: ${project.groupId}:${project.artifactId}:${project.version}
+name: "Virtual Threads Support"
+metadata:
+ keywords:
+ - "virtual-threads"
+ - "loom"
+ unlisted: true
+ config:
+ - "quarkus.virtual-threads."
diff --git a/extensions/virtual-threads/runtime/src/test/java/io/quarkus/virtual/threads/VirtualThreadExecutorSupplierTest.java b/extensions/virtual-threads/runtime/src/test/java/io/quarkus/virtual/threads/VirtualThreadExecutorSupplierTest.java
new file mode 100644
index 0000000000000..e4cb74d37d117
--- /dev/null
+++ b/extensions/virtual-threads/runtime/src/test/java/io/quarkus/virtual/threads/VirtualThreadExecutorSupplierTest.java
@@ -0,0 +1,61 @@
+package io.quarkus.virtual.threads;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.time.Duration;
+import java.util.concurrent.Executor;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.EnabledForJreRange;
+import org.junit.jupiter.api.condition.JRE;
+
+import io.smallrye.mutiny.Uni;
+import io.smallrye.mutiny.helpers.test.UniAssertSubscriber;
+
+class VirtualThreadExecutorSupplierTest {
+
+ @Test
+ @EnabledForJreRange(min = JRE.JAVA_20, disabledReason = "Virtual Threads are a preview feature starting from Java 20")
+ void virtualThreadCustomScheduler()
+ throws ClassNotFoundException, InvocationTargetException, IllegalAccessException, NoSuchMethodException {
+ Executor executor = VirtualThreadsRecorder.newVirtualThreadPerTaskExecutorWithName("vthread-");
+ var assertSubscriber = Uni.createFrom().emitter(e -> {
+ assertThat(Thread.currentThread().getName()).isNotEmpty()
+ .startsWith("vthread-");
+ assertThatItRunsOnVirtualThread();
+ e.complete(null);
+ }).runSubscriptionOn(executor)
+ .subscribe().withSubscriber(UniAssertSubscriber.create());
+ assertSubscriber.awaitItem(Duration.ofSeconds(1)).assertCompleted();
+ }
+
+ @Test
+ @EnabledForJreRange(min = JRE.JAVA_20, disabledReason = "Virtual Threads are a preview feature starting from Java 20")
+ void execute() throws InvocationTargetException, NoSuchMethodException, IllegalAccessException {
+ Executor executor = VirtualThreadsRecorder.newVirtualThreadPerTaskExecutor();
+ var assertSubscriber = Uni.createFrom().emitter(e -> {
+ assertThat(Thread.currentThread().getName()).isEmpty();
+ assertThatItRunsOnVirtualThread();
+ e.complete(null);
+ }).runSubscriptionOn(executor)
+ .subscribe().withSubscriber(UniAssertSubscriber.create());
+ assertSubscriber.awaitItem(Duration.ofSeconds(1)).assertCompleted();
+ }
+
+ public static void assertThatItRunsOnVirtualThread() {
+ // We cannot depend on a Java 20.
+ try {
+ Method isVirtual = Thread.class.getMethod("isVirtual");
+ isVirtual.setAccessible(true);
+ boolean virtual = (Boolean) isVirtual.invoke(Thread.currentThread());
+ if (!virtual) {
+ throw new AssertionError("Thread " + Thread.currentThread() + " is not a virtual thread");
+ }
+ } catch (Exception e) {
+ throw new AssertionError(
+ "Thread " + Thread.currentThread() + " is not a virtual thread - cannot invoke Thread.isVirtual()", e);
+ }
+ }
+}