diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index e1e5c1231f942..12140fad24516 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -349,14 +349,14 @@ private[spark] object PythonRDD extends Logging {
     }
   }
 
-  /** Create and RDD from a path using [[org.apache.hadoop.mapred.SequenceFileInputFormat]] */
+  /** Create an RDD from a path using [[org.apache.hadoop.mapred.SequenceFileInputFormat]] */
   def sequenceFile[K, V](
       sc: JavaSparkContext,
       path: String,
       keyClass: String,
       valueClass: String,
-      keyWrapper: String,
-      valueWrapper: String,
+      keyConverter: String,
+      valueConverter: String,
       minSplits: Int) = {
     implicit val kcm = ClassTag(Class.forName(keyClass)).asInstanceOf[ClassTag[K]]
     implicit val vcm = ClassTag(Class.forName(valueClass)).asInstanceOf[ClassTag[V]]
@@ -374,18 +374,18 @@ private[spark] object PythonRDD extends Logging {
   def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]](
       sc: JavaSparkContext,
       path: String,
-      inputFormatClazz: String,
-      keyClazz: String,
-      valueClazz: String,
-      keyWrapper: String,
-      valueWrapper: String,
+      inputFormatClass: String,
+      keyClass: String,
+      valueClass: String,
+      keyConverter: String,
+      valueConverter: String,
       confAsMap: java.util.HashMap[String, String]) = {
     val conf = PythonHadoopUtil.mapToConf(confAsMap)
     val baseConf = sc.hadoopConfiguration()
     val mergedConf = PythonHadoopUtil.mergeConfs(baseConf, conf)
     val rdd =
       newAPIHadoopRDDFromClassNames[K, V, F](sc,
-        Some(path), inputFormatClazz, keyClazz, valueClazz, mergedConf)
+        Some(path), inputFormatClass, keyClass, valueClass, mergedConf)
     val converted = PythonHadoopUtil.convertRDD[K, V](rdd)
     JavaRDD.fromRDD(SerDeUtil.rddToPython(converted))
   }
@@ -397,16 +397,16 @@ private[spark] object PythonRDD extends Logging {
    */
   def newAPIHadoopRDD[K, V, F <: NewInputFormat[K, V]](
       sc: JavaSparkContext,
-      inputFormatClazz: String,
-      keyClazz: String,
-      valueClazz: String,
-      keyWrapper: String,
-      valueWrapper: String,
+      inputFormatClass: String,
+      keyClass: String,
+      valueClass: String,
+      keyConverter: String,
+      valueConverter: String,
       confAsMap: java.util.HashMap[String, String]) = {
     val conf = PythonHadoopUtil.mapToConf(confAsMap)
     val rdd =
       newAPIHadoopRDDFromClassNames[K, V, F](sc,
-        None, inputFormatClazz, keyClazz, valueClazz, conf)
+        None, inputFormatClass, keyClass, valueClass, conf)
     val converted = PythonHadoopUtil.convertRDD[K, V](rdd)
     JavaRDD.fromRDD(SerDeUtil.rddToPython(converted))
   }
@@ -414,13 +414,13 @@ private[spark] object PythonRDD extends Logging {
   private def newAPIHadoopRDDFromClassNames[K, V, F <: NewInputFormat[K, V]](
       sc: JavaSparkContext,
       path: Option[String] = None,
-      inputFormatClazz: String,
-      keyClazz: String,
-      valueClazz: String,
+      inputFormatClass: String,
+      keyClass: String,
+      valueClass: String,
       conf: Configuration) = {
-    implicit val kcm = ClassTag(Class.forName(keyClazz)).asInstanceOf[ClassTag[K]]
-    implicit val vcm = ClassTag(Class.forName(valueClazz)).asInstanceOf[ClassTag[V]]
-    implicit val fcm = ClassTag(Class.forName(inputFormatClazz)).asInstanceOf[ClassTag[F]]
+    implicit val kcm = ClassTag(Class.forName(keyClass)).asInstanceOf[ClassTag[K]]
+    implicit val vcm = ClassTag(Class.forName(valueClass)).asInstanceOf[ClassTag[V]]
+    implicit val fcm = ClassTag(Class.forName(inputFormatClass)).asInstanceOf[ClassTag[F]]
     val kc = kcm.runtimeClass.asInstanceOf[Class[K]]
     val vc = vcm.runtimeClass.asInstanceOf[Class[V]]
     val fc = fcm.runtimeClass.asInstanceOf[Class[F]]
@@ -439,18 +439,18 @@ private[spark] object PythonRDD extends Logging {
   def hadoopFile[K, V, F <: InputFormat[K, V]](
       sc: JavaSparkContext,
       path: String,
-      inputFormatClazz: String,
-      keyClazz: String,
-      valueClazz: String,
-      keyWrapper: String,
-      valueWrapper: String,
+      inputFormatClass: String,
+      keyClass: String,
+      valueClass: String,
+      keyConverter: String,
+      valueConverter: String,
       confAsMap: java.util.HashMap[String, String]) = {
     val conf = PythonHadoopUtil.mapToConf(confAsMap)
     val baseConf = sc.hadoopConfiguration()
     val mergedConf = PythonHadoopUtil.mergeConfs(baseConf, conf)
     val rdd =
       hadoopRDDFromClassNames[K, V, F](sc,
-        Some(path), inputFormatClazz, keyClazz, valueClazz, mergedConf)
+        Some(path), inputFormatClass, keyClass, valueClass, mergedConf)
     val converted = PythonHadoopUtil.convertRDD[K, V](rdd)
     JavaRDD.fromRDD(SerDeUtil.rddToPython(converted))
   }
@@ -462,16 +462,16 @@ private[spark] object PythonRDD extends Logging {
    */
   def hadoopRDD[K, V, F <: InputFormat[K, V]](
       sc: JavaSparkContext,
-      inputFormatClazz: String,
-      keyClazz: String,
-      valueClazz: String,
-      keyWrapper: String,
-      valueWrapper: String,
+      inputFormatClass: String,
+      keyClass: String,
+      valueClass: String,
+      keyConverter: String,
+      valueConverter: String,
       confAsMap: java.util.HashMap[String, String]) = {
     val conf = PythonHadoopUtil.mapToConf(confAsMap)
     val rdd =
       hadoopRDDFromClassNames[K, V, F](sc,
-        None, inputFormatClazz, keyClazz, valueClazz, conf)
+        None, inputFormatClass, keyClass, valueClass, conf)
     val converted = PythonHadoopUtil.convertRDD[K, V](rdd)
     JavaRDD.fromRDD(SerDeUtil.rddToPython(converted))
   }
@@ -479,13 +479,13 @@ private[spark] object PythonRDD extends Logging {
   private def hadoopRDDFromClassNames[K, V, F <: InputFormat[K, V]](
       sc: JavaSparkContext,
       path: Option[String] = None,
-      inputFormatClazz: String,
-      keyClazz: String,
-      valueClazz: String,
+      inputFormatClass: String,
+      keyClass: String,
+      valueClass: String,
       conf: Configuration) = {
-    implicit val kcm = ClassTag(Class.forName(keyClazz)).asInstanceOf[ClassTag[K]]
-    implicit val vcm = ClassTag(Class.forName(valueClazz)).asInstanceOf[ClassTag[V]]
-    implicit val fcm = ClassTag(Class.forName(inputFormatClazz)).asInstanceOf[ClassTag[F]]
+    implicit val kcm = ClassTag(Class.forName(keyClass)).asInstanceOf[ClassTag[K]]
+    implicit val vcm = ClassTag(Class.forName(valueClass)).asInstanceOf[ClassTag[V]]
+    implicit val fcm = ClassTag(Class.forName(inputFormatClass)).asInstanceOf[ClassTag[F]]
     val kc = kcm.runtimeClass.asInstanceOf[Class[K]]
     val vc = vcm.runtimeClass.asInstanceOf[Class[V]]
     val fc = fcm.runtimeClass.asInstanceOf[Class[F]]
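
For orientation, a rough sketch of how the renamed Scala entry points above are reached from PySpark over the Py4J gateway. This is illustrative only: the SequenceFile path and Writable classes are hypothetical, converters are left at their defaults, and PythonRDD.sequenceFile resolves the class-name strings with Class.forName on the JVM side.

    from pyspark import SparkContext

    sc = SparkContext(appName="SequenceFileSketch")
    # Hypothetical SequenceFile with IntWritable keys and Text values; the class names
    # are passed through to PythonRDD.sequenceFile as plain strings.
    rdd = sc.sequenceFile("hdfs:///tmp/example.seq",
                          "org.apache.hadoop.io.IntWritable",
                          "org.apache.hadoop.io.Text")
    print rdd.first()
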
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index f6ed5efc80696..2f227c0ba3b66 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -328,13 +328,15 @@ def wholeTextFiles(self, path, minPartitions=None):
         return RDD(self._jsc.wholeTextFiles(path, minPartitions), self,
                    PairDeserializer(UTF8Deserializer(), UTF8Deserializer()))
 
-    def dictToJavaMap(self, d):
+    def _dictToJavaMap(self, d):
         jm = self._jvm.java.util.HashMap()
+        if not d:
+            d = {}
         for k, v in d.iteritems():
             jm[k] = v
         return jm
 
-    def sequenceFile(self, path, keyClass, valueClass, keyConverter="", valueConverter="",
+    def sequenceFile(self, path, keyClass, valueClass, keyConverter=None, valueConverter=None,
                      minSplits=None):
         """
         Read a Hadoop SequenceFile with arbitrary key and value Writable class from HDFS,
@@ -372,8 +374,8 @@ def sequenceFile(self, path, keyClass, valueClass, keyConverter="", valueConvert
                                                 keyConverter, valueConverter, minSplits)
         return RDD(jrdd, self, PickleSerializer())
 
-    def newAPIHadoopFile(self, path, inputFormatClass, keyClass, valueClass,
-                         keyConverter="", valueConverter="", conf={}):
+    def newAPIHadoopFile(self, path, inputFormatClass, keyClass, valueClass, keyConverter=None,
+                         valueConverter=None, conf=None):
         """
         Read a 'new API' Hadoop InputFormat with arbitrary key and value class from HDFS,
         a local file system (available on all nodes), or any Hadoop-supported file system URI.
@@ -382,26 +384,26 @@ def newAPIHadoopFile(self, path, inputFormatClass, keyClass, valueClass,
         A Hadoop configuration can be passed in as a Python dict. This will be converted into
         a Configuration in Java
         """
-        jconf = self.dictToJavaMap(conf)
+        jconf = self._dictToJavaMap(conf)
         jrdd = self._jvm.PythonRDD.newAPIHadoopFile(self._jsc, path, inputFormatClass, keyClass,
                                                     valueClass, keyConverter, valueConverter, jconf)
         return RDD(jrdd, self, PickleSerializer())
 
-    def newAPIHadoopRDD(self, inputFormatClass, keyClass, valueClass,
-                        keyConverter="", valueConverter="", conf={}):
+    def newAPIHadoopRDD(self, inputFormatClass, keyClass, valueClass, keyConverter=None,
+                        valueConverter=None, conf=None):
         """
         Read a 'new API' Hadoop InputFormat with arbitrary key and value class, from an arbitrary
         Hadoop configuration, which is passed in as a Python dict.
         This will be converted into a Configuration in Java.
         The mechanism is the same as for sc.sequenceFile.
         """
-        jconf = self.dictToJavaMap(conf)
+        jconf = self._dictToJavaMap(conf)
         jrdd = self._jvm.PythonRDD.newAPIHadoopRDD(self._jsc, inputFormatClass, keyClass,
                                                    valueClass, keyConverter, valueConverter, jconf)
         return RDD(jrdd, self, PickleSerializer())
 
-    def hadoopFile(self, path, inputFormatClass, keyClass, valueClass,
-                   keyConverter="", valueConverter="", conf={}):
+    def hadoopFile(self, path, inputFormatClass, keyClass, valueClass, keyConverter=None,
+                   valueConverter=None, conf=None):
         """
         Read an 'old' Hadoop InputFormat with arbitrary key and value class from HDFS,
         a local file system (available on all nodes), or any Hadoop-supported file system URI.
@@ -410,22 +412,22 @@ def hadoopFile(self, path, inputFormatClass, keyClass, valueClass,
         A Hadoop configuration can be passed in as a Python dict. This will be converted into
         a Configuration in Java
         """
-        jconf = self.dictToJavaMap(conf)
+        jconf = self._dictToJavaMap(conf)
         for k, v in conf.iteritems():
             jconf[k] = v
         jrdd = self._jvm.PythonRDD.hadoopFile(self._jsc, path, inputFormatClass, keyClass,
                                               valueClass, keyConverter, valueConverter, jconf)
         return RDD(jrdd, self, PickleSerializer())
 
-    def hadoopRDD(self, inputFormatClass, keyClass, valueClass,
-                  keyConverter="", valueConverter="", conf={}):
+    def hadoopRDD(self, inputFormatClass, keyClass, valueClass, keyConverter=None,
+                  valueConverter=None, conf=None):
         """
         Read an 'old' Hadoop InputFormat with arbitrary key and value class, from an arbitrary
         Hadoop configuration, which is passed in as a Python dict.
         This will be converted into a Configuration in Java.
         The mechanism is the same as for sc.sequenceFile.
         """
-        jconf = self.dictToJavaMap(conf)
+        jconf = self._dictToJavaMap(conf)
         jrdd = self._jvm.PythonRDD.hadoopRDD(self._jsc, inputFormatClass, keyClass, valueClass,
                                              keyConverter, valueConverter, jconf)
         return RDD(jrdd, self, PickleSerializer())
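
With conf now defaulting to None and _dictToJavaMap treating a missing dict as empty, callers can either omit the Hadoop configuration or pass a plain Python dict. A minimal sketch, reusing the hypothetical sc from the earlier snippet; the input directory and configuration key are illustrative only, not part of this change.

    # Configuration passed as a Python dict; _dictToJavaMap turns it into a java.util.HashMap.
    conf = {"mapreduce.input.fileinputformat.inputdir": "hdfs:///tmp/input"}
    rdd = sc.newAPIHadoopRDD("org.apache.hadoop.mapreduce.lib.input.TextInputFormat",
                             "org.apache.hadoop.io.LongWritable",
                             "org.apache.hadoop.io.Text",
                             conf=conf)

    # Omitting conf is also valid: conf=None is normalized to an empty dict on the Python side.
    rdd2 = sc.newAPIHadoopFile("hdfs:///tmp/input",
                               "org.apache.hadoop.mapreduce.lib.input.TextInputFormat",
                               "org.apache.hadoop.io.LongWritable",
                               "org.apache.hadoop.io.Text")
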