Skip to content

Commit

Permalink
Exclude certain local properties from being inherited
Browse files Browse the repository at this point in the history
such as, cough cough, the SQL execution ID. This was a problem
because scala's parallel collections spawns threads as children
of the existing threads, causing the execution ID to be inherited
when it shouldn't be.
  • Loading branch information
Andrew Or committed Sep 10, 2015
1 parent 4204757 commit 8ceae42
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 2 deletions.
13 changes: 11 additions & 2 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import java.util.UUID.randomUUID
import scala.collection.JavaConverters._
import scala.collection.{Map, Set}
import scala.collection.generic.Growable
import scala.collection.mutable.HashMap
import scala.collection.mutable.{HashMap, HashSet}
import scala.reflect.{ClassTag, classTag}
import scala.util.control.NonFatal

Expand Down Expand Up @@ -348,10 +348,19 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli

// Thread Local variable that can be used by users to pass information down the stack
private val localProperties = new InheritableThreadLocal[Properties] {
override protected def childValue(parent: Properties): Properties = new Properties(parent)
override protected def childValue(parent: Properties): Properties = {
val p = new Properties(parent)
nonInheritedLocalProperties.foreach(p.remove)
p
}
override protected def initialValue(): Properties = new Properties()
}

/**
* Keys of local properties that should not be inherited by children threads.
*/
private[spark] val nonInheritedLocalProperties: HashSet[String] = new HashSet[String]

/* ------------------------------------------------------------------------------------- *
| Initialization. This code initializes the context in a manner that is exception-safe. |
| All internal fields holding state are initialized here, and any error prompts the |
Expand Down
5 changes: 5 additions & 0 deletions sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,11 @@ class SQLContext(@transient val sparkContext: SparkContext)
sparkContext.addSparkListener(listener)
sparkContext.ui.foreach(new SQLTab(this, _))

// Ensure query execution IDs are not inherited across the thread hierarchy, which is
// the default behavior for SparkContext local properties. Otherwise, we may confuse
// the listener as to which query is being executed. (SPARK-10548)
sparkContext.nonInheritedLocalProperties.add(SQLExecution.EXECUTION_ID_KEY)

/**
* Set Spark SQL configuration properties.
*
Expand Down

0 comments on commit 8ceae42

Please sign in to comment.