diff --git a/actor-tests/src/test/scala/org/apache/pekko/actor/ActorSystemSpec.scala b/actor-tests/src/test/scala/org/apache/pekko/actor/ActorSystemSpec.scala index a5c25b9ee98..e0423c95f4c 100644 --- a/actor-tests/src/test/scala/org/apache/pekko/actor/ActorSystemSpec.scala +++ b/actor-tests/src/test/scala/org/apache/pekko/actor/ActorSystemSpec.scala @@ -99,8 +99,9 @@ object ActorSystemSpec { override protected[pekko] def registerForExecution( mbox: Mailbox, hasMessageHint: Boolean, - hasSystemMessageHint: Boolean): Boolean = { - val ret = super.registerForExecution(mbox, hasMessageHint, hasSystemMessageHint) + hasSystemMessageHint: Boolean, + needYield: Boolean): Boolean = { + val ret = super.registerForExecution(mbox, hasMessageHint, hasSystemMessageHint, needYield) doneIt.switchOn { TestKit.awaitCond(mbox.actor.actor != null, 1.second) mbox.actor.actor match { diff --git a/actor/src/main/scala/org/apache/pekko/dispatch/AbstractDispatcher.scala b/actor/src/main/scala/org/apache/pekko/dispatch/AbstractDispatcher.scala index 08afe54e947..b673a0fbfa4 100644 --- a/actor/src/main/scala/org/apache/pekko/dispatch/AbstractDispatcher.scala +++ b/actor/src/main/scala/org/apache/pekko/dispatch/AbstractDispatcher.scala @@ -69,6 +69,14 @@ private[pekko] trait LoadMetrics { self: Executor => def atFullThrottle(): Boolean } +/** + * INTERNAL API + */ +private[pekko] trait LazyExecuteSupport { + self: Executor => + def lazyExecute(runnable: Runnable): Unit = self.execute(runnable) +} + /** * INTERNAL API */ @@ -160,7 +168,7 @@ abstract class MessageDispatcher(val configurator: MessageDispatcherConfigurator */ final def attach(actor: ActorCell): Unit = { register(actor) - registerForExecution(actor.mailbox, false, true) + registerForExecution(actor.mailbox, false, true, false) } /** @@ -288,7 +296,7 @@ abstract class MessageDispatcher(val configurator: MessageDispatcherConfigurator protected[pekko] def resume(actor: ActorCell): Unit = { val mbox = actor.mailbox if ((mbox.actor eq actor) && (mbox.dispatcher eq this) && mbox.resume()) - registerForExecution(mbox, false, false) + registerForExecution(mbox, false, false, false) } /** @@ -313,7 +321,8 @@ abstract class MessageDispatcher(val configurator: MessageDispatcherConfigurator protected[pekko] def registerForExecution( mbox: Mailbox, hasMessageHint: Boolean, - hasSystemMessageHint: Boolean): Boolean + hasSystemMessageHint: Boolean, + needYield: Boolean): Boolean // TODO check whether this should not actually be a property of the mailbox /** diff --git a/actor/src/main/scala/org/apache/pekko/dispatch/BalancingDispatcher.scala b/actor/src/main/scala/org/apache/pekko/dispatch/BalancingDispatcher.scala index f2292a9e506..1fe599b51f3 100644 --- a/actor/src/main/scala/org/apache/pekko/dispatch/BalancingDispatcher.scala +++ b/actor/src/main/scala/org/apache/pekko/dispatch/BalancingDispatcher.scala @@ -106,7 +106,7 @@ private[pekko] class BalancingDispatcher( override protected[pekko] def dispatch(receiver: ActorCell, invocation: Envelope) = { messageQueue.enqueue(receiver.self, invocation) - if (!registerForExecution(receiver.mailbox, false, false)) teamWork() + if (!registerForExecution(receiver.mailbox, false, false, false)) teamWork() } protected def teamWork(): Unit = @@ -118,7 +118,7 @@ private[pekko] class BalancingDispatcher( case lm: LoadMetrics => !lm.atFullThrottle() case _ => true }) - && !registerForExecution(i.next.mailbox, false, false)) + && !registerForExecution(i.next.mailbox, false, false, true)) scheduleOne(i) scheduleOne() diff --git a/actor/src/main/scala/org/apache/pekko/dispatch/Dispatcher.scala b/actor/src/main/scala/org/apache/pekko/dispatch/Dispatcher.scala index 01fe66f0a26..bb4b26ff408 100644 --- a/actor/src/main/scala/org/apache/pekko/dispatch/Dispatcher.scala +++ b/actor/src/main/scala/org/apache/pekko/dispatch/Dispatcher.scala @@ -13,14 +13,11 @@ package org.apache.pekko.dispatch -import java.util.concurrent.{ ExecutorService, RejectedExecutionException } +import java.util.concurrent.{ ExecutorService, ForkJoinPool, RejectedExecutionException } import java.util.concurrent.atomic.AtomicReferenceFieldUpdater - import scala.concurrent.duration.Duration import scala.concurrent.duration.FiniteDuration - import scala.annotation.nowarn - import org.apache.pekko import pekko.actor.ActorCell import pekko.dispatch.sysmsg.SystemMessage @@ -71,7 +68,7 @@ class Dispatcher( protected[pekko] def dispatch(receiver: ActorCell, invocation: Envelope): Unit = { val mbox = receiver.mailbox mbox.enqueue(receiver.self, invocation) - registerForExecution(mbox, true, false) + registerForExecution(mbox, true, false, false) } /** @@ -80,7 +77,7 @@ class Dispatcher( protected[pekko] def systemDispatch(receiver: ActorCell, invocation: SystemMessage): Unit = { val mbox = receiver.mailbox mbox.systemEnqueue(receiver.self, invocation) - registerForExecution(mbox, false, true) + registerForExecution(mbox, false, true, false) } /** @@ -130,16 +127,17 @@ class Dispatcher( protected[pekko] override def registerForExecution( mbox: Mailbox, hasMessageHint: Boolean, - hasSystemMessageHint: Boolean): Boolean = { + hasSystemMessageHint: Boolean, + needYield: Boolean): Boolean = { if (mbox.canBeScheduledForExecution(hasMessageHint, hasSystemMessageHint)) { // This needs to be here to ensure thread safety and no races if (mbox.setAsScheduled()) { try { - executorService.execute(mbox) + submit(mbox, needYield) true } catch { case _: RejectedExecutionException => try { - executorService.execute(mbox) + submit(mbox, needYield) true } catch { // Retry once case e: RejectedExecutionException => @@ -152,6 +150,15 @@ class Dispatcher( } else false } + @inline + private def submit(mbox: Mailbox, needYield: Boolean): Unit = { + if (needYield && executorService.executor.isInstanceOf[LazyExecuteSupport]) { + executorService.executor.asInstanceOf[LazyExecuteSupport].lazyExecute(mbox) + } else { + executorService.execute(mbox) + } + } + override val toString: String = Logging.simpleName(this) + "[" + id + "]" } diff --git a/actor/src/main/scala/org/apache/pekko/dispatch/ForkJoinExecutorConfigurator.scala b/actor/src/main/scala/org/apache/pekko/dispatch/ForkJoinExecutorConfigurator.scala index 4a2e168f0ab..4745ef8299b 100644 --- a/actor/src/main/scala/org/apache/pekko/dispatch/ForkJoinExecutorConfigurator.scala +++ b/actor/src/main/scala/org/apache/pekko/dispatch/ForkJoinExecutorConfigurator.scala @@ -14,9 +14,10 @@ package org.apache.pekko.dispatch import java.util.concurrent.{ ExecutorService, ForkJoinPool, ForkJoinTask, ThreadFactory } - import com.typesafe.config.Config +import java.lang.invoke.{ MethodHandle, MethodHandles, MethodType } + object ForkJoinExecutorConfigurator { /** @@ -28,7 +29,7 @@ object ForkJoinExecutorConfigurator { unhandledExceptionHandler: Thread.UncaughtExceptionHandler, asyncMode: Boolean) extends ForkJoinPool(parallelism, threadFactory, unhandledExceptionHandler, asyncMode) - with LoadMetrics { + with LoadMetrics with LazyExecuteSupport { def this( parallelism: Int, threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory, @@ -42,6 +43,33 @@ object ForkJoinExecutorConfigurator { else throw new NullPointerException("Runnable was null") + private val lazyExecuteHandle: MethodHandle = { + import org.apache.pekko.util.JavaVersion._ + if (11 <= majorVersion && majorVersion <= 18) { + val method = classOf[ForkJoinPool].getDeclaredMethod("externalPush", classOf[ForkJoinTask[_]]) + method.setAccessible(true) + MethodHandles.lookup().unreflect(method) + } else if (majorVersion >= 20) { + val mt = MethodType.methodType(classOf[ForkJoinTask[_]], classOf[ForkJoinTask[_]]) + MethodHandles.publicLookup() + .findVirtual(classOf[ForkJoinPool], "externalSubmit", mt) + } else null + } + + override def lazyExecute(r: Runnable): Unit = { + import org.apache.pekko.util.JavaVersion._ + if (majorVersion < 11 || majorVersion == 19 || lazyExecuteHandle == null) { + super.execute(r) + } else { + val task: ForkJoinTask[_] = if (r ne null) + if (r.isInstanceOf[ForkJoinTask[_]]) r.asInstanceOf[ForkJoinTask[_]] else new PekkoForkJoinTask(r) + else + throw new NullPointerException("Runnable was null") + lazyExecuteHandle.invoke(task) + } + + } + def atFullThrottle(): Boolean = this.getActiveThreadCount() >= this.getParallelism() } diff --git a/actor/src/main/scala/org/apache/pekko/dispatch/Mailbox.scala b/actor/src/main/scala/org/apache/pekko/dispatch/Mailbox.scala index 088a0a33e64..c839abd6f00 100644 --- a/actor/src/main/scala/org/apache/pekko/dispatch/Mailbox.scala +++ b/actor/src/main/scala/org/apache/pekko/dispatch/Mailbox.scala @@ -242,7 +242,7 @@ private[pekko] abstract class Mailbox(val messageQueue: MessageQueue) } } finally { setAsIdle() // Volatile write, needed here - dispatcher.registerForExecution(this, false, false) + dispatcher.registerForExecution(this, false, false, true) } } diff --git a/project/JdkOptions.scala b/project/JdkOptions.scala index bc452c6394c..02ef7a6a612 100644 --- a/project/JdkOptions.scala +++ b/project/JdkOptions.scala @@ -38,6 +38,8 @@ object JdkOptions extends AutoPlugin { if (isJdk17orHigher) { // for aeron "--add-opens=java.base/sun.nio.ch=ALL-UNNAMED" :: + // for fork join pool + "--add-opens=java.base/java.util.concurrent=ALL-UNNAMED" :: // for LevelDB "--add-opens=java.base/java.nio=ALL-UNNAMED" :: Nil } else Nil diff --git a/testkit/src/main/scala/org/apache/pekko/testkit/CallingThreadDispatcher.scala b/testkit/src/main/scala/org/apache/pekko/testkit/CallingThreadDispatcher.scala index 9eeff02b14b..f685216b6bf 100644 --- a/testkit/src/main/scala/org/apache/pekko/testkit/CallingThreadDispatcher.scala +++ b/testkit/src/main/scala/org/apache/pekko/testkit/CallingThreadDispatcher.scala @@ -177,7 +177,8 @@ class CallingThreadDispatcher(_configurator: MessageDispatcherConfigurator) exte protected[pekko] override def registerForExecution( mbox: Mailbox, hasMessageHint: Boolean, - hasSystemMessageHint: Boolean): Boolean = false + hasSystemMessageHint: Boolean, + needYield: Boolean): Boolean = false protected[pekko] override def shutdownTimeout = 1 second