diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index 0af4a312b2c3b..7edc5aaf8ba93 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -28,8 +28,7 @@
 from pyspark.conf import SparkConf
 from pyspark.files import SparkFiles
 from pyspark.java_gateway import launch_gateway
-from pyspark.serializers import PickleSerializer, BatchedSerializer, UTF8Deserializer, \
-    PairDeserializer, MsgpackSerializer
+from pyspark.serializers import PickleSerializer, BatchedSerializer, UTF8Deserializer, PairDeserializer
 from pyspark.storagelevel import StorageLevel
 from pyspark import rdd
 from pyspark.rdd import RDD
@@ -303,28 +302,30 @@ def sequenceFile(self, name, key_class="org.apache.hadoop.io.Text", value_class=
         Read a Hadoop SequenceFile with arbitrary key and value Writable class from HDFS,
         a local file system (available on all nodes), or any Hadoop-supported file system URI.
         The mechanism is as follows:
-            1. A Java RDD is created from the SequenceFile, key and value classes
-            2. Serialization is attempted via MsgPack
+            1. A Java RDD is created from the SequenceFile or other InputFormat, and the key and value Writable classes
+            2. Serialization is attempted via Pyrolite pickling
             3. If this fails, the fallback is to call 'toString' on each key and value
-            4. C{MsgpackSerializer} is used to deserialize data on the Python side
+            4. C{PickleSerializer} is used to deserialize pickled objects on the Python side

         >>> sc.sequenceFile("test_support/data/sfint/").collect()
-        [(1, 'aa'), (2, 'bb'), (2, 'aa'), (3, 'cc'), (2, 'bb'), (1, 'aa')]
+        [(1, u'aa'), (2, u'bb'), (2, u'aa'), (3, u'cc'), (2, u'bb'), (1, u'aa')]
         >>> sc.sequenceFile("test_support/data/sfdouble/").collect()
-        [(1.0, 'aa'), (2.0, 'bb'), (2.0, 'aa'), (3.0, 'cc'), (2.0, 'bb'), (1.0, 'aa')]
+        [(1.0, u'aa'), (2.0, u'bb'), (2.0, u'aa'), (3.0, u'cc'), (2.0, u'bb'), (1.0, u'aa')]
         >>> sc.sequenceFile("test_support/data/sftext/").collect()
-        [('1', 'aa'), ('2', 'bb'), ('2', 'aa'), ('3', 'cc'), ('2', 'bb'), ('1', 'aa')]
+        [(u'1', u'aa'), (u'2', u'bb'), (u'2', u'aa'), (u'3', u'cc'), (u'2', u'bb'), (u'1', u'aa')]
         >>> sc.sequenceFile("test_support/data/sfbool/").collect()
         [(1, True), (2, True), (2, False), (3, True), (2, False), (1, False)]
         >>> sc.sequenceFile("test_support/data/sfnull/").collect()
         [(1, None), (2, None), (2, None), (3, None), (2, None), (1, None)]
         >>> sc.sequenceFile("test_support/data/sfmap/").collect()
-        [(1, {2.0: 'aa'}), (2, {3.0: 'bb'}), (2, {1.0: 'cc'}), (3, {2.0: 'dd'}), (2, {1.0: 'aa'}), (1, {3.0: 'bb'})]
+        [(1, {2.0: u'aa'}), (2, {3.0: u'bb'}), (2, {1.0: u'cc'}), (3, {2.0: u'dd'}), (2, {1.0: u'aa'}), (1, {3.0: u'bb'})]
+        >>> sc.sequenceFile("test_support/data/sfclass").first()
+        (u'1', {u'int': 123, u'double': 54.0, u'__class__': u'org.apache.spark.api.python.TestWritable', u'str': u'test1'})
         """
         minSplits = minSplits or min(self.defaultParallelism, 2)
         jrdd = self._jvm.PythonRDD.sequenceFile(self._jsc, name, key_class, value_class, key_wrapper, value_wrapper,
                                                 minSplits)
-        return RDD(jrdd, self, MsgpackSerializer())
+        return RDD(jrdd, self, PickleSerializer())

     def newAPIHadoopFile(self, name, inputformat_class, key_class="org.apache.hadoop.io.Text",
                          value_class="org.apache.hadoop.io.Text", key_wrapper="toString",
@@ -341,7 +342,7 @@ def newAPIHadoopFile(self, name, inputformat_class, key_class="org.apache.hadoop
             jconf[k] = v
         jrdd = self._jvm.PythonRDD.newAPIHadoopFile(self._jsc, name, inputformat_class, key_class, value_class,
                                                     key_wrapper, value_wrapper, jconf)
-        return RDD(jrdd, self, MsgpackSerializer())
+        return RDD(jrdd, self, PickleSerializer())

     def newAPIHadoopRDD(self, inputformat_class, key_class="org.apache.hadoop.io.Text",
                         value_class="org.apache.hadoop.io.Text", key_wrapper="", value_wrapper="", conf={}):
@@ -355,7 +356,7 @@ def newAPIHadoopRDD(self, inputformat_class, key_class="org.apache.hadoop.io.Tex
             jconf[k] = v
         jrdd = self._jvm.PythonRDD.newAPIHadoopRDD(self._jsc, inputformat_class, key_class, value_class,
                                                    key_wrapper, value_wrapper, jconf)
-        return RDD(jrdd, self, MsgpackSerializer())
+        return RDD(jrdd, self, PickleSerializer())

     def hadoopFile(self, name, inputformat_class, key_class="org.apache.hadoop.io.Text",
                    value_class="org.apache.hadoop.io.Text", key_wrapper="", value_wrapper="", conf={}):
@@ -371,7 +372,7 @@ def hadoopFile(self, name, inputformat_class, key_class="org.apache.hadoop.io.Te
             jconf[k] = v
         jrdd = self._jvm.PythonRDD.hadoopFile(self._jsc, name, inputformat_class, key_class, value_class,
                                               key_wrapper, value_wrapper, jconf)
-        return RDD(jrdd, self, MsgpackSerializer())
+        return RDD(jrdd, self, PickleSerializer())

     def hadoopRDD(self, inputformat_class, key_class="org.apache.hadoop.io.Text",
                   value_class="org.apache.hadoop.io.Text", key_wrapper="", value_wrapper="", conf={}):
@@ -385,7 +386,7 @@ def hadoopRDD(self, inputformat_class, key_class="org.apache.hadoop.io.Text",
             jconf[k] = v
         jrdd = self._jvm.PythonRDD.hadoopRDD(self._jsc, inputformat_class, key_class, value_class,
                                              key_wrapper, value_wrapper, jconf)
-        return RDD(jrdd, self, MsgpackSerializer())
+        return RDD(jrdd, self, PickleSerializer())

     def _checkpointFile(self, name, input_deserializer):
         jrdd = self._jsc.checkpointFile(name)
diff --git a/python/pyspark/serializers.py b/python/pyspark/serializers.py
index 7a27d9c4343d7..12c63f186a2b7 100644
--- a/python/pyspark/serializers.py
+++ b/python/pyspark/serializers.py
@@ -64,8 +64,6 @@
 from itertools import chain, izip, product
 import marshal
 import struct
-import msgpack
-import sys

 from pyspark import cloudpickle

@@ -309,17 +307,6 @@ def load_stream(self, stream):
                 return


-class MsgpackSerializer(FramedSerializer):
-    """
-    Deserializes streams written by Scala/Java MsgPack
-    """
-    def loads(self, obj):
-        return msgpack.loads(obj, use_list=0)
-
-    def dumps(self, obj):
-        return msgpack.dumps(obj)
-
-
 def read_long(stream):
     length = stream.read(8)
     if length == "":
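Usage note (not part of the patch): a minimal sketch of how the pickle-backed readers above would be called from PySpark once this change is applied. The SequenceFile path reuses the test_support data from the doctests; the InputFormat and Writable class names passed to newAPIHadoopFile are illustrative assumptions, not taken from the diff.

    from pyspark import SparkContext

    sc = SparkContext("local", "sequencefile-example")

    # Keys and values are converted to Writables on the JVM side, pickled via
    # Pyrolite, and deserialized in Python with PickleSerializer.
    pairs = sc.sequenceFile("test_support/data/sftext/")
    print(pairs.collect())  # e.g. [(u'1', u'aa'), (u'2', u'bb'), ...]

    # The Hadoop InputFormat readers share the same serializer path; the
    # InputFormat and Writable class names here are assumed for illustration.
    rdd = sc.newAPIHadoopFile(
        "test_support/data/sftext/",
        "org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat",
        key_class="org.apache.hadoop.io.Text",
        value_class="org.apache.hadoop.io.Text")
    print(rdd.first())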