From 212d5cd2bda5f0a5c9899923c7257ea99e9077bc Mon Sep 17 00:00:00 2001
From: Michael Armbrust
Date: Wed, 6 Aug 2014 16:25:12 -0700
Subject: [PATCH 1/9] Initial support for using ParquetTableScan to read
 HiveMetaStore tables.

---
 .../spark/sql/execution/basicOperators.scala  |  12 ++
 .../apache/spark/sql/hive/HiveContext.scala   |   9 ++
 .../spark/sql/hive/HiveStrategies.scala       | 105 ++++++++++++++-
 .../sql/parquet/ParquetMetastoreSuite.scala   | 126 ++++++++++++++++++
 4 files changed, 250 insertions(+), 2 deletions(-)
 create mode 100644 sql/hive/src/test/scala/org/apache/spark/sql/parquet/ParquetMetastoreSuite.scala

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
index 0027f3cf1fc79..f9dfa3c92f1eb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
@@ -303,3 +303,15 @@ case class Intersect(left: SparkPlan, right: SparkPlan) extends BinaryNode {
     left.execute().map(_.copy()).intersection(right.execute().map(_.copy()))
   }
 }
+
+/**
+ * :: DeveloperApi ::
+ * A plan node that does nothing but lie about the output of its child. Used to splice a
+ * (hopefully structurally equivalent) tree from a different optimization sequence into an already
+ * resolved tree.
+ */
+@DeveloperApi
+case class OutputFaker(output: Seq[Attribute], child: SparkPlan) extends SparkPlan {
+  def children = child :: Nil
+  def execute() = child.execute()
+}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index 53f3dc11dbb9f..cae3be4d79490 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -78,6 +78,14 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
   // Change the default SQL dialect to HiveQL
   override private[spark] def dialect: String = getConf(SQLConf.DIALECT, "hiveql")

+  /**
+   * When true, enables an experimental feature where metastore tables that use the parquet SerDe
+   * are automatically converted to use the Spark SQL parquet table scan, instead of the Hive
+   * SerDe.
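+   *
+   * A usage sketch (`setConf` is the existing SQLContext configuration API, also used by
+   * ParquetMetastoreSuite below; the table name is hypothetical):
+   * {{{
+   *   hiveContext.setConf("spark.sql.hive.convertMetastoreParquet", "true")
+   *   hiveContext.sql("SELECT * FROM a_parquet_backed_metastore_table")
+   * }}}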
+   */
+  private[spark] def convertMetastoreParquet: Boolean =
+    getConf("spark.sql.hive.convertMetastoreParquet", "false") == "true"
+
   override protected[sql] def executePlan(plan: LogicalPlan): this.QueryExecution =
     new this.QueryExecution { val logical = plan }

@@ -328,6 +336,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
       TakeOrdered,
       ParquetOperations,
       InMemoryScans,
+      ParquetConversion, // Must be before HiveTableScans
       HiveTableScans,
       DataSinks,
       Scripts,
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index 2175c5f3835a6..e90fad6de93b6 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -17,14 +17,20 @@

 package org.apache.spark.sql.hive

-import org.apache.spark.sql.SQLContext
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
 import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate
 import org.apache.spark.sql.catalyst.planning._
 import org.apache.spark.sql.catalyst.plans._
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, LowerCaseSchema}
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.hive.execution._
 import org.apache.spark.sql.columnar.InMemoryRelation
+import org.apache.spark.sql.parquet.ParquetTableScan
+
+import scala.collection.JavaConversions._

 private[hive] trait HiveStrategies {
   // Possibly being too clever with types here... or not clever enough.
@@ -32,6 +38,101 @@ private[hive] trait HiveStrategies {

   val hiveContext: HiveContext

+  /**
+   * :: Experimental ::
+   * Finds table scans that would use the Hive SerDe and replaces them with our own native parquet
+   * table scan operator.
+   *
+   * TODO: Much of this logic is duplicated in HiveTableScan. Ideally we would do some refactoring
+   * but since this is after the code freeze for 1.1 all logic is here to minimize disruption.
+   */
+  @Experimental
+  object ParquetConversion extends Strategy {
+    implicit class LogicalPlanHacks(s: SchemaRDD) {
+      def lowerCase =
+        new SchemaRDD(s.sqlContext, LowerCaseSchema(s.logicalPlan))
+    }
+
+    implicit class PhysicalPlanHacks(s: SparkPlan) {
+      def fakeOutput(newOutput: Seq[Attribute]) = OutputFaker(newOutput, s)
+    }
+
+    def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
+      case PhysicalOperation(projectList, predicates, relation: MetastoreRelation)
+        if relation.tableDesc.getSerdeClassName.contains("Parquet") &&
+           hiveContext.convertMetastoreParquet =>
+
+        // Filter out all predicates that only deal with partition keys
+        val partitionKeyIds = relation.partitionKeys.map(_.exprId).toSet
+        val (pruningPredicates, otherPredicates) = predicates.partition {
+          _.references.map(_.exprId).subsetOf(partitionKeyIds)
+        }
+
+        // We are going to throw the predicates and projection back at the whole optimization
+        // sequence so let's unresolve all the attributes, allowing them to be rebound to the
+        // matching parquet attributes.
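+        // (For example, a projection attribute resolved against the metastore schema as
+        // `stringField` becomes a plain UnresolvedAttribute("stringField"), which the analyzer
+        // can then re-bind to the corresponding attribute of the lower-cased parquet relation.)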
+        val unresolvedOtherPredicates = otherPredicates.map(_ transform {
+          case a: AttributeReference => UnresolvedAttribute(a.name)
+        }).reduceOption(And).getOrElse(Literal(true))
+
+        val unresolvedProjection = projectList.map(_ transform {
+          case a: AttributeReference => UnresolvedAttribute(a.name)
+        })
+
+        if (relation.hiveQlTable.isPartitioned) {
+          val rawPredicate = pruningPredicates.reduceOption(And).getOrElse(Literal(true))
+          // Translate the predicate so that it automatically casts the input values to the correct
+          // data types during evaluation
+          val castedPredicate = rawPredicate transform {
+            case a: AttributeReference =>
+              val idx = relation.partitionKeys.indexWhere(a.exprId == _.exprId)
+              val key = relation.partitionKeys(idx)
+              Cast(BoundReference(idx, StringType, nullable = true), key.dataType)
+          }
+
+          val inputData = new GenericMutableRow(relation.partitionKeys.size)
+          val pruningCondition =
+            if(codegenEnabled) {
+              GeneratePredicate(castedPredicate)
+            } else {
+              InterpretedPredicate(castedPredicate)
+            }
+
+          val partitions = relation.hiveQlPartitions.filter { part =>
+            val partitionValues = part.getValues
+            var i = 0
+            while (i < partitionValues.size()) {
+              inputData(i) = partitionValues(i)
+              i += 1
+            }
+            pruningCondition(inputData)
+          }
+
+          org.apache.spark.sql.execution.Union(
+            partitions.par.map(p =>
+              hiveContext
+                .parquetFile(p.getLocation)
+                .lowerCase
+                .where(unresolvedOtherPredicates)
+                .select(unresolvedProjection:_*)
+                .queryExecution
+                .executedPlan
+                .fakeOutput(projectList.map(_.toAttribute))).seq) :: Nil
+
+        } else {
+          hiveContext
+            .parquetFile(relation.hiveQlTable.getDataLocation.getPath)
+            .lowerCase
+            .where(unresolvedOtherPredicates)
+            .select(unresolvedProjection:_*)
+            .queryExecution
+            .executedPlan
+            .fakeOutput(projectList.map(_.toAttribute)) :: Nil
+        }
+      case _ => Nil
+    }
+  }
+
   object Scripts extends Strategy {
     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
       case logical.ScriptTransformation(input, script, output, child) =>
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/ParquetMetastoreSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/ParquetMetastoreSuite.scala
new file mode 100644
index 0000000000000..3494c2fd14c29
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/ParquetMetastoreSuite.scala
@@ -0,0 +1,126 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.parquet
+
+import java.io.File
+
+import org.scalatest.BeforeAndAfterAll
+
+import scala.reflect.ClassTag
+
+import org.apache.spark.sql.{SQLConf, QueryTest}
+import org.apache.spark.sql.execution.{BroadcastHashJoin, ShuffledHashJoin}
+import org.apache.spark.sql.hive.test.TestHive
+import org.apache.spark.sql.hive.test.TestHive._
+
+case class ParquetData(intField: Int, stringField: String)
+
+/**
+ * Tests for our SerDe -> Native parquet scan conversion.
+ */
+class ParquetMetastoreSuite extends QueryTest with BeforeAndAfterAll {
+
+  override def beforeAll(): Unit = {
+    setConf("spark.sql.hive.convertMetastoreParquet", "true")
+  }
+
+  override def afterAll(): Unit = {
+    setConf("spark.sql.hive.convertMetastoreParquet", "false")
+  }
+
+  val partitionedTableDir = File.createTempFile("parquettests", "sparksql")
+  partitionedTableDir.delete()
+  partitionedTableDir.mkdir()
+
+  (1 to 10).foreach { p =>
+    val partDir = new File(partitionedTableDir, s"p=$p")
+    sparkContext.makeRDD(1 to 10)
+      .map(i => ParquetData(i, s"part-$p"))
+      .saveAsParquetFile(partDir.getCanonicalPath)
+  }
+
+  sql(s"""
+    create external table partitioned_parquet
+    (
+      intField INT,
+      stringField STRING
+    )
+    PARTITIONED BY (p int)
+    ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
+    STORED AS
+    INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
+    OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
+    location '${partitionedTableDir.getCanonicalPath}'
+  """)
+
+  sql(s"""
+    create external table normal_parquet
+    (
+      intField INT,
+      stringField STRING
+    )
+    ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
+    STORED AS
+    INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
+    OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
+    location '${new File(partitionedTableDir, "p=1").getCanonicalPath}'
+  """)
+
+  (1 to 10).foreach { p =>
+    sql(s"ALTER TABLE partitioned_parquet ADD PARTITION (p=$p)")
+  }
+
+  test("simple count") {
+    checkAnswer(
+      sql("SELECT COUNT(*) FROM partitioned_parquet"),
+      100)
+  }
+
+  test("pruned count") {
+    checkAnswer(
+      sql("SELECT COUNT(*) FROM partitioned_parquet WHERE p = 1"),
+      10)
+  }
+
+  test("multi-partition pruned count") {
+    checkAnswer(
+      sql("SELECT COUNT(*) FROM partitioned_parquet WHERE p IN (1,2,3)"),
+      30)
+  }
+
+  test("non-partition predicates") {
+    checkAnswer(
+      sql("SELECT COUNT(*) FROM partitioned_parquet WHERE intField IN (1,2,3)"),
+      30)
+  }
+
+  test("sum") {
+    checkAnswer(
+      sql("SELECT SUM(intField) FROM partitioned_parquet WHERE intField IN (1,2,3) AND p = 1"),
+      1 + 2 + 3
+    )
+  }
+
+  test("non-part select(*)") {
+    checkAnswer(
+      sql("SELECT COUNT(*) FROM normal_parquet"),
+      10
+    )
+  }
+}

From 1161338ad9a103d6da0ae546d2cba0ea4b3986a6 Mon Sep 17 00:00:00 2001
From: Michael Armbrust
Date: Wed, 6 Aug 2014 17:30:20 -0700
Subject: [PATCH 2/9] Add a test to make sure conversion is actually happening

---
 .../spark/sql/parquet/ParquetMetastoreSuite.scala | 12 ++++++++++++
 1 file changed, 12 insertions(+)

diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/ParquetMetastoreSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/ParquetMetastoreSuite.scala
index 3494c2fd14c29..e5d76c8d71b48 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/ParquetMetastoreSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/ParquetMetastoreSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.parquet

 import java.io.File

+import org.apache.spark.sql.hive.execution.HiveTableScan
 import org.scalatest.BeforeAndAfterAll

 import scala.reflect.ClassTag
@@ -123,4 +124,15 @@ class ParquetMetastoreSuite extends QueryTest with BeforeAndAfterAll {
       10
     )
   }
+
+  test("conversion is working") {
+    assert(
+      sql("SELECT * FROM normal_parquet").queryExecution.executedPlan.collect {
+        case _: HiveTableScan => true
+      }.isEmpty)
+    assert(
+      sql("SELECT * FROM normal_parquet").queryExecution.executedPlan.collect {
+        case _: ParquetTableScan => true
+      }.nonEmpty)
+  }
 }

From a0baec75dffbdd262e90ec5e59363e0d56ea882b Mon Sep 17 00:00:00 2001
From: Yin Huai
Date: Thu, 7 Aug 2014 15:51:04 -0700
Subject: [PATCH 3/9] Partitioning columns can be resolved.

---
 .../spark/sql/hive/HiveStrategies.scala       | 22 ++++++++++++-----
 .../sql/parquet/ParquetMetastoreSuite.scala   | 16 ++++++++++++++
 2 files changed, 33 insertions(+), 5 deletions(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index e90fad6de93b6..de301fc8d6254 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -76,7 +76,8 @@ private[hive] trait HiveStrategies {
         }).reduceOption(And).getOrElse(Literal(true))

         val unresolvedProjection = projectList.map(_ transform {
-          case a: AttributeReference => UnresolvedAttribute(a.name)
+          // Handle non-partitioning columns
+          case a: AttributeReference if !partitionKeyIds.contains(a.exprId) => UnresolvedAttribute(a.name)
         })

         if (relation.hiveQlTable.isPartitioned) {
@@ -109,16 +110,27 @@ private[hive] trait HiveStrategies {
           }

           org.apache.spark.sql.execution.Union(
-            partitions.par.map(p =>
+            partitions.par.map { p =>
+              val partValues = p.getValues()
+              val internalProjection = unresolvedProjection.map(_ transform {
+                // Handle partitioning columns
+                case a: AttributeReference if partitionKeyIds.contains(a.exprId) => {
+                  val idx = relation.partitionKeys.indexWhere(a.exprId == _.exprId)
+                  val key = relation.partitionKeys(idx)
+
+                  Alias(Cast(Literal(partValues.get(idx), StringType), key.dataType), a.name)()
+                }
+              })
+
               hiveContext
                 .parquetFile(p.getLocation)
                 .lowerCase
                 .where(unresolvedOtherPredicates)
-                .select(unresolvedProjection:_*)
+                .select(internalProjection:_*)
                 .queryExecution
                 .executedPlan
-                .fakeOutput(projectList.map(_.toAttribute))).seq) :: Nil
-
+                .fakeOutput(projectList.map(_.toAttribute))
+          }.seq) :: Nil
         } else {
           hiveContext
             .parquetFile(relation.hiveQlTable.getDataLocation.getPath)
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/ParquetMetastoreSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/ParquetMetastoreSuite.scala
index e5d76c8d71b48..9368536a11e5b 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/ParquetMetastoreSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/ParquetMetastoreSuite.scala
@@ -87,6 +87,22 @@ class ParquetMetastoreSuite extends QueryTest with BeforeAndAfterAll {
     sql(s"ALTER TABLE partitioned_parquet ADD PARTITION (p=$p)")
   }

+  test("project the partitioning column") {
+    checkAnswer(
+      sql("SELECT p, count(*) FROM partitioned_parquet group by p"),
+      (1, 10) ::
+      (2, 10) ::
+      (3, 10) ::
+      (4, 10) ::
+      (5, 10) ::
+      (6, 10) ::
+      (7, 10) ::
+      (8, 10) ::
+      (9, 10) ::
+      (10, 10) :: Nil
+    )
+  }
+
   test("simple count") {
     checkAnswer(
       sql("SELECT COUNT(*) FROM partitioned_parquet"),

From c0d9b726f210f67e290c790c4c4165eae45fc8d3 Mon Sep 17 00:00:00 2001
From: Michael Armbrust
Date: Thu, 7 Aug 2014 20:23:20 -0700
Subject: [PATCH 4/9] Avoid creating a HadoopRDD per partition. Add dirty
 hacks to retrieve partition values from the InputSplit.

---
 .../spark/sql/parquet/ParquetRelation.scala   |  7 +-
 .../sql/parquet/ParquetTableOperations.scala  | 74 ++++++++++++++-----
 .../spark/sql/hive/HiveStrategies.scala       | 43 +++++------
 3 files changed, 79 insertions(+), 45 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala
index b3bae5db0edbc..b3a12cdc74035 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala
@@ -47,7 +47,8 @@ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, LeafNode}
 private[sql] case class ParquetRelation(
     path: String,
     @transient conf: Option[Configuration],
-    @transient sqlContext: SQLContext)
+    @transient sqlContext: SQLContext,
+    partitioningAttributes: Seq[Attribute] = Nil)
   extends LeafNode with MultiInstanceRelation {

   self: Product =>
@@ -60,7 +61,9 @@ private[sql] case class ParquetRelation(
     .getSchema

   /** Attributes */
-  override val output = ParquetTypesConverter.readSchemaFromFile(new Path(path), conf)
+  override val output =
+    partitioningAttributes ++
+    ParquetTypesConverter.readSchemaFromFile(new Path(path.split(",").head), conf)

   override def newInstance = ParquetRelation(path, conf, sqlContext).asInstanceOf[this.type]
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
index 759a2a586b926..68141ce83c796 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
@@ -42,7 +42,7 @@ import parquet.schema.MessageType

 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.SQLContext
-import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, Row}
+import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.execution.{LeafNode, SparkPlan, UnaryNode}
 import org.apache.spark.{Logging, SerializableWritable, TaskContext}

@@ -59,11 +59,18 @@ case class ParquetTableScan(
   // The resolution of Parquet attributes is case sensitive, so we resolve the original attributes
   // by exprId. note: output cannot be transient, see
   // https://issues.apache.org/jira/browse/SPARK-1367
-  val output = attributes.map { a =>
-    relation.output
-      .find(o => o.exprId == a.exprId)
-      .getOrElse(sys.error(s"Invalid parquet attribute $a in ${relation.output.mkString(",")}"))
-  }
+  val normalOutput =
+    attributes
+      .filterNot(a => relation.partitioningAttributes.map(_.exprId).contains(a.exprId))
+      .flatMap(a => relation.output.find(o => o.exprId == a.exprId))
+
+  val partOutput =
+    attributes.flatMap(a => relation.partitioningAttributes.find(o => o.exprId == a.exprId))
+
+  def output = partOutput ++ normalOutput
+
+  assert(normalOutput.size + partOutput.size == attributes.size,
+    s"$normalOutput + $partOutput != $attributes, ${relation.output}")

   override def execute(): RDD[Row] = {
     val sc = sqlContext.sparkContext
@@ -71,16 +78,19 @@ case class ParquetTableScan(
     ParquetInputFormat.setReadSupportClass(job, classOf[RowReadSupport])

     val conf: Configuration = ContextUtil.getConfiguration(job)
-    val qualifiedPath = {
-      val path = new Path(relation.path)
-      path.getFileSystem(conf).makeQualified(path)
+
+    relation.path.split(",").foreach { curPath =>
+      val qualifiedPath = {
+        val path = new Path(curPath)
+        path.getFileSystem(conf).makeQualified(path)
+      }
+      NewFileInputFormat.addInputPath(job, qualifiedPath)
     }
-    NewFileInputFormat.addInputPath(job, qualifiedPath)

     // Store both requested and original schema in `Configuration`
     conf.set(
       RowReadSupport.SPARK_ROW_REQUESTED_SCHEMA,
-      ParquetTypesConverter.convertToString(output))
+      ParquetTypesConverter.convertToString(normalOutput))
     conf.set(
       RowWriteSupport.SPARK_ROW_SCHEMA,
       ParquetTypesConverter.convertToString(relation.output))
@@ -96,13 +106,41 @@ case class ParquetTableScan(
       ParquetFilters.serializeFilterExpressions(columnPruningPred, conf)
     }

-    sc.newAPIHadoopRDD(
-      conf,
-      classOf[FilteringParquetRowInputFormat],
-      classOf[Void],
-      classOf[Row])
-      .map(_._2)
-      .filter(_ != null) // Parquet's record filters may produce null values
+    val baseRDD =
+      new org.apache.spark.rdd.NewHadoopRDD(
+        sc,
+        classOf[FilteringParquetRowInputFormat],
+        classOf[Void],
+        classOf[Row],
+        conf)
+
+    if (partOutput.nonEmpty) {
+      baseRDD.mapPartitionsWithInputSplit { case (split, iter) =>
+        val partValue = "([^=]+)=([^=]+)".r
+        val partValues =
+          split.asInstanceOf[parquet.hadoop.ParquetInputSplit]
+            .getPath
+            .toString
+            .split("/")
+            .flatMap {
+              case partValue(key, value) => Some(key -> value)
+              case _ => None
+            }.toMap
+
+        val partitionRowValues =
+          partOutput.map(a => Cast(Literal(partValues(a.name)), a.dataType).eval(EmptyRow))
+
+        new Iterator[Row] {
+          private[this] val joinedRow = new JoinedRow(Row(partitionRowValues:_*), null)

+          def hasNext = iter.hasNext

+          def next() = joinedRow.withRight(iter.next()._2)
+        }
+      }
+    } else {
+      baseRDD.map(_._2)
+    }.filter(_ != null) // Parquet's record filters may produce null values
   }

   /**
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index de301fc8d6254..f2be1eae410ef 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, LowerCaseSchema
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.hive.execution._
 import org.apache.spark.sql.columnar.InMemoryRelation
-import org.apache.spark.sql.parquet.ParquetTableScan
+import org.apache.spark.sql.parquet.{ParquetRelation, ParquetTableScan}

 import scala.collection.JavaConversions._

@@ -51,6 +51,13 @@ private[hive] trait HiveStrategies {
     implicit class LogicalPlanHacks(s: SchemaRDD) {
       def lowerCase =
         new SchemaRDD(s.sqlContext, LowerCaseSchema(s.logicalPlan))
+
+      def addPartitioningAttributes(attrs: Seq[Attribute]) =
+        new SchemaRDD(
+          s.sqlContext,
+          s.logicalPlan transform {
+            case p: ParquetRelation => p.copy(partitioningAttributes = attrs)
+          })
     }

     implicit class PhysicalPlanHacks(s: SparkPlan) {
@@ -76,8 +83,7 @@ private[hive] trait HiveStrategies {
         }).reduceOption(And).getOrElse(Literal(true))

         val unresolvedProjection = projectList.map(_ transform {
-          // Handle non-partitioning columns
-          case a: AttributeReference if !partitionKeyIds.contains(a.exprId) => UnresolvedAttribute(a.name)
+          case a: AttributeReference => UnresolvedAttribute(a.name)
         })

         if (relation.hiveQlTable.isPartitioned) {
@@ -109,28 +115,15 @@ private[hive] trait HiveStrategies {
             pruningCondition(inputData)
           }

-          org.apache.spark.sql.execution.Union(
-            partitions.par.map { p =>
-              val partValues = p.getValues()
-              val internalProjection = unresolvedProjection.map(_ transform {
-                // Handle partitioning columns
-                case a: AttributeReference if partitionKeyIds.contains(a.exprId) => {
-                  val idx = relation.partitionKeys.indexWhere(a.exprId == _.exprId)
-                  val key = relation.partitionKeys(idx)
-
-                  Alias(Cast(Literal(partValues.get(idx), StringType), key.dataType), a.name)()
-                }
-              })
-
-              hiveContext
-                .parquetFile(p.getLocation)
-                .lowerCase
-                .where(unresolvedOtherPredicates)
-                .select(internalProjection:_*)
-                .queryExecution
-                .executedPlan
-                .fakeOutput(projectList.map(_.toAttribute))
-          }.seq) :: Nil
+          hiveContext
+            .parquetFile(partitions.map(_.getLocation).mkString(","))
+            .addPartitioningAttributes(relation.partitionKeys)
+            .lowerCase
+            .where(unresolvedOtherPredicates)
+            .select(unresolvedProjection:_*)
+            .queryExecution
+            .executedPlan
+            .fakeOutput(projectList.map(_.toAttribute)):: Nil
         } else {
           hiveContext
             .parquetFile(relation.hiveQlTable.getDataLocation.getPath)

From ebb267e4b6b9fdf7374540fc3ab51a7da4badc01 Mon Sep 17 00:00:00 2001
From: Michael Armbrust
Date: Thu, 7 Aug 2014 20:23:34 -0700
Subject: [PATCH 5/9] include parquet hive so tests pass (Remove this later).

---
 project/SparkBuild.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 40b588512ff08..078ff20fe4f47 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -221,7 +221,7 @@ object SQL {

 object Hive {
   lazy val settings = Seq(
-
+    libraryDependencies += "com.twitter" % "parquet-hive-bundle" % "1.5.0",
     javaOptions += "-XX:MaxPermSize=1g",
     // Multiple queries rely on the TestHive singleton. See comments there for more details.
     parallelExecution in Test := false,

From 4c4dc19e29e419400c8894e9b3b1608038b15c18 Mon Sep 17 00:00:00 2001
From: Michael Armbrust
Date: Fri, 8 Aug 2014 13:23:51 -0700
Subject: [PATCH 6/9] Fix bug with tree splicing.
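
Previously fakeOutput handed the replacement attribute list to OutputFaker as-is, pairing
it with the spliced parquet plan's output purely by position, so a projection whose column
order differed from the parquet plan's output (for example one mixing partitioning and
non-partitioning columns, as in the new test below) could report values under the wrong
attribute. fakeOutput now matches each attribute of the original plan to the new output by
lower-cased name and fails fast if no match exists.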
---
 .../apache/spark/sql/hive/HiveStrategies.scala    | 13 +++++++++++--
 .../sql/parquet/ParquetMetastoreSuite.scala       | 17 +++++++++++++++++
 2 files changed, 28 insertions(+), 2 deletions(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index f2be1eae410ef..d644061215e0c 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -45,6 +45,9 @@ private[hive] trait HiveStrategies {
    *
    * TODO: Much of this logic is duplicated in HiveTableScan. Ideally we would do some refactoring
    * but since this is after the code freeze for 1.1 all logic is here to minimize disruption.
+   *
+   * Other issues:
+   *   - Much of this logic assumes case insensitive resolution.
    */
   @Experimental
   object ParquetConversion extends Strategy {
@@ -60,8 +63,14 @@ private[hive] trait HiveStrategies {
       })
     }

-    implicit class PhysicalPlanHacks(s: SparkPlan) {
-      def fakeOutput(newOutput: Seq[Attribute]) = OutputFaker(newOutput, s)
+    implicit class PhysicalPlanHacks(originalPlan: SparkPlan) {
+      def fakeOutput(newOutput: Seq[Attribute]) =
+        OutputFaker(
+          originalPlan.output.map(a =>
+            newOutput.find(a.name.toLowerCase == _.name.toLowerCase)
+              .getOrElse(
+                sys.error(s"Can't find attribute $a to fake in set ${newOutput.mkString(",")}"))),
+          originalPlan)
     }

     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/ParquetMetastoreSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/ParquetMetastoreSuite.scala
index 9368536a11e5b..0723be7298e15 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/ParquetMetastoreSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/ParquetMetastoreSuite.scala
@@ -103,6 +103,23 @@ class ParquetMetastoreSuite extends QueryTest with BeforeAndAfterAll {
     )
   }

+  test("project partitioning and non-partitioning columns") {
+    checkAnswer(
+      sql("SELECT stringField, p, count(intField) " +
+        "FROM partitioned_parquet GROUP BY p, stringField"),
+      ("part-1", 1, 10) ::
+      ("part-2", 2, 10) ::
+      ("part-3", 3, 10) ::
+      ("part-4", 4, 10) ::
+      ("part-5", 5, 10) ::
+      ("part-6", 6, 10) ::
+      ("part-7", 7, 10) ::
+      ("part-8", 8, 10) ::
+      ("part-9", 9, 10) ::
+      ("part-10", 10, 10) :: Nil
+    )
+  }
+
   test("simple count") {
     checkAnswer(
       sql("SELECT COUNT(*) FROM partitioned_parquet"),

From 41ebc5f912093fdf7b21808ce19da1bae514435e Mon Sep 17 00:00:00 2001
From: Michael Armbrust
Date: Sat, 16 Aug 2014 13:09:10 -0700
Subject: [PATCH 7/9] remove hive parquet bundle

---
 project/SparkBuild.scala                      |  1 -
 .../sql/hive/parquet/FakeParquetSerDe.scala   | 56 +++++++++++++++++
 .../sql/parquet/ParquetMetastoreSuite.scala   | 10 +---
 3 files changed, 58 insertions(+), 9 deletions(-)
 create mode 100644 sql/hive/src/main/scala/org/apache/spark/sql/hive/parquet/FakeParquetSerDe.scala

diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index c29b4711094d7..49d52aefca17a 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -228,7 +228,6 @@ object SQL {

 object Hive {
   lazy val settings = Seq(
-    libraryDependencies += "com.twitter" % "parquet-hive-bundle" % "1.5.0",
     javaOptions += "-XX:MaxPermSize=1g",
     // Multiple queries rely on the TestHive singleton. See comments there for more details.
     parallelExecution in Test := false,
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/parquet/FakeParquetSerDe.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/parquet/FakeParquetSerDe.scala
new file mode 100644
index 0000000000000..4494bf37747a2
--- /dev/null
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/parquet/FakeParquetSerDe.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.parquet
+
+import java.util.Properties
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category
+import org.apache.hadoop.hive.serde2.{SerDeStats, SerDe}
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector
+import org.apache.hadoop.io.Writable
+
+/**
+ * A placeholder that allows SparkSQL users to create metastore tables that are stored as
+ * parquet files. It is only intended to pass the checks that the serde is valid and exists
+ * when a CREATE TABLE is run. The actual work of decoding will be done by ParquetTableScan
+ * when "spark.sql.hive.convertMetastoreParquet" is set to true.
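+ *
+ * A DDL sketch (mirroring ParquetMetastoreSuite; table name and location are placeholders):
+ * {{{
+ *   CREATE EXTERNAL TABLE parquet_backed_table (intField INT, stringField STRING)
+ *   ROW FORMAT SERDE 'org.apache.spark.sql.hive.parquet.FakeParquetSerDe'
+ *   LOCATION '/path/to/parquet/files'
+ * }}}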
+ */
+@deprecated("No code should depend on FakeParquetHiveSerDe as it is only intended as a " +
+  "placeholder in the Hive MetaStore")
+class FakeParquetSerDe extends SerDe {
+  override def getObjectInspector: ObjectInspector = new ObjectInspector {
+    override def getCategory: Category = Category.PRIMITIVE
+
+    override def getTypeName: String = "string"
+  }
+
+  override def deserialize(p1: Writable): AnyRef = throwError
+
+  override def initialize(p1: Configuration, p2: Properties): Unit = {}
+
+  override def getSerializedClass: Class[_ <: Writable] = throwError
+
+  override def getSerDeStats: SerDeStats = throwError
+
+  override def serialize(p1: scala.Any, p2: ObjectInspector): Writable = throwError
+
+  private def throwError =
+    sys.error(
+      "spark.sql.hive.convertMetastoreParquet must be set to true to use FakeParquetSerDe")
+}
\ No newline at end of file
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/ParquetMetastoreSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/ParquetMetastoreSuite.scala
index 0723be7298e15..8bc4b7927fd34 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/ParquetMetastoreSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/ParquetMetastoreSuite.scala
@@ -63,10 +63,7 @@ class ParquetMetastoreSuite extends QueryTest with BeforeAndAfterAll {
       stringField STRING
     )
     PARTITIONED BY (p int)
-    ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
-    STORED AS
-    INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
-    OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
+    ROW FORMAT SERDE 'org.apache.spark.sql.hive.parquet.FakeParquetSerDe'
     location '${partitionedTableDir.getCanonicalPath}'
   """)

@@ -76,10 +73,7 @@ class ParquetMetastoreSuite extends QueryTest with BeforeAndAfterAll {
       intField INT,
       stringField STRING
     )
-    ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
-    STORED AS
-    INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
-    OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
+    ROW FORMAT SERDE 'org.apache.spark.sql.hive.parquet.FakeParquetSerDe'
     location '${new File(partitionedTableDir, "p=1").getCanonicalPath}'
   """)

From 4f3d54ff8a9111ac3c340bc077dfefd62eb1dce2 Mon Sep 17 00:00:00 2001
From: Michael Armbrust
Date: Sun, 17 Aug 2014 18:58:23 -0700
Subject: [PATCH 8/9] fix style

---
 .../org/apache/spark/sql/hive/parquet/FakeParquetSerDe.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/parquet/FakeParquetSerDe.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/parquet/FakeParquetSerDe.scala
index 4494bf37747a2..544abfc32423c 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/parquet/FakeParquetSerDe.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/parquet/FakeParquetSerDe.scala
@@ -53,4 +53,4 @@ class FakeParquetSerDe extends SerDe {
   private def throwError =
     sys.error(
       "spark.sql.hive.convertMetastoreParquet must be set to true to use FakeParquetSerDe")
-}
\ No newline at end of file
+}

From 162007913f962910d40be7f03a39cf2541ab8dcc Mon Sep 17 00:00:00 2001
From: Michael Armbrust
Date: Mon, 18 Aug 2014 11:55:57 -0700
Subject: [PATCH 9/9] Revert "remove hive parquet bundle"

This reverts commit 41ebc5f912093fdf7b21808ce19da1bae514435e.
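
As resolved, only ParquetMetastoreSuite changes: the test tables go back to the standard
ParquetHiveSerDe DDL, while the FakeParquetSerDe placeholder introduced by that commit is
left in the tree (hence the conflict noted below).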
Conflicts:
	sql/hive/src/main/scala/org/apache/spark/sql/hive/parquet/FakeParquetSerDe.scala
---
 .../spark/sql/parquet/ParquetMetastoreSuite.scala | 10 ++++++++--
 1 file changed, 8 insertions(+), 2 deletions(-)

diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/ParquetMetastoreSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/ParquetMetastoreSuite.scala
index 8bc4b7927fd34..0723be7298e15 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/ParquetMetastoreSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/ParquetMetastoreSuite.scala
@@ -63,7 +63,10 @@ class ParquetMetastoreSuite extends QueryTest with BeforeAndAfterAll {
       stringField STRING
     )
     PARTITIONED BY (p int)
-    ROW FORMAT SERDE 'org.apache.spark.sql.hive.parquet.FakeParquetSerDe'
+    ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
+    STORED AS
+    INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
+    OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
     location '${partitionedTableDir.getCanonicalPath}'
   """)

@@ -73,7 +76,10 @@ class ParquetMetastoreSuite extends QueryTest with BeforeAndAfterAll {
      intField INT,
      stringField STRING
    )
-    ROW FORMAT SERDE 'org.apache.spark.sql.hive.parquet.FakeParquetSerDe'
+    ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
+    STORED AS
+    INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
+    OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
     location '${new File(partitionedTableDir, "p=1").getCanonicalPath}'
   """)