
[SPARK-22252][SQL] FileFormatWriter should respect the input query schema #19474

Closed · wants to merge 2 commits

Conversation

cloud-fan
Contributor

What changes were proposed in this pull request?

In #18064, we allowed RunnableCommand to have children in order to fix some UI issues. We then made the InsertIntoXXX commands take the input query as a child, and when we do the actual writing, we just pass the physical plan to the writer (FileFormatWriter.write).

However, this is problematic. In Spark SQL, the optimizer and planner are allowed to change the schema names slightly. For example, the ColumnPruning rule removes no-op Projects such as Project("A", Scan("a")), changing the output schema from <A: int> to <a: int>. At write time, especially for self-describing data formats like Parquet, we may write the wrong schema to the file and get null values on the read path.
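As a hypothetical miniature of the issue (these are not Spark's real classes, just a sketch of the shape of the bug): a ColumnPruning-style rule that drops a no-op Project silently changes the visible column name from "A" to "a".

```scala
// Hypothetical miniature of the schema-case bug (not Spark's real API).
sealed trait Plan { def output: Seq[String] }
case class Scan(cols: Seq[String]) extends Plan { def output: Seq[String] = cols }
case class Project(cols: Seq[String], child: Plan) extends Plan { def output: Seq[String] = cols }

// A ColumnPruning-style rule: drop a Project whose columns match the child's
// output case-insensitively, i.e. a "no-op" Project.
def pruneNoOpProject(plan: Plan): Plan = plan match {
  case Project(cols, child)
      if cols.map(_.toLowerCase) == child.output.map(_.toLowerCase) => child
  case other => other
}

val analyzed  = Project(Seq("A"), Scan(Seq("a")))  // the user asked for column "A"
val optimized = pruneNoOpProject(analyzed)

println(analyzed.output)   // List(A)  <- the schema the user expects
println(optimized.output)  // List(a)  <- the schema a writer given the optimized plan would see
```

If the writer takes its schema from the optimized plan, a self-describing format like Parquet records "a" instead of "A", which is why the fix reads the schema from the analyzed plan instead.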

Fortunately, in #18450 we decided to allow nested execution, so one query can map to multiple executions in the UI. This lifts the major restriction from #18604, and we no longer have to make the input query a child of the InsertIntoXXX commands.

The fix is simple: this PR partially reverts #18064 and makes the InsertIntoXXX commands leaf nodes again.

How was this patch tested?

A new regression test.

@cloud-fan
Contributor Author

cc @gatorsmile @viirya

@cloud-fan
Contributor Author

For a simple command like `Seq(1 -> "a").toDF("i", "j").write.parquet("/tmp/qwe")`, the UI before this PR:

[screenshot: SQL UI before this PR]

The UI after this PR:

[screenshot: SQL UI after this PR]

The scan node is no longer visible above the insert node; I'll fix that later. The writer bug is more important and should be fixed ASAP.

@SparkQA

SparkQA commented Oct 11, 2017

Test build #82638 has finished for PR 19474 at commit 3b1174f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • trait Command extends LeafNode
  • trait RunnableCommand extends Command
  • case class ExecutedCommandExec(cmd: RunnableCommand) extends LeafExecNode

@SparkQA

SparkQA commented Oct 11, 2017

Test build #82639 has finished for PR 19474 at commit 0667ac8.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • trait Command extends LeafNode
  • trait RunnableCommand extends Command
  • case class ExecutedCommandExec(cmd: RunnableCommand) extends LeafExecNode

@SparkQA

SparkQA commented Oct 11, 2017

Test build #82640 has finished for PR 19474 at commit 9d4c7a2.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • trait Command extends LeafNode
  • trait RunnableCommand extends Command
  • case class ExecutedCommandExec(cmd: RunnableCommand) extends LeafExecNode

@viirya
Member

viirya commented Oct 12, 2017

The scan node is no longer visible above the insert node, I'll fix this later. The writer bug is more important and we should fix it ASAP.

Totally agreed. LGTM

@viirya
Member

viirya commented Oct 12, 2017

I like this change; the relationship between ExecutedCommandExec and RunnableCommand was a bit entangled before.

@@ -117,7 +117,7 @@ object FileFormatWriter extends Logging {
     job.setOutputValueClass(classOf[InternalRow])
     FileOutputFormat.setOutputPath(job, new Path(outputSpec.outputPath))

-    val allColumns = plan.output
+    val allColumns = queryExecution.logical.output
I think it'd be good to leave a comment that we should not use the optimized output here, in case it gets changed in the future.


Btw, shall we use queryExecution.analyzed.output?

@viirya
Member

viirya commented Oct 12, 2017

Minor comments. LGTM


test("FileFormatWriter should respect the input query schema") {
  withTable("t1", "t2") {
    spark.range(1).select('id as 'col1, 'id as 'col2).write.saveAsTable("t1")
Also add another case here?

spark.range(1).select('id, 'id as 'col1, 'id as 'col2).write.saveAsTable("t3")

def query: LogicalPlan

// We make the input `query` an inner child instead of a child in order to hide it from the
// optimizer. This is because optimizer may change the output schema names, and we have to keep

You will scare others. :)

-> may not preserve the output schema names' case

@@ -117,7 +117,7 @@ object FileFormatWriter extends Logging {
     job.setOutputValueClass(classOf[InternalRow])
     FileOutputFormat.setOutputPath(job, new Path(outputSpec.outputPath))

-    val allColumns = plan.output
+    val allColumns = queryExecution.logical.output
Explicitly using the analyzed plan's schema is better here.

@@ -30,6 +31,15 @@ import org.apache.spark.util.SerializableConfiguration
  */
trait DataWritingCommand extends RunnableCommand {

  def query: LogicalPlan
Add a one-line description for query?

-    val allColumns = plan.output
+    // Pick the attributes from analyzed plan, as optimizer may not preserve the output schema
+    // names' case.
+    val allColumns = queryExecution.analyzed.output
     val partitionSet = AttributeSet(partitionColumns)
You might need to double-check that the partitionColumns in all the other files are also taken from analyzed plans.

@gatorsmile
Member

LGTM pending Jenkins.

@SparkQA

SparkQA commented Oct 12, 2017

Test build #82661 has finished for PR 19474 at commit 5bdaf7d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor Author

Thanks for the review, merging to master!

@asfgit asfgit closed this in 274f0ef Oct 12, 2017
asfgit pushed a commit that referenced this pull request Oct 13, 2017
## What changes were proposed in this pull request?

This is a minor follow-up of #19474.

#19474 partially reverted #18064 but accidentally introduced a behavior change. `Command` extended `LogicalPlan` before #18064, but #19474 made it extend `LeafNode`. This is an internal behavior change: now no `Command` subclass can define children, and each has to implement the `computeStatistic` method.

This PR fixes that by making `Command` extend `LogicalPlan` again.
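As a hypothetical miniature of the difference (simplified traits, not Spark's real ones): a leaf-node trait pins children to Nil, so making `Command` extend it silently forbids subclasses from declaring children, while extending the plan base trait leaves that open.

```scala
// Hypothetical miniature (not Spark's real traits) of the LeafNode vs
// LogicalPlan distinction.
trait LogicalPlan { def children: Seq[LogicalPlan] = Nil }
trait LeafNode extends LogicalPlan {
  final override def children: Seq[LogicalPlan] = Nil  // pinned: no overrides allowed
}

// With Command extending LogicalPlan (the state restored by this follow-up),
// a subclass may still declare children:
trait Command extends LogicalPlan
case class CommandWithChild(child: LogicalPlan) extends Command {
  override def children: Seq[LogicalPlan] = Seq(child)
}

case class Dummy() extends LeafNode

val c = CommandWithChild(Dummy())
println(c.children.length) // 1
// Had Command extended LeafNode instead, the override above would not compile,
// which is the internal behavior change this follow-up undoes.
```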

## How was this patch tested?

N/A

Author: Wenchen Fan <[email protected]>

Closes #19493 from cloud-fan/minor.