Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
cloud-fan committed Oct 12, 2017
1 parent 9d4c7a2 commit 5bdaf7d
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,16 @@ import org.apache.spark.util.SerializableConfiguration
*/
trait DataWritingCommand extends RunnableCommand {

/**
* The input query plan that produces the data to be written.
*/
def query: LogicalPlan

// We make the input `query` an inner child instead of a child in order to hide it from the
// optimizer. This is because optimizer may change the output schema names, and we have to keep
// the original analyzed plan here so that we can pass the corrected schema to the writer. The
// schema of analyzed plan is what user expects(or specifies), so we should respect it when
// writing.
// optimizer. This is because optimizer may not preserve the output schema names' case, and we
// have to keep the original analyzed plan here so that we can pass the corrected schema to the
// writer. The schema of analyzed plan is what user expects(or specifies), so we should respect
// it when writing.
override protected def innerChildren: Seq[LogicalPlan] = query :: Nil

override lazy val metrics: Map[String, SQLMetric] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,9 @@ object FileFormatWriter extends Logging {
job.setOutputValueClass(classOf[InternalRow])
FileOutputFormat.setOutputPath(job, new Path(outputSpec.outputPath))

val allColumns = queryExecution.logical.output
// Pick the attributes from analyzed plan, as optimizer may not preserve the output schema
// names' case.
val allColumns = queryExecution.analyzed.output
val partitionSet = AttributeSet(partitionColumns)
val dataColumns = allColumns.filterNot(partitionSet.contains)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,15 @@ class FileFormatWriterSuite extends QueryTest with SharedSQLContext {
}

test("FileFormatWriter should respect the input query schema") {
withTable("t1", "t2") {
withTable("t1", "t2", "t3", "t4") {
spark.range(1).select('id as 'col1, 'id as 'col2).write.saveAsTable("t1")
spark.sql("select COL1, COL2 from t1").write.saveAsTable("t2")
checkAnswer(spark.table("t2"), Row(0, 0))

// Test picking part of the columns when writing.
spark.range(1).select('id, 'id as 'col1, 'id as 'col2).write.saveAsTable("t3")
spark.sql("select COL1, COL2 from t3").write.saveAsTable("t4")
checkAnswer(spark.table("t4"), Row(0, 0))
}
}
}

0 comments on commit 5bdaf7d

Please sign in to comment.