[SPARK-2177][SQL] describe table result contains only one column
```
scala> hql("describe src").collect().foreach(println)

[key                 	string              	None                ]
[value               	string              	None                ]
```

The result should contain 3 columns instead of one. The single-column format breaks JDBC clients and any downstream consumer of the Scala/Java/Python APIs.

I am providing a workaround: we handle a subset of describe commands natively in Spark SQL, defined by
```
DESCRIBE [EXTENDED] [db_name.]table_name
```
All other cases are treated as Hive native commands.
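
With this change, the same query should return one row per column, each split into three fields. Roughly (a sketch; the exact Row rendering may differ):
```
scala> hql("describe src").collect().foreach(println)

[key,string,null]
[value,string,null]
```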

Also, if we upgrade Hive to 0.13, we will need to check the result of context.sessionState.isHiveServerQuery() to determine how to split the result. This method was introduced by https://issues.apache.org/jira/browse/HIVE-4545. We may also want to have Hive use JsonMetaDataFormatter for the output of DDL statements (`set hive.ddl.output.format=json`, introduced by https://issues.apache.org/jira/browse/HIVE-2822).
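
For illustration only (this is not part of this patch and assumes the Hive 0.13 upgrade), the JSON formatter would be enabled like this:
```
scala> hql("set hive.ddl.output.format=json")
scala> hql("describe src")  // Hive would then emit one JSON document per DDL statement
```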

The link to JIRA: https://issues.apache.org/jira/browse/SPARK-2177

Author: Yin Huai <[email protected]>

Closes #1118 from yhuai/SPARK-2177 and squashes the following commits:

fd2534c [Yin Huai] Merge remote-tracking branch 'upstream/master' into SPARK-2177
b9b9aa5 [Yin Huai] rxin's comments.
e7c4e72 [Yin Huai] Fix unit test.
656b068 [Yin Huai] 100 characters.
6387217 [Yin Huai] Merge remote-tracking branch 'upstream/master' into SPARK-2177
8003cf3 [Yin Huai] Generate strings with the format like Hive for unit tests.
9787fff [Yin Huai] Merge remote-tracking branch 'upstream/master' into SPARK-2177
440c5af [Yin Huai] rxin's comments.
f1a417e [Yin Huai] Update doc.
83adb2f [Yin Huai] Merge remote-tracking branch 'upstream/master' into SPARK-2177
366f891 [Yin Huai] Add describe command.
74bd1d4 [Yin Huai] Merge remote-tracking branch 'upstream/master' into SPARK-2177
342fdf7 [Yin Huai] Split to up to 3 parts.
725e88c [Yin Huai] Merge remote-tracking branch 'upstream/master' into SPARK-2177
bb8bbef [Yin Huai] Split every string in the result of a describe command.

(cherry picked from commit f397e92)
Signed-off-by: Reynold Xin <[email protected]>
yhuai authored and rxin committed Jun 20, 2014
1 parent cc2e4ca commit ff4d5a2
Showing 9 changed files with 294 additions and 31 deletions.
@@ -60,3 +60,19 @@ case class ExplainCommand(plan: LogicalPlan) extends Command {
* Returned for the "CACHE TABLE tableName" and "UNCACHE TABLE tableName" command.
*/
case class CacheCommand(tableName: String, doCache: Boolean) extends Command

/**
* Returned for the "DESCRIBE [EXTENDED] [dbName.]tableName" command.
* @param table The table to be described.
* @param isExtended True if "DESCRIBE EXTENDED" is used. Otherwise, false.
* It is effective only when the table is a Hive table.
*/
case class DescribeCommand(
table: LogicalPlan,
isExtended: Boolean) extends Command {
override def output = Seq(
// Column names are based on Hive.
BoundReference(0, AttributeReference("col_name", StringType, nullable = false)()),
BoundReference(1, AttributeReference("data_type", StringType, nullable = false)()),
BoundReference(2, AttributeReference("comment", StringType, nullable = false)()))
}
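
For example, the parser (see the new TOK_DESCTABLE case in HiveQl.scala below) constructs this plan for `DESCRIBE EXTENDED mydb.src` roughly as follows (a sketch):
```
DescribeCommand(
  UnresolvedRelation(Some("mydb"), "src", None),
  isExtended = true)
```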
@@ -121,3 +121,24 @@ case class CacheCommand(tableName: String, doCache: Boolean)(@transient context:

override def output: Seq[Attribute] = Seq.empty
}

/**
* :: DeveloperApi ::
*/
@DeveloperApi
case class DescribeCommand(child: SparkPlan, output: Seq[Attribute])(
@transient context: SQLContext)
extends LeafNode with Command {

override protected[sql] lazy val sideEffectResult: Seq[(String, String, String)] = {
Seq(("# Registered as a temporary table", null, null)) ++
child.output.map(field => (field.name, field.dataType.toString, null))
}

override def execute(): RDD[Row] = {
val rows = sideEffectResult.map {
case (name, dataType, comment) => new GenericRow(Array[Any](name, dataType, comment))
}
context.sparkContext.parallelize(rows, 1)
}
}
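
For a temporary table registered with schema (key: Int, value: String), sideEffectResult would produce something like the following (a sketch; dataType.toString renders the Catalyst type names):
```
Seq(
  ("# Registered as a temporary table", null, null),
  ("key", "IntegerType", null),
  ("value", "StringType", null))
```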
@@ -38,6 +38,7 @@ import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.types._
import org.apache.spark.sql.execution.QueryExecutionException
import org.apache.spark.sql.execution.{Command => PhysicalCommand}
import org.apache.spark.sql.hive.execution.DescribeHiveTableCommand

/**
* Starts up an instance of hive where metadata is stored locally. An in-process metadata data is
@@ -291,6 +292,10 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
* execution is simply passed back to Hive.
*/
def stringResult(): Seq[String] = executedPlan match {
case describeHiveTableCommand: DescribeHiveTableCommand =>
// If it is a describe command for a Hive table, we want to have the output format
// be similar with Hive.
describeHiveTableCommand.hiveString
case command: PhysicalCommand =>
command.sideEffectResult.map(_.toString)

70 changes: 54 additions & 16 deletions sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
@@ -52,7 +52,6 @@ private[hive] case class AddFile(filePath: String) extends Command
private[hive] object HiveQl {
protected val nativeCommands = Seq(
"TOK_DESCFUNCTION",
"TOK_DESCTABLE",
"TOK_DESCDATABASE",
"TOK_SHOW_TABLESTATUS",
"TOK_SHOWDATABASES",
@@ -120,6 +119,12 @@ private[hive] object HiveQl {
"TOK_SWITCHDATABASE"
)

// Commands that we do not need to explain.
protected val noExplainCommands = Seq(
"TOK_CREATETABLE",
"TOK_DESCTABLE"
) ++ nativeCommands

/**
* A set of implicit transformations that allow Hive ASTNodes to be rewritten by transformations
* similar to [[catalyst.trees.TreeNode]].
@@ -362,13 +367,20 @@ private[hive] object HiveQl {
}
}

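/**
* Extracts the database name and table name from a table-name AST node: a single-part
* name like "src" yields (None, "src"); a two-part name like "mydb.src" yields
* (Some("mydb"), "src").
*/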
protected def extractDbNameTableName(tableNameParts: Node): (Option[String], String) = {
val (db, tableName) =
tableNameParts.getChildren.map { case Token(part, Nil) => cleanIdentifier(part) } match {
case Seq(tableOnly) => (None, tableOnly)
case Seq(databaseName, table) => (Some(databaseName), table)
}

(db, tableName)
}

protected def nodeToPlan(node: Node): LogicalPlan = node match {
// Just fake explain for any of the native commands.
case Token("TOK_EXPLAIN", explainArgs) if nativeCommands contains explainArgs.head.getText =>
ExplainCommand(NoRelation)
// Create tables aren't native commands due to CTAS queries, but we still don't need to
// explain them.
case Token("TOK_EXPLAIN", explainArgs) if explainArgs.head.getText == "TOK_CREATETABLE" =>
case Token("TOK_EXPLAIN", explainArgs)
if noExplainCommands.contains(explainArgs.head.getText) =>
ExplainCommand(NoRelation)
case Token("TOK_EXPLAIN", explainArgs) =>
// Ignore FORMATTED if present.
@@ -377,6 +389,39 @@ private[hive] object HiveQl {
// TODO: support EXTENDED?
ExplainCommand(nodeToPlan(query))

case Token("TOK_DESCTABLE", describeArgs) =>
// Reference: https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL
val Some(tableType) :: formatted :: extended :: pretty :: Nil =
getClauses(Seq("TOK_TABTYPE", "FORMATTED", "EXTENDED", "PRETTY"), describeArgs)
if (formatted.isDefined || pretty.isDefined) {
// FORMATTED and PRETTY are not supported and this statement will be treated as
// a Hive native command.
NativePlaceholder
} else {
tableType match {
case Token("TOK_TABTYPE", nameParts) if nameParts.size == 1 => {
nameParts.head match {
case Token(".", dbName :: tableName :: Nil) =>
// It is describing a table with the format like "describe db.table".
// TODO: Actually, a user may mean tableName.columnName. Need to resolve this issue.
val (db, tableName) = extractDbNameTableName(nameParts.head)
DescribeCommand(
UnresolvedRelation(db, tableName, None), extended.isDefined)
case Token(".", dbName :: tableName :: colName :: Nil) =>
// It is describing a column with the format like "describe db.table column".
NativePlaceholder
case tableName =>
// It is describing a table with the format like "describe table".
DescribeCommand(
UnresolvedRelation(None, tableName.getText, None),
extended.isDefined)
}
}
// All other cases.
case _ => NativePlaceholder
}
}

case Token("TOK_CREATETABLE", children)
if children.collect { case t@Token("TOK_QUERY", _) => t }.nonEmpty =>
// TODO: Parse other clauses.
@@ -414,11 +459,8 @@ private[hive] object HiveQl {
s"Unhandled clauses: ${notImplemented.flatten.map(dumpTree(_)).mkString("\n")}")
}

val (db, tableName) =
tableNameParts.getChildren.map{ case Token(part, Nil) => cleanIdentifier(part)} match {
case Seq(tableOnly) => (None, tableOnly)
case Seq(databaseName, table) => (Some(databaseName), table)
}
val (db, tableName) = extractDbNameTableName(tableNameParts)

InsertIntoCreatedTable(db, tableName, nodeToPlan(query))

// If its not a "CREATE TABLE AS" like above then just pass it back to hive as a native command.
@@ -736,11 +778,7 @@ private[hive] object HiveQl {
val Some(tableNameParts) :: partitionClause :: Nil =
getClauses(Seq("TOK_TABNAME", "TOK_PARTSPEC"), tableArgs)

val (db, tableName) =
tableNameParts.getChildren.map{ case Token(part, Nil) => cleanIdentifier(part)} match {
case Seq(tableOnly) => (None, tableOnly)
case Seq(databaseName, table) => (Some(databaseName), table)
}
val (db, tableName) = extractDbNameTableName(tableNameParts)

val partitionKeys = partitionClause.map(_.getChildren.map {
// Parse partitions. We also make keys case insensitive.
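
To summarize the new TOK_DESCTABLE handling above, the dispatch works out roughly as follows (an illustrative sketch, not code from the patch):
```
// DESCRIBE src               => DescribeCommand(UnresolvedRelation(None, "src", None), isExtended = false)
// DESCRIBE EXTENDED mydb.src => DescribeCommand(UnresolvedRelation(Some("mydb"), "src", None), isExtended = true)
// DESCRIBE mydb.src key      => NativePlaceholder (describing a column; Hive native)
// DESCRIBE FORMATTED src     => NativePlaceholder (FORMATTED/PRETTY; Hive native)
```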
@@ -81,6 +81,16 @@ private[hive] trait HiveStrategies {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case logical.NativeCommand(sql) =>
NativeCommand(sql, plan.output)(context) :: Nil
case describe: logical.DescribeCommand => {
val resolvedTable = context.executePlan(describe.table).analyzed
resolvedTable match {
case t: MetastoreRelation =>
Seq(DescribeHiveTableCommand(
t, describe.output, describe.isExtended)(context))
case o: LogicalPlan =>
Seq(DescribeCommand(planLater(o), describe.output)(context))
}
}
case _ => Nil
}
}
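
The strategy resolves the table first and picks the physical command accordingly; sketched:
```
// DESCRIBE on a MetastoreRelation     => DescribeHiveTableCommand (Hive-style output)
// DESCRIBE on any other resolved plan => execution.DescribeCommand (e.g. a temporary table)
```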
@@ -20,8 +20,10 @@ package org.apache.spark.sql.hive.execution
import org.apache.hadoop.hive.common.`type`.{HiveDecimal, HiveVarchar}
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.metastore.MetaStoreUtils
import org.apache.hadoop.hive.metastore.api.FieldSchema
import org.apache.hadoop.hive.ql.Context
import org.apache.hadoop.hive.ql.metadata.{Partition => HivePartition, Hive}
import org.apache.hadoop.hive.ql.metadata.formatting.MetaDataFormatUtils
import org.apache.hadoop.hive.ql.plan.{TableDesc, FileSinkDesc}
import org.apache.hadoop.hive.serde.serdeConstants
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption
@@ -462,3 +464,61 @@ case class NativeCommand(

override def otherCopyArgs = context :: Nil
}

/**
* :: DeveloperApi ::
*/
@DeveloperApi
case class DescribeHiveTableCommand(
table: MetastoreRelation,
output: Seq[Attribute],
isExtended: Boolean)(
@transient context: HiveContext)
extends LeafNode with Command {

// Strings with the format like Hive. It is used for result comparison in our unit tests.
lazy val hiveString: Seq[String] = {
val alignment = 20
val delim = "\t"

sideEffectResult.map {
case (name, dataType, comment) =>
String.format("%-" + alignment + "s", name) + delim +
String.format("%-" + alignment + "s", dataType) + delim +
String.format("%-" + alignment + "s", Option(comment).getOrElse("None"))
}
}

override protected[sql] lazy val sideEffectResult: Seq[(String, String, String)] = {
// Trying to mimic the format of Hive's output. But not exactly the same.
var results: Seq[(String, String, String)] = Nil

val columns: Seq[FieldSchema] = table.hiveQlTable.getCols
val partitionColumns: Seq[FieldSchema] = table.hiveQlTable.getPartCols
results ++= columns.map(field => (field.getName, field.getType, field.getComment))
if (!partitionColumns.isEmpty) {
val partColumnInfo =
partitionColumns.map(field => (field.getName, field.getType, field.getComment))
results ++=
partColumnInfo ++
Seq(("# Partition Information", "", "")) ++
Seq((s"# ${output.get(0).name}", output.get(1).name, output.get(2).name)) ++
partColumnInfo
}

if (isExtended) {
results ++= Seq(("Detailed Table Information", table.hiveQlTable.getTTable.toString, ""))
}

results
}

override def execute(): RDD[Row] = {
val rows = sideEffectResult.map {
case (name, dataType, comment) => new GenericRow(Array[Any](name, dataType, comment))
}
context.sparkContext.parallelize(rows, 1)
}

override def otherCopyArgs = context :: Nil
}
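
To illustrate the alignment logic in hiveString, this is the formatting applied to a single row (a runnable sketch):
```
// 20-character left-justified padding, tab-delimited; a null comment becomes "None".
val line =
  String.format("%-20s", "key") + "\t" +
    String.format("%-20s", "string") + "\t" +
    String.format("%-20s", "None")
// line == "key                 \tstring              \tNone                "
```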
@@ -144,6 +144,12 @@ abstract class HiveComparisonTest
case _: SetCommand => Seq("0")
case _: LogicalNativeCommand => answer.filterNot(nonDeterministicLine).filterNot(_ == "")
case _: ExplainCommand => answer
case _: DescribeCommand =>
// Filter out non-deterministic lines and lines which do not have actual results but
// can introduce problems because of the way Hive formats these lines.
// Then, remove empty lines. Do not sort the results.
answer.filterNot(
r => nonDeterministicLine(r) || ignoredLine(r)).map(_.trim).filterNot(_ == "")
case plan => if (isSorted(plan)) answer else answer.sorted
}
orderedAnswer.map(cleanPaths)
@@ -169,6 +175,16 @@ abstract class HiveComparisonTest
protected def nonDeterministicLine(line: String) =
nonDeterministicLineIndicators.exists(line contains _)

// This list contains indicators for those lines which do not have actual results and we
// want to ignore.
lazy val ignoredLineIndicators = Seq(
"# Partition Information",
"# col_name"
)

protected def ignoredLine(line: String) =
ignoredLineIndicators.exists(line contains _)

/**
* Removes non-deterministic paths from `str` so cached answers will compare correctly.
*/
@@ -329,11 +345,17 @@ abstract class HiveComparisonTest

if ((!hiveQuery.logical.isInstanceOf[ExplainCommand]) && preparedHive != catalyst) {

val hivePrintOut = s"== HIVE - ${hive.size} row(s) ==" +: preparedHive
val hivePrintOut = s"== HIVE - ${preparedHive.size} row(s) ==" +: preparedHive
val catalystPrintOut = s"== CATALYST - ${catalyst.size} row(s) ==" +: catalyst

val resultComparison = sideBySide(hivePrintOut, catalystPrintOut).mkString("\n")

println("hive output")
hive.foreach(println)

println("catalyst printout")
catalyst.foreach(println)

if (recomputeCache) {
logger.warn(s"Clearing cache files for failed test $testCaseName")
hiveCacheFiles.foreach(_.delete())
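
For instance, the new filter keeps real column rows but drops Hive's section headers (a sketch of ignoredLine's behavior):
```
ignoredLine("# Partition Information")                            // true
ignoredLine("# col_name            data_type            comment") // true
ignoredLine("key                 \tstring              \tNone")   // false
```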
@@ -78,7 +78,7 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
"alter_merge",
"alter_concatenate_indexed_table",
"protectmode2",
"describe_table",
//"describe_table",
"describe_comment_nonascii",
"udf5",
"udf_java_method",
@@ -177,7 +177,16 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
// After stop taking the `stringOrError` route, exceptions are thrown from these cases.
// See SPARK-2129 for details.
"join_view",
"mergejoins_mixed"
"mergejoins_mixed",

// Returning the result of a describe state as a JSON object is not supported.
"describe_table_json",
"describe_database_json",
"describe_formatted_view_partitioned_json",

// Hive returns the results of describe as plain text. Comments with multiple lines
// introduce extra lines in the Hive results, which make the result comparison fail.
"describe_comment_indent"
)

/**
@@ -292,11 +301,7 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
"default_partition_name",
"delimiter",
"desc_non_existent_tbl",
"describe_comment_indent",
"describe_database_json",
"describe_formatted_view_partitioned",
"describe_formatted_view_partitioned_json",
"describe_table_json",
"diff_part_input_formats",
"disable_file_format_check",
"drop_function",