From 7237263b3f23591c1969df475641aac65bb9ecfe Mon Sep 17 00:00:00 2001
From: Nick Pentreath
Date: Fri, 18 Apr 2014 15:44:08 +0200
Subject: [PATCH] Add back msgpack serializer and hadoop file code lost during merging

---
 python/pyspark/context.py     | 105 +++++++++++++++++++++++++++++++++-
 python/pyspark/serializers.py |  11 ++++
 2 files changed, 115 insertions(+), 1 deletion(-)

diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index bb18f53ce0110..d697f9ecb6735 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -29,7 +29,7 @@
 from pyspark.files import SparkFiles
 from pyspark.java_gateway import launch_gateway
 from pyspark.serializers import PickleSerializer, BatchedSerializer, UTF8Deserializer, \
-    PairDeserializer, MsgPackDeserializer
+    PairDeserializer, MsgpackSerializer
 from pyspark.storagelevel import StorageLevel
 from pyspark import rdd
 from pyspark.rdd import RDD
@@ -297,6 +297,109 @@ def wholeTextFiles(self, path):
         return RDD(self._jsc.wholeTextFiles(path), self,
                    PairDeserializer(UTF8Deserializer(), UTF8Deserializer()))
 
+###
+
+    def sequenceFile(self, name, key_class="org.apache.hadoop.io.Text", value_class="org.apache.hadoop.io.Text",
+                     key_wrapper="", value_wrapper="", minSplits=None):
+        """
+        Read a Hadoop SequenceFile with arbitrary key and value Writable classes from HDFS,
+        a local file system (available on all nodes), or any Hadoop-supported file system URI.
+        The mechanism is as follows:
+            1. A Java RDD is created from the SequenceFile, key and value classes
+            2. Serialization is attempted via MsgPack
+            3. If this fails, the fallback is to call 'toString' on each key and value
+            4. C{MsgpackSerializer} is used to deserialize data on the Python side
+
+        >>> sc.sequenceFile("test_support/data/sfint/").collect()
+        [(1, 'aa'), (2, 'bb'), (2, 'aa'), (3, 'cc'), (2, 'bb'), (1, 'aa')]
+        >>> sc.sequenceFile("test_support/data/sfdouble/").collect()
+        [(1.0, 'aa'), (2.0, 'bb'), (2.0, 'aa'), (3.0, 'cc'), (2.0, 'bb'), (1.0, 'aa')]
+        >>> sc.sequenceFile("test_support/data/sftext/").collect()
+        [('1', 'aa'), ('2', 'bb'), ('2', 'aa'), ('3', 'cc'), ('2', 'bb'), ('1', 'aa')]
+        """
+        minSplits = minSplits or min(self.defaultParallelism, 2)
+        jrdd = self._jvm.PythonRDD.sequenceFile(self._jsc, name, key_class, value_class, key_wrapper, value_wrapper,
+                                                minSplits)
+        return RDD(jrdd, self, MsgpackSerializer())
+
+    def newAPIHadoopFile(self, name, inputformat_class, key_class, value_class, key_wrapper="toString",
+                         value_wrapper="toString", conf={}):
+        """
+        Read a 'new API' Hadoop InputFormat with arbitrary key and value classes from HDFS,
+        a local file system (available on all nodes), or any Hadoop-supported file system URI.
+        The mechanism is as follows:
+            1. A Java RDD is created from the InputFormat, key and value classes
+            2. Serialization is attempted via MsgPack
+            3. If this fails, the fallback is to call 'toString' on each key and value
+            4. C{MsgpackSerializer} is used to deserialize data on the Python side
+
+        A Hadoop configuration can be passed in as a Python dict. This will be converted
+        into a Configuration in Java.
+        """
+        jconf = self._jvm.java.util.HashMap()
+        for k, v in conf.iteritems():
+            jconf[k] = v
+        jrdd = self._jvm.PythonRDD.newAPIHadoopFile(self._jsc, name, inputformat_class, key_class, value_class,
+                                                    key_wrapper, value_wrapper, jconf)
+        return RDD(jrdd, self, MsgpackSerializer())
+
+    def newAPIHadoopRDD(self, inputformat_class, key_class, value_class, key_wrapper="toString",
+                        value_wrapper="toString", conf={}):
+        """
+        Read a 'new API' Hadoop InputFormat with arbitrary key and value classes from an
+        arbitrary Hadoop configuration, which is passed in as a Python dict and converted
+        into a Configuration in Java.
+        The mechanism is as follows:
+            1. A Java RDD is created from the InputFormat, key and value classes
+            2. Serialization is attempted via MsgPack
+            3. If this fails, the fallback is to call 'toString' on each key and value
+            4. C{MsgpackSerializer} is used to deserialize data on the Python side
+        """
+        jconf = self._jvm.java.util.HashMap()
+        for k, v in conf.iteritems():
+            jconf[k] = v
+        jrdd = self._jvm.PythonRDD.newAPIHadoopRDD(self._jsc, inputformat_class, key_class, value_class, key_wrapper,
+                                                   value_wrapper, jconf)
+        return RDD(jrdd, self, MsgpackSerializer())
+
+    def hadoopFile(self, name, inputformat_class, key_class, value_class, key_wrapper="toString",
+                   value_wrapper="toString", conf={}):
+        """
+        Read an 'old' Hadoop InputFormat with arbitrary key and value classes from HDFS,
+        a local file system (available on all nodes), or any Hadoop-supported file system URI.
+        The mechanism is as follows:
+            1. A Java RDD is created from the InputFormat, key and value classes
+            2. Serialization is attempted via MsgPack
+            3. If this fails, the fallback is to call 'toString' on each key and value
+            4. C{MsgpackSerializer} is used to deserialize data on the Python side
+
+        A Hadoop configuration can be passed in as a Python dict. This will be converted
+        into a Configuration in Java.
+        """
+        jconf = self._jvm.java.util.HashMap()
+        for k, v in conf.iteritems():
+            jconf[k] = v
+        jrdd = self._jvm.PythonRDD.hadoopFile(self._jsc, name, inputformat_class, key_class, value_class, key_wrapper,
+                                              value_wrapper, jconf)
+        return RDD(jrdd, self, MsgpackSerializer())
+
+    def hadoopRDD(self, inputformat_class, key_class, value_class, key_wrapper="toString",
+                  value_wrapper="toString", conf={}):
+        """
+        Read an 'old' Hadoop InputFormat with arbitrary key and value classes from an
+        arbitrary Hadoop configuration, which is passed in as a Python dict and converted
+        into a Configuration in Java.
+        The mechanism is as follows:
+            1. A Java RDD is created from the InputFormat, key and value classes
+            2. Serialization is attempted via MsgPack
+            3. If this fails, the fallback is to call 'toString' on each key and value
+            4. C{MsgpackSerializer} is used to deserialize data on the Python side
+        """
+        jconf = self._jvm.java.util.HashMap()
+        for k, v in conf.iteritems():
+            jconf[k] = v
+        jrdd = self._jvm.PythonRDD.hadoopRDD(self._jsc, inputformat_class, key_class, value_class, key_wrapper,
+                                             value_wrapper, jconf)
+        return RDD(jrdd, self, MsgpackSerializer())
+
+####
+
     def _checkpointFile(self, name, input_deserializer):
         jrdd = self._jsc.checkpointFile(name)
         return RDD(jrdd, self, input_deserializer)
diff --git a/python/pyspark/serializers.py b/python/pyspark/serializers.py
index 42b0b847749eb..7a27d9c4343d7 100644
--- a/python/pyspark/serializers.py
+++ b/python/pyspark/serializers.py
@@ -309,6 +309,17 @@ def load_stream(self, stream):
         return
 
 
+class MsgpackSerializer(FramedSerializer):
+    """
+    Serializes and deserializes data written by the Scala/Java MsgPack serializer.
+    """
+    def loads(self, obj):
+        return msgpack.loads(obj, use_list=0)
+
+    def dumps(self, obj):
+        return msgpack.dumps(obj)
+
+
 def read_long(stream):
     length = stream.read(8)
     if length == "":
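
For context, a minimal usage sketch of the API restored by this patch. It is illustrative only: it assumes an initialized SparkContext `sc`, and the HDFS paths, the Writable/InputFormat class names, and the Hadoop configuration key used below are placeholder assumptions, not values defined by the patch.

    # Illustrative sketch; `sc`, the paths, and the Hadoop conf key are assumptions.

    # SequenceFile of (IntWritable, Text) pairs: records are serialized via MsgPack
    # on the JVM side and decoded into Python tuples by MsgpackSerializer.
    pairs = sc.sequenceFile("hdfs:///tmp/example-seqfile",
                            key_class="org.apache.hadoop.io.IntWritable",
                            value_class="org.apache.hadoop.io.Text")
    print pairs.take(5)

    # 'New API' InputFormat read driven entirely by a Hadoop configuration dict;
    # the dict is copied into a java.util.HashMap and converted to a Configuration
    # on the JVM side.
    conf = {"mapreduce.input.fileinputformat.inputdir": "hdfs:///tmp/example-text"}
    records = sc.newAPIHadoopRDD("org.apache.hadoop.mapreduce.lib.input.TextInputFormat",
                                 "org.apache.hadoop.io.LongWritable",
                                 "org.apache.hadoop.io.Text",
                                 conf=conf)
    print records.take(5)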