Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-2177][SQL] describe table result contains only one column #1118

Closed
wants to merge 15 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -60,3 +60,19 @@ case class ExplainCommand(plan: LogicalPlan) extends Command {
* Returned for the "CACHE TABLE tableName" and "UNCACHE TABLE tableName" command.
*/
case class CacheCommand(tableName: String, doCache: Boolean) extends Command

/**
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove this block

* Returned for the "DESCRIBE [EXTENDED] [dbName.]tableName" command.
* @param table The table to be described.
* @param isExtended True if "DESCRIBE EXTENDED" is used. Otherwise, false.
* It is effective only when the table is a Hive table.
*/
case class DescribeCommand(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would be great to explain isFormatted / isExtended in @param.

table: LogicalPlan,
isExtended: Boolean) extends Command {
override def output = Seq(
// Column names are based on Hive.
BoundReference(0, AttributeReference("col_name", StringType, nullable = false)()),
BoundReference(1, AttributeReference("data_type", StringType, nullable = false)()),
BoundReference(2, AttributeReference("comment", StringType, nullable = false)()))
}
Original file line number Diff line number Diff line change
Expand Up @@ -121,3 +121,24 @@ case class CacheCommand(tableName: String, doCache: Boolean)(@transient context:

override def output: Seq[Attribute] = Seq.empty
}

/**
* :: DeveloperApi ::
*/
@DeveloperApi
case class DescribeCommand(child: SparkPlan, output: Seq[Attribute])(
@transient context: SQLContext)
extends LeafNode with Command {

override protected[sql] lazy val sideEffectResult: Seq[(String, String, String)] = {
Seq(("# Registered as a temporary table", null, null)) ++
child.output.map(field => (field.name, field.dataType.toString, null))
}

override def execute(): RDD[Row] = {
val rows = sideEffectResult.map {
case (name, dataType, comment) => new GenericRow(Array[Any](name, dataType, comment))
}
context.sparkContext.parallelize(rows, 1)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.types._
import org.apache.spark.sql.execution.QueryExecutionException
import org.apache.spark.sql.execution.{Command => PhysicalCommand}
import org.apache.spark.sql.hive.execution.DescribeHiveTableCommand

/**
* Starts up an instance of hive where metadata is stored locally. An in-process metadata data is
Expand Down Expand Up @@ -291,6 +292,10 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
* execution is simply passed back to Hive.
*/
def stringResult(): Seq[String] = executedPlan match {
case describeHiveTableCommand: DescribeHiveTableCommand =>
// If it is a describe command for a Hive table, we want to have the output format
// be similar with Hive.
describeHiveTableCommand.hiveString
case command: PhysicalCommand =>
command.sideEffectResult.map(_.toString)

Expand Down
70 changes: 54 additions & 16 deletions sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ private[hive] case class AddFile(filePath: String) extends Command
private[hive] object HiveQl {
protected val nativeCommands = Seq(
"TOK_DESCFUNCTION",
"TOK_DESCTABLE",
"TOK_DESCDATABASE",
"TOK_SHOW_TABLESTATUS",
"TOK_SHOWDATABASES",
Expand Down Expand Up @@ -120,6 +119,12 @@ private[hive] object HiveQl {
"TOK_SWITCHDATABASE"
)

// Commands that we do not need to explain.
protected val noExplainCommands = Seq(
"TOK_CREATETABLE",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does ctas fall in here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

noExplainCommands is for those commands which we do not need to explain. For example, we will basically do nothing for "EXPLAIN CTAS". A regular CTAS query will not fall in here.

"TOK_DESCTABLE"
) ++ nativeCommands

/**
* A set of implicit transformations that allow Hive ASTNodes to be rewritten by transformations
* similar to [[catalyst.trees.TreeNode]].
Expand Down Expand Up @@ -362,13 +367,20 @@ private[hive] object HiveQl {
}
}

protected def extractDbNameTableName(tableNameParts: Node): (Option[String], String) = {
val (db, tableName) =
tableNameParts.getChildren.map { case Token(part, Nil) => cleanIdentifier(part) } match {
case Seq(tableOnly) => (None, tableOnly)
case Seq(databaseName, table) => (Some(databaseName), table)
}

(db, tableName)
}

protected def nodeToPlan(node: Node): LogicalPlan = node match {
// Just fake explain for any of the native commands.
case Token("TOK_EXPLAIN", explainArgs) if nativeCommands contains explainArgs.head.getText =>
ExplainCommand(NoRelation)
// Create tables aren't native commands due to CTAS queries, but we still don't need to
// explain them.
case Token("TOK_EXPLAIN", explainArgs) if explainArgs.head.getText == "TOK_CREATETABLE" =>
case Token("TOK_EXPLAIN", explainArgs)
if noExplainCommands.contains(explainArgs.head.getText) =>
ExplainCommand(NoRelation)
case Token("TOK_EXPLAIN", explainArgs) =>
// Ignore FORMATTED if present.
Expand All @@ -377,6 +389,39 @@ private[hive] object HiveQl {
// TODO: support EXTENDED?
ExplainCommand(nodeToPlan(query))

case Token("TOK_DESCTABLE", describeArgs) =>
// Reference: https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL
val Some(tableType) :: formatted :: extended :: pretty :: Nil =
getClauses(Seq("TOK_TABTYPE", "FORMATTED", "EXTENDED", "PRETTY"), describeArgs)
if (formatted.isDefined || pretty.isDefined) {
// FORMATTED and PRETTY are not supported and this statement will be treated as
// a Hive native command.
NativePlaceholder
} else {
tableType match {
case Token("TOK_TABTYPE", nameParts) if nameParts.size == 1 => {
nameParts.head match {
case Token(".", dbName :: tableName :: Nil) =>
// It is describing a table with the format like "describe db.table".
// TODO: Actually, a user may mean tableName.columnName. Need to resolve this issue.
val (db, tableName) = extractDbNameTableName(nameParts.head)
DescribeCommand(
UnresolvedRelation(db, tableName, None), extended.isDefined)
case Token(".", dbName :: tableName :: colName :: Nil) =>
// It is describing a column with the format like "describe db.table column".
NativePlaceholder
case tableName =>
// It is describing a table with the format like "describe table".
DescribeCommand(
UnresolvedRelation(None, tableName.getText, None),
extended.isDefined)
}
}
// All other cases.
case _ => NativePlaceholder
}
}

case Token("TOK_CREATETABLE", children)
if children.collect { case t@Token("TOK_QUERY", _) => t }.nonEmpty =>
// TODO: Parse other clauses.
Expand Down Expand Up @@ -414,11 +459,8 @@ private[hive] object HiveQl {
s"Unhandled clauses: ${notImplemented.flatten.map(dumpTree(_)).mkString("\n")}")
}

val (db, tableName) =
tableNameParts.getChildren.map{ case Token(part, Nil) => cleanIdentifier(part)} match {
case Seq(tableOnly) => (None, tableOnly)
case Seq(databaseName, table) => (Some(databaseName), table)
}
val (db, tableName) = extractDbNameTableName(tableNameParts)

InsertIntoCreatedTable(db, tableName, nodeToPlan(query))

// If its not a "CREATE TABLE AS" like above then just pass it back to hive as a native command.
Expand Down Expand Up @@ -736,11 +778,7 @@ private[hive] object HiveQl {
val Some(tableNameParts) :: partitionClause :: Nil =
getClauses(Seq("TOK_TABNAME", "TOK_PARTSPEC"), tableArgs)

val (db, tableName) =
tableNameParts.getChildren.map{ case Token(part, Nil) => cleanIdentifier(part)} match {
case Seq(tableOnly) => (None, tableOnly)
case Seq(databaseName, table) => (Some(databaseName), table)
}
val (db, tableName) = extractDbNameTableName(tableNameParts)

val partitionKeys = partitionClause.map(_.getChildren.map {
// Parse partitions. We also make keys case insensitive.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,16 @@ private[hive] trait HiveStrategies {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case logical.NativeCommand(sql) =>
NativeCommand(sql, plan.output)(context) :: Nil
case describe: logical.DescribeCommand => {
val resolvedTable = context.executePlan(describe.table).analyzed
resolvedTable match {
case t: MetastoreRelation =>
Seq(DescribeHiveTableCommand(
t, describe.output, describe.isExtended)(context))
case o: LogicalPlan =>
Seq(DescribeCommand(planLater(o), describe.output)(context))
}
}
case _ => Nil
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@ package org.apache.spark.sql.hive.execution
import org.apache.hadoop.hive.common.`type`.{HiveDecimal, HiveVarchar}
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.metastore.MetaStoreUtils
import org.apache.hadoop.hive.metastore.api.FieldSchema
import org.apache.hadoop.hive.ql.Context
import org.apache.hadoop.hive.ql.metadata.{Partition => HivePartition, Hive}
import org.apache.hadoop.hive.ql.metadata.formatting.MetaDataFormatUtils
import org.apache.hadoop.hive.ql.plan.{TableDesc, FileSinkDesc}
import org.apache.hadoop.hive.serde.serdeConstants
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption
Expand Down Expand Up @@ -456,3 +458,61 @@ case class NativeCommand(

override def otherCopyArgs = context :: Nil
}

/**
* :: DeveloperApi ::
*/
@DeveloperApi
case class DescribeHiveTableCommand(
table: MetastoreRelation,
output: Seq[Attribute],
isExtended: Boolean)(
@transient context: HiveContext)
extends LeafNode with Command {

// Strings with the format like Hive. It is used for result comparison in our unit tests.
lazy val hiveString: Seq[String] = {
val alignment = 20
val delim = "\t"

sideEffectResult.map {
case (name, dataType, comment) =>
String.format("%-" + alignment + "s", name) + delim +
String.format("%-" + alignment + "s", dataType) + delim +
String.format("%-" + alignment + "s", Option(comment).getOrElse("None"))
}
}

override protected[sql] lazy val sideEffectResult: Seq[(String, String, String)] = {
// Trying to mimic the format of Hive's output. But not exactly the same.
var results: Seq[(String, String, String)] = Nil

val columns: Seq[FieldSchema] = table.hiveQlTable.getCols
val partitionColumns: Seq[FieldSchema] = table.hiveQlTable.getPartCols
results ++= columns.map(field => (field.getName, field.getType, field.getComment))
if (!partitionColumns.isEmpty) {
val partColumnInfo =
partitionColumns.map(field => (field.getName, field.getType, field.getComment))
results ++=
partColumnInfo ++
Seq(("# Partition Information", "", "")) ++
Seq((s"# ${output.get(0).name}", output.get(1).name, output.get(2).name)) ++
partColumnInfo
}

if (isExtended) {
results ++= Seq(("Detailed Table Information", table.hiveQlTable.getTTable.toString, ""))
}

results
}

override def execute(): RDD[Row] = {
val rows = sideEffectResult.map {
case (name, dataType, comment) => new GenericRow(Array[Any](name, dataType, comment))
}
context.sparkContext.parallelize(rows, 1)
}

override def otherCopyArgs = context :: Nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,12 @@ abstract class HiveComparisonTest
case _: SetCommand => Seq("0")
case _: LogicalNativeCommand => answer.filterNot(nonDeterministicLine).filterNot(_ == "")
case _: ExplainCommand => answer
case _: DescribeCommand =>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add some inline comment explaining what you are filtering

// Filter out non-deterministic lines and lines which do not have actual results but
// can introduce problems because of the way Hive formats these lines.
// Then, remove empty lines. Do not sort the results.
answer.filterNot(
r => nonDeterministicLine(r) || ignoredLine(r)).map(_.trim).filterNot(_ == "")
case plan => if (isSorted(plan)) answer else answer.sorted
}
orderedAnswer.map(cleanPaths)
Expand All @@ -169,6 +175,16 @@ abstract class HiveComparisonTest
protected def nonDeterministicLine(line: String) =
nonDeterministicLineIndicators.exists(line contains _)

// This list contains indicators for those lines which do not have actual results and we
// want to ignore.
lazy val ignoredLineIndicators = Seq(
"# Partition Information",
"# col_name"
)

protected def ignoredLine(line: String) =
ignoredLineIndicators.exists(line contains _)

/**
* Removes non-deterministic paths from `str` so cached answers will compare correctly.
*/
Expand Down Expand Up @@ -329,11 +345,17 @@ abstract class HiveComparisonTest

if ((!hiveQuery.logical.isInstanceOf[ExplainCommand]) && preparedHive != catalyst) {

val hivePrintOut = s"== HIVE - ${hive.size} row(s) ==" +: preparedHive
val hivePrintOut = s"== HIVE - ${preparedHive.size} row(s) ==" +: preparedHive
val catalystPrintOut = s"== CATALYST - ${catalyst.size} row(s) ==" +: catalyst

val resultComparison = sideBySide(hivePrintOut, catalystPrintOut).mkString("\n")

println("hive output")
hive.foreach(println)

println("catalyst printout")
catalyst.foreach(println)

if (recomputeCache) {
logger.warn(s"Clearing cache files for failed test $testCaseName")
hiveCacheFiles.foreach(_.delete())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
"alter_merge",
"alter_concatenate_indexed_table",
"protectmode2",
"describe_table",
//"describe_table",
"describe_comment_nonascii",
"udf5",
"udf_java_method",
Expand Down Expand Up @@ -177,7 +177,16 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
// After stop taking the `stringOrError` route, exceptions are thrown from these cases.
// See SPARK-2129 for details.
"join_view",
"mergejoins_mixed"
"mergejoins_mixed",

// Returning the result of a describe state as a JSON object is not supported.
"describe_table_json",
"describe_database_json",
"describe_formatted_view_partitioned_json",

// Hive returns the results of describe as plain text. Comments with multiple lines
// introduce extra lines in the Hive results, which make the result comparison fail.
"describe_comment_indent"
)

/**
Expand Down Expand Up @@ -292,11 +301,7 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
"default_partition_name",
"delimiter",
"desc_non_existent_tbl",
"describe_comment_indent",
"describe_database_json",
"describe_formatted_view_partitioned",
"describe_formatted_view_partitioned_json",
"describe_table_json",
"diff_part_input_formats",
"disable_file_format_check",
"drop_function",
Expand Down
Loading