diff --git a/actor/src/main/resources/reference.conf b/actor/src/main/resources/reference.conf index c7febd24584..6a920143bce 100644 --- a/actor/src/main/resources/reference.conf +++ b/actor/src/main/resources/reference.conf @@ -482,6 +482,10 @@ pekko { # Setting to "FIFO" to use queue like peeking mode which "poll" or "LIFO" to use stack # like peeking mode which "pop". task-peeking-mode = "FIFO" + + # This config is new in Pekko v1.1.0 and only has an effect if you are running with JDK 9 and above. + # Read the documentation on `java.util.concurrent.ForkJoinPool` to find out more. Default in hex is 0x7fff. + maximum-pool-size = 32767 } # This will be used if you have set "executor = "thread-pool-executor"" diff --git a/actor/src/main/scala-jdk-9/org/apache/pekko/dispatch/PekkoJdk9ForkJoinPool.scala b/actor/src/main/scala-jdk-9/org/apache/pekko/dispatch/PekkoJdk9ForkJoinPool.scala new file mode 100644 index 00000000000..8e089d16a34 --- /dev/null +++ b/actor/src/main/scala-jdk-9/org/apache/pekko/dispatch/PekkoJdk9ForkJoinPool.scala @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.pekko.dispatch + +import org.apache.pekko +import pekko.annotation.InternalApi +import pekko.dispatch.ForkJoinExecutorConfigurator.PekkoForkJoinTask + +import java.util.concurrent.{ ForkJoinPool, ForkJoinTask, TimeUnit } + +/** + * INTERNAL PEKKO USAGE ONLY + * + * An alternative version of [[ForkJoinExecutorConfigurator.PekkoForkJoinPool]] + * that supports the `maximumPoolSize` feature available in [[java.util.concurrent.ForkJoinPool]] in JDK9+. + */ +@InternalApi +private[dispatch] final class PekkoJdk9ForkJoinPool( + parallelism: Int, + threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory, + maximumPoolSize: Int, + unhandledExceptionHandler: Thread.UncaughtExceptionHandler, + asyncMode: Boolean) + extends ForkJoinPool(parallelism, threadFactory, unhandledExceptionHandler, asyncMode, + 0, maximumPoolSize, 1, null, ForkJoinPoolConstants.DefaultKeepAliveMillis, TimeUnit.MILLISECONDS) + with LoadMetrics { + + override def execute(r: Runnable): Unit = + if (r ne null) + super.execute( + (if (r.isInstanceOf[ForkJoinTask[_]]) r else new PekkoForkJoinTask(r)).asInstanceOf[ForkJoinTask[Any]]) + else + throw new NullPointerException("Runnable was null") + + def atFullThrottle(): Boolean = this.getActiveThreadCount() >= this.getParallelism() +} diff --git a/actor/src/main/scala/org/apache/pekko/dispatch/ForkJoinExecutorConfigurator.scala b/actor/src/main/scala/org/apache/pekko/dispatch/ForkJoinExecutorConfigurator.scala index 4a2e168f0ab..71d2ffc750b 100644 --- a/actor/src/main/scala/org/apache/pekko/dispatch/ForkJoinExecutorConfigurator.scala +++ b/actor/src/main/scala/org/apache/pekko/dispatch/ForkJoinExecutorConfigurator.scala @@ -13,9 +13,13 @@ package org.apache.pekko.dispatch +import com.typesafe.config.Config + +import java.lang.invoke.{ MethodHandle, MethodHandles, MethodType } import java.util.concurrent.{ ExecutorService, ForkJoinPool, ForkJoinTask, ThreadFactory } +import scala.util.Try -import com.typesafe.config.Config +import org.apache.pekko.util.JavaVersion object ForkJoinExecutorConfigurator { @@ -84,12 +88,41 @@ class ForkJoinExecutorConfigurator(config: Config, prerequisites: DispatcherPrer class ForkJoinExecutorServiceFactory( val threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory, val parallelism: Int, - val asyncMode: Boolean) + val asyncMode: Boolean, + val maxPoolSize: Int) extends ExecutorServiceFactory { + + def this(threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory, + parallelism: Int, + asyncMode: Boolean) = this(threadFactory, parallelism, asyncMode, ForkJoinPoolConstants.MaxCap) + + private def pekkoJdk9ForkJoinPoolClassOpt: Option[Class[_]] = + Try(Class.forName("org.apache.pekko.dispatch.PekkoJdk9ForkJoinPool")).toOption + + private lazy val pekkoJdk9ForkJoinPoolHandleOpt: Option[MethodHandle] = { + if (JavaVersion.majorVersion == 8) { + None + } else { + pekkoJdk9ForkJoinPoolClassOpt.map { clz => + val methodHandleLookup = MethodHandles.lookup() + val mt = MethodType.methodType(classOf[Unit], classOf[Int], + classOf[ForkJoinPool.ForkJoinWorkerThreadFactory], + classOf[Int], classOf[Thread.UncaughtExceptionHandler], classOf[Boolean]) + methodHandleLookup.findConstructor(clz, mt) + } + } + } + def this(threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory, parallelism: Int) = this(threadFactory, parallelism, asyncMode = true) - def createExecutorService: ExecutorService = - new PekkoForkJoinPool(parallelism, threadFactory, MonitorableThreadFactory.doNothing, asyncMode) + + def createExecutorService: ExecutorService = pekkoJdk9ForkJoinPoolHandleOpt match { + case Some(handle) => + handle.invokeExact(parallelism, threadFactory, maxPoolSize, + MonitorableThreadFactory.doNothing, asyncMode).asInstanceOf[ExecutorService] + case _ => + new PekkoForkJoinPool(parallelism, threadFactory, MonitorableThreadFactory.doNothing, asyncMode) + } } final def createExecutorServiceFactory(id: String, threadFactory: ThreadFactory): ExecutorServiceFactory = { @@ -115,6 +148,7 @@ class ForkJoinExecutorConfigurator(config: Config, prerequisites: DispatcherPrer config.getInt("parallelism-min"), config.getDouble("parallelism-factor"), config.getInt("parallelism-max")), - asyncMode) + asyncMode, + config.getInt("maximum-pool-size")) } } diff --git a/actor/src/main/scala/org/apache/pekko/dispatch/ForkJoinPoolConstants.scala b/actor/src/main/scala/org/apache/pekko/dispatch/ForkJoinPoolConstants.scala new file mode 100644 index 00000000000..f7235f608d0 --- /dev/null +++ b/actor/src/main/scala/org/apache/pekko/dispatch/ForkJoinPoolConstants.scala @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.pekko.dispatch + +private[dispatch] object ForkJoinPoolConstants { + final val MaxCap: Int = 0x7FFF // 32767 + final val DefaultKeepAliveMillis: Long = 60000 +} diff --git a/build.sbt b/build.sbt index 230490a6900..37f9475a22f 100644 --- a/build.sbt +++ b/build.sbt @@ -123,7 +123,7 @@ lazy val actor = pekkoModule("actor") .settings(AddMetaInfLicenseFiles.actorSettings) .settings(VersionGenerator.settings) .settings(serialversionRemoverPluginSettings) - .enablePlugins(BoilerplatePlugin, SbtOsgi) + .enablePlugins(BoilerplatePlugin, SbtOsgi, Jdk9) lazy val actorTests = pekkoModule("actor-tests") .dependsOn(testkit % "compile->compile;test->test", actor)