[SPARK-2585] remove unnecessary broadcast for conf #2935

Closed
wants to merge 11 commits
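In outline, the patch drops the broadcast wrapper around the Hadoop configuration: call sites that used to do sc.broadcast(new SerializableWritable(conf)) and unwrap it with broadcast.value.value now pass new SerializableWritable(conf) directly and read it with .value. A minimal sketch of the two access patterns, using a simplified stand-in for Spark's SerializableWritable (the names below are illustrative, not the exact Spark signatures):

import org.apache.hadoop.conf.Configuration

// Simplified stand-in for org.apache.spark.SerializableWritable: only the
// .value accessor is modelled; the real class also writes the wrapped object
// as a Hadoop Writable during Java serialization (Configuration itself is
// not java.io.Serializable).
class ConfWrapper(private val conf: Configuration) extends Serializable {
  def value: Configuration = conf
}

object BroadcastRemovalSketch {
  // Before this patch (conceptually), on the driver and then in the task:
  //   val confBroadcast = sc.broadcast(new SerializableWritable(hadoopConf))
  //   val conf = confBroadcast.value.value  // unwrap the broadcast, then the wrapper
  //
  // After: wrap once, let the wrapper travel with the task closure, unwrap once.
  def useConf(hadoopConf: Configuration): Configuration = {
    val sConf = new ConfWrapper(hadoopConf)
    sConf.value
  }
}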
core/src/main/scala/org/apache/spark/SerializableWritable.scala
@@ -19,9 +19,9 @@ package org.apache.spark

import java.io._

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.ObjectWritable
import org.apache.hadoop.io.Writable
import org.apache.spark.deploy.SparkHadoopUtil

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.util.Utils
@@ -39,8 +39,10 @@ class SerializableWritable[T <: Writable](@transient var t: T) extends Serializa
private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException {
in.defaultReadObject()
val ow = new ObjectWritable()
ow.setConf(new Configuration())
ow.readFields(in)
SparkHadoopUtil.CONFIGURATION_INSTANTIATION_LOCK.synchronized {
ow.setConf(SparkHadoopUtil.newConfiguration())
ow.readFields(in) // not thread safe
(Contributor Author commented on this line: maybe)

}
t = ow.get().asInstanceOf[T]
}
}
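The readObject change above keeps the same ObjectWritable round-trip but builds the Configuration through SparkHadoopUtil and holds the shared lock while readFields runs. A standalone sketch of that round-trip, with a local lock object standing in for SparkHadoopUtil.CONFIGURATION_INSTANTIATION_LOCK (Text is used as an arbitrary Writable):

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{ObjectWritable, Text}

object WritableRoundTripSketch {
  // Local stand-in for SparkHadoopUtil.CONFIGURATION_INSTANTIATION_LOCK.
  private val confLock = new Object()

  def main(args: Array[String]): Unit = {
    // Write side, as in SerializableWritable.writeObject: wrap the Writable
    // in an ObjectWritable and call write().
    val buffer = new ByteArrayOutputStream()
    val out = new DataOutputStream(buffer)
    new ObjectWritable(new Text("hello")).write(out)
    out.flush()

    // Read side, as in the patched readObject: create the Configuration and
    // run readFields while holding the lock, since neither Configuration's
    // constructor nor readFields is thread-safe.
    val in = new DataInputStream(new ByteArrayInputStream(buffer.toByteArray))
    val ow = new ObjectWritable()
    val restored = confLock.synchronized {
      ow.setConf(new Configuration())
      ow.readFields(in)
      ow.get().asInstanceOf[Text]
    }
    println(restored) // hello
  }
}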
4 changes: 1 addition & 3 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -565,12 +565,10 @@ class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging {
valueClass: Class[V],
minPartitions: Int = defaultMinPartitions
): RDD[(K, V)] = {
// A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it.
val confBroadcast = broadcast(new SerializableWritable(hadoopConfiguration))
val setInputPathsFunc = (jobConf: JobConf) => FileInputFormat.setInputPaths(jobConf, path)
new HadoopRDD(
this,
confBroadcast,
hadoopConfiguration,
Some(setInputPathsFunc),
inputFormatClass,
keyClass,
@@ -61,7 +61,7 @@ private[python] object Converter extends Logging {
* Other objects are passed through without conversion.
*/
private[python] class WritableToJavaConverter(
conf: Broadcast[SerializableWritable[Configuration]],
conf: SerializableWritable[Configuration],
batchSize: Int) extends Converter[Any, Any] {

/**
@@ -95,7 +95,7 @@ private[python] class WritableToJavaConverter(
}
map
case w: Writable =>
if (batchSize > 1) WritableUtils.clone(w, conf.value.value) else w
if (batchSize > 1) WritableUtils.clone(w, conf.value) else w
case other => other
}
}
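With the broadcast gone, the converter reads the Configuration straight off the SerializableWritable (conf.value) when it clones batched Writables. A small sketch of the underlying WritableUtils.clone call (hypothetical helper name; Text stands in for an arbitrary Writable):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{Text, Writable, WritableUtils}

object CloneWritableSketch {
  // When records are batched, Hadoop record readers reuse the same Writable
  // instance, so each value has to be cloned before it is buffered.
  def maybeClone(w: Writable, conf: Configuration, batchSize: Int): Writable =
    if (batchSize > 1) WritableUtils.clone(w, conf) else w

  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    val original = new Text("record")
    val copy = maybeClone(original, conf, batchSize = 2)
    println(copy eq original) // false: a distinct instance was created
    println(copy == original) // true: the cloned contents are equal
  }
}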
15 changes: 5 additions & 10 deletions core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -441,9 +441,8 @@ private[spark] object PythonRDD extends Logging {
val kc = Utils.classForName(keyClass).asInstanceOf[Class[K]]
val vc = Utils.classForName(valueClass).asInstanceOf[Class[V]]
val rdd = sc.sc.sequenceFile[K, V](path, kc, vc, minSplits)
val confBroadcasted = sc.sc.broadcast(new SerializableWritable(sc.hadoopConfiguration()))
val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
new WritableToJavaConverter(confBroadcasted, batchSize))
new WritableToJavaConverter(new SerializableWritable(sc.hadoopConfiguration()), batchSize))
JavaRDD.fromRDD(SerDeUtil.pairRDDToPython(converted, batchSize))
}

@@ -467,9 +466,8 @@ private[spark] object PythonRDD extends Logging {
val rdd =
newAPIHadoopRDDFromClassNames[K, V, F](sc,
Some(path), inputFormatClass, keyClass, valueClass, mergedConf)
val confBroadcasted = sc.sc.broadcast(new SerializableWritable(mergedConf))
val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
new WritableToJavaConverter(confBroadcasted, batchSize))
new WritableToJavaConverter(new SerializableWritable(mergedConf), batchSize))
JavaRDD.fromRDD(SerDeUtil.pairRDDToPython(converted, batchSize))
}

@@ -493,9 +491,8 @@ private[spark] object PythonRDD extends Logging {
val rdd =
newAPIHadoopRDDFromClassNames[K, V, F](sc,
None, inputFormatClass, keyClass, valueClass, conf)
val confBroadcasted = sc.sc.broadcast(new SerializableWritable(conf))
val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
new WritableToJavaConverter(confBroadcasted, batchSize))
new WritableToJavaConverter(new SerializableWritable(conf), batchSize))
JavaRDD.fromRDD(SerDeUtil.pairRDDToPython(converted, batchSize))
}

@@ -536,9 +533,8 @@ private[spark] object PythonRDD extends Logging {
val rdd =
hadoopRDDFromClassNames[K, V, F](sc,
Some(path), inputFormatClass, keyClass, valueClass, mergedConf)
val confBroadcasted = sc.sc.broadcast(new SerializableWritable(mergedConf))
val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
new WritableToJavaConverter(confBroadcasted, batchSize))
new WritableToJavaConverter(new SerializableWritable(mergedConf), batchSize))
JavaRDD.fromRDD(SerDeUtil.pairRDDToPython(converted, batchSize))
}

@@ -562,9 +558,8 @@ private[spark] object PythonRDD extends Logging {
val rdd =
hadoopRDDFromClassNames[K, V, F](sc,
None, inputFormatClass, keyClass, valueClass, conf)
val confBroadcasted = sc.sc.broadcast(new SerializableWritable(conf))
val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
new WritableToJavaConverter(confBroadcasted, batchSize))
new WritableToJavaConverter(new SerializableWritable(conf), batchSize))
JavaRDD.fromRDD(SerDeUtil.pairRDDToPython(converted, batchSize))
}

14 changes: 14 additions & 0 deletions core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -154,6 +154,20 @@ class SparkHadoopUtil extends Logging {
}

object SparkHadoopUtil {
/**
* Configuration's constructor is not threadsafe (see SPARK-1097 and HADOOP-10456).
* Therefore, we synchronize on this lock before calling new JobConf() or new Configuration().
*/
private[spark] val CONFIGURATION_INSTANTIATION_LOCK = new Object()

/**
* Create a new Configuration in a thread-safe way
*/
private[spark] def newConfiguration(): Configuration = {
CONFIGURATION_INSTANTIATION_LOCK.synchronized {
new Configuration()
}
}

private val hadoop = {
val yarnMode = java.lang.Boolean.valueOf(
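The lock and factory method added to SparkHadoopUtil give the rest of core a single place to construct Configuration and JobConf instances safely. A self-contained sketch of the same idea, mirroring the helper rather than calling it (the lock and object names here are local stand-ins):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapred.JobConf

// Mirrors the helper added to SparkHadoopUtil: every constructor call that can
// race (SPARK-1097 / HADOOP-10456) goes through one shared lock.
object ConfFactorySketch {
  private val lock = new Object()

  def newConfiguration(): Configuration = lock.synchronized {
    new Configuration()
  }

  def newJobConf(base: Configuration): JobConf = lock.synchronized {
    new JobConf(base)
  }

  def main(args: Array[String]): Unit = {
    // Several threads creating configurations at once is exactly the
    // situation the lock is meant to make safe.
    val threads = (1 to 8).map { i =>
      new Thread(new Runnable {
        def run(): Unit = {
          val jobConf = newJobConf(newConfiguration())
          jobConf.setJobName(s"conf-factory-sketch-$i")
        }
      })
    }
    threads.foreach(_.start())
    threads.foreach(_.join())
  }
}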
18 changes: 9 additions & 9 deletions core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
@@ -25,7 +25,6 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

import org.apache.spark._
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.deploy.SparkHadoopUtil

private[spark] class CheckpointRDDPartition(val index: Int) extends Partition {}
@@ -37,7 +36,8 @@ private[spark]
class CheckpointRDD[T: ClassTag](sc: SparkContext, val checkpointPath: String)
extends RDD[T](sc, Nil) {

val broadcastedConf = sc.broadcast(new SerializableWritable(sc.hadoopConfiguration))
// Serializable configuration
private val sConf = new SerializableWritable(sc.hadoopConfiguration)

@transient val fs = new Path(checkpointPath).getFileSystem(sc.hadoopConfiguration)

@@ -71,7 +71,7 @@ class CheckpointRDD[T: ClassTag](sc: SparkContext, val checkpointPath: String)

override def compute(split: Partition, context: TaskContext): Iterator[T] = {
val file = new Path(checkpointPath, CheckpointRDD.splitIdToFile(split.index))
CheckpointRDD.readFromFile(file, broadcastedConf, context)
CheckpointRDD.readFromFile(file, sConf.value, context)
}

override def checkpoint() {
@@ -86,12 +86,12 @@ private[spark] object CheckpointRDD extends Logging {

def writeToFile[T: ClassTag](
path: String,
broadcastedConf: Broadcast[SerializableWritable[Configuration]],
conf: SerializableWritable[Configuration],
blockSize: Int = -1
)(ctx: TaskContext, iterator: Iterator[T]) {
val env = SparkEnv.get
val outputDir = new Path(path)
val fs = outputDir.getFileSystem(broadcastedConf.value.value)
val fs = outputDir.getFileSystem(conf.value)

val finalOutputName = splitIdToFile(ctx.partitionId)
val finalOutputPath = new Path(outputDir, finalOutputName)
@@ -130,11 +130,11 @@ private[spark] object CheckpointRDD extends Logging {

def readFromFile[T](
path: Path,
broadcastedConf: Broadcast[SerializableWritable[Configuration]],
conf: Configuration,
context: TaskContext
): Iterator[T] = {
val env = SparkEnv.get
val fs = path.getFileSystem(broadcastedConf.value.value)
val fs = path.getFileSystem(conf)
val bufferSize = env.conf.getInt("spark.buffer.size", 65536)
val fileInputStream = fs.open(path, bufferSize)
val serializer = env.serializer.newInstance()
@@ -159,8 +159,8 @@ private[spark] object CheckpointRDD extends Logging {
val path = new Path(hdfsPath, "temp")
val conf = SparkHadoopUtil.get.newConfiguration(new SparkConf())
val fs = path.getFileSystem(conf)
val broadcastedConf = sc.broadcast(new SerializableWritable(conf))
sc.runJob(rdd, CheckpointRDD.writeToFile[Int](path.toString, broadcastedConf, 1024) _)
sc.runJob(rdd, CheckpointRDD.writeToFile[Int](
path.toString, new SerializableWritable(conf), 1024) _)
val cpRDD = new CheckpointRDD[Int](sc, path.toString)
assert(cpRDD.partitions.length == rdd.partitions.length, "Number of partitions is not the same")
assert(cpRDD.collect.toList == rdd.collect.toList, "Data of partitions not the same")
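writeToFile now receives the configuration as a SerializableWritable captured in the task closure, while readFromFile takes a plain Configuration. A sketch of that shape with the RDD machinery stripped away (writePartition and readPartition are hypothetical names; the FileSystem lookups mirror the code above):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

import org.apache.spark.SerializableWritable

object CheckpointConfSketch {
  // Task-side write: the SerializableWritable arrives via the task closure,
  // and .value yields the Hadoop Configuration, as in writeToFile above.
  def writePartition(outputDir: String, conf: SerializableWritable[Configuration]): Unit = {
    val dir = new Path(outputDir)
    val fs = dir.getFileSystem(conf.value)
    val stream = fs.create(new Path(dir, "part-00000"))
    try stream.writeUTF("checkpointed data") finally stream.close()
  }

  // Task-side read: a plain Configuration is all readFromFile needs.
  def readPartition(file: Path, conf: Configuration): String = {
    val fs = file.getFileSystem(conf)
    val stream = fs.open(file)
    try stream.readUTF() finally stream.close()
  }
}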
63 changes: 11 additions & 52 deletions core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -39,7 +39,6 @@ import org.apache.hadoop.util.ReflectionUtils

import org.apache.spark._
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.executor.{DataReadMethod, InputMetrics}
import org.apache.spark.rdd.HadoopRDD.HadoopMapPartitionsWithSplitRDD
@@ -85,7 +84,7 @@ private[spark] class HadoopPartition(rddId: Int, idx: Int, @transient s: InputSp
* [[org.apache.spark.SparkContext.hadoopRDD()]]
*
* @param sc The SparkContext to associate the RDD with.
* @param broadcastedConf A general Hadoop Configuration, or a subclass of it. If the enclosed
* @param conf A general Hadoop Configuration, or a subclass of it. If the enclosed
variable references an instance of JobConf, then that JobConf will be used for the Hadoop job.
* Otherwise, a new JobConf will be created on each slave using the enclosed Configuration.
* @param initLocalJobConfFuncOpt Optional closure used to initialize any JobConf that HadoopRDD
@@ -98,14 +97,17 @@ private[spark] class HadoopPartition(rddId: Int, idx: Int, @transient s: InputSp
@DeveloperApi
class HadoopRDD[K, V](
sc: SparkContext,
broadcastedConf: Broadcast[SerializableWritable[Configuration]],
@transient conf: Configuration,
initLocalJobConfFuncOpt: Option[JobConf => Unit],
inputFormatClass: Class[_ <: InputFormat[K, V]],
keyClass: Class[K],
valueClass: Class[V],
minPartitions: Int)
extends RDD[(K, V)](sc, Nil) with Logging {

// The serializable configuration
private val sConf = new SerializableWritable(conf)

def this(
sc: SparkContext,
conf: JobConf,
@@ -115,64 +117,27 @@ class HadoopRDD[K, V](
minPartitions: Int) = {
this(
sc,
sc.broadcast(new SerializableWritable(conf))
.asInstanceOf[Broadcast[SerializableWritable[Configuration]]],
conf,
None /* initLocalJobConfFuncOpt */,
inputFormatClass,
keyClass,
valueClass,
minPartitions)
}

protected val jobConfCacheKey = "rdd_%d_job_conf".format(id)

protected val inputFormatCacheKey = "rdd_%d_input_format".format(id)

// used to build JobTracker ID
private val createTime = new Date()

private val shouldCloneJobConf = sc.conf.get("spark.hadoop.cloneConf", "false").toBoolean

// Returns a JobConf that will be used on slaves to obtain input splits for Hadoop reads.
protected def getJobConf(): JobConf = {
val conf: Configuration = broadcastedConf.value.value
if (shouldCloneJobConf) {
// Hadoop Configuration objects are not thread-safe, which may lead to various problems if
// one job modifies a configuration while another reads it (SPARK-2546). This problem occurs
// somewhat rarely because most jobs treat the configuration as though it's immutable. One
// solution, implemented here, is to clone the Configuration object. Unfortunately, this
// clone can be very expensive. To avoid unexpected performance regressions for workloads and
// Hadoop versions that do not suffer from these thread-safety issues, this cloning is
// disabled by default.
HadoopRDD.CONFIGURATION_INSTANTIATION_LOCK.synchronized {
logDebug("Cloning Hadoop Configuration")
val newJobConf = new JobConf(conf)
if (!conf.isInstanceOf[JobConf]) {
initLocalJobConfFuncOpt.map(f => f(newJobConf))
}
protected def getJobConf(): JobConf = sConf.value match {
case jobConf: JobConf => jobConf
case c => SparkHadoopUtil.CONFIGURATION_INSTANTIATION_LOCK synchronized {
val newJobConf = new JobConf(c)
initLocalJobConfFuncOpt.map(f => f(newJobConf))
newJobConf
}
} else {
if (conf.isInstanceOf[JobConf]) {
logDebug("Re-using user-broadcasted JobConf")
conf.asInstanceOf[JobConf]
} else if (HadoopRDD.containsCachedMetadata(jobConfCacheKey)) {
logDebug("Re-using cached JobConf")
HadoopRDD.getCachedMetadata(jobConfCacheKey).asInstanceOf[JobConf]
} else {
// Create a JobConf that will be cached and used across this RDD's getJobConf() calls in the
// local process. The local cache is accessed through HadoopRDD.putCachedMetadata().
// The caching helps minimize GC, since a JobConf can contain ~10KB of temporary objects.
// Synchronize to prevent ConcurrentModificationException (SPARK-1097, HADOOP-10456).
HadoopRDD.CONFIGURATION_INSTANTIATION_LOCK.synchronized {
logDebug("Creating new JobConf and caching it for later re-use")
val newJobConf = new JobConf(conf)
initLocalJobConfFuncOpt.map(f => f(newJobConf))
HadoopRDD.putCachedMetadata(jobConfCacheKey, newJobConf)
newJobConf
}
}
}
}

protected def getInputFormat(conf: JobConf): InputFormat[K, V] = {
@@ -319,12 +284,6 @@ class HadoopRDD[K, V](
}

private[spark] object HadoopRDD extends Logging {
/**
* Configuration's constructor is not threadsafe (see SPARK-1097 and HADOOP-10456).
* Therefore, we synchronize on this lock before calling new JobConf() or new Configuration().
*/
val CONFIGURATION_INSTANTIATION_LOCK = new Object()

/** Update the input bytes read metric each time this number of records has been read */
val RECORDS_BETWEEN_BYTES_READ_METRIC_UPDATES = 256

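The rewritten getJobConf reduces to a two-case match: reuse the configuration if it is already a JobConf, otherwise build one under the shared lock and apply the optional init function. A standalone sketch of that control flow (the lock and the init hook are local stand-ins for SparkHadoopUtil.CONFIGURATION_INSTANTIATION_LOCK and initLocalJobConfFuncOpt):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapred.JobConf

object GetJobConfSketch {
  // Stand-in for SparkHadoopUtil.CONFIGURATION_INSTANTIATION_LOCK.
  private val configurationLock = new Object()

  def getJobConf(conf: Configuration,
                 initLocalJobConfFuncOpt: Option[JobConf => Unit]): JobConf =
    conf match {
      // Already a JobConf: reuse it as-is, no copy and no init hook.
      case jobConf: JobConf => jobConf
      // Plain Configuration: build a JobConf under the lock (its constructor
      // is not thread-safe) and let the optional hook, e.g. a
      // FileInputFormat.setInputPaths closure, touch it.
      case c => configurationLock.synchronized {
        val newJobConf = new JobConf(c)
        initLocalJobConfFuncOpt.foreach(f => f(newJobConf))
        newJobConf
      }
    }

  def main(args: Array[String]): Unit = {
    val init: JobConf => Unit = _.setJobName("hadoop-rdd-sketch")
    val built = getJobConf(new Configuration(), Some(init))
    println(built.getJobName) // hadoop-rdd-sketch

    val existing = new JobConf()
    println(getJobConf(existing, None) eq existing) // true: reused as-is
  }
}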
8 changes: 3 additions & 5 deletions core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -75,9 +75,7 @@ class NewHadoopRDD[K, V](
with SparkHadoopMapReduceUtil
with Logging {

// A Hadoop Configuration can be about 10 KB, which is pretty big, so broadcast it
private val confBroadcast = sc.broadcast(new SerializableWritable(conf))
// private val serializableConf = new SerializableWritable(conf)
private val sConf = new SerializableWritable(conf)

private val jobTrackerId: String = {
val formatter = new SimpleDateFormat("yyyyMMddHHmm")
@@ -106,7 +104,7 @@ class NewHadoopRDD[K, V](
val iter = new Iterator[(K, V)] {
val split = theSplit.asInstanceOf[NewHadoopPartition]
logInfo("Input split: " + split.serializableHadoopSplit)
val conf = confBroadcast.value.value
val conf = sConf.value
val attemptId = newTaskAttemptID(jobTrackerId, id, isMap = true, split.index, 0)
val hadoopAttemptContext = newTaskAttemptContext(conf, attemptId)
val format = inputFormatClass.newInstance
@@ -220,7 +218,7 @@ class NewHadoopRDD[K, V](
locs.getOrElse(split.getLocations.filter(_ != "localhost"))
}

def getConf: Configuration = confBroadcast.value.value
def getConf: Configuration = sConf.value
}

private[spark] object NewHadoopRDD {
core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala
@@ -90,9 +90,8 @@ private[spark] class RDDCheckpointData[T: ClassTag](@transient rdd: RDD[T])
}

// Save to file, and reload it as an RDD
val broadcastedConf = rdd.context.broadcast(
new SerializableWritable(rdd.context.hadoopConfiguration))
rdd.context.runJob(rdd, CheckpointRDD.writeToFile[T](path.toString, broadcastedConf) _)
val sConf = new SerializableWritable(rdd.context.hadoopConfiguration)
rdd.context.runJob(rdd, CheckpointRDD.writeToFile[T](path.toString, sConf) _)
val newRDD = new CheckpointRDD[T](rdd.context, path.toString)
if (newRDD.partitions.size != rdd.partitions.size) {
throw new SparkException(
9 changes: 0 additions & 9 deletions docs/configuration.md
@@ -639,15 +639,6 @@ Apart from these, the following properties are also available, and may be useful
output directories. We recommend that users do not disable this except if trying to achieve compatibility with
previous versions of Spark. Simply use Hadoop's FileSystem API to delete output directories by hand.</td>
</tr>
<tr>
<td><code>spark.hadoop.cloneConf</code></td>
<td>false</td>
<td>If set to true, clones a new Hadoop <code>Configuration</code> object for each task. This
option should be enabled to work around <code>Configuration</code> thread-safety issues (see
<a href="https://issues.apache.org/jira/browse/SPARK-2546">SPARK-2546</a> for more details).
This is disabled by default in order to avoid unexpected performance regressions for jobs that
are not affected by these issues.</td>
</tr>
<tr>
<td><code>spark.executor.heartbeatInterval</code></td>
<td>10000</td>