Skip to content

Commit

Permalink
=act Make use of externalSubmit to yield.
Browse files Browse the repository at this point in the history
  • Loading branch information
He-Pin committed Sep 20, 2023
1 parent e94e7b9 commit 6017149
Show file tree
Hide file tree
Showing 8 changed files with 68 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,9 @@ object ActorSystemSpec {
override protected[pekko] def registerForExecution(
mbox: Mailbox,
hasMessageHint: Boolean,
hasSystemMessageHint: Boolean): Boolean = {
val ret = super.registerForExecution(mbox, hasMessageHint, hasSystemMessageHint)
hasSystemMessageHint: Boolean,
needYield: Boolean): Boolean = {
val ret = super.registerForExecution(mbox, hasMessageHint, hasSystemMessageHint, needYield)
doneIt.switchOn {
TestKit.awaitCond(mbox.actor.actor != null, 1.second)
mbox.actor.actor match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,14 @@ private[pekko] trait LoadMetrics { self: Executor =>
def atFullThrottle(): Boolean
}

/**
* INTERNAL API
*/
private[pekko] trait LazyExecuteSupport {
self: Executor =>
def lazyExecute(runnable: Runnable): Unit = self.execute(runnable)
}

/**
* INTERNAL API
*/
Expand Down Expand Up @@ -160,7 +168,7 @@ abstract class MessageDispatcher(val configurator: MessageDispatcherConfigurator
*/
final def attach(actor: ActorCell): Unit = {
register(actor)
registerForExecution(actor.mailbox, false, true)
registerForExecution(actor.mailbox, false, true, false)
}

/**
Expand Down Expand Up @@ -288,7 +296,7 @@ abstract class MessageDispatcher(val configurator: MessageDispatcherConfigurator
protected[pekko] def resume(actor: ActorCell): Unit = {
val mbox = actor.mailbox
if ((mbox.actor eq actor) && (mbox.dispatcher eq this) && mbox.resume())
registerForExecution(mbox, false, false)
registerForExecution(mbox, false, false, false)
}

/**
Expand All @@ -313,7 +321,8 @@ abstract class MessageDispatcher(val configurator: MessageDispatcherConfigurator
protected[pekko] def registerForExecution(
mbox: Mailbox,
hasMessageHint: Boolean,
hasSystemMessageHint: Boolean): Boolean
hasSystemMessageHint: Boolean,
needYield: Boolean): Boolean

// TODO check whether this should not actually be a property of the mailbox
/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ private[pekko] class BalancingDispatcher(

override protected[pekko] def dispatch(receiver: ActorCell, invocation: Envelope) = {
messageQueue.enqueue(receiver.self, invocation)
if (!registerForExecution(receiver.mailbox, false, false)) teamWork()
if (!registerForExecution(receiver.mailbox, false, false, false)) teamWork()
}

protected def teamWork(): Unit =
Expand All @@ -118,7 +118,7 @@ private[pekko] class BalancingDispatcher(
case lm: LoadMetrics => !lm.atFullThrottle()
case _ => true
})
&& !registerForExecution(i.next.mailbox, false, false))
&& !registerForExecution(i.next.mailbox, false, false, true))
scheduleOne(i)

scheduleOne()
Expand Down
25 changes: 16 additions & 9 deletions actor/src/main/scala/org/apache/pekko/dispatch/Dispatcher.scala
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,11 @@

package org.apache.pekko.dispatch

import java.util.concurrent.{ ExecutorService, RejectedExecutionException }
import java.util.concurrent.{ ExecutorService, ForkJoinPool, RejectedExecutionException }
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater

import scala.concurrent.duration.Duration
import scala.concurrent.duration.FiniteDuration

import scala.annotation.nowarn

import org.apache.pekko
import pekko.actor.ActorCell
import pekko.dispatch.sysmsg.SystemMessage
Expand Down Expand Up @@ -71,7 +68,7 @@ class Dispatcher(
protected[pekko] def dispatch(receiver: ActorCell, invocation: Envelope): Unit = {
val mbox = receiver.mailbox
mbox.enqueue(receiver.self, invocation)
registerForExecution(mbox, true, false)
registerForExecution(mbox, true, false, false)
}

/**
Expand All @@ -80,7 +77,7 @@ class Dispatcher(
protected[pekko] def systemDispatch(receiver: ActorCell, invocation: SystemMessage): Unit = {
val mbox = receiver.mailbox
mbox.systemEnqueue(receiver.self, invocation)
registerForExecution(mbox, false, true)
registerForExecution(mbox, false, true, false)
}

/**
Expand Down Expand Up @@ -130,16 +127,17 @@ class Dispatcher(
protected[pekko] override def registerForExecution(
mbox: Mailbox,
hasMessageHint: Boolean,
hasSystemMessageHint: Boolean): Boolean = {
hasSystemMessageHint: Boolean,
needYield: Boolean): Boolean = {
if (mbox.canBeScheduledForExecution(hasMessageHint, hasSystemMessageHint)) { // This needs to be here to ensure thread safety and no races
if (mbox.setAsScheduled()) {
try {
executorService.execute(mbox)
submit(mbox, needYield)
true
} catch {
case _: RejectedExecutionException =>
try {
executorService.execute(mbox)
submit(mbox, needYield)
true
} catch { // Retry once
case e: RejectedExecutionException =>
Expand All @@ -152,6 +150,15 @@ class Dispatcher(
} else false
}

@inline
private def submit(mbox: Mailbox, needYield: Boolean): Unit = {
if (needYield && executorService.executor.isInstanceOf[LazyExecuteSupport]) {
executorService.executor.asInstanceOf[LazyExecuteSupport].lazyExecute(mbox)
} else {
executorService.execute(mbox)
}
}

override val toString: String = Logging.simpleName(this) + "[" + id + "]"
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,10 @@
package org.apache.pekko.dispatch

import java.util.concurrent.{ ExecutorService, ForkJoinPool, ForkJoinTask, ThreadFactory }

import com.typesafe.config.Config

import java.lang.invoke.{ MethodHandle, MethodHandles, MethodType }

object ForkJoinExecutorConfigurator {

/**
Expand All @@ -28,7 +29,7 @@ object ForkJoinExecutorConfigurator {
unhandledExceptionHandler: Thread.UncaughtExceptionHandler,
asyncMode: Boolean)
extends ForkJoinPool(parallelism, threadFactory, unhandledExceptionHandler, asyncMode)
with LoadMetrics {
with LoadMetrics with LazyExecuteSupport {
def this(
parallelism: Int,
threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory,
Expand All @@ -42,6 +43,33 @@ object ForkJoinExecutorConfigurator {
else
throw new NullPointerException("Runnable was null")

private val lazyExecuteHandle: MethodHandle = {
import org.apache.pekko.util.JavaVersion._
if (11 <= majorVersion && majorVersion <= 18) {
val method = classOf[ForkJoinPool].getDeclaredMethod("externalPush", classOf[ForkJoinTask[_]])
method.setAccessible(true)
MethodHandles.lookup().unreflect(method)
} else if (majorVersion >= 20) {
val mt = MethodType.methodType(classOf[ForkJoinTask[_]], classOf[ForkJoinTask[_]])
MethodHandles.publicLookup()
.findVirtual(classOf[ForkJoinPool], "externalSubmit", mt)
} else null
}

override def lazyExecute(r: Runnable): Unit = {
import org.apache.pekko.util.JavaVersion._
if (majorVersion < 11 || majorVersion == 19 || lazyExecuteHandle == null) {
super.execute(r)
} else {
val task: ForkJoinTask[_] = if (r ne null)
if (r.isInstanceOf[ForkJoinTask[_]]) r.asInstanceOf[ForkJoinTask[_]] else new PekkoForkJoinTask(r)
else
throw new NullPointerException("Runnable was null")
lazyExecuteHandle.invoke(task)
}

}

def atFullThrottle(): Boolean = this.getActiveThreadCount() >= this.getParallelism()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ private[pekko] abstract class Mailbox(val messageQueue: MessageQueue)
}
} finally {
setAsIdle() // Volatile write, needed here
dispatcher.registerForExecution(this, false, false)
dispatcher.registerForExecution(this, false, false, true)
}
}

Expand Down
2 changes: 2 additions & 0 deletions project/JdkOptions.scala
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ object JdkOptions extends AutoPlugin {
if (isJdk17orHigher) {
// for aeron
"--add-opens=java.base/sun.nio.ch=ALL-UNNAMED" ::
// for fork join pool
"--add-opens=java.base/java.util.concurrent=ALL-UNNAMED" ::
// for LevelDB
"--add-opens=java.base/java.nio=ALL-UNNAMED" :: Nil
} else Nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,8 @@ class CallingThreadDispatcher(_configurator: MessageDispatcherConfigurator) exte
protected[pekko] override def registerForExecution(
mbox: Mailbox,
hasMessageHint: Boolean,
hasSystemMessageHint: Boolean): Boolean = false
hasSystemMessageHint: Boolean,
needYield: Boolean): Boolean = false

protected[pekko] override def shutdownTimeout = 1 second

Expand Down

0 comments on commit 6017149

Please sign in to comment.