[SPARK-19993][SQL] Caching logical plans containing subquery expressions does not work. #17330
Conversation
Test build #74729 has finished for PR 17330 at commit
// TODO : improve the hashcode generation by considering the plan info.
override def hashCode(): Int = {
  val state = Seq(children, this.getClass.getName)
  state.map(Objects.hashCode).foldLeft(0)((a, b) => 31 * a + b)
can we write this imperatively?
@rxin Sure Reynold.
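For reference, here is a minimal sketch of what an imperative version of that fold could look like, written as a standalone helper (the helper name is hypothetical; this is not necessarily the exact change that landed in the PR):

```scala
import java.util.Objects

// Hypothetical standalone equivalent of the foldLeft above: accumulate
// 31 * acc + hash over the hash codes of the state elements.
def stateHashCode(state: Seq[AnyRef]): Int = {
  var h = 0
  for (element <- state) {
    h = 31 * h + Objects.hashCode(element)
  }
  h
}
```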
Test build #74772 has finished for PR 17330 at commit
retest this please
Test build #74775 has finished for PR 17330 at commit
def canonicalize(e: SubqueryExpression, attrs: AttributeSeq): CanonicalizedSubqueryExpr = {
  // Normalize the outer references in the subquery plan.
  val subPlan = e.plan.transformAllExpressions {
    case o @ OuterReference(e) => BindReferences.bindReference(e, attrs, allowFailures = true)
Nit: case OuterReference(r) => BindReferences.bindReference(r, attrs, allowFailures = true)
@gatorsmile Will change. Thanks.
override def equals(o: Any): Boolean = o match {
  case n: CanonicalizedSubqueryExpr => expr.semanticEquals(n.expr)
  case other => false
case CanonicalizedSubqueryExpr(e) => expr.semanticEquals(e)
case _ => false
@gatorsmile Will change. Thanks.
/**
 * Clean the outer references by normalizing them to BindReference in the same way
 * we clean up the arguments during LogicalPlan.sameResult. This enables to compare two
Also replace SubqueryExpression with CanonicalizedSubqueryExpr.
@gatorsmile OK.
assert(getNumInMemoryRelations(cachedDs2) == 1)

spark.catalog.cacheTable("t1")
ds1.unpersist()
How about splitting the test cases into multiple individual ones? The cache will be cleared after each test case, which avoids extra checks like assert(getNumInMemoryRelations(cachedMissDs) == 0) or ds1.unpersist():
override def afterEach(): Unit = {
try {
spark.catalog.clearCache()
} finally {
super.afterEach()
}
}
@gatorsmile Sure, will do.
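For illustration, a hedged sketch of what one split-out test might look like inside CachedTableSuite, relying on the afterEach() above to clear the cache (the test name, query, and assertions are illustrative, not the PR's exact tests):

```scala
test("SPARK-19993 subquery caching - illustrative split-out case") {
  withTempView("t1") {
    Seq(1).toDF("c1").createOrReplaceTempView("t1")
    val cachedDs = sql("SELECT * FROM t1 WHERE c1 IN (SELECT c1 FROM t1)").cache()
    cachedDs.count()
    // The second, identical query should pick up the cached plan via sameResult().
    assert(getNumInMemoryRelations(sql("SELECT * FROM t1 WHERE c1 IN (SELECT c1 FROM t1)")) == 1)
    // No trailing unpersist()/zero-count assertion needed: afterEach() clears the cache.
  }
}
```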
Generally, it looks good to me. cc @hvanhovell @rxin @cloud-fan
Test build #74813 has finished for PR 17330 at commit
// re-order them based on hashcode.
// TODO : improve the hashcode generation by considering the plan info.
override def hashCode(): Int = {
  val h = Objects.hashCode(expr.children)
can we use expr.semanticHash here?
@cloud-fan Hi Wenchen, we are unable to use expr.semanticHash here. I had actually tried that. When we canonicalize the expressions, we re-order them based on their hash code, and that creates a problem. I have a test case with three subquery expressions (ScalarSubquery, ListQuery and Exists), and while re-ordering them the order becomes unpredictable. So here, when I generate the hashcode, I generate it conservatively.
Let me know what you think.
Why is the reordering unpredictable?
@cloud-fan In my understanding, it's because a SubqueryExpression contains a LogicalPlan, and the logical plan has all sorts of expression ids that contribute to this unpredictability. In order to remove this, we may have to implement a canonicalized() on SubqueryExpression that also canonicalizes all the expressions on the subquery plan in a recursive fashion. I had marked it as a TODO in my comment. Please let me know if my understanding is wrong.
After canonicalization, the ordering should be deterministic, right?
@cloud-fan Hi Wenchen, let me take an example and perhaps that will help us.
Let's assume there is a Filter operator that has 3 subquery expressions, like the following:
Filter Scalar-subquery(.., splan1,..) || Exists(.., splan2, ..) || In(.., ListQuery(.., splan3, ..)
1. During sameResult on this plan, we perform the following logic.
2. We clean the args from both source and target plans.
2.1 We change the subquery expression to CanonicalizedSubqueryExpression so that we can do a customized equals that basically does a semantic equals of two subquery expressions.
2.2 Additionally, we transform the subquery plans (splan1, splan2, splan3) to change the outer attribute references to BindReference. Since these are outer references, the inner plan is not aware of these attributes, and hence we fix them as part of canonicalize. After this, when the equals method is called on CanonicalizedSubqueryExpression and sameResult is called on splan1, splan2, splan3, it works fine, as the outer references have been normalized to BindReference. The local attributes referenced in the plan are handled in sameResult itself.
2.3 As part of cleanArgs, after cleaning the expressions we call canonicalized. Here we try to re-order the 3 CanonicalizedSubqueryExpressions on the basis of their hashCode. When I had the hashCode call expr.semanticHash(), I observed that the ordering of the expressions between source and target was not consistent. I debugged this and found that expr.semanticHash() considers the subquery plans, since they are class arguments, and the plans have arbitrary expression ids (we have only normalized the outer references, not all of them, at this point). Thus, in the current implementation I am excluding the sub-plans while computing the hashcode and have marked it as a TODO.
Please let me know your thoughts and let me know if I have misunderstood anything.
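To make the hash-stability point concrete, here is a small self-contained toy sketch (ToyAttribute and ToySubquery are hypothetical stand-ins, not Catalyst classes): a hash that folds in auto-generated expression ids usually differs between two analyses of the same query, while a hash that ignores the sub-plan stays stable.

```scala
import java.util.concurrent.atomic.AtomicLong

object ToyHashDemo extends App {
  // Mimics Catalyst's monotonically increasing, auto-generated exprIds.
  val idCounter = new AtomicLong(0)

  case class ToyAttribute(name: String, exprId: Long = idCounter.incrementAndGet())

  case class ToySubquery(planOutput: Seq[ToyAttribute], sqlText: String) {
    // Plan-sensitive: two analyses of the same SQL get different exprIds,
    // so this hash is generally not stable across plan instances.
    def planSensitiveHash: Int = (planOutput, sqlText).hashCode()
    // Conservative: ignore the sub-plan and hash only what is stable.
    def conservativeHash: Int = sqlText.hashCode
  }

  val first  = ToySubquery(Seq(ToyAttribute("c1")), "select c1 from s2")
  val second = ToySubquery(Seq(ToyAttribute("c1")), "select c1 from s2")

  println(first.planSensitiveHash == second.planSensitiveHash) // usually false
  println(first.conservativeHash == second.conservativeHash)   // true
}
```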
 * optimizer or planner never sees this expression type during transformation of
 * plans.
 */
case class CanonicalizedSubqueryExpr(expr: SubqueryExpression)
Do we really need this new abstraction?
Actually I don't think we should compare SubqueryExpressions and wonder how to canonicalize them.
Basically, in sameResult, by adding a new condition (left.subqueries, right.subqueries).zipped.forall(_ sameResult _), we can achieve the same goal, and all the added tests pass.
Btw, all PlanExpressions should be erased when doing cleanArgs.
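A toy sketch of this alternative, with ToyPlan as a hypothetical stand-in for LogicalPlan (not the actual Catalyst API): sameResult recurses into the subquery plans pairwise.

```scala
// Hypothetical stand-in for a plan node: two plans produce the same result
// only if their own canonical form matches and their subquery plans match
// pairwise, mirroring the (left.subqueries, right.subqueries).zipped idea.
case class ToyPlan(canonicalForm: String, subqueries: Seq[ToyPlan] = Nil) {
  def sameResult(other: ToyPlan): Boolean =
    canonicalForm == other.canonicalForm &&
      subqueries.length == other.subqueries.length &&
      (subqueries, other.subqueries).zipped.forall(_ sameResult _)
}
```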
@viirya Thank you very much. Let me try this.
@viirya Thank you for your suggestion. I thought about this and went through the notes I had prepared while debugging this. The reason I opted for comparing subquery expressions, as opposed to just the plans, is that I wanted to take advantage of expr.canonicalized, which re-orders expressions nicely to maximize cache hits.
Example:
plan1 - Filter In || Exists || Scalar
plan2 - Filter Scalar || In || Exists
When we compare the above two plans, all other things being equal, it should result in a cache hit. I added a test case now to make sure. One other aspect is that in the future the subquery expression may evolve to hold more attributes, and not considering them didn't feel safe. The other thing is I suspect we may still have to deal with the outer references in the same way I am handling them now (I haven't tried it yet). Please let me know your thoughts.
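A tiny toy illustration of the cache-hit point above (hypothetical stand-ins, not Catalyst's actual canonicalization): re-ordering the disjuncts on a stable key makes the two filters compare equal regardless of the order they were written in.

```scala
object ToyReorderDemo extends App {
  // Hypothetical predicate stand-in identified by its kind.
  case class ToyPredicate(kind: String)

  // Canonicalize a disjunction by sorting its predicates on a stable key.
  def canonicalizeDisjuncts(preds: Seq[ToyPredicate]): Seq[ToyPredicate] =
    preds.sortBy(_.kind)

  val plan1 = Seq(ToyPredicate("In"), ToyPredicate("Exists"), ToyPredicate("ScalarSubquery"))
  val plan2 = Seq(ToyPredicate("ScalarSubquery"), ToyPredicate("In"), ToyPredicate("Exists"))

  // All other things being equal, the two filters canonicalize identically.
  println(canonicalizeDisjuncts(plan1) == canonicalizeDisjuncts(plan2)) // true
}
```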
Sounds reasonable. The benefit of this expression approach is expression canonicalization.
def canonicalize(e: SubqueryExpression, attrs: AttributeSeq): CanonicalizedSubqueryExpr = {
  // Normalize the outer references in the subquery plan.
  val subPlan = e.plan.transformAllExpressions {
    case OuterReference(r) => BindReferences.bindReference(r, attrs, allowFailures = true)
here shall we bind references for the other Attributes in the subquery?
@cloud-fan The local attributes would be bound when we call sameResult on the subquery plan as part of Subexpression.equals()? We certainly can bind them here, and if you don't mind, I wanted to explore this in a follow-up, in an effort to start considering the sub-plan in the hashCode computation (marked as a TODO in the comment now), as I wanted more testing on that. Please let me know what you think.
If we bind all references here, can we solve the problem of nondeterministic reordering?
@cloud-fan Hi Wenchen, if we are able to normalize all the expression ids, then I think we can solve the nondeterminism problem. I actually tried this for the new test cases I have added, and they worked fine. But I didn't go with it, as I felt I may have to test it more. In my testing I only normalized the AttributeReferences, but we can have generated expression ids for other expressions as well, right? Aliases, SubqueryExpressions, etc. We also need to normalize those too, no?
@cloud-fan Hi Wenchen, I tried to normalize all the expressions in the plan, but the hashcodes are still not stable. On two instances of the same plan I get two different hashcodes. Please advise.
Hi @dilipbiswal can you update now?
@cloud-fan Sure Wenchen.
Force-pushed from 0c64245 to 7346dca
@@ -401,8 +401,9 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanT
    * do not use `BindReferences` here as the plan may take the expression as a parameter with type
    * `Attribute`, and replace it with `BoundReference` will cause error.
    */
-  protected def normalizeExprId[T <: Expression](e: T, input: AttributeSeq = allAttributes): T = {
+  def normalizeExprId[T <: Expression](e: T, input: AttributeSeq = allAttributes): T = {
oh, seems we can move this method to object QueryPlan?
@cloud-fan OK, sounds good Wenchen.
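For context, a hedged sketch of what normalizeExprId could look like once moved into Catalyst's object QueryPlan (the signature follows the diff above; the body is an assumption about the implementation, not a quote of the code that landed):

```scala
import org.apache.spark.sql.catalyst.expressions._

object QueryPlan {
  // Rewrites auto-generated exprIds to their positions in `input`, so that two
  // otherwise-identical expressions canonicalize to the same form. The
  // instance-level `allAttributes` default is dropped when moving here.
  def normalizeExprId[T <: Expression](e: T, input: AttributeSeq): T = {
    e.transformUp {
      case s: SubqueryExpression => s.canonicalize(input)
      case ar: AttributeReference =>
        val ordinal = input.indexOf(ar.exprId)
        if (ordinal == -1) ar else ar.withExprId(ExprId(ordinal))
    }.canonicalized.asInstanceOf[T]
  }
}
```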
@@ -236,6 +244,12 @@ case class ScalarSubquery(
   override def nullable: Boolean = true
   override def withNewPlan(plan: LogicalPlan): ScalarSubquery = copy(plan = plan)
   override def toString: String = s"scalar-subquery#${exprId.id} $conditionString"
+  override lazy val canonicalized: Expression = {
I think we can just override preCanonicalize to turn exprId to 0.
@cloud-fan preCanonicalize is a method in QueryPlan? Can we override it here?
oh sorry, it's on Expression
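For context, a hedged sketch of what "turning the exprId to 0" could look like inside ScalarSubquery's canonicalized override (a fragment intended for the ScalarSubquery class body in Catalyst, based on the added line in the diff above; the exact code that landed may differ):

```scala
// Inside case class ScalarSubquery(plan: LogicalPlan, children: Seq[Expression], exprId: ExprId):
override lazy val canonicalized: Expression = {
  ScalarSubquery(
    plan.canonicalized,            // canonicalize the subquery plan itself
    children.map(_.canonicalized), // canonicalize the outer-reference children
    ExprId(0))                     // normalize the auto-generated exprId
}
```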
Test build #75647 has finished for PR 17330 at commit
Test build #75654 has finished for PR 17330 at commit
@@ -59,6 +58,13 @@ abstract class SubqueryExpression(
       children.zip(p.children).forall(p => p._1.semanticEquals(p._2))
     case _ => false
   }
+  def canonicalize(attrs: AttributeSeq): SubqueryExpression = {
+    // Normalize the outer references in the subquery plan.
+    val subPlan = plan.transformAllExpressions {
Nit: rename subPlan to normalizedPlan?
def canonicalize(attrs: AttributeSeq): SubqueryExpression = {
  // Normalize the outer references in the subquery plan.
  val subPlan = plan.transformAllExpressions {
    case OuterReference(r) => QueryPlan.normalizeExprId(r, attrs)
The OuterReferences will all be removed, is that expected?
@cloud-fan Actually you are right. Preserving the OuterReference would be good.
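A hedged sketch of how canonicalize might preserve the OuterReference wrapper while still normalizing the exprId underneath (a fragment for the SubqueryExpression class body; an assumption about the follow-up change, not a quote of the final diff):

```scala
def canonicalize(attrs: AttributeSeq): SubqueryExpression = {
  // Normalize the outer references in the subquery plan, but keep the
  // OuterReference wrapper so the attribute is still marked as an outer one.
  val normalizedPlan = plan.transformAllExpressions {
    case OuterReference(r) => OuterReference(QueryPlan.normalizeExprId(r, attrs))
  }
  withNewPlan(normalizedPlan).canonicalized.asInstanceOf[SubqueryExpression]
}
```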
Test build #75664 has finished for PR 17330 at commit
test("SPARK-19993 simple subquery caching") { | ||
withTempView("t1", "t2") { | ||
Seq(1).toDF("c1").createOrReplaceTempView("t1") | ||
Seq(1).toDF("c1").createOrReplaceTempView("t2") |
where is t2 used?
@cloud-fan Sorry, I actually had some of these tests combined, and when I split them I forgot to remove this. Will fix it.
test("SPARK-19993 subquery with cached underlying relation") { | ||
withTempView("t1", "t2") { | ||
Seq(1).toDF("c1").createOrReplaceTempView("t1") | ||
Seq(1).toDF("c1").createOrReplaceTempView("t2") |
where is t2 used?
@cloud-fan Sorry, I actually had some of these tests combined, and when I split them I forgot to remove this. Will fix it.
@@ -76,6 +76,13 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
     sum
   }

+  private def getNumInMemoryTableScanExecs(plan: SparkPlan): Int = {
We need a better name; this actually gets in-memory tables recursively, which is different from getNumInMemoryRelations.
@cloud-fan This method operates at the physical plan level, whereas getNumInMemoryRelations operates at the logical plan level. Here we are simply counting the InMemoryTableScanExec nodes in the plan. I have changed the function name to getNumInMemoryTablesRecursively. Does it look OK to you?
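A hedged sketch of the renamed helper as it might appear in CachedTableSuite (the recursion pattern is inferred from the discussion; field names such as relation.child are assumptions, and the actual code may differ):

```scala
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec

// Counts InMemoryTableScanExec nodes in a physical plan, descending into each
// cached relation's child plan so nested cached subqueries are counted too.
private def getNumInMemoryTablesRecursively(plan: SparkPlan): Int = {
  plan.collect {
    case inMemTable: InMemoryTableScanExec =>
      getNumInMemoryTablesRecursively(inMemTable.relation.child) + 1
  }.sum
}
```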
LGTM except for some minor comments about the tests.
Test build #75710 has finished for PR 17330 at commit
thanks, merging to master!
@cloud-fan @gatorsmile Thanks a lot!!
[SPARK-19993][SQL] Caching logical plans containing subquery expressions does not work.

## What changes were proposed in this pull request?
The sameResult() method does not work when the logical plan contains subquery expressions.

**Before the fix**
```SQL
scala> val ds = spark.sql("select * from s1 where s1.c1 in (select s2.c1 from s2 where s1.c1 = s2.c1)")
ds: org.apache.spark.sql.DataFrame = [c1: int]

scala> ds.cache
res13: ds.type = [c1: int]

scala> spark.sql("select * from s1 where s1.c1 in (select s2.c1 from s2 where s1.c1 = s2.c1)").explain(true)
== Analyzed Logical Plan ==
c1: int
Project [c1#86]
+- Filter c1#86 IN (list#78 [c1#86])
   :  +- Project [c1#87]
   :     +- Filter (outer(c1#86) = c1#87)
   :        +- SubqueryAlias s2
   :           +- Relation[c1#87] parquet
   +- SubqueryAlias s1
      +- Relation[c1#86] parquet

== Optimized Logical Plan ==
Join LeftSemi, ((c1#86 = c1#87) && (c1#86 = c1#87))
:- Relation[c1#86] parquet
+- Relation[c1#87] parquet
```

**Plan after fix**
```SQL
== Analyzed Logical Plan ==
c1: int
Project [c1#22]
+- Filter c1#22 IN (list#14 [c1#22])
   :  +- Project [c1#23]
   :     +- Filter (outer(c1#22) = c1#23)
   :        +- SubqueryAlias s2
   :           +- Relation[c1#23] parquet
   +- SubqueryAlias s1
      +- Relation[c1#22] parquet

== Optimized Logical Plan ==
InMemoryRelation [c1#22], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
   +- *BroadcastHashJoin [c1#1, c1#1], [c1#2, c1#2], LeftSemi, BuildRight
      :- *FileScan parquet default.s1[c1#1] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/Users/dbiswal/mygit/apache/spark/bin/spark-warehouse/s1], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<c1:int>
      +- BroadcastExchange HashedRelationBroadcastMode(List((shiftleft(cast(input[0, int, true] as bigint), 32) | (cast(input[0, int, true] as bigint) & 4294967295))))
         +- *FileScan parquet default.s2[c1#2] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/Users/dbiswal/mygit/apache/spark/bin/spark-warehouse/s2], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<c1:int>
```

## How was this patch tested?
New tests are added to CachedTableSuite.

Author: Dilip Biswal <[email protected]>

Closes apache#17330 from dilipbiswal/subquery_cache_final.