[SPARK-18899][SPARK-18912][SPARK-18913][SQL] refactor the error checking when append data to an existing table #16313
Conversation
Test build #70257 has finished for PR 16313 at commit
Test build #70307 has finished for PR 16313 at commit
Test build #70309 has finished for PR 16313 at commit
```
@@ -157,39 +156,74 @@ case class CreateDataSourceTableAsSelectCommand(
        // Since the table already exists and the save mode is Ignore, we will just return.
        return Seq.empty[Row]
      case SaveMode.Append =>
        val existingTable = sessionState.catalog.getTableMetadata(tableIdentWithDB)
        if (existingTable.tableType == CatalogTableType.VIEW) {
          throw new AnalysisException("Saving data into a view is not allowed.")
```
We already have an assert at the beginning of this function. We need to improve the error message there; that can cover more cases.

So far, the error message is pretty confusing:

```
assertion failed
java.lang.AssertionError: assertion failed
  at scala.Predef$.assert(Predef.scala:156)
```

We can add a test case for it:

```scala
val df = spark.range(1, 10).toDF("id1")
df.write.saveAsTable("tab1")
spark.sql("create view view1 as select * from tab1")
df.write.mode(SaveMode.Append).format("parquet").saveAsTable("view1")
```
I'll remove this check as it's unreachable now, and fix the error message in another PR.
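For reference, a minimal sketch (assumptions only, not the actual follow-up change) of what a more descriptive check could look like in place of the bare assert; the helper name and its call site are hypothetical:

```scala
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType}

// Hypothetical sketch: fail with a message that names the table and explains why,
// instead of a bare `assert` that only prints "assertion failed".
def assertNotAView(tableDesc: CatalogTable): Unit = {
  if (tableDesc.tableType == CatalogTableType.VIEW) {
    throw new AnalysisException(
      s"Saving data into a view (${tableDesc.identifier}) is not allowed.")
  }
}
```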
```
@@ -157,39 +156,74 @@ case class CreateDataSourceTableAsSelectCommand(
        // Since the table already exists and the save mode is Ignore, we will just return.
        return Seq.empty[Row]
      case SaveMode.Append =>
        val existingTable = sessionState.catalog.getTableMetadata(tableIdentWithDB)
```
Is it possible to directly use the input parameter `table` here?
They are different. The input `table` just contains some user-specified information, and its schema is always empty. https://github.com/cloud-fan/spark/blob/b1dbd0a19a174eaae1aaf114e04f6d3683ea65c4/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala#L135
uh, I see. Will do the final review today.
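To illustrate the distinction above, a minimal hypothetical sketch (the catalog calls are real Spark APIs, but the helper and its assertion are made up for illustration): the input `CatalogTable` only carries what the user specified in the CTAS statement, while the catalog returns the resolved metadata of the existing table.

```scala
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, SessionCatalog}

// Illustrative only: prefer the catalog's resolved metadata (full schema, partitioning,
// bucketing) over the user-specified CatalogTable, whose schema is still empty here.
def resolveExistingTable(userSpecified: CatalogTable, catalog: SessionCatalog): CatalogTable = {
  assert(userSpecified.schema.isEmpty,
    "the CTAS input table carries no schema; real columns come from the query")
  catalog.getTableMetadata(userSpecified.identifier)
}
```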
```
@@ -133,6 +133,16 @@ case class BucketSpec(
  if (numBuckets <= 0) {
    throw new AnalysisException(s"Expected positive number of buckets, but got `$numBuckets`.")
  }

  override def toString: String = {
```
Since we implement `toString` here, we can simplify our logic in `describeBucketingInfo`.
How? The `toString` returns a single line, while `describeBucketingInfo` generates 3 result lines.
: ) If we want to keep the existing format (3 lines), then we are unable to do it.
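For context, a sketch of what a single-line `BucketSpec.toString` could look like (reconstructed under assumptions, since the method body is not shown in the excerpt above; field names follow the `BucketSpec(numBuckets, bucketColumnNames, sortColumnNames)` definition):

```scala
case class BucketSpec(
    numBuckets: Int,
    bucketColumnNames: Seq[String],
    sortColumnNames: Seq[String]) {

  // Single-line rendering, e.g. "8 buckets, bucket columns: [a], sort columns: [b]",
  // whereas describeBucketingInfo spreads the same information over three lines.
  override def toString: String = {
    val bucketString = s"bucket columns: [${bucketColumnNames.mkString(", ")}]"
    val sortString =
      if (sortColumnNames.nonEmpty) s", sort columns: [${sortColumnNames.mkString(", ")}]" else ""
    s"$numBuckets buckets, $bucketString$sortString"
  }
}
```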
```
@@ -459,7 +459,7 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes
  test("saveAsTable()/load() - partitioned table - ErrorIfExists") {
    Seq.empty[(Int, String)].toDF().createOrReplaceTempView("t")

    withTempView("t") {
```
We need `withTempView("t")` to drop the temp view that is created at the beginning of this test case?
yea, my mistake
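A minimal sketch (not the actual SQLTestUtils code, and the helper signature here is an assumption) of the cleanup behavior `withTempView` is expected to provide, i.e. dropping the temp view even if the test body throws:

```scala
import org.apache.spark.sql.SparkSession

// Illustrative helper: run the body, then drop the named temp views so a failing
// assertion cannot leak a leftover view named "t" into later test cases.
def withTempView(spark: SparkSession)(viewNames: String*)(body: => Unit): Unit = {
  try body finally viewNames.foreach(spark.catalog.dropTempView)
}
```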
Test build #70321 has finished for PR 16313 at commit
```
if (specifiedPartCols != existingTable.partitionColumnNames) {
  throw new AnalysisException(
    s"""
       |Specified partitioning does not match the existing table $tableName.
```
Nit: a grammar issue.

`Specified partitioning does not match the existing table $tableName.`
->
`Specified partitioning does not match that of the existing table $tableName.`

Found a reference link: in Sybase Adaptive Server.
```
existingTable.bucketSpec.map(_.toString).getOrElse("not bucketed")
throw new AnalysisException(
  s"""
     |Specified bucketing does not match the existing table $tableName.
```
Nit: The same here.
```
if (existingTable.provider.get == DDLUtils.HIVE_PROVIDER) {
  throw new AnalysisException(s"Saving data in the Hive serde table $tableName is " +
    s"not supported yet. Please use the insertInto() API as an alternative.")
```
Nit: the string interpolation is not needed (the second string literal has no interpolated variables).
```
throw new AnalysisException(s"Saving data in the Hive serde table $tableName is " +
  s"not supported yet. Please use the insertInto() API as an alternative.")
}

// Check if the specified data source match the data source of the existing table.
```
Now the checking logic is split across two places for CTAS on data source tables using the Append mode. Maybe we can improve the comment to explain that `AnalyzeCreateTable` verifies the consistency between the user-specified table schema/definition and the SELECT query, while here we verify the consistency between the user-specified table schema/definition and the existing table schema/definition, as well as the consistency between the existing table schema/definition and the SELECT query.
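One possible wording for such a comment (a suggestion only, not what was merged):

```scala
// AnalyzeCreateTable has already checked that the user-specified schema/definition is
// consistent with the SELECT query. Here we additionally check that the user-specified
// definition (partitioning, bucketing, provider) matches the existing table, and that the
// existing table's schema is compatible with the output of the SELECT query.
```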
LGTM except a few minor comments.
Test build #70331 has finished for PR 16313 at commit
LGTM
retest this please
Test build #70392 has finished for PR 16313 at commit
Thanks! Merging to master/2.1.
…ing when append data to an existing table

## What changes were proposed in this pull request?

When we append data to an existing table with `DataFrameWriter.saveAsTable`, we will do various checks to make sure the appended data is consistent with the existing data. However, we get the information of the existing table by matching the table relation, instead of looking at the table metadata. This is error-prone, e.g. we only check the number of columns for `HadoopFsRelation`, we forget to check bucketing, etc.

This PR refactors the error checking by looking at the metadata of the existing table, and fix several bugs:

* SPARK-18899: We forget to check if the specified bucketing matched the existing table, which may lead to a problematic table that has different bucketing in different data files.
* SPARK-18912: We forget to check the number of columns for non-file-based data source table
* SPARK-18913: We don't support append data to a table with special column names.

## How was this patch tested?

new regression test.

Author: Wenchen Fan <[email protected]>

Closes #16313 from cloud-fan/bug1.

(cherry picked from commit f923c84)
Signed-off-by: Wenchen Fan <[email protected]>
Actually this PR was not backported to 2.1; now I've backported it.
What changes were proposed in this pull request?

When we append data to an existing table with `DataFrameWriter.saveAsTable`, we will do various checks to make sure the appended data is consistent with the existing data.

However, we get the information of the existing table by matching the table relation, instead of looking at the table metadata. This is error-prone, e.g. we only check the number of columns for `HadoopFsRelation`, and we forget to check bucketing, etc.

This PR refactors the error checking by looking at the metadata of the existing table (see the sketch after the list below), and fixes several bugs:

* SPARK-18899: We forgot to check whether the specified bucketing matches the existing table, which may lead to a problematic table that has different bucketing in different data files.
* SPARK-18912: We forgot to check the number of columns for non-file-based data source tables.
* SPARK-18913: We don't support appending data to a table with special column names.
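A rough, hypothetical sketch of what "checking against the existing table's metadata" means here (the `CatalogTable` accessors are real, but this helper and its exact messages are illustrative, not the code that was merged):

```scala
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.catalog.CatalogTable

// Simplified illustration: compare what the user specified in the Append write against
// the catalog metadata of the existing table, instead of against a resolved relation.
def checkAppendCompatibility(
    existing: CatalogTable,
    specifiedPartCols: Seq[String],
    specifiedNumBuckets: Option[Int],
    queryColumnCount: Int): Unit = {
  val tableName = existing.identifier.unquotedString
  if (specifiedPartCols.nonEmpty && specifiedPartCols != existing.partitionColumnNames) {
    throw new AnalysisException(
      s"Specified partitioning does not match that of the existing table $tableName.")
  }
  if (specifiedNumBuckets.nonEmpty && specifiedNumBuckets != existing.bucketSpec.map(_.numBuckets)) {
    throw new AnalysisException(
      s"Specified bucketing does not match that of the existing table $tableName.")
  }
  if (queryColumnCount != existing.schema.length) {
    throw new AnalysisException(
      s"The column number of the query ($queryColumnCount) does not match the existing " +
        s"table $tableName (${existing.schema.length} columns).")
  }
}
```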
How was this patch tested?
A new regression test.