From 99d982714a1d3052bf71b077af99ad0d212252bf Mon Sep 17 00:00:00 2001 From: anavarr Date: Mon, 3 Jul 2023 16:45:52 +0200 Subject: [PATCH] merge changes from Ozan + fixed logging and default executor when newVirtualThreadPerTask can't be used to schedule virtual threads --- .../deployment/console/AeshConsole.java | 4 +- .../QuarkusMediatorConfigurationUtil.java | 17 ++- .../deployment/ReactiveMessagingDotNames.java | 2 + .../SmallRyeReactiveMessagingProcessor.java | 8 +- .../runtime/QuarkusWorkerPoolRegistry.java | 111 +++++++++++++++--- .../SmallRyeReactiveMessagingLifecycle.java | 3 +- .../runtime/WorkerConfiguration.java | 12 +- 7 files changed, 131 insertions(+), 26 deletions(-) diff --git a/core/deployment/src/main/java/io/quarkus/deployment/console/AeshConsole.java b/core/deployment/src/main/java/io/quarkus/deployment/console/AeshConsole.java index c881b28d12b3fc..0373db65b291b0 100644 --- a/core/deployment/src/main/java/io/quarkus/deployment/console/AeshConsole.java +++ b/core/deployment/src/main/java/io/quarkus/deployment/console/AeshConsole.java @@ -6,7 +6,7 @@ import java.util.HashMap; import java.util.Map; import java.util.TreeMap; -import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantLock; @@ -56,7 +56,7 @@ public class AeshConsole extends QuarkusConsole { * Because Aesh can log deadlocks are possible on Windows if a write fails, unless care * is taken. */ - private final LinkedBlockingDeque writeQueue = new LinkedBlockingDeque<>(); + private final ConcurrentLinkedDeque writeQueue = new ConcurrentLinkedDeque<>(); private final Lock connectionLock = new ReentrantLock(); private static final ThreadLocal IN_WRITE = new ThreadLocal<>() { @Override diff --git a/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/QuarkusMediatorConfigurationUtil.java b/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/QuarkusMediatorConfigurationUtil.java index 118b2b7dfb91d5..619794744f8fa5 100644 --- a/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/QuarkusMediatorConfigurationUtil.java +++ b/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/QuarkusMediatorConfigurationUtil.java @@ -9,6 +9,7 @@ import static io.quarkus.smallrye.reactivemessaging.deployment.ReactiveMessagingDotNames.KOTLIN_UNIT; import static io.quarkus.smallrye.reactivemessaging.deployment.ReactiveMessagingDotNames.MERGE; import static io.quarkus.smallrye.reactivemessaging.deployment.ReactiveMessagingDotNames.OUTGOING; +import static io.quarkus.smallrye.reactivemessaging.deployment.ReactiveMessagingDotNames.RUN_ON_VIRTUAL_THREAD; import static io.quarkus.smallrye.reactivemessaging.deployment.ReactiveMessagingDotNames.SMALLRYE_BLOCKING; import static io.quarkus.smallrye.reactivemessaging.deployment.ReactiveMessagingDotNames.TRANSACTIONAL; import static io.quarkus.smallrye.reactivemessaging.deployment.ReactiveMessagingDotNames.VOID_CLASS; @@ -33,6 +34,7 @@ import io.quarkus.runtime.configuration.ConfigurationException; import io.quarkus.smallrye.reactivemessaging.runtime.QuarkusMediatorConfiguration; import io.quarkus.smallrye.reactivemessaging.runtime.QuarkusParameterDescriptor; +import io.quarkus.smallrye.reactivemessaging.runtime.QuarkusWorkerPoolRegistry; import io.quarkus.smallrye.reactivemessaging.runtime.TypeInfo; import io.smallrye.reactive.messaging.Shape; import io.smallrye.reactive.messaging.annotations.Blocking; @@ -178,17 +180,28 @@ public Integer get() { AnnotationInstance blockingAnnotation = methodInfo.annotation(BLOCKING); AnnotationInstance smallryeBlockingAnnotation = methodInfo.annotation(SMALLRYE_BLOCKING); AnnotationInstance transactionalAnnotation = methodInfo.annotation(TRANSACTIONAL); - if (blockingAnnotation != null || smallryeBlockingAnnotation != null || transactionalAnnotation != null) { + AnnotationInstance runOnVirtualThreadAnnotation = methodInfo.annotation(RUN_ON_VIRTUAL_THREAD); + if (blockingAnnotation != null || smallryeBlockingAnnotation != null || transactionalAnnotation != null + || runOnVirtualThreadAnnotation != null) { mediatorConfigurationSupport.validateBlocking(validationOutput); configuration.setBlocking(true); if (blockingAnnotation != null) { AnnotationValue ordered = blockingAnnotation.value("ordered"); - configuration.setBlockingExecutionOrdered(ordered == null || ordered.asBoolean()); + if (ordered == null && runOnVirtualThreadAnnotation != null) { + configuration.setBlockingExecutionOrdered(false); + } else { + configuration.setBlockingExecutionOrdered(ordered == null || ordered.asBoolean()); + } String poolName; if (blockingAnnotation.value() != null && !(poolName = blockingAnnotation.value().asString()).equals(Blocking.DEFAULT_WORKER_POOL)) { configuration.setWorkerPoolName(poolName); + } else if (runOnVirtualThreadAnnotation != null) { + configuration.setWorkerPoolName(QuarkusWorkerPoolRegistry.DEFAULT_VIRTUAL_THREAD_WORKER); } + } else if (runOnVirtualThreadAnnotation != null) { + configuration.setBlockingExecutionOrdered(false); + configuration.setWorkerPoolName(QuarkusWorkerPoolRegistry.DEFAULT_VIRTUAL_THREAD_WORKER); } else { configuration.setBlockingExecutionOrdered(true); } diff --git a/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/ReactiveMessagingDotNames.java b/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/ReactiveMessagingDotNames.java index 06fa2d2b21c508..384dcd3aee6ba7 100644 --- a/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/ReactiveMessagingDotNames.java +++ b/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/ReactiveMessagingDotNames.java @@ -10,6 +10,7 @@ import org.eclipse.microprofile.reactive.messaging.spi.OutgoingConnectorFactory; import org.jboss.jandex.DotName; +import io.smallrye.common.annotation.RunOnVirtualThread; import io.smallrye.reactive.messaging.MessageConverter; import io.smallrye.reactive.messaging.MutinyEmitter; import io.smallrye.reactive.messaging.annotations.Blocking; @@ -89,6 +90,7 @@ public final class ReactiveMessagingDotNames { .createSimple("io.quarkus.smallrye.reactivemessaging.runtime.kotlin.AbstractSubscribingCoroutineInvoker"); static final DotName TRANSACTIONAL = DotName.createSimple("jakarta.transaction.Transactional"); + static final DotName RUN_ON_VIRTUAL_THREAD = DotName.createSimple(RunOnVirtualThread.class.getName()); private ReactiveMessagingDotNames() { } diff --git a/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/SmallRyeReactiveMessagingProcessor.java b/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/SmallRyeReactiveMessagingProcessor.java index cdef1d9380c9a0..09a1a84807176c 100644 --- a/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/SmallRyeReactiveMessagingProcessor.java +++ b/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/SmallRyeReactiveMessagingProcessor.java @@ -2,6 +2,7 @@ import static io.quarkus.deployment.annotations.ExecutionTime.STATIC_INIT; import static io.quarkus.smallrye.reactivemessaging.deployment.ReactiveMessagingDotNames.BLOCKING; +import static io.quarkus.smallrye.reactivemessaging.deployment.ReactiveMessagingDotNames.RUN_ON_VIRTUAL_THREAD; import static io.quarkus.smallrye.reactivemessaging.deployment.ReactiveMessagingDotNames.SMALLRYE_BLOCKING; import static io.quarkus.smallrye.reactivemessaging.deployment.ReactiveMessagingDotNames.TRANSACTIONAL; @@ -240,6 +241,7 @@ public void build(SmallRyeReactiveMessagingRecorder recorder, RecorderContext re BeanInfo bean = mediatorMethod.getBean(); if (methodInfo.hasAnnotation(BLOCKING) || methodInfo.hasAnnotation(SMALLRYE_BLOCKING) + || methodInfo.hasAnnotation(RUN_ON_VIRTUAL_THREAD) || methodInfo.hasAnnotation(TRANSACTIONAL)) { // Just in case both annotation are used, use @Blocking value. String poolName = Blocking.DEFAULT_WORKER_POOL; @@ -249,8 +251,12 @@ public void build(SmallRyeReactiveMessagingRecorder recorder, RecorderContext re AnnotationInstance blocking = methodInfo.annotation(ReactiveMessagingDotNames.BLOCKING); poolName = blocking.value() == null ? Blocking.DEFAULT_WORKER_POOL : blocking.value().asString(); } + boolean virtualThread = methodInfo.hasAnnotation(RUN_ON_VIRTUAL_THREAD); + if (virtualThread && Blocking.DEFAULT_WORKER_POOL.equals(poolName)) { + poolName = QuarkusWorkerPoolRegistry.DEFAULT_VIRTUAL_THREAD_WORKER; + } workerConfigurations.add(new WorkerConfiguration(methodInfo.declaringClass().toString(), - methodInfo.name(), poolName)); + methodInfo.name(), poolName, virtualThread)); } try { diff --git a/extensions/smallrye-reactive-messaging/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/runtime/QuarkusWorkerPoolRegistry.java b/extensions/smallrye-reactive-messaging/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/runtime/QuarkusWorkerPoolRegistry.java index 7a596d10ac02fd..78e67c3badaad1 100644 --- a/extensions/smallrye-reactive-messaging/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/runtime/QuarkusWorkerPoolRegistry.java +++ b/extensions/smallrye-reactive-messaging/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/runtime/QuarkusWorkerPoolRegistry.java @@ -1,10 +1,11 @@ package io.quarkus.smallrye.reactivemessaging.runtime; -import java.util.HashMap; -import java.util.Map; -import java.util.Objects; -import java.util.Optional; +import java.lang.reflect.InvocationTargetException; +import java.util.*; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; +import java.util.function.Supplier; import jakarta.annotation.Priority; import jakarta.enterprise.context.ApplicationScoped; @@ -15,6 +16,7 @@ import jakarta.inject.Inject; import org.eclipse.microprofile.config.ConfigProvider; +import org.jboss.logging.Logger; import org.slf4j.LoggerFactory; import io.smallrye.mutiny.Uni; @@ -22,6 +24,8 @@ import io.smallrye.reactive.messaging.providers.connectors.ExecutionHolder; import io.smallrye.reactive.messaging.providers.connectors.WorkerPoolRegistry; import io.smallrye.reactive.messaging.providers.helpers.Validation; +import io.vertx.core.Vertx; +import io.vertx.core.impl.ContextInternal; import io.vertx.mutiny.core.Context; import io.vertx.mutiny.core.WorkerExecutor; @@ -30,15 +34,62 @@ @ApplicationScoped // TODO: create a different entry for WorkerPoolRegistry than `analyzeWorker` and drop this class public class QuarkusWorkerPoolRegistry extends WorkerPoolRegistry { + + static final Logger logger = Logger.getLogger("io.quarkus"); private static final String WORKER_CONFIG_PREFIX = "smallrye.messaging.worker"; private static final String WORKER_CONCURRENCY = "max-concurrency"; + public static final String DEFAULT_VIRTUAL_THREAD_WORKER = ""; + public static final int DEFAULT_VIRTUAL_THREAD_MAX_CONCURRENCY = 5000; @Inject ExecutionHolder executionHolder; + private final List virtualThreadExecutors = new ArrayList<>(); private final Map workerConcurrency = new HashMap<>(); private final Map workerExecutors = new ConcurrentHashMap<>(); + public static final Supplier VIRTUAL_EXECUTOR_SUPPLIER = new Supplier() { + volatile Executor current = null; + + @Override + public Executor get() { + if (current == null) { + try { + var virtual = (Executor) Executors.class.getMethod("newVirtualThreadPerTaskExecutor") + .invoke(this); + current = new Executor() { + @Override + public void execute(Runnable command) { + var context = Vertx.currentContext(); + if (!(context instanceof ContextInternal)) { + virtual.execute(command); + } else { + virtual.execute(new Runnable() { + @Override + public void run() { + final var previousContext = ((ContextInternal) context).beginDispatch(); + try { + command.run(); + } finally { + ((ContextInternal) context).endDispatch(previousContext); + } + } + }); + } + } + }; + } catch (InvocationTargetException | IllegalAccessException | NoSuchMethodException e) { + logger.warnf("You weren't able to create an executor that spawns virtual threads, the default" + + " blocking executor will be used, please check that your JDK is compatible with " + + "virtual threads"); + //if for some reason a class/method can't be loaded or invoked we return a default blocking executor + current = Executors.newCachedThreadPool(); + } + } + return current; + } + }; + public void terminate( @Observes(notifyObserver = Reception.IF_EXISTS) @Priority(100) @BeforeDestroyed(ApplicationScoped.class) Object event) { if (!workerExecutors.isEmpty()) { @@ -56,23 +107,40 @@ public Uni executeWork(Context currentContext, Uni uni, String workerN return currentContext.executeBlocking(Uni.createFrom().deferred(() -> uni), ordered); } return executionHolder.vertx().executeBlocking(uni, ordered); + } + if (virtualThreadExecutors.contains(workerName)) { + if (currentContext != null) { + return runOnVirtualThread(currentContext, uni); + } else { + return executionHolder.vertx() + .executeBlocking(Uni.createFrom().deferred(() -> runOnVirtualThread(currentContext, uni))); + } } else { if (currentContext != null) { return getWorker(workerName).executeBlocking(uni, ordered) - .onItemOrFailure().transformToUni((item, failure) -> { - return Uni.createFrom().emitter(emitter -> { - if (failure != null) { - currentContext.runOnContext(() -> emitter.fail(failure)); - } else { - currentContext.runOnContext(() -> emitter.complete(item)); - } - }); - }); + .onItemOrFailure().transformToUni((item, failure) -> Uni.createFrom().emitter(emitter -> { + if (failure != null) { + currentContext.runOnContext(() -> emitter.fail(failure)); + } else { + currentContext.runOnContext(() -> emitter.complete(item)); + } + })); } return getWorker(workerName).executeBlocking(uni, ordered); } } + private Uni runOnVirtualThread(Context currentContext, Uni uni) { + return uni.runSubscriptionOn(command -> VIRTUAL_EXECUTOR_SUPPLIER.get().execute(command)) + .onItemOrFailure().transformToUni((item, failure) -> Uni.createFrom().emitter(emitter -> { + if (failure != null) { + currentContext.runOnContext(() -> emitter.fail(failure)); + } else { + currentContext.runOnContext(() -> emitter.complete(item)); + } + })); + } + public WorkerExecutor getWorker(String workerName) { Objects.requireNonNull(workerName, "Worker Name not specified"); @@ -102,10 +170,10 @@ public WorkerExecutor getWorker(String workerName) { } // Shouldn't get here - throw new IllegalArgumentException("@Blocking referred to invalid worker name."); + throw new IllegalArgumentException("@Blocking referred to invalid worker name. " + workerName); } - public void defineWorker(String className, String method, String poolName) { + public void defineWorker(String className, String method, String poolName, boolean virtualThread) { Objects.requireNonNull(className, "className was empty"); Objects.requireNonNull(method, "Method was empty"); @@ -118,11 +186,16 @@ public void defineWorker(String className, String method, String poolName) { // Validate @Blocking worker pool has configuration to define concurrency String workerConfigKey = WORKER_CONFIG_PREFIX + "." + poolName + "." + WORKER_CONCURRENCY; Optional concurrency = ConfigProvider.getConfig().getOptionalValue(workerConfigKey, Integer.class); - if (!concurrency.isPresent()) { - throw getBlockingError(className, method, workerConfigKey + " was not defined"); - } - workerConcurrency.put(poolName, concurrency.get()); + if (virtualThread) { + virtualThreadExecutors.add(poolName); + } else { + if (!concurrency.isPresent()) { + throw getBlockingError(className, method, workerConfigKey + " was not defined"); + } + + workerConcurrency.put(poolName, concurrency.get()); + } } } diff --git a/extensions/smallrye-reactive-messaging/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/runtime/SmallRyeReactiveMessagingLifecycle.java b/extensions/smallrye-reactive-messaging/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/runtime/SmallRyeReactiveMessagingLifecycle.java index 89294f8e41b894..98ab6ec818596a 100644 --- a/extensions/smallrye-reactive-messaging/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/runtime/SmallRyeReactiveMessagingLifecycle.java +++ b/extensions/smallrye-reactive-messaging/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/runtime/SmallRyeReactiveMessagingLifecycle.java @@ -29,7 +29,8 @@ void onStaticInit(@Observes @Initialized(ApplicationScoped.class) Object event, QuarkusWorkerPoolRegistry workerPoolRegistry) { mediatorManager.addAnalyzed(context.getMediatorConfigurations()); for (WorkerConfiguration worker : context.getWorkerConfigurations()) { - workerPoolRegistry.defineWorker(worker.getClassName(), worker.getMethodName(), worker.getPoolName()); + workerPoolRegistry.defineWorker(worker.getClassName(), worker.getMethodName(), worker.getPoolName(), + worker.isVirtualThread()); } for (EmitterConfiguration emitter : context.getEmitterConfigurations()) { mediatorManager.addEmitter(emitter); diff --git a/extensions/smallrye-reactive-messaging/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/runtime/WorkerConfiguration.java b/extensions/smallrye-reactive-messaging/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/runtime/WorkerConfiguration.java index 854bb1d09c49f0..148c5609e8dbed 100644 --- a/extensions/smallrye-reactive-messaging/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/runtime/WorkerConfiguration.java +++ b/extensions/smallrye-reactive-messaging/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/runtime/WorkerConfiguration.java @@ -8,13 +8,16 @@ public class WorkerConfiguration { private String poolName; + private boolean virtualThread; + public WorkerConfiguration() { } - public WorkerConfiguration(String className, String name, String poolName) { + public WorkerConfiguration(String className, String name, String poolName, boolean virtualThread) { this.className = className; this.methodName = name; this.poolName = poolName; + this.virtualThread = virtualThread; } public String getClassName() { @@ -41,4 +44,11 @@ public void setPoolName(String poolName) { this.poolName = poolName; } + public boolean isVirtualThread() { + return virtualThread; + } + + public void setVirtualThread(boolean virtualThread) { + this.virtualThread = virtualThread; + } }