diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 923b4ed68839c..566472e597958 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -341,7 +341,7 @@ class SparkContext(
    */
   def textFile(path: String, minSplits: Int = defaultMinSplits): RDD[String] = {
     hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
-      minSplits, cloneRecords = false).map(pair => pair._2.toString)
+      minSplits).map(pair => pair._2.toString)
   }
 
   /**
@@ -354,33 +354,37 @@ class SparkContext(
    * @param keyClass Class of the keys
    * @param valueClass Class of the values
    * @param minSplits Minimum number of Hadoop Splits to generate.
-   * @param cloneRecords If true, Spark will clone the records produced by Hadoop RecordReader.
-   *                     Most RecordReader implementations reuse wrapper objects across multiple
-   *                     records, and can cause problems in RDD collect or aggregation operations.
-   *                     By default the records are cloned in Spark. However, application
-   *                     programmers can explicitly disable the cloning for better performance.
+   *
+   * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
+   * record, directly caching the returned RDD will create many references to the same object.
+   * If you plan to directly cache Hadoop writable objects, you should first copy them using
+   * a `map` function.
    */
-  def hadoopRDD[K: ClassTag, V: ClassTag](
+  def hadoopRDD[K, V](
       conf: JobConf,
       inputFormatClass: Class[_ <: InputFormat[K, V]],
       keyClass: Class[K],
       valueClass: Class[V],
-      minSplits: Int = defaultMinSplits,
-      cloneRecords: Boolean = true
+      minSplits: Int = defaultMinSplits
       ): RDD[(K, V)] = {
     // Add necessary security credentials to the JobConf before broadcasting it.
     SparkHadoopUtil.get.addCredentials(conf)
-    new HadoopRDD(this, conf, inputFormatClass, keyClass, valueClass, minSplits, cloneRecords)
+    new HadoopRDD(this, conf, inputFormatClass, keyClass, valueClass, minSplits)
   }
 
-  /** Get an RDD for a Hadoop file with an arbitrary InputFormat */
-  def hadoopFile[K: ClassTag, V: ClassTag](
+  /** Get an RDD for a Hadoop file with an arbitrary InputFormat
+    *
+    * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
+    * record, directly caching the returned RDD will create many references to the same object.
+    * If you plan to directly cache Hadoop writable objects, you should first copy them using
+    * a `map` function.
+    * */
+  def hadoopFile[K, V](
       path: String,
       inputFormatClass: Class[_ <: InputFormat[K, V]],
       keyClass: Class[K],
       valueClass: Class[V],
-      minSplits: Int = defaultMinSplits,
-      cloneRecords: Boolean = true
+      minSplits: Int = defaultMinSplits
       ): RDD[(K, V)] = {
     // A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it.
     val confBroadcast = broadcast(new SerializableWritable(hadoopConfiguration))
@@ -392,8 +396,7 @@ class SparkContext(
       inputFormatClass,
       keyClass,
       valueClass,
-      minSplits,
-      cloneRecords)
+      minSplits)
   }
 
   /**
@@ -403,16 +406,20 @@ class SparkContext(
    * {{{
    * val file = sparkContext.hadoopFile[LongWritable, Text, TextInputFormat](path, minSplits)
    * }}}
+   *
+   * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
+   * record, directly caching the returned RDD will create many references to the same object.
+   * If you plan to directly cache Hadoop writable objects, you should first copy them using
+   * a `map` function.
    */
   def hadoopFile[K, V, F <: InputFormat[K, V]]
-      (path: String, minSplits: Int, cloneRecords: Boolean = true)
+      (path: String, minSplits: Int)
       (implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] = {
     hadoopFile(path,
       fm.runtimeClass.asInstanceOf[Class[F]],
       km.runtimeClass.asInstanceOf[Class[K]],
       vm.runtimeClass.asInstanceOf[Class[V]],
-      minSplits,
-      cloneRecords)
+      minSplits)
   }
 
   /**
@@ -422,68 +429,91 @@ class SparkContext(
    * {{{
    * val file = sparkContext.hadoopFile[LongWritable, Text, TextInputFormat](path)
    * }}}
+   *
+   * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
+   * record, directly caching the returned RDD will create many references to the same object.
+   * If you plan to directly cache Hadoop writable objects, you should first copy them using
+   * a `map` function.
    */
-  def hadoopFile[K, V, F <: InputFormat[K, V]](path: String, cloneRecords: Boolean = true)
+  def hadoopFile[K, V, F <: InputFormat[K, V]](path: String)
       (implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] =
-    hadoopFile[K, V, F](path, defaultMinSplits, cloneRecords)
+    hadoopFile[K, V, F](path, defaultMinSplits)
 
   /** Get an RDD for a Hadoop file with an arbitrary new API InputFormat. */
   def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]]
-      (path: String, cloneRecords: Boolean = true)
+      (path: String)
       (implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] = {
     newAPIHadoopFile(
       path,
       fm.runtimeClass.asInstanceOf[Class[F]],
       km.runtimeClass.asInstanceOf[Class[K]],
-      vm.runtimeClass.asInstanceOf[Class[V]],
-      cloneRecords = cloneRecords)
+      vm.runtimeClass.asInstanceOf[Class[V]])
   }
 
   /**
    * Get an RDD for a given Hadoop file with an arbitrary new API InputFormat
   * and extra configuration options to pass to the input format.
+   *
+   * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
+   * record, directly caching the returned RDD will create many references to the same object.
+   * If you plan to directly cache Hadoop writable objects, you should first copy them using
+   * a `map` function.
    */
-  def newAPIHadoopFile[K: ClassTag, V: ClassTag, F <: NewInputFormat[K, V]](
+  def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]](
      path: String,
      fClass: Class[F],
      kClass: Class[K],
      vClass: Class[V],
-      conf: Configuration = hadoopConfiguration,
-      cloneRecords: Boolean = true): RDD[(K, V)] = {
+      conf: Configuration = hadoopConfiguration): RDD[(K, V)] = {
     val job = new NewHadoopJob(conf)
     NewFileInputFormat.addInputPath(job, new Path(path))
     val updatedConf = job.getConfiguration
-    new NewHadoopRDD(this, fClass, kClass, vClass, updatedConf, cloneRecords)
+    new NewHadoopRDD(this, fClass, kClass, vClass, updatedConf)
   }
 
   /**
    * Get an RDD for a given Hadoop file with an arbitrary new API InputFormat
   * and extra configuration options to pass to the input format.
+   *
+   * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
+   * record, directly caching the returned RDD will create many references to the same object.
+   * If you plan to directly cache Hadoop writable objects, you should first copy them using
+   * a `map` function.
    */
-  def newAPIHadoopRDD[K: ClassTag, V: ClassTag, F <: NewInputFormat[K, V]](
+  def newAPIHadoopRDD[K, V, F <: NewInputFormat[K, V]](
      conf: Configuration = hadoopConfiguration,
      fClass: Class[F],
      kClass: Class[K],
-      vClass: Class[V],
-      cloneRecords: Boolean = true): RDD[(K, V)] = {
-    new NewHadoopRDD(this, fClass, kClass, vClass, conf, cloneRecords)
-  }
-
-  /** Get an RDD for a Hadoop SequenceFile with given key and value types. */
-  def sequenceFile[K: ClassTag, V: ClassTag](path: String,
+      vClass: Class[V]): RDD[(K, V)] = {
+    new NewHadoopRDD(this, fClass, kClass, vClass, conf)
+  }
+
+  /** Get an RDD for a Hadoop SequenceFile with given key and value types.
+    *
+    * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
+    * record, directly caching the returned RDD will create many references to the same object.
+    * If you plan to directly cache Hadoop writable objects, you should first copy them using
+    * a `map` function.
+    */
+  def sequenceFile[K, V](path: String,
      keyClass: Class[K],
      valueClass: Class[V],
-      minSplits: Int,
-      cloneRecords: Boolean = true
+      minSplits: Int
      ): RDD[(K, V)] = {
     val inputFormatClass = classOf[SequenceFileInputFormat[K, V]]
-    hadoopFile(path, inputFormatClass, keyClass, valueClass, minSplits, cloneRecords)
+    hadoopFile(path, inputFormatClass, keyClass, valueClass, minSplits)
   }
 
-  /** Get an RDD for a Hadoop SequenceFile with given key and value types. */
-  def sequenceFile[K: ClassTag, V: ClassTag](path: String, keyClass: Class[K], valueClass: Class[V],
-      cloneRecords: Boolean = true): RDD[(K, V)] =
-    sequenceFile(path, keyClass, valueClass, defaultMinSplits, cloneRecords)
+  /** Get an RDD for a Hadoop SequenceFile with given key and value types.
+    *
+    * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
+    * record, directly caching the returned RDD will create many references to the same object.
+    * If you plan to directly cache Hadoop writable objects, you should first copy them using
+    * a `map` function.
+    * */
+  def sequenceFile[K, V](path: String, keyClass: Class[K], valueClass: Class[V]
+      ): RDD[(K, V)] =
+    sequenceFile(path, keyClass, valueClass, defaultMinSplits)
 
   /**
    * Version of sequenceFile() for types implicitly convertible to Writables through a
@@ -500,9 +530,14 @@ class SparkContext(
    * have a parameterized singleton object). We use functions instead to create a new converter
    * for the appropriate type. In addition, we pass the converter a ClassTag of its type to
    * allow it to figure out the Writable class to use in the subclass case.
+   *
+   * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
+   * record, directly caching the returned RDD will create many references to the same object.
+   * If you plan to directly cache Hadoop writable objects, you should first copy them using
+   * a `map` function.
    */
   def sequenceFile[K, V]
-      (path: String, minSplits: Int = defaultMinSplits, cloneRecords: Boolean = true)
+      (path: String, minSplits: Int = defaultMinSplits)
       (implicit km: ClassTag[K], vm: ClassTag[V],
        kcf: () => WritableConverter[K], vcf: () => WritableConverter[V])
       : RDD[(K, V)] = {
@@ -511,7 +546,7 @@ class SparkContext(
     val format = classOf[SequenceFileInputFormat[Writable, Writable]]
     val writables = hadoopFile(path, format,
       kc.writableClass(km).asInstanceOf[Class[Writable]],
-      vc.writableClass(vm).asInstanceOf[Class[Writable]], minSplits, cloneRecords)
+      vc.writableClass(vm).asInstanceOf[Class[Writable]], minSplits)
     writables.map { case (k, v) => (kc.convert(k), vc.convert(v)) }
   }
 
@@ -1024,7 +1059,7 @@ object SparkContext {
   implicit def rddToAsyncRDDActions[T: ClassTag](rdd: RDD[T]) = new AsyncRDDActions(rdd)
 
   implicit def rddToSequenceFileRDDFunctions[K <% Writable: ClassTag, V <% Writable: ClassTag](
-    rdd: RDD[(K, V)]) =
+      rdd: RDD[(K, V)]) =
     new SequenceFileRDDFunctions(rdd)
 
   implicit def rddToOrderedRDDFunctions[K <% Ordered[K]: ClassTag, V: ClassTag](
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
index 50ac700823fba..5a426b983519c 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
@@ -137,7 +137,13 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
    */
   def textFile(path: String, minSplits: Int): JavaRDD[String] = sc.textFile(path, minSplits)
 
-  /** Get an RDD for a Hadoop SequenceFile with given key and value types. */
+  /** Get an RDD for a Hadoop SequenceFile with given key and value types.
+    *
+    * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
+    * record, directly caching the returned RDD will create many references to the same object.
+    * If you plan to directly cache Hadoop writable objects, you should first copy them using
+    * a `map` function.
+    * */
   def sequenceFile[K, V](path: String,
     keyClass: Class[K],
     valueClass: Class[V],
@@ -148,19 +154,13 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
     new JavaPairRDD(sc.sequenceFile(path, keyClass, valueClass, minSplits))
   }
 
-  /** Get an RDD for a Hadoop SequenceFile with given key and value types. */
-  def sequenceFile[K, V](path: String,
-    keyClass: Class[K],
-    valueClass: Class[V],
-    minSplits: Int,
-    cloneRecords: Boolean
-    ): JavaPairRDD[K, V] = {
-    implicit val kcm: ClassTag[K] = ClassTag(keyClass)
-    implicit val vcm: ClassTag[V] = ClassTag(valueClass)
-    new JavaPairRDD(sc.sequenceFile(path, keyClass, valueClass, minSplits, cloneRecords))
-  }
-
-  /** Get an RDD for a Hadoop SequenceFile. */
+  /** Get an RDD for a Hadoop SequenceFile.
+    *
+    * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
+    * record, directly caching the returned RDD will create many references to the same object.
+    * If you plan to directly cache Hadoop writable objects, you should first copy them using
+    * a `map` function.
+    */
   def sequenceFile[K, V](path: String, keyClass: Class[K], valueClass: Class[V]):
   JavaPairRDD[K, V] = {
     implicit val kcm: ClassTag[K] = ClassTag(keyClass)
@@ -168,15 +168,6 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
     new JavaPairRDD(sc.sequenceFile(path, keyClass, valueClass))
   }
 
-  /** Get an RDD for a Hadoop SequenceFile. */
-  def sequenceFile[K, V](path: String, keyClass: Class[K], valueClass: Class[V],
-    cloneRecords: Boolean):
-  JavaPairRDD[K, V] = {
-    implicit val kcm: ClassTag[K] = ClassTag(keyClass)
-    implicit val vcm: ClassTag[V] = ClassTag(valueClass)
-    new JavaPairRDD(sc.sequenceFile(path, keyClass, valueClass, cloneRecords))
-  }
-
   /**
    * Load an RDD saved as a SequenceFile containing serialized objects, with NullWritable keys and
    * BytesWritable values that contain a serialized partition. This is still an experimental storage
    *
@@ -205,6 +196,11 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
    * Get an RDD for a Hadoop-readable dataset from a Hadoop JobConf giving its InputFormat and any
    * other necessary info (e.g. file name for a filesystem-based dataset, table name for HyperTable,
    * etc).
+   *
+   * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
+   * record, directly caching the returned RDD will create many references to the same object.
+   * If you plan to directly cache Hadoop writable objects, you should first copy them using
+   * a `map` function.
    */
   def hadoopRDD[K, V, F <: InputFormat[K, V]](
     conf: JobConf,
@@ -218,41 +214,14 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
     new JavaPairRDD(sc.hadoopRDD(conf, inputFormatClass, keyClass, valueClass, minSplits))
   }
 
-
-  /**
-   * Get an RDD for a Hadoop-readable dataset from a Hadoop JobConf given its InputFormat and other
-   * necessary info (e.g. file name for a filesystem-based dataset, table name for HyperTable),
-   * using the older MapReduce API (`org.apache.hadoop.mapred`).
-   *
-   * @param conf JobConf for setting up the dataset
-   * @param inputFormatClass Class of the [[InputFormat]]
-   * @param keyClass Class of the keys
-   * @param valueClass Class of the values
-   * @param minSplits Minimum number of Hadoop Splits to generate.
-   * @param cloneRecords If true, Spark will clone the records produced by Hadoop RecordReader.
-   *                     Most RecordReader implementations reuse wrapper objects across multiple
-   *                     records, and can cause problems in RDD collect or aggregation operations.
-   *                     By default the records are cloned in Spark. However, application
-   *                     programmers can explicitly disable the cloning for better performance.
-   */
-  def hadoopRDD[K, V, F <: InputFormat[K, V]](
-    conf: JobConf,
-    inputFormatClass: Class[F],
-    keyClass: Class[K],
-    valueClass: Class[V],
-    minSplits: Int,
-    cloneRecords: Boolean
-    ): JavaPairRDD[K, V] = {
-    implicit val kcm: ClassTag[K] = ClassTag(keyClass)
-    implicit val vcm: ClassTag[V] = ClassTag(valueClass)
-    new JavaPairRDD(sc.hadoopRDD(conf, inputFormatClass, keyClass, valueClass, minSplits,
-      cloneRecords))
-  }
-
   /**
    * Get an RDD for a Hadoop-readable dataset from a Hadoop JobConf giving its InputFormat and any
    * other necessary info (e.g. file name for a filesystem-based dataset, table name for HyperTable,
-   * etc).
+   *
+   * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
+   * record, directly caching the returned RDD will create many references to the same object.
+   * If you plan to directly cache Hadoop writable objects, you should first copy them using
+   * a `map` function.
    */
   def hadoopRDD[K, V, F <: InputFormat[K, V]](
     conf: JobConf,
@@ -265,7 +234,13 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
     new JavaPairRDD(sc.hadoopRDD(conf, inputFormatClass, keyClass, valueClass))
   }
 
-  /** Get an RDD for a Hadoop file with an arbitrary InputFormat */
+  /** Get an RDD for a Hadoop file with an arbitrary InputFormat.
+    *
+    * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
+    * record, directly caching the returned RDD will create many references to the same object.
+    * If you plan to directly cache Hadoop writable objects, you should first copy them using
+    * a `map` function.
+    */
   def hadoopFile[K, V, F <: InputFormat[K, V]](
     path: String,
     inputFormatClass: Class[F],
     keyClass: Class[K],
     valueClass: Class[V],
     minSplits: Int
@@ -278,22 +253,13 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
     new JavaPairRDD(sc.hadoopFile(path, inputFormatClass, keyClass, valueClass, minSplits))
   }
 
-  /** Get an RDD for a Hadoop file with an arbitrary InputFormat */
-  def hadoopFile[K, V, F <: InputFormat[K, V]](
-    path: String,
-    inputFormatClass: Class[F],
-    keyClass: Class[K],
-    valueClass: Class[V],
-    minSplits: Int,
-    cloneRecords: Boolean
-    ): JavaPairRDD[K, V] = {
-    implicit val kcm: ClassTag[K] = ClassTag(keyClass)
-    implicit val vcm: ClassTag[V] = ClassTag(valueClass)
-    new JavaPairRDD(sc.hadoopFile(path, inputFormatClass, keyClass, valueClass,
-      minSplits, cloneRecords))
-  }
-
-  /** Get an RDD for a Hadoop file with an arbitrary InputFormat */
+  /** Get an RDD for a Hadoop file with an arbitrary InputFormat
+    *
+    * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
+    * record, directly caching the returned RDD will create many references to the same object.
+    * If you plan to directly cache Hadoop writable objects, you should first copy them using
+    * a `map` function.
+    */
   def hadoopFile[K, V, F <: InputFormat[K, V]](
     path: String,
     inputFormatClass: Class[F],
@@ -306,23 +272,14 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
       inputFormatClass, keyClass, valueClass))
   }
 
-  /** Get an RDD for a Hadoop file with an arbitrary InputFormat */
-  def hadoopFile[K, V, F <: InputFormat[K, V]](
-    path: String,
-    inputFormatClass: Class[F],
-    keyClass: Class[K],
-    valueClass: Class[V],
-    cloneRecords: Boolean
-    ): JavaPairRDD[K, V] = {
-    implicit val kcm: ClassTag[K] = ClassTag(keyClass)
-    implicit val vcm: ClassTag[V] = ClassTag(valueClass)
-    new JavaPairRDD(sc.hadoopFile(path,
-      inputFormatClass, keyClass, valueClass, cloneRecords = cloneRecords))
-  }
-
   /**
    * Get an RDD for a given Hadoop file with an arbitrary new API InputFormat
    * and extra configuration options to pass to the input format.
+   *
+   * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
+   * record, directly caching the returned RDD will create many references to the same object.
+   * If you plan to directly cache Hadoop writable objects, you should first copy them using
+   * a `map` function.
    */
   def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]](
     path: String,
@@ -338,22 +295,11 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
   /**
    * Get an RDD for a given Hadoop file with an arbitrary new API InputFormat
    * and extra configuration options to pass to the input format.
-   */
-  def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]](
-    path: String,
-    fClass: Class[F],
-    kClass: Class[K],
-    vClass: Class[V],
-    conf: Configuration,
-    cloneRecords: Boolean): JavaPairRDD[K, V] = {
-    implicit val kcm: ClassTag[K] = ClassTag(kClass)
-    implicit val vcm: ClassTag[V] = ClassTag(vClass)
-    new JavaPairRDD(sc.newAPIHadoopFile(path, fClass, kClass, vClass, conf, cloneRecords))
-  }
-
-  /**
-   * Get an RDD for a given Hadoop file with an arbitrary new API InputFormat
-   * and extra configuration options to pass to the input format.
+   *
+   * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
+   * record, directly caching the returned RDD will create many references to the same object.
+   * If you plan to directly cache Hadoop writable objects, you should first copy them using
+   * a `map` function.
    */
   def newAPIHadoopRDD[K, V, F <: NewInputFormat[K, V]](
     conf: Configuration,
@@ -365,21 +311,6 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
     new JavaPairRDD(sc.newAPIHadoopRDD(conf, fClass, kClass, vClass))
   }
 
-  /**
-   * Get an RDD for a given Hadoop file with an arbitrary new API InputFormat
-   * and extra configuration options to pass to the input format.
-   */
-  def newAPIHadoopRDD[K, V, F <: NewInputFormat[K, V]](
-    conf: Configuration,
-    fClass: Class[F],
-    kClass: Class[K],
-    vClass: Class[V],
-    cloneRecords: Boolean): JavaPairRDD[K, V] = {
-    implicit val kcm: ClassTag[K] = ClassTag(kClass)
-    implicit val vcm: ClassTag[V] = ClassTag(vClass)
-    new JavaPairRDD(sc.newAPIHadoopRDD(conf, fClass, kClass, vClass, cloneRecords))
-  }
-
   /** Build the union of two or more RDDs. */
   override def union[T](first: JavaRDD[T], rest: java.util.List[JavaRDD[T]]): JavaRDD[T] = {
     val rdds: Seq[RDD[T]] = (Seq(first) ++ asScalaBuffer(rest)).map(_.rdd)
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index dbe76f34316ae..ad74d4636fb1b 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -19,10 +19,7 @@ package org.apache.spark.rdd
 
 import java.io.EOFException
 
-import scala.reflect.ClassTag
-
 import org.apache.hadoop.conf.{Configuration, Configurable}
-import org.apache.hadoop.io.Writable
 import org.apache.hadoop.mapred.InputFormat
 import org.apache.hadoop.mapred.InputSplit
 import org.apache.hadoop.mapred.JobConf
@@ -34,7 +31,6 @@ import org.apache.spark._
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.util.NextIterator
-import org.apache.spark.util.Utils.cloneWritables
 
 
 /**
@@ -64,21 +60,15 @@ private[spark] class HadoopPartition(rddId: Int, idx: Int, @transient s: InputSp
  * @param keyClass Class of the key associated with the inputFormatClass.
  * @param valueClass Class of the value associated with the inputFormatClass.
  * @param minSplits Minimum number of Hadoop Splits (HadoopRDD partitions) to generate.
- * @param cloneRecords If true, Spark will clone the records produced by Hadoop RecordReader.
- *                     Most RecordReader implementations reuse wrapper objects across multiple
- *                     records, and can cause problems in RDD collect or aggregation operations.
- *                     By default the records are cloned in Spark. However, application
- *                     programmers can explicitly disable the cloning for better performance.
  */
-class HadoopRDD[K: ClassTag, V: ClassTag](
+class HadoopRDD[K, V](
     sc: SparkContext,
     broadcastedConf: Broadcast[SerializableWritable[Configuration]],
     initLocalJobConfFuncOpt: Option[JobConf => Unit],
     inputFormatClass: Class[_ <: InputFormat[K, V]],
     keyClass: Class[K],
     valueClass: Class[V],
-    minSplits: Int,
-    cloneRecords: Boolean = true)
+    minSplits: Int)
   extends RDD[(K, V)](sc, Nil) with Logging {
 
   def this(
@@ -87,8 +77,7 @@ class HadoopRDD[K: ClassTag, V: ClassTag](
      inputFormatClass: Class[_ <: InputFormat[K, V]],
      keyClass: Class[K],
      valueClass: Class[V],
-      minSplits: Int,
-      cloneRecords: Boolean) = {
+      minSplits: Int) = {
     this(
       sc,
       sc.broadcast(new SerializableWritable(conf))
@@ -97,8 +86,7 @@ class HadoopRDD[K: ClassTag, V: ClassTag](
      inputFormatClass,
      keyClass,
      valueClass,
-      minSplits,
-      cloneRecords)
+      minSplits)
   }
 
   protected val jobConfCacheKey = "rdd_%d_job_conf".format(id)
@@ -170,9 +158,7 @@ class HadoopRDD[K: ClassTag, V: ClassTag](
      // Register an on-task-completion callback to close the input stream.
      context.addOnCompleteCallback{ () => closeIfNeeded() }
      val key: K = reader.createKey()
-      val keyCloneFunc = cloneWritables[K](jobConf)
      val value: V = reader.createValue()
-      val valueCloneFunc = cloneWritables[V](jobConf)
      override def getNext() = {
        try {
          finished = !reader.next(key, value)
@@ -180,11 +166,7 @@ class HadoopRDD[K: ClassTag, V: ClassTag](
          case eof: EOFException =>
            finished = true
        }
-        if (cloneRecords) {
-          (keyCloneFunc(key.asInstanceOf[Writable]), valueCloneFunc(value.asInstanceOf[Writable]))
-        } else {
-          (key, value)
-        }
+        (key, value)
      }
 
      override def close() {
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index 992bd4aa0ad5d..d1fff296878c3 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -20,15 +20,11 @@ package org.apache.spark.rdd
 import java.text.SimpleDateFormat
 import java.util.Date
 
-import scala.reflect.ClassTag
-
 import org.apache.hadoop.conf.{Configurable, Configuration}
 import org.apache.hadoop.io.Writable
 import org.apache.hadoop.mapreduce._
 
 import org.apache.spark.{InterruptibleIterator, Logging, Partition, SerializableWritable, SparkContext, TaskContext}
-import org.apache.spark.util.Utils.cloneWritables
-
 
 private[spark]
 class NewHadoopPartition(rddId: Int, val index: Int, @transient rawSplit: InputSplit with Writable)
@@ -48,19 +44,13 @@ class NewHadoopPartition(rddId: Int, val index: Int, @transient rawSplit: InputS
  * @param keyClass Class of the key associated with the inputFormatClass.
  * @param valueClass Class of the value associated with the inputFormatClass.
  * @param conf The Hadoop configuration.
- * @param cloneRecords If true, Spark will clone the records produced by Hadoop RecordReader.
- *                     Most RecordReader implementations reuse wrapper objects across multiple
- *                     records, and can cause problems in RDD collect or aggregation operations.
- *                     By default the records are cloned in Spark. However, application
- *                     programmers can explicitly disable the cloning for better performance.
  */
-class NewHadoopRDD[K: ClassTag, V: ClassTag](
+class NewHadoopRDD[K, V](
     sc : SparkContext,
     inputFormatClass: Class[_ <: InputFormat[K, V]],
     keyClass: Class[K],
     valueClass: Class[V],
-    @transient conf: Configuration,
-    cloneRecords: Boolean)
+    @transient conf: Configuration)
   extends RDD[(K, V)](sc, Nil)
   with SparkHadoopMapReduceUtil
   with Logging {
@@ -107,8 +97,6 @@ class NewHadoopRDD[K: ClassTag, V: ClassTag](
      // Register an on-task-completion callback to close the input stream.
      context.addOnCompleteCallback(() => close())
 
-      val keyCloneFunc = cloneWritables[K](conf)
-      val valueCloneFunc = cloneWritables[V](conf)
      var havePair = false
      var finished = false
 
@@ -125,13 +113,7 @@ class NewHadoopRDD[K: ClassTag, V: ClassTag](
          throw new java.util.NoSuchElementException("End of stream")
        }
        havePair = false
-        val key = reader.getCurrentKey
-        val value = reader.getCurrentValue
-        if (cloneRecords) {
-          (keyCloneFunc(key.asInstanceOf[Writable]), valueCloneFunc(value.asInstanceOf[Writable]))
-        } else {
-          (key, value)
-        }
+        (reader.getCurrentKey, reader.getCurrentValue)
      }
 
      private def close() {
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index caa9bf4c9280e..61d8ef5efeb00 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -26,7 +26,6 @@ import scala.collection.JavaConversions._
 import scala.collection.Map
 import scala.collection.mutable.ArrayBuffer
 import scala.io.Source
-import scala.reflect.{classTag, ClassTag}
 
 import com.google.common.io.Files
 import com.google.common.util.concurrent.ThreadFactoryBuilder
@@ -46,27 +45,6 @@ import org.apache.spark.{SparkConf, SparkException, Logging}
  */
 private[spark] object Utils extends Logging {
 
-  /**
-   * We try to clone for most common types of writables and we call WritableUtils.clone otherwise
-   * intention is to optimize, for example for NullWritable there is no need and for Long, int and
-   * String creating a new object with value set would be faster.
-   */
-  def cloneWritables[T: ClassTag](conf: Configuration): Writable => T = {
-    val cloneFunc = classTag[T] match {
-      case ClassTag(_: Text) =>
-        (w: Writable) => new Text(w.asInstanceOf[Text].getBytes).asInstanceOf[T]
-      case ClassTag(_: LongWritable) =>
-        (w: Writable) => new LongWritable(w.asInstanceOf[LongWritable].get).asInstanceOf[T]
-      case ClassTag(_: IntWritable) =>
-        (w: Writable) => new IntWritable(w.asInstanceOf[IntWritable].get).asInstanceOf[T]
-      case ClassTag(_: NullWritable) =>
-        (w: Writable) => w.asInstanceOf[T] // TODO: should we clone this ?
-      case _ =>
-        (w: Writable) => WritableUtils.clone(w, conf).asInstanceOf[T] // slower way of cloning.
-    }
-    cloneFunc
-  }
-
   /** Serialize an object using Java serialization */
   def serialize[T](o: T): Array[Byte] = {
     val bos = new ByteArrayOutputStream()
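
Usage note (editor's addition, not part of the patch): the caveat the new scaladoc repeats throughout — copy Hadoop Writables with a `map` before caching — can be illustrated with a short sketch. The input path, application name, and split count below are hypothetical placeholders.

```scala
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.TextInputFormat
import org.apache.spark.SparkContext

object WritableCopyExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local", "writable-copy-example")

    // The RecordReader behind hadoopFile re-uses one LongWritable and one Text
    // instance, so caching `raw` directly would store many references to the
    // same two objects.
    val raw = sc.hadoopFile[LongWritable, Text, TextInputFormat]("hdfs:///tmp/input", 2)

    // Copy the values out of the re-used Writables before caching.
    val safe = raw.map { case (offset, line) => (offset.get, line.toString) }
    safe.cache()

    println(safe.count())
    sc.stop()
  }
}
```

This mirrors the `sparkContext.hadoopFile[LongWritable, Text, TextInputFormat](path, minSplits)` example already shown in the scaladoc; only the copying `map` step is added.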
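If an application needs to keep the Writable objects themselves rather than extracted values, the cloning that the removed `cloneRecords` flag used to perform can be done explicitly by the caller. The helper below is a hypothetical user-side sketch built on Hadoop's `WritableUtils.clone`, which is also the fallback the deleted `Utils.cloneWritables` used for uncommon types.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{Writable, WritableUtils}
import org.apache.spark.rdd.RDD

object WritableCloneExample {
  /** Deep-copy every re-used Writable so a cached RDD holds distinct objects. */
  def cloneAll[K <: Writable, V <: Writable](rdd: RDD[(K, V)]): RDD[(K, V)] = {
    rdd.mapPartitions { iter =>
      // Created on the executor, once per partition; only needed for cloning.
      val conf = new Configuration()
      iter.map { case (k, v) => (WritableUtils.clone(k, conf), WritableUtils.clone(v, conf)) }
    }
  }
}
```

Creating the `Configuration` inside `mapPartitions` keeps the closure serializable and avoids constructing a new one per record.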