From e948bd9b3af1fedff1603df236fed9a723e5d324 Mon Sep 17 00:00:00 2001
From: Ahir Reddy
Date: Mon, 7 Apr 2014 18:26:48 -0700
Subject: [PATCH] yippie

---
 project/SparkBuild.scala                                | 1 +
 python/pyspark/context.py                               | 5 +++--
 .../org/apache/spark/sql/api/java/JavaSQLContext.scala  | 8 +++++---
 3 files changed, 9 insertions(+), 5 deletions(-)

diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index b37b31bdac815..0685ff76abcc6 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -522,6 +522,7 @@ object SparkBuild extends Build {

   def extraAssemblySettings() = Seq(
     test in assembly := {},
+    assemblyOption in assembly ~= { _.copy(cacheOutput = false) },
     mergeStrategy in assembly := {
       case m if m.toLowerCase.endsWith("manifest.mf") => MergeStrategy.discard
       case m if m.toLowerCase.matches("meta-inf.*\\.sf$") => MergeStrategy.discard
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index 751ed7d674722..22a98a7ec955e 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -473,9 +473,10 @@ def __init__(self, sparkContext):
     def sql(self, sqlQuery):
         return SchemaRDD(self._jsql_ctx.sql(sqlQuery), self)

-    def applySchema(self, rdd):
+    def applySchema(self, rdd, fieldNames):
+        fieldNames = ListConverter().convert(fieldNames, self._sc._gateway._gateway_client)
         jrdd = self._sc._pythonToJava(rdd._jrdd)
-        srdd = self._jsql_ctx.applySchema(jrdd)
+        srdd = self._jsql_ctx.applySchema(jrdd, fieldNames)
         return SchemaRDD(srdd, self)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala
index 92bf5bf20c175..bd9fe7fbb0096 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala
@@ -85,13 +85,13 @@ class JavaSQLContext(sparkContext: JavaSparkContext) {
   /**
    * Applies a schema to an RDD of Array[Any]
    */
-  def applySchema(rdd: JavaRDD[_]): JavaSchemaRDD = {
+  def applySchema(rdd: JavaRDD[_], fieldNames: java.util.ArrayList[Any]): JavaSchemaRDD = {
     val fields = rdd.first match {
       case row: java.util.ArrayList[_] => row.toArray.map(_.getClass)
       case row => throw new Exception(s"Rows must be Lists 1 ${row.getClass}")
     }

-    val schema = fields.zipWithIndex.map { case (klass, index) =>
+    val schema = fields.zip(fieldNames.toArray).map { case (klass, fieldName) =>
       val dataType = klass match {
         case c: Class[_] if c == classOf[java.lang.String] => StringType
         case c: Class[_] if c == classOf[java.lang.Integer] => IntegerType
@@ -104,7 +104,9 @@ class JavaSQLContext(sparkContext: JavaSparkContext) {
         // case c: Class[_] if c == java.lang.Boolean.TYPE => BooleanType
       }

-      AttributeReference(index.toString, dataType, true)()
+      println(fieldName.toString)
+      // TODO: No bueno, fieldName.toString used because I can't figure out the casting
+      AttributeReference(fieldName.toString, dataType, true)()
     }

     val rowRdd = rdd.rdd.mapPartitions { iter =>
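
Note on the context.py hunk: the new first line of applySchema converts the
Python fieldNames list into a java.util.ArrayList via py4j's ListConverter,
matching the java.util.ArrayList[Any] parameter added on the Scala side. The
hunk does not show it, but context.py needs
`from py4j.java_collections import ListConverter` for this to run. A minimal
sketch of that conversion in isolation, assuming a py4j GatewayServer is
already listening (SparkContext normally provides one via _gateway):

    # Convert a Python list to a java.util.ArrayList over py4j, mirroring
    # what the patched applySchema does with fieldNames. Assumes a JVM
    # GatewayServer is already running for JavaGateway() to connect to.
    from py4j.java_gateway import JavaGateway
    from py4j.java_collections import ListConverter

    gateway = JavaGateway()
    java_list = ListConverter().convert(["name", "count"], gateway._gateway_client)
    print(java_list.getClass().getName())  # prints java.util.ArrayList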
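
For the end-to-end shape of the new API, a hypothetical PySpark session using
the two-argument applySchema; the master URL, data, and column names below are
illustrative assumptions, not part of the patch:

    # Hypothetical usage of applySchema(rdd, fieldNames). Each row must be a
    # list (the Scala side throws for anything else), and fieldNames supplies
    # one column name per element, replacing the old index-based names.
    from pyspark import SparkContext
    from pyspark.context import SQLContext  # defined in context.py by this patch

    sc = SparkContext("local", "applySchema example")
    sql_ctx = SQLContext(sc)

    rows = sc.parallelize([["alice", 1], ["bob", 2]])
    srdd = sql_ctx.applySchema(rows, ["name", "count"])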