Skip to content

Commit

Permalink
Merge pull request alteryx#218 from JoshRosen/spark-970-pyspark-unicode-error
Browse files Browse the repository at this point in the history

Fix UnicodeEncodeError in PySpark saveAsTextFile() (SPARK-970)

This fixes [SPARK-970](https://spark-project.atlassian.net/browse/SPARK-970), an issue where PySpark's saveAsTextFile() could throw UnicodeEncodeError when called on an RDD of Unicode strings.

Please merge this into master and branch-0.8.
  • Loading branch information
rxin committed Dec 3, 2013
2 parents 58d9bbc + 3787f51 commit 8a3475a
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 1 deletion.
5 changes: 4 additions & 1 deletion python/pyspark/rdd.py
Original file line number Diff line number Diff line change
Expand Up @@ -605,7 +605,10 @@ def saveAsTextFile(self, path):
'0\\n1\\n2\\n3\\n4\\n5\\n6\\n7\\n8\\n9\\n'
"""
def func(split, iterator):
return (str(x).encode("utf-8") for x in iterator)
for x in iterator:
if not isinstance(x, basestring):
x = unicode(x)
yield x.encode("utf-8")
keyed = PipelinedRDD(self, func)
keyed._bypass_serializer = True
keyed._jrdd.map(self.ctx._jvm.BytesToString()).saveAsTextFile(path)
Expand Down
15 changes: 15 additions & 0 deletions python/pyspark/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
Unit tests for PySpark; additional tests are implemented as doctests in
individual modules.
"""
from fileinput import input
from glob import glob
import os
import shutil
import sys
Expand Down Expand Up @@ -138,6 +140,19 @@ def func():
self.assertEqual("Hello World from inside a package!", UserClass().hello())


class TestRDDFunctions(PySparkTestCase):
    """Tests for RDD methods that need real files on disk."""

    def test_save_as_textfile_with_unicode(self):
        """Regression test for SPARK-970.

        Saves an RDD holding a non-ASCII unicode string with
        saveAsTextFile() and checks that the written bytes decode (as
        UTF-8) back to the original string.
        """
        x = u"\u00A1Hola, mundo!"
        data = self.sc.parallelize([x])
        # The temporary file is only used to reserve a unique path: closing
        # it deletes it (delete=True), so saveAsTextFile() can create its
        # output directory under that name.
        tempFile = NamedTemporaryFile(delete=True)
        tempFile.close()
        try:
            data.saveAsTextFile(tempFile.name)
            raw_contents = ''.join(input(glob(tempFile.name + "/part-0000*")))
            self.assertEqual(x, unicode(raw_contents.strip(), "utf-8"))
        finally:
            # The output directory is not managed by NamedTemporaryFile, so
            # remove it explicitly to avoid leaking it on every test run.
            shutil.rmtree(tempFile.name, ignore_errors=True)


class TestIO(PySparkTestCase):

def test_stdout_redirection(self):
Expand Down

0 comments on commit 8a3475a

Please sign in to comment.