diff --git a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
index a704a8a7d98f2..9abbf4a7a3971 100644
--- a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
@@ -20,6 +20,7 @@ package org.apache.spark.util
 import java.util.concurrent._
 
 import scala.concurrent.{ExecutionContext, ExecutionContextExecutor}
+import scala.concurrent.forkjoin.{ForkJoinPool => SForkJoinPool, ForkJoinWorkerThread => SForkJoinWorkerThread}
 import scala.util.control.NonFatal
 
 import com.google.common.util.concurrent.{MoreExecutors, ThreadFactoryBuilder}
@@ -158,17 +159,17 @@ private[spark] object ThreadUtils {
   }
 
   /**
-   * Construct a new Java ForkJoinPool with a specified max parallelism and name prefix.
+   * Construct a new Scala ForkJoinPool with a specified max parallelism and name prefix.
    */
-  def newForkJoinPool(prefix: String, maxThreadNumber: Int): ForkJoinPool = {
+  def newForkJoinPool(prefix: String, maxThreadNumber: Int): SForkJoinPool = {
     // Custom factory to set thread names
-    val factory = new ForkJoinPool.ForkJoinWorkerThreadFactory {
-      override def newThread(pool: ForkJoinPool) =
-        new ForkJoinWorkerThread(pool) {
+    val factory = new SForkJoinPool.ForkJoinWorkerThreadFactory {
+      override def newThread(pool: SForkJoinPool) =
+        new SForkJoinWorkerThread(pool) {
           setName(prefix + "-" + super.getName)
         }
     }
-    new ForkJoinPool(maxThreadNumber, factory,
+    new SForkJoinPool(maxThreadNumber, factory,
       null, // handler
       false // asyncMode
     )