diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala
index 5fdccc02b3cce..2cf06982e25f6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala
@@ -31,13 +31,16 @@ import org.apache.spark.util.SerializableConfiguration
  */
 trait DataWritingCommand extends RunnableCommand {
 
+  /**
+   * The input query plan that produces the data to be written.
+   */
   def query: LogicalPlan
 
   // We make the input `query` an inner child instead of a child in order to hide it from the
-  // optimizer. This is because optimizer may change the output schema names, and we have to keep
-  // the original analyzed plan here so that we can pass the corrected schema to the writer. The
-  // schema of analyzed plan is what user expects(or specifies), so we should respect it when
-  // writing.
+  // optimizer. This is because optimizer may not preserve the output schema names' case, and we
+  // have to keep the original analyzed plan here so that we can pass the corrected schema to the
+  // writer. The schema of analyzed plan is what user expects(or specifies), so we should respect
+  // it when writing.
   override protected def innerChildren: Seq[LogicalPlan] = query :: Nil
 
   override lazy val metrics: Map[String, SQLMetric] = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
index e409994ae6336..75b1695fbc275 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
@@ -117,7 +117,9 @@ object FileFormatWriter extends Logging {
     job.setOutputValueClass(classOf[InternalRow])
     FileOutputFormat.setOutputPath(job, new Path(outputSpec.outputPath))
 
-    val allColumns = queryExecution.logical.output
+    // Pick the attributes from analyzed plan, as optimizer may not preserve the output schema
+    // names' case.
+    val allColumns = queryExecution.analyzed.output
     val partitionSet = AttributeSet(partitionColumns)
     val dataColumns = allColumns.filterNot(partitionSet.contains)
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileFormatWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileFormatWriterSuite.scala
index 7cacafb3b058b..6f8767db176aa 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileFormatWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileFormatWriterSuite.scala
@@ -33,10 +33,15 @@ class FileFormatWriterSuite extends QueryTest with SharedSQLContext {
   }
 
   test("FileFormatWriter should respect the input query schema") {
-    withTable("t1", "t2") {
+    withTable("t1", "t2", "t3", "t4") {
       spark.range(1).select('id as 'col1, 'id as 'col2).write.saveAsTable("t1")
       spark.sql("select COL1, COL2 from t1").write.saveAsTable("t2")
       checkAnswer(spark.table("t2"), Row(0, 0))
+
+      // Test picking part of the columns when writing.
+      spark.range(1).select('id, 'id as 'col1, 'id as 'col2).write.saveAsTable("t3")
+      spark.sql("select COL1, COL2 from t3").write.saveAsTable("t4")
+      checkAnswer(spark.table("t4"), Row(0, 0))
     }
   }
 }
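
Reviewer note, not part of the patch: a minimal standalone sketch of the name-casing behavior the change relies on, assuming a local SparkSession and the default case-insensitive resolution (spark.sql.caseSensitive=false). The object name AnalyzedSchemaCaseDemo and the view/column names are made up for illustration and mirror the test above; the exact optimized-plan output may vary by Spark version.

import org.apache.spark.sql.SparkSession

object AnalyzedSchemaCaseDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("analyzed-schema-case-demo")
      .getOrCreate()
    import spark.implicits._

    // A view with lower-case column names, queried with upper-case identifiers.
    spark.range(1).select('id as "col1", 'id as "col2").createOrReplaceTempView("t1")
    val df = spark.sql("select COL1, COL2 from t1")

    // The analyzed plan keeps the user-written casing (COL1, COL2): case-insensitive
    // resolution wraps the matched attributes in aliases carrying the requested names.
    println(df.queryExecution.analyzed.output.map(_.name).mkString(", "))

    // The optimized plan may strip those aliases and fall back to the stored names
    // (col1, col2), which is why FileFormatWriter now takes the schema from the
    // analyzed plan rather than the plan that went through the optimizer.
    println(df.queryExecution.optimizedPlan.output.map(_.name).mkString(", "))

    spark.stop()
  }
}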