[SPARK-2406][SQL] Initial support for using ParquetTableScan to read HiveMetaStore tables. #1819

Closed · wants to merge 12 commits
1 change: 0 additions & 1 deletion project/SparkBuild.scala
@@ -228,7 +228,6 @@ object SQL {
object Hive {

lazy val settings = Seq(

javaOptions += "-XX:MaxPermSize=1g",
// Multiple queries rely on the TestHive singleton. See comments there for more details.
parallelExecution in Test := false,
Expand Up @@ -303,3 +303,15 @@ case class Intersect(left: SparkPlan, right: SparkPlan) extends BinaryNode {
left.execute().map(_.copy()).intersection(right.execute().map(_.copy()))
}
}

/**
* :: DeveloperApi ::
* A plan node that does nothing but lie about the output of its child. Used to splice a
* (hopefully structurally equivalent) tree from a different optimization sequence into an already
* resolved tree.
*/
@DeveloperApi
case class OutputFaker(output: Seq[Attribute], child: SparkPlan) extends SparkPlan {
def children = child :: Nil
def execute() = child.execute()
}
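
A hedged usage sketch of `OutputFaker`: plan the same query twice so the two plans carry different exprIds, then present the second plan's rows under the first plan's output attributes. The `sqlContext` and the registered table `people` are assumptions for illustration only, not part of this PR.

```scala
import org.apache.spark.sql.execution.{OutputFaker, SparkPlan}

// Two plans for a structurally equivalent query; each analysis run allocates fresh exprIds.
val original: SparkPlan  = sqlContext.sql("SELECT name FROM people").queryExecution.executedPlan
val replanned: SparkPlan = sqlContext.sql("SELECT name FROM people").queryExecution.executedPlan

// Advertise replanned's rows under original's attributes, so parents in an already resolved
// tree keep binding against the exprIds they were resolved with.
val spliced = OutputFaker(original.output, replanned)
spliced.execute().count()
```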
@@ -47,7 +47,8 @@ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, LeafNode}
private[sql] case class ParquetRelation(
path: String,
@transient conf: Option[Configuration],
@transient sqlContext: SQLContext)
@transient sqlContext: SQLContext,
partitioningAttributes: Seq[Attribute] = Nil)
extends LeafNode with MultiInstanceRelation {

self: Product =>
@@ -61,12 +62,13 @@ private[sql] case class ParquetRelation(

/** Attributes */
override val output =
partitioningAttributes ++
ParquetTypesConverter.readSchemaFromFile(
new Path(path),
new Path(path.split(",").head),
conf,
sqlContext.isParquetBinaryAsString)

override def newInstance = ParquetRelation(path, conf, sqlContext).asInstanceOf[this.type]
override def newInstance() = ParquetRelation(path, conf, sqlContext).asInstanceOf[this.type]

// Equals must also take into account the output attributes so that we can distinguish between
// different instances of the same relation,
@@ -34,6 +34,7 @@ import org.apache.hadoop.mapreduce._
import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFormat}
import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat => NewFileOutputFormat}
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter

import parquet.hadoop._
import parquet.hadoop.api.{InitContext, ReadSupport}
import parquet.hadoop.metadata.GlobalMetaData
@@ -42,6 +43,7 @@ import parquet.io.ParquetDecodingException
import parquet.schema.MessageType

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.SQLConf
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, Row}
import org.apache.spark.sql.execution.{LeafNode, SparkPlan, UnaryNode}
@@ -60,28 +62,38 @@ case class ParquetTableScan(
// The resolution of Parquet attributes is case sensitive, so we resolve the original attributes
// by exprId. note: output cannot be transient, see
// https://issues.apache.org/jira/browse/SPARK-1367
val output = attributes.map { a =>
relation.output
.find(o => o.exprId == a.exprId)
.getOrElse(sys.error(s"Invalid parquet attribute $a in ${relation.output.mkString(",")}"))
}
val normalOutput =
attributes
.filterNot(a => relation.partitioningAttributes.map(_.exprId).contains(a.exprId))
.flatMap(a => relation.output.find(o => o.exprId == a.exprId))

val partOutput =
attributes.flatMap(a => relation.partitioningAttributes.find(o => o.exprId == a.exprId))

def output = partOutput ++ normalOutput

assert(normalOutput.size + partOutput.size == attributes.size,
s"$normalOutput + $partOutput != $attributes, ${relation.output}")

override def execute(): RDD[Row] = {
val sc = sqlContext.sparkContext
val job = new Job(sc.hadoopConfiguration)
ParquetInputFormat.setReadSupportClass(job, classOf[RowReadSupport])

val conf: Configuration = ContextUtil.getConfiguration(job)
val qualifiedPath = {
val path = new Path(relation.path)
path.getFileSystem(conf).makeQualified(path)

relation.path.split(",").foreach { curPath =>
val qualifiedPath = {
val path = new Path(curPath)
path.getFileSystem(conf).makeQualified(path)
}
NewFileInputFormat.addInputPath(job, qualifiedPath)
}
NewFileInputFormat.addInputPath(job, qualifiedPath)

// Store both requested and original schema in `Configuration`
conf.set(
RowReadSupport.SPARK_ROW_REQUESTED_SCHEMA,
ParquetTypesConverter.convertToString(output))
ParquetTypesConverter.convertToString(normalOutput))
conf.set(
RowWriteSupport.SPARK_ROW_SCHEMA,
ParquetTypesConverter.convertToString(relation.output))
@@ -102,13 +114,41 @@ case class ParquetTableScan(
SQLConf.PARQUET_CACHE_METADATA,
sqlContext.getConf(SQLConf.PARQUET_CACHE_METADATA, "false"))

sc.newAPIHadoopRDD(
conf,
classOf[FilteringParquetRowInputFormat],
classOf[Void],
classOf[Row])
.map(_._2)
.filter(_ != null) // Parquet's record filters may produce null values
val baseRDD =
new org.apache.spark.rdd.NewHadoopRDD(
sc,
classOf[FilteringParquetRowInputFormat],
classOf[Void],
classOf[Row],
conf)

if (partOutput.nonEmpty) {
baseRDD.mapPartitionsWithInputSplit { case (split, iter) =>
val partValue = "([^=]+)=([^=]+)".r
val partValues =
split.asInstanceOf[parquet.hadoop.ParquetInputSplit]
.getPath
.toString
.split("/")
.flatMap {
case partValue(key, value) => Some(key -> value)
case _ => None
}.toMap

val partitionRowValues =
partOutput.map(a => Cast(Literal(partValues(a.name)), a.dataType).eval(EmptyRow))

new Iterator[Row] {
private[this] val joinedRow = new JoinedRow(Row(partitionRowValues:_*), null)

def hasNext = iter.hasNext

def next() = joinedRow.withRight(iter.next()._2)
}
}
} else {
baseRDD.map(_._2)
}.filter(_ != null) // Parquet's record filters may produce null values
}

/**
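
The partitioned branch above recovers partition values by parsing `key=value` segments out of each input split's path. A standalone sketch of that parsing step, using a hypothetical path purely for illustration:

```scala
object PartitionPathParsing {
  def main(args: Array[String]): Unit = {
    // Hypothetical partitioned data file path.
    val path = "hdfs://nn/warehouse/logs/year=2014/month=08/part-00000.parquet"

    // Same pattern as in ParquetTableScan: every path segment of the form key=value
    // contributes one partition column value.
    val partValue = "([^=]+)=([^=]+)".r
    val partValues = path.split("/").flatMap {
      case partValue(key, value) => Some(key -> value)
      case _ => None
    }.toMap

    println(partValues) // e.g. Map(year -> 2014, month -> 08)
  }
}
```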
@@ -79,6 +79,14 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
// Change the default SQL dialect to HiveQL
override private[spark] def dialect: String = getConf(SQLConf.DIALECT, "hiveql")

/**
* When true, enables an experimental feature where metastore tables that use the parquet SerDe
* are automatically converted to use the Spark SQL parquet table scan, instead of the Hive
* SerDe.
*/
private[spark] def convertMetastoreParquet: Boolean =
getConf("spark.sql.hive.convertMetastoreParquet", "false") == "true"
Contributor: I am going to test this PR soon. In the meantime, would it make sense to only put this in SQLConf (as well as a field of the key string in the singleton object), making that class the central place that stores SQL configs?

Contributor (author): I have mixed feelings about that. The problem being that this only applies to HiveContexts, so it doesn't really make much sense in a SQLContext.

Contributor: Sounds like a job for HiveConf extends SQLConf! After all, there's nothing better than confusing users trying to use org.apache.hadoop.hive.conf.HiveConf!

Contributor: When in doubt, make up longer names: SQLConfigOpts, HiveConfigOpts. But this is only possibly relevant in the future and should not block this PR.


override protected[sql] def executePlan(plan: LogicalPlan): this.QueryExecution =
new this.QueryExecution { val logical = plan }

@@ -326,6 +334,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
TakeOrdered,
ParquetOperations,
InMemoryScans,
ParquetConversion, // Must be before HiveTableScans
HiveTableScans,
DataSinks,
Scripts,
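
For reference, a minimal sketch of how a user would opt in to the conversion. It assumes a local deployment and an existing metastore table named `logs_parquet` stored with the parquet SerDe; both names are illustrative, not part of this PR.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.sql.hive.HiveContext

val sc = new SparkContext("local[2]", "convert-metastore-parquet-example")
val hiveContext = new HiveContext(sc)

// Off by default; flip it on to let ParquetConversion rewrite matching Hive table scans.
hiveContext.setConf("spark.sql.hive.convertMetastoreParquet", "true")

// This query is now planned with the native ParquetTableScan instead of the Hive SerDe.
hiveContext.sql("SELECT COUNT(*) FROM logs_parquet").collect().foreach(println)
```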
119 changes: 117 additions & 2 deletions sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -17,21 +17,136 @@

package org.apache.spark.sql.hive

import org.apache.spark.sql.SQLContext
import org.apache.spark.annotation.Experimental
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate
import org.apache.spark.sql.catalyst.planning._
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, LowerCaseSchema}
import org.apache.spark.sql.execution._
import org.apache.spark.sql.hive.execution._
import org.apache.spark.sql.columnar.InMemoryRelation
import org.apache.spark.sql.parquet.{ParquetRelation, ParquetTableScan}

import scala.collection.JavaConversions._

private[hive] trait HiveStrategies {
// Possibly being too clever with types here... or not clever enough.
self: SQLContext#SparkPlanner =>

val hiveContext: HiveContext

/**
* :: Experimental ::
* Finds table scans that would use the Hive SerDe and replaces them with our own native parquet
* table scan operator.
*
* TODO: Much of this logic is duplicated in HiveTableScan. Ideally we would do some refactoring
* but since this is after the code freeze for 1.1, all logic is here to minimize disruption.
*
* Other issues:
* - Much of this logic assumes case insensitive resolution.
*/
@Experimental
object ParquetConversion extends Strategy {
implicit class LogicalPlanHacks(s: SchemaRDD) {
def lowerCase =
new SchemaRDD(s.sqlContext, LowerCaseSchema(s.logicalPlan))

def addPartitioningAttributes(attrs: Seq[Attribute]) =
new SchemaRDD(
s.sqlContext,
s.logicalPlan transform {
case p: ParquetRelation => p.copy(partitioningAttributes = attrs)
})
}

implicit class PhysicalPlanHacks(originalPlan: SparkPlan) {
def fakeOutput(newOutput: Seq[Attribute]) =
OutputFaker(
originalPlan.output.map(a =>
newOutput.find(a.name.toLowerCase == _.name.toLowerCase)
.getOrElse(
sys.error(s"Can't find attribute $a to fake in set ${newOutput.mkString(",")}"))),
originalPlan)
}

def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case PhysicalOperation(projectList, predicates, relation: MetastoreRelation)
if relation.tableDesc.getSerdeClassName.contains("Parquet") &&
hiveContext.convertMetastoreParquet =>

// Filter out all predicates that only deal with partition keys
val partitionKeyIds = relation.partitionKeys.map(_.exprId).toSet
val (pruningPredicates, otherPredicates) = predicates.partition {
_.references.map(_.exprId).subsetOf(partitionKeyIds)
}

// We are going to throw the predicates and projection back at the whole optimization
// sequence so let's unresolve all the attributes, allowing them to be rebound to the
// matching parquet attributes.
val unresolvedOtherPredicates = otherPredicates.map(_ transform {
case a: AttributeReference => UnresolvedAttribute(a.name)
}).reduceOption(And).getOrElse(Literal(true))

val unresolvedProjection = projectList.map(_ transform {
case a: AttributeReference => UnresolvedAttribute(a.name)
})

if (relation.hiveQlTable.isPartitioned) {
val rawPredicate = pruningPredicates.reduceOption(And).getOrElse(Literal(true))
// Translate the predicate so that it automatically casts the input values to the correct
// data types during evaluation
val castedPredicate = rawPredicate transform {
case a: AttributeReference =>
val idx = relation.partitionKeys.indexWhere(a.exprId == _.exprId)
val key = relation.partitionKeys(idx)
Cast(BoundReference(idx, StringType, nullable = true), key.dataType)
}

val inputData = new GenericMutableRow(relation.partitionKeys.size)
val pruningCondition =
if (codegenEnabled) {
GeneratePredicate(castedPredicate)
} else {
InterpretedPredicate(castedPredicate)
}

val partitions = relation.hiveQlPartitions.filter { part =>
val partitionValues = part.getValues
var i = 0
while (i < partitionValues.size()) {
inputData(i) = partitionValues(i)
i += 1
}
pruningCondition(inputData)
}

hiveContext
.parquetFile(partitions.map(_.getLocation).mkString(","))
.addPartitioningAttributes(relation.partitionKeys)
.lowerCase
.where(unresolvedOtherPredicates)
.select(unresolvedProjection:_*)
.queryExecution
.executedPlan
.fakeOutput(projectList.map(_.toAttribute)) :: Nil
} else {
hiveContext
.parquetFile(relation.hiveQlTable.getDataLocation.getPath)
.lowerCase
.where(unresolvedOtherPredicates)
.select(unresolvedProjection:_*)
.queryExecution
.executedPlan
.fakeOutput(projectList.map(_.toAttribute)) :: Nil
}
case _ => Nil
}
}

object Scripts extends Strategy {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case logical.ScriptTransformation(input, script, output, child) =>
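
The strategy above splits the query's predicates into pruning predicates (those referencing only partition keys, which are evaluated against partition values to skip whole directories) and everything else (thrown back through the optimizer onto the parquet scan). A simplified illustration of that split, with plain strings standing in for catalyst expressions; all names are hypothetical:

```scala
// Each "predicate" is represented only by the set of column names it references.
val partitionKeys = Set("year", "month")
val predicates = Seq(
  "year = 2014"            -> Set("year"),
  "month IN (7, 8)"        -> Set("month"),
  "status = 'error'"       -> Set("status"),
  "year = 2014 AND id > 0" -> Set("year", "id"))

// A predicate can prune partitions only if *all* of its references are partition keys.
val (pruningPredicates, otherPredicates) =
  predicates.partition { case (_, refs) => refs.subsetOf(partitionKeys) }

// pruningPredicates: year = 2014, month IN (7, 8)
// otherPredicates:   status = 'error', year = 2014 AND id > 0
```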
@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.hive.parquet

import java.util.Properties

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category
import org.apache.hadoop.hive.serde2.{SerDeStats, SerDe}
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector
import org.apache.hadoop.io.Writable

/**
* A placeholder that allows Spark SQL users to create metastore tables that are stored as
* parquet files. It is only intended to pass the checks that the serde is valid and exists
* when a CREATE TABLE is run. The actual work of decoding will be done by ParquetTableScan
* when "spark.sql.hive.convertMetastoreParquet" is set to true.
*/
@deprecated("No code should depend on FakeParquetHiveSerDe as it is only intended as a " +
"placeholder in the Hive MetaStore")
class FakeParquetSerDe extends SerDe {
override def getObjectInspector: ObjectInspector = new ObjectInspector {
override def getCategory: Category = Category.PRIMITIVE

override def getTypeName: String = "string"
}

override def deserialize(p1: Writable): AnyRef = throwError

override def initialize(p1: Configuration, p2: Properties): Unit = {}

override def getSerializedClass: Class[_ <: Writable] = throwError

override def getSerDeStats: SerDeStats = throwError

override def serialize(p1: scala.Any, p2: ObjectInspector): Writable = throwError

private def throwError =
sys.error(
"spark.sql.hive.convertMetastoreParquet must be set to true to use FakeParquetSerDe")
}
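
A hedged sketch of the intended workflow: register a metastore table with the placeholder SerDe, then read it through the native scan. The table name, columns, location, and the parquet-hive input/output format classes are assumptions for illustration and are not prescribed by this PR; it also assumes the `hiveContext` from the earlier sketch.

```scala
// The fake SerDe only satisfies the metastore's checks at CREATE TABLE time.
hiveContext.sql("""
  CREATE EXTERNAL TABLE events (id INT, payload STRING)
  ROW FORMAT SERDE 'org.apache.spark.sql.hive.parquet.FakeParquetSerDe'
  STORED AS
    INPUTFORMAT 'parquet.hive.DeprecatedParquetInputFormat'
    OUTPUTFORMAT 'parquet.hive.DeprecatedParquetOutputFormat'
  LOCATION '/data/events'
""")

// Reads must go through the converted ParquetTableScan path, so the flag has to be on.
hiveContext.setConf("spark.sql.hive.convertMetastoreParquet", "true")
hiveContext.sql("SELECT id FROM events WHERE payload = 'click'").collect()
```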