From b4cd73eb804da65f456af37b89fcda1acdb263f0 Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Fri, 24 Oct 2014 14:31:03 -0700 Subject: [PATCH 1/7] remove unnecessary broadcast for conf --- .../scala/org/apache/spark/SparkContext.scala | 4 +--- .../spark/api/python/PythonHadoopUtil.scala | 4 ++-- .../org/apache/spark/api/python/PythonRDD.scala | 15 +++++---------- .../org/apache/spark/rdd/CheckpointRDD.scala | 16 +++++++--------- .../scala/org/apache/spark/rdd/HadoopRDD.scala | 7 +++---- .../org/apache/spark/rdd/NewHadoopRDD.scala | 7 +++---- .../org/apache/spark/rdd/RDDCheckpointData.scala | 5 ++--- .../org/apache/spark/sql/hive/TableReader.scala | 7 +++---- 8 files changed, 26 insertions(+), 39 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index ac7935b8c231e..0118f7706fe10 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -566,12 +566,10 @@ class SparkContext(config: SparkConf) extends Logging { valueClass: Class[V], minPartitions: Int = defaultMinPartitions ): RDD[(K, V)] = { - // A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it. - val confBroadcast = broadcast(new SerializableWritable(hadoopConfiguration)) val setInputPathsFunc = (jobConf: JobConf) => FileInputFormat.setInputPaths(jobConf, path) new HadoopRDD( this, - confBroadcast, + new SerializableWritable(hadoopConfiguration), Some(setInputPathsFunc), inputFormatClass, keyClass, diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala b/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala index 49dc95f349eac..0f0bb087908c1 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala @@ -61,7 +61,7 @@ private[python] object Converter extends Logging { * Other objects are passed through without conversion. 
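Patch 1 drops the broadcast and instead passes `new SerializableWritable(hadoopConfiguration)` by value into `HadoopRDD` and the PySpark converters. The wrapper (whose definition patch 2 below modifies) exists because Hadoop's `Configuration` implements `Writable` but not `java.io.Serializable`, so it cannot travel in a task closure on its own. A simplified sketch of the bridging mechanism, with the class name changed to mark it as an illustration:

```scala
import java.io.{ObjectInputStream, ObjectOutputStream}

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{ObjectWritable, Writable}

// Simplified sketch of what org.apache.spark.SerializableWritable does: expose a
// Writable to Java serialization by delegating the payload to ObjectWritable.
class SerializableWritableSketch[T <: Writable](@transient var t: T) extends Serializable {
  def value: T = t

  private def writeObject(out: ObjectOutputStream): Unit = {
    out.defaultWriteObject()
    new ObjectWritable(t).write(out)      // encode the payload via the Writable protocol
  }

  private def readObject(in: ObjectInputStream): Unit = {
    in.defaultReadObject()
    val ow = new ObjectWritable()
    ow.setConf(new Configuration())       // ObjectWritable needs a conf to re-instantiate t
    ow.readFields(in)                     // (patch 2 below additionally takes a lock here)
    t = ow.get().asInstanceOf[T]
  }
}
```

With the broadcast gone, every task serializes and deserializes its own copy of the wrapped configuration, trading the one-time broadcast for a per-task payload of roughly 10 KB (the figure cited in the comment this patch removes).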
*/ private[python] class WritableToJavaConverter( - conf: Broadcast[SerializableWritable[Configuration]], + conf: SerializableWritable[Configuration], batchSize: Int) extends Converter[Any, Any] { /** @@ -95,7 +95,7 @@ private[python] class WritableToJavaConverter( } map case w: Writable => - if (batchSize > 1) WritableUtils.clone(w, conf.value.value) else w + if (batchSize > 1) WritableUtils.clone(w, conf.value) else w case other => other } } diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala index 29ca751519abd..d0a4efc1d8f95 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala @@ -432,9 +432,8 @@ private[spark] object PythonRDD extends Logging { val kc = Utils.classForName(keyClass).asInstanceOf[Class[K]] val vc = Utils.classForName(valueClass).asInstanceOf[Class[V]] val rdd = sc.sc.sequenceFile[K, V](path, kc, vc, minSplits) - val confBroadcasted = sc.sc.broadcast(new SerializableWritable(sc.hadoopConfiguration())) val converted = convertRDD(rdd, keyConverterClass, valueConverterClass, - new WritableToJavaConverter(confBroadcasted, batchSize)) + new WritableToJavaConverter(new SerializableWritable(sc.hadoopConfiguration()), batchSize)) JavaRDD.fromRDD(SerDeUtil.pairRDDToPython(converted, batchSize)) } @@ -458,9 +457,8 @@ private[spark] object PythonRDD extends Logging { val rdd = newAPIHadoopRDDFromClassNames[K, V, F](sc, Some(path), inputFormatClass, keyClass, valueClass, mergedConf) - val confBroadcasted = sc.sc.broadcast(new SerializableWritable(mergedConf)) val converted = convertRDD(rdd, keyConverterClass, valueConverterClass, - new WritableToJavaConverter(confBroadcasted, batchSize)) + new WritableToJavaConverter(new SerializableWritable(mergedConf), batchSize)) JavaRDD.fromRDD(SerDeUtil.pairRDDToPython(converted, batchSize)) } @@ -484,9 +482,8 @@ private[spark] object PythonRDD extends Logging { val rdd = newAPIHadoopRDDFromClassNames[K, V, F](sc, None, inputFormatClass, keyClass, valueClass, conf) - val confBroadcasted = sc.sc.broadcast(new SerializableWritable(conf)) val converted = convertRDD(rdd, keyConverterClass, valueConverterClass, - new WritableToJavaConverter(confBroadcasted, batchSize)) + new WritableToJavaConverter(new SerializableWritable(conf), batchSize)) JavaRDD.fromRDD(SerDeUtil.pairRDDToPython(converted, batchSize)) } @@ -527,9 +524,8 @@ private[spark] object PythonRDD extends Logging { val rdd = hadoopRDDFromClassNames[K, V, F](sc, Some(path), inputFormatClass, keyClass, valueClass, mergedConf) - val confBroadcasted = sc.sc.broadcast(new SerializableWritable(mergedConf)) val converted = convertRDD(rdd, keyConverterClass, valueConverterClass, - new WritableToJavaConverter(confBroadcasted, batchSize)) + new WritableToJavaConverter(new SerializableWritable(mergedConf), batchSize)) JavaRDD.fromRDD(SerDeUtil.pairRDDToPython(converted, batchSize)) } @@ -553,9 +549,8 @@ private[spark] object PythonRDD extends Logging { val rdd = hadoopRDDFromClassNames[K, V, F](sc, None, inputFormatClass, keyClass, valueClass, conf) - val confBroadcasted = sc.sc.broadcast(new SerializableWritable(conf)) val converted = convertRDD(rdd, keyConverterClass, valueConverterClass, - new WritableToJavaConverter(confBroadcasted, batchSize)) + new WritableToJavaConverter(new SerializableWritable(conf), batchSize)) JavaRDD.fromRDD(SerDeUtil.pairRDDToPython(converted, batchSize)) } diff --git 
a/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala index 7ba1182f0ed27..faa38a08e9aec 100644 --- a/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala @@ -37,8 +37,6 @@ private[spark] class CheckpointRDD[T: ClassTag](sc: SparkContext, val checkpointPath: String) extends RDD[T](sc, Nil) { - val broadcastedConf = sc.broadcast(new SerializableWritable(sc.hadoopConfiguration)) - @transient val fs = new Path(checkpointPath).getFileSystem(sc.hadoopConfiguration) override def getPartitions: Array[Partition] = { @@ -71,7 +69,7 @@ class CheckpointRDD[T: ClassTag](sc: SparkContext, val checkpointPath: String) override def compute(split: Partition, context: TaskContext): Iterator[T] = { val file = new Path(checkpointPath, CheckpointRDD.splitIdToFile(split.index)) - CheckpointRDD.readFromFile(file, broadcastedConf, context) + CheckpointRDD.readFromFile(file, new SerializableWritable(sc.hadoopConfiguration), context) } override def checkpoint() { @@ -86,12 +84,12 @@ private[spark] object CheckpointRDD extends Logging { def writeToFile[T: ClassTag]( path: String, - broadcastedConf: Broadcast[SerializableWritable[Configuration]], + conf: SerializableWritable[Configuration], blockSize: Int = -1 )(ctx: TaskContext, iterator: Iterator[T]) { val env = SparkEnv.get val outputDir = new Path(path) - val fs = outputDir.getFileSystem(broadcastedConf.value.value) + val fs = outputDir.getFileSystem(conf.value) val finalOutputName = splitIdToFile(ctx.partitionId) val finalOutputPath = new Path(outputDir, finalOutputName) @@ -130,11 +128,11 @@ private[spark] object CheckpointRDD extends Logging { def readFromFile[T]( path: Path, - broadcastedConf: Broadcast[SerializableWritable[Configuration]], + conf: SerializableWritable[Configuration], context: TaskContext ): Iterator[T] = { val env = SparkEnv.get - val fs = path.getFileSystem(broadcastedConf.value.value) + val fs = path.getFileSystem(conf.value) val bufferSize = env.conf.getInt("spark.buffer.size", 65536) val fileInputStream = fs.open(path, bufferSize) val serializer = env.serializer.newInstance() @@ -159,8 +157,8 @@ private[spark] object CheckpointRDD extends Logging { val path = new Path(hdfsPath, "temp") val conf = SparkHadoopUtil.get.newConfiguration(new SparkConf()) val fs = path.getFileSystem(conf) - val broadcastedConf = sc.broadcast(new SerializableWritable(conf)) - sc.runJob(rdd, CheckpointRDD.writeToFile[Int](path.toString, broadcastedConf, 1024) _) + sc.runJob(rdd, CheckpointRDD.writeToFile[Int]( + path.toString, new SerializableWritable(conf), 1024) _) val cpRDD = new CheckpointRDD[Int](sc, path.toString) assert(cpRDD.partitions.length == rdd.partitions.length, "Number of partitions is not the same") assert(cpRDD.collect.toList == rdd.collect.toList, "Data of partitions not the same") diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala index 775141775e06c..729631061b0b0 100644 --- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala @@ -99,7 +99,7 @@ private[spark] class HadoopPartition(rddId: Int, idx: Int, @transient s: InputSp @DeveloperApi class HadoopRDD[K, V]( sc: SparkContext, - broadcastedConf: Broadcast[SerializableWritable[Configuration]], + conf: SerializableWritable[Configuration], initLocalJobConfFuncOpt: Option[JobConf => Unit], inputFormatClass: 
Class[_ <: InputFormat[K, V]], keyClass: Class[K], @@ -116,8 +116,7 @@ class HadoopRDD[K, V]( minPartitions: Int) = { this( sc, - sc.broadcast(new SerializableWritable(conf)) - .asInstanceOf[Broadcast[SerializableWritable[Configuration]]], + new SerializableWritable(conf), None /* initLocalJobConfFuncOpt */, inputFormatClass, keyClass, @@ -136,7 +135,7 @@ class HadoopRDD[K, V]( // Returns a JobConf that will be used on slaves to obtain input splits for Hadoop reads. protected def getJobConf(): JobConf = { - val conf: Configuration = broadcastedConf.value.value + val conf: Configuration = this.conf.value if (shouldCloneJobConf) { // Hadoop Configuration objects are not thread-safe, which may lead to various problems if // one job modifies a configuration while another reads it (SPARK-2546). This problem occurs diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala index 0cccdefc5ee09..01877fbd0167e 100644 --- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala @@ -73,8 +73,7 @@ class NewHadoopRDD[K, V]( with SparkHadoopMapReduceUtil with Logging { - // A Hadoop Configuration can be about 10 KB, which is pretty big, so broadcast it - private val confBroadcast = sc.broadcast(new SerializableWritable(conf)) + private val sConf = new SerializableWritable(conf) // private val serializableConf = new SerializableWritable(conf) private val jobTrackerId: String = { @@ -104,7 +103,7 @@ class NewHadoopRDD[K, V]( val iter = new Iterator[(K, V)] { val split = theSplit.asInstanceOf[NewHadoopPartition] logInfo("Input split: " + split.serializableHadoopSplit) - val conf = confBroadcast.value.value + val conf = sConf.value val attemptId = newTaskAttemptID(jobTrackerId, id, isMap = true, split.index, 0) val hadoopAttemptContext = newTaskAttemptContext(conf, attemptId) val format = inputFormatClass.newInstance @@ -190,7 +189,7 @@ class NewHadoopRDD[K, V]( locs.getOrElse(split.getLocations.filter(_ != "localhost")) } - def getConf: Configuration = confBroadcast.value.value + def getConf: Configuration = sConf.value } private[spark] object NewHadoopRDD { diff --git a/core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala b/core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala index f67e5f1857979..471920530e0cb 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala @@ -90,9 +90,8 @@ private[spark] class RDDCheckpointData[T: ClassTag](@transient rdd: RDD[T]) } // Save to file, and reload it as an RDD - val broadcastedConf = rdd.context.broadcast( - new SerializableWritable(rdd.context.hadoopConfiguration)) - rdd.context.runJob(rdd, CheckpointRDD.writeToFile[T](path.toString, broadcastedConf) _) + val conf = new SerializableWritable(rdd.context.hadoopConfiguration) + rdd.context.runJob(rdd, CheckpointRDD.writeToFile[T](path.toString, conf) _) val newRDD = new CheckpointRDD[T](rdd.context, path.toString) if (newRDD.partitions.size != rdd.partitions.size) { throw new SparkException( diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala index fd4f65e488259..d1576183cf30a 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala @@ -64,8 +64,7 @@ class 
HadoopTableReader( // TODO: set aws s3 credentials. - private val _broadcastedHiveConf = - sc.sparkContext.broadcast(new SerializableWritable(hiveExtraConf)) + private val conf = new SerializableWritable(hiveExtraConf) override def makeRDDForTable(hiveTable: HiveTable): RDD[Row] = makeRDDForTable( @@ -93,7 +92,7 @@ class HadoopTableReader( // Create local references to member variables, so that the entire `this` object won't be // serialized in the closure below. val tableDesc = relation.tableDesc - val broadcastedHiveConf = _broadcastedHiveConf + val _conf = conf val tablePath = hiveTable.getPath val inputPathStr = applyFilterIfNeeded(tablePath, filterOpt) @@ -107,7 +106,7 @@ class HadoopTableReader( val mutableRow = new SpecificMutableRow(attributes.map(_.dataType)) val deserializedHadoopRDD = hadoopRDD.mapPartitions { iter => - val hconf = broadcastedHiveConf.value.value + val hconf = _conf.value val deserializer = deserializerClass.newInstance() deserializer.initialize(hconf, tableDesc.getProperties) HadoopTableReader.fillObject(iter, deserializer, attrsWithIndex, mutableRow) From bc46dda866a56b82cc3080c420e3b46434421905 Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Fri, 24 Oct 2014 22:06:46 -0700 Subject: [PATCH 2/7] thread safety --- .../apache/spark/SerializableWritable.scala | 12 ++-- .../scala/org/apache/spark/SparkContext.scala | 2 +- .../apache/spark/deploy/SparkHadoopUtil.scala | 14 +++++ .../org/apache/spark/rdd/CheckpointRDD.scala | 1 - .../org/apache/spark/rdd/HadoopRDD.scala | 62 ++++--------------- .../org/apache/spark/rdd/NewHadoopRDD.scala | 1 - .../apache/spark/sql/hive/TableReader.scala | 11 ++-- 7 files changed, 38 insertions(+), 65 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SerializableWritable.scala b/core/src/main/scala/org/apache/spark/SerializableWritable.scala index e50b9ac2291f9..f1d0590e4b25a 100644 --- a/core/src/main/scala/org/apache/spark/SerializableWritable.scala +++ b/core/src/main/scala/org/apache/spark/SerializableWritable.scala @@ -19,9 +19,9 @@ package org.apache.spark import java.io._ -import org.apache.hadoop.conf.Configuration import org.apache.hadoop.io.ObjectWritable import org.apache.hadoop.io.Writable +import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.annotation.DeveloperApi @@ -30,16 +30,18 @@ class SerializableWritable[T <: Writable](@transient var t: T) extends Serializa def value = t override def toString = t.toString - private def writeObject(out: ObjectOutputStream) { + protected def writeObject(out: ObjectOutputStream) { out.defaultWriteObject() new ObjectWritable(t).write(out) } - private def readObject(in: ObjectInputStream) { + protected def readObject(in: ObjectInputStream) { in.defaultReadObject() val ow = new ObjectWritable() - ow.setConf(new Configuration()) - ow.readFields(in) + SparkHadoopUtil.CONFIGURATION_INSTANTIATION_LOCK.synchronized { + ow.setConf(SparkHadoopUtil.newConfiguration()) + ow.readFields(in) // not thread safe + } t = ow.get().asInstanceOf[T] } } diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 0118f7706fe10..630f39961a53c 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -569,7 +569,7 @@ class SparkContext(config: SparkConf) extends Logging { val setInputPathsFunc = (jobConf: JobConf) => FileInputFormat.setInputPaths(jobConf, path) new HadoopRDD( this, - new SerializableWritable(hadoopConfiguration), + 
hadoopConfiguration, Some(setInputPathsFunc), inputFormatClass, keyClass, diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala index fe0ad9ebbca12..e6079212b1694 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala @@ -124,6 +124,20 @@ class SparkHadoopUtil extends Logging { } object SparkHadoopUtil { + /** + * Configuration's constructor is not threadsafe (see SPARK-1097 and HADOOP-10456). + * Therefore, we synchronize on this lock before calling new JobConf() or new Configuration(). + */ + val CONFIGURATION_INSTANTIATION_LOCK = new Object() + + /** + * Create a new Configuration in thread-safe way + */ + def newConfiguration(): Configuration = { + CONFIGURATION_INSTANTIATION_LOCK.synchronized { + new Configuration() + } + } private val hadoop = { val yarnMode = java.lang.Boolean.valueOf( diff --git a/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala index faa38a08e9aec..c8216a605818d 100644 --- a/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala @@ -25,7 +25,6 @@ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.spark._ -import org.apache.spark.broadcast.Broadcast import org.apache.spark.deploy.SparkHadoopUtil private[spark] class CheckpointRDDPartition(val index: Int) extends Partition {} diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala index 729631061b0b0..b0b39477d6811 100644 --- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala @@ -39,7 +39,6 @@ import org.apache.hadoop.util.ReflectionUtils import org.apache.spark._ import org.apache.spark.annotation.DeveloperApi -import org.apache.spark.broadcast.Broadcast import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.executor.{DataReadMethod, InputMetrics} import org.apache.spark.rdd.HadoopRDD.HadoopMapPartitionsWithSplitRDD @@ -86,7 +85,7 @@ private[spark] class HadoopPartition(rddId: Int, idx: Int, @transient s: InputSp * [[org.apache.spark.SparkContext.hadoopRDD()]] * * @param sc The SparkContext to associate the RDD with. - * @param broadcastedConf A general Hadoop Configuration, or a subclass of it. If the enclosed + * @param conf A general Hadoop Configuration, or a subclass of it. If the enclosed * variabe references an instance of JobConf, then that JobConf will be used for the Hadoop job. * Otherwise, a new JobConf will be created on each slave using the enclosed Configuration. 
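The SparkHadoopUtil hunk above centralizes construction behind `CONFIGURATION_INSTANTIATION_LOCK`, because the `Configuration` constructor mutates shared static state and is not thread-safe (SPARK-1097, HADOOP-10456). The discipline in isolation; the object name and the JobConf helper are invented for the sketch:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapred.JobConf

// Sketch of the locking discipline added to SparkHadoopUtil: every
// `new Configuration()` / `new JobConf()` in the process funnels through one
// shared monitor, so concurrent tasks cannot race inside the constructor.
object ConfLockSketch {
  private val ConstructionLock = new Object()

  def newConfiguration(): Configuration =
    ConstructionLock.synchronized { new Configuration() }

  def newJobConf(base: Configuration): JobConf =
    ConstructionLock.synchronized { new JobConf(base) }
}
```

The lock only helps if every constructor call in the JVM goes through it, which is why the patch also routes `SerializableWritable.readObject` and `HadoopRDD.getJobConf()` through the same object.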
* @param initLocalJobConfFuncOpt Optional closure used to initialize any JobConf that HadoopRDD @@ -99,7 +98,7 @@ private[spark] class HadoopPartition(rddId: Int, idx: Int, @transient s: InputSp @DeveloperApi class HadoopRDD[K, V]( sc: SparkContext, - conf: SerializableWritable[Configuration], + @transient conf: Configuration, initLocalJobConfFuncOpt: Option[JobConf => Unit], inputFormatClass: Class[_ <: InputFormat[K, V]], keyClass: Class[K], @@ -107,6 +106,9 @@ class HadoopRDD[K, V]( minPartitions: Int) extends RDD[(K, V)](sc, Nil) with Logging { + // The serializable configuration + private val sConf = new SerializableWritable(conf) + def this( sc: SparkContext, conf: JobConf, @@ -116,7 +118,7 @@ class HadoopRDD[K, V]( minPartitions: Int) = { this( sc, - new SerializableWritable(conf), + conf, None /* initLocalJobConfFuncOpt */, inputFormatClass, keyClass, @@ -124,55 +126,19 @@ class HadoopRDD[K, V]( minPartitions) } - protected val jobConfCacheKey = "rdd_%d_job_conf".format(id) - protected val inputFormatCacheKey = "rdd_%d_input_format".format(id) // used to build JobTracker ID private val createTime = new Date() - private val shouldCloneJobConf = sc.conf.get("spark.hadoop.cloneConf", "false").toBoolean - // Returns a JobConf that will be used on slaves to obtain input splits for Hadoop reads. - protected def getJobConf(): JobConf = { - val conf: Configuration = this.conf.value - if (shouldCloneJobConf) { - // Hadoop Configuration objects are not thread-safe, which may lead to various problems if - // one job modifies a configuration while another reads it (SPARK-2546). This problem occurs - // somewhat rarely because most jobs treat the configuration as though it's immutable. One - // solution, implemented here, is to clone the Configuration object. Unfortunately, this - // clone can be very expensive. To avoid unexpected performance regressions for workloads and - // Hadoop versions that do not suffer from these thread-safety issues, this cloning is - // disabled by default. - HadoopRDD.CONFIGURATION_INSTANTIATION_LOCK.synchronized { - logDebug("Cloning Hadoop Configuration") - val newJobConf = new JobConf(conf) - if (!conf.isInstanceOf[JobConf]) { - initLocalJobConfFuncOpt.map(f => f(newJobConf)) - } + protected def getJobConf(): JobConf = sConf.value match { + case jobConf: JobConf => jobConf + case c => SparkHadoopUtil.CONFIGURATION_INSTANTIATION_LOCK synchronized { + val newJobConf = new JobConf(c) + initLocalJobConfFuncOpt.map(f => f(newJobConf)) newJobConf } - } else { - if (conf.isInstanceOf[JobConf]) { - logDebug("Re-using user-broadcasted JobConf") - conf.asInstanceOf[JobConf] - } else if (HadoopRDD.containsCachedMetadata(jobConfCacheKey)) { - logDebug("Re-using cached JobConf") - HadoopRDD.getCachedMetadata(jobConfCacheKey).asInstanceOf[JobConf] - } else { - // Create a JobConf that will be cached and used across this RDD's getJobConf() calls in the - // local process. The local cache is accessed through HadoopRDD.putCachedMetadata(). - // The caching helps minimize GC, since a JobConf can contain ~10KB of temporary objects. - // Synchronize to prevent ConcurrentModificationException (SPARK-1097, HADOOP-10456). 
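After this rewrite, `HadoopRDD` takes the `Configuration` as a plain `@transient` constructor argument, wraps it once in a private `SerializableWritable` (`sConf`), and has `getJobConf()` rebuild or reuse a `JobConf` on the executor under the shared lock. A stripped-down sketch of that shape; the class name and the local lock object are invented stand-ins (the real code uses `SparkHadoopUtil.CONFIGURATION_INSTANTIATION_LOCK`):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.SerializableWritable

object HadoopScanSketch {
  // Stand-in for SparkHadoopUtil.CONFIGURATION_INSTANTIATION_LOCK: the lock has to
  // be JVM-global, since the point is to serialize all Configuration construction.
  val ConfLock = new Object()
}

// The raw Configuration is @transient and never leaves the driver; only the
// SerializableWritable field travels with tasks, like the RDD instance itself.
class HadoopScanSketch(
    @transient conf: Configuration,
    initLocalJobConfFuncOpt: Option[JobConf => Unit]) extends Serializable {

  private val sConf = new SerializableWritable(conf)

  protected def getJobConf(): JobConf = sConf.value match {
    case jobConf: JobConf => jobConf                    // a JobConf was passed in: reuse it
    case c => HadoopScanSketch.ConfLock.synchronized {  // otherwise build one, never concurrently
      val newJobConf = new JobConf(c)
      initLocalJobConfFuncOpt.foreach(f => f(newJobConf))
      newJobConf
    }
  }
}
```

Note the behavioral change relative to the removed code: the per-process JobConf cache (`jobConfCacheKey`) and the `spark.hadoop.cloneConf` escape hatch are gone, so when the input is not already a JobConf a fresh one is built on each call.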
- HadoopRDD.CONFIGURATION_INSTANTIATION_LOCK.synchronized { - logDebug("Creating new JobConf and caching it for later re-use") - val newJobConf = new JobConf(conf) - initLocalJobConfFuncOpt.map(f => f(newJobConf)) - HadoopRDD.putCachedMetadata(jobConfCacheKey, newJobConf) - newJobConf - } - } - } } protected def getInputFormat(conf: JobConf): InputFormat[K, V] = { @@ -295,12 +261,6 @@ class HadoopRDD[K, V]( } private[spark] object HadoopRDD extends Logging { - /** - * Configuration's constructor is not threadsafe (see SPARK-1097 and HADOOP-10456). - * Therefore, we synchronize on this lock before calling new JobConf() or new Configuration(). - */ - val CONFIGURATION_INSTANTIATION_LOCK = new Object() - /** * The three methods below are helpers for accessing the local map, a property of the SparkEnv of * the local process. diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala index 01877fbd0167e..6f745e4bc7bde 100644 --- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala @@ -74,7 +74,6 @@ class NewHadoopRDD[K, V]( with Logging { private val sConf = new SerializableWritable(conf) - // private val serializableConf = new SerializableWritable(conf) private val jobTrackerId: String = { val formatter = new SimpleDateFormat("yyyyMMddHHmm") diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala index d1576183cf30a..8d42de69c2959 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala @@ -31,7 +31,6 @@ import org.apache.hadoop.io.Writable import org.apache.hadoop.mapred.{FileInputFormat, InputFormat, JobConf} import org.apache.spark.SerializableWritable -import org.apache.spark.broadcast.Broadcast import org.apache.spark.rdd.{EmptyRDD, HadoopRDD, RDD, UnionRDD} import org.apache.spark.sql.catalyst.expressions._ @@ -64,7 +63,7 @@ class HadoopTableReader( // TODO: set aws s3 credentials. - private val conf = new SerializableWritable(hiveExtraConf) + private val conf: SerializableWritable[Configuration] = new SerializableWritable(hiveExtraConf) override def makeRDDForTable(hiveTable: HiveTable): RDD[Row] = makeRDDForTable( @@ -157,7 +156,7 @@ class HadoopTableReader( // Create local references so that the outer object isn't serialized. val tableDesc = relation.tableDesc - val broadcastedHiveConf = _broadcastedHiveConf + val _conf = conf val localDeserializer = partDeserializer val mutableRow = new SpecificMutableRow(attributes.map(_.dataType)) @@ -179,7 +178,7 @@ class HadoopTableReader( fillPartitionKeys(partValues, mutableRow) createHadoopRdd(tableDesc, inputPathStr, ifc).mapPartitions { iter => - val hconf = broadcastedHiveConf.value.value + val hconf = _conf.value val deserializer = localDeserializer.newInstance() deserializer.initialize(hconf, partProps) @@ -211,7 +210,7 @@ class HadoopTableReader( } /** - * Creates a HadoopRDD based on the broadcasted HiveConf and other job properties that will be + * Creates a HadoopRDD based on the HiveConf and other job properties that will be * applied locally on each slave. 
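The TableReader changes keep the existing idiom of copying members into locals before a closure is built (`val tableDesc = relation.tableDesc`, `val _conf = conf`), so that only those small values are captured and the enclosing reader itself is never serialized. A self-contained sketch of the idiom, with the class, method, and configuration key invented:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.spark.SerializableWritable
import org.apache.spark.rdd.RDD

// Copying the field into a local val means the mapPartitions closure captures only
// the SerializableWritable, not `this` (which may hold a SparkContext, Hive handles,
// or other non-serializable state).
class ReaderSketch(conf: SerializableWritable[Configuration]) {

  def tagWithDefaultFs(paths: RDD[String]): RDD[String] = {
    val _conf = conf                       // local reference, evaluated on the driver
    paths.mapPartitions { iter =>
      val hconf = _conf.value              // Configuration rebuilt on the executor
      iter.map(p => hconf.get("fs.defaultFS", "file:///") + p)
    }
  }
}
```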
*/ private def createHadoopRdd( @@ -223,7 +222,7 @@ class HadoopTableReader( val rdd = new HadoopRDD( sc.sparkContext, - _broadcastedHiveConf.asInstanceOf[Broadcast[SerializableWritable[Configuration]]], + conf.value, Some(initializeJobConfFunc), inputFormatClass, classOf[Writable], From 74f4102c8d98e80f1a556b3f185f62637bc6f238 Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Fri, 24 Oct 2014 22:17:10 -0700 Subject: [PATCH 3/7] bugfix --- .../scala/org/apache/spark/SerializableWritable.scala | 4 ++-- .../scala/org/apache/spark/deploy/SparkHadoopUtil.scala | 4 ++-- .../main/scala/org/apache/spark/rdd/CheckpointRDD.scala | 8 +++++--- 3 files changed, 9 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SerializableWritable.scala b/core/src/main/scala/org/apache/spark/SerializableWritable.scala index f1d0590e4b25a..547a3d3025c54 100644 --- a/core/src/main/scala/org/apache/spark/SerializableWritable.scala +++ b/core/src/main/scala/org/apache/spark/SerializableWritable.scala @@ -30,12 +30,12 @@ class SerializableWritable[T <: Writable](@transient var t: T) extends Serializa def value = t override def toString = t.toString - protected def writeObject(out: ObjectOutputStream) { + private def writeObject(out: ObjectOutputStream) { out.defaultWriteObject() new ObjectWritable(t).write(out) } - protected def readObject(in: ObjectInputStream) { + private def readObject(in: ObjectInputStream) { in.defaultReadObject() val ow = new ObjectWritable() SparkHadoopUtil.CONFIGURATION_INSTANTIATION_LOCK.synchronized { diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala index e6079212b1694..e782c724a0e61 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala @@ -128,12 +128,12 @@ object SparkHadoopUtil { * Configuration's constructor is not threadsafe (see SPARK-1097 and HADOOP-10456). * Therefore, we synchronize on this lock before calling new JobConf() or new Configuration(). 
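Patch 3's first fix reverts `writeObject`/`readObject` in `SerializableWritable` from `protected` back to `private`: Java serialization only invokes these hooks when they are declared `private` with the standard signatures, so with the `protected` variants from patch 2 the wrapped `Writable` would silently deserialize as null. A minimal illustration of the rule, on an invented class unrelated to Spark:

```scala
import java.io.{ObjectInputStream, ObjectOutputStream}

// The ObjectStream machinery looks these hooks up by exact shape: private, non-static,
// returning Unit, taking the single stream argument. Any other visibility is ignored
// and default serialization is used instead.
class Counter(@transient var count: Int) extends Serializable {

  private def writeObject(out: ObjectOutputStream): Unit = {
    out.defaultWriteObject()
    out.writeInt(count)                    // persist the transient field by hand
  }

  private def readObject(in: ObjectInputStream): Unit = {
    in.defaultReadObject()
    count = in.readInt()                   // restore it on deserialization
  }
}
```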
*/ - val CONFIGURATION_INSTANTIATION_LOCK = new Object() + private[spark] val CONFIGURATION_INSTANTIATION_LOCK = new Object() /** * Create a new Configuration in thread-safe way */ - def newConfiguration(): Configuration = { + private[spark] def newConfiguration(): Configuration = { CONFIGURATION_INSTANTIATION_LOCK.synchronized { new Configuration() } diff --git a/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala index c8216a605818d..fca34e355d54a 100644 --- a/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala @@ -25,6 +25,7 @@ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.spark._ +import org.apache.spark.broadcast.Broadcast import org.apache.spark.deploy.SparkHadoopUtil private[spark] class CheckpointRDDPartition(val index: Int) extends Partition {} @@ -37,6 +38,7 @@ class CheckpointRDD[T: ClassTag](sc: SparkContext, val checkpointPath: String) extends RDD[T](sc, Nil) { @transient val fs = new Path(checkpointPath).getFileSystem(sc.hadoopConfiguration) + private val sConf = new SerializableWritable(sc.hadoopConfiguration) override def getPartitions: Array[Partition] = { val cpath = new Path(checkpointPath) @@ -68,7 +70,7 @@ class CheckpointRDD[T: ClassTag](sc: SparkContext, val checkpointPath: String) override def compute(split: Partition, context: TaskContext): Iterator[T] = { val file = new Path(checkpointPath, CheckpointRDD.splitIdToFile(split.index)) - CheckpointRDD.readFromFile(file, new SerializableWritable(sc.hadoopConfiguration), context) + CheckpointRDD.readFromFile(file, sConf.value, context) } override def checkpoint() { @@ -127,11 +129,11 @@ private[spark] object CheckpointRDD extends Logging { def readFromFile[T]( path: Path, - conf: SerializableWritable[Configuration], + conf: Configuration, context: TaskContext ): Iterator[T] = { val env = SparkEnv.get - val fs = path.getFileSystem(conf.value) + val fs = path.getFileSystem(conf) val bufferSize = env.conf.getInt("spark.buffer.size", 65536) val fileInputStream = fs.open(path, bufferSize) val serializer = env.serializer.newInstance() From 8b0fcd8eeebc03fc2e3c014347e690854117e6a6 Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Fri, 24 Oct 2014 22:21:37 -0700 Subject: [PATCH 4/7] refactor --- .../scala/org/apache/spark/rdd/CheckpointRDD.scala | 11 ++++++----- .../org/apache/spark/rdd/RDDCheckpointData.scala | 4 ++-- 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala index fca34e355d54a..041c626e06ef7 100644 --- a/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala @@ -25,7 +25,6 @@ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.spark._ -import org.apache.spark.broadcast.Broadcast import org.apache.spark.deploy.SparkHadoopUtil private[spark] class CheckpointRDDPartition(val index: Int) extends Partition {} @@ -37,9 +36,11 @@ private[spark] class CheckpointRDD[T: ClassTag](sc: SparkContext, val checkpointPath: String) extends RDD[T](sc, Nil) { - @transient val fs = new Path(checkpointPath).getFileSystem(sc.hadoopConfiguration) + // Serializable configuration private val sConf = new SerializableWritable(sc.hadoopConfiguration) + @transient val fs = new 
Path(checkpointPath).getFileSystem(sc.hadoopConfiguration) + override def getPartitions: Array[Partition] = { val cpath = new Path(checkpointPath) val numPartitions = @@ -85,12 +86,12 @@ private[spark] object CheckpointRDD extends Logging { def writeToFile[T: ClassTag]( path: String, - conf: SerializableWritable[Configuration], + conf: Configuration, blockSize: Int = -1 )(ctx: TaskContext, iterator: Iterator[T]) { val env = SparkEnv.get val outputDir = new Path(path) - val fs = outputDir.getFileSystem(conf.value) + val fs = outputDir.getFileSystem(conf) val finalOutputName = splitIdToFile(ctx.partitionId) val finalOutputPath = new Path(outputDir, finalOutputName) @@ -159,7 +160,7 @@ private[spark] object CheckpointRDD extends Logging { val conf = SparkHadoopUtil.get.newConfiguration(new SparkConf()) val fs = path.getFileSystem(conf) sc.runJob(rdd, CheckpointRDD.writeToFile[Int]( - path.toString, new SerializableWritable(conf), 1024) _) + path.toString, conf, 1024) _) val cpRDD = new CheckpointRDD[Int](sc, path.toString) assert(cpRDD.partitions.length == rdd.partitions.length, "Number of partitions is not the same") assert(cpRDD.collect.toList == rdd.collect.toList, "Data of partitions not the same") diff --git a/core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala b/core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala index 471920530e0cb..b0a96c3da13af 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala @@ -90,8 +90,8 @@ private[spark] class RDDCheckpointData[T: ClassTag](@transient rdd: RDD[T]) } // Save to file, and reload it as an RDD - val conf = new SerializableWritable(rdd.context.hadoopConfiguration) - rdd.context.runJob(rdd, CheckpointRDD.writeToFile[T](path.toString, conf) _) + rdd.context.runJob(rdd, + CheckpointRDD.writeToFile[T](path.toString, rdd.context.hadoopConfiguration) _) val newRDD = new CheckpointRDD[T](rdd.context, path.toString) if (newRDD.partitions.size != rdd.partitions.size) { throw new SparkException( From 0de73d40e3fcee9297f4179c3b336cb786d57a4a Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Fri, 24 Oct 2014 22:23:45 -0700 Subject: [PATCH 5/7] refactor --- .../src/main/scala/org/apache/spark/sql/hive/TableReader.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala index 8d42de69c2959..a10d3269de44b 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala @@ -63,7 +63,7 @@ class HadoopTableReader( // TODO: set aws s3 credentials. - private val conf: SerializableWritable[Configuration] = new SerializableWritable(hiveExtraConf) + private val conf = new SerializableWritable(hiveExtraConf) override def makeRDDForTable(hiveTable: HiveTable): RDD[Row] = makeRDDForTable( From 8694cb31a49aa10c97cc4f689b38eded872d7aea Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Fri, 24 Oct 2014 22:26:28 -0700 Subject: [PATCH 6/7] remove docs --- docs/configuration.md | 9 --------- 1 file changed, 9 deletions(-) diff --git a/docs/configuration.md b/docs/configuration.md index 66738d3ca754e..e9c146b58e909 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -630,15 +630,6 @@ Apart from these, the following properties are also available, and may be useful output directories. 
We recommend that users do not disable this except if trying to achieve compatibility with previous versions of Spark. Simply use Hadoop's FileSystem API to delete output directories by hand. - - spark.hadoop.cloneConf - false - If set to true, clones a new Hadoop Configuration object for each task. This - option should be enabled to work around Configuration thread-safety issues (see - SPARK-2546 for more details). - This is disabled by default in order to avoid unexpected performance regressions for jobs that - are not affected by these issues. - spark.executor.heartbeatInterval 10000 From 1fd70df3f90652d544109766d61286309892a93f Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Fri, 24 Oct 2014 23:34:59 -0700 Subject: [PATCH 7/7] bugfix --- .../src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala | 6 +++--- .../main/scala/org/apache/spark/rdd/RDDCheckpointData.scala | 4 ++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala index 041c626e06ef7..39cf0683337a6 100644 --- a/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala @@ -86,12 +86,12 @@ private[spark] object CheckpointRDD extends Logging { def writeToFile[T: ClassTag]( path: String, - conf: Configuration, + conf: SerializableWritable[Configuration], blockSize: Int = -1 )(ctx: TaskContext, iterator: Iterator[T]) { val env = SparkEnv.get val outputDir = new Path(path) - val fs = outputDir.getFileSystem(conf) + val fs = outputDir.getFileSystem(conf.value) val finalOutputName = splitIdToFile(ctx.partitionId) val finalOutputPath = new Path(outputDir, finalOutputName) @@ -160,7 +160,7 @@ private[spark] object CheckpointRDD extends Logging { val conf = SparkHadoopUtil.get.newConfiguration(new SparkConf()) val fs = path.getFileSystem(conf) sc.runJob(rdd, CheckpointRDD.writeToFile[Int]( - path.toString, conf, 1024) _) + path.toString, new SerializableWritable(conf), 1024) _) val cpRDD = new CheckpointRDD[Int](sc, path.toString) assert(cpRDD.partitions.length == rdd.partitions.length, "Number of partitions is not the same") assert(cpRDD.collect.toList == rdd.collect.toList, "Data of partitions not the same") diff --git a/core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala b/core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala index b0a96c3da13af..8a989559a2d4f 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala @@ -90,8 +90,8 @@ private[spark] class RDDCheckpointData[T: ClassTag](@transient rdd: RDD[T]) } // Save to file, and reload it as an RDD - rdd.context.runJob(rdd, - CheckpointRDD.writeToFile[T](path.toString, rdd.context.hadoopConfiguration) _) + val sConf = new SerializableWritable(rdd.context.hadoopConfiguration) + rdd.context.runJob(rdd, CheckpointRDD.writeToFile[T](path.toString, sConf) _) val newRDD = new CheckpointRDD[T](rdd.context, path.toString) if (newRDD.partitions.size != rdd.partitions.size) { throw new SparkException(
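The final bugfix gives `writeToFile` back its `SerializableWritable[Configuration]` parameter and has the callers wrap the configuration again: the partially applied `writeToFile(...) _` is exactly what `sc.runJob` serializes and ships, so a raw `Configuration` bound into it would make that closure non-serializable. A driver-side sketch of the constraint, with invented function names:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.spark.{SerializableWritable, SparkContext, TaskContext}

object CheckpointSketch {

  // Per-partition writer in the style of CheckpointRDD.writeToFile: the conf argument
  // is bound on the driver and must therefore be java-serializable.
  def writePartition(conf: SerializableWritable[Configuration])
                    (ctx: TaskContext, iter: Iterator[Int]): Unit = {
    val hadoopConf = conf.value            // rebuilt from its Writable form on the executor
    // ... open a FileSystem from hadoopConf and write the iterator out ...
  }

  def checkpointLike(sc: SparkContext, hadoopConf: Configuration): Unit = {
    val rdd = sc.parallelize(1 to 100, numSlices = 4)
    // OK: only the serializable wrapper is baked into the shipped closure. Passing the
    // raw Configuration instead would fail task serialization, since Configuration is
    // Writable but not java.io.Serializable.
    sc.runJob(rdd, writePartition(new SerializableWritable(hadoopConf)) _)
  }
}
```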