Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support jdk9 forkjoinpool maximum-pool-size #485

Merged
merged 2 commits into from
Mar 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions actor/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -482,6 +482,10 @@ pekko {
# Setting to "FIFO" to use queue like peeking mode which "poll" or "LIFO" to use stack
# like peeking mode which "pop".
task-peeking-mode = "FIFO"

# This config is new in Pekko v1.1.0 and only has an effect if you are running with JDK 9 and above.
# Read the documentation on `java.util.concurrent.ForkJoinPool` to find out more. Default in hex is 0x7fff.
maximum-pool-size = 32767
He-Pin marked this conversation as resolved.
Show resolved Hide resolved
}

# This will be used if you have set "executor = "thread-pool-executor""
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.pekko.dispatch

import org.apache.pekko
import pekko.annotation.InternalApi
import pekko.dispatch.ForkJoinExecutorConfigurator.PekkoForkJoinTask

import java.util.concurrent.{ ForkJoinPool, ForkJoinTask, TimeUnit }

/**
* INTERNAL PEKKO USAGE ONLY
*
* An alternative version of [[ForkJoinExecutorConfigurator.PekkoForkJoinPool]]
* that supports the `maximumPoolSize` feature available in [[java.util.concurrent.ForkJoinPool]] in JDK9+.
*/
@InternalApi
private[dispatch] final class PekkoJdk9ForkJoinPool(
pjfanning marked this conversation as resolved.
Show resolved Hide resolved
parallelism: Int,
threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory,
maximumPoolSize: Int,
unhandledExceptionHandler: Thread.UncaughtExceptionHandler,
asyncMode: Boolean)
extends ForkJoinPool(parallelism, threadFactory, unhandledExceptionHandler, asyncMode,
0, maximumPoolSize, 1, null, ForkJoinPoolConstants.DefaultKeepAliveMillis, TimeUnit.MILLISECONDS)
with LoadMetrics {

override def execute(r: Runnable): Unit =
if (r ne null)
super.execute(
(if (r.isInstanceOf[ForkJoinTask[_]]) r else new PekkoForkJoinTask(r)).asInstanceOf[ForkJoinTask[Any]])
else
throw new NullPointerException("Runnable was null")

def atFullThrottle(): Boolean = this.getActiveThreadCount() >= this.getParallelism()
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,13 @@

package org.apache.pekko.dispatch

import com.typesafe.config.Config

import java.lang.invoke.{ MethodHandle, MethodHandles, MethodType }
import java.util.concurrent.{ ExecutorService, ForkJoinPool, ForkJoinTask, ThreadFactory }
import scala.util.Try

import com.typesafe.config.Config
import org.apache.pekko.util.JavaVersion

object ForkJoinExecutorConfigurator {

Expand Down Expand Up @@ -84,12 +88,41 @@ class ForkJoinExecutorConfigurator(config: Config, prerequisites: DispatcherPrer
class ForkJoinExecutorServiceFactory(
val threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory,
val parallelism: Int,
val asyncMode: Boolean)
val asyncMode: Boolean,
val maxPoolSize: Int)
extends ExecutorServiceFactory {

def this(threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory,
parallelism: Int,
asyncMode: Boolean) = this(threadFactory, parallelism, asyncMode, ForkJoinPoolConstants.MaxCap)

private def pekkoJdk9ForkJoinPoolClassOpt: Option[Class[_]] =
Try(Class.forName("org.apache.pekko.dispatch.PekkoJdk9ForkJoinPool")).toOption
Copy link
Member

@He-Pin He-Pin Jul 15, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe add a util to read the current jdk version instead, try pekko.util.JavaVersion

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

because of the way Pekko builds the Java9+ classes, even the unit tests seem not to have this class available. It does eventually get built and appears in the jar.

Copy link
Member

@He-Pin He-Pin Jan 24, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

private lazy val pekkoJdk9ForkJoinPoolHandleOpt: Option[MethodHandle] = {
     if (JavaVersion.majorVersion >= 9) {
      pekkoJdk9ForkJoinPoolClassOpt.map { clz =>
        val methodHandleLookup = MethodHandles.lookup()
        val mt = MethodType.methodType(classOf[Unit], classOf[Int],
          classOf[ForkJoinPool.ForkJoinWorkerThreadFactory],
          classOf[Int], classOf[Thread.UncaughtExceptionHandler], classOf[Boolean])
        methodHandleLookup.findConstructor(clz, mt)
      }} else None
    }

how about this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you try this yourself? I explained above that the build is weird - adding the java 9+ classes very late. After the unit tests run. Meaning we need to not rely on the classes being there. This change will fail multiple tests.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking at this again on my phone - it will probably be ok. Looks the same as existing code with an additional jdk version check. The existing code works - it just relies n checking if a class loads ok.. In the full lifetime of the JVM we will try to class load once.. Java 8 users who only ever create one dispatcher will see a small perf boost. Everyone else will see a perf decrease because of the extra if check on every dispatcher create call.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added the java-version check because it happens inside the lazy val calculation - see we won't check this over and over.


private lazy val pekkoJdk9ForkJoinPoolHandleOpt: Option[MethodHandle] = {
if (JavaVersion.majorVersion == 8) {
None
} else {
pekkoJdk9ForkJoinPoolClassOpt.map { clz =>
val methodHandleLookup = MethodHandles.lookup()
val mt = MethodType.methodType(classOf[Unit], classOf[Int],
classOf[ForkJoinPool.ForkJoinWorkerThreadFactory],
classOf[Int], classOf[Thread.UncaughtExceptionHandler], classOf[Boolean])
methodHandleLookup.findConstructor(clz, mt)
}
}
}

def this(threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory, parallelism: Int) =
this(threadFactory, parallelism, asyncMode = true)
def createExecutorService: ExecutorService =
new PekkoForkJoinPool(parallelism, threadFactory, MonitorableThreadFactory.doNothing, asyncMode)

def createExecutorService: ExecutorService = pekkoJdk9ForkJoinPoolHandleOpt match {
case Some(handle) =>
handle.invokeExact(parallelism, threadFactory, maxPoolSize,
MonitorableThreadFactory.doNothing, asyncMode).asInstanceOf[ExecutorService]
case _ =>
new PekkoForkJoinPool(parallelism, threadFactory, MonitorableThreadFactory.doNothing, asyncMode)
}
}

final def createExecutorServiceFactory(id: String, threadFactory: ThreadFactory): ExecutorServiceFactory = {
Expand All @@ -115,6 +148,7 @@ class ForkJoinExecutorConfigurator(config: Config, prerequisites: DispatcherPrer
config.getInt("parallelism-min"),
config.getDouble("parallelism-factor"),
config.getInt("parallelism-max")),
asyncMode)
asyncMode,
config.getInt("maximum-pool-size"))
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.pekko.dispatch

private[dispatch] object ForkJoinPoolConstants {
final val MaxCap: Int = 0x7FFF // 32767
final val DefaultKeepAliveMillis: Long = 60000
}
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ lazy val actor = pekkoModule("actor")
.settings(AddMetaInfLicenseFiles.actorSettings)
.settings(VersionGenerator.settings)
.settings(serialversionRemoverPluginSettings)
.enablePlugins(BoilerplatePlugin, SbtOsgi)
.enablePlugins(BoilerplatePlugin, SbtOsgi, Jdk9)

lazy val actorTests = pekkoModule("actor-tests")
.dependsOn(testkit % "compile->compile;test->test", actor)
Expand Down
Loading