Skip to content

Commit

Permalink
[SPARK-44431][SQL] Fix behavior of null IN (empty list) in optimizati…
Browse files Browse the repository at this point in the history
…on rules

### What changes were proposed in this pull request?
`null IN (empty list)` incorrectly evaluates to null, when it should evaluate to false. (The reason it should be false is because a IN (b1, b2) is defined as a = b1 OR a = b2, and an empty IN list is treated as an empty OR which is false. This is specified by ANSI SQL.)

Many places in Spark execution (In, InSet, InSubquery) and optimization (OptimizeIn, NullPropagation) implemented this wrong behavior. This is a longstanding correctness issue which has existed since null support for IN expressions was first added to Spark.

This PR fixes the optimization rules OptimizeIn and NullPropagation, which followed the preexisting, incorrect execution behavior. The execution fixes will be in the next PR.

The behavior is under a flag, which will be available to revert to the legacy behavior if needed. This flag is set to disable the new behavior until all of the fix PRs are complete.

See [this doc](https://docs.google.com/document/d/1k8AY8oyT-GI04SnP7eXttPDnDj-Ek-c3luF2zL6DPNU/edit) for more information.

### Why are the changes needed?
Fix wrong SQL semantics

### Does this PR introduce _any_ user-facing change?
Not yet, but will fix wrong SQL semantics when enabled

### How was this patch tested?
Add unit tests and sql tests.

Closes #42007 from jchen5/null-in-empty-opt.

Authored-by: Jack Chen <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
  • Loading branch information
jchen5 authored and cloud-fan committed Jul 20, 2023
1 parent e0c79c6 commit db357ed
Show file tree
Hide file tree
Showing 9 changed files with 857 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,9 @@ abstract class Optimizer(catalogManager: CatalogManager)
OptimizeRepartition,
TransposeWindow,
NullPropagation,
// NullPropagation may introduce Exists subqueries, so RewriteNonCorrelatedExists must run
// after.
RewriteNonCorrelatedExists,
NullDownPropagation,
ConstantPropagation,
FoldablePropagation,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
import org.apache.spark.sql.catalyst.trees.{AlwaysProcess, TreeNodeTag}
import org.apache.spark.sql.catalyst.trees.TreePattern._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String

Expand Down Expand Up @@ -283,9 +284,13 @@ object OptimizeIn extends Rule[LogicalPlan] {
_.containsPattern(IN), ruleId) {
case q: LogicalPlan => q.transformExpressionsDownWithPruning(_.containsPattern(IN), ruleId) {
case In(v, list) if list.isEmpty =>
// When v is not nullable, the following expression will be optimized
// to FalseLiteral which is tested in OptimizeInSuite.scala
If(IsNotNull(v), FalseLiteral, Literal(null, BooleanType))
if (!SQLConf.get.getConf(SQLConf.LEGACY_NULL_IN_EMPTY_LIST_BEHAVIOR)) {
FalseLiteral
} else {
// Incorrect legacy behavior optimizes to null if the left side is null, and otherwise
// to false.
If(IsNotNull(v), FalseLiteral, Literal(null, BooleanType))
}
case expr @ In(v, list) if expr.inSetConvertible =>
val newList = ExpressionSet(list).toSeq
if (newList.length == 1
Expand Down Expand Up @@ -841,9 +846,24 @@ object NullPropagation extends Rule[LogicalPlan] {
}
}

// If the value expression is NULL then transform the In expression to null literal.
case In(Literal(null, _), _) => Literal.create(null, BooleanType)
case InSubquery(Seq(Literal(null, _)), _) => Literal.create(null, BooleanType)
// If the list is empty, transform the In expression to false literal.
case In(_, list)
if list.isEmpty && !SQLConf.get.getConf(SQLConf.LEGACY_NULL_IN_EMPTY_LIST_BEHAVIOR) =>
Literal.create(false, BooleanType)
// If the value expression is NULL (and the list is non-empty), then transform the
// In expression to null literal.
// If the legacy flag is set, then it becomes null even if the list is empty (which is
// incorrect legacy behavior)
case In(Literal(null, _), list)
if list.nonEmpty || SQLConf.get.getConf(SQLConf.LEGACY_NULL_IN_EMPTY_LIST_BEHAVIOR)
=> Literal.create(null, BooleanType)
case InSubquery(Seq(Literal(null, _)), _)
if SQLConf.get.getConf(SQLConf.LEGACY_NULL_IN_EMPTY_LIST_BEHAVIOR) =>
Literal.create(null, BooleanType)
case InSubquery(Seq(Literal(null, _)), ListQuery(sub, _, _, _, conditions, _))
if !SQLConf.get.getConf(SQLConf.LEGACY_NULL_IN_EMPTY_LIST_BEHAVIOR)
&& conditions.isEmpty =>
If(Exists(sub), Literal(null, BooleanType), FalseLiteral)

// Non-leaf NullIntolerant expressions will return null, if at least one of its children is
// a null literal.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4289,6 +4289,17 @@ object SQLConf {
.booleanConf
.createWithDefault(false)

val LEGACY_NULL_IN_EMPTY_LIST_BEHAVIOR =
buildConf("spark.sql.legacy.nullInEmptyListBehavior")
.internal()
.doc("When set to true, restores the legacy incorrect behavior of IN expressions for " +
"NULL values IN an empty list (including IN subqueries and literal IN lists): " +
"`null IN (empty list)` should evaluate to false, but sometimes (not always) " +
"incorrectly evaluates to null in the legacy behavior.")
.version("3.5.0")
.booleanConf
.createWithDefault(true)

val ERROR_MESSAGE_FORMAT = buildConf("spark.sql.error.messageFormat")
.doc("When PRETTY, the error message consists of textual representation of error class, " +
"message and query context. The MINIMAL and STANDARD formats are pretty JSON formats where " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.PlanTest
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LocalRelation, LogicalPlan}
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LocalRelation, LogicalPlan, Project}
import org.apache.spark.sql.catalyst.rules.RuleExecutor
import org.apache.spark.sql.internal.SQLConf.OPTIMIZER_INSET_CONVERSION_THRESHOLD
import org.apache.spark.sql.internal.SQLConf.{LEGACY_NULL_IN_EMPTY_LIST_BEHAVIOR, OPTIMIZER_INSET_CONVERSION_THRESHOLD}
import org.apache.spark.sql.types._

class OptimizeInSuite extends PlanTest {
Expand Down Expand Up @@ -121,19 +121,43 @@ class OptimizeInSuite extends PlanTest {
comparePlans(optimized, correctAnswer)
}

test("OptimizedIn test: NULL IN (subquery) gets transformed to Filter(null)") {
val subquery = ListQuery(testRelation.select(UnresolvedAttribute("a")))
val originalQuery =
testRelation
.where(InSubquery(Seq(Literal.create(null, NullType)), subquery))
.analyze
test("OptimizedIn test: Legacy behavior: " +
"NULL IN (subquery) gets transformed to Filter(null)") {
withSQLConf(LEGACY_NULL_IN_EMPTY_LIST_BEHAVIOR.key -> "true") {
val subquery = ListQuery(testRelation.select(UnresolvedAttribute("a")))
val originalQuery =
testRelation
.where(InSubquery(Seq(Literal.create(null, NullType)), subquery))
.analyze

val optimized = Optimize.execute(originalQuery.analyze)
val correctAnswer =
testRelation
.where(Literal.create(null, BooleanType))
.analyze
comparePlans(optimized, correctAnswer)
val optimized = Optimize.execute(originalQuery.analyze)
val correctAnswer =
testRelation
.where(Literal.create(null, BooleanType))
.analyze
comparePlans(optimized, correctAnswer)
}
}

test("OptimizedIn test: NULL IN (subquery) gets transformed to " +
"If(Exists(subquery), null, false)") {
withSQLConf(LEGACY_NULL_IN_EMPTY_LIST_BEHAVIOR.key -> "false") {
val subquery = testRelation.select(UnresolvedAttribute("a"))
val originalQuery =
testRelation
.where(InSubquery(Seq(Literal.create(null, NullType)), ListQuery(subquery)))
.analyze

val optimized = Optimize.execute(originalQuery.analyze)
// Our simplified Optimize results in an extra redundant Project. This gets collapsed in
// the full optimizer.
val correctAnswer =
testRelation
.where(If(Exists(Project(Seq(UnresolvedAttribute("a")), subquery)),
Literal.create(null, BooleanType), Literal(false)))
.analyze
comparePlans(optimized, correctAnswer)
}
}

test("OptimizedIn test: Inset optimization disabled as " +
Expand Down Expand Up @@ -219,36 +243,108 @@ class OptimizeInSuite extends PlanTest {
comparePlans(optimized, correctAnswer)
}

test("OptimizedIn test: In empty list gets transformed to FalseLiteral " +
"when value is not nullable") {
val originalQuery =
testRelation
.where(In(Literal("a"), Nil))
.analyze
test("OptimizedIn test: expr IN (empty list) gets transformed to literal false") {
withSQLConf(LEGACY_NULL_IN_EMPTY_LIST_BEHAVIOR.key -> "false") {
val originalQuery =
testRelation
.where(In(UnresolvedAttribute("a"), Nil))
.analyze

val optimized = Optimize.execute(originalQuery)
val correctAnswer =
testRelation
.where(Literal(false))
.analyze
val optimized = Optimize.execute(originalQuery.analyze)
val correctAnswer =
testRelation
.where(Literal.create(false, BooleanType))
.analyze

comparePlans(optimized, correctAnswer)
comparePlans(optimized, correctAnswer)
}
}

test("OptimizedIn test: In empty list gets transformed to `If` expression " +
"when value is nullable") {
val originalQuery =
testRelation
.where(In(UnresolvedAttribute("a"), Nil))
.analyze
test("OptimizedIn test: null IN (empty list) gets transformed to literal false") {
withSQLConf(LEGACY_NULL_IN_EMPTY_LIST_BEHAVIOR.key -> "false") {
val originalQuery =
testRelation
.where(In(Literal.create(null, NullType), Nil))
.analyze

val optimized = Optimize.execute(originalQuery)
val correctAnswer =
testRelation
.where(If(IsNotNull(UnresolvedAttribute("a")),
Literal(false), Literal.create(null, BooleanType)))
.analyze
val optimized = Optimize.execute(originalQuery.analyze)
val correctAnswer =
testRelation
.where(Literal.create(false, BooleanType))
.analyze

comparePlans(optimized, correctAnswer)
comparePlans(optimized, correctAnswer)
}
}

test("OptimizedIn test: expr IN (empty list) gets transformed to literal false in select") {
withSQLConf(LEGACY_NULL_IN_EMPTY_LIST_BEHAVIOR.key -> "false") {
val originalQuery =
testRelation
.select(In(UnresolvedAttribute("a"), Nil).as("x"))
.analyze

val optimized = Optimize.execute(originalQuery.analyze)
val correctAnswer =
testRelation
.select(Literal.create(false, BooleanType).as("x"))
.analyze

comparePlans(optimized, correctAnswer)
}
}

test("OptimizedIn test: null IN (empty list) gets transformed to literal false in select") {
withSQLConf(LEGACY_NULL_IN_EMPTY_LIST_BEHAVIOR.key -> "false") {
val originalQuery =
testRelation
.select(In(Literal.create(null, NullType), Nil).as("x"))
.analyze

val optimized = Optimize.execute(originalQuery.analyze)
val correctAnswer =
testRelation
.select(Literal.create(false, BooleanType).as("x"))
.analyze

comparePlans(optimized, correctAnswer)
}
}

test("OptimizedIn test: Legacy behavior: " +
"In empty list gets transformed to FalseLiteral when value is not nullable") {
withSQLConf(LEGACY_NULL_IN_EMPTY_LIST_BEHAVIOR.key -> "true") {
val originalQuery =
testRelation
.where(In(Literal("a"), Nil))
.analyze

val optimized = Optimize.execute(originalQuery)
val correctAnswer =
testRelation
.where(Literal(false))
.analyze

comparePlans(optimized, correctAnswer)
}
}

test("OptimizedIn test: Legacy behavior: " +
"In empty list gets transformed to `If` expression when value is nullable") {
withSQLConf(LEGACY_NULL_IN_EMPTY_LIST_BEHAVIOR.key -> "true") {
val originalQuery =
testRelation
.where(In(UnresolvedAttribute("a"), Nil))
.analyze

val optimized = Optimize.execute(originalQuery)
val correctAnswer =
testRelation
.where(If(IsNotNull(UnresolvedAttribute("a")),
Literal(false), Literal.create(null, BooleanType)))
.analyze

comparePlans(optimized, correctAnswer)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.expressions.aggregate.First
import org.apache.spark.sql.catalyst.plans.{LeftAnti, LeftSemi, PlanTest}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.RuleExecutor
import org.apache.spark.sql.internal.SQLConf.LEGACY_NULL_IN_EMPTY_LIST_BEHAVIOR
import org.apache.spark.sql.types.BooleanType

class ReplaceOperatorSuite extends PlanTest {
Expand Down Expand Up @@ -233,13 +234,24 @@ class ReplaceOperatorSuite extends PlanTest {
val basePlan = LocalRelation(Seq($"a".int, $"b".int))
val otherPlan = basePlan.where($"a".in(1, 2) || $"b".in())
val except = Except(basePlan, otherPlan, false)
val result = OptimizeIn(Optimize.execute(except.analyze))
val correctAnswer = Aggregate(basePlan.output, basePlan.output,
Filter(!Coalesce(Seq(
$"a".in(1, 2) || If($"b".isNotNull, Literal.FalseLiteral, Literal(null, BooleanType)),
Literal.FalseLiteral)),
basePlan)).analyze
comparePlans(result, correctAnswer)
withSQLConf(LEGACY_NULL_IN_EMPTY_LIST_BEHAVIOR.key -> "false") {
val result = OptimizeIn(Optimize.execute(except.analyze))
val correctAnswer = Aggregate(basePlan.output, basePlan.output,
Filter(!Coalesce(Seq(
$"a".in(1, 2) || Literal.FalseLiteral,
Literal.FalseLiteral)),
basePlan)).analyze
comparePlans(result, correctAnswer)
}
withSQLConf(LEGACY_NULL_IN_EMPTY_LIST_BEHAVIOR.key -> "true") {
val result = OptimizeIn(Optimize.execute(except.analyze))
val correctAnswer = Aggregate(basePlan.output, basePlan.output,
Filter(!Coalesce(Seq(
$"a".in(1, 2) || If($"b".isNotNull, Literal.FalseLiteral, Literal(null, BooleanType)),
Literal.FalseLiteral)),
basePlan)).analyze
comparePlans(result, correctAnswer)
}
}

test("SPARK-26366: ReplaceExceptWithFilter should not transform non-deterministic") {
Expand Down
Loading

0 comments on commit db357ed

Please sign in to comment.