Skip to content

Commit

Permalink
Use scala ForkJoinPool instead (in 2.12 this just wraps Java but in 2…
Browse files Browse the repository at this point in the history
….11 necessary for correct concurrency report)
  • Loading branch information
holdenk committed Mar 3, 2016
1 parent 28d7d38 commit c611178
Showing 1 changed file with 7 additions and 6 deletions.
13 changes: 7 additions & 6 deletions core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package org.apache.spark.util
import java.util.concurrent._

import scala.concurrent.{ExecutionContext, ExecutionContextExecutor}
import scala.concurrent.forkjoin.{ForkJoinPool => SForkJoinPool, ForkJoinWorkerThread => SForkJoinWorkerThread}
import scala.util.control.NonFatal

import com.google.common.util.concurrent.{MoreExecutors, ThreadFactoryBuilder}
Expand Down Expand Up @@ -158,17 +159,17 @@ private[spark] object ThreadUtils {
}

/**
* Construct a new Java ForkJoinPool with a specified max parallelism and name prefix.
* Construct a new Scala ForkJoinPool with a specified max parallelism and name prefix.
*/
def newForkJoinPool(prefix: String, maxThreadNumber: Int): ForkJoinPool = {
def newForkJoinPool(prefix: String, maxThreadNumber: Int): SForkJoinPool = {
// Custom factory to set thread names
val factory = new ForkJoinPool.ForkJoinWorkerThreadFactory {
override def newThread(pool: ForkJoinPool) =
new ForkJoinWorkerThread(pool) {
val factory = new SForkJoinPool.ForkJoinWorkerThreadFactory {
override def newThread(pool: SForkJoinPool) =
new SForkJoinWorkerThread(pool) {
setName(prefix + "-" + super.getName)
}
}
new ForkJoinPool(maxThreadNumber, factory,
new SForkJoinPool(maxThreadNumber, factory,
null, // handler
false // asyncMode
)
Expand Down

0 comments on commit c611178

Please sign in to comment.