Skip to content

Commit

Permalink
SPARK-1421. Make MLlib work on Python 2.6
Browse files Browse the repository at this point in the history
The reason it wasn't working was passing a bytearray to stream.write(),
which is not supported in Python 2.6 but is in 2.7. (This array came
from NumPy when we converted data to send it over to Java). Now we just
convert those bytearrays to strings of bytes, which preserves
nonprintable characters as well.
  • Loading branch information
mateiz committed Apr 6, 2014
1 parent 2d0150c commit a84d6af
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 6 deletions.
6 changes: 1 addition & 5 deletions python/pyspark/mllib/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,7 @@
Python bindings for MLlib.
"""

# MLlib currently needs Python 2.7+ and NumPy 1.7+, so complain if lower

import sys
if sys.version_info[0:2] < (2, 7):
raise Exception("MLlib requires Python 2.7+")
# MLlib currently needs and NumPy 1.7+, so complain if lower

import numpy
if numpy.version.version < '1.7':
Expand Down
11 changes: 10 additions & 1 deletion python/pyspark/serializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
from itertools import chain, izip, product
import marshal
import struct
import sys
from pyspark import cloudpickle


Expand Down Expand Up @@ -113,6 +114,11 @@ class FramedSerializer(Serializer):
where C{length} is a 32-bit integer and data is C{length} bytes.
"""

def __init__(self):
# On Python 2.6, we can't write bytearrays to streams, so we need to convert them
# to strings first. Check if the version number is that old.
self._only_write_strings = sys.version_info[0:2] <= (2, 6)

def dump_stream(self, iterator, stream):
for obj in iterator:
self._write_with_length(obj, stream)
Expand All @@ -127,7 +133,10 @@ def load_stream(self, stream):
def _write_with_length(self, obj, stream):
serialized = self.dumps(obj)
write_int(len(serialized), stream)
stream.write(serialized)
if self._only_write_strings:
stream.write(str(serialized))
else:
stream.write(serialized)

def _read_with_length(self, stream):
length = read_int(stream)
Expand Down

0 comments on commit a84d6af

Please sign in to comment.