[SPARK-29512][SQL] REPAIR TABLE should look up catalog/table like v2 commands

### What changes were proposed in this pull request?

Add `RepairTableStatement` and make REPAIR TABLE go through the same catalog/table resolution framework as other v2 commands.

### Why are the changes needed?

It's important that all commands share the same table resolution behavior, to avoid confusing end users. For example:

```
USE my_catalog
DESC t              -- succeeds and describes table t from my_catalog
MSCK REPAIR TABLE t -- fails with "table not found", since it looks up t in the session catalog
```
### Does this PR introduce any user-facing change?

Yes. MSCK REPAIR TABLE now fails if the current catalog is a v2 catalog, or if the table name explicitly specifies a v2 catalog.
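For illustration, a hypothetical `spark-shell` session (the v2 catalog name `testcat` is an assumption, mirroring the new test below):

```scala
spark.sql("USE testcat")          // make a v2 catalog the current catalog
spark.sql("MSCK REPAIR TABLE t")  // now fails analysis instead of silently
                                  // resolving t against the session catalog
// org.apache.spark.sql.AnalysisException:
//   MSCK REPAIR TABLE is only supported with v1 tables.
```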

### How was this patch tested?

New unit tests

Closes #26168 from imback82/repair_table.

Authored-by: Terry Kim <[email protected]>
Signed-off-by: Liang-Chi Hsieh <[email protected]>
imback82 authored and viirya committed Oct 19, 2019
1 parent 2437878 commit ab92e17
Showing 8 changed files with 56 additions and 36 deletions.
**SqlBase.g4**

```diff
@@ -212,7 +212,7 @@ statement
     | LOAD DATA LOCAL? INPATH path=STRING OVERWRITE? INTO TABLE
         tableIdentifier partitionSpec?                             #loadData
     | TRUNCATE TABLE tableIdentifier partitionSpec?                #truncateTable
-    | MSCK REPAIR TABLE tableIdentifier                            #repairTable
+    | MSCK REPAIR TABLE multipartIdentifier                        #repairTable
     | op=(ADD | LIST) identifier .*?                               #manageResource
     | SET ROLE .*?                                                 #failNativeCommand
     | SET .*?                                                      #setConfiguration
```
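With the grammar rule switched from `tableIdentifier` to `multipartIdentifier`, the parsed name can now carry catalog and namespace parts. A minimal sketch of what the catalyst parser produces for such a name (mirroring the new `DDLParserSuite` test below):

```scala
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser

// "a" may name a catalog, "b" a namespace, "c" the table; nothing is resolved yet.
val plan = CatalystSqlParser.parsePlan("MSCK REPAIR TABLE a.b.c")
// plan == RepairTableStatement(Seq("a", "b", "c"))
```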
**AstBuilder.scala**

```diff
@@ -2716,4 +2716,16 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
         tableName, Option(visitIdentifierSeq(ctx.identifierSeq())), allColumns = false)
     }
   }
+
+  /**
+   * Create a [[RepairTableStatement]].
+   *
+   * For example:
+   * {{{
+   *   MSCK REPAIR TABLE multi_part_name
+   * }}}
+   */
+  override def visitRepairTable(ctx: RepairTableContext): LogicalPlan = withOrigin(ctx) {
+    RepairTableStatement(visitMultipartIdentifier(ctx.multipartIdentifier()))
+  }
 }
```
**statements.scala**

```diff
@@ -311,3 +311,8 @@ case class AnalyzeColumnStatement(
   require(columnNames.isDefined ^ allColumns, "Parameter `columnNames` or `allColumns` are " +
     "mutually exclusive. Only one of them should be specified.")
 }
+
+/**
+ * A REPAIR TABLE statement, as parsed from SQL
+ */
+case class RepairTableStatement(tableName: Seq[String]) extends ParsedStatement
```
**DDLParserSuite.scala (catalyst)**

```diff
@@ -955,6 +955,12 @@ class DDLParserSuite extends AnalysisTest {
       "missing 'COLUMNS' at '<EOF>'")
   }
 
+  test("MSCK REPAIR table") {
+    comparePlans(
+      parsePlan("MSCK REPAIR TABLE a.b.c"),
+      RepairTableStatement(Seq("a", "b", "c")))
+  }
+
   private case class TableSpec(
       name: Seq[String],
       schema: Option[StructType],
```
**ResolveSessionCatalog.scala**

```diff
@@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin, LookupCatalog, TableChange, V1Table}
 import org.apache.spark.sql.connector.expressions.Transform
-import org.apache.spark.sql.execution.command.{AlterTableAddColumnsCommand, AlterTableSetLocationCommand, AlterTableSetPropertiesCommand, AlterTableUnsetPropertiesCommand, AnalyzeColumnCommand, AnalyzePartitionCommand, AnalyzeTableCommand, DescribeColumnCommand, DescribeTableCommand, DropTableCommand, ShowTablesCommand}
+import org.apache.spark.sql.execution.command.{AlterTableAddColumnsCommand, AlterTableRecoverPartitionsCommand, AlterTableSetLocationCommand, AlterTableSetPropertiesCommand, AlterTableUnsetPropertiesCommand, AnalyzeColumnCommand, AnalyzePartitionCommand, AnalyzeTableCommand, DescribeColumnCommand, DescribeTableCommand, DropTableCommand, ShowTablesCommand}
 import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource}
 import org.apache.spark.sql.execution.datasources.v2.FileDataSourceV2
 import org.apache.spark.sql.internal.SQLConf
@@ -266,22 +266,30 @@ class ResolveSessionCatalog(
       ShowTablesCommand(None, pattern)
 
     case AnalyzeTableStatement(tableName, partitionSpec, noScan) =>
-      val CatalogAndIdentifierParts(catalog, parts) = tableName
-      if (!isSessionCatalog(catalog)) {
-        throw new AnalysisException("ANALYZE TABLE is only supported with v1 tables.")
-      }
+      val v1TableName = parseV1Table(tableName, "ANALYZE TABLE")
       if (partitionSpec.isEmpty) {
-        AnalyzeTableCommand(parts.asTableIdentifier, noScan)
+        AnalyzeTableCommand(v1TableName.asTableIdentifier, noScan)
       } else {
-        AnalyzePartitionCommand(parts.asTableIdentifier, partitionSpec, noScan)
+        AnalyzePartitionCommand(v1TableName.asTableIdentifier, partitionSpec, noScan)
       }
 
     case AnalyzeColumnStatement(tableName, columnNames, allColumns) =>
-      val CatalogAndIdentifierParts(catalog, parts) = tableName
-      if (!isSessionCatalog(catalog)) {
-        throw new AnalysisException("ANALYZE TABLE is only supported with v1 tables.")
-      }
-      AnalyzeColumnCommand(parts.asTableIdentifier, columnNames, allColumns)
+      val v1TableName = parseV1Table(tableName, "ANALYZE TABLE")
+      AnalyzeColumnCommand(v1TableName.asTableIdentifier, columnNames, allColumns)
+
+    case RepairTableStatement(tableName) =>
+      val v1TableName = parseV1Table(tableName, "MSCK REPAIR TABLE")
+      AlterTableRecoverPartitionsCommand(
+        v1TableName.asTableIdentifier,
+        "MSCK REPAIR TABLE")
   }
 
+  private def parseV1Table(tableName: Seq[String], sql: String): Seq[String] = {
+    val CatalogAndIdentifierParts(catalog, parts) = tableName
+    if (!isSessionCatalog(catalog)) {
+      throw new AnalysisException(s"$sql is only supported with v1 tables.")
+    }
+    parts
+  }
+
   private def buildCatalogTable(
```
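The shared `parseV1Table` helper is what makes the three statements behave uniformly: any name that resolves to a non-session catalog is rejected before a v1 command is built. A simplified, self-contained sketch of that gate (the `v2Catalogs` set below is a made-up stand-in for the real `isSessionCatalog` lookup, for illustration only):

```scala
object ParseV1TableSketch {
  // Assumption for illustration: names registered as v2 catalogs in the session.
  private val v2Catalogs = Set("testcat")

  // Mirrors the shape of parseV1Table: reject v2-catalog names, pass the rest through.
  def parseV1Table(tableName: Seq[String], sql: String): Seq[String] = {
    if (tableName.headOption.exists(v2Catalogs.contains)) {
      throw new IllegalArgumentException(s"$sql is only supported with v1 tables.")
    }
    tableName
  }

  def main(args: Array[String]): Unit = {
    println(parseV1Table(Seq("db", "tbl"), "MSCK REPAIR TABLE"))      // List(db, tbl)
    // parseV1Table(Seq("testcat", "ns", "tbl"), "MSCK REPAIR TABLE") // would throw
  }
}
```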
**SparkSqlParser.scala**

```diff
@@ -360,20 +360,6 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {
       Option(ctx.partitionSpec).map(visitNonOptionalPartitionSpec))
   }
 
-  /**
-   * Create a [[AlterTableRecoverPartitionsCommand]] command.
-   *
-   * For example:
-   * {{{
-   *   MSCK REPAIR TABLE tablename
-   * }}}
-   */
-  override def visitRepairTable(ctx: RepairTableContext): LogicalPlan = withOrigin(ctx) {
-    AlterTableRecoverPartitionsCommand(
-      visitTableIdentifier(ctx.tableIdentifier),
-      "MSCK REPAIR TABLE")
-  }
-
   /**
    * Create a [[CreateDatabaseCommand]] command.
    *
```
**DataSourceV2SQLSuite.scala**

```diff
@@ -1198,6 +1198,18 @@ class DataSourceV2SQLSuite
     }
   }
 
+  test("MSCK REPAIR TABLE") {
+    val t = "testcat.ns1.ns2.tbl"
+    withTable(t) {
+      spark.sql(s"CREATE TABLE $t (id bigint, data string) USING foo")
+
+      val e = intercept[AnalysisException] {
+        sql(s"MSCK REPAIR TABLE $t")
+      }
+      assert(e.message.contains("MSCK REPAIR TABLE is only supported with v1 tables"))
+    }
+  }
+
   private def assertAnalysisError(sqlStatement: String, expectedError: String): Unit = {
     val errMsg = intercept[AnalysisException] {
       sql(sqlStatement)
```
**DDLParserSuite.scala (sql/core)**

```diff
@@ -1444,15 +1444,6 @@ class DDLParserSuite extends AnalysisTest with SharedSparkSession {
     intercept(sql2, "Found duplicate clauses: TBLPROPERTIES")
   }
 
-  test("MSCK REPAIR table") {
-    val sql = "MSCK REPAIR TABLE tab1"
-    val parsed = parser.parsePlan(sql)
-    val expected = AlterTableRecoverPartitionsCommand(
-      TableIdentifier("tab1", None),
-      "MSCK REPAIR TABLE")
-    comparePlans(parsed, expected)
-  }
-
   test("create table like") {
     val v1 = "CREATE TABLE table1 LIKE table2"
     val (target, source, location, exists) = parser.parsePlan(v1).collect {
```
