diff --git a/actor-tests/src/test/scala-jdk21-only/org/apache/pekko/dispatch/VirtualThreadPoolDispatcherSpec.scala b/actor-tests/src/test/scala-jdk21-only/org/apache/pekko/dispatch/VirtualThreadPoolDispatcherSpec.scala new file mode 100644 index 00000000000..739bf26a5e3 --- /dev/null +++ b/actor-tests/src/test/scala-jdk21-only/org/apache/pekko/dispatch/VirtualThreadPoolDispatcherSpec.scala @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.pekko.dispatch + +import com.typesafe.config.ConfigFactory + +import org.apache.pekko +import pekko.actor.{ Actor, Props } +import pekko.testkit.{ ImplicitSender, PekkoSpec } + +object VirtualThreadPoolDispatcherSpec { + val config = ConfigFactory.parseString(""" + |virtual-thread-dispatcher { + | executor = virtual-thread-executor + |} + """.stripMargin) + + class InnocentActor extends Actor { + + override def receive = { + case "ping" => + sender() ! "All fine" + } + } + +} + +class VirtualThreadPoolDispatcherSpec extends PekkoSpec(VirtualThreadPoolDispatcherSpec.config) with ImplicitSender { + import VirtualThreadPoolDispatcherSpec._ + + val Iterations = 1000 + + "VirtualThreadPool support" must { + + "handle simple dispatch" in { + val innocentActor = system.actorOf(Props(new InnocentActor).withDispatcher("virtual-thread-dispatcher")) + innocentActor ! "ping" + expectMsg("All fine") + } + + } +} diff --git a/actor/src/main/resources/reference.conf b/actor/src/main/resources/reference.conf index 9459cda1418..7ef70b85606 100644 --- a/actor/src/main/resources/reference.conf +++ b/actor/src/main/resources/reference.conf @@ -376,6 +376,7 @@ pekko { # Valid options: # - "default-executor" requires a "default-executor" section # - "fork-join-executor" requires a "fork-join-executor" section + # - "virtual-thread-executor" requires a "virtual-thread-executor" section # - "thread-pool-executor" requires a "thread-pool-executor" section # - "affinity-pool-executor" requires an "affinity-pool-executor" section # - A FQCN of a class extending ExecutorServiceConfigurator @@ -539,6 +540,19 @@ pekko { allow-core-timeout = on } + # This will be used if you have set "executor = "virtual-thread-executor" + # This executor will execute the every task on a new virtual thread. + # Underlying thread pool implementation is java.util.concurrent.ForkJoinPool for JDK <= 22 + # If the current runtime does not support virtual thread, + # then the executor configured in "fallback" will be used. + virtual-thread-executor { + #Please set the the underlying pool with system properties below: + #jdk.virtualThreadScheduler.parallelism + #jdk.virtualThreadScheduler.maxPoolSize + #jdk.virtualThreadScheduler.minRunnable + #jdk.unparker.maxPoolSize + fallback = "fork-join-executor" + } # How long time the dispatcher will wait for new actors until it shuts down shutdown-timeout = 1s diff --git a/actor/src/main/scala/org/apache/pekko/dispatch/AbstractDispatcher.scala b/actor/src/main/scala/org/apache/pekko/dispatch/AbstractDispatcher.scala index f4fb6a73c48..f163f01960d 100644 --- a/actor/src/main/scala/org/apache/pekko/dispatch/AbstractDispatcher.scala +++ b/actor/src/main/scala/org/apache/pekko/dispatch/AbstractDispatcher.scala @@ -16,14 +16,11 @@ package org.apache.pekko.dispatch import java.{ util => ju } import java.util.concurrent._ -import scala.annotation.tailrec +import scala.annotation.{ nowarn, tailrec } import scala.concurrent.{ ExecutionContext, ExecutionContextExecutor } import scala.concurrent.duration.{ Duration, FiniteDuration } import scala.util.control.NonFatal -import scala.annotation.nowarn -import com.typesafe.config.Config - import org.apache.pekko import pekko.actor._ import pekko.annotation.InternalStableApi @@ -33,6 +30,8 @@ import pekko.event.EventStream import pekko.event.Logging.{ Debug, Error, LogEventException } import pekko.util.{ unused, Index, Unsafe } +import com.typesafe.config.Config + final case class Envelope private (message: Any, sender: ActorRef) { def copy(message: Any = message, sender: ActorRef = sender) = { @@ -367,9 +366,16 @@ abstract class MessageDispatcherConfigurator(_config: Config, val prerequisites: def dispatcher(): MessageDispatcher def configureExecutor(): ExecutorServiceConfigurator = { + @tailrec def configurator(executor: String): ExecutorServiceConfigurator = executor match { case null | "" | "fork-join-executor" => new ForkJoinExecutorConfigurator(config.getConfig("fork-join-executor"), prerequisites) + case "virtual-thread-executor" => + if (VirtualThreadSupport.isSupported) { + new VirtualThreadExecutorConfigurator(config.getConfig("virtual-thread-executor"), prerequisites) + } else { + configurator(config.getString("virtual-thread-executor.fallback")) + } case "thread-pool-executor" => new ThreadPoolExecutorConfigurator(config.getConfig("thread-pool-executor"), prerequisites) case "affinity-pool-executor" => @@ -401,6 +407,32 @@ abstract class MessageDispatcherConfigurator(_config: Config, val prerequisites: } } +final class VirtualThreadExecutorConfigurator(config: Config, prerequisites: DispatcherPrerequisites) + extends ExecutorServiceConfigurator(config, prerequisites) { + + override def createExecutorServiceFactory(id: String, threadFactory: ThreadFactory): ExecutorServiceFactory = { + import VirtualThreadSupport._ + val tf: ThreadFactory = threadFactory match { + case MonitorableThreadFactory(name, _, contextClassLoader, exceptionHandler, _) => + new ThreadFactory { + private val vtFactory = newVirtualThreadFactory(name) + + override def newThread(r: Runnable): Thread = { + val vt = vtFactory.newThread(r) + vt.setUncaughtExceptionHandler(exceptionHandler) + contextClassLoader.foreach(vt.setContextClassLoader) + vt + } + } + case _ => VirtualThreadSupport.newVirtualThreadFactory(prerequisites.settings.name); + } + new ExecutorServiceFactory { + import VirtualThreadSupport._ + override def createExecutorService: ExecutorService = newThreadPerTaskExecutor(tf) + } + } +} + class ThreadPoolExecutorConfigurator(config: Config, prerequisites: DispatcherPrerequisites) extends ExecutorServiceConfigurator(config, prerequisites) { diff --git a/actor/src/main/scala/org/apache/pekko/dispatch/VirtualThreadSupport.scala b/actor/src/main/scala/org/apache/pekko/dispatch/VirtualThreadSupport.scala new file mode 100644 index 00000000000..3a777891559 --- /dev/null +++ b/actor/src/main/scala/org/apache/pekko/dispatch/VirtualThreadSupport.scala @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.pekko.dispatch + +import org.apache.pekko.annotation.InternalApi +import org.apache.pekko.util.JavaVersion + +import java.lang.invoke.{ MethodHandles, MethodType } +import java.util.concurrent.{ ExecutorService, ThreadFactory } +import scala.util.control.NonFatal + +@InternalApi +private[dispatch] object VirtualThreadSupport { + private val lookup = MethodHandles.publicLookup() + + /** + * Is virtual thread supported + */ + val isSupported: Boolean = JavaVersion.majorVersion >= 21 + + /** + * Create a virtual thread factory with a executor, the executor will be used as the scheduler of + * virtual thread. + */ + def newVirtualThreadFactory(prefix: String): ThreadFactory = { + require(isSupported, "Virtual thread is not supported.") + try { + val builderClass = ClassLoader.getSystemClassLoader.loadClass("java.lang.Thread$Builder") + val ofVirtualClass = ClassLoader.getSystemClassLoader.loadClass("java.lang.Thread$Builder$OfVirtual") + val ofVirtualMethod = lookup.findStatic(classOf[Thread], "ofVirtual", MethodType.methodType(ofVirtualClass)) + var builder = ofVirtualMethod.invoke() + val nameMethod = lookup.findVirtual(ofVirtualClass, "name", + MethodType.methodType(ofVirtualClass, classOf[String], classOf[Long])) + // TODO support replace scheduler when we drop Java 8 support + val factoryMethod = lookup.findVirtual(builderClass, "factory", MethodType.methodType(classOf[ThreadFactory])) + builder = nameMethod.invoke(builder, prefix + "-virtual-thread-", 0L) + factoryMethod.invoke(builder).asInstanceOf[ThreadFactory] + } catch { + case NonFatal(e) => + // --add-opens java.base/java.lang=ALL-UNNAMED + throw new UnsupportedOperationException("Failed to create virtual thread factory", e) + } + } + + def newThreadPerTaskExecutor(threadFactory: ThreadFactory): ExecutorService = { + require(threadFactory != null, "threadFactory should not be null.") + try { + val executorsClazz = ClassLoader.getSystemClassLoader.loadClass("java.util.concurrent.Executors") + val newThreadPerTaskExecutorMethod = lookup.findStatic( + executorsClazz, + "newThreadPerTaskExecutor", + MethodType.methodType(classOf[ExecutorService], classOf[ThreadFactory])) + newThreadPerTaskExecutorMethod.invoke(threadFactory).asInstanceOf[ExecutorService] + } catch { + case NonFatal(e) => + throw new UnsupportedOperationException("Failed to create newThreadPerTaskExecutor.", e) + } + } + +} diff --git a/build.sbt b/build.sbt index a57f790104d..d93f55ee67c 100644 --- a/build.sbt +++ b/build.sbt @@ -124,9 +124,10 @@ lazy val actor = pekkoModule("actor") .enablePlugins(BoilerplatePlugin, SbtOsgi, Jdk9) lazy val actorTests = pekkoModule("actor-tests") + .configs(Jdk9.TestJdk9) .dependsOn(testkit % "compile->compile;test->test", actor) .settings(Dependencies.actorTests) - .enablePlugins(NoPublish) + .enablePlugins(NoPublish, Jdk9) .disablePlugins(MimaPlugin) lazy val pekkoScalaNightly = pekkoModule("scala-nightly")