diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/commands.scala
index 3299e86b85941..1d5f033f0d274 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/commands.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/commands.scala
@@ -60,3 +60,19 @@ case class ExplainCommand(plan: LogicalPlan) extends Command {
  * Returned for the "CACHE TABLE tableName" and "UNCACHE TABLE tableName" command.
  */
 case class CacheCommand(tableName: String, doCache: Boolean) extends Command
+
+/**
+ * Returned for the "DESCRIBE [EXTENDED] [dbName.]tableName" command.
+ * @param table The table to be described.
+ * @param isExtended True if "DESCRIBE EXTENDED" is used. Otherwise, false.
+ *                   It is effective only when the table is a Hive table.
+ */
+case class DescribeCommand(
+    table: LogicalPlan,
+    isExtended: Boolean) extends Command {
+  override def output = Seq(
+    // Column names are based on Hive.
+    BoundReference(0, AttributeReference("col_name", StringType, nullable = false)()),
+    BoundReference(1, AttributeReference("data_type", StringType, nullable = false)()),
+    BoundReference(2, AttributeReference("comment", StringType, nullable = false)()))
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
index f5d0834a4993d..acb1b0f4dc229 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
@@ -121,3 +121,24 @@ case class CacheCommand(tableName: String, doCache: Boolean)(@transient context:
 
   override def output: Seq[Attribute] = Seq.empty
 }
+
+/**
+ * :: DeveloperApi ::
+ */
+@DeveloperApi
+case class DescribeCommand(child: SparkPlan, output: Seq[Attribute])(
+    @transient context: SQLContext)
+  extends LeafNode with Command {
+
+  override protected[sql] lazy val sideEffectResult: Seq[(String, String, String)] = {
+    Seq(("# Registered as a temporary table", null, null)) ++
+      child.output.map(field => (field.name, field.dataType.toString, null))
+  }
+
+  override def execute(): RDD[Row] = {
+    val rows = sideEffectResult.map {
+      case (name, dataType, comment) => new GenericRow(Array[Any](name, dataType, comment))
+    }
+    context.sparkContext.parallelize(rows, 1)
+  }
+}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index cc95b7af0abf6..7695242a81601 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -38,6 +38,7 @@ import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.types._
 import org.apache.spark.sql.execution.QueryExecutionException
 import org.apache.spark.sql.execution.{Command => PhysicalCommand}
+import org.apache.spark.sql.hive.execution.DescribeHiveTableCommand
 
 /**
  * Starts up an instance of hive where metadata is stored locally. An in-process metadata data is
@@ -291,6 +292,10 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
      * execution is simply passed back to Hive.
      */
     def stringResult(): Seq[String] = executedPlan match {
+      case describeHiveTableCommand: DescribeHiveTableCommand =>
+        // If it is a describe command for a Hive table, we want the output format
+        // to be similar to Hive's.
+        describeHiveTableCommand.hiveString
       case command: PhysicalCommand =>
        command.sideEffectResult.map(_.toString)
 
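As a quick illustration of what the new physical `DescribeCommand` in sql/core emits for a non-metastore (e.g. temporary) table: a marker row followed by one `(name, dataType, comment)` triple per column, with null comments. This is a standalone sketch; `DescribeTempTableSketch`, `Field`, and `describe` are hypothetical stand-ins for the real `Attribute`/`sideEffectResult` machinery.

```scala
// Hypothetical standalone sketch of the row layout the physical DescribeCommand
// produces for a temporary table: a marker row, then one triple per column.
object DescribeTempTableSketch {
  case class Field(name: String, dataType: String)

  def describe(schema: Seq[Field]): Seq[(String, String, String)] =
    Seq(("# Registered as a temporary table", null, null)) ++
      schema.map(f => (f.name, f.dataType, null))

  def main(args: Array[String]): Unit =
    describe(Seq(Field("a", "IntegerType"), Field("b", "StringType"))).foreach(println)
  // Prints:
  // (# Registered as a temporary table,null,null)
  // (a,IntegerType,null)
  // (b,StringType,null)
}
```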
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
index 844673f66d103..df761b073a75a 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
@@ -52,7 +52,6 @@ private[hive] case class AddFile(filePath: String) extends Command
 private[hive] object HiveQl {
   protected val nativeCommands = Seq(
     "TOK_DESCFUNCTION",
-    "TOK_DESCTABLE",
     "TOK_DESCDATABASE",
     "TOK_SHOW_TABLESTATUS",
     "TOK_SHOWDATABASES",
@@ -120,6 +119,12 @@ private[hive] object HiveQl {
     "TOK_SWITCHDATABASE"
   )
 
+  // Commands that we do not need to explain.
+  protected val noExplainCommands = Seq(
+    "TOK_CREATETABLE",
+    "TOK_DESCTABLE"
+  ) ++ nativeCommands
+
   /**
    * A set of implicit transformations that allow Hive ASTNodes to be rewritten by transformations
    * similar to [[catalyst.trees.TreeNode]].
@@ -362,13 +367,20 @@ private[hive] object HiveQl {
     }
   }
 
+  protected def extractDbNameTableName(tableNameParts: Node): (Option[String], String) = {
+    val (db, tableName) =
+      tableNameParts.getChildren.map { case Token(part, Nil) => cleanIdentifier(part) } match {
+        case Seq(tableOnly) => (None, tableOnly)
+        case Seq(databaseName, table) => (Some(databaseName), table)
+      }
+
+    (db, tableName)
+  }
+
   protected def nodeToPlan(node: Node): LogicalPlan = node match {
     // Just fake explain for any of the native commands.
-    case Token("TOK_EXPLAIN", explainArgs) if nativeCommands contains explainArgs.head.getText =>
-      ExplainCommand(NoRelation)
-    // Create tables aren't native commands due to CTAS queries, but we still don't need to
-    // explain them.
-    case Token("TOK_EXPLAIN", explainArgs) if explainArgs.head.getText == "TOK_CREATETABLE" =>
+    case Token("TOK_EXPLAIN", explainArgs)
+      if noExplainCommands.contains(explainArgs.head.getText) =>
       ExplainCommand(NoRelation)
     case Token("TOK_EXPLAIN", explainArgs) =>
       // Ignore FORMATTED if present.
@@ -377,6 +389,39 @@ private[hive] object HiveQl {
       // TODO: support EXTENDED?
       ExplainCommand(nodeToPlan(query))
 
+    case Token("TOK_DESCTABLE", describeArgs) =>
+      // Reference: https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL
+      val Some(tableType) :: formatted :: extended :: pretty :: Nil =
+        getClauses(Seq("TOK_TABTYPE", "FORMATTED", "EXTENDED", "PRETTY"), describeArgs)
+      if (formatted.isDefined || pretty.isDefined) {
+        // FORMATTED and PRETTY are not supported and this statement will be treated as
+        // a Hive native command.
+        NativePlaceholder
+      } else {
+        tableType match {
+          case Token("TOK_TABTYPE", nameParts) if nameParts.size == 1 => {
+            nameParts.head match {
+              case Token(".", dbName :: tableName :: Nil) =>
+                // It is describing a table with the format like "describe db.table".
+                // TODO: Actually, a user may mean tableName.columnName. Need to resolve this issue.
+                val (db, tableName) = extractDbNameTableName(nameParts.head)
+                DescribeCommand(
+                  UnresolvedRelation(db, tableName, None), extended.isDefined)
+              case Token(".", dbName :: tableName :: colName :: Nil) =>
+                // It is describing a column with the format like "describe db.table column".
+                NativePlaceholder
+              case tableName =>
+                // It is describing a table with the format like "describe table".
+                DescribeCommand(
+                  UnresolvedRelation(None, tableName.getText, None),
+                  extended.isDefined)
+            }
+          }
+          // All other cases.
+          case _ => NativePlaceholder
+        }
+      }
+
     case Token("TOK_CREATETABLE", children)
         if children.collect { case t@Token("TOK_QUERY", _) => t }.nonEmpty =>
       // TODO: Parse other clauses.
@@ -414,11 +459,8 @@ private[hive] object HiveQl {
           s"Unhandled clauses: ${notImplemented.flatten.map(dumpTree(_)).mkString("\n")}")
       }
 
-      val (db, tableName) =
-        tableNameParts.getChildren.map{ case Token(part, Nil) => cleanIdentifier(part)} match {
-          case Seq(tableOnly) => (None, tableOnly)
-          case Seq(databaseName, table) => (Some(databaseName), table)
-        }
+      val (db, tableName) = extractDbNameTableName(tableNameParts)
+
       InsertIntoCreatedTable(db, tableName, nodeToPlan(query))
 
     // If its not a "CREATE TABLE AS" like above then just pass it back to hive as a native command.
@@ -736,11 +778,7 @@ private[hive] object HiveQl {
     val Some(tableNameParts) :: partitionClause :: Nil =
       getClauses(Seq("TOK_TABNAME", "TOK_PARTSPEC"), tableArgs)
 
-    val (db, tableName) =
-      tableNameParts.getChildren.map{ case Token(part, Nil) => cleanIdentifier(part)} match {
-        case Seq(tableOnly) => (None, tableOnly)
-        case Seq(databaseName, table) => (Some(databaseName), table)
-      }
+    val (db, tableName) = extractDbNameTableName(tableNameParts)
 
     val partitionKeys = partitionClause.map(_.getChildren.map {
       // Parse partitions. We also make keys case insensitive.
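The new `TOK_DESCTABLE` case in `nodeToPlan` only claims plain `DESCRIBE [EXTENDED] [db.]table` statements; `FORMATTED`, `PRETTY`, and the column/partition variants still fall back to Hive via `NativePlaceholder`. Below is a much-simplified, token-list sketch of that routing — the real code pattern-matches the Hive AST, and `route`/`table` here are hypothetical helpers, not the actual parser API.

```scala
// Simplified routing sketch; the real implementation matches Hive AST nodes.
object DescribeRoutingSketch {
  sealed trait Plan
  case object NativePlaceholder extends Plan  // fall back to Hive
  case class DescribeCommand(db: Option[String], table: String, extended: Boolean) extends Plan

  def route(tokens: List[String]): Plan = tokens match {
    case "DESCRIBE" :: ("FORMATTED" | "PRETTY") :: _ => NativePlaceholder
    case "DESCRIBE" :: "EXTENDED" :: name :: Nil => table(name, extended = true)
    case "DESCRIBE" :: name :: Nil => table(name, extended = false)
    case _ => NativePlaceholder  // e.g. "DESCRIBE db.table column"
  }

  private def table(name: String, extended: Boolean): Plan = name.split('.') match {
    case Array(t) => DescribeCommand(None, t, extended)
    case Array(db, t) => DescribeCommand(Some(db), t, extended)
    case _ => NativePlaceholder  // db.table.column is also left to Hive
  }

  def main(args: Array[String]): Unit = {
    println(route(List("DESCRIBE", "default.src")))      // DescribeCommand(Some(default),src,false)
    println(route(List("DESCRIBE", "EXTENDED", "src")))  // DescribeCommand(None,src,true)
    println(route(List("DESCRIBE", "FORMATTED", "src"))) // NativePlaceholder
  }
}
```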
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index 0ac0ee9071f36..af7687b40429b 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -81,6 +81,16 @@ private[hive] trait HiveStrategies {
     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
       case logical.NativeCommand(sql) =>
         NativeCommand(sql, plan.output)(context) :: Nil
+      case describe: logical.DescribeCommand => {
+        val resolvedTable = context.executePlan(describe.table).analyzed
+        resolvedTable match {
+          case t: MetastoreRelation =>
+            Seq(DescribeHiveTableCommand(
+              t, describe.output, describe.isExtended)(context))
+          case o: LogicalPlan =>
+            Seq(DescribeCommand(planLater(o), describe.output)(context))
+        }
+      }
       case _ => Nil
     }
   }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/hiveOperators.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/hiveOperators.scala
index 240aa0df4935a..9d4bb6534e3f8 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/hiveOperators.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/hiveOperators.scala
@@ -20,8 +20,10 @@ package org.apache.spark.sql.hive.execution
 import org.apache.hadoop.hive.common.`type`.{HiveDecimal, HiveVarchar}
 import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hadoop.hive.metastore.MetaStoreUtils
+import org.apache.hadoop.hive.metastore.api.FieldSchema
 import org.apache.hadoop.hive.ql.Context
 import org.apache.hadoop.hive.ql.metadata.{Partition => HivePartition, Hive}
+import org.apache.hadoop.hive.ql.metadata.formatting.MetaDataFormatUtils
 import org.apache.hadoop.hive.ql.plan.{TableDesc, FileSinkDesc}
 import org.apache.hadoop.hive.serde.serdeConstants
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption
@@ -456,3 +458,61 @@ case class NativeCommand(
 
   override def otherCopyArgs = context :: Nil
 }
+
+/**
+ * :: DeveloperApi ::
+ */
+@DeveloperApi
+case class DescribeHiveTableCommand(
+    table: MetastoreRelation,
+    output: Seq[Attribute],
+    isExtended: Boolean)(
+    @transient context: HiveContext)
+  extends LeafNode with Command {
+
+  // Strings formatted like Hive's output, used for result comparison in our unit tests.
+  lazy val hiveString: Seq[String] = {
+    val alignment = 20
+    val delim = "\t"
+
+    sideEffectResult.map {
+      case (name, dataType, comment) =>
+        String.format("%-" + alignment + "s", name) + delim +
+          String.format("%-" + alignment + "s", dataType) + delim +
+          String.format("%-" + alignment + "s", Option(comment).getOrElse("None"))
+    }
+  }
+
+  override protected[sql] lazy val sideEffectResult: Seq[(String, String, String)] = {
+    // Trying to mimic the format of Hive's output, but not exactly the same.
+    var results: Seq[(String, String, String)] = Nil
+
+    val columns: Seq[FieldSchema] = table.hiveQlTable.getCols
+    val partitionColumns: Seq[FieldSchema] = table.hiveQlTable.getPartCols
+    results ++= columns.map(field => (field.getName, field.getType, field.getComment))
+    if (!partitionColumns.isEmpty) {
+      val partColumnInfo =
+        partitionColumns.map(field => (field.getName, field.getType, field.getComment))
+      results ++=
+        partColumnInfo ++
+          Seq(("# Partition Information", "", "")) ++
+          Seq((s"# ${output.get(0).name}", output.get(1).name, output.get(2).name)) ++
+          partColumnInfo
+    }
+
+    if (isExtended) {
+      results ++= Seq(("Detailed Table Information", table.hiveQlTable.getTTable.toString, ""))
+    }
+
+    results
+  }
+
+  override def execute(): RDD[Row] = {
+    val rows = sideEffectResult.map {
+      case (name, dataType, comment) => new GenericRow(Array[Any](name, dataType, comment))
+    }
+    context.sparkContext.parallelize(rows, 1)
+  }
+
+  override def otherCopyArgs = context :: Nil
+}
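`hiveString` above left-pads each of the three columns to 20 characters via `String.format("%-20s", ...)` and joins them with tabs, rendering a null comment as `"None"`, so unit tests can compare against Hive's plain-text output. A self-contained check of that formatting (the `row` helper and object name are hypothetical):

```scala
// Standalone check of the alignment used by DescribeHiveTableCommand.hiveString.
object HiveStringSketch {
  private val alignment = 20
  private val delim = "\t"

  // Hypothetical helper mirroring the formatting logic in the patch.
  def row(name: String, dataType: String, comment: String): String =
    String.format("%-" + alignment + "s", name) + delim +
      String.format("%-" + alignment + "s", dataType) + delim +
      String.format("%-" + alignment + "s", Option(comment).getOrElse("None"))

  def main(args: Array[String]): Unit = {
    // A null comment is rendered as "None", matching Hive's plain-text output.
    println(row("key", "int", null))
    println(row("dt", "string", "partition column"))
  }
}
```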
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
index 24c929ff7430d..08ef4d9b6bb93 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
@@ -144,6 +144,12 @@ abstract class HiveComparisonTest
       case _: SetCommand => Seq("0")
       case _: LogicalNativeCommand => answer.filterNot(nonDeterministicLine).filterNot(_ == "")
       case _: ExplainCommand => answer
+      case _: DescribeCommand =>
+        // Filter out non-deterministic lines and lines which do not have actual results but
+        // can introduce problems because of the way Hive formats these lines.
+        // Then, remove empty lines. Do not sort the results.
+        answer.filterNot(
+          r => nonDeterministicLine(r) || ignoredLine(r)).map(_.trim).filterNot(_ == "")
       case plan => if (isSorted(plan)) answer else answer.sorted
     }
     orderedAnswer.map(cleanPaths)
@@ -169,6 +175,16 @@ abstract class HiveComparisonTest
   protected def nonDeterministicLine(line: String) =
     nonDeterministicLineIndicators.exists(line contains _)
 
+  // This list contains indicators for lines that do not contain actual results and
+  // should be ignored.
+  lazy val ignoredLineIndicators = Seq(
+    "# Partition Information",
+    "# col_name"
+  )
+
+  protected def ignoredLine(line: String) =
+    ignoredLineIndicators.exists(line contains _)
+
   /**
    * Removes non-deterministic paths from `str` so cached answers will compare correctly.
    */
@@ -329,11 +345,17 @@ abstract class HiveComparisonTest
 
         if ((!hiveQuery.logical.isInstanceOf[ExplainCommand]) && preparedHive != catalyst) {
 
-          val hivePrintOut = s"== HIVE - ${hive.size} row(s) ==" +: preparedHive
+          val hivePrintOut = s"== HIVE - ${preparedHive.size} row(s) ==" +: preparedHive
           val catalystPrintOut = s"== CATALYST - ${catalyst.size} row(s) ==" +: catalyst
 
           val resultComparison = sideBySide(hivePrintOut, catalystPrintOut).mkString("\n")
 
+          println("hive output")
+          hive.foreach(println)
+
+          println("catalyst printout")
+          catalyst.foreach(println)
+
           if (recomputeCache) {
             logger.warn(s"Clearing cache files for failed test $testCaseName")
             hiveCacheFiles.foreach(_.delete())
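For `DescribeCommand` answers, the comparison test now keeps the original row order but drops header lines such as `# col_name`, which Hive prints without data, then trims and removes blanks. A standalone sketch of that preparation step (the `prepare` helper and object name are hypothetical):

```scala
// Standalone sketch of the DescribeCommand answer preparation in HiveComparisonTest.
object DescribeAnswerPrepSketch {
  val ignoredLineIndicators = Seq("# Partition Information", "# col_name")

  def ignoredLine(line: String): Boolean = ignoredLineIndicators.exists(line contains _)

  // Hypothetical helper: keep row order, drop ignored lines, trim, remove blanks.
  def prepare(answer: Seq[String]): Seq[String] =
    answer.filterNot(ignoredLine).map(_.trim).filterNot(_ == "")

  def main(args: Array[String]): Unit =
    prepare(Seq("key\tint", "# Partition Information", "  ", "dt\tstring")).foreach(println)
  // Prints only the two data rows, in their original order.
}
```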
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
index ee194dbcb77b2..cdfc2d0c17384 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
@@ -78,7 +78,7 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
     "alter_merge",
     "alter_concatenate_indexed_table",
     "protectmode2",
-    "describe_table",
+    //"describe_table",
     "describe_comment_nonascii",
     "udf5",
     "udf_java_method",
@@ -177,7 +177,16 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
     // After stop taking the `stringOrError` route, exceptions are thrown from these cases.
     // See SPARK-2129 for details.
     "join_view",
-    "mergejoins_mixed"
+    "mergejoins_mixed",
+
+    // Returning the result of a describe statement as a JSON object is not supported.
+    "describe_table_json",
+    "describe_database_json",
+    "describe_formatted_view_partitioned_json",
+
+    // Hive returns the results of describe as plain text. Comments with multiple lines
+    // introduce extra lines in the Hive results, which make the result comparison fail.
+    "describe_comment_indent"
   )
 
   /**
@@ -292,11 +301,7 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
     "default_partition_name",
     "delimiter",
     "desc_non_existent_tbl",
-    "describe_comment_indent",
-    "describe_database_json",
     "describe_formatted_view_partitioned",
-    "describe_formatted_view_partitioned_json",
-    "describe_table_json",
     "diff_part_input_formats",
     "disable_file_format_check",
     "drop_function",
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
index 5118f4b3f99fd..9f5cf282f7c48 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
@@ -21,7 +21,9 @@ import scala.util.Try
 
 import org.apache.spark.sql.hive.test.TestHive
 import org.apache.spark.sql.hive.test.TestHive._
-import org.apache.spark.sql.{SchemaRDD, execution, Row}
+import org.apache.spark.sql.{SchemaRDD, Row}
+
+case class TestData(a: Int, b: String)
 
 /**
  * A set of test cases expressed in Hive QL that are not covered by the tests included in the hive distribution.
@@ -240,13 +242,6 @@ class HiveQuerySuite extends HiveComparisonTest {
         .map(_.getString(0))
         .contains(tableName))
 
-    assertResult(Array(Array("key", "int", "None"), Array("value", "string", "None"))) {
-      hql(s"DESCRIBE $tableName")
-        .select('result)
-        .collect()
-        .map(_.getString(0).split("\t").map(_.trim))
-    }
-
     assert(isExplanation(hql(s"EXPLAIN SELECT key, COUNT(*) FROM $tableName GROUP BY key")))
 
     TestHive.reset()
@@ -263,6 +258,97 @@ class HiveQuerySuite extends HiveComparisonTest {
     assert(Try(q0.count()).isSuccess)
   }
 
+  test("DESCRIBE commands") {
+    hql(s"CREATE TABLE test_describe_commands1 (key INT, value STRING) PARTITIONED BY (dt STRING)")
+
+    hql(
+      """FROM src INSERT OVERWRITE TABLE test_describe_commands1 PARTITION (dt='2008-06-08')
+        |SELECT key, value
+      """.stripMargin)
+
+    // Describe a table
+    assertResult(
+      Array(
+        Array("key", "int", null),
+        Array("value", "string", null),
+        Array("dt", "string", null),
+        Array("# Partition Information", "", ""),
+        Array("# col_name", "data_type", "comment"),
+        Array("dt", "string", null))
+    ) {
+      hql("DESCRIBE test_describe_commands1")
+        .select('col_name, 'data_type, 'comment)
+        .collect()
+    }
+
+    // Describe a table with a fully qualified table name
+    assertResult(
+      Array(
+        Array("key", "int", null),
+        Array("value", "string", null),
+        Array("dt", "string", null),
+        Array("# Partition Information", "", ""),
+        Array("# col_name", "data_type", "comment"),
+        Array("dt", "string", null))
+    ) {
+      hql("DESCRIBE default.test_describe_commands1")
+        .select('col_name, 'data_type, 'comment)
+        .collect()
+    }
+
+    // Describing a column is treated as a native command
+    assertResult(Array(Array("value", "string", "from deserializer"))) {
+      hql("DESCRIBE test_describe_commands1 value")
+        .select('result)
+        .collect()
+        .map(_.getString(0).split("\t").map(_.trim))
+    }
+
+    // Describing a column with a fully qualified table name is also a native command
+    assertResult(Array(Array("value", "string", "from deserializer"))) {
+      hql("DESCRIBE default.test_describe_commands1 value")
+        .select('result)
+        .collect()
+        .map(_.getString(0).split("\t").map(_.trim))
+    }
+
+    // Describing a partition is treated as a native command
+    assertResult(
+      Array(
+        Array("key", "int", "None"),
+        Array("value", "string", "None"),
+        Array("dt", "string", "None"),
+        Array("", "", ""),
+        Array("# Partition Information", "", ""),
+        Array("# col_name", "data_type", "comment"),
+        Array("", "", ""),
+        Array("dt", "string", "None"))
+    ) {
+      hql("DESCRIBE test_describe_commands1 PARTITION (dt='2008-06-08')")
+        .select('result)
+        .collect()
+        .map(_.getString(0).split("\t").map(_.trim))
+    }
+
+    // Describe a registered temporary table.
+    val testData: SchemaRDD =
+      TestHive.sparkContext.parallelize(
+        TestData(1, "str1") ::
+        TestData(1, "str2") :: Nil)
+    testData.registerAsTable("test_describe_commands2")
+
+    assertResult(
+      Array(
+        Array("# Registered as a temporary table", null, null),
+        Array("a", "IntegerType", null),
+        Array("b", "StringType", null))
+    ) {
+      hql("DESCRIBE test_describe_commands2")
+        .select('col_name, 'data_type, 'comment)
+        .collect()
+    }
+  }
+
   test("parse HQL set commands") {
     // Adapted from its SQL counterpart.
     val testKey = "spark.sql.key.usedfortestonly"