[SPARK-19129][SQL] SessionCatalog: Disallow empty part col values in partition spec

### What changes were proposed in this pull request?
Empty partition column values are not valid in a partition spec. Before this PR, we accepted such specs from users, and the Hive metastore does not detect or disallow them either. As a result, users hit the surprising behavior shown below.

```scala
val df = spark.createDataFrame(Seq((0, "a"), (1, "b"))).toDF("partCol1", "name")
df.write.mode("overwrite").partitionBy("partCol1").saveAsTable("partitionedTable")
spark.sql("alter table partitionedTable drop partition(partCol1='')")
spark.table("partitionedTable").show()
```

In the example above, the WHOLE table is DROPPED when the user specifies a partition spec containing a single partition column with an empty value.

When the table has more than one partition column, the Hive metastore APIs simply ignore the columns with empty values and treat the spec as a partial spec, which is also unexpected and does not match Hive's actual behavior. This PR disallows such invalid partition specs in the `SessionCatalog` APIs.
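
After this PR, the same sequence fails fast. A sketch of the expected interaction, assuming a standard Spark session; the error message is taken verbatim from the new test cases:

```scala
val df = spark.createDataFrame(Seq((0, "a"), (1, "b"))).toDF("partCol1", "name")
df.write.mode("overwrite").partitionBy("partCol1").saveAsTable("partitionedTable")

// Now throws org.apache.spark.sql.AnalysisException:
//   Partition spec is invalid. The spec ([partCol1=]) contains an empty
//   partition column value
spark.sql("alter table partitionedTable drop partition(partCol1='')")

// The table is left untouched instead of being dropped wholesale.
spark.table("partitionedTable").show()
```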

### How was this patch tested?
Added test cases

Author: gatorsmile <[email protected]>

Closes #16583 from gatorsmile/disallowEmptyPartColValue.
gatorsmile authored and cloud-fan committed Jan 17, 2017
1 parent a83accf commit a23debd
Showing 5 changed files with 106 additions and 8 deletions.
#### sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala

```diff
@@ -331,7 +331,7 @@ class SessionCatalog(
   def loadPartition(
       name: TableIdentifier,
       loadPath: String,
-      partition: TablePartitionSpec,
+      spec: TablePartitionSpec,
       isOverwrite: Boolean,
       holdDDLTime: Boolean,
       inheritTableSpecs: Boolean,
@@ -340,8 +340,9 @@
     val table = formatTableName(name.table)
     requireDbExists(db)
     requireTableExists(TableIdentifier(table, Some(db)))
+    requireNonEmptyValueInPartitionSpec(Seq(spec))
     externalCatalog.loadPartition(
-      db, table, loadPath, partition, isOverwrite, holdDDLTime, inheritTableSpecs, isSrcLocal)
+      db, table, loadPath, spec, isOverwrite, holdDDLTime, inheritTableSpecs, isSrcLocal)
   }
 
   def defaultTablePath(tableIdent: TableIdentifier): String = {
@@ -693,6 +694,7 @@
     requireDbExists(db)
     requireTableExists(TableIdentifier(table, Option(db)))
     requireExactMatchedPartitionSpec(parts.map(_.spec), getTableMetadata(tableName))
+    requireNonEmptyValueInPartitionSpec(parts.map(_.spec))
     externalCatalog.createPartitions(db, table, parts, ignoreIfExists)
   }
 
@@ -711,6 +713,7 @@
     requireDbExists(db)
     requireTableExists(TableIdentifier(table, Option(db)))
     requirePartialMatchedPartitionSpec(specs, getTableMetadata(tableName))
+    requireNonEmptyValueInPartitionSpec(specs)
     externalCatalog.dropPartitions(db, table, specs, ignoreIfNotExists, purge, retainData)
   }
 
@@ -731,6 +734,8 @@
     requireTableExists(TableIdentifier(table, Option(db)))
     requireExactMatchedPartitionSpec(specs, tableMetadata)
     requireExactMatchedPartitionSpec(newSpecs, tableMetadata)
+    requireNonEmptyValueInPartitionSpec(specs)
+    requireNonEmptyValueInPartitionSpec(newSpecs)
     externalCatalog.renamePartitions(db, table, specs, newSpecs)
   }
 
@@ -749,6 +754,7 @@
     requireDbExists(db)
     requireTableExists(TableIdentifier(table, Option(db)))
     requireExactMatchedPartitionSpec(parts.map(_.spec), getTableMetadata(tableName))
+    requireNonEmptyValueInPartitionSpec(parts.map(_.spec))
     externalCatalog.alterPartitions(db, table, parts)
   }
 
@@ -762,6 +768,7 @@
     requireDbExists(db)
     requireTableExists(TableIdentifier(table, Option(db)))
     requireExactMatchedPartitionSpec(Seq(spec), getTableMetadata(tableName))
+    requireNonEmptyValueInPartitionSpec(Seq(spec))
     externalCatalog.getPartition(db, table, spec)
   }
 
@@ -781,6 +788,7 @@
     requireTableExists(TableIdentifier(table, Option(db)))
     partialSpec.foreach { spec =>
       requirePartialMatchedPartitionSpec(Seq(spec), getTableMetadata(tableName))
+      requireNonEmptyValueInPartitionSpec(Seq(spec))
     }
     externalCatalog.listPartitionNames(db, table, partialSpec)
   }
@@ -801,6 +809,7 @@
     requireTableExists(TableIdentifier(table, Option(db)))
     partialSpec.foreach { spec =>
       requirePartialMatchedPartitionSpec(Seq(spec), getTableMetadata(tableName))
+      requireNonEmptyValueInPartitionSpec(Seq(spec))
     }
     externalCatalog.listPartitions(db, table, partialSpec)
   }
@@ -819,6 +828,19 @@
     externalCatalog.listPartitionsByFilter(db, table, predicates)
   }
 
+  /**
+   * Verify if the input partition spec has any empty value.
+   */
+  private def requireNonEmptyValueInPartitionSpec(specs: Seq[TablePartitionSpec]): Unit = {
+    specs.foreach { s =>
+      if (s.values.exists(_.isEmpty)) {
+        val spec = s.map(p => p._1 + "=" + p._2).mkString("[", ", ", "]")
+        throw new AnalysisException(
+          s"Partition spec is invalid. The spec ($spec) contains an empty partition column value")
+      }
+    }
+  }
+
   /**
    * Verify if the input partition spec exactly matches the existing defined partition spec
    * The columns must be the same but the orders could be different.
```
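
Since `CatalogTypes.TablePartitionSpec` is just an alias for `Map[String, String]` in Spark's catalog types, the new check can be exercised in isolation. A minimal standalone sketch, not the Spark source: the object name is made up for illustration, and `AnalysisException` is swapped for `IllegalArgumentException` to keep it dependency-free:

```scala
object RequireNonEmptyValueSketch {
  type TablePartitionSpec = Map[String, String]

  // Mirrors SessionCatalog.requireNonEmptyValueInPartitionSpec: reject any
  // spec in which at least one partition column maps to an empty string.
  def requireNonEmptyValueInPartitionSpec(specs: Seq[TablePartitionSpec]): Unit = {
    specs.foreach { s =>
      if (s.values.exists(_.isEmpty)) {
        val spec = s.map(p => p._1 + "=" + p._2).mkString("[", ", ", "]")
        throw new IllegalArgumentException(
          s"Partition spec is invalid. The spec ($spec) contains an empty partition column value")
      }
    }
  }

  def main(args: Array[String]): Unit = {
    requireNonEmptyValueInPartitionSpec(Seq(Map("a" -> "3", "b" -> "4"))) // passes
    requireNonEmptyValueInPartitionSpec(Seq(Map("a" -> "3", "b" -> "")))  // throws:
    // java.lang.IllegalArgumentException: Partition spec is invalid.
    // The spec ([a=3, b=]) contains an empty partition column value
  }
}
```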
#### sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala

```diff
@@ -848,6 +848,8 @@ abstract class CatalogTestUtils {
     CatalogTablePartition(Map("a" -> "5", "b" -> "6", "c" -> "7"), storageFormat)
   lazy val partWithUnknownColumns =
     CatalogTablePartition(Map("a" -> "5", "unknown" -> "6"), storageFormat)
+  lazy val partWithEmptyValue =
+    CatalogTablePartition(Map("a" -> "3", "b" -> ""), storageFormat)
   lazy val funcClass = "org.apache.spark.myFunc"
 
   /**
```
#### sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala

```diff
@@ -625,6 +625,13 @@ class SessionCatalogSuite extends PlanTest {
     }
     assert(e.getMessage.contains("Partition spec is invalid. The spec (a, unknown) must match " +
       "the partition spec (a, b) defined in table '`db2`.`tbl2`'"))
+    e = intercept[AnalysisException] {
+      catalog.createPartitions(
+        TableIdentifier("tbl2", Some("db2")),
+        Seq(partWithEmptyValue, part1), ignoreIfExists = true)
+    }
+    assert(e.getMessage.contains("Partition spec is invalid. The spec ([a=3, b=]) contains an " +
+      "empty partition column value"))
   }
 
   test("drop partitions") {
@@ -722,6 +729,16 @@
     assert(e.getMessage.contains(
       "Partition spec is invalid. The spec (a, unknown) must be contained within " +
         "the partition spec (a, b) defined in table '`db2`.`tbl2`'"))
+    e = intercept[AnalysisException] {
+      catalog.dropPartitions(
+        TableIdentifier("tbl2", Some("db2")),
+        Seq(partWithEmptyValue.spec, part1.spec),
+        ignoreIfNotExists = false,
+        purge = false,
+        retainData = false)
+    }
+    assert(e.getMessage.contains("Partition spec is invalid. The spec ([a=3, b=]) contains an " +
+      "empty partition column value"))
   }
 
   test("get partition") {
@@ -767,6 +784,11 @@
     }
     assert(e.getMessage.contains("Partition spec is invalid. The spec (a, unknown) must match " +
       "the partition spec (a, b) defined in table '`db2`.`tbl1`'"))
+    e = intercept[AnalysisException] {
+      catalog.getPartition(TableIdentifier("tbl1", Some("db2")), partWithEmptyValue.spec)
+    }
+    assert(e.getMessage.contains("Partition spec is invalid. The spec ([a=3, b=]) contains an " +
+      "empty partition column value"))
   }
 
   test("rename partitions") {
@@ -834,6 +856,13 @@
     }
     assert(e.getMessage.contains("Partition spec is invalid. The spec (a, unknown) must match " +
       "the partition spec (a, b) defined in table '`db2`.`tbl1`'"))
+    e = intercept[AnalysisException] {
+      catalog.renamePartitions(
+        TableIdentifier("tbl1", Some("db2")),
+        Seq(part1.spec), Seq(partWithEmptyValue.spec))
+    }
+    assert(e.getMessage.contains("Partition spec is invalid. The spec ([a=3, b=]) contains an " +
+      "empty partition column value"))
   }
 
   test("alter partitions") {
@@ -893,6 +922,11 @@
     }
     assert(e.getMessage.contains("Partition spec is invalid. The spec (a, unknown) must match " +
       "the partition spec (a, b) defined in table '`db2`.`tbl1`'"))
+    e = intercept[AnalysisException] {
+      catalog.alterPartitions(TableIdentifier("tbl1", Some("db2")), Seq(partWithEmptyValue))
+    }
+    assert(e.getMessage.contains("Partition spec is invalid. The spec ([a=3, b=]) contains an " +
+      "empty partition column value"))
   }
 
   test("list partition names") {
@@ -914,10 +948,24 @@
 
   test("list partition names with invalid partial partition spec") {
     val catalog = new SessionCatalog(newBasicCatalog())
-    intercept[AnalysisException] {
+    var e = intercept[AnalysisException] {
       catalog.listPartitionNames(TableIdentifier("tbl2", Some("db2")),
+        Some(partWithMoreColumns.spec))
     }
+    assert(e.getMessage.contains("Partition spec is invalid. The spec (a, b, c) must be " +
+      "contained within the partition spec (a, b) defined in table '`db2`.`tbl2`'"))
+    e = intercept[AnalysisException] {
+      catalog.listPartitionNames(TableIdentifier("tbl2", Some("db2")),
+        Some(partWithUnknownColumns.spec))
+    }
+    assert(e.getMessage.contains("Partition spec is invalid. The spec (a, unknown) must be " +
+      "contained within the partition spec (a, b) defined in table '`db2`.`tbl2`'"))
+    e = intercept[AnalysisException] {
+      catalog.listPartitionNames(TableIdentifier("tbl2", Some("db2")),
-        Some(Map("unknown" -> "unknown")))
+        Some(partWithEmptyValue.spec))
+    }
+    assert(e.getMessage.contains("Partition spec is invalid. The spec ([a=3, b=]) contains an " +
+      "empty partition column value"))
   }
 
   test("list partitions") {
@@ -937,10 +985,22 @@
 
   test("list partitions with invalid partial partition spec") {
     val catalog = new SessionCatalog(newBasicCatalog())
-    intercept[AnalysisException] {
-      catalog.listPartitions(
-        TableIdentifier("tbl2", Some("db2")), Some(Map("unknown" -> "unknown")))
+    var e = intercept[AnalysisException] {
+      catalog.listPartitions(TableIdentifier("tbl2", Some("db2")), Some(partWithMoreColumns.spec))
     }
+    assert(e.getMessage.contains("Partition spec is invalid. The spec (a, b, c) must be " +
+      "contained within the partition spec (a, b) defined in table '`db2`.`tbl2`'"))
+    e = intercept[AnalysisException] {
+      catalog.listPartitions(TableIdentifier("tbl2", Some("db2")),
+        Some(partWithUnknownColumns.spec))
+    }
+    assert(e.getMessage.contains("Partition spec is invalid. The spec (a, unknown) must be " +
+      "contained within the partition spec (a, b) defined in table '`db2`.`tbl2`'"))
+    e = intercept[AnalysisException] {
+      catalog.listPartitions(TableIdentifier("tbl2", Some("db2")), Some(partWithEmptyValue.spec))
+    }
+    assert(e.getMessage.contains("Partition spec is invalid. The spec ([a=3, b=]) contains an " +
+      "empty partition column value"))
   }
 
   test("list partitions when database/table does not exist") {
```
#### sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala

```diff
@@ -471,6 +471,7 @@ private[hive] class HiveClientImpl(
     // do the check at first and collect all the matching partitions
     val matchingParts =
       specs.flatMap { s =>
+        assert(s.values.forall(_.nonEmpty), s"partition spec '$s' is invalid")
         // The provided spec here can be a partial spec, i.e. it will match all partitions
         // whose specs are supersets of this partial spec. E.g. If a table has partitions
         // (b='1', c='1') and (b='1', c='2'), a partial spec of (b='1') will match both.
@@ -545,6 +546,7 @@
         // -1 for result limit means "no limit/return all"
         client.getPartitionNames(table.database, table.identifier.table, -1)
       case Some(s) =>
+        assert(s.values.forall(_.nonEmpty), s"partition spec '$s' is invalid")
         client.getPartitionNames(table.database, table.identifier.table, s.asJava, -1)
     }
     hivePartitionNames.asScala.sorted
@@ -568,7 +570,9 @@
     val hiveTable = toHiveTable(table)
     val parts = spec match {
       case None => shim.getAllPartitions(client, hiveTable).map(fromHivePartition)
-      case Some(s) => client.getPartitions(hiveTable, s.asJava).asScala.map(fromHivePartition)
+      case Some(s) =>
+        assert(s.values.forall(_.nonEmpty), s"partition spec '$s' is invalid")
+        client.getPartitions(hiveTable, s.asJava).asScala.map(fromHivePartition)
     }
     HiveCatalogMetrics.incrementFetchedPartitions(parts.length)
     parts
```
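
These `assert`s act as a second line of defense: by the time a spec reaches `HiveClientImpl`, `SessionCatalog` should already have rejected empty values, so a violation here indicates an internal bug rather than bad user input, which is why an internal `assert` is used instead of a user-facing `AnalysisException`. A small illustration of the guard itself (not Spark source, REPL-style):

```scala
// Illustration only: an empty value tripping the invariant.
val s: Map[String, String] = Map("b" -> "1", "c" -> "")
assert(s.values.forall(_.nonEmpty), s"partition spec '$s' is invalid")
// java.lang.AssertionError: assertion failed: partition spec 'Map(b -> 1, c -> )' is invalid
```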
#### sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala

```diff
@@ -247,6 +247,16 @@ class HiveDDLSuite
     }
   }
 
+  test("SPARK-19129: drop partition with a empty string will drop the whole table") {
+    val df = spark.createDataFrame(Seq((0, "a"), (1, "b"))).toDF("partCol1", "name")
+    df.write.mode("overwrite").partitionBy("partCol1").saveAsTable("partitionedTable")
+    val e = intercept[AnalysisException] {
+      spark.sql("alter table partitionedTable drop partition(partCol1='')")
+    }.getMessage
+    assert(e.contains("Partition spec is invalid. The spec ([partCol1=]) contains an empty " +
+      "partition column value"))
+  }
+
   test("add/drop partitions - external table") {
     val catalog = spark.sessionState.catalog
     withTempDir { tmpDir =>
```
