diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index ae29728551992..db896768e04a3 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -214,56 +214,104 @@ def textFile(self, name, minSplits=None):
                    MUTF8Deserializer())
 
     ###
-    def sequenceFile(self, name, keyClass="org.apache.hadoop.io.Text", valueClass="org.apache.hadoop.io.Text",
-                     keyWrapper="", valueWrapper="", minSplits=None):
+    def sequenceFile(self, name, key_class="org.apache.hadoop.io.Text", value_class="org.apache.hadoop.io.Text",
+                     key_wrapper="", value_wrapper="", minSplits=None):
         """
-        Read a Hadoopp SequenceFile with arbitrary key and value class from HDFS,
-        a local file system (available on all nodes), or any Hadoop-supported file system URI,
-        and return it as an RDD of (String, String) where the key and value representations
-        are generated using the 'toString()' method of the relevant Java class.
+        Read a Hadoop SequenceFile with arbitrary key and value Writable class from HDFS,
+        a local file system (available on all nodes), or any Hadoop-supported file system URI.
+        The mechanism is as follows:
+            1. A Java RDD is created from the SequenceFile, key and value classes
+            2. Serialization is attempted via MsgPack
+            3. If this fails, the fallback is to call 'toString' on each key and value
+            4. C{MsgPackDeserializer} is used to deserialize data on the Python side
 
         >>> sc.sequenceFile("test_support/data/sfint/").collect()
         [(1, 'aa'), (2, 'bb'), (2, 'aa'), (3, 'cc'), (2, 'bb'), (1, 'aa')]
+        >>> sc.sequenceFile("test_support/data/sfdouble/").collect()
+        [(1.0, 'aa'), (2.0, 'bb'), (2.0, 'aa'), (3.0, 'cc'), (2.0, 'bb'), (1.0, 'aa')]
         >>> sc.sequenceFile("test_support/data/sftext/").collect()
         [('1', 'aa'), ('2', 'bb'), ('2', 'aa'), ('3', 'cc'), ('2', 'bb'), ('1', 'aa')]
         """
         minSplits = minSplits or min(self.defaultParallelism, 2)
-        jrdd = self._jvm.PythonRDD.sequenceFile(self._jsc, name, keyClass, valueClass, keyWrapper, valueWrapper,
+        jrdd = self._jvm.PythonRDD.sequenceFile(self._jsc, name, key_class, value_class, key_wrapper, value_wrapper,
                                                 minSplits)
-        #jrdd = self._jvm.PythonRDD.sequenceFile(self._jsc, name, keyWrapper, valueWrapper, minSplits)
-        return RDD(jrdd, self, MsgPackDeserializer()) # MsgPackDeserializer PairMUTF8Deserializer
+        return RDD(jrdd, self, MsgPackDeserializer())
 
-    def newAPIHadoopFile(self, name, inputFormat, keyClass, valueClass, keyWrapper="toString", valueWrapper="toString",
-                         conf = {}):
+    def newAPIHadoopFile(self, name, inputformat_class, key_class, value_class, key_wrapper="toString",
+                         value_wrapper="toString", conf={}):
         """
-        Read a Hadoopp file with arbitrary InputFormat, key and value class from HDFS,
-        a local file system (available on all nodes), or any Hadoop-supported file system URI,
-        and return it as an RDD of (String, String), where the key and value representations
-        are generated using the 'toString()' method of the relevant Java class.
+        Read a 'new API' Hadoop InputFormat with arbitrary key and value class from HDFS,
+        a local file system (available on all nodes), or any Hadoop-supported file system URI.
+        The mechanism is as follows:
+            1. A Java RDD is created from the InputFormat, key and value classes
+            2. Serialization is attempted via MsgPack
+            3. If this fails, the fallback is to call 'toString' on each key and value
+            4. C{MsgPackDeserializer} is used to deserialize data on the Python side
+
+        A Hadoop configuration can be passed in as a Python dict. This will be converted into a Configuration in Java
         """
         jconf = self._jvm.java.util.HashMap()
         for k, v in conf.iteritems():
             jconf[k] = v
-        jrdd = self._jvm.PythonRDD.newAPIHadoopFile(self._jsc, name, inputFormat, keyClass, valueClass, keyWrapper,
-                                                    valueWrapper, jconf)
+        jrdd = self._jvm.PythonRDD.newAPIHadoopFile(self._jsc, name, inputformat_class, key_class, value_class,
+                                                    key_wrapper, value_wrapper, jconf)
         return RDD(jrdd, self, MsgPackDeserializer())
 
-    def newAPIHadoopRDD(self, inputFormat, keyClass, valueClass, keyWrapper="toString", valueWrapper="toString",
-                        conf = {}):
+    def newAPIHadoopRDD(self, inputformat_class, key_class, value_class, key_wrapper="toString",
+                        value_wrapper="toString", conf={}):
         """
-        Read a Hadoopp file with arbitrary InputFormat, key and value class from HDFS,
-        a local file system (available on all nodes), or any Hadoop-supported file system URI,
-        and return it as an RDD of (String, String), where the key and value representations
-        are generated using the 'toString()' method of the relevant Java class.
+        Read a 'new API' Hadoop InputFormat with arbitrary key and value class, from an arbitrary Hadoop configuration,
+        that is passed in as a Python dict. This will be converted into a Configuration in Java.
+        The mechanism is as follows:
+            1. A Java RDD is created from the InputFormat, key and value classes
+            2. Serialization is attempted via MsgPack
+            3. If this fails, the fallback is to call 'toString' on each key and value
+            4. C{MsgPackDeserializer} is used to deserialize data on the Python side
         """
         jconf = self._jvm.java.util.HashMap()
         for k, v in conf.iteritems():
             jconf[k] = v
-        jrdd = self._jvm.PythonRDD.newAPIHadoopRDD(self._jsc, inputFormat, keyClass, valueClass, keyWrapper,
-                                                   valueWrapper, jconf)
+        jrdd = self._jvm.PythonRDD.newAPIHadoopRDD(self._jsc, inputformat_class, key_class, value_class, key_wrapper,
+                                                   value_wrapper, jconf)
         return RDD(jrdd, self, MsgPackDeserializer())
 
-    ###
+    def hadoopFile(self, name, inputformat_class, key_class, value_class, key_wrapper="toString",
+                   value_wrapper="toString", conf={}):
+        """
+        Read an 'old' Hadoop InputFormat with arbitrary key and value class from HDFS,
+        a local file system (available on all nodes), or any Hadoop-supported file system URI.
+        The mechanism is as follows:
+            1. A Java RDD is created from the InputFormat, key and value classes
+            2. Serialization is attempted via MsgPack
+            3. If this fails, the fallback is to call 'toString' on each key and value
+            4. C{MsgPackDeserializer} is used to deserialize data on the Python side
+
+        A Hadoop configuration can be passed in as a Python dict. This will be converted into a Configuration in Java
+        """
+        jconf = self._jvm.java.util.HashMap()
+        for k, v in conf.iteritems():
+            jconf[k] = v
+        jrdd = self._jvm.PythonRDD.hadoopFile(self._jsc, name, inputformat_class, key_class, value_class, key_wrapper,
+                                              value_wrapper, jconf)
+        return RDD(jrdd, self, MsgPackDeserializer())
+
+    def hadoopRDD(self, inputformat_class, key_class, value_class, key_wrapper="toString",
+                  value_wrapper="toString", conf={}):
+        """
+        Read an 'old' Hadoop InputFormat with arbitrary key and value class, from an arbitrary Hadoop configuration,
+        that is passed in as a Python dict. This will be converted into a Configuration in Java.
+        The mechanism is as follows:
+            1. A Java RDD is created from the InputFormat, key and value classes
+            2. Serialization is attempted via MsgPack
+            3. If this fails, the fallback is to call 'toString' on each key and value
+            4. C{MsgPackDeserializer} is used to deserialize data on the Python side
+        """
+        jconf = self._jvm.java.util.HashMap()
+        for k, v in conf.iteritems():
+            jconf[k] = v
+        jrdd = self._jvm.PythonRDD.hadoopRDD(self._jsc, inputformat_class, key_class, value_class, key_wrapper,
+                                             value_wrapper, jconf)
+        return RDD(jrdd, self, MsgPackDeserializer())
 
     def _checkpointFile(self, name, input_deserializer):
         jrdd = self._jsc.checkpointFile(name)
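For reference, a minimal usage sketch of the methods this patch adds, based on the signatures and doctest data in the diff above. It assumes a build that includes this patch; the `TextInputFormat`/`LongWritable`/`Text` class names and the `some.hadoop.property` conf key are illustrative, not part of the patch.

```python
# Usage sketch (assumes a Spark build containing the methods added above).
from pyspark import SparkContext

sc = SparkContext("local", "hadoop-input-sketch")

# SequenceFile from the doctest data: keys/values are serialized via MsgPack,
# falling back to toString() if MsgPack serialization fails.
pairs = sc.sequenceFile("test_support/data/sfint/")
print(pairs.collect())  # e.g. [(1, 'aa'), (2, 'bb'), ...]

# 'New API' InputFormat with an explicit Hadoop configuration. The dict is
# copied into a java.util.HashMap and converted to a Configuration in Java.
# The class names and conf key here are illustrative assumptions.
conf = {"some.hadoop.property": "value"}
lines = sc.newAPIHadoopFile(
    "test_support/data/sftext/",
    "org.apache.hadoop.mapreduce.lib.input.TextInputFormat",
    "org.apache.hadoop.io.LongWritable",
    "org.apache.hadoop.io.Text",
    conf=conf,
)
print(lines.collect())

sc.stop()
```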