From c1c1bda3cecd82a926526e5e5ee24d9909cb7e49 Mon Sep 17 00:00:00 2001 From: Yuming Wang Date: Sun, 9 Sep 2018 09:07:31 -0700 Subject: [PATCH] [SPARK-25368][SQL] Incorrect predicate pushdown returns wrong result ## What changes were proposed in this pull request? How to reproduce: ```scala val df1 = spark.createDataFrame(Seq( (1, 1) )).toDF("a", "b").withColumn("c", lit(null).cast("int")) val df2 = df1.union(df1).withColumn("d", spark_partition_id).filter($"c".isNotNull) df2.show +---+---+----+---+ | a| b| c| d| +---+---+----+---+ | 1| 1|null| 0| | 1| 1|null| 1| +---+---+----+---+ ``` `filter($"c".isNotNull)` was transformed to `(null <=> c#10)` before https://github.com/apache/spark/pull/19201, but it is transformed to `(c#10 = null)` since https://github.com/apache/spark/pull/20155. This pr revert it to `(null <=> c#10)` to fix this issue. ## How was this patch tested? unit tests Closes #22368 from wangyum/SPARK-25368. Authored-by: Yuming Wang Signed-off-by: gatorsmile (cherry picked from commit 77c996403d5c761f0dfea64c5b1cb7480ba1d3ac) Signed-off-by: gatorsmile --- .../catalyst/plans/logical/LogicalPlan.scala | 2 +- .../InferFiltersFromConstraintsSuite.scala | 2 +- .../org/apache/spark/sql/DataFrameSuite.scala | 17 +++++++++++++++++ 3 files changed, 19 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala index 0e4456ac0e6a9..5f136629eb15b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala @@ -159,7 +159,7 @@ abstract class UnaryNode extends LogicalPlan { var allConstraints = child.constraints.asInstanceOf[Set[Expression]] projectList.foreach { case a @ Alias(l: Literal, _) => - allConstraints += EqualTo(a.toAttribute, l) + allConstraints += EqualNullSafe(a.toAttribute, l) case a @ Alias(e, _) => // For every alias in `projectList`, replace the reference in constraints by its attribute. allConstraints ++= allConstraints.map(_ transform { diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala index e4671f0d1cce6..a40ba2dc38b70 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala @@ -196,7 +196,7 @@ class InferFiltersFromConstraintsSuite extends PlanTest { test("constraints should be inferred from aliased literals") { val originalLeft = testRelation.subquery('left).as("left") - val optimizedLeft = testRelation.subquery('left).where(IsNotNull('a) && 'a === 2).as("left") + val optimizedLeft = testRelation.subquery('left).where(IsNotNull('a) && 'a <=> 2).as("left") val right = Project(Seq(Literal(2).as("two")), testRelation.subquery('right)).as("right") val condition = Some("left.a".attr === "right.two".attr) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala index 45b17b3d4958f..435b887cb3c78 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala @@ -2552,4 +2552,21 @@ class DataFrameSuite extends QueryTest with SharedSQLContext { } } + test("SPARK-25368 Incorrect predicate pushdown returns wrong result") { + def check(newCol: Column, filter: Column, result: Seq[Row]): Unit = { + val df1 = spark.createDataFrame(Seq( + (1, 1) + )).toDF("a", "b").withColumn("c", newCol) + + val df2 = df1.union(df1).withColumn("d", spark_partition_id).filter(filter) + checkAnswer(df2, result) + } + + check(lit(null).cast("int"), $"c".isNull, Seq(Row(1, 1, null, 0), Row(1, 1, null, 1))) + check(lit(null).cast("int"), $"c".isNotNull, Seq()) + check(lit(2).cast("int"), $"c".isNull, Seq()) + check(lit(2).cast("int"), $"c".isNotNull, Seq(Row(1, 1, 2, 0), Row(1, 1, 2, 1))) + check(lit(2).cast("int"), $"c" === 2, Seq(Row(1, 1, 2, 0), Row(1, 1, 2, 1))) + check(lit(2).cast("int"), $"c" =!= 2, Seq()) + } }