Skip to content

Commit

Permalink
Add a config parameter to disable virtual thread support.
Browse files Browse the repository at this point in the history
In this case, methods annotated with @RunOnVirtualThread are executed on worker threads.
  • Loading branch information
cescoffier committed Sep 5, 2023
1 parent 68af440 commit cbd8f8a
Show file tree
Hide file tree
Showing 16 changed files with 622 additions and 79 deletions.
2 changes: 1 addition & 1 deletion .github/virtual-threads-tests.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
{
"category": "Main",
"timeout": 50,
"test-modules": "grpc-virtual-threads, mailer-virtual-threads, redis-virtual-threads, rest-client-reactive-virtual-threads, resteasy-reactive-virtual-threads, vertx-event-bus-virtual-threads, scheduler-virtual-threads, quartz-virtual-threads",
"test-modules": "virtual-threads-disabled, grpc-virtual-threads, mailer-virtual-threads, redis-virtual-threads, rest-client-reactive-virtual-threads, resteasy-reactive-virtual-threads, vertx-event-bus-virtual-threads, scheduler-virtual-threads, quartz-virtual-threads",
"os-name": "ubuntu-latest"
},
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,14 @@ public class VirtualThreadsConfig {
*/
@ConfigItem(defaultValue = "5s")
public Optional<Duration> shutdownCheckInterval;

/**
* A flag to explicitly disabled virtual threads, even if the JVM support them.
* In this case, methods annotated with {@code @RunOnVirtualThread} are executed on the worker thread pool.
* <p>
* This flag is intended to be used when running with virtual threads become more expensive than plain worker threads,
* because of pinning, monopolization or thread-based object pool.
*/
@ConfigItem(defaultValue = "true")
public boolean enabled;
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,52 +31,54 @@ public class VirtualThreadsRecorder {

public void setupVirtualThreads(VirtualThreadsConfig c, ShutdownContext shutdownContext, LaunchMode launchMode) {
config = c;
if (launchMode == LaunchMode.DEVELOPMENT) {
shutdownContext.addLastShutdownTask(new Runnable() {
@Override
public void run() {
Executor executor = current;
if (executor instanceof ExecutorService) {
((ExecutorService) executor).shutdownNow();
if (config.enabled) {
if (launchMode == LaunchMode.DEVELOPMENT) {
shutdownContext.addLastShutdownTask(new Runnable() {
@Override
public void run() {
Executor executor = current;
if (executor instanceof ExecutorService) {
((ExecutorService) executor).shutdownNow();
}
current = null;
}
current = null;
}
});
} else {
shutdownContext.addLastShutdownTask(new Runnable() {
@Override
public void run() {
Executor executor = current;
current = null;
if (executor instanceof ExecutorService) {
ExecutorService service = (ExecutorService) executor;
service.shutdown();

final long timeout = config.shutdownTimeout.toNanos();
final long interval = config.shutdownCheckInterval.orElse(config.shutdownTimeout).toNanos();

long start = System.nanoTime();
int loop = 1;
long elapsed = 0;
for (;;) {
// This log can be very useful when debugging problems
logger.debugf("Await termination loop: %s, remaining: %s", loop++, timeout - elapsed);
try {
if (!service.awaitTermination(Math.min(timeout, interval), NANOSECONDS)) {
elapsed = System.nanoTime() - start;
if (elapsed >= timeout) {
service.shutdownNow();
break;
});
} else {
shutdownContext.addLastShutdownTask(new Runnable() {
@Override
public void run() {
Executor executor = current;
current = null;
if (executor instanceof ExecutorService) {
ExecutorService service = (ExecutorService) executor;
service.shutdown();

final long timeout = config.shutdownTimeout.toNanos();
final long interval = config.shutdownCheckInterval.orElse(config.shutdownTimeout).toNanos();

long start = System.nanoTime();
int loop = 1;
long elapsed = 0;
for (;;) {
// This log can be very useful when debugging problems
logger.debugf("Await termination loop: %s, remaining: %s", loop++, timeout - elapsed);
try {
if (!service.awaitTermination(Math.min(timeout, interval), NANOSECONDS)) {
elapsed = System.nanoTime() - start;
if (elapsed >= timeout) {
service.shutdownNow();
break;
}
} else {
return;
}
} else {
return;
} catch (InterruptedException ignored) {
}
} catch (InterruptedException ignored) {
}
}
}
}
});
});
}
}
}

Expand Down Expand Up @@ -133,35 +135,37 @@ static ExecutorService newVirtualThreadExecutor()
* using java 11 and executed with a loom-compliant JDK.
*/
private static Executor createExecutor() {
try {
return new ContextPreservingExecutorService(newVirtualThreadExecutor());
} catch (InvocationTargetException | IllegalAccessException | NoSuchMethodException e) {
logger.debug("Unable to invoke java.util.concurrent.Executors#newVirtualThreadPerTaskExecutor", e);
//quite ugly but works
logger.warn("You weren't able to create an executor that spawns virtual threads, the default" +
" blocking executor will be used, please check that your JDK is compatible with " +
"virtual threads");
//if for some reason a class/method can't be loaded or invoked we return the traditional executor,
// wrapping executeBlocking.
return new Executor() {
@Override
public void execute(Runnable command) {
var context = Vertx.currentContext();
if (!(context instanceof ContextInternal)) {
Infrastructure.getDefaultWorkerPool().execute(command);
} else {
context.executeBlocking(fut -> {
try {
command.run();
fut.complete(null);
} catch (Exception e) {
fut.fail(e);
}
}, false);
}
}
};
if (config.enabled) {
try {
return new ContextPreservingExecutorService(newVirtualThreadExecutor());
} catch (InvocationTargetException | IllegalAccessException | NoSuchMethodException e) {
logger.debug("Unable to invoke java.util.concurrent.Executors#newVirtualThreadPerTaskExecutor", e);
//quite ugly but works
logger.warn("You weren't able to create an executor that spawns virtual threads, the default" +
" blocking executor will be used, please check that your JDK is compatible with " +
"virtual threads");
//if for some reason a class/method can't be loaded or invoked we return the traditional executor,
// wrapping executeBlocking.
}
}
// Fallback to regular worker threads
return new Executor() {
@Override
public void execute(Runnable command) {
var context = Vertx.currentContext();
if (!(context instanceof ContextInternal)) {
Infrastructure.getDefaultWorkerPool().execute(command);
} else {
context.executeBlocking(fut -> {
try {
command.run();
fut.complete(null);
} catch (Exception e) {
fut.fail(e);
}
}, false);
}
}
};
}

}
1 change: 1 addition & 0 deletions integration-tests/virtual-threads/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
<module>vertx-event-bus-virtual-threads</module>
<module>scheduler-virtual-threads</module>
<module>quartz-virtual-threads</module>
<module>virtual-threads-disabled</module>
</modules>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,6 @@ void testPost() {
.body(is(body2 + "-1"));
}

@Test
void testFilter() {
// Request scope
// Routing Context
// Duplicated context

// MDC
}

@Test
void testNonBlocking() {
// Non Blocking
Expand Down
72 changes: 72 additions & 0 deletions integration-tests/virtual-threads/virtual-threads-disabled/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent>
<artifactId>quarkus-virtual-threads-integration-tests-parent</artifactId>
<groupId>io.quarkus</groupId>
<version>999-SNAPSHOT</version>
</parent>

<artifactId>quarkus-integration-test-virtual-threads-disabled</artifactId>
<name>Quarkus - Integration Tests - Virtual Threads - Disabled</name>

<dependencies>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-resteasy-reactive-jackson</artifactId>
</dependency>

<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-junit5</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.rest-assured</groupId>
<artifactId>rest-assured</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<scope>test</scope>
</dependency>

<!-- Minimal test dependencies to *-deployment artifacts for consistent build order -->
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-resteasy-reactive-jackson-deployment</artifactId>
<version>${project.version}</version>
<type>pom</type>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>*</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-maven-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
</plugin>
</plugins>
</build>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package io.quarkus.virtual.disabled;

import java.lang.reflect.Method;

import io.quarkus.arc.Arc;
import io.smallrye.common.vertx.VertxContext;
import io.vertx.core.Vertx;

public class AssertHelper {

/**
* Asserts that the current method:
* - runs on a duplicated context
* - runs on a virtual thread
* - has the request scope activated
*/
public static void assertEverything() {
assertThatTheRequestScopeIsActive();
assertThatItRunsOnVirtualThread();
assertThatItRunsOnADuplicatedContext();
}

public static void assertWorkerOrEventLoopThread() {
assertThatTheRequestScopeIsActive();
assertThatItRunsOnADuplicatedContext();
assertNotOnVirtualThread();
}

public static void assertThatTheRequestScopeIsActive() {
if (!Arc.container().requestContext().isActive()) {
throw new AssertionError(("Expected the request scope to be active"));
}
}

public static void assertThatItRunsOnADuplicatedContext() {
var context = Vertx.currentContext();
if (context == null) {
throw new AssertionError("The method does not run on a Vert.x context");
}
if (!VertxContext.isOnDuplicatedContext()) {
throw new AssertionError("The method does not run on a Vert.x **duplicated** context");
}
}

public static void assertThatItRunsOnVirtualThread() {
// We cannot depend on a Java 20.
try {
Method isVirtual = Thread.class.getMethod("isVirtual");
isVirtual.setAccessible(true);
boolean virtual = (Boolean) isVirtual.invoke(Thread.currentThread());
if (!virtual) {
throw new AssertionError("Thread " + Thread.currentThread() + " is not a virtual thread");
}
} catch (Exception e) {
throw new AssertionError(
"Thread " + Thread.currentThread() + " is not a virtual thread - cannot invoke Thread.isVirtual()", e);
}
}

public static void assertNotOnVirtualThread() {
// We cannot depend on a Java 20.
try {
Method isVirtual = Thread.class.getMethod("isVirtual");
isVirtual.setAccessible(true);
boolean virtual = (Boolean) isVirtual.invoke(Thread.currentThread());
if (virtual) {
throw new AssertionError("Thread " + Thread.currentThread() + " is a virtual thread");
}
} catch (Exception e) {
// Trying using Thread name.
var name = Thread.currentThread().toString();
if (name.toLowerCase().contains("virtual")) {
throw new AssertionError("Thread " + Thread.currentThread() + " seems to be a virtual thread");
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package io.quarkus.virtual.disabled;

import java.util.concurrent.atomic.AtomicInteger;

import jakarta.enterprise.context.RequestScoped;

@RequestScoped
public class Counter {

private final AtomicInteger counter = new AtomicInteger();

public int increment() {
return counter.incrementAndGet();
}

}
Loading

0 comments on commit cbd8f8a

Please sign in to comment.