[SPARK-20413] Add new query hint NO_COLLAPSE. #17708
Conversation
ok to test
Test build #75995 has finished for PR 17708 at commit
@ptkool thanks for submitting the PR. I am not sure this is the best way to avoid projection collapsing. The problem is that this approach will also inhibit other optimizations from taking place.
python/pyspark/sql/functions.py (Outdated)

@@ -466,6 +466,14 @@ def nanvl(col1, col2):
    return Column(sc._jvm.functions.nanvl(_to_java_column(col1), _to_java_column(col2)))


@since(2.2)
def no_collapse(df):
    """Marks a DataFrame as small enough for use in broadcast joins."""
Doc is incorrect.
Will fix.
 * @group normal_funcs
 * @since 1.5.0
 */
 * Marks a DataFrame as small enough for use in broadcast joins.
Please undo this change.
Ok.
def broadcast[T](df: Dataset[T]): Dataset[T] = {
  Dataset[T](df.sparkSession, BroadcastHint(df.logicalPlan))(df.exprEnc)
}

/**
 * Marks a DataFrame as small enough for use in broadcast joins.
Nit: the alignment is off by a space; it should be:
/**
* Text...
*/
Will fix.
@@ -387,6 +387,13 @@ case class BroadcastHint(child: LogicalPlan) extends UnaryNode {
}

/**
 * A hint for the optimizer that we should not merge two projections.
 */
case class NoCollapseHint(child: LogicalPlan) extends UnaryNode {
Can you explain why we want this in the LogicalPlan level and not on the expression level?
The problem with this approach is that most other optimizations won't work with it, for example predicate pushdown.
I originally thought about putting it at the expression level, but ultimately decided it made more sense at the LogicalPlan node level, since the purpose was in fact to disrupt the optimizer. In some respects, it's meant to have the same effect as df.cache(), but without the caching. There may, in fact, be situations where predicate pushdown is not desired because the resulting condition would become complex and expensive to evaluate.
In Spark SQL, I think it also makes more sense to specify the hint at the derived table level, as opposed to a single expression. For instance,
SELECT SNO, PNO, C1 +1, C1 + 2
FROM ( SELECT /*+ NO_COLLAPSE */ SNO, PNO, QTY * 10 AS C1 FROM T ) T
This is similar to the NO_MERGE query hint in Oracle, which prevents the query from being flattened.
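To make the motivation concrete, here is a minimal pure-Python sketch of the duplication problem (this is not Spark code; `expensive_udf` and the row layout are invented for illustration). Collapsing the two projections inlines the inner column C1 into every outer expression that references it, so the UDF runs once per reference instead of once per row:

```python
calls = 0

def expensive_udf(qty):
    """Stand-in for a costly UDF; counts how often it is invoked."""
    global calls
    calls += 1
    return qty * 10

row = {"SNO": "S1", "PNO": "P1", "QTY": 3}

# Uncollapsed: the inner projection materializes C1 once and the outer
# projection reuses the stored value.
inner = dict(row, C1=expensive_udf(row["QTY"]))
uncollapsed = (inner["SNO"], inner["PNO"], inner["C1"] + 1, inner["C1"] + 2)
calls_uncollapsed, calls = calls, 0

# Collapsed: the defining expression is substituted for each reference
# to C1, so the UDF is evaluated twice for the same row.
collapsed = (row["SNO"], row["PNO"],
             expensive_udf(row["QTY"]) + 1,
             expensive_udf(row["QTY"]) + 2)
calls_collapsed = calls

print(uncollapsed == collapsed, calls_uncollapsed, calls_collapsed)
```

Both plans return the same tuple, but the collapsed one pays for the UDF once per referencing column.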
comparePlans(
  parsePlan("SELECT a FROM (SELECT /*+ NO_COLLAPSE */ * FROM t) t1"),
  SubqueryAlias("t1", Hint("NO_COLLAPSE", Seq.empty, table("t").select(star())))
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What are you testing here that is not covered by the other cases?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually, nothing. I will remove it.
Test build #76001 has finished for PR 17708 at commit
Force-pushed from 1231585 to 3986247
Test build #76005 has finished for PR 17708 at commit
Based on the JIRA description, it sounds like we should simply not merge two Projects when doing so would call the same UDF multiple times, instead of adding a new logical plan node.
I have the same question as Reynold asked on the mailing list. Doesn't common subexpression elimination already address this issue?
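For context, common subexpression elimination shares repeated subtrees so each is computed once per row. A minimal sketch (an invented tuple-based expression encoding, not Spark's actual CSE machinery) showing the shared QTY * 10 evaluated once across two output columns:

```python
def evaluate(expr, row, cache, ops):
    """Evaluate a tuple-encoded expression, memoizing repeated subtrees."""
    if expr in cache:
        return cache[expr]
    tag = expr[0]
    if tag == "col":
        val = row[expr[1]]
    elif tag == "lit":
        val = expr[1]
    else:
        left = evaluate(expr[1], row, cache, ops)
        right = evaluate(expr[2], row, cache, ops)
        ops[0] += 1  # count arithmetic actually performed
        val = left * right if tag == "mul" else left + right
    cache[expr] = val
    return val

row = {"QTY": 3}
c1 = ("mul", ("col", "QTY"), ("lit", 10))   # QTY * 10, referenced twice
columns = [("add", c1, ("lit", 1)), ("add", c1, ("lit", 2))]

ops, cache = [0], {}
results = [evaluate(e, row, cache, ops) for e in columns]
print(results, ops[0])  # the shared multiply runs once: 3 ops, not 4
```

If CSE already deduplicates the repeated UDF call across the collapsed expressions, the hint may be unnecessary, which is the question being raised here.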
Any update? Maybe we can close this PR for now?
@gatorsmile I will run a few more tests to determine if subexpression elimination solves this issue.
We are closing the inactive PRs. After you run more tests, please do reopen if you still hit this issue. Thanks!
What changes were proposed in this pull request?
This PR proposes adding a new query hint called NO_COLLAPSE that can be used to prevent adjacent projections from being collapsed.
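As a sketch of the mechanism (class and function names here are invented for illustration, not Spark's Catalyst API): a collapse rule merges adjacent projections by inlining the inner expressions into the outer ones, but treats the hint node as a barrier and leaves both projections intact.

```python
class Table:
    def __init__(self, name):
        self.name = name

class Project:
    def __init__(self, exprs, child):
        # exprs maps output column name -> function of the input row
        self.exprs, self.child = exprs, child

class NoCollapseHint:
    def __init__(self, child):
        self.child = child

def collapse(plan):
    """Merge Project(Project(...)) pairs; NoCollapseHint blocks the merge."""
    if isinstance(plan, Project) and isinstance(plan.child, Project):
        inner = plan.child
        merged = {
            name: (lambda row, f=f: f({k: g(row) for k, g in inner.exprs.items()}))
            for name, f in plan.exprs.items()
        }
        return Project(merged, inner.child)
    return plan  # a NoCollapseHint child (or anything else) is left alone

def project_count(plan):
    n = 0
    while hasattr(plan, "child"):
        n += isinstance(plan, Project)
        plan = plan.child
    return n

inner = Project({"C1": lambda r: r["QTY"] * 10}, Table("T"))
outer_exprs = {"A": lambda r: r["C1"] + 1, "B": lambda r: r["C1"] + 2}

plain = collapse(Project(outer_exprs, inner))
hinted = collapse(Project(outer_exprs, NoCollapseHint(inner)))
print(project_count(plain), project_count(hinted))  # 1 vs 2
```

Without the hint the two projections fuse into one; with the hint interposed, the rule does not fire and both projection steps survive optimization.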
How was this patch tested?
Tested using unit tests, integration tests, and manual tests.