Remove msgpack dependency and serializer from PySpark
MLnick committed Apr 21, 2014
1 parent 450e0a2 commit f60959e
Showing 2 changed files with 15 additions and 27 deletions.
python/pyspark/context.py: 29 changes (15 additions, 14 deletions)
@@ -28,8 +28,7 @@
from pyspark.conf import SparkConf
from pyspark.files import SparkFiles
from pyspark.java_gateway import launch_gateway
-from pyspark.serializers import PickleSerializer, BatchedSerializer, UTF8Deserializer, \
-    PairDeserializer, MsgpackSerializer
+from pyspark.serializers import PickleSerializer, BatchedSerializer, UTF8Deserializer, PairDeserializer
from pyspark.storagelevel import StorageLevel
from pyspark import rdd
from pyspark.rdd import RDD
@@ -303,28 +302,30 @@ def sequenceFile(self, name, key_class="org.apache.hadoop.io.Text", value_class=
Read a Hadoop SequenceFile with arbitrary key and value Writable class from HDFS,
a local file system (available on all nodes), or any Hadoop-supported file system URI.
The mechanism is as follows:
-1. A Java RDD is created from the SequenceFile, key and value classes
-2. Serialization is attempted via MsgPack
+1. A Java RDD is created from the SequenceFile or other InputFormat, and the key and value Writable classes
+2. Serialization is attempted via Pyrolite pickling
3. If this fails, the fallback is to call 'toString' on each key and value
-4. C{MsgpackSerializer} is used to deserialize data on the Python side
+4. C{PickleSerializer} is used to deserialize pickled objects on the Python side
>>> sc.sequenceFile("test_support/data/sfint/").collect()
-[(1, 'aa'), (2, 'bb'), (2, 'aa'), (3, 'cc'), (2, 'bb'), (1, 'aa')]
+[(1, u'aa'), (2, u'bb'), (2, u'aa'), (3, u'cc'), (2, u'bb'), (1, u'aa')]
>>> sc.sequenceFile("test_support/data/sfdouble/").collect()
-[(1.0, 'aa'), (2.0, 'bb'), (2.0, 'aa'), (3.0, 'cc'), (2.0, 'bb'), (1.0, 'aa')]
+[(1.0, u'aa'), (2.0, u'bb'), (2.0, u'aa'), (3.0, u'cc'), (2.0, u'bb'), (1.0, u'aa')]
>>> sc.sequenceFile("test_support/data/sftext/").collect()
-[('1', 'aa'), ('2', 'bb'), ('2', 'aa'), ('3', 'cc'), ('2', 'bb'), ('1', 'aa')]
+[(u'1', u'aa'), (u'2', u'bb'), (u'2', u'aa'), (u'3', u'cc'), (u'2', u'bb'), (u'1', u'aa')]
>>> sc.sequenceFile("test_support/data/sfbool/").collect()
[(1, True), (2, True), (2, False), (3, True), (2, False), (1, False)]
>>> sc.sequenceFile("test_support/data/sfnull/").collect()
[(1, None), (2, None), (2, None), (3, None), (2, None), (1, None)]
>>> sc.sequenceFile("test_support/data/sfmap/").collect()
-[(1, {2.0: 'aa'}), (2, {3.0: 'bb'}), (2, {1.0: 'cc'}), (3, {2.0: 'dd'}), (2, {1.0: 'aa'}), (1, {3.0: 'bb'})]
+[(1, {2.0: u'aa'}), (2, {3.0: u'bb'}), (2, {1.0: u'cc'}), (3, {2.0: u'dd'}), (2, {1.0: u'aa'}), (1, {3.0: u'bb'})]
+>>> sc.sequenceFile("test_support/data/sfclass").first()
+(u'1', {u'int': 123, u'double': 54.0, u'__class__': u'org.apache.spark.api.python.TestWritable', u'str': u'test1'})
"""
minSplits = minSplits or min(self.defaultParallelism, 2)
jrdd = self._jvm.PythonRDD.sequenceFile(self._jsc, name, key_class, value_class, key_wrapper, value_wrapper,
minSplits)
-return RDD(jrdd, self, MsgpackSerializer())
+return RDD(jrdd, self, PickleSerializer())
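
Taken together, the steps in the docstring above mean a SequenceFile read now round-trips through Pyrolite pickling on the JVM and plain pickle on the Python side, with no msgpack involved. A minimal usage sketch, assuming a local Spark build and the test_support data directory that ships with the PySpark source tree:

```python
from pyspark import SparkContext

# Assumes Spark is installed and the PySpark test_support data is on the
# local filesystem; the path mirrors the doctests above.
sc = SparkContext("local", "sequencefile-demo")

# Keys and values are read as Hadoop Writables on the JVM, pickled via
# Pyrolite, and unpickled here by PickleSerializer into Python objects.
pairs = sc.sequenceFile("test_support/data/sfint/").collect()
print(pairs)  # e.g. [(1, u'aa'), (2, u'bb'), ...]

sc.stop()
```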

def newAPIHadoopFile(self, name, inputformat_class, key_class="org.apache.hadoop.io.Text",
value_class="org.apache.hadoop.io.Text", key_wrapper="toString",
@@ -341,7 +342,7 @@ def newAPIHadoopFile(self, name, inputformat_class, key_class="org.apache.hadoop
jconf[k] = v
jrdd = self._jvm.PythonRDD.newAPIHadoopFile(self._jsc, name, inputformat_class, key_class, value_class,
key_wrapper, value_wrapper, jconf)
-return RDD(jrdd, self, MsgpackSerializer())
+return RDD(jrdd, self, PickleSerializer())

def newAPIHadoopRDD(self, inputformat_class, key_class="org.apache.hadoop.io.Text",
value_class="org.apache.hadoop.io.Text", key_wrapper="", value_wrapper="", conf={}):
@@ -355,7 +356,7 @@ def newAPIHadoopRDD(self, inputformat_class, key_class="org.apache.hadoop.io.Tex
jconf[k] = v
jrdd = self._jvm.PythonRDD.newAPIHadoopRDD(self._jsc, inputformat_class, key_class, value_class, key_wrapper,
value_wrapper, jconf)
-return RDD(jrdd, self, MsgpackSerializer())
+return RDD(jrdd, self, PickleSerializer())

def hadoopFile(self, name, inputformat_class, key_class="org.apache.hadoop.io.Text",
value_class="org.apache.hadoop.io.Text", key_wrapper="", value_wrapper="", conf={}):
@@ -371,7 +372,7 @@ def hadoopFile(self, name, inputformat_class, key_class="org.apache.hadoop.io.Te
jconf[k] = v
jrdd = self._jvm.PythonRDD.hadoopFile(self._jsc, name, inputformat_class, key_class, value_class, key_wrapper,
value_wrapper, jconf)
-return RDD(jrdd, self, MsgpackSerializer())
+return RDD(jrdd, self, PickleSerializer())

def hadoopRDD(self, inputformat_class, key_class="org.apache.hadoop.io.Text",
value_class="org.apache.hadoop.io.Text", key_wrapper="", value_wrapper="", conf={}):
@@ -385,7 +386,7 @@ def hadoopRDD(self, inputformat_class, key_class="org.apache.hadoop.io.Text",
jconf[k] = v
jrdd = self._jvm.PythonRDD.hadoopRDD(self._jsc, inputformat_class, key_class, value_class, key_wrapper,
value_wrapper, jconf)
-return RDD(jrdd, self, MsgpackSerializer())
+return RDD(jrdd, self, PickleSerializer())

def _checkpointFile(self, name, input_deserializer):
jrdd = self._jsc.checkpointFile(name)
python/pyspark/serializers.py: 13 changes (0 additions, 13 deletions)
@@ -64,8 +64,6 @@
from itertools import chain, izip, product
import marshal
import struct
-import msgpack
-import sys
from pyspark import cloudpickle


@@ -309,17 +307,6 @@ def load_stream(self, stream):
return


-class MsgpackSerializer(FramedSerializer):
-    """
-    Deserializes streams written by Scala/Java MsgPack
-    """
-    def loads(self, obj):
-        return msgpack.loads(obj, use_list=0)
-
-    def dumps(self, obj):
-        return msgpack.dumps(obj)


def read_long(stream):
length = stream.read(8)
if length == "":
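
For reference, the pickle-based counterpart that the call sites in context.py now rely on follows the same FramedSerializer pattern as the removed class above. A minimal sketch under that assumption (the real PickleSerializer lives elsewhere in pyspark/serializers.py and may differ in detail; the class name here is hypothetical):

```python
import cPickle

from pyspark.serializers import FramedSerializer


class SketchPickleSerializer(FramedSerializer):
    """Minimal pickle-based framed serializer, mirroring the removed msgpack one."""

    def dumps(self, obj):
        # Protocol 2 keeps the wire format compact on Python 2.
        return cPickle.dumps(obj, 2)

    def loads(self, obj):
        return cPickle.loads(obj)
```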
