-
Notifications
You must be signed in to change notification settings - Fork 28.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-22944][SQL] improve FoldablePropagation #20139
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -506,18 +506,21 @@ object NullPropagation extends Rule[LogicalPlan] { | |
|
||
|
||
/** | ||
* Propagate foldable expressions: | ||
* Replace attributes with aliases of the original foldable expressions if possible. | ||
* Other optimizations will take advantage of the propagated foldable expressions. | ||
* | ||
* Other optimizations will take advantage of the propagated foldable expressions. For example, | ||
* This rule can optimize | ||
* {{{ | ||
* SELECT 1.0 x, 'abc' y, Now() z ORDER BY x, y, 3 | ||
* ==> SELECT 1.0 x, 'abc' y, Now() z ORDER BY 1.0, 'abc', Now() | ||
* }}} | ||
* to | ||
* {{{ | ||
* SELECT 1.0 x, 'abc' y, Now() z ORDER BY 1.0, 'abc', Now() | ||
* }}} | ||
* and other rules can further optimize it and remove the ORDER BY operator. | ||
*/ | ||
object FoldablePropagation extends Rule[LogicalPlan] { | ||
def apply(plan: LogicalPlan): LogicalPlan = { | ||
val foldableMap = AttributeMap(plan.flatMap { | ||
var foldableMap = AttributeMap(plan.flatMap { | ||
case Project(projectList, _) => projectList.collect { | ||
case a: Alias if a.child.foldable => (a.toAttribute, a) | ||
} | ||
|
@@ -530,38 +533,50 @@ object FoldablePropagation extends Rule[LogicalPlan] { | |
if (foldableMap.isEmpty) { | ||
plan | ||
} else { | ||
var stop = false | ||
CleanupAliases(plan.transformUp { | ||
// A leaf node should not stop the folding process (note that we are traversing up the | ||
// tree, starting at the leaf nodes); so we are allowing it. | ||
case l: LeafNode => | ||
l | ||
|
||
// We can only propagate foldables for a subset of unary nodes. | ||
case u: UnaryNode if !stop && canPropagateFoldables(u) => | ||
case u: UnaryNode if canPropagateFoldables(u) => | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. For all these cases, how about adding another condition There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ah good catch! |
||
u.transformExpressions(replaceFoldable) | ||
|
||
// Allow inner joins. We do not allow outer join, although its output attributes are | ||
// derived from its children, they are actually different attributes: the output of outer | ||
// join is not always picked from its children, but can also be null. | ||
// Join derives the output attributes from its child while they are actually not the | ||
// same attributes. For example, the output of outer join is not always picked from its | ||
// children, but can also be null. We should exclude these miss-derived attributes when | ||
// propagating the foldable expressions. | ||
// TODO(cloud-fan): It seems more reasonable to use new attributes as the output attributes | ||
// of outer join. | ||
case j @ Join(_, _, Inner, _) if !stop => | ||
j.transformExpressions(replaceFoldable) | ||
case j @ Join(left, right, joinType, _) => | ||
val newJoin = j.transformExpressions(replaceFoldable) | ||
val missDerivedAttrsSet: AttributeSet = AttributeSet(joinType match { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
How about renaming There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'd like to keep the word |
||
case _: InnerLike | LeftExistence(_) => Nil | ||
case LeftOuter => right.output | ||
case RightOuter => left.output | ||
case FullOuter => left.output ++ right.output | ||
}) | ||
foldableMap = AttributeMap(foldableMap.baseMap.values.filterNot { | ||
case (attr, _) => missDerivedAttrsSet.contains(attr) | ||
}.toSeq) | ||
newJoin | ||
|
||
// We can fold the projections an expand holds. However expand changes the output columns | ||
// and often reuses the underlying attributes; so we cannot assume that a column is still | ||
// foldable after the expand has been applied. | ||
// Similar to Join, Expand also miss-derives output attributes from child attributes, we | ||
// should exclude them when propagating. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. After #12496, the above statement is still true? |
||
// TODO(hvanhovell): Expand should use new attributes as the output attributes. | ||
case expand: Expand if !stop => | ||
case expand: Expand => | ||
val newExpand = expand.copy(projections = expand.projections.map { projection => | ||
projection.map(_.transform(replaceFoldable)) | ||
}) | ||
stop = true | ||
val missDerivedAttrsSet = expand.child.outputSet | ||
foldableMap = AttributeMap(foldableMap.baseMap.values.filterNot { | ||
case (attr, _) => missDerivedAttrsSet.contains(attr) | ||
}.toSeq) | ||
newExpand | ||
|
||
// For other plans, they are not safe to apply foldable propagation, and they should not | ||
// propagate foldable expressions from children. | ||
case other => | ||
stop = true | ||
val childrenOutputSet = AttributeSet(other.children.flatMap(_.output)) | ||
foldableMap = AttributeMap(foldableMap.baseMap.values.filterNot { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we directly set There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. we can't as it may not be empty. We may have new attributes produced by operators above those unsupported operators. |
||
case (attr, _) => childrenOutputSet.contains(attr) | ||
}.toSeq) | ||
other | ||
}) | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -161,4 +161,23 @@ class FoldablePropagationSuite extends PlanTest { | |
val correctAnswer = correctExpand.where(a1.isNotNull).select(a1, a2).analyze | ||
comparePlans(optimized, correctAnswer) | ||
} | ||
|
||
test("Propagate above outer join") { | ||
val left = LocalRelation('a.int).select('a, Literal(1).as('b)) | ||
val right = LocalRelation('c.int).select('c, Literal(1).as("d")) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit. "d" -> 'd |
||
|
||
val join = left.join( | ||
right, | ||
joinType = LeftOuter, | ||
condition = Some('a === 'c && 'b === 'd)) | ||
val query = join.select(('b + 3).as('res)).analyze | ||
val optimized = Optimize.execute(query) | ||
|
||
val correctAnswer = left.join( | ||
right, | ||
joinType = LeftOuter, | ||
condition = Some('a === 'c && Literal(1) === Literal(1))) | ||
.select((Literal(1) + 3).as('res)).analyze | ||
comparePlans(optimized, correctAnswer) | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This
->this