-
Notifications
You must be signed in to change notification settings - Fork 28.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-23877][SQL]: Use filter predicates to prune partitions in metadata-only queries #20988
[SPARK-23877][SQL]: Use filter predicates to prune partitions in metadata-only queries #20988
Conversation
The LocalRelation created for partition data for metadata-only queries may be a stream produced by listing directories. If the stream is large, serializing the LocalRelation to executors results in a stack overflow because the stream is a recursive structure of (head, rest-of-stream).
Test build #88959 has finished for PR 20988 at commit
|
@cloud-fan or @gatorsmile, could you review this? |
can we add a test? We can use |
|
||
case p @ Project(projectList, child) if projectList.forall(_.deterministic) => | ||
unapply(child).flatMap { case (partAttrs, filters, relation) => | ||
if (p.references.subsetOf(partAttrs)) Some((p.outputSet, filters, relation)) else None |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what about Filter(p > 1, Project(a, p, Table(a, b, p, partitioned by p)))
? p > 1
should also be a partition filter.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd propose something top-down like
def getPartitionedRelation(
plan: LogicalPlan,
predicates: Seq[Expression]): Option[(AttributeSet, Seq[Expression], LogicalPlan)] = {
plan match {
case Filter(condition, child) if condition.deterministic =>
getPartitionedRelation(child, predicates ++ splitConjunctivePredicates(condition))
case Project(projectList, child) if projectList.forall(_.deterministic) =>
getPartitionedRelation(child, predicates.filter(_.references.subsetOf(child.outputSet)))
case l @ LogicalRelation(fsRelation: HadoopFsRelation, _, _, _) if fsRelation.partitionSchema.nonEmpty =>
val partAttrs = ...
val partitionFilters = predicates.filter(_.references.subsetOf(partAttrs))
Some(...)
case _ => None
}
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@cloud-fan, that is basically how this works already. Each matched node calls unapply(child)
to get the result from the child node, then it adds the current node's conditions to that result. Using unapply
instead of getPartitionedRelation
makes this work in the matching rule:
case a @ Aggregate(_, aggExprs, child @ PartitionedRelation(partAttrs, filters, relation)) =>
@cloud-fan, I've added the test. Thanks for letting me know about HiveCatalogMetrics, that's exactly what I needed. |
Test build #89535 has finished for PR 20988 at commit
|
case f @ Filter(condition, child) if condition.deterministic => | ||
unapply(child).flatMap { case (partAttrs, filters, relation) => | ||
if (f.references.subsetOf(partAttrs)) { | ||
Some((partAttrs, splitConjunctivePredicates(condition) ++ filters, relation)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
there is a bug here. Think about Filter(x > 1, Project(p + 1 as x, Table(a, p, partitioned by p)))
, we will mistakenly report x > 1
as partition predicates and use it to list partitions and fail.
I think we should use PhysicalOperation
here, which can help us to substitute the attributes in filter.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good catch. I've added that as a test case and updated the PartitionedRelation
code to keep track of both original partition attributes -- that the filter needs -- and the top-most node's output that is used by the rule. One thing to note: the optimizer usually pushes Filter
below Project` by the time this runs, so it is difficult to even construct this test case.
For using PhysicalOperation
instead of PartitionedRelation
, I don't see a compelling reason for such an invasive change. This currently adds a couple of results to unapply and keeps mostly the same logic. PhysicalOperation
would lose the check that the references are a subset of the partition attributes and be a lot larger change. If you think this should be refactored, lets talk about that separately to understand the motivation for the change.
Test build #89588 has finished for PR 20988 at commit
|
thanks, merging to master! |
…ndling of Project and Filter over partitioned relation ## What changes were proposed in this pull request? A followup of apache#20988 `PhysicalOperation` can collect Project and Filters over a certain plan and substitute the alias with the original attributes in the bottom plan. We can use it in `OptimizeMetadataOnlyQuery` rule to handle the Project and Filter over partitioned relation. ## How was this patch tested? existing test Author: Wenchen Fan <[email protected]> Closes apache#21111 from cloud-fan/refactor.
…ata-only queries ## What changes were proposed in this pull request? This updates the OptimizeMetadataOnlyQuery rule to use filter expressions when listing partitions, if there are filter nodes in the logical plan. This avoids listing all partitions for large tables on the driver. This also fixes a minor bug where the partitions returned from fsRelation cannot be serialized without hitting a stack level too deep error. This is caused by serializing a stream to executors, where the stream is a recursive structure. If the stream is too long, the serialization stack reaches the maximum level of depth. The fix is to create a LocalRelation using an Array instead of the incoming Seq. ## How was this patch tested? Existing tests for metadata-only queries. Author: Ryan Blue <[email protected]> Closes apache#20988 from rdblue/SPARK-23877-metadata-only-push-filters. (cherry picked from commit b3fde5a)
…ndling of Project and Filter over partitioned relation A followup of apache#20988 `PhysicalOperation` can collect Project and Filters over a certain plan and substitute the alias with the original attributes in the bottom plan. We can use it in `OptimizeMetadataOnlyQuery` rule to handle the Project and Filter over partitioned relation. existing test Author: Wenchen Fan <[email protected]> Closes apache#21111 from cloud-fan/refactor. (cherry picked from commit f70f46d) Conflicts: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LocalRelation.scala
…ata-only queries ## What changes were proposed in this pull request? This updates the OptimizeMetadataOnlyQuery rule to use filter expressions when listing partitions, if there are filter nodes in the logical plan. This avoids listing all partitions for large tables on the driver. This also fixes a minor bug where the partitions returned from fsRelation cannot be serialized without hitting a stack level too deep error. This is caused by serializing a stream to executors, where the stream is a recursive structure. If the stream is too long, the serialization stack reaches the maximum level of depth. The fix is to create a LocalRelation using an Array instead of the incoming Seq. ## How was this patch tested? Existing tests for metadata-only queries. Author: Ryan Blue <[email protected]> Closes apache#20988 from rdblue/SPARK-23877-metadata-only-push-filters. (cherry picked from commit b3fde5a)
…ndling of Project and Filter over partitioned relation A followup of apache#20988 `PhysicalOperation` can collect Project and Filters over a certain plan and substitute the alias with the original attributes in the bottom plan. We can use it in `OptimizeMetadataOnlyQuery` rule to handle the Project and Filter over partitioned relation. existing test Author: Wenchen Fan <[email protected]> Closes apache#21111 from cloud-fan/refactor. (cherry picked from commit f70f46d) Conflicts: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LocalRelation.scala
What changes were proposed in this pull request?
This updates the OptimizeMetadataOnlyQuery rule to use filter expressions when listing partitions, if there are filter nodes in the logical plan. This avoids listing all partitions for large tables on the driver.
This also fixes a minor bug where the partitions returned from fsRelation cannot be serialized without hitting a stack level too deep error. This is caused by serializing a stream to executors, where the stream is a recursive structure. If the stream is too long, the serialization stack reaches the maximum level of depth. The fix is to create a LocalRelation using an Array instead of the incoming Seq.
How was this patch tested?
Existing tests for metadata-only queries.