[SPARK-21657][SQL] Optimize explode quadratic memory consumption #19683
Conversation
ok to test
Test build #83558 has finished for PR 19683 at commit
I've fixed the styling issue.
Test build #83559 has finished for PR 19683 at commit
I've fixed more styling issues.
Test build #83562 has finished for PR 19683 at commit
Test build #83566 has finished for PR 19683 at commit
Test build #83587 has finished for PR 19683 at commit
Do you understand this failure?
retest this please
Test build #83613 has finished for PR 19683 at commit
Can somebody please review this PR? Thanks.
} else {
  generatorOutput
  projectedChildOutput ++ generatorOutput
} else {
nit: do we need to update the indentation?
fixed.
Test build #84001 has finished for PR 19683 at commit
Hi, has anybody had a chance to look at this PR?
Looks like @kiszk already reviewed.
A few small things but this isn't really my area, so will delegate to others.
@@ -450,6 +450,11 @@ object ColumnPruning extends Rule[LogicalPlan] {
    case p @ Project(_, g: Generate) if g.join && p.references.subsetOf(g.generatedSet) =>
      p.copy(child = g.copy(join = false))

    // Turn on `omitGeneratorChild` for Generate if it's child column is not used
    case p @ Project(_, g @ Generate(gu: UnaryExpression, true, _, false, _, _, _))
        if (AttributeSet(Seq(gu.child)) -- p.references).nonEmpty =>
p.references.contains(gu.child)
?
doesn't compile:
Type mismatch, expected: NamedExpression, actual: Expression
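For context on why the set-difference form works where `contains` on the raw child does not: `AttributeSet(...)` collects the attributes *referenced by* an arbitrary expression, so the guard asks whether any attribute the generator's child depends on goes unused by the Project. A toy sketch with plain Scala sets (hypothetical simplified classes, not the real Catalyst API):

```scala
// Toy model (hypothetical, not Catalyst): expressions expose the attribute
// names they reference, mirroring what AttributeSet(Seq(expr)) collects.
sealed trait Expr { def references: Set[String] }
case class Attr(name: String) extends Expr { def references: Set[String] = Set(name) }
case class Upper(child: Expr) extends Expr { def references: Set[String] = child.references }

// The rule's guard, rephrased: is any attribute referenced by the generator's
// child absent from the Project's references?
def childUnused(genChild: Expr, projectRefs: Set[String]): Boolean =
  (genChild.references -- projectRefs).nonEmpty

childUnused(Attr("arr"), Set("exploded"))          // true: "arr" is projected out
childUnused(Upper(Attr("arr")), Set("arr", "col")) // false: "arr" is still needed
```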
@@ -450,6 +450,11 @@ object ColumnPruning extends Rule[LogicalPlan] {
    case p @ Project(_, g: Generate) if g.join && p.references.subsetOf(g.generatedSet) =>
      p.copy(child = g.copy(join = false))

    // Turn on `omitGeneratorChild` for Generate if it's child column is not used
nit: its
    generatorOutput: Seq[Attribute],
    child: SparkPlan)
  extends UnaryExecNode with CodegenSupport {

  private def projectedChildOutput = generator match {
    case g: UnaryExpression if omitGeneratorChild =>
      child.output diff Seq(g.child)
nit: .diff(...)
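For reference, `Seq#diff` here is standard Scala collection subtraction: it removes the first occurrence of each element of its argument, which is how the generator's child column gets dropped from the child output. A minimal illustration (illustrative names only):

```scala
// Seq#diff removes the first occurrence of each element of its argument.
val childOutput = Seq("a", "b", "arr")
val projected = childOutput.diff(Seq("arr"))
// projected == Seq("a", "b")
```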
I've been looking at this a bit for my own edification. I wonder if a slightly more general approach would be to omit the UnsafeProject step in GenerateExec.doExecute if the generation is an immediate child of a projection?
That would save the copy entirely (rather than just the projected-out child column) since the projection is just going to redo the copy anyhow.
I don't know if SparkSQL has a more elegant way of expressing the constraint that output rows from a SparkPlan must be mutable, but it would be quite nice to be able to emit the JoinedRows from the generator and delay materialization of the combined tuples until needed.
Test build #84595 has finished for PR 19683 at commit
could you please retest this?
@henryr I understand what you're saying. I'm not sure why the UnsafeProject is at the end of the function, but it was added by the PR that fixes [SPARK-13476], without much elaboration.
I can reproduce the test failure locally.
The SQL in the failed test is SELECT udtf_count2(a) FROM (SELECT 1 AS a FROM src LIMIT 3) t.
Now I feel it's a little hacky to introduce
A similar exception to the one in the failing unit tests was fixed in SPARK-18300.
Not sure if this is directly applicable or helpful here, though.
seems reasonable, let's do that.
    outer: Boolean,
    qualifier: Option[String],
    generatorOutput: Seq[Attribute],
    child: LogicalPlan)
wrong indentation?
@@ -57,20 +62,19 @@ private[execution] sealed case class LazyIterator(func: () => TraversableOnce[In
 */
case class GenerateExec(
    generator: Generator,
    join: Boolean,
    unrequiredChildIndex: Seq[Int],
The physical plan can just take requiredChildOutput, and in the planner we can just do
case g @ logical.Generate(...) => GenerateExec(..., g.requiredChildOutput)
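The suggestion can be sketched with toy stand-ins for the logical and physical nodes (hypothetical simplified classes and a name-based attribute model, not the real Spark planner): the logical node derives requiredChildOutput from unrequiredChildIndex once, and the physical node just receives the result.

```scala
// Toy stand-ins for the logical and physical Generate nodes (not real Spark classes).
case class LogicalGenerate(childOutput: Seq[String], unrequiredChildIndex: Seq[Int]) {
  // Child attributes that downstream operators still need.
  def requiredChildOutput: Seq[String] = {
    val unrequired = unrequiredChildIndex.toSet
    childOutput.zipWithIndex.collect { case (a, i) if !unrequired(i) => a }
  }
}
case class GenerateExec(requiredChildOutput: Seq[String])

// In the planner, the physical node just takes the precomputed attributes.
def plan(g: LogicalGenerate): GenerateExec = GenerateExec(g.requiredChildOutput)

plan(LogicalGenerate(Seq("a", "arr", "b"), Seq(1)))
// the array column at index 1 is dropped from the physical node's child output
```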
LGTM except 2 comments
@@ -47,8 +47,13 @@ private[execution] sealed case class LazyIterator(func: () => TraversableOnce[In
 * terminate().
 *
 * @param generator the generator expression
 * @param join when true, each output row is implicitly joined with the input tuple that produced
 *             it.
 * @param requiredChildOutput this paramter starts as Nil and gets filled by the Optimizer.
we don't need to duplicate the comment here, just say required attributes from child output
val rows = if (requiredChildOutput.nonEmpty) {

  val pruneChildForResult: InternalRow => InternalRow =
    if ((child.outputSet -- requiredChildOutput).isEmpty) {
just child.output == requiredChildOutput
?
wouldn't it always return false? or should I use child.output == AttributeSet(requiredChildOutput)
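The subtlety behind that question is that `Seq` equality is element-by-element and order-sensitive, while the `outputSet` comparison is a set operation that ignores order. A toy illustration with plain Scala collections (names are illustrative only):

```scala
val childOutput = Seq("a", "b", "c")
val required = Seq("c", "a", "b")

// Seq equality compares element-by-element, so order matters:
val seqEqual = childOutput == required                        // false
// Set difference ignores order, like the outputSet comparison above:
val nothingPruned = (childOutput.toSet -- required.toSet).isEmpty // true
```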
Test build #85501 has finished for PR 19683 at commit
Test build #85500 has finished for PR 19683 at commit
Test build #85502 has finished for PR 19683 at commit
retest this please.
 * A common use case is when we explode(array(..)) and are interested
 * only in the exploded data and not in the original array. before this
 * optimization the array got duplicated for each of its elements,
 * causing O(n^^2) memory consumption. (see [SPARK-21657])
nit: there seems to be an extra space at the beginning of each line from the 2nd line on.
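To make the O(n^2) point concrete, a toy model in plain Scala (schematic only: Spark physically copies the array into each UnsafeRow, whereas this JVM sketch merely shares a reference, so read the cell counts as what Spark would materialize):

```scala
// One input row holding an n-element array, exploded into n output rows.
val n = 1000
val arr = Vector.fill(n)(42)

// Before the optimization: every output row carries the whole source array,
// so materializing the result costs n rows x n cells = O(n^2).
val joined = arr.map(elem => (arr, elem))

// With the child column pruned: each output row carries only its element, O(n).
val pruned = arr.map(elem => Tuple1(elem))
```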
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.benchmark

import org.apache.spark.util.Benchmark

nit: unnecessary blank line.
}

/*
This benchmark result should be moved up to be included in the ignored test block, as other tests.
Three comments for style. LGTM.
Test build #85503 has finished for PR 19683 at commit
this timeout in "org.apache.spark.ml.regression.LinearRegressionSuite.linear regression with intercept without regularization" doesn't seem related to our fix.
retest this please
Test build #85504 has finished for PR 19683 at commit
thanks, merging to master, great work!
What changes were proposed in this pull request?
The issue has been raised in two Jira tickets: SPARK-21657 and SPARK-16998. Basically, collection generators like explode/inline create many rows from each input row, and currently each exploded row also contains the column on which it was created. As a result, if one row holds a 10k-element array, that array gets copied 10k times, once into each output row, which results in quadratic memory consumption. However, it is a common case that the original column gets projected out after the explode, so we can avoid duplicating it.
In this solution we propose to identify this situation in the optimizer and turn on a flag for omitting the original column in the generation process.
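The proposed rewrite can be sketched with toy stand-ins for the plan nodes (hypothetical and heavily simplified relative to Catalyst, using the flag formulation described above and a name-based reference model):

```scala
// Toy plan nodes (not real Catalyst classes).
case class Generate(childColumn: String, omitGeneratorChild: Boolean = false)
case class Project(references: Set[String], child: Generate)

// The optimizer rule: if the Project above the Generate never reads the
// generator's source column, flip the flag so Generate stops emitting it.
def pruneGeneratorChild(p: Project): Project =
  if (!p.references.contains(p.child.childColumn))
    p.copy(child = p.child.copy(omitGeneratorChild = true))
  else p

pruneGeneratorChild(Project(Set("exploded"), Generate("arr")))
// the flag is turned on, since "arr" is projected out above the Generate
```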
How was this patch tested?