
Commit

even better
ahirreddy committed Apr 15, 2014
1 parent c0fb1c6 commit 4886052
Showing 2 changed files with 14 additions and 1 deletion.
3 changes: 2 additions & 1 deletion python/pyspark/rdd.py
@@ -1398,7 +1398,8 @@ def registerAsTable(self, name):
         self._jschema_rdd.registerAsTable(name)
 
     def toPython(self):
-        jrdd = self._sc._javaToPython(self._jschema_rdd)
+        jrdd = self._jschema_rdd.javaToPython()
+        #jrdd = self._sc._javaToPython(self._jschema_rdd)
         return RDD(jrdd, self._sc, self._sc.serializer)
 
 def _test():
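On the Python side, toPython now asks the JVM-side SchemaRDD to serialize itself through the new javaToPython method instead of routing through the _javaToPython helper on the SparkContext (the old call is left behind as a comment). The returned JavaRDD[Array[Byte]] carries one standard pickle payload per row, which is why it can be wrapped directly in a PySpark RDD with the context's serializer. A minimal, self-contained sketch of what the Python side does with those bytes — plain pickle stands in for PySpark's serializer machinery, and the sample payloads are built locally rather than fetched from the JVM:

    import pickle

    def decode_rows(payloads):
        # Each element produced by javaToPython is one pickled sequence
        # of field values; pickle.loads turns it back into a Python object.
        for payload in payloads:
            yield pickle.loads(payload)

    # Locally built stand-ins for the bytes the JVM would emit.
    fake_payloads = [pickle.dumps([1, "alice"]), pickle.dumps([2, "bob"])]
    print(list(decode_rows(fake_payloads)))  # [[1, 'alice'], [2, 'bob']]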
12 changes: 12 additions & 0 deletions sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSchemaRDD.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.api.java
 
+import net.razorvine.pickle.{Pickler, Unpickler}
+
 import org.apache.spark.api.java.{JavaRDDLike, JavaRDD}
 import org.apache.spark.sql.{SQLContext, SchemaRDD, SchemaRDDLike}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
@@ -45,4 +47,14 @@ class JavaSchemaRDD(
   override def wrapRDD(rdd: RDD[Row]): JavaRDD[Row] = JavaRDD.fromRDD(rdd)
 
   val rdd = baseSchemaRDD.map(new Row(_))
+
+  def javaToPython: JavaRDD[Array[Byte]] = {
+    this.rdd.mapPartitions { iter =>
+      val unpickle = new Pickler
+      iter.map { row =>
+        val fields: Array[Any] = (for (i <- 0 to row.length - 1) yield row.get(i)).toArray
+        unpickle.dumps(fields)
+      }
+    }
+  }
 }
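The javaToPython method added above follows a common Spark serialization pattern: mapPartitions instantiates one Pickler per partition rather than one per row, amortizing its setup cost; each Row is flattened into an Array[Any] of its field values; and Pickler.dumps emits pickle-protocol bytes that the Python workers can deserialize. (Two small oddities worth noting: the local val holding the Pickler is named unpickle, and the imported Unpickler is not yet used by this change.) An illustrative Python analogue of the same per-partition pattern, using in-memory lists as stand-ins for an RDD's partitions:

    import pickle

    # In-memory stand-ins for an RDD's partitions.
    partitions = [[(1, "alice"), (2, "bob")], [(3, "carol")]]

    def pickle_partition(rows):
        # Mirrors the mapPartitions block: one serializer per partition,
        # each row flattened to a list of fields and pickled as one payload.
        return [pickle.dumps(list(row)) for row in rows]

    serialized = [pickle_partition(p) for p in partitions]
    print(pickle.loads(serialized[0][0]))  # [1, 'alice']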
