From 9c45cb9bd23938797d99d82335358ec1a971b96f Mon Sep 17 00:00:00 2001 From: Kris Mok Date: Mon, 17 Dec 2018 13:41:20 +0800 Subject: [PATCH] [SPARK-26352][SQL] join reorder should not change the order of output attributes The optimizer rule `org.apache.spark.sql.catalyst.optimizer.ReorderJoin` performs join reordering on inner joins. This was introduced from SPARK-12032 (https://github.com/apache/spark/pull/10073) in 2015-12. After it had reordered the joins, though, it didn't check whether or not the output attribute order is still the same as before. Thus, it's possible to have a mismatch between the reordered output attributes order vs the schema that a DataFrame thinks it has. The same problem exists in the CBO version of join reordering (`CostBasedJoinReorder`) too. This can be demonstrated with the example: ```scala spark.sql("create table table_a (x int, y int) using parquet") spark.sql("create table table_b (i int, j int) using parquet") spark.sql("create table table_c (a int, b int) using parquet") val df = spark.sql(""" with df1 as (select * from table_a cross join table_b) select * from df1 join table_c on a = x and b = i """) ``` here's what the DataFrame thinks: ``` scala> df.printSchema root |-- x: integer (nullable = true) |-- y: integer (nullable = true) |-- i: integer (nullable = true) |-- j: integer (nullable = true) |-- a: integer (nullable = true) |-- b: integer (nullable = true) ``` here's what the optimized plan thinks, after join reordering: ``` scala> df.queryExecution.optimizedPlan.output.foreach(a => println(s"|-- ${a.name}: ${a.dataType.typeName}")) |-- x: integer |-- y: integer |-- a: integer |-- b: integer |-- i: integer |-- j: integer ``` If we exclude the `ReorderJoin` rule (using Spark 2.4's optimizer rule exclusion feature), it's back to normal: ``` scala> spark.conf.set("spark.sql.optimizer.excludedRules", "org.apache.spark.sql.catalyst.optimizer.ReorderJoin") scala> val df = spark.sql("with df1 as (select * from table_a cross join table_b) select * from df1 join table_c on a = x and b = i") df: org.apache.spark.sql.DataFrame = [x: int, y: int ... 4 more fields] scala> df.queryExecution.optimizedPlan.output.foreach(a => println(s"|-- ${a.name}: ${a.dataType.typeName}")) |-- x: integer |-- y: integer |-- i: integer |-- j: integer |-- a: integer |-- b: integer ``` Note that this output attribute ordering problem leads to data corruption, and can manifest itself in various symptoms: * Silently corrupting data, if the reordered columns happen to either have matching types or have sufficiently-compatible types (e.g. all fixed length primitive types are considered as "sufficiently compatible" in an `UnsafeRow`), then only the resulting data is going to be wrong but it might not trigger any alarms immediately. Or * Weird Java-level exceptions like `java.lang.NegativeArraySizeException`, or even SIGSEGVs. Added new unit test in `JoinReorderSuite` and new end-to-end test in `JoinSuite`. Also made `JoinReorderSuite` and `StarJoinReorderSuite` assert more strongly on maintaining output attribute order. Closes #23303 from rednaxelafx/fix-join-reorder. Authored-by: Kris Mok Signed-off-by: Wenchen Fan (cherry picked from commit 56448c662398f4c5319a337e6601450270a6a27c) Signed-off-by: Wenchen Fan --- .../spark/sql/catalyst/optimizer/joins.scala | 11 +++++++++-- .../catalyst/optimizer/JoinOptimizationSuite.scala | 3 +++ .../scala/org/apache/spark/sql/JoinSuite.scala | 14 ++++++++++++++ 3 files changed, 26 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala index 2722c9eb57551..390c0ffcf51fd 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala @@ -84,9 +84,16 @@ object ReorderJoin extends Rule[LogicalPlan] with PredicateHelper { } def apply(plan: LogicalPlan): LogicalPlan = plan transform { - case j @ ExtractFiltersAndInnerJoins(input, conditions) + case p @ ExtractFiltersAndInnerJoins(input, conditions) if input.size > 2 && conditions.nonEmpty => - createOrderedJoin(input, conditions) + val reordered = createOrderedJoin(input, conditions) + if (p.sameOutput(reordered)) { + reordered + } else { + // Reordering the joins have changed the order of the columns. + // Inject a projection to make sure we restore to the expected ordering. + Project(p.output, reordered) + } } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala index 087718b3ecf1a..06defd172da21 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala @@ -104,16 +104,19 @@ class JoinOptimizationSuite extends PlanTest { x.join(y).join(z).where(("x.b".attr === "z.b".attr) && ("y.d".attr === "z.a".attr)), x.join(z, condition = Some("x.b".attr === "z.b".attr)) .join(y, condition = Some("y.d".attr === "z.a".attr)) + .select(Seq("x.a", "x.b", "x.c", "y.d", "z.a", "z.b", "z.c").map(_.attr): _*) ), ( x.join(y, Cross).join(z, Cross) .where(("x.b".attr === "z.b".attr) && ("y.d".attr === "z.a".attr)), x.join(z, Cross, Some("x.b".attr === "z.b".attr)) .join(y, Cross, Some("y.d".attr === "z.a".attr)) + .select(Seq("x.a", "x.b", "x.c", "y.d", "z.a", "z.b", "z.c").map(_.attr): _*) ), ( x.join(y, Inner).join(z, Cross).where("x.b".attr === "z.a".attr), x.join(z, Cross, Some("x.b".attr === "z.a".attr)).join(y, Inner) + .select(Seq("x.a", "x.b", "x.c", "y.d", "z.a", "z.b", "z.c").map(_.attr): _*) ) ) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala index 6c07b1aafc3bc..07a02f57821bb 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala @@ -604,4 +604,18 @@ class JoinSuite extends QueryTest with SharedSQLContext { cartesianQueries.foreach(checkCartesianDetection) } + + test("SPARK-26352: join reordering should not change the order of columns") { + withTable("tab1", "tab2", "tab3") { + spark.sql("select 1 as x, 100 as y").write.saveAsTable("tab1") + spark.sql("select 42 as i, 200 as j").write.saveAsTable("tab2") + spark.sql("select 1 as a, 42 as b").write.saveAsTable("tab3") + + val df = spark.sql(""" + with tmp as (select * from tab1 cross join tab2) + select * from tmp join tab3 on a = x and b = i + """) + checkAnswer(df, Row(1, 100, 42, 200, 1, 42)) + } + } }