[SPARK-26307][SQL] Fix CTAS when INSERT a partitioned table using Hive serde

## What changes were proposed in this pull request?

This is a Spark 2.3 regression introduced in apache#20521. We should add the partition info for `InsertIntoHiveTable` in `CreateHiveTableAsSelectCommand`; otherwise, running the newly added test case hits the following error:

```
[info] - CTAS: INSERT a partitioned table using Hive serde *** FAILED *** (829 milliseconds)
[info]   org.apache.spark.SparkException: Requested partitioning does not match the tab1 table:
[info] Requested partitions:
[info] Table partitions: part
[info]   at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.processInsert(InsertIntoHiveTable.scala:179)
[info]   at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.run(InsertIntoHiveTable.scala:107)
```
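
The fix builds a dynamic partition spec from the target table's partition columns instead of passing an empty map (see the first diff hunk below). As a simplified standalone sketch, assuming a table partitioned by a single `part` column:

```scala
// Simplified sketch of the fix; `partitionColumnNames` stands in for
// tableDesc.partitionColumnNames of a table partitioned by `part`.
// Mapping every partition column to None marks them all as dynamic partitions.
val partitionColumnNames: Seq[String] = Seq("part")
val partition: Map[String, Option[String]] =
  partitionColumnNames.map(_ -> None).toMap   // Map("part" -> None)

// Before the fix, CreateHiveTableAsSelectCommand passed Map.empty instead,
// so InsertIntoHiveTable saw no requested partitions and raised the error above.
```

This is consistent with how a fully dynamic `INSERT ... PARTITION` is represented when it reaches `InsertIntoHiveTable`, so no static partition values are needed for CTAS.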

## How was this patch tested?

Added a test case.

Closes apache#23255 from gatorsmile/fixCTAS.

Authored-by: gatorsmile <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
gatorsmile authored and cloud-fan committed Dec 10, 2018
1 parent 403c8d5 commit 3bc83de
Showing 2 changed files with 14 additions and 1 deletion.
@@ -57,9 +57,11 @@ case class CreateHiveTableAsSelectCommand(
       return Seq.empty
     }
 
+    // For CTAS, there is no static partition values to insert.
+    val partition = tableDesc.partitionColumnNames.map(_ -> None).toMap
     InsertIntoHiveTable(
       tableDesc,
-      Map.empty,
+      partition,
       query,
       overwrite = false,
       ifPartitionNotExists = false,
@@ -752,6 +752,17 @@ class InsertSuite extends QueryTest with TestHiveSingleton with BeforeAndAfter
     }
   }
 
+  test("SPARK-26307: CTAS - INSERT a partitioned table using Hive serde") {
+    withTable("tab1") {
+      withSQLConf("hive.exec.dynamic.partition.mode" -> "nonstrict") {
+        val df = Seq(("a", 100)).toDF("part", "id")
+        df.write.format("hive").partitionBy("part").mode("overwrite").saveAsTable("tab1")
+        df.write.format("hive").partitionBy("part").mode("append").saveAsTable("tab1")
+      }
+    }
+  }
+
+
   Seq("LOCAL", "").foreach { local =>
     Seq(true, false).foreach { caseSensitivity =>
       Seq("orc", "parquet").foreach { format =>
