diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index bb4c58f586211..0af4a312b2c3b 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -297,8 +297,6 @@ def wholeTextFiles(self, path):
         return RDD(self._jsc.wholeTextFiles(path), self,
                    PairDeserializer(UTF8Deserializer(), UTF8Deserializer()))

-    ###
-
     def sequenceFile(self, name, key_class="org.apache.hadoop.io.Text", value_class="org.apache.hadoop.io.Text",
                      key_wrapper="", value_wrapper="", minSplits=None):
         """
@@ -308,7 +306,7 @@ def sequenceFile(self, name, key_class="org.apache.hadoop.io.Text", value_class=
             1. A Java RDD is created from the SequenceFile, key and value classes
             2. Serialization is attempted via MsgPack
             3. If this fails, the fallback is to call 'toString' on each key and value
-            4. C{MsgPackDeserializer} is used to deserialize data on the Python side
+            4. C{MsgpackSerializer} is used to deserialize data on the Python side

         >>> sc.sequenceFile("test_support/data/sfint/").collect()
         [(1, 'aa'), (2, 'bb'), (2, 'aa'), (3, 'cc'), (2, 'bb'), (1, 'aa')]
@@ -316,22 +314,25 @@ def sequenceFile(self, name, key_class="org.apache.hadoop.io.Text", value_class=
         [(1.0, 'aa'), (2.0, 'bb'), (2.0, 'aa'), (3.0, 'cc'), (2.0, 'bb'), (1.0, 'aa')]
         >>> sc.sequenceFile("test_support/data/sftext/").collect()
         [('1', 'aa'), ('2', 'bb'), ('2', 'aa'), ('3', 'cc'), ('2', 'bb'), ('1', 'aa')]
+        >>> sc.sequenceFile("test_support/data/sfbool/").collect()
+        [(1, True), (2, True), (2, False), (3, True), (2, False), (1, False)]
+        >>> sc.sequenceFile("test_support/data/sfnull/").collect()
+        [(1, None), (2, None), (2, None), (3, None), (2, None), (1, None)]
+        >>> sc.sequenceFile("test_support/data/sfmap/").collect()
+        [(1, {2.0: 'aa'}), (2, {3.0: 'bb'}), (2, {1.0: 'cc'}), (3, {2.0: 'dd'}), (2, {1.0: 'aa'}), (1, {3.0: 'bb'})]
         """
         minSplits = minSplits or min(self.defaultParallelism, 2)
         jrdd = self._jvm.PythonRDD.sequenceFile(self._jsc, name, key_class, value_class, key_wrapper, value_wrapper, minSplits)
         return RDD(jrdd, self, MsgpackSerializer())

-    def newAPIHadoopFile(self, name, inputformat_class, key_class, value_class, key_wrapper="toString",
+    def newAPIHadoopFile(self, name, inputformat_class, key_class="org.apache.hadoop.io.Text",
+                         value_class="org.apache.hadoop.io.Text", key_wrapper="toString",
                          value_wrapper="toString", conf={}):
         """
         Read a 'new API' Hadoop InputFormat with arbitrary key and value class from HDFS,
         a local file system (available on all nodes), or any Hadoop-supported file system URI.
-        The mechanism is as follows:
-            1. A Java RDD is created from the InputFormat, key and value classes
-            2. Serialization is attempted via MsgPack
-            3. If this fails, the fallback is to call 'toString' on each key and value
-            4. C{MsgPackDeserializer} is used to deserialize data on the Python side
+        The mechanism is the same as for sc.sequenceFile.

         A Hadoop configuration can be passed in as a Python dict. This will be converted into a
         Configuration in Java
         """
@@ -342,16 +343,12 @@ def newAPIHadoopFile(self, name, inputformat_class, key_class, value_class, key_
                                                     key_wrapper, value_wrapper, jconf)
         return RDD(jrdd, self, MsgpackSerializer())

-    def newAPIHadoopRDD(self, inputformat_class, key_class, value_class, key_wrapper="toString",
-                        value_wrapper="toString", conf={}):
+    def newAPIHadoopRDD(self, inputformat_class, key_class="org.apache.hadoop.io.Text",
+                        value_class="org.apache.hadoop.io.Text", key_wrapper="", value_wrapper="", conf={}):
         """
         Read a 'new API' Hadoop InputFormat with arbitrary key and value class, from an arbitrary Hadoop configuration,
-        that is passed in as a Python dict. This will be converted into a Configuration in Java.
-        The mechanism is as follows:
-            1. A Java RDD is created from the InputFormat, key and value classes
-            2. Serialization is attempted via MsgPack
-            3. If this fails, the fallback is to call 'toString' on each key and value
-            4. C{MsgPackDeserializer} is used to deserialize data on the Python side
+        which is passed in as a Python dict. This will be converted into a Configuration in Java.
+        The mechanism is the same as for sc.sequenceFile.
         """
         jconf = self._jvm.java.util.HashMap()
         for k, v in conf.iteritems():
@@ -360,16 +357,12 @@ def newAPIHadoopRDD(self, inputformat_class, key_class, value_class, key_wrapper
                                                    value_wrapper, jconf)
         return RDD(jrdd, self, MsgpackSerializer())

-    def hadoopFile(self, name, inputformat_class, key_class, value_class, key_wrapper="toString",
-                   value_wrapper="toString", conf={}):
+    def hadoopFile(self, name, inputformat_class, key_class="org.apache.hadoop.io.Text",
+                   value_class="org.apache.hadoop.io.Text", key_wrapper="", value_wrapper="", conf={}):
         """
         Read an 'old' Hadoop InputFormat with arbitrary key and value class from HDFS,
         a local file system (available on all nodes), or any Hadoop-supported file system URI.
-        The mechanism is as follows:
-            1. A Java RDD is created from the InputFormat, key and value classes
-            2. Serialization is attempted via MsgPack
-            3. If this fails, the fallback is to call 'toString' on each key and value
-            4. C{MsgPackDeserializer} is used to deserialize data on the Python side
+        The mechanism is the same as for sc.sequenceFile.
         A Hadoop configuration can be passed in as a Python dict. This will be converted into a
         Configuration in Java
         """
@@ -380,16 +373,12 @@ def hadoopFile(self, name, inputformat_class, key_class, value_class, key_wrappe
                                               value_wrapper, jconf)
         return RDD(jrdd, self, MsgpackSerializer())

-    def hadoopRDD(self, inputformat_class, key_class, value_class, key_wrapper="toString",
-                  value_wrapper="toString", conf={}):
+    def hadoopRDD(self, inputformat_class, key_class="org.apache.hadoop.io.Text",
+                  value_class="org.apache.hadoop.io.Text", key_wrapper="", value_wrapper="", conf={}):
         """
         Read an 'old' Hadoop InputFormat with arbitrary key and value class, from an arbitrary Hadoop configuration,
-        that is passed in as a Python dict. This will be converted into a Configuration in Java.
-        The mechanism is as follows:
-            1. A Java RDD is created from the InputFormat, key and value classes
-            2. Serialization is attempted via MsgPack
-            3. If this fails, the fallback is to call 'toString' on each key and value
-            4. C{MsgPackDeserializer} is used to deserialize data on the Python side
+        which is passed in as a Python dict. This will be converted into a Configuration in Java.
+        The mechanism is the same as for sc.sequenceFile.
         """
         jconf = self._jvm.java.util.HashMap()
         for k, v in conf.iteritems():
@@ -398,8 +387,6 @@ def hadoopRDD(self, inputformat_class, key_class, value_class, key_wrapper="toSt
                                          value_wrapper, jconf)
         return RDD(jrdd, self, MsgpackSerializer())

-    ####
-
     def _checkpointFile(self, name, input_deserializer):
         jrdd = self._jsc.checkpointFile(name)
         return RDD(jrdd, self, input_deserializer)