[SPARK-20413] Add new query hint NO_COLLAPSE. #17708

8 changes: 8 additions & 0 deletions python/pyspark/sql/functions.py
@@ -466,6 +466,14 @@ def nanvl(col1, col2):
    return Column(sc._jvm.functions.nanvl(_to_java_column(col1), _to_java_column(col2)))


@since(2.2)
def no_collapse(df):
    """Marks a DataFrame as non-collapsible."""
    sc = SparkContext._active_spark_context
    return DataFrame(sc._jvm.functions.no_collapse(df._jdf), df.sql_ctx)


@since(1.4)
def rand(seed=None):
    """Generates a random column with independent and identically distributed (i.i.d.) samples
@@ -386,6 +386,13 @@ case class BroadcastHint(child: LogicalPlan) extends UnaryNode {
    child.stats(conf).copy(isBroadcastable = true)
}

/**
* A hint for the optimizer that we should not merge two projections.
*/
case class NoCollapseHint(child: LogicalPlan) extends UnaryNode {
  override def output: Seq[Attribute] = child.output
}

Contributor:
Can you explain why we want this at the LogicalPlan level and not at the expression level?

Contributor:
The problem with this approach is that most other optimizations won't work with it, for example predicate pushdown.

Contributor (Author):
I originally thought about putting it at the expression level, but ultimately decided it made more sense at the LogicalPlan node level, since the purpose is in fact to disrupt the optimizer. In some respects, it is meant to have the same effect as df.cache(), but without the caching. There may, in fact, be situations where predicate pushdown is not desired, because the resulting condition would become complex and expensive to evaluate.

In Spark SQL, I also think it makes more sense to specify the hint at the derived-table level, as opposed to a single expression. For instance:

SELECT SNO, PNO, C1 + 1, C1 + 2
FROM ( SELECT /*+ NO_COLLAPSE */ SNO, PNO, QTY * 10 AS C1 FROM T ) T

This is similar to the NO_MERGE query hint in Oracle, which prevents the query from being flattened.
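For reference, a DataFrame-side sketch of the same idea (assuming the no_collapse function added in this patch; table T and its column names are illustrative, not part of the patch):

// Without the hint, the CollapseProject rule would merge the two selects
// into a single projection that inlines QTY * 10 into every derived column:
//   SELECT SNO, PNO, QTY * 10 + 1, QTY * 10 + 2 FROM T
import org.apache.spark.sql.functions._

val inner = spark.table("T")
  .select(col("SNO"), col("PNO"), (col("QTY") * 10).alias("C1"))

// With the hint, the inner projection survives optimization, so C1 is
// defined in one node and reused, much as with df.cache() but without
// caching any data.
val outer = no_collapse(inner)
  .select(col("SNO"), col("PNO"), col("C1") + 1, col("C1") + 2)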

/**
* A general hint for the child. This node will be eliminated post analysis.
* A pair of (name, parameters).
@@ -22,7 +22,7 @@ import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.expressions.Rand
import org.apache.spark.sql.catalyst.plans.PlanTest
-import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
+import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan, NoCollapseHint}
import org.apache.spark.sql.catalyst.rules.RuleExecutor

class CollapseProjectSuite extends PlanTest {
@@ -119,4 +119,14 @@

    comparePlans(optimized, correctAnswer)
  }

  test("do not collapse projects with onceOnly expressions") {
    val query = NoCollapseHint(testRelation.select(('a * 10).as('a_times_10)))
      .select(('a_times_10 + 1).as('a_times_10_plus_1), ('a_times_10 + 2).as('a_times_10_plus_2))

    val optimized = Optimize.execute(query.analyze)
    val correctAnswer = query.analyze

    comparePlans(optimized, correctAnswer)
  }
}
@@ -433,6 +433,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
      case r: LogicalRDD =>
        RDDScanExec(r.output, r.rdd, "ExistingRDD", r.outputPartitioning, r.outputOrdering) :: Nil
      case BroadcastHint(child) => planLater(child) :: Nil
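      // NoCollapseHint only matters to the optimizer; at planning time we
      // just plan its child, which drops the hint node from the physical plan.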
      case NoCollapseHint(child) => planLater(child) :: Nil
      case _ => Nil
    }
  }
18 changes: 17 additions & 1 deletion sql/core/src/main/scala/org/apache/spark/sql/functions.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.analysis.{Star, UnresolvedFunction}
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate._
-import org.apache.spark.sql.catalyst.plans.logical.BroadcastHint
+import org.apache.spark.sql.catalyst.plans.logical.{BroadcastHint, NoCollapseHint}
import org.apache.spark.sql.execution.SparkSqlParser
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.internal.SQLConf
@@ -1022,6 +1022,22 @@
    Dataset[T](df.sparkSession, BroadcastHint(df.logicalPlan))(df.exprEnc)
  }

  /**
   * Marks a DataFrame as non-collapsible.
   *
   * For example:
   * {{{
   *   val df1 = no_collapse(df.select((df.col("qty") * lit(10)).alias("c1")))
   *   val df2 = df1.select(col("c1") + lit(1), col("c1") + lit(2))
   * }}}
   *
   * @group normal_funcs
   * @since 2.2.0
   */
  def no_collapse[T](df: Dataset[T]): Dataset[T] = {
    Dataset[T](df.sparkSession, NoCollapseHint(df.logicalPlan))(df.exprEnc)
  }

  /**
   * Returns the first column that is not null, or null if all inputs are null.
   *