Clean up args in PythonRDD. Set key/value converter defaults to None for PySpark context.py methods
MLnick committed Jun 3, 2014
1 parent 1a4a1d6 commit 94beedc
Showing 2 changed files with 55 additions and 53 deletions.
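For orientation, here is a minimal PySpark usage sketch of the context.py methods this commit touches; after the change the keyConverter/valueConverter arguments default to None and the Hadoop conf dict is optional. The paths, Writable classes, and configuration key below are illustrative assumptions, not part of the commit:

    from pyspark import SparkContext

    sc = SparkContext(appName="converter-defaults-sketch")  # hypothetical app name

    # SequenceFile of (IntWritable, Text) records; the converter arguments no longer
    # need to be passed explicitly because they now default to None.
    seq_rdd = sc.sequenceFile("hdfs:///tmp/int-text.seq",   # hypothetical path
                              "org.apache.hadoop.io.IntWritable",
                              "org.apache.hadoop.io.Text")

    # 'New API' InputFormat read; conf is optional (defaults to None), but a plain
    # Python dict is still accepted and converted to a Hadoop Configuration on the JVM.
    new_rdd = sc.newAPIHadoopFile(
        "hdfs:///tmp/int-text.seq",                          # hypothetical path
        "org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat",
        "org.apache.hadoop.io.IntWritable",
        "org.apache.hadoop.io.Text",
        conf={"mapreduce.input.fileinputformat.split.minsize": "1"})  # hypothetical setting

    print(seq_rdd.take(3))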
78 changes: 39 additions & 39 deletions core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -349,14 +349,14 @@ private[spark] object PythonRDD extends Logging {
}
}

- /** Create and RDD from a path using [[org.apache.hadoop.mapred.SequenceFileInputFormat]] */
+ /** Create an RDD from a path using [[org.apache.hadoop.mapred.SequenceFileInputFormat]] */
def sequenceFile[K, V](
sc: JavaSparkContext,
path: String,
keyClass: String,
valueClass: String,
- keyWrapper: String,
- valueWrapper: String,
+ keyConverter: String,
+ valueConverter: String,
minSplits: Int) = {
implicit val kcm = ClassTag(Class.forName(keyClass)).asInstanceOf[ClassTag[K]]
implicit val vcm = ClassTag(Class.forName(valueClass)).asInstanceOf[ClassTag[V]]
@@ -374,18 +374,18 @@ private[spark] object PythonRDD extends Logging {
def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]](
sc: JavaSparkContext,
path: String,
- inputFormatClazz: String,
- keyClazz: String,
- valueClazz: String,
- keyWrapper: String,
- valueWrapper: String,
+ inputFormatClass: String,
+ keyClass: String,
+ valueClass: String,
+ keyConverter: String,
+ valueConverter: String,
confAsMap: java.util.HashMap[String, String]) = {
val conf = PythonHadoopUtil.mapToConf(confAsMap)
val baseConf = sc.hadoopConfiguration()
val mergedConf = PythonHadoopUtil.mergeConfs(baseConf, conf)
val rdd =
newAPIHadoopRDDFromClassNames[K, V, F](sc,
- Some(path), inputFormatClazz, keyClazz, valueClazz, mergedConf)
+ Some(path), inputFormatClass, keyClass, valueClass, mergedConf)
val converted = PythonHadoopUtil.convertRDD[K, V](rdd)
JavaRDD.fromRDD(SerDeUtil.rddToPython(converted))
}
@@ -397,30 +397,30 @@ private[spark] object PythonRDD extends Logging {
*/
def newAPIHadoopRDD[K, V, F <: NewInputFormat[K, V]](
sc: JavaSparkContext,
- inputFormatClazz: String,
- keyClazz: String,
- valueClazz: String,
- keyWrapper: String,
- valueWrapper: String,
+ inputFormatClass: String,
+ keyClass: String,
+ valueClass: String,
+ keyConverter: String,
+ valueConverter: String,
confAsMap: java.util.HashMap[String, String]) = {
val conf = PythonHadoopUtil.mapToConf(confAsMap)
val rdd =
newAPIHadoopRDDFromClassNames[K, V, F](sc,
- None, inputFormatClazz, keyClazz, valueClazz, conf)
+ None, inputFormatClass, keyClass, valueClass, conf)
val converted = PythonHadoopUtil.convertRDD[K, V](rdd)
JavaRDD.fromRDD(SerDeUtil.rddToPython(converted))
}

private def newAPIHadoopRDDFromClassNames[K, V, F <: NewInputFormat[K, V]](
sc: JavaSparkContext,
path: Option[String] = None,
- inputFormatClazz: String,
- keyClazz: String,
- valueClazz: String,
+ inputFormatClass: String,
+ keyClass: String,
+ valueClass: String,
conf: Configuration) = {
- implicit val kcm = ClassTag(Class.forName(keyClazz)).asInstanceOf[ClassTag[K]]
- implicit val vcm = ClassTag(Class.forName(valueClazz)).asInstanceOf[ClassTag[V]]
- implicit val fcm = ClassTag(Class.forName(inputFormatClazz)).asInstanceOf[ClassTag[F]]
+ implicit val kcm = ClassTag(Class.forName(keyClass)).asInstanceOf[ClassTag[K]]
+ implicit val vcm = ClassTag(Class.forName(valueClass)).asInstanceOf[ClassTag[V]]
+ implicit val fcm = ClassTag(Class.forName(inputFormatClass)).asInstanceOf[ClassTag[F]]
val kc = kcm.runtimeClass.asInstanceOf[Class[K]]
val vc = vcm.runtimeClass.asInstanceOf[Class[V]]
val fc = fcm.runtimeClass.asInstanceOf[Class[F]]
@@ -439,18 +439,18 @@ private[spark] object PythonRDD extends Logging {
def hadoopFile[K, V, F <: InputFormat[K, V]](
sc: JavaSparkContext,
path: String,
- inputFormatClazz: String,
- keyClazz: String,
- valueClazz: String,
- keyWrapper: String,
- valueWrapper: String,
+ inputFormatClass: String,
+ keyClass: String,
+ valueClass: String,
+ keyConverter: String,
+ valueConverter: String,
confAsMap: java.util.HashMap[String, String]) = {
val conf = PythonHadoopUtil.mapToConf(confAsMap)
val baseConf = sc.hadoopConfiguration()
val mergedConf = PythonHadoopUtil.mergeConfs(baseConf, conf)
val rdd =
hadoopRDDFromClassNames[K, V, F](sc,
- Some(path), inputFormatClazz, keyClazz, valueClazz, mergedConf)
+ Some(path), inputFormatClass, keyClass, valueClass, mergedConf)
val converted = PythonHadoopUtil.convertRDD[K, V](rdd)
JavaRDD.fromRDD(SerDeUtil.rddToPython(converted))
}
@@ -462,30 +462,30 @@ private[spark] object PythonRDD extends Logging {
*/
def hadoopRDD[K, V, F <: InputFormat[K, V]](
sc: JavaSparkContext,
- inputFormatClazz: String,
- keyClazz: String,
- valueClazz: String,
- keyWrapper: String,
- valueWrapper: String,
+ inputFormatClass: String,
+ keyClass: String,
+ valueClass: String,
+ keyConverter: String,
+ valueConverter: String,
confAsMap: java.util.HashMap[String, String]) = {
val conf = PythonHadoopUtil.mapToConf(confAsMap)
val rdd =
hadoopRDDFromClassNames[K, V, F](sc,
- None, inputFormatClazz, keyClazz, valueClazz, conf)
+ None, inputFormatClass, keyClass, valueClass, conf)
val converted = PythonHadoopUtil.convertRDD[K, V](rdd)
JavaRDD.fromRDD(SerDeUtil.rddToPython(converted))
}

private def hadoopRDDFromClassNames[K, V, F <: InputFormat[K, V]](
sc: JavaSparkContext,
path: Option[String] = None,
- inputFormatClazz: String,
- keyClazz: String,
- valueClazz: String,
+ inputFormatClass: String,
+ keyClass: String,
+ valueClass: String,
conf: Configuration) = {
- implicit val kcm = ClassTag(Class.forName(keyClazz)).asInstanceOf[ClassTag[K]]
- implicit val vcm = ClassTag(Class.forName(valueClazz)).asInstanceOf[ClassTag[V]]
- implicit val fcm = ClassTag(Class.forName(inputFormatClazz)).asInstanceOf[ClassTag[F]]
+ implicit val kcm = ClassTag(Class.forName(keyClass)).asInstanceOf[ClassTag[K]]
+ implicit val vcm = ClassTag(Class.forName(valueClass)).asInstanceOf[ClassTag[V]]
+ implicit val fcm = ClassTag(Class.forName(inputFormatClass)).asInstanceOf[ClassTag[F]]
val kc = kcm.runtimeClass.asInstanceOf[Class[K]]
val vc = vcm.runtimeClass.asInstanceOf[Class[V]]
val fc = fcm.runtimeClass.asInstanceOf[Class[F]]
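Unlike newAPIHadoopFile, the newAPIHadoopRDD and hadoopRDD entry points take no path: the input is described entirely by the Hadoop configuration dict, which PythonHadoopUtil.mapToConf turns into a Java Configuration. A hedged sketch of the corresponding PySpark call, assuming the SparkContext sc from the earlier sketch; the input format, Writable classes, and configuration key are illustrative assumptions:

    # Describe the input purely through Hadoop configuration (no path argument).
    # The converter arguments are simply left at their new None defaults.
    conf = {"mapreduce.input.fileinputformat.inputdir": "hdfs:///tmp/events"}  # hypothetical dir

    lines_rdd = sc.newAPIHadoopRDD(
        "org.apache.hadoop.mapreduce.lib.input.TextInputFormat",
        "org.apache.hadoop.io.LongWritable",
        "org.apache.hadoop.io.Text",
        conf=conf)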
30 changes: 16 additions & 14 deletions python/pyspark/context.py
@@ -328,13 +328,15 @@ def wholeTextFiles(self, path, minPartitions=None):
return RDD(self._jsc.wholeTextFiles(path, minPartitions), self,
PairDeserializer(UTF8Deserializer(), UTF8Deserializer()))

- def dictToJavaMap(self, d):
+ def _dictToJavaMap(self, d):
jm = self._jvm.java.util.HashMap()
+ if not d:
+     d = {}
for k, v in d.iteritems():
jm[k] = v
return jm

- def sequenceFile(self, path, keyClass, valueClass, keyConverter="", valueConverter="",
+ def sequenceFile(self, path, keyClass, valueClass, keyConverter=None, valueConverter=None,
minSplits=None):
"""
Read a Hadoop SequenceFile with arbitrary key and value Writable class from HDFS,
@@ -372,8 +374,8 @@ def sequenceFile(self, path, keyClass, valueClass, keyConverter="", valueConvert
keyConverter, valueConverter, minSplits)
return RDD(jrdd, self, PickleSerializer())

- def newAPIHadoopFile(self, path, inputFormatClass, keyClass, valueClass,
- keyConverter="", valueConverter="", conf={}):
+ def newAPIHadoopFile(self, path, inputFormatClass, keyClass, valueClass, keyConverter=None,
+ valueConverter=None, conf=None):
"""
Read a 'new API' Hadoop InputFormat with arbitrary key and value class from HDFS,
a local file system (available on all nodes), or any Hadoop-supported file system URI.
@@ -382,26 +384,26 @@ def newAPIHadoopFile(self, path, inputFormatClass, keyClass, valueClass,
A Hadoop configuration can be passed in as a Python dict. This will be converted into a
Configuration in Java
"""
- jconf = self.dictToJavaMap(conf)
+ jconf = self._dictToJavaMap(conf)
jrdd = self._jvm.PythonRDD.newAPIHadoopFile(self._jsc, path, inputFormatClass, keyClass,
valueClass, keyConverter, valueConverter, jconf)
return RDD(jrdd, self, PickleSerializer())

- def newAPIHadoopRDD(self, inputFormatClass, keyClass, valueClass,
- keyConverter="", valueConverter="", conf={}):
+ def newAPIHadoopRDD(self, inputFormatClass, keyClass, valueClass, keyConverter=None,
+ valueConverter=None, conf=None):
"""
Read a 'new API' Hadoop InputFormat with arbitrary key and value class, from an arbitrary
Hadoop configuration,
which is passed in as a Python dict. This will be converted into a Configuration in Java.
The mechanism is the same as for sc.sequenceFile.
"""
- jconf = self.dictToJavaMap(conf)
+ jconf = self._dictToJavaMap(conf)
jrdd = self._jvm.PythonRDD.newAPIHadoopRDD(self._jsc, inputFormatClass, keyClass,
valueClass, keyConverter, valueConverter, jconf)
return RDD(jrdd, self, PickleSerializer())

- def hadoopFile(self, path, inputFormatClass, keyClass, valueClass,
- keyConverter="", valueConverter="", conf={}):
+ def hadoopFile(self, path, inputFormatClass, keyClass, valueClass, keyConverter=None,
+ valueConverter=None, conf=None):
"""
Read an 'old' Hadoop InputFormat with arbitrary key and value class from HDFS,
a local file system (available on all nodes), or any Hadoop-supported file system URI.
@@ -410,22 +412,22 @@ def hadoopFile(self, path, inputFormatClass, keyClass, valueClass,
A Hadoop configuration can be passed in as a Python dict. This will be converted into a
Configuration in Java
"""
- jconf = self.dictToJavaMap(conf)
+ jconf = self._dictToJavaMap(conf)
for k, v in conf.iteritems():
jconf[k] = v
jrdd = self._jvm.PythonRDD.hadoopFile(self._jsc, path, inputFormatClass, keyClass,
valueClass, keyConverter, valueConverter, jconf)
return RDD(jrdd, self, PickleSerializer())

- def hadoopRDD(self, inputFormatClass, keyClass, valueClass,
- keyConverter="", valueConverter="", conf={}):
+ def hadoopRDD(self, inputFormatClass, keyClass, valueClass, keyConverter=None,
+ valueConverter=None, conf=None):
"""
Read an 'old' Hadoop InputFormat with arbitrary key and value class, from an arbitrary
Hadoop configuration,
which is passed in as a Python dict. This will be converted into a Configuration in Java.
The mechanism is the same as for sc.sequenceFile.
"""
- jconf = self.dictToJavaMap(conf)
+ jconf = self._dictToJavaMap(conf)
jrdd = self._jvm.PythonRDD.hadoopRDD(self._jsc, inputFormatClass, keyClass, valueClass,
keyConverter, valueConverter, jconf)
return RDD(jrdd, self, PickleSerializer())
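Beyond matching the Scala-side renames, moving the conf default from {} to None also avoids Python's shared-mutable-default-argument pitfall, which the new "if not d" check in _dictToJavaMap accommodates. A small self-contained illustration of the difference (the function names are hypothetical, not part of the patch):

    def with_mutable_default(conf={}):
        # The same dict object is reused across every call, so state leaks between callers.
        conf["calls"] = conf.get("calls", 0) + 1
        return conf

    def with_none_default(conf=None):
        # A fresh dict per call; context.py achieves the same by normalizing None inside _dictToJavaMap.
        if conf is None:
            conf = {}
        conf["calls"] = conf.get("calls", 0) + 1
        return conf

    print(with_mutable_default())  # {'calls': 1}
    print(with_mutable_default())  # {'calls': 2}  <- state leaked from the previous call
    print(with_none_default())     # {'calls': 1}
    print(with_none_default())     # {'calls': 1}  <- fresh dict every time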
