diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala index f15fb113d3754..6aa4c7675674c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala @@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.plans.{Inner, JoinType} import org.apache.spark.sql.catalyst.types.BooleanType import org.apache.spark.api.java.JavaRDD +import java.util.{Map => JMap} /** * :: AlphaComponent :: @@ -313,11 +314,18 @@ class SchemaRDD( def analyze = sqlContext.analyzer(logicalPlan) def javaToPython: JavaRDD[Array[Byte]] = { + //val fieldNames: Seq[String] = logicalPlan.references.map(_.name) this.mapPartitions { iter => - val unpickle = new Pickler + val pickle = new Pickler iter.map { row => - val fields: Array[Any] = (for (i <- 0 to row.length - 1) yield row(i)).toArray - unpickle.dumps(fields) + val fieldNames: Seq[String] = (1 to row.length).map(_.toString + "KEY") //TODO: Temporary + val map: JMap[String, Any] = new java.util.HashMap + val arr: java.util.ArrayList[Any] = new java.util.ArrayList + row.zip(fieldNames).foreach { case (obj, name) => + map.put(name, obj) + } + arr.add(map) + pickle.dumps(arr) } } }