Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Virtual threads module providing executor supplier for running virtual threads #35069

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions bom/application/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -2948,6 +2948,16 @@
<artifactId>quarkus-info</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-virtual-threads</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-virtual-threads-deployment</artifactId>
<version>${project.version}</version>
</dependency>

<!-- Quarkus test dependencies -->
<dependency>
Expand Down
13 changes: 13 additions & 0 deletions devtools/bom-descriptor-json/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -2839,6 +2839,19 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-virtual-threads</artifactId>
<version>${project.version}</version>
<type>pom</type>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>*</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-webjars-locator</artifactId>
Expand Down
13 changes: 13 additions & 0 deletions docs/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -2853,6 +2853,19 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-virtual-threads-deployment</artifactId>
<version>${project.version}</version>
<type>pom</type>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>*</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-webjars-locator-deployment</artifactId>
Expand Down
13 changes: 13 additions & 0 deletions docs/src/main/asciidoc/virtual-threads.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -450,6 +450,19 @@ So, the data written in the duplicated context (and the request scope, as the re

However, thread locals are not propagated.

== Virtual thread names

Virtual threads are created without a thread name by default, which is not practical to identify the execution for debugging and logging purposes.
Quarkus managed virtual threads are named and prefixed with `quarkus-virtual-thread-`.
You can customize this prefix, or disable the naming altogether configuring an empty value:

[source, properties]
----
quarkus.virtual-threads.name-prefix=

----


== Additional references

- https://dl.acm.org/doi/10.1145/3583678.3596895[Considerations for integrating virtual threads in a Java framework: a Quarkus example in a resource-constrained environment]
4 changes: 4 additions & 0 deletions extensions/grpc/deployment/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@
<artifactId>quarkus-smallrye-health-deployment</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-virtual-threads-deployment</artifactId>
</dependency>

<dependency>
<groupId>io.quarkus</groupId>
Expand Down
4 changes: 4 additions & 0 deletions extensions/grpc/runtime/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@
<groupId>io.quarkus</groupId>
<artifactId>quarkus-smallrye-stork</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-virtual-threads</artifactId>
</dependency>

<dependency>
<groupId>io.quarkus</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.lang.reflect.InvocationTargetException;
import java.net.BindException;
import java.time.Duration;
import java.util.AbstractMap;
Expand All @@ -20,13 +19,10 @@
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.regex.Pattern;

import jakarta.enterprise.inject.Instance;
Expand Down Expand Up @@ -63,15 +59,14 @@
import io.quarkus.runtime.ShutdownContext;
import io.quarkus.runtime.annotations.Recorder;
import io.quarkus.vertx.http.runtime.PortSystemProperties;
import io.smallrye.mutiny.infrastructure.Infrastructure;
import io.quarkus.virtual.threads.VirtualThreadsRecorder;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.AsyncResult;
import io.vertx.core.Context;
import io.vertx.core.DeploymentOptions;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.impl.ContextInternal;
import io.vertx.ext.web.Route;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.RoutingContext;
Expand Down Expand Up @@ -588,7 +583,8 @@ private ServerServiceDefinition serviceWithInterceptors(Vertx vertx, GrpcContain
List<String> virtuals = virtualMethodsPerService.get(service.getImplementationClassName());
if (list != null || virtuals != null) {
interceptors
.add(new BlockingServerInterceptor(vertx, list, virtuals, VIRTUAL_EXECUTOR_SUPPLIER.get(), devMode));
.add(new BlockingServerInterceptor(vertx, list, virtuals,
VirtualThreadsRecorder.getCurrent(), devMode));
}
}
return ServerInterceptors.intercept(service.definition, interceptors);
Expand Down Expand Up @@ -728,74 +724,4 @@ public void run(Runnable command) {
}
}

public static final Supplier<Executor> VIRTUAL_EXECUTOR_SUPPLIER = new Supplier<>() {
Executor current = null;

/**
* This method uses reflection in order to allow developers to quickly test quarkus-loom without needing to
* change --release, --source, --target flags and to enable previews.
* Since we try to load the "Loom-preview" classes/methods at runtime, the application can even be compiled
* using java 11 and executed with a loom-compliant JDK.
* <p>
* IMPORTANT: we still need to use a duplicated context to have all the propagation working.
* Thus, the context is captured and applied/terminated in the virtual thread.
*/
@Override
public Executor get() {
if (current == null) {
try {
var virtual = (Executor) Executors.class.getMethod("newVirtualThreadPerTaskExecutor")
.invoke(this);
current = new Executor() {
@Override
public void execute(Runnable command) {
var context = Vertx.currentContext();
if (!(context instanceof ContextInternal)) {
virtual.execute(command);
} else {
virtual.execute(new Runnable() {
@Override
public void run() {
final var previousContext = ((ContextInternal) context).beginDispatch();
try {
command.run();
} finally {
((ContextInternal) context).endDispatch(previousContext);
}
}
});
}
}
};
} catch (InvocationTargetException | IllegalAccessException | NoSuchMethodException e) {
logger.debug("Unable to invoke java.util.concurrent.Executors#newVirtualThreadPerTaskExecutor", e);
//quite ugly but works
logger.warnf("You weren't able to create an executor that spawns virtual threads, the default" +
" blocking executor will be used, please check that your JDK is compatible with " +
"virtual threads");
//if for some reason a class/method can't be loaded or invoked we return the traditional executor,
// wrapping executeBlocking.
current = new Executor() {
@Override
public void execute(Runnable command) {
var context = Vertx.currentContext();
if (!(context instanceof ContextInternal)) {
Infrastructure.getDefaultWorkerPool().execute(command);
} else {
context.executeBlocking(fut -> {
try {
command.run();
fut.complete(null);
} catch (Exception e) {
fut.fail(e);
}
});
}
}
};
}
}
return current;
}
};
}
1 change: 1 addition & 0 deletions extensions/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
<modules>
<!-- Netty loom adaptor-->
<module>netty-loom-adaptor</module>
<module>virtual-threads</module>
<!-- Plumbing -->
<module>arc</module>
<module>scheduler</module>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@
<groupId>io.quarkus</groupId>
<artifactId>quarkus-resteasy-reactive-common-deployment</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-virtual-threads-deployment</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-security-deployment</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@
<groupId>io.quarkus</groupId>
<artifactId>quarkus-jsonp</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-virtual-threads</artifactId>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,8 @@
import static io.quarkus.vertx.http.runtime.security.HttpSecurityRecorder.DefaultAuthFailureHandler.extractRootCause;

import java.io.Closeable;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
Expand Down Expand Up @@ -64,9 +61,8 @@
import io.quarkus.vertx.http.runtime.HttpBuildTimeConfig;
import io.quarkus.vertx.http.runtime.security.HttpSecurityRecorder.DefaultAuthFailureHandler;
import io.quarkus.vertx.http.runtime.security.QuarkusHttpUser;
import io.quarkus.virtual.threads.VirtualThreadsRecorder;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.impl.ContextInternal;
import io.vertx.ext.web.RoutingContext;

@Recorder
Expand All @@ -80,78 +76,6 @@ public Executor get() {
return ExecutorRecorder.getCurrent();
}
};
public static final Supplier<Executor> VIRTUAL_EXECUTOR_SUPPLIER = new Supplier<Executor>() {
Executor current = null;

/**
* This method is used to specify a custom executor to dispatch virtual threads on carrier threads
* We need reflection for both ease of use (see {@link #get() Get} method) but also because we call methods
* of private classes from the java.lang package.
*
* It is used for testing purposes only for now
*/
private Executor setVirtualThreadCustomScheduler(Executor executor) throws ClassNotFoundException,
InvocationTargetException, InstantiationException, IllegalAccessException, NoSuchMethodException {
var vtf = Class.forName("java.lang.ThreadBuilders").getDeclaredClasses()[0];
Constructor constructor = vtf.getDeclaredConstructors()[0];
constructor.setAccessible(true);
ThreadFactory tf = (ThreadFactory) constructor.newInstance(
new Object[] { executor, "quarkus-virtual-factory-", 0, 0,
null });

return (Executor) Executors.class.getMethod("newThreadPerTaskExecutor", ThreadFactory.class)
.invoke(this, tf);
}

/**
* This method uses reflection in order to allow developers to quickly test quarkus-loom without needing to
* change --release, --source, --target flags and to enable previews.
* Since we try to load the "Loom-preview" classes/methods at runtime, the application can even be compiled
* using java 11 and executed with a loom-compliant JDK.
* <p>
* IMPORTANT: we still need to use a duplicated context to have all the propagation working.
* Thus, the context is captured and applied/terminated in the virtual thread.
*/
@Override
public Executor get() {
if (current == null) {
try {
var virtual = (Executor) Executors.class.getMethod("newVirtualThreadPerTaskExecutor")
.invoke(this);
current = new Executor() {
@Override
public void execute(Runnable command) {
var context = Vertx.currentContext();
if (!(context instanceof ContextInternal)) {
virtual.execute(command);
} else {
virtual.execute(new Runnable() {
@Override
public void run() {
final var previousContext = ((ContextInternal) context).beginDispatch();
try {
command.run();
} finally {
((ContextInternal) context).endDispatch(previousContext);
}
}
});
}
}
};
} catch (InvocationTargetException | IllegalAccessException | NoSuchMethodException e) {
logger.debug("Unable to invoke java.util.concurrent.Executors#newVirtualThreadPerTaskExecutor", e);
//quite ugly but works
logger.warnf("You weren't able to create an executor that spawns virtual threads, the default" +
" blocking executor will be used, please check that your JDK is compatible with " +
"virtual threads");
//if for some reason a class/method can't be loaded or invoked we return the traditional EXECUTOR
current = EXECUTOR_SUPPLIER.get();
}
}
return current;
}
};

static volatile Deployment currentDeployment;

Expand Down Expand Up @@ -205,7 +129,7 @@ public ResteasyReactiveRequestContext createContext(Deployment deployment,
}

RuntimeDeploymentManager runtimeDeploymentManager = new RuntimeDeploymentManager(info, EXECUTOR_SUPPLIER,
VIRTUAL_EXECUTOR_SUPPLIER,
VirtualThreadsRecorder::getCurrent,
closeTaskHandler, contextFactory, new ArcThreadSetupAction(beanContainer.requestContext()),
vertxConfig.rootPath);
Deployment deployment = runtimeDeploymentManager.deploy();
Expand Down
4 changes: 4 additions & 0 deletions extensions/smallrye-reactive-messaging/deployment/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@
<groupId>io.quarkus</groupId>
<artifactId>quarkus-vertx-deployment</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-virtual-threads-deployment</artifactId>
</dependency>
<dependency>
<groupId>org.commonmark</groupId>
<artifactId>commonmark</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@
import io.quarkus.deployment.builditem.GeneratedClassBuildItem;
import io.quarkus.deployment.builditem.RunTimeConfigurationDefaultBuildItem;
import io.quarkus.deployment.builditem.nativeimage.ReflectiveClassBuildItem;
import io.quarkus.deployment.builditem.nativeimage.RuntimeInitializedClassBuildItem;
import io.quarkus.deployment.metrics.MetricsCapabilityBuildItem;
import io.quarkus.deployment.recording.RecorderContext;
import io.quarkus.gizmo.ClassCreator;
Expand Down Expand Up @@ -114,12 +113,6 @@ AdditionalBeanBuildItem beans() {
QuarkusWorkerPoolRegistry.class);
}

@BuildStep
void nativeRuntimeInitClasses(BuildProducer<RuntimeInitializedClassBuildItem> runtimeInitClasses) {
runtimeInitClasses.produce(new RuntimeInitializedClassBuildItem(
"io.quarkus.smallrye.reactivemessaging.runtime.QuarkusWorkerPoolRegistry$VirtualExecutorSupplier"));
}

@BuildStep
AnnotationsTransformerBuildItem transformBeanScope(BeanArchiveIndexBuildItem index,
CustomScopeAnnotationsBuildItem scopes) {
Expand Down
Loading