From 15a7d073515ab0f0d71a195b779afa07e76bd2fc Mon Sep 17 00:00:00 2001
From: Nick Pentreath
Date: Tue, 3 Jun 2014 15:52:06 +0200
Subject: [PATCH] Remove default args for key/value classes. Arg names to camelCase

---
 python/pyspark/context.py | 80 ++++++++++++++++++++++-----------------
 1 file changed, 45 insertions(+), 35 deletions(-)

diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index a3311e289418e..004712dd7d3bf 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -328,8 +328,14 @@ def wholeTextFiles(self, path, minPartitions=None):
         return RDD(self._jsc.wholeTextFiles(path, minPartitions), self,
                    PairDeserializer(UTF8Deserializer(), UTF8Deserializer()))
 
-    def sequenceFile(self, name, key_class="org.apache.hadoop.io.Text", value_class="org.apache.hadoop.io.Text",
-                     key_wrapper="", value_wrapper="", minSplits=None):
+    def dictToJavaMap(self, d):
+        jm = self._jvm.java.util.HashMap()
+        for k, v in d.iteritems():
+            jm[k] = v
+        return jm
+
+    def sequenceFile(self, path, keyClass, valueClass, keyConverter="", valueConverter="",
+                     minSplits=None):
         """
         Read a Hadoop SequenceFile with arbitrary key and value Writable class from HDFS,
         a local file system (available on all nodes), or any Hadoop-supported file system URI.
@@ -338,6 +344,13 @@ def sequenceFile(self, name, key_class="org.apache.hadoop.io.Text", value_class=
             2. Serialization is attempted via Pyrolite pickling
             3. If this fails, the fallback is to call 'toString' on each key and value
             4. C{PickleSerializer} is used to deserialize pickled objects on the Python side
+
+        @param path:
+        @param keyClass:
+        @param valueClass:
+        @param keyConverter:
+        @param valueConverter:
+        @param minSplits:
         >>> sorted(sc.sequenceFile(tempdir + "/sftestdata/sfint/").collect())
         [(1, u'aa'), (1, u'aa'), (2, u'aa'), (2, u'bb'), (2, u'bb'), (3, u'cc')]
         >>> sorted(sc.sequenceFile(tempdir + "/sftestdata/sfdouble/").collect())
@@ -355,69 +368,66 @@ def sequenceFile(self, name, key_class="org.apache.hadoop.io.Text", value_class=
         True
         """
         minSplits = minSplits or min(self.defaultParallelism, 2)
-        jrdd = self._jvm.PythonRDD.sequenceFile(self._jsc, name, key_class, value_class, key_wrapper, value_wrapper,
-                                                minSplits)
+        jrdd = self._jvm.PythonRDD.sequenceFile(self._jsc, path, keyClass, valueClass,
+                                                keyConverter, valueConverter, minSplits)
         return RDD(jrdd, self, PickleSerializer())
 
-    def newAPIHadoopFile(self, name, inputformat_class, key_class="org.apache.hadoop.io.Text",
-                         value_class="org.apache.hadoop.io.Text", key_wrapper="toString",
-                         value_wrapper="toString", conf={}):
+    def newAPIHadoopFile(self, path, inputFormatClass, keyClass, valueClass,
+                         keyConverter="", valueConverter="", conf={}):
         """
         Read a 'new API' Hadoop InputFormat with arbitrary key and value class from HDFS,
         a local file system (available on all nodes), or any Hadoop-supported file system URI.
         The mechanism is the same as for sc.sequenceFile.
 
-        A Hadoop configuration can be passed in as a Python dict. This will be converted into a Configuration in Java
+        A Hadoop configuration can be passed in as a Python dict. This will be converted into a
+        Configuration in Java
         """
-        jconf = self._jvm.java.util.HashMap()
-        for k, v in conf.iteritems():
-            jconf[k] = v
-        jrdd = self._jvm.PythonRDD.newAPIHadoopFile(self._jsc, name, inputformat_class, key_class, value_class,
-                                                    key_wrapper, value_wrapper, jconf)
+        jconf = self.dictToJavaMap(conf)
+        jrdd = self._jvm.PythonRDD.newAPIHadoopFile(self._jsc, path, inputFormatClass, keyClass,
+                                                    valueClass, keyConverter, valueConverter, jconf)
         return RDD(jrdd, self, PickleSerializer())
 
-    def newAPIHadoopRDD(self, inputformat_class, key_class="org.apache.hadoop.io.Text",
-                        value_class="org.apache.hadoop.io.Text", key_wrapper="", value_wrapper="", conf={}):
+    def newAPIHadoopRDD(self, inputFormatClass, keyClass, valueClass,
+                        keyConverter="", valueConverter="", conf={}):
         """
-        Read a 'new API' Hadoop InputFormat with arbitrary key and value class, from an arbitrary Hadoop configuration,
+        Read a 'new API' Hadoop InputFormat with arbitrary key and value class, from an arbitrary
+        Hadoop configuration,
         which is passed in as a Python dict. This will be converted into a Configuration in Java.
         The mechanism is the same as for sc.sequenceFile.
         """
-        jconf = self._jvm.java.util.HashMap()
-        for k, v in conf.iteritems():
-            jconf[k] = v
-        jrdd = self._jvm.PythonRDD.newAPIHadoopRDD(self._jsc, inputformat_class, key_class, value_class, key_wrapper,
-                                                   value_wrapper, jconf)
+        jconf = self.dictToJavaMap(conf)
+        jrdd = self._jvm.PythonRDD.newAPIHadoopRDD(self._jsc, inputFormatClass, keyClass,
+                                                   valueClass, keyConverter, valueConverter, jconf)
         return RDD(jrdd, self, PickleSerializer())
 
-    def hadoopFile(self, name, inputformat_class, key_class="org.apache.hadoop.io.Text",
-                   value_class="org.apache.hadoop.io.Text", key_wrapper="", value_wrapper="", conf={}):
+    def hadoopFile(self, path, inputFormatClass, keyClass, valueClass,
+                   keyConverter="", valueConverter="", conf={}):
         """
         Read an 'old' Hadoop InputFormat with arbitrary key and value class from HDFS,
         a local file system (available on all nodes), or any Hadoop-supported file system URI.
         The mechanism is the same as for sc.sequenceFile.
 
-        A Hadoop configuration can be passed in as a Python dict. This will be converted into a Configuration in Java
+        A Hadoop configuration can be passed in as a Python dict. This will be converted into a
+        Configuration in Java
         """
-        jconf = self._jvm.java.util.HashMap()
+        jconf = self.dictToJavaMap(conf)
         for k, v in conf.iteritems():
             jconf[k] = v
-        jrdd = self._jvm.PythonRDD.hadoopFile(self._jsc, name, inputformat_class, key_class, value_class, key_wrapper,
-                                              value_wrapper, jconf)
+        jrdd = self._jvm.PythonRDD.hadoopFile(self._jsc, path, inputFormatClass, keyClass,
+                                              valueClass, keyConverter, valueConverter, jconf)
         return RDD(jrdd, self, PickleSerializer())
 
-    def hadoopRDD(self, inputformat_class, key_class="org.apache.hadoop.io.Text",
-                  value_class="org.apache.hadoop.io.Text", key_wrapper="", value_wrapper="", conf={}):
+    def hadoopRDD(self, inputFormatClass, keyClass, valueClass,
+                  keyConverter="", valueConverter="", conf={}):
         """
-        Read an 'old' Hadoop InputFormat with arbitrary key and value class, from an arbitrary Hadoop configuration,
+        Read an 'old' Hadoop InputFormat with arbitrary key and value class, from an arbitrary
+        Hadoop configuration,
         which is passed in as a Python dict. This will be converted into a Configuration in Java.
         The mechanism is the same as for sc.sequenceFile.
""" - jconf = self._jvm.java.util.HashMap() - for k, v in conf.iteritems(): - jconf[k] = v - jrdd = self._jvm.PythonRDD.hadoopRDD(self._jsc, inputformat_class, key_class, value_class, key_wrapper, - value_wrapper, jconf) + jconf = self.dictToJavaMap(conf) + jrdd = self._jvm.PythonRDD.hadoopRDD(self._jsc, inputFormatClass, keyClass, valueClass, + keyConverter, valueConverter, jconf) return RDD(jrdd, self, PickleSerializer()) def _checkpointFile(self, name, input_deserializer):