[SPARK-9192][SQL] add initialization phase for nondeterministic expression #7535
Test build #37828 has finished for PR 7535 at commit
Test build #37829 has finished for PR 7535 at commit
Test build #37841 has finished for PR 7535 at commit
I discussed with @marmbrus on this. Here's the suggestion:
```scala
def checkAnalysis(plan: LogicalPlan): Unit = {
  // We transform up and order the rules so as to catch the first possible failure instead
  // of the result of cascading resolution failures.
  plan.foreachUp {
    case operator: LogicalPlan =>
```
`case operator: LogicalPlan` catches all cases, so why not just use a normal function here instead of a pattern match?
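The reviewer's point can be shown with a standalone sketch (the `MyPlan` classes below are hypothetical stand-ins, not Spark's actual `LogicalPlan` hierarchy): a partial function whose only case is a catch-all type pattern matches everything, so an ordinary function expresses the same thing more directly.

```scala
// Hypothetical stand-ins for the plan hierarchy; not Spark's real classes.
abstract class MyPlan
case class MyLeaf(name: String) extends MyPlan

// Pattern-match style: the single catch-all case always matches ...
def checkWithMatch(plan: MyPlan): String = plan match {
  case operator: MyPlan => s"checked ${operator.toString}"
}

// ... so a normal function is equivalent and slightly clearer.
def checkWithFunction(operator: MyPlan): String = s"checked ${operator.toString}"
```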
Test build #38087 has finished for PR 7535 at commit
Test build #1158 has finished for PR 7535 at commit
Before:
```scala
  case e: Generator => true
}).nonEmpty
```
After:
```scala
  case e: Generator => e
}).length > 1
```
An unrelated but small fix: checking for multiple generators should use `length > 1` instead of `nonEmpty`.
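A minimal sketch of the difference, using a plain `Seq` rather than Spark's expression trees (the toy `Expr` types here are illustrative only): `nonEmpty` answers "is there at least one?", while `length > 1` answers "is there more than one?".

```scala
// Toy expression types, standing in for Spark's Expression/Generator.
sealed trait Expr
case object Gen extends Expr    // plays the role of Generator
case object Other extends Expr

// What the old code effectively checked: at least one generator present.
def atLeastOne(exprs: Seq[Expr]): Boolean =
  exprs.collect { case Gen => Gen }.nonEmpty

// What "multiple generators" actually requires: more than one.
def moreThanOne(exprs: Seq[Expr]): Boolean =
  exprs.collect { case Gen => Gen }.length > 1
```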
oops
Test build #38091 has finished for PR 7535 at commit
Test build #38176 has finished for PR 7535 at commit
Test build #38194 has finished for PR 7535 at commit
```scala
failAnalysis(
  s"""nondeterministic expressions are only allowed in Project or Filter, found:
  | ${o.expressions.map(_.prettyString).mkString(",")}
  |in operator ${operator.simpleString}
```
this is not aligned ...
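The alignment nit concerns Scala's `stripMargin` convention: each continuation line's leading `|` should line up so the margin strips cleanly. A small self-contained sketch (the message text mirrors the snippet above with hypothetical placeholder values; `stripMargin` itself is standard Scala):

```scala
// stripMargin removes leading whitespace up to and including the '|',
// so the '|' characters are conventionally aligned in the source.
val msg =
  s"""nondeterministic expressions are only allowed in Project or Filter, found:
     | col1,col2
     |in operator SampleOperator""".stripMargin
```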
62c28a7 to 6c6f332
Test build #38438 has finished for PR 7535 at commit
Test build #38440 has finished for PR 7535 at commit
Test build #38441 has finished for PR 7535 at commit
```scala
// todo: It's hard to write a general rule to pull out nondeterministic expressions
// from LogicalPlan, currently we only do it for UnaryNode which has same output
// schema with its child.
case p: UnaryNode if p.output == p.child.output && p.expressions.exists(!_.deterministic) =>
```
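The guard in that case clause can be mirrored with toy classes (hypothetical names, not Spark's real `UnaryNode` or `Expression`): the rule fires only when the node keeps its child's output schema unchanged and contains at least one nondeterministic expression.

```scala
// Toy model of the guard condition; names are illustrative, not Spark's API.
trait Expr { def deterministic: Boolean }
case class RandExpr() extends Expr { val deterministic = false }
case class ColRef(name: String) extends Expr { val deterministic = true }

sealed trait Plan { def output: Seq[String] }
case class Leaf(output: Seq[String]) extends Plan
case class Unary(output: Seq[String], expressions: Seq[Expr], child: Plan) extends Plan

// Mirrors: p.output == p.child.output && p.expressions.exists(!_.deterministic)
def shouldPullOut(p: Unary): Boolean =
  p.output == p.child.output && p.expressions.exists(!_.deterministic)
```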
I'm going to merge this one first. Would be great to get a holistic review before the 1.5 release on all the random-related optimizer/analyzer changes.
…oin Conditions

## What changes were proposed in this pull request?

```
sql("SELECT t1.b, rand(0) as r FROM cachedData, cachedData t1 GROUP BY t1.b having r > 0.5").show()
```

We will get the following error:

```
Job aborted due to stage failure: Task 1 in stage 4.0 failed 1 times, most recent failure: Lost task 1.0 in stage 4.0 (TID 8, localhost, executor driver): java.lang.NullPointerException
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificPredicate.eval(Unknown Source)
	at org.apache.spark.sql.execution.joins.BroadcastNestedLoopJoinExec$$anonfun$org$apache$spark$sql$execution$joins$BroadcastNestedLoopJoinExec$$boundCondition$1.apply(BroadcastNestedLoopJoinExec.scala:87)
	at org.apache.spark.sql.execution.joins.BroadcastNestedLoopJoinExec$$anonfun$org$apache$spark$sql$execution$joins$BroadcastNestedLoopJoinExec$$boundCondition$1.apply(BroadcastNestedLoopJoinExec.scala:87)
	at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:463)
```

Filters could be pushed down to the join conditions by the optimizer rule `PushPredicateThroughJoin`. However, the Analyzer [blocks users from adding non-deterministic conditions](https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala#L386-L395) (for details, see PR apache#7535). We should not push down non-deterministic conditions; otherwise, we need to explicitly initialize the non-deterministic expressions. This PR simply blocks it.

### How was this patch tested?

Added a test case

Author: Xiao Li <[email protected]>

Closes apache#17585 from gatorsmile/joinRandCondition.
Currently nondeterministic expressions are broken without an explicit initialization phase.

Let me take `MonotonicallyIncreasingID` as an example. This expression needs a mutable state to remember how many times it has been evaluated, so we use `@transient var count: Long` there. By being transient, the `count` will be reset to 0, and only to 0, when we serialize and deserialize it, as deserializing a transient variable results in its default value. There is no way to use another initial value for `count` until we add the explicit initialization phase.

Another use case is local execution for `LocalRelation`: there is no serialize and deserialize phase, and thus we can't reset mutable states for it.
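The failure mode can be sketched outside Spark with a toy class (the class name, the `initialize` method, and the bit-shift encoding are all illustrative, not Spark's actual implementation): a `@transient var` always comes back as its type's default after deserialization, so an explicit initialization step is the only hook for giving it a partition-specific starting value.

```scala
// A toy nondeterministic expression with mutable per-partition state.
class ToyMonotonicId extends Serializable {
  // After deserialization a transient field holds its default (0L); there is
  // no way to choose a different starting value without an explicit phase.
  @transient private var count: Long = 0L

  // Hypothetical explicit initialization phase: called once per partition
  // before any eval(), whether or not a deserialization happened.
  def initialize(partitionId: Int): Unit = {
    count = partitionId.toLong << 33 // illustrative partition offset
  }

  def eval(): Long = { val result = count; count += 1; result }
}
```

For local execution (the `LocalRelation` case above), no serialize/deserialize round trip occurs, so without an explicit initialization call the mutable state would simply carry over between evaluations.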