Commit

[SPARK-2546] [1.0 / 1.1 backport] Clone JobConf for each task.
JoshRosen committed Oct 6, 2014
1 parent 964e3aa commit dd25697
Showing 1 changed file with 8 additions and 17 deletions.
25 changes: 8 additions & 17 deletions core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -132,24 +132,12 @@ class HadoopRDD[K, V](
   // Returns a JobConf that will be used on slaves to obtain input splits for Hadoop reads.
   protected def getJobConf(): JobConf = {
     val conf: Configuration = broadcastedConf.value.value
-    if (conf.isInstanceOf[JobConf]) {
-      // A user-broadcasted JobConf was provided to the HadoopRDD, so always use it.
-      conf.asInstanceOf[JobConf]
-    } else if (HadoopRDD.containsCachedMetadata(jobConfCacheKey)) {
-      // getJobConf() has been called previously, so there is already a local cache of the JobConf
-      // needed by this RDD.
-      HadoopRDD.getCachedMetadata(jobConfCacheKey).asInstanceOf[JobConf]
-    } else {
-      // Create a JobConf that will be cached and used across this RDD's getJobConf() calls in the
-      // local process. The local cache is accessed through HadoopRDD.putCachedMetadata().
-      // The caching helps minimize GC, since a JobConf can contain ~10KB of temporary objects.
-      // Synchronize to prevent ConcurrentModificationException (Spark-1097, Hadoop-10456).
-      HadoopRDD.CONFIGURATION_INSTANTIATION_LOCK.synchronized {
-        val newJobConf = new JobConf(conf)
+    HadoopRDD.CONFIGURATION_INSTANTIATION_LOCK.synchronized {
+      val newJobConf = new JobConf(conf)
+      if (!conf.isInstanceOf[JobConf]) {
         initLocalJobConfFuncOpt.map(f => f(newJobConf))
-        HadoopRDD.putCachedMetadata(jobConfCacheKey, newJobConf)
-        newJobConf
       }
+      newJobConf
     }
   }
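
Taken together, the hunk above removes the per-process JobConf cache and clones the broadcast configuration on every call. As a readability aid, here is a reconstruction (from the added and retained lines above) of how getJobConf() reads after the patch; it assumes the surrounding HadoopRDD[K, V] class with its existing broadcastedConf and initLocalJobConfFuncOpt members:

  // Returns a JobConf that will be used on slaves to obtain input splits for Hadoop reads.
  protected def getJobConf(): JobConf = {
    val conf: Configuration = broadcastedConf.value.value
    // Always construct a fresh JobConf rather than reusing a cached instance, so that
    // concurrent tasks never share (and concurrently mutate) the same Configuration object.
    HadoopRDD.CONFIGURATION_INSTANTIATION_LOCK.synchronized {
      val newJobConf = new JobConf(conf)
      if (!conf.isInstanceOf[JobConf]) {
        // A plain Configuration (not a user-supplied JobConf) was broadcast, so apply the
        // optional local JobConf initialization function to the fresh copy.
        initLocalJobConfFuncOpt.map(f => f(newJobConf))
      }
      newJobConf
    }
  }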

@@ -257,7 +245,10 @@ class HadoopRDD[K, V](
 }
 
 private[spark] object HadoopRDD {
-  /** Constructing Configuration objects is not threadsafe, use this lock to serialize. */
+  /**
+   * Configuration's constructor is not threadsafe (see SPARK-1097 and HADOOP-10456).
+   * Therefore, we synchronize on this lock before calling new JobConf() or new Configuration().
+   */
   val CONFIGURATION_INSTANTIATION_LOCK = new Object()
 
   /**
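
The expanded doc comment spells out the locking convention: every new JobConf(conf) or new Configuration(conf) built from a shared Configuration should be serialized through CONFIGURATION_INSTANTIATION_LOCK. A minimal sketch of that convention follows; the helper is hypothetical and assumes code living in the org.apache.spark package tree, since the HadoopRDD companion object is private[spark]:

  package org.apache.spark.rdd

  import org.apache.hadoop.conf.Configuration
  import org.apache.hadoop.mapred.JobConf

  // Hypothetical helper, for illustration only.
  object JobConfCloneSketch {
    // Configuration's copy constructor is not thread-safe (SPARK-1097, HADOOP-10456),
    // so all clone operations are funneled through the shared lock documented above.
    def cloneAsJobConf(shared: Configuration): JobConf =
      HadoopRDD.CONFIGURATION_INSTANTIATION_LOCK.synchronized {
        new JobConf(shared)
      }
  }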
