Skip to content

Commit

Permalink
doesn't crash
Browse files Browse the repository at this point in the history
  • Loading branch information
ahirreddy committed Apr 15, 2014
1 parent b8b904b commit 5496f9f
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ private object SpecialLengths {
val TIMING_DATA = -3
}

private[spark] object PythonRDD {
object PythonRDD {
val UTF8 = Charset.forName("UTF-8")

def readRDDFromFile(sc: JavaSparkContext, filename: String, parallelism: Int):
Expand Down Expand Up @@ -289,6 +289,7 @@ private[spark] object PythonRDD {
def pythonToJava(pyRDD: JavaRDD[Array[Byte]]): JavaRDD[_] = {
pyRDD.rdd.mapPartitions { iter =>
val unpickle = new Unpickler
// TODO: Figure out why flatMap is necessary for PySpark
iter.flatMap { row =>
unpickle.loads(row) match {
case objs: java.util.ArrayList[Any] => objs
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,26 +85,32 @@ class JavaSQLContext(sparkContext: JavaSparkContext) {
/**
* Applies a schema to an RDD of Array[Any]
*/
def applySchema(rdd: JavaRDD[Array[Any]]): JavaSchemaRDD = {
val fields = rdd.first.map(_.getClass)
def applySchema(rdd: JavaRDD[_]): JavaSchemaRDD = {
val fields = rdd.first match {
case row: java.util.ArrayList[_] => row.toArray.map(_.getClass)
case row => throw new Exception(s"Rows must be Lists 1 ${row.getClass}")
}

val schema = fields.zipWithIndex.map { case (klass, index) =>
val dataType = klass match {
case c: Class[_] if c == classOf[java.lang.String] => StringType
case c: Class[_] if c == java.lang.Short.TYPE => ShortType
case c: Class[_] if c == java.lang.Integer.TYPE => IntegerType
case c: Class[_] if c == java.lang.Long.TYPE => LongType
case c: Class[_] if c == java.lang.Double.TYPE => DoubleType
case c: Class[_] if c == java.lang.Byte.TYPE => ByteType
case c: Class[_] if c == java.lang.Float.TYPE => FloatType
case c: Class[_] if c == java.lang.Boolean.TYPE => BooleanType
case c: Class[_] if c == classOf[java.lang.Integer] => IntegerType
// case c: Class[_] if c == java.lang.Short.TYPE => ShortType
// case c: Class[_] if c == java.lang.Integer.TYPE => IntegerType
// case c: Class[_] if c == java.lang.Long.TYPE => LongType
// case c: Class[_] if c == java.lang.Double.TYPE => DoubleType
// case c: Class[_] if c == java.lang.Byte.TYPE => ByteType
// case c: Class[_] if c == java.lang.Float.TYPE => FloatType
// case c: Class[_] if c == java.lang.Boolean.TYPE => BooleanType
}

AttributeReference(index.toString, dataType, true)()
}

val rowRdd = rdd.rdd.mapPartitions { iter =>
iter.map { row =>
new GenericRow(row): ScalaRow
iter.map {
case row: java.util.ArrayList[_] => new GenericRow(row.toArray.asInstanceOf[Array[Any]]): ScalaRow
case row => throw new Exception(s"Rows must be Lists 2 ${row.getClass}")
}
}
new JavaSchemaRDD(sqlContext, SparkLogicalPlan(ExistingRdd(schema, rowRdd)))
Expand Down

0 comments on commit 5496f9f

Please sign in to comment.