Skip to content

Commit

Permalink
[SPARK-25691][SQL] Use semantic equality in order to compare attributes
Browse files Browse the repository at this point in the history
  • Loading branch information
mgaido91 committed Oct 13, 2018
1 parent 2eaf058 commit 87fc62a
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1179,7 +1179,7 @@ class Analyzer(
if (!s.resolved || s.missingInput.nonEmpty) && child.resolved =>
val (newOrder, newChild) = resolveExprsAndAddMissingAttrs(order, child)
val ordering = newOrder.map(_.asInstanceOf[SortOrder])
if (child.output == newChild.output) {
if (child.sameOutput(newChild)) {
s.copy(order = ordering)
} else {
// Add missing attributes and then project them away.
Expand All @@ -1189,7 +1189,7 @@ class Analyzer(

case f @ Filter(cond, child) if (!f.resolved || f.missingInput.nonEmpty) && child.resolved =>
val (newCond, newChild) = resolveExprsAndAddMissingAttrs(Seq(cond), child)
if (child.output == newChild.output) {
if (child.sameOutput(newChild)) {
f.copy(condition = newCond.head)
} else {
// Add missing attributes and then project them away.
Expand Down Expand Up @@ -2087,7 +2087,7 @@ class Analyzer(
// TODO: It's hard to write a general rule to pull out nondeterministic expressions
// from a LogicalPlan, so currently we only do it for a UnaryNode that has the same
// output schema as its child.
case p: UnaryNode if p.output == p.child.output && p.expressions.exists(!_.deterministic) =>
case p: UnaryNode if p.sameOutput(p.child) && p.expressions.exists(!_.deterministic) =>
val nondeterToAttr = getNondeterToAttr(p.expressions)
val newPlan = p.transformExpressions { case e =>
nondeterToAttr.get(e).map(_.toAttribute).getOrElse(e)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ import org.apache.spark.sql.internal.SQLConf
*/
case class AliasViewChild(conf: SQLConf) extends Rule[LogicalPlan] with CastSupport {
override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperatorsUp {
case v @ View(desc, output, child) if child.resolved && output != child.output =>
case v @ View(desc, output, child) if child.resolved && !v.sameOutput(child) =>
val resolver = conf.resolver
val queryColumnNames = desc.viewQueryColumnNames
val queryOutput = if (queryColumnNames.nonEmpty) {
Expand All @@ -70,7 +70,7 @@ case class AliasViewChild(conf: SQLConf) extends Rule[LogicalPlan] with CastSupp
}
// Map the attributes in the query output to the attributes in the view output by index.
val newOutput = output.zip(queryOutput).map {
case (attr, originAttr) if attr != originAttr =>
case (attr, originAttr) if !attr.semanticEquals(originAttr) =>
// The dataType of the output attributes may not be the same as that of the view
// output, so we should cast the attribute to the dataType of the view output attribute.
// Will throw an AnalysisException if the cast can't be performed or might truncate.
Expand Down Expand Up @@ -112,8 +112,8 @@ object EliminateView extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
// The child should have the same output attributes as the View operator, so we simply
// remove the View operator.
case View(_, output, child) =>
assert(output == child.output,
case v @ View(_, output, child) =>
assert(v.sameOutput(child),
s"The output of the child ${child.output.mkString("[", ",", "]")} is different from the " +
s"view output ${output.mkString("[", ",", "]")}")
child
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -405,7 +405,7 @@ object RemoveRedundantAliases extends Rule[LogicalPlan] {
*/
object RemoveRedundantProject extends Rule[LogicalPlan] {
  /**
   * Rewrites `plan`, replacing every [[Project]] whose output matches its child's output
   * (as decided by `sameOutput`) with the child itself.
   *
   * @param plan the logical plan to rewrite
   * @return the plan with such no-op projections removed
   */
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    // A single semantic comparison is enough: attributes that are strictly equal are also
    // semantically equal, so a separate `p.output == child.output` case would only repeat
    // this one and return the same `child`.
    case p @ Project(_, child) if p.sameOutput(child) => child
  }
}

Expand Down Expand Up @@ -530,9 +530,6 @@ object PushProjectionThroughUnion extends Rule[LogicalPlan] with PredicateHelper
* p2 is usually inserted by this rule and useless, p1 could prune the columns anyway.
*/
object ColumnPruning extends Rule[LogicalPlan] {
/**
 * Tells whether two attribute lists describe the same output: both must have the same
 * number of entries, and the entries at each position must satisfy `semanticEquals`.
 */
private def sameOutput(output1: Seq[Attribute], output2: Seq[Attribute]): Boolean = {
  val sameArity = output1.size == output2.size
  // Only compare element-wise once the sizes are known to match.
  sameArity && output1.zip(output2).forall { case (left, right) => left.semanticEquals(right) }
}

def apply(plan: LogicalPlan): LogicalPlan = removeProjectBeforeFilter(plan transform {
// Prunes the unused columns from project list of Project/Aggregate/Expand
Expand Down Expand Up @@ -607,7 +604,7 @@ object ColumnPruning extends Rule[LogicalPlan] {
case w: Window if w.windowExpressions.isEmpty => w.child

// Eliminate no-op Projects
case p @ Project(_, child) if sameOutput(child.output, p.output) => child
case p @ Project(_, child) if child.sameOutput(p) => child

// Can't prune the columns on LeafNode
case p @ Project(_, _: LeafNode) => p
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,20 @@ abstract class LogicalPlan
* Returns the output ordering that this plan generates.
*/
def outputOrdering: Seq[SortOrder] = Nil

/**
 * Checks whether the output of `other` is semantically the same as the output of this
 * plan, i.e. all of the following hold:
 *  - both outputs contain the same number of `Attribute`s;
 *  - the attributes at each position are equal according to `semanticEquals`;
 *  - consequently, the attributes appear in the same order.
 *
 * @param other the plan whose output is compared against this plan's output
 * @return true iff the two outputs match position by position
 */
def sameOutput(other: LogicalPlan): Boolean = {
  val mine = output
  val theirs = other.output
  if (mine.length != theirs.length) {
    // Different arity can never be the same output; skip the element-wise comparison.
    false
  } else {
    mine.iterator.zip(theirs.iterator).forall { case (lhs, rhs) => lhs.semanticEquals(rhs) }
  }
}
}

/**
Expand Down

0 comments on commit 87fc62a

Please sign in to comment.