-
Notifications
You must be signed in to change notification settings - Fork 28.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-21657][SQL] optimize explode quadratic memory consumption #19683
Changes from 29 commits
ce7c369
76aa258
7a9bc96
a3050e9
b8b5960
b825d6b
04b5814
7cb9454
ccc78e9
8ef78af
272a059
f9e69a4
42aa32d
93816b6
6caa0d5
11867e2
e00ecaf
c3183d0
b6b8694
09e6d05
227c7af
17db21e
9edd864
6c07d2b
f68bd2d
f92ec11
288aa73
283340f
8f06dda
1c6626a
4edd884
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -73,8 +73,13 @@ case class Project(projectList: Seq[NamedExpression], child: LogicalPlan) extend | |
* their output. | ||
* | ||
* @param generator the generator expression | ||
* @param join when true, each output row is implicitly joined with the input tuple that produced | ||
* it. | ||
* @param unrequiredChildIndex This parameter starts as Nil and gets filled by the Optimizer. | ||
* It's used as an optimization for omitting data generation that will | ||
* be discarded next by a projection. | ||
* A common use case is when we explode(array(..)) and are interested | ||
* only in the exploded data and not in the original array. Before this | ||
* optimization the array got duplicated for each of its elements, | ||
* causing O(n^2) memory consumption. (see [SPARK-21657]) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: seems an extra space at the beginning since 2nd line. |
||
* @param outer when true, each input row will be output at least once, even if the output of the | ||
* given `generator` is empty. | ||
* @param qualifier Qualifier for the attributes of generator(UDTF) | ||
|
@@ -83,15 +88,17 @@ case class Project(projectList: Seq[NamedExpression], child: LogicalPlan) extend | |
*/ | ||
case class Generate( | ||
generator: Generator, | ||
join: Boolean, | ||
unrequiredChildIndex: Seq[Int], | ||
outer: Boolean, | ||
qualifier: Option[String], | ||
generatorOutput: Seq[Attribute], | ||
child: LogicalPlan) | ||
extends UnaryNode { | ||
|
||
/** The set of all attributes produced by this node. */ | ||
def generatedSet: AttributeSet = AttributeSet(generatorOutput) | ||
lazy val requiredChildOutput: Seq[Attribute] = { | ||
val unrequiredSet = unrequiredChildIndex.toSet | ||
child.output.zipWithIndex.filterNot(t => unrequiredSet.contains(t._2)).map(_._1) | ||
} | ||
|
||
override lazy val resolved: Boolean = { | ||
generator.resolved && | ||
|
@@ -114,9 +121,7 @@ case class Generate( | |
nullableOutput | ||
} | ||
|
||
def output: Seq[Attribute] = { | ||
if (join) child.output ++ qualifiedGeneratorOutput else qualifiedGeneratorOutput | ||
} | ||
def output: Seq[Attribute] = requiredChildOutput ++ qualifiedGeneratorOutput | ||
} | ||
|
||
case class Filter(condition: Expression, child: LogicalPlan) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -276,7 +276,7 @@ class PlanParserSuite extends AnalysisTest { | |
assertEqual( | ||
"select * from t lateral view explode(x) expl as x", | ||
table("t") | ||
.generate(explode, join = true, outer = false, Some("expl"), Seq("x")) | ||
.generate(explode, alias = Some("expl"), outputNames = Seq("x")) | ||
.select(star())) | ||
|
||
// Multiple lateral views | ||
|
@@ -286,12 +286,12 @@ class PlanParserSuite extends AnalysisTest { | |
|lateral view explode(x) expl | ||
|lateral view outer json_tuple(x, y) jtup q, z""".stripMargin, | ||
table("t") | ||
.generate(explode, join = true, outer = false, Some("expl"), Seq.empty) | ||
.generate(jsonTuple, join = true, outer = true, Some("jtup"), Seq("q", "z")) | ||
.generate(explode, alias = Some("expl")) | ||
.generate(jsonTuple, outer = true, alias = Some("jtup"), outputNames = Seq("q", "z")) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can we keep the previous code style and inline There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit:
to make it more similar to the previous code. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. and remove
|
||
.select(star())) | ||
|
||
// Multi-Insert lateral views. | ||
val from = table("t1").generate(explode, join = true, outer = false, Some("expl"), Seq("x")) | ||
val from = table("t1").generate(explode, alias = Some("expl"), outputNames = Seq("x")) | ||
assertEqual( | ||
"""from t1 | ||
|lateral view explode(x) expl as x | ||
|
@@ -303,7 +303,7 @@ class PlanParserSuite extends AnalysisTest { | |
|where s < 10 | ||
""".stripMargin, | ||
Union(from | ||
.generate(jsonTuple, join = true, outer = false, Some("jtup"), Seq("q", "z")) | ||
.generate(jsonTuple, alias = Some("jtup"), outputNames = Seq("q", "z")) | ||
.select(star()) | ||
.insertInto("t2"), | ||
from.where('s < 10).select(star()).insertInto("t3"))) | ||
|
@@ -312,10 +312,8 @@ class PlanParserSuite extends AnalysisTest { | |
val expected = table("t") | ||
.generate( | ||
UnresolvedGenerator(FunctionIdentifier("posexplode"), Seq('x)), | ||
join = true, | ||
outer = false, | ||
Some("posexpl"), | ||
Seq("x", "y")) | ||
alias = Some("posexpl"), | ||
outputNames = Seq("x", "y")) | ||
.select(star()) | ||
assertEqual( | ||
"select * from t lateral view posexplode(x) posexpl as x, y", | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this correct?
AttributeSet.intersect
is special. There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
well, the implementation was identical in class Generate:
def generatedSet: AttributeSet = AttributeSet(generatorOutput)
override def producedAttributes: AttributeSet = AttributeSet(generatorOutput)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ah i see