From 67ba875f262c054765dfd856da27db4852d707ee Mon Sep 17 00:00:00 2001 From: Ahir Reddy Date: Sun, 6 Apr 2014 16:07:10 -0700 Subject: [PATCH] java to python, and python to java --- .../scala/org/apache/spark/api/python/PythonRDD.scala | 8 ++++++-- python/pyspark/context.py | 2 ++ 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala index 0322401765494..31f7cd319a0c0 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala @@ -289,8 +289,12 @@ private[spark] object PythonRDD { def pythonToJava(pyRDD: JavaRDD[Array[Byte]]): JavaRDD[_] = { pyRDD.rdd.mapPartitions { iter => val unpickle = new Unpickler - iter.map { row => - unpickle.loads(row) + iter.flatMap { row => + unpickle.loads(row) match { + case objs: java.util.ArrayList[Any] => objs + // In case the partition doesn't have a collection + case obj => Seq(obj) + } } } } diff --git a/python/pyspark/context.py b/python/pyspark/context.py index d8667e84fedff..7a5f9349649f8 100644 --- a/python/pyspark/context.py +++ b/python/pyspark/context.py @@ -174,6 +174,8 @@ def _ensure_initialized(cls, instance=None, gateway=None): SparkContext._gateway = gateway or launch_gateway() SparkContext._jvm = SparkContext._gateway.jvm SparkContext._writeToFile = SparkContext._jvm.PythonRDD.writeToFile + SparkContext._pythonToJava = SparkContext._jvm.PythonRDD.pythonToJava + SparkContext._javaToPython = SparkContext._jvm.PythonRDD.javaToPython if instance: if SparkContext._active_spark_context and SparkContext._active_spark_context != instance: