Commit be079de
returning dictionaries works
ahirreddy committed Apr 15, 2014
1 parent cd5f79f commit be079de
Showing 1 changed file with 11 additions and 3 deletions.
sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala: 14 changes (11 additions, 3 deletions)
@@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.plans.{Inner, JoinType}
 import org.apache.spark.sql.catalyst.types.BooleanType
 import org.apache.spark.api.java.JavaRDD
+import java.util.{Map => JMap}
 
 /**
  * :: AlphaComponent ::
@@ -313,11 +314,18 @@ class SchemaRDD(
   def analyze = sqlContext.analyzer(logicalPlan)
 
   def javaToPython: JavaRDD[Array[Byte]] = {
+    //val fieldNames: Seq[String] = logicalPlan.references.map(_.name)
     this.mapPartitions { iter =>
-      val unpickle = new Pickler
+      val pickle = new Pickler
       iter.map { row =>
-        val fields: Array[Any] = (for (i <- 0 to row.length - 1) yield row(i)).toArray
-        unpickle.dumps(fields)
+        val fieldNames: Seq[String] = (1 to row.length).map(_.toString + "KEY") //TODO: Temporary
+        val map: JMap[String, Any] = new java.util.HashMap
+        val arr: java.util.ArrayList[Any] = new java.util.ArrayList
+        row.zip(fieldNames).foreach { case (obj, name) =>
+          map.put(name, obj)
+        }
+        arr.add(map)
+        pickle.dumps(arr)
       }
     }
   }
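For context, the new javaToPython body pickles each row as a one-element list holding a map from placeholder keys ("1KEY", "2KEY", ...) to field values, so Python can unpickle a list of dicts; the commented-out line suggests the placeholders are meant to give way to real column names from logicalPlan.references. Below is a minimal standalone sketch of that map-building step. The sample row, the object name, and the Pyrolite import (net.razorvine.pickle.Pickler, presumably what Pickler resolves to in this file) are illustrative assumptions, not part of the commit.

import java.util.{ArrayList, HashMap, Map => JMap}
import net.razorvine.pickle.Pickler  // assumption: Pyrolite, the pickling library PySpark relies on

object RowToDictSketch {
  def main(args: Array[String]): Unit = {
    // Stand-in for one row's field values; the real code maps over rows of the RDD.
    val row: Seq[Any] = Seq(1, "Alice", true)

    // Temporary positional names, matching the TODO in the diff: "1KEY", "2KEY", "3KEY".
    val fieldNames: Seq[String] = (1 to row.length).map(_.toString + "KEY")

    // Build the dictionary {name -> value} for this row.
    val map: JMap[String, Any] = new HashMap
    row.zip(fieldNames).foreach { case (obj, name) => map.put(name, obj) }

    // Wrap the map in a one-element list so the Python side unpickles a list of dicts.
    val arr = new ArrayList[Any]
    arr.add(map)

    val bytes: Array[Byte] = new Pickler().dumps(arr)
    println(s"pickled ${bytes.length} bytes for $map")
  }
}

Unpickling one such payload in Python would yield [{'1KEY': 1, '2KEY': 'Alice', '3KEY': True}], one dict per row.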
