Skip to content

Commit

Permalink
[SPARK-25368][SQL] Incorrect predicate pushdown returns wrong result
Browse files Browse the repository at this point in the history
## What changes were proposed in this pull request?
How to reproduce:
```scala
val df1 = spark.createDataFrame(Seq(
   (1, 1)
)).toDF("a", "b").withColumn("c", lit(null).cast("int"))
val df2 = df1.union(df1).withColumn("d", spark_partition_id).filter($"c".isNotNull)
df2.show

+---+---+----+---+
|  a|  b|   c|  d|
+---+---+----+---+
|  1|  1|null|  0|
|  1|  1|null|  1|
+---+---+----+---+
```
`filter($"c".isNotNull)` was transformed to `(null <=> c#10)` before #19201, but it is transformed to `(c#10 = null)` since #20155. This pr revert it to `(null <=> c#10)` to fix this issue.

## How was this patch tested?

unit tests

Closes #22368 from wangyum/SPARK-25368.

Authored-by: Yuming Wang <[email protected]>
Signed-off-by: gatorsmile <[email protected]>
(cherry picked from commit 77c9964)
Signed-off-by: gatorsmile <[email protected]>
  • Loading branch information
wangyum authored and gatorsmile committed Sep 9, 2018
1 parent 6b7ea78 commit c1c1bda
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ abstract class UnaryNode extends LogicalPlan {
var allConstraints = child.constraints.asInstanceOf[Set[Expression]]
projectList.foreach {
case a @ Alias(l: Literal, _) =>
allConstraints += EqualTo(a.toAttribute, l)
allConstraints += EqualNullSafe(a.toAttribute, l)
case a @ Alias(e, _) =>
// For every alias in `projectList`, replace the reference in constraints by its attribute.
allConstraints ++= allConstraints.map(_ transform {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ class InferFiltersFromConstraintsSuite extends PlanTest {

test("constraints should be inferred from aliased literals") {
val originalLeft = testRelation.subquery('left).as("left")
val optimizedLeft = testRelation.subquery('left).where(IsNotNull('a) && 'a === 2).as("left")
val optimizedLeft = testRelation.subquery('left).where(IsNotNull('a) && 'a <=> 2).as("left")

val right = Project(Seq(Literal(2).as("two")), testRelation.subquery('right)).as("right")
val condition = Some("left.a".attr === "right.two".attr)
Expand Down
17 changes: 17 additions & 0 deletions sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2552,4 +2552,21 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
}
}

test("SPARK-25368 Incorrect predicate pushdown returns wrong result") {
def check(newCol: Column, filter: Column, result: Seq[Row]): Unit = {
val df1 = spark.createDataFrame(Seq(
(1, 1)
)).toDF("a", "b").withColumn("c", newCol)

val df2 = df1.union(df1).withColumn("d", spark_partition_id).filter(filter)
checkAnswer(df2, result)
}

check(lit(null).cast("int"), $"c".isNull, Seq(Row(1, 1, null, 0), Row(1, 1, null, 1)))
check(lit(null).cast("int"), $"c".isNotNull, Seq())
check(lit(2).cast("int"), $"c".isNull, Seq())
check(lit(2).cast("int"), $"c".isNotNull, Seq(Row(1, 1, 2, 0), Row(1, 1, 2, 1)))
check(lit(2).cast("int"), $"c" === 2, Seq(Row(1, 1, 2, 0), Row(1, 1, 2, 1)))
check(lit(2).cast("int"), $"c" =!= 2, Seq())
}
}

0 comments on commit c1c1bda

Please sign in to comment.