[SPARK-26352][SQL] join reorder should not change the order of output attributes

The optimizer rule `org.apache.spark.sql.catalyst.optimizer.ReorderJoin` performs join reordering on inner joins. It was introduced by SPARK-12032 (apache#10073) in December 2015.

After reordering the joins, however, the rule did not check whether the output attribute order was still the same as before, so the output attribute order of the reordered plan can diverge from the schema that the DataFrame thinks it has.
The same problem exists in the CBO version of join reordering (`CostBasedJoinReorder`) too.

This can be demonstrated with the following example:
```scala
spark.sql("create table table_a (x int, y int) using parquet")
spark.sql("create table table_b (i int, j int) using parquet")
spark.sql("create table table_c (a int, b int) using parquet")
val df = spark.sql("""
  with df1 as (select * from table_a cross join table_b)
  select * from df1 join table_c on a = x and b = i
""")
```
Here's what the DataFrame thinks:
```
scala> df.printSchema
root
 |-- x: integer (nullable = true)
 |-- y: integer (nullable = true)
 |-- i: integer (nullable = true)
 |-- j: integer (nullable = true)
 |-- a: integer (nullable = true)
 |-- b: integer (nullable = true)
```
Here's what the optimized plan thinks, after join reordering:
```
scala> df.queryExecution.optimizedPlan.output.foreach(a => println(s"|-- ${a.name}: ${a.dataType.typeName}"))
|-- x: integer
|-- y: integer
|-- a: integer
|-- b: integer
|-- i: integer
|-- j: integer
```
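
For completeness, the mismatch can also be checked programmatically in the same session (a small hand-rolled check, not part of the patch):
```scala
// Compare the column order the DataFrame reports against what the optimized plan produces.
val schemaOrder = df.schema.fieldNames.toSeq
val planOrder = df.queryExecution.optimizedPlan.output.map(_.name)
// With the bug, this assertion fails: List(x, y, i, j, a, b) vs List(x, y, a, b, i, j).
assert(schemaOrder == planOrder, s"schema order $schemaOrder != optimized plan order $planOrder")
```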

If we exclude the `ReorderJoin` rule (using Spark 2.4's optimizer rule exclusion feature), it's back to normal:
```
scala> spark.conf.set("spark.sql.optimizer.excludedRules", "org.apache.spark.sql.catalyst.optimizer.ReorderJoin")

scala> val df = spark.sql("with df1 as (select * from table_a cross join table_b) select * from df1 join table_c on a = x and b = i")
df: org.apache.spark.sql.DataFrame = [x: int, y: int ... 4 more fields]

scala> df.queryExecution.optimizedPlan.output.foreach(a => println(s"|-- ${a.name}: ${a.dataType.typeName}"))
|-- x: integer
|-- y: integer
|-- i: integer
|-- j: integer
|-- a: integer
|-- b: integer
```
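
Since the CBO variant (`CostBasedJoinReorder`) has the same problem, the same workaround on an affected build would also have to exclude that rule when CBO join reordering is enabled, e.g. (a sketch; `spark.sql.optimizer.excludedRules` takes a comma-separated list of rule names):
```scala
// Exclude both the rule-based and the cost-based join reordering rules.
spark.conf.set("spark.sql.optimizer.excludedRules",
  "org.apache.spark.sql.catalyst.optimizer.ReorderJoin," +
    "org.apache.spark.sql.catalyst.optimizer.CostBasedJoinReorder")
```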

Note that this output attribute ordering problem leads to data corruption and can manifest itself in various symptoms:
* Silent data corruption: if the reordered columns happen to have matching or sufficiently compatible types (e.g. all fixed-length primitive types are considered "sufficiently compatible" in an `UnsafeRow`), the resulting data is simply wrong but might not trigger any alarms immediately (see the sketch after this list); or
* Weird Java-level exceptions like `java.lang.NegativeArraySizeException`, or even SIGSEGVs.
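
To make the first symptom concrete, here is a hedged sketch reusing the tables from the repro above: every column is a fixed-length int, so nothing crashes and the permuted values are silently reported under the wrong column names.
```scala
// All-int rows: UnsafeRow reads the permuted layout without complaint.
spark.sql("insert into table_a values (1, 100)")
spark.sql("insert into table_b values (42, 200)")
spark.sql("insert into table_c values (1, 42)")

// Expected: [1, 100, 42, 200, 1, 42] under columns (x, y, i, j, a, b).
// With the bug (and without the excludedRules workaround above), this may instead show
// [1, 100, 1, 42, 42, 200] -- a's and b's values surfacing under i and j, and vice versa.
df.show()
```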

Added a new unit test in `JoinReorderSuite` and a new end-to-end test in `JoinSuite`.
Also made `JoinReorderSuite` and `StarJoinReorderSuite` assert more strongly on maintaining output attribute order.
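
The stronger assertion amounts to checking that optimization leaves the output attribute order intact; a hedged sketch of such a check (the helper name and exact comparison are illustrative, not the suites' actual code):
```scala
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

// Illustrative helper: fail if optimization changed the order (or identity) of output attributes.
def assertSameOutputOrder(original: LogicalPlan, optimized: LogicalPlan): Unit = {
  assert(original.output.map(_.exprId) == optimized.output.map(_.exprId),
    s"output attribute order changed:\n  before: ${original.output}\n  after:  ${optimized.output}")
}
```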

Closes apache#23303 from rednaxelafx/fix-join-reorder.

Authored-by: Kris Mok <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit 56448c6)
Signed-off-by: Wenchen Fan <[email protected]>
rednaxelafx authored and sumwale committed Aug 14, 2021
1 parent 304afa3 commit 9c45cb9
Showing 3 changed files with 26 additions and 2 deletions.
```diff
@@ -84,9 +84,16 @@ object ReorderJoin extends Rule[LogicalPlan] with PredicateHelper {
   }
 
   def apply(plan: LogicalPlan): LogicalPlan = plan transform {
-    case j @ ExtractFiltersAndInnerJoins(input, conditions)
+    case p @ ExtractFiltersAndInnerJoins(input, conditions)
         if input.size > 2 && conditions.nonEmpty =>
-      createOrderedJoin(input, conditions)
+      val reordered = createOrderedJoin(input, conditions)
+      if (p.sameOutput(reordered)) {
+        reordered
+      } else {
+        // Reordering the joins have changed the order of the columns.
+        // Inject a projection to make sure we restore to the expected ordering.
+        Project(p.output, reordered)
+      }
   }
 }
 
```
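
For context, `LogicalPlan.sameOutput` (already available on logical plans; not part of this diff) is an order-sensitive, semantic comparison of the two plans' output attributes, roughly equivalent to the sketch below; when it fails, the injected `Project(p.output, reordered)` is a cheap projection that only restores the original column order.
```scala
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

// Rough equivalent of the check ReorderJoin now relies on: same arity, and the attributes
// at each position are semantically equal.
def sameOutputSketch(a: LogicalPlan, b: LogicalPlan): Boolean = {
  a.output.length == b.output.length &&
    a.output.zip(b.output).forall { case (a1, a2) => a1.semanticEquals(a2) }
}
```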
```diff
@@ -104,16 +104,19 @@ class JoinOptimizationSuite extends PlanTest {
         x.join(y).join(z).where(("x.b".attr === "z.b".attr) && ("y.d".attr === "z.a".attr)),
         x.join(z, condition = Some("x.b".attr === "z.b".attr))
           .join(y, condition = Some("y.d".attr === "z.a".attr))
+          .select(Seq("x.a", "x.b", "x.c", "y.d", "z.a", "z.b", "z.c").map(_.attr): _*)
       ),
       (
         x.join(y, Cross).join(z, Cross)
           .where(("x.b".attr === "z.b".attr) && ("y.d".attr === "z.a".attr)),
         x.join(z, Cross, Some("x.b".attr === "z.b".attr))
           .join(y, Cross, Some("y.d".attr === "z.a".attr))
+          .select(Seq("x.a", "x.b", "x.c", "y.d", "z.a", "z.b", "z.c").map(_.attr): _*)
       ),
       (
         x.join(y, Inner).join(z, Cross).where("x.b".attr === "z.a".attr),
         x.join(z, Cross, Some("x.b".attr === "z.a".attr)).join(y, Inner)
+          .select(Seq("x.a", "x.b", "x.c", "y.d", "z.a", "z.b", "z.c").map(_.attr): _*)
       )
     )
 
```
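
The `.select(...)` calls added to the expected plans above mirror the `Project` that the fixed rule now injects, keeping the columns in the original x/y/z order. Roughly, each (query, expected-plan) pair is exercised with the suite's usual `PlanTest` pattern (a sketch; the collection name `queryAnswers` and the suite-local `Optimize` batch are assumed):
```scala
// Sketch: optimize each original query and compare it, including output order, against
// the expected plan that now ends in an explicit select.
queryAnswers.foreach { case (originalQuery, expectedPlan) =>
  val optimized = Optimize.execute(originalQuery.analyze)
  comparePlans(optimized, expectedPlan.analyze)
}
```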
sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala (14 additions, 0 deletions)
```diff
@@ -604,4 +604,18 @@ class JoinSuite extends QueryTest with SharedSQLContext {
 
     cartesianQueries.foreach(checkCartesianDetection)
   }
+
+  test("SPARK-26352: join reordering should not change the order of columns") {
+    withTable("tab1", "tab2", "tab3") {
+      spark.sql("select 1 as x, 100 as y").write.saveAsTable("tab1")
+      spark.sql("select 42 as i, 200 as j").write.saveAsTable("tab2")
+      spark.sql("select 1 as a, 42 as b").write.saveAsTable("tab3")
+
+      val df = spark.sql("""
+        with tmp as (select * from tab1 cross join tab2)
+        select * from tmp join tab3 on a = x and b = i
+      """)
+      checkAnswer(df, Row(1, 100, 42, 200, 1, 42))
+    }
+  }
 }
```
