From b4787f70c0339f5a7a3b70c7a53ea03ea9043c7a Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Wed, 3 Jan 2018 16:39:33 +0800 Subject: [PATCH 1/3] improve FoldablePropagation --- .../sql/catalyst/optimizer/expressions.scala | 61 ++++++++++++------- .../optimizer/FoldablePropagationSuite.scala | 19 ++++++ 2 files changed, 57 insertions(+), 23 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala index 7d830bbb7dc32..f3dd35e7aacc1 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala @@ -506,18 +506,21 @@ object NullPropagation extends Rule[LogicalPlan] { /** - * Propagate foldable expressions: * Replace attributes with aliases of the original foldable expressions if possible. - * Other optimizations will take advantage of the propagated foldable expressions. - * + * Other optimizations will take advantage of the propagated foldable expressions. For example, + * This rule can optimize * {{{ * SELECT 1.0 x, 'abc' y, Now() z ORDER BY x, y, 3 - * ==> SELECT 1.0 x, 'abc' y, Now() z ORDER BY 1.0, 'abc', Now() * }}} + * to + * {{{ + * SELECT 1.0 x, 'abc' y, Now() z ORDER BY 1.0, 'abc', Now() + * }}} + * and other rules can further optimize it and remove the ORDER BY operator. */ object FoldablePropagation extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = { - val foldableMap = AttributeMap(plan.flatMap { + var foldableMap = AttributeMap(plan.flatMap { case Project(projectList, _) => projectList.collect { case a: Alias if a.child.foldable => (a.toAttribute, a) } @@ -530,38 +533,50 @@ object FoldablePropagation extends Rule[LogicalPlan] { if (foldableMap.isEmpty) { plan } else { - var stop = false CleanupAliases(plan.transformUp { - // A leaf node should not stop the folding process (note that we are traversing up the - // tree, starting at the leaf nodes); so we are allowing it. - case l: LeafNode => - l - // We can only propagate foldables for a subset of unary nodes. - case u: UnaryNode if !stop && canPropagateFoldables(u) => + case u: UnaryNode if canPropagateFoldables(u) => u.transformExpressions(replaceFoldable) - // Allow inner joins. We do not allow outer join, although its output attributes are - // derived from its children, they are actually different attributes: the output of outer - // join is not always picked from its children, but can also be null. + // Join derives the output attributes from its child while they are actually not the + // same attributes. For example, the output of outer join is not always picked from its + // children, but can also be null. We should exclude these miss-derived attributes when + // propagating the foldable expressions. // TODO(cloud-fan): It seems more reasonable to use new attributes as the output attributes // of outer join. - case j @ Join(_, _, Inner, _) if !stop => - j.transformExpressions(replaceFoldable) + case j @ Join(left, right, joinType, _) => + val newJoin = j.transformExpressions(replaceFoldable) + val missDerivedAttrsSet: AttributeSet = AttributeSet(joinType match { + case _: InnerLike | LeftExistence(_) => Nil + case LeftOuter => right.output + case RightOuter => left.output + case FullOuter => left.output ++ right.output + }) + foldableMap = AttributeMap(foldableMap.baseMap.values.filterNot { + case (attr, _) => missDerivedAttrsSet.contains(attr) + }.toSeq) + newJoin - // We can fold the projections an expand holds. However expand changes the output columns - // and often reuses the underlying attributes; so we cannot assume that a column is still - // foldable after the expand has been applied. + // Similar to Join, Expand also miss-derives output attributes from child attributes, we + // should exclude them when propagating. // TODO(hvanhovell): Expand should use new attributes as the output attributes. - case expand: Expand if !stop => + case expand: Expand => val newExpand = expand.copy(projections = expand.projections.map { projection => projection.map(_.transform(replaceFoldable)) }) - stop = true + val missDerivedAttrsSet = expand.child.outputSet + foldableMap = AttributeMap(foldableMap.baseMap.values.filterNot { + case (attr, _) => missDerivedAttrsSet.contains(attr) + }.toSeq) newExpand + // For other plans, they are not safe to apply foldable propagation, and they should not + // propagate foldable expressions from children. case other => - stop = true + val childrenOutputSet = AttributeSet(other.children.flatMap(_.output)) + foldableMap = AttributeMap(foldableMap.baseMap.values.filterNot { + case (attr, _) => childrenOutputSet.contains(attr) + }.toSeq) other }) } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FoldablePropagationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FoldablePropagationSuite.scala index dccb32f0379a8..c7c4effccda25 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FoldablePropagationSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FoldablePropagationSuite.scala @@ -161,4 +161,23 @@ class FoldablePropagationSuite extends PlanTest { val correctAnswer = correctExpand.where(a1.isNotNull).select(a1, a2).analyze comparePlans(optimized, correctAnswer) } + + test("Propagate above outer join") { + val left = LocalRelation('a.int).select('a, Literal(1).as('b)) + val right = LocalRelation('c.int).select('c, Literal(1).as("d")) + + val join = left.join( + right, + joinType = LeftOuter, + condition = Some('a === 'c && 'b === 'd)) + val query = join.select(('b + 3).as('res)).analyze + val optimized = Optimize.execute(query) + + val correctAnswer = left.join( + right, + joinType = LeftOuter, + condition = Some('a === 'c && Literal(1) === Literal(1))) + .select((Literal(1) + 3).as('res)).analyze + comparePlans(optimized, correctAnswer) + } } From 661e5d918225e6fe639ec72996b09227b03a9ea1 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Thu, 4 Jan 2018 00:44:45 +0800 Subject: [PATCH 2/3] address comments --- .../sql/catalyst/optimizer/expressions.scala | 22 +++++++------------ 1 file changed, 8 insertions(+), 14 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala index f3dd35e7aacc1..1c0b7bd806801 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala @@ -508,7 +508,7 @@ object NullPropagation extends Rule[LogicalPlan] { /** * Replace attributes with aliases of the original foldable expressions if possible. * Other optimizations will take advantage of the propagated foldable expressions. For example, - * This rule can optimize + * this rule can optimize * {{{ * SELECT 1.0 x, 'abc' y, Now() z ORDER BY x, y, 3 * }}} @@ -535,7 +535,7 @@ object FoldablePropagation extends Rule[LogicalPlan] { } else { CleanupAliases(plan.transformUp { // We can only propagate foldables for a subset of unary nodes. - case u: UnaryNode if canPropagateFoldables(u) => + case u: UnaryNode if foldableMap.nonEmpty && canPropagateFoldables(u) => u.transformExpressions(replaceFoldable) // Join derives the output attributes from its child while they are actually not the @@ -544,7 +544,7 @@ object FoldablePropagation extends Rule[LogicalPlan] { // propagating the foldable expressions. // TODO(cloud-fan): It seems more reasonable to use new attributes as the output attributes // of outer join. - case j @ Join(left, right, joinType, _) => + case j @ Join(left, right, joinType, _) if foldableMap.nonEmpty => val newJoin = j.transformExpressions(replaceFoldable) val missDerivedAttrsSet: AttributeSet = AttributeSet(joinType match { case _: InnerLike | LeftExistence(_) => Nil @@ -557,22 +557,16 @@ object FoldablePropagation extends Rule[LogicalPlan] { }.toSeq) newJoin - // Similar to Join, Expand also miss-derives output attributes from child attributes, we - // should exclude them when propagating. - // TODO(hvanhovell): Expand should use new attributes as the output attributes. - case expand: Expand => - val newExpand = expand.copy(projections = expand.projections.map { projection => + // We can not replace the attributes in `Expand.output`. If there are other non-leaf + // operators that have the `output` field, we should put them here too. + case expand: Expand if foldableMap.nonEmpty => + expand.copy(projections = expand.projections.map { projection => projection.map(_.transform(replaceFoldable)) }) - val missDerivedAttrsSet = expand.child.outputSet - foldableMap = AttributeMap(foldableMap.baseMap.values.filterNot { - case (attr, _) => missDerivedAttrsSet.contains(attr) - }.toSeq) - newExpand // For other plans, they are not safe to apply foldable propagation, and they should not // propagate foldable expressions from children. - case other => + case other if foldableMap.nonEmpty => val childrenOutputSet = AttributeSet(other.children.flatMap(_.output)) foldableMap = AttributeMap(foldableMap.baseMap.values.filterNot { case (attr, _) => childrenOutputSet.contains(attr) From df75bffa1c27c2b70495a1b2c8988f3a1c979918 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Thu, 4 Jan 2018 10:06:11 +0800 Subject: [PATCH 3/3] address comments --- .../sql/catalyst/optimizer/FoldablePropagationSuite.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FoldablePropagationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FoldablePropagationSuite.scala index c7c4effccda25..c28844642aed0 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FoldablePropagationSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FoldablePropagationSuite.scala @@ -147,8 +147,8 @@ class FoldablePropagationSuite extends PlanTest { test("Propagate in expand") { val c1 = Literal(1).as('a) val c2 = Literal(2).as('b) - val a1 = c1.toAttribute.withNullability(true) - val a2 = c2.toAttribute.withNullability(true) + val a1 = c1.toAttribute.newInstance().withNullability(true) + val a2 = c2.toAttribute.newInstance().withNullability(true) val expand = Expand( Seq(Seq(Literal(null), 'b), Seq('a, Literal(null))), Seq(a1, a2), @@ -164,7 +164,7 @@ class FoldablePropagationSuite extends PlanTest { test("Propagate above outer join") { val left = LocalRelation('a.int).select('a, Literal(1).as('b)) - val right = LocalRelation('c.int).select('c, Literal(1).as("d")) + val right = LocalRelation('c.int).select('c, Literal(1).as('d)) val join = left.join( right,