Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-25691][SQL] Use semantic equality in AliasViewChild in order to compare attributes #22713

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,18 +40,28 @@ import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._

/**
* A trivial [[Analyzer]] with a dummy [[SessionCatalog]] and [[EmptyFunctionRegistry]].
* Trivial [[Analyzer]]s with a dummy [[SessionCatalog]] and [[EmptyFunctionRegistry]].
* Used for testing when all relations are already filled in and the analyzer needs only
* to resolve attribute references.
*/
object SimpleAnalyzer extends Analyzer(
sealed class BaseSimpleAnalyzer(caseSensitive: Boolean) extends Analyzer(
new SessionCatalog(
new InMemoryCatalog,
EmptyFunctionRegistry,
new SQLConf().copy(SQLConf.CASE_SENSITIVE -> true)) {
new SQLConf().copy(SQLConf.CASE_SENSITIVE -> caseSensitive)) {
override def createDatabase(dbDefinition: CatalogDatabase, ignoreIfExists: Boolean) {}
},
new SQLConf().copy(SQLConf.CASE_SENSITIVE -> true))
new SQLConf().copy(SQLConf.CASE_SENSITIVE -> caseSensitive))

/**
* A trivial analyzer which use case sensitive resolution.
*/
object SimpleAnalyzer extends BaseSimpleAnalyzer(true)

/**
* A trivial analyzer which use case insensitive resolution.
*/
object SimpleCaseInsensitiveAnalyzer extends BaseSimpleAnalyzer(false)

/**
* Provides a way to keep state during the analysis, this enables us to decouple the concerns
Expand Down Expand Up @@ -1179,7 +1189,7 @@ class Analyzer(
if (!s.resolved || s.missingInput.nonEmpty) && child.resolved =>
val (newOrder, newChild) = resolveExprsAndAddMissingAttrs(order, child)
val ordering = newOrder.map(_.asInstanceOf[SortOrder])
if (child.output == newChild.output) {
if (child.sameOutput(newChild)) {
s.copy(order = ordering)
} else {
// Add missing attributes and then project them away.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ import org.apache.spark.sql.internal.SQLConf
*/
case class AliasViewChild(conf: SQLConf) extends Rule[LogicalPlan] with CastSupport {
override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperatorsUp {
case v @ View(desc, output, child) if child.resolved && output != child.output =>
case v @ View(desc, output, child) if child.resolved && !v.sameOutput(child) =>
val resolver = conf.resolver
val queryColumnNames = desc.viewQueryColumnNames
val queryOutput = if (queryColumnNames.nonEmpty) {
Expand All @@ -70,7 +70,7 @@ case class AliasViewChild(conf: SQLConf) extends Rule[LogicalPlan] with CastSupp
}
// Map the attributes in the query output to the attributes in the view output by index.
val newOutput = output.zip(queryOutput).map {
case (attr, originAttr) if attr != originAttr =>
case (attr, originAttr) if !attr.semanticEquals(originAttr) =>
// The dataType of the output attributes may be not the same with that of the view
// output, so we should cast the attribute to the dataType of the view output attribute.
// Will throw an AnalysisException if the cast can't perform or might truncate.
Expand Down Expand Up @@ -112,8 +112,8 @@ object EliminateView extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
// The child should have the same output attributes with the View operator, so we simply
// remove the View operator.
case View(_, output, child) =>
assert(output == child.output,
case v @ View(_, output, child) =>
assert(v.sameOutput(child),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible to come out a test case showing previous comparing is problematic?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 for adding a test case. BTW does it impact end-users? If it does we need to backport it to 2.4.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure if we can test this in a way different from running @maryannxue's checks. I'll try to find one in the next days. As of now, I have no evidence that this impacts end-users. If I'll find out a case, I'll notify you. Thanks.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added the check and I found a case which may be considered as a bug (not sure honestly, it is a weird situation which I think might occur, but it a bad condition, which we may want to handle differently).

Currently the rule doesn't work well when the output of the view and the output of its child differs because of some nullable. You can find an example in the UT I added, where the view has all the output attributes as nullable, while the child has one as not-nullable. In this case, we are currently failing with an exception in the optimizer rule EliminateView. After the change, the plan is correctly created.

s"The output of the child ${child.output.mkString("[", ",", "]")} is different from the " +
s"view output ${output.mkString("[", ",", "]")}")
child
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -401,6 +401,9 @@ package object dsl {
def analyze: LogicalPlan =
EliminateSubqueryAliases(analysis.SimpleAnalyzer.execute(logicalPlan))

def analyzeCaseInsensitive: LogicalPlan =
EliminateSubqueryAliases(analysis.SimpleCaseInsensitiveAnalyzer.execute(logicalPlan))

def hint(name: String, parameters: Any*): LogicalPlan =
UnresolvedHint(name, parameters, logicalPlan)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -405,7 +405,7 @@ object RemoveRedundantAliases extends Rule[LogicalPlan] {
*/
object RemoveRedundantProject extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
case p @ Project(_, child) if p.output == child.output => child
case p @ Project(_, child) if p.sameOutput(child) => child
}
}

Expand Down Expand Up @@ -530,9 +530,6 @@ object PushProjectionThroughUnion extends Rule[LogicalPlan] with PredicateHelper
* p2 is usually inserted by this rule and useless, p1 could prune the columns anyway.
*/
object ColumnPruning extends Rule[LogicalPlan] {
private def sameOutput(output1: Seq[Attribute], output2: Seq[Attribute]): Boolean =
output1.size == output2.size &&
output1.zip(output2).forall(pair => pair._1.semanticEquals(pair._2))

def apply(plan: LogicalPlan): LogicalPlan = removeProjectBeforeFilter(plan transform {
// Prunes the unused columns from project list of Project/Aggregate/Expand
Expand Down Expand Up @@ -607,7 +604,7 @@ object ColumnPruning extends Rule[LogicalPlan] {
case w: Window if w.windowExpressions.isEmpty => w.child

// Eliminate no-op Projects
case p @ Project(_, child) if sameOutput(child.output, p.output) => child
case p @ Project(_, child) if child.sameOutput(p) => child

// Can't prune the columns on LeafNode
case p @ Project(_, _: LeafNode) => p
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,20 @@ abstract class LogicalPlan
* Returns the output ordering that this plan generates.
*/
def outputOrdering: Seq[SortOrder] = Nil

/**
* Returns true iff `other`'s output is semantically the same, ie.:
* - it contains the same number of `Attribute`s;
* - references are the same;
* - the order is equal too.
*/
def sameOutput(other: LogicalPlan): Boolean = {
val thisOutput = this.output
val otherOutput = other.output
thisOutput.length == otherOutput.length && thisOutput.zip(otherOutput).forall {
case (a1, a2) => a1.semanticEquals(a2)
}
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,4 +124,11 @@ class RemoveRedundantAliasAndProjectSuite extends PlanTest with PredicateHelper
val expected = Subquery(relation.select('a as "a", 'b).where('b < 10).select('a).analyze)
comparePlans(optimized, expected)
}

test("SPARK-25691: RemoveRedundantProject works also with different cases") {
val relation = LocalRelation('a.int, 'b.int)
val query = relation.select('A, 'b).analyzeCaseInsensitive
val optimized = Optimize.execute(query)
comparePlans(optimized, relation)
Copy link
Contributor

@cloud-fan cloud-fan Oct 15, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree that using == on attributes is error-prone, but we should update then one-by-one, to narrow down the scope and make sure the change is reasonable.

For instance, I don't think this is a valid case. If we optimize it, the final schema field names will change, which is a breaking change if this plan is an input of a parquet writer. (the result parquet files will have a different schema)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks for you comment. Then let me focus for this only to the view topic, we can open other tickets for each change later.

For instance, I don't think this is a valid case.

I see the concern about the possible breaking change, so I agree about not introducing this. My point is: then we are saying that Spark is never really case-insensitive, even though the case sensitive option is turned to false, isn't it? Shouldn't datasources write/read columns in a non-case-sensitive way when this flag is turned on?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Spark can be case-sensitive or not w.r.t. the config, but Spark should always be case-preserving.

}
}