Skip to content

Commit

Permalink
Recover earlier changes lost in previous merge for serializers.py
Browse files Browse the repository at this point in the history
  • Loading branch information
MLnick committed Apr 23, 2014
1 parent 077ecb2 commit 9ef1896
Showing 1 changed file with 11 additions and 2 deletions.
13 changes: 11 additions & 2 deletions python/pyspark/serializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
from itertools import chain, izip, product
import marshal
import struct
import sys
from pyspark import cloudpickle


Expand Down Expand Up @@ -113,6 +114,11 @@ class FramedSerializer(Serializer):
where C{length} is a 32-bit integer and data is C{length} bytes.
"""

def __init__(self):
    """
    Record whether stream payloads must be converted to C{str} before writing.

    Python 2.6 and older cannot write bytearrays to streams, so the
    writer has to turn them into strings first on those interpreters.
    """
    # "older than 2.7" is the same condition as "2.6 or older" for the
    # (major, minor) pair taken from the running interpreter.
    self._only_write_strings = sys.version_info[:2] < (2, 7)

def dump_stream(self, iterator, stream):
    """
    Write every object produced by C{iterator} to C{stream}.

    Each object is handed, in iteration order, to
    C{self._write_with_length} together with the target stream.
    """
    for item in iterator:
        self._write_with_length(item, stream)
Expand All @@ -127,7 +133,10 @@ def load_stream(self, stream):
def _write_with_length(self, obj, stream):
    """
    Serialize C{obj} and write it to C{stream} as a length-prefixed frame.

    The length of the serialized data is written first via C{write_int},
    followed by the serialized data itself.
    """
    payload = self.dumps(obj)
    write_int(len(payload), stream)
    # When the instance is flagged as only able to write strings, the
    # payload is converted with str() before being handed to the stream.
    stream.write(str(payload) if self._only_write_strings else payload)

def _read_with_length(self, stream):
length = read_int(stream)
Expand Down Expand Up @@ -290,7 +299,7 @@ class MarshalSerializer(FramedSerializer):

class UTF8Deserializer(Serializer):
"""
Deserializes streams written by getBytes.
Deserializes streams written by String.getBytes.
"""

def loads(self, stream):
Expand Down

0 comments on commit 9ef1896

Please sign in to comment.