From 6da47ce08a7a2ee809d7101d58fcd730170056da Mon Sep 17 00:00:00 2001 From: He-Pin Date: Sat, 27 Apr 2024 17:49:18 +0800 Subject: [PATCH] feat: Add virtual thread support --- actor/src/main/resources/reference.conf | 14 ++++ .../pekko/dispatch/AbstractDispatcher.scala | 40 +++++++++- .../pekko/dispatch/VirtualThreadSupport.scala | 75 +++++++++++++++++++ 3 files changed, 125 insertions(+), 4 deletions(-) create mode 100644 actor/src/main/scala/org/apache/pekko/dispatch/VirtualThreadSupport.scala diff --git a/actor/src/main/resources/reference.conf b/actor/src/main/resources/reference.conf index 9459cda1418..7ef70b85606 100644 --- a/actor/src/main/resources/reference.conf +++ b/actor/src/main/resources/reference.conf @@ -376,6 +376,7 @@ pekko { # Valid options: # - "default-executor" requires a "default-executor" section # - "fork-join-executor" requires a "fork-join-executor" section + # - "virtual-thread-executor" requires a "virtual-thread-executor" section # - "thread-pool-executor" requires a "thread-pool-executor" section # - "affinity-pool-executor" requires an "affinity-pool-executor" section # - A FQCN of a class extending ExecutorServiceConfigurator @@ -539,6 +540,19 @@ pekko { allow-core-timeout = on } + # This will be used if you have set "executor = "virtual-thread-executor" + # This executor will execute the every task on a new virtual thread. + # Underlying thread pool implementation is java.util.concurrent.ForkJoinPool for JDK <= 22 + # If the current runtime does not support virtual thread, + # then the executor configured in "fallback" will be used. + virtual-thread-executor { + #Please set the the underlying pool with system properties below: + #jdk.virtualThreadScheduler.parallelism + #jdk.virtualThreadScheduler.maxPoolSize + #jdk.virtualThreadScheduler.minRunnable + #jdk.unparker.maxPoolSize + fallback = "fork-join-executor" + } # How long time the dispatcher will wait for new actors until it shuts down shutdown-timeout = 1s diff --git a/actor/src/main/scala/org/apache/pekko/dispatch/AbstractDispatcher.scala b/actor/src/main/scala/org/apache/pekko/dispatch/AbstractDispatcher.scala index f4fb6a73c48..f163f01960d 100644 --- a/actor/src/main/scala/org/apache/pekko/dispatch/AbstractDispatcher.scala +++ b/actor/src/main/scala/org/apache/pekko/dispatch/AbstractDispatcher.scala @@ -16,14 +16,11 @@ package org.apache.pekko.dispatch import java.{ util => ju } import java.util.concurrent._ -import scala.annotation.tailrec +import scala.annotation.{ nowarn, tailrec } import scala.concurrent.{ ExecutionContext, ExecutionContextExecutor } import scala.concurrent.duration.{ Duration, FiniteDuration } import scala.util.control.NonFatal -import scala.annotation.nowarn -import com.typesafe.config.Config - import org.apache.pekko import pekko.actor._ import pekko.annotation.InternalStableApi @@ -33,6 +30,8 @@ import pekko.event.EventStream import pekko.event.Logging.{ Debug, Error, LogEventException } import pekko.util.{ unused, Index, Unsafe } +import com.typesafe.config.Config + final case class Envelope private (message: Any, sender: ActorRef) { def copy(message: Any = message, sender: ActorRef = sender) = { @@ -367,9 +366,16 @@ abstract class MessageDispatcherConfigurator(_config: Config, val prerequisites: def dispatcher(): MessageDispatcher def configureExecutor(): ExecutorServiceConfigurator = { + @tailrec def configurator(executor: String): ExecutorServiceConfigurator = executor match { case null | "" | "fork-join-executor" => new ForkJoinExecutorConfigurator(config.getConfig("fork-join-executor"), prerequisites) + case "virtual-thread-executor" => + if (VirtualThreadSupport.isSupported) { + new VirtualThreadExecutorConfigurator(config.getConfig("virtual-thread-executor"), prerequisites) + } else { + configurator(config.getString("virtual-thread-executor.fallback")) + } case "thread-pool-executor" => new ThreadPoolExecutorConfigurator(config.getConfig("thread-pool-executor"), prerequisites) case "affinity-pool-executor" => @@ -401,6 +407,32 @@ abstract class MessageDispatcherConfigurator(_config: Config, val prerequisites: } } +final class VirtualThreadExecutorConfigurator(config: Config, prerequisites: DispatcherPrerequisites) + extends ExecutorServiceConfigurator(config, prerequisites) { + + override def createExecutorServiceFactory(id: String, threadFactory: ThreadFactory): ExecutorServiceFactory = { + import VirtualThreadSupport._ + val tf: ThreadFactory = threadFactory match { + case MonitorableThreadFactory(name, _, contextClassLoader, exceptionHandler, _) => + new ThreadFactory { + private val vtFactory = newVirtualThreadFactory(name) + + override def newThread(r: Runnable): Thread = { + val vt = vtFactory.newThread(r) + vt.setUncaughtExceptionHandler(exceptionHandler) + contextClassLoader.foreach(vt.setContextClassLoader) + vt + } + } + case _ => VirtualThreadSupport.newVirtualThreadFactory(prerequisites.settings.name); + } + new ExecutorServiceFactory { + import VirtualThreadSupport._ + override def createExecutorService: ExecutorService = newThreadPerTaskExecutor(tf) + } + } +} + class ThreadPoolExecutorConfigurator(config: Config, prerequisites: DispatcherPrerequisites) extends ExecutorServiceConfigurator(config, prerequisites) { diff --git a/actor/src/main/scala/org/apache/pekko/dispatch/VirtualThreadSupport.scala b/actor/src/main/scala/org/apache/pekko/dispatch/VirtualThreadSupport.scala new file mode 100644 index 00000000000..ae9471e59bd --- /dev/null +++ b/actor/src/main/scala/org/apache/pekko/dispatch/VirtualThreadSupport.scala @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.pekko.dispatch + +import org.apache.pekko.annotation.InternalApi +import org.apache.pekko.util.JavaVersion + +import java.lang.invoke.{ MethodHandles, MethodType } +import java.util.concurrent.{ ExecutorService, ThreadFactory } +import scala.util.control.NonFatal + +@InternalApi +private[dispatch] object VirtualThreadSupport { + private val lookup = MethodHandles.lookup + + /** + * Is virtual thread supported + */ + val isSupported: Boolean = JavaVersion.majorVersion >= 21 + + /** + * Create a virtual thread factory with a executor, the executor will be used as the scheduler of + * virtual thread. + */ + def newVirtualThreadFactory(prefix: String): ThreadFactory = { + require(isSupported, "Virtual thread is not supported.") + try { + val builderClass = ClassLoader.getSystemClassLoader.loadClass("java.lang.Thread$Builder") + val ofVirtualClass = ClassLoader.getSystemClassLoader.loadClass("java.lang.Thread$Builder$OfVirtual") + val ofVirtualMethod = lookup.findStatic(classOf[Thread], "ofVirtual", MethodType.methodType(ofVirtualClass)) + var builder = ofVirtualMethod.invoke() + val nameMethod = lookup.findVirtual(ofVirtualClass, "name", + MethodType.methodType(ofVirtualClass, classOf[String], classOf[Long])) + // TODO support replace scheduler when we drop Java 8 support + val factoryMethod = lookup.findVirtual(builderClass, "factory", MethodType.methodType(classOf[ThreadFactory])) + builder = nameMethod.invoke(builder, prefix + "-virtual-thread-", 0L) + factoryMethod.invoke(builder).asInstanceOf[ThreadFactory] + } catch { + case NonFatal(e) => + // --add-opens java.base/java.lang=ALL-UNNAMED + throw new UnsupportedOperationException("Failed to create virtual thread factory", e) + } + } + + def newThreadPerTaskExecutor(threadFactory: ThreadFactory): ExecutorService = { + require(threadFactory != null, "threadFactory should not be null.") + try { + val executorsClazz = ClassLoader.getSystemClassLoader.loadClass("java.util.concurrent.Executors") + val newThreadPerTaskExecutorMethod = lookup.findStatic( + executorsClazz, + "newThreadPerTaskExecutor", + MethodType.methodType(classOf[ExecutorService], classOf[ThreadFactory])) + newThreadPerTaskExecutorMethod.invoke(threadFactory).asInstanceOf[ExecutorService] + } catch { + case NonFatal(e) => + throw new UnsupportedOperationException("Failed to create newThreadPerTaskExecutor.", e) + } + } + +}