Merge pull request apache#502 from pwendell/clone-1
Remove Hadoop object cloning and warn users making Hadoop RDDs.

The code introduced in apache#359 used Hadoop's WritableUtils.clone() to
duplicate objects when reading from Hadoop files. Some users have
reported exceptions when cloning data in various file formats,
including Avro and another custom format.
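
For context, the cloning that apache#359 added went through Hadoop's WritableUtils.clone(), which copies a record via its Writable serialization. A minimal sketch of that technique (simplified and illustrative only, not the exact HadoopRDD code):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{Text, Writable, WritableUtils}

// Illustration only: clone a Writable that the RecordReader reuses, so
// downstream operators see an independent object. The copy is made by
// round-tripping the record through its Writable serialization, which is
// where the reported failures for Avro and custom formats surfaced.
def cloneRecord[T <: Writable](record: T, conf: Configuration): T =
  WritableUtils.clone(record, conf)

val conf = new Configuration()
val reused = new Text("payload")        // instance reused by the reader
val copy   = cloneRecord(reused, conf)  // independent copy of the record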

This patch removes that functionality to ensure stability for the
0.9 release. Instead, it puts a clear warning in the documentation
that copying may be necessary for Hadoop data sets.
(cherry picked from commit c319617)

Conflicts:

	core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
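
The guidance added throughout the docs is to copy records explicitly before caching or collecting. A minimal usage sketch of that pattern against the updated API (hypothetical path and app name; assumes a local SparkContext):

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.spark.SparkContext

// Read a SequenceFile, then materialize plain Scala values in a map()
// before caching, because the RecordReader reuses its Writable objects.
val sc = new SparkContext("local", "copy-before-cache")
val raw = sc.sequenceFile("hdfs:///tmp/events", classOf[LongWritable], classOf[Text])
val copied = raw.map { case (k, v) => (k.get, v.toString) }
copied.cache()
sc.stop()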
pwendell committed Jan 24, 2014
1 parent 7a62353 commit e8d3f2b
Showing 5 changed files with 138 additions and 116 deletions.
127 changes: 81 additions & 46 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -341,7 +341,7 @@ class SparkContext(
*/
def textFile(path: String, minSplits: Int = defaultMinSplits): RDD[String] = {
hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
minSplits, cloneRecords = false).map(pair => pair._2.toString)
minSplits).map(pair => pair._2.toString)
}

/**
@@ -354,33 +354,37 @@ class SparkContext(
* @param keyClass Class of the keys
* @param valueClass Class of the values
* @param minSplits Minimum number of Hadoop Splits to generate.
* @param cloneRecords If true, Spark will clone the records produced by Hadoop RecordReader.
* Most RecordReader implementations reuse wrapper objects across multiple
* records, and can cause problems in RDD collect or aggregation operations.
* By default the records are cloned in Spark. However, application
* programmers can explicitly disable the cloning for better performance.
*
* '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
* record, directly caching the returned RDD will create many references to the same object.
* If you plan to directly cache Hadoop writable objects, you should first copy them using
* a `map` function.
*/
def hadoopRDD[K: ClassTag, V: ClassTag](
def hadoopRDD[K, V](
conf: JobConf,
inputFormatClass: Class[_ <: InputFormat[K, V]],
keyClass: Class[K],
valueClass: Class[V],
minSplits: Int = defaultMinSplits,
cloneRecords: Boolean = true
minSplits: Int = defaultMinSplits
): RDD[(K, V)] = {
// Add necessary security credentials to the JobConf before broadcasting it.
SparkHadoopUtil.get.addCredentials(conf)
new HadoopRDD(this, conf, inputFormatClass, keyClass, valueClass, minSplits, cloneRecords)
new HadoopRDD(this, conf, inputFormatClass, keyClass, valueClass, minSplits)
}

/** Get an RDD for a Hadoop file with an arbitrary InputFormat */
def hadoopFile[K: ClassTag, V: ClassTag](
/** Get an RDD for a Hadoop file with an arbitrary InputFormat
*
* '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
* record, directly caching the returned RDD will create many references to the same object.
* If you plan to directly cache Hadoop writable objects, you should first copy them using
* a `map` function.
* */
def hadoopFile[K, V](
path: String,
inputFormatClass: Class[_ <: InputFormat[K, V]],
keyClass: Class[K],
valueClass: Class[V],
minSplits: Int = defaultMinSplits,
cloneRecords: Boolean = true
minSplits: Int = defaultMinSplits
): RDD[(K, V)] = {
// A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it.
val confBroadcast = broadcast(new SerializableWritable(hadoopConfiguration))
@@ -392,8 +396,7 @@ class SparkContext(
inputFormatClass,
keyClass,
valueClass,
minSplits,
cloneRecords)
minSplits)
}

/**
@@ -403,16 +406,20 @@ class SparkContext(
* {{{
* val file = sparkContext.hadoopFile[LongWritable, Text, TextInputFormat](path, minSplits)
* }}}
*
* '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
* record, directly caching the returned RDD will create many references to the same object.
* If you plan to directly cache Hadoop writable objects, you should first copy them using
* a `map` function.
*/
def hadoopFile[K, V, F <: InputFormat[K, V]]
(path: String, minSplits: Int, cloneRecords: Boolean = true)
(path: String, minSplits: Int)
(implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] = {
hadoopFile(path,
fm.runtimeClass.asInstanceOf[Class[F]],
km.runtimeClass.asInstanceOf[Class[K]],
vm.runtimeClass.asInstanceOf[Class[V]],
minSplits,
cloneRecords)
minSplits)
}

/**
@@ -422,68 +429,91 @@ class SparkContext(
* {{{
* val file = sparkContext.hadoopFile[LongWritable, Text, TextInputFormat](path)
* }}}
*
* '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
* record, directly caching the returned RDD will create many references to the same object.
* If you plan to directly cache Hadoop writable objects, you should first copy them using
* a `map` function.
*/
def hadoopFile[K, V, F <: InputFormat[K, V]](path: String, cloneRecords: Boolean = true)
def hadoopFile[K, V, F <: InputFormat[K, V]](path: String)
(implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] =
hadoopFile[K, V, F](path, defaultMinSplits, cloneRecords)
hadoopFile[K, V, F](path, defaultMinSplits)

/** Get an RDD for a Hadoop file with an arbitrary new API InputFormat. */
def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]]
(path: String, cloneRecords: Boolean = true)
(path: String)
(implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] = {
newAPIHadoopFile(
path,
fm.runtimeClass.asInstanceOf[Class[F]],
km.runtimeClass.asInstanceOf[Class[K]],
vm.runtimeClass.asInstanceOf[Class[V]],
cloneRecords = cloneRecords)
vm.runtimeClass.asInstanceOf[Class[V]])
}

/**
* Get an RDD for a given Hadoop file with an arbitrary new API InputFormat
* and extra configuration options to pass to the input format.
*
* '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
* record, directly caching the returned RDD will create many references to the same object.
* If you plan to directly cache Hadoop writable objects, you should first copy them using
* a `map` function.
*/
def newAPIHadoopFile[K: ClassTag, V: ClassTag, F <: NewInputFormat[K, V]](
def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]](
path: String,
fClass: Class[F],
kClass: Class[K],
vClass: Class[V],
conf: Configuration = hadoopConfiguration,
cloneRecords: Boolean = true): RDD[(K, V)] = {
conf: Configuration = hadoopConfiguration): RDD[(K, V)] = {
val job = new NewHadoopJob(conf)
NewFileInputFormat.addInputPath(job, new Path(path))
val updatedConf = job.getConfiguration
new NewHadoopRDD(this, fClass, kClass, vClass, updatedConf, cloneRecords)
new NewHadoopRDD(this, fClass, kClass, vClass, updatedConf)
}

/**
* Get an RDD for a given Hadoop file with an arbitrary new API InputFormat
* and extra configuration options to pass to the input format.
*
* '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
* record, directly caching the returned RDD will create many references to the same object.
* If you plan to directly cache Hadoop writable objects, you should first copy them using
* a `map` function.
*/
def newAPIHadoopRDD[K: ClassTag, V: ClassTag, F <: NewInputFormat[K, V]](
def newAPIHadoopRDD[K, V, F <: NewInputFormat[K, V]](
conf: Configuration = hadoopConfiguration,
fClass: Class[F],
kClass: Class[K],
vClass: Class[V],
cloneRecords: Boolean = true): RDD[(K, V)] = {
new NewHadoopRDD(this, fClass, kClass, vClass, conf, cloneRecords)
}

/** Get an RDD for a Hadoop SequenceFile with given key and value types. */
def sequenceFile[K: ClassTag, V: ClassTag](path: String,
vClass: Class[V]): RDD[(K, V)] = {
new NewHadoopRDD(this, fClass, kClass, vClass, conf)
}

/** Get an RDD for a Hadoop SequenceFile with given key and value types.
*
* '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
* record, directly caching the returned RDD will create many references to the same object.
* If you plan to directly cache Hadoop writable objects, you should first copy them using
* a `map` function.
*/
def sequenceFile[K, V](path: String,
keyClass: Class[K],
valueClass: Class[V],
minSplits: Int,
cloneRecords: Boolean = true
minSplits: Int
): RDD[(K, V)] = {
val inputFormatClass = classOf[SequenceFileInputFormat[K, V]]
hadoopFile(path, inputFormatClass, keyClass, valueClass, minSplits, cloneRecords)
hadoopFile(path, inputFormatClass, keyClass, valueClass, minSplits)
}

/** Get an RDD for a Hadoop SequenceFile with given key and value types. */
def sequenceFile[K: ClassTag, V: ClassTag](path: String, keyClass: Class[K], valueClass: Class[V],
cloneRecords: Boolean = true): RDD[(K, V)] =
sequenceFile(path, keyClass, valueClass, defaultMinSplits, cloneRecords)
/** Get an RDD for a Hadoop SequenceFile with given key and value types.
*
* '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
* record, directly caching the returned RDD will create many references to the same object.
* If you plan to directly cache Hadoop writable objects, you should first copy them using
* a `map` function.
* */
def sequenceFile[K, V](path: String, keyClass: Class[K], valueClass: Class[V]
): RDD[(K, V)] =
sequenceFile(path, keyClass, valueClass, defaultMinSplits)

/**
* Version of sequenceFile() for types implicitly convertible to Writables through a
@@ -500,9 +530,14 @@ class SparkContext(
* have a parameterized singleton object). We use functions instead to create a new converter
* for the appropriate type. In addition, we pass the converter a ClassTag of its type to
* allow it to figure out the Writable class to use in the subclass case.
*
* '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
* record, directly caching the returned RDD will create many references to the same object.
* If you plan to directly cache Hadoop writable objects, you should first copy them using
* a `map` function.
*/
def sequenceFile[K, V]
(path: String, minSplits: Int = defaultMinSplits, cloneRecords: Boolean = true)
(path: String, minSplits: Int = defaultMinSplits)
(implicit km: ClassTag[K], vm: ClassTag[V],
kcf: () => WritableConverter[K], vcf: () => WritableConverter[V])
: RDD[(K, V)] = {
@@ -511,7 +546,7 @@ class SparkContext(
val format = classOf[SequenceFileInputFormat[Writable, Writable]]
val writables = hadoopFile(path, format,
kc.writableClass(km).asInstanceOf[Class[Writable]],
vc.writableClass(vm).asInstanceOf[Class[Writable]], minSplits, cloneRecords)
vc.writableClass(vm).asInstanceOf[Class[Writable]], minSplits)
writables.map { case (k, v) => (kc.convert(k), vc.convert(v)) }
}

@@ -1024,7 +1059,7 @@ object SparkContext {
implicit def rddToAsyncRDDActions[T: ClassTag](rdd: RDD[T]) = new AsyncRDDActions(rdd)

implicit def rddToSequenceFileRDDFunctions[K <% Writable: ClassTag, V <% Writable: ClassTag](
rdd: RDD[(K, V)]) =
rdd: RDD[(K, V)]) =
new SequenceFileRDDFunctions(rdd)

implicit def rddToOrderedRDDFunctions[K <% Ordered[K]: ClassTag, V: ClassTag](
core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
@@ -137,7 +137,13 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
*/
def textFile(path: String, minSplits: Int): JavaRDD[String] = sc.textFile(path, minSplits)

/**Get an RDD for a Hadoop SequenceFile with given key and value types. */
/** Get an RDD for a Hadoop SequenceFile with given key and value types.
*
* '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
* record, directly caching the returned RDD will create many references to the same object.
* If you plan to directly cache Hadoop writable objects, you should first copy them using
* a `map` function.
* */
def sequenceFile[K, V](path: String,
keyClass: Class[K],
valueClass: Class[V],
@@ -148,7 +154,13 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
new JavaPairRDD(sc.sequenceFile(path, keyClass, valueClass, minSplits))
}

/**Get an RDD for a Hadoop SequenceFile. */
/** Get an RDD for a Hadoop SequenceFile.
*
* '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
* record, directly caching the returned RDD will create many references to the same object.
* If you plan to directly cache Hadoop writable objects, you should first copy them using
* a `map` function.
*/
def sequenceFile[K, V](path: String, keyClass: Class[K], valueClass: Class[V]):
JavaPairRDD[K, V] = {
implicit val kcm: ClassTag[K] = ClassTag(keyClass)
@@ -184,6 +196,11 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
* Get an RDD for a Hadoop-readable dataset from a Hadooop JobConf giving its InputFormat and any
* other necessary info (e.g. file name for a filesystem-based dataset, table name for HyperTable,
* etc).
*
* '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
* record, directly caching the returned RDD will create many references to the same object.
* If you plan to directly cache Hadoop writable objects, you should first copy them using
* a `map` function.
*/
def hadoopRDD[K, V, F <: InputFormat[K, V]](
conf: JobConf,
@@ -201,6 +218,11 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
* Get an RDD for a Hadoop-readable dataset from a Hadooop JobConf giving its InputFormat and any
* other necessary info (e.g. file name for a filesystem-based dataset, table name for HyperTable,
* etc).
*
* '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
* record, directly caching the returned RDD will create many references to the same object.
* If you plan to directly cache Hadoop writable objects, you should first copy them using
* a `map` function.
*/
def hadoopRDD[K, V, F <: InputFormat[K, V]](
conf: JobConf,
@@ -213,7 +235,13 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
new JavaPairRDD(sc.hadoopRDD(conf, inputFormatClass, keyClass, valueClass))
}

/** Get an RDD for a Hadoop file with an arbitrary InputFormat */
/** Get an RDD for a Hadoop file with an arbitrary InputFormat.
*
* '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
* record, directly caching the returned RDD will create many references to the same object.
* If you plan to directly cache Hadoop writable objects, you should first copy them using
* a `map` function.
*/
def hadoopFile[K, V, F <: InputFormat[K, V]](
path: String,
inputFormatClass: Class[F],
@@ -226,7 +254,13 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
new JavaPairRDD(sc.hadoopFile(path, inputFormatClass, keyClass, valueClass, minSplits))
}

/** Get an RDD for a Hadoop file with an arbitrary InputFormat */
/** Get an RDD for a Hadoop file with an arbitrary InputFormat
*
* '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
* record, directly caching the returned RDD will create many references to the same object.
* If you plan to directly cache Hadoop writable objects, you should first copy them using
* a `map` function.
*/
def hadoopFile[K, V, F <: InputFormat[K, V]](
path: String,
inputFormatClass: Class[F],
@@ -242,6 +276,11 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
/**
* Get an RDD for a given Hadoop file with an arbitrary new API InputFormat
* and extra configuration options to pass to the input format.
*
* '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
* record, directly caching the returned RDD will create many references to the same object.
* If you plan to directly cache Hadoop writable objects, you should first copy them using
* a `map` function.
*/
def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]](
path: String,
@@ -257,6 +296,11 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
/**
* Get an RDD for a given Hadoop file with an arbitrary new API InputFormat
* and extra configuration options to pass to the input format.
*
* '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
* record, directly caching the returned RDD will create many references to the same object.
* If you plan to directly cache Hadoop writable objects, you should first copy them using
* a `map` function.
*/
def newAPIHadoopRDD[K, V, F <: NewInputFormat[K, V]](
conf: Configuration,