Add back msgpack serializer and hadoop file code lost during merging
MLnick committed Apr 18, 2014
1 parent 25da1ca commit 7237263
Showing 2 changed files with 115 additions and 1 deletion.
105 changes: 104 additions & 1 deletion python/pyspark/context.py
@@ -29,7 +29,7 @@
from pyspark.files import SparkFiles
from pyspark.java_gateway import launch_gateway
from pyspark.serializers import PickleSerializer, BatchedSerializer, UTF8Deserializer, \
-    PairDeserializer, MsgPackDeserializer
+    PairDeserializer, MsgpackSerializer
from pyspark.storagelevel import StorageLevel
from pyspark import rdd
from pyspark.rdd import RDD
@@ -297,6 +297,109 @@ def wholeTextFiles(self, path):
        return RDD(self._jsc.wholeTextFiles(path), self,
                   PairDeserializer(UTF8Deserializer(), UTF8Deserializer()))

    ###

    def sequenceFile(self, name, key_class="org.apache.hadoop.io.Text", value_class="org.apache.hadoop.io.Text",
                     key_wrapper="", value_wrapper="", minSplits=None):
        """
        Read a Hadoop SequenceFile with arbitrary key and value Writable classes from HDFS,
        a local file system (available on all nodes), or any Hadoop-supported file system URI.
        The mechanism is as follows:
        1. A Java RDD is created from the SequenceFile, key and value classes
        2. Serialization is attempted via MsgPack
        3. If this fails, the fallback is to call 'toString' on each key and value
        4. C{MsgpackSerializer} is used to deserialize the data on the Python side

        >>> sc.sequenceFile("test_support/data/sfint/").collect()
        [(1, 'aa'), (2, 'bb'), (2, 'aa'), (3, 'cc'), (2, 'bb'), (1, 'aa')]
        >>> sc.sequenceFile("test_support/data/sfdouble/").collect()
        [(1.0, 'aa'), (2.0, 'bb'), (2.0, 'aa'), (3.0, 'cc'), (2.0, 'bb'), (1.0, 'aa')]
        >>> sc.sequenceFile("test_support/data/sftext/").collect()
        [('1', 'aa'), ('2', 'bb'), ('2', 'aa'), ('3', 'cc'), ('2', 'bb'), ('1', 'aa')]
        """
        minSplits = minSplits or min(self.defaultParallelism, 2)
        jrdd = self._jvm.PythonRDD.sequenceFile(self._jsc, name, key_class, value_class, key_wrapper, value_wrapper,
                                                minSplits)
        return RDD(jrdd, self, MsgpackSerializer())
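
A usage sketch, not part of the diff: reading a SequenceFile whose keys are IntWritable rather than the default Text. The path is a placeholder; the Hadoop class names are standard, and `sc` is an existing SparkContext.

# Hypothetical example: override the default Text/Text key and value classes.
pairs = sc.sequenceFile("hdfs:///tmp/int-text-pairs",              # placeholder path
                        key_class="org.apache.hadoop.io.IntWritable",
                        value_class="org.apache.hadoop.io.Text")
pairs.first()   # e.g. (1, 'aa') once MsgpackSerializer has deserialized the pair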

    def newAPIHadoopFile(self, name, inputformat_class, key_class, value_class, key_wrapper="toString",
                         value_wrapper="toString", conf={}):
        """
        Read a 'new API' Hadoop InputFormat with arbitrary key and value classes from HDFS,
        a local file system (available on all nodes), or any Hadoop-supported file system URI.
        The mechanism is as follows:
        1. A Java RDD is created from the InputFormat, key and value classes
        2. Serialization is attempted via MsgPack
        3. If this fails, the fallback is to call 'toString' on each key and value
        4. C{MsgpackSerializer} is used to deserialize the data on the Python side

        A Hadoop configuration can be passed in as a Python dict; it will be converted into a
        Configuration in Java.
        """
        jconf = self._jvm.java.util.HashMap()
        for k, v in conf.iteritems():
            jconf[k] = v
        jrdd = self._jvm.PythonRDD.newAPIHadoopFile(self._jsc, name, inputformat_class, key_class, value_class,
                                                    key_wrapper, value_wrapper, jconf)
        return RDD(jrdd, self, MsgpackSerializer())
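
A usage sketch, not part of the diff: reading a plain text file through the 'new API' TextInputFormat, which yields (byte offset, line) records. The path is a placeholder; the Hadoop class names are standard.

# Hypothetical example: line offsets as LongWritable keys, lines as Text values.
lines = sc.newAPIHadoopFile(
    "hdfs:///tmp/input.txt",                                     # placeholder path
    "org.apache.hadoop.mapreduce.lib.input.TextInputFormat",
    "org.apache.hadoop.io.LongWritable",
    "org.apache.hadoop.io.Text")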

    def newAPIHadoopRDD(self, inputformat_class, key_class, value_class, key_wrapper="toString",
                        value_wrapper="toString", conf={}):
        """
        Read a 'new API' Hadoop InputFormat with arbitrary key and value classes from an arbitrary
        Hadoop configuration, which is passed in as a Python dict and converted into a Configuration
        in Java.
        The mechanism is as follows:
        1. A Java RDD is created from the InputFormat, key and value classes
        2. Serialization is attempted via MsgPack
        3. If this fails, the fallback is to call 'toString' on each key and value
        4. C{MsgpackSerializer} is used to deserialize the data on the Python side
        """
        jconf = self._jvm.java.util.HashMap()
        for k, v in conf.iteritems():
            jconf[k] = v
        jrdd = self._jvm.PythonRDD.newAPIHadoopRDD(self._jsc, inputformat_class, key_class, value_class, key_wrapper,
                                                   value_wrapper, jconf)
        return RDD(jrdd, self, MsgpackSerializer())
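
A usage sketch, not part of the diff: newAPIHadoopRDD takes no path, so the input source comes entirely from the conf dict. The HBase class names and configuration keys below are standard, but the quorum and table values are placeholders.

# Hypothetical example: scan an HBase table via its 'new API' TableInputFormat.
hbase_conf = {"hbase.zookeeper.quorum": "localhost",
              "hbase.mapreduce.inputtable": "test_table"}
rows = sc.newAPIHadoopRDD(
    "org.apache.hadoop.hbase.mapreduce.TableInputFormat",
    "org.apache.hadoop.hbase.io.ImmutableBytesWritable",
    "org.apache.hadoop.hbase.client.Result",
    conf=hbase_conf)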

    def hadoopFile(self, name, inputformat_class, key_class, value_class, key_wrapper="toString",
                   value_wrapper="toString", conf={}):
        """
        Read an 'old' Hadoop InputFormat with arbitrary key and value classes from HDFS,
        a local file system (available on all nodes), or any Hadoop-supported file system URI.
        The mechanism is as follows:
        1. A Java RDD is created from the InputFormat, key and value classes
        2. Serialization is attempted via MsgPack
        3. If this fails, the fallback is to call 'toString' on each key and value
        4. C{MsgpackSerializer} is used to deserialize the data on the Python side

        A Hadoop configuration can be passed in as a Python dict; it will be converted into a
        Configuration in Java.
        """
        jconf = self._jvm.java.util.HashMap()
        for k, v in conf.iteritems():
            jconf[k] = v
        jrdd = self._jvm.PythonRDD.hadoopFile(self._jsc, name, inputformat_class, key_class, value_class, key_wrapper,
                                              value_wrapper, jconf)
        return RDD(jrdd, self, MsgpackSerializer())
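
A usage sketch, not part of the diff: the 'old API' counterpart of the newAPIHadoopFile example above, using the mapred TextInputFormat (placeholder path).

# Hypothetical example: the same text file through an old-style InputFormat.
lines = sc.hadoopFile(
    "hdfs:///tmp/input.txt",                                     # placeholder path
    "org.apache.hadoop.mapred.TextInputFormat",
    "org.apache.hadoop.io.LongWritable",
    "org.apache.hadoop.io.Text")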

    def hadoopRDD(self, inputformat_class, key_class, value_class, key_wrapper="toString",
                  value_wrapper="toString", conf={}):
        """
        Read an 'old' Hadoop InputFormat with arbitrary key and value classes from an arbitrary
        Hadoop configuration, which is passed in as a Python dict and converted into a Configuration
        in Java.
        The mechanism is as follows:
        1. A Java RDD is created from the InputFormat, key and value classes
        2. Serialization is attempted via MsgPack
        3. If this fails, the fallback is to call 'toString' on each key and value
        4. C{MsgpackSerializer} is used to deserialize the data on the Python side
        """
        jconf = self._jvm.java.util.HashMap()
        for k, v in conf.iteritems():
            jconf[k] = v
        jrdd = self._jvm.PythonRDD.hadoopRDD(self._jsc, inputformat_class, key_class, value_class, key_wrapper,
                                             value_wrapper, jconf)
        return RDD(jrdd, self, MsgpackSerializer())
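
A usage sketch, not part of the diff: as with newAPIHadoopRDD there is no path parameter, so the input directory is supplied through the old-API configuration key mapred.input.dir (placeholder path below).

# Hypothetical example: drive an old-API InputFormat purely from a conf dict.
old_conf = {"mapred.input.dir": "hdfs:///tmp/input.txt"}         # placeholder path
lines = sc.hadoopRDD(
    "org.apache.hadoop.mapred.TextInputFormat",
    "org.apache.hadoop.io.LongWritable",
    "org.apache.hadoop.io.Text",
    conf=old_conf)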

    ####

    def _checkpointFile(self, name, input_deserializer):
        jrdd = self._jsc.checkpointFile(name)
        return RDD(jrdd, self, input_deserializer)
11 changes: 11 additions & 0 deletions python/pyspark/serializers.py
@@ -309,6 +309,17 @@ def load_stream(self, stream):
return


class MsgpackSerializer(FramedSerializer):
    """
    Deserializes streams written by Scala/Java MsgPack
    """
    def loads(self, obj):
        return msgpack.loads(obj, use_list=0)

    def dumps(self, obj):
        return msgpack.dumps(obj)
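
A round-trip sketch, not part of the diff, showing what loads and dumps do per framed record. It assumes the msgpack-python package is installed; the class itself presumably relies on an msgpack import defined elsewhere in serializers.py, which this hunk does not show.

# Hypothetical illustration of the per-record round trip.
import msgpack

packed = msgpack.dumps((1, "aa"))            # what dumps() emits for one record
record = msgpack.loads(packed, use_list=0)   # use_list=0 keeps pairs as tuples
print(record)                                # (1, 'aa')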


def read_long(stream):
    length = stream.read(8)
    if length == "":
