
[SPARK-27699][SQL] Partially push down disjunctive predicates in Parquet/ORC #24598

Closed

Conversation

@gengliangwang (Member) commented May 14, 2019

What changes were proposed in this pull request?

Currently, in ParquetFilters and OrcFilters, if a child predicate of an Or operator can't be entirely pushed down, the whole predicate is thrown away.
In fact, the conjunctive predicates under Or operators can be partially pushed down.
For example, say a and b are convertible while c can't be pushed down; the predicate
a or (b and c)
can be converted to
(a or b) and (a or c)
and we can still push down (a or b).
A disjunctive predicate can't be pushed down only when one of its children is not even partially convertible.
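As an illustration, here is a minimal sketch of this rule over sources.Filter (not the actual ParquetFilters/OrcFilters code; `convertLeaf` is a hypothetical stand-in for the per-source conversion of a supported leaf filter):

```scala
import org.apache.spark.sql.sources._

// Minimal sketch of the partial pushdown rule described above.
// `convertLeaf` returns the pushable form of a supported leaf filter, or None.
def convert(
    filter: Filter,
    canPartialPushDown: Boolean,
    convertLeaf: Filter => Option[Filter]): Option[Filter] = filter match {
  case And(left, right) =>
    (convert(left, canPartialPushDown, convertLeaf),
      convert(right, canPartialPushDown, convertLeaf)) match {
      case (Some(l), Some(r)) => Some(And(l, r))
      // Dropping one side of an AND gives a weaker filter, which is only
      // acceptable when partial pushdown is allowed at this position.
      case (Some(l), None) if canPartialPushDown => Some(l)
      case (None, Some(r)) if canPartialPushDown => Some(r)
      case _ => None
    }
  case Or(left, right) =>
    // Each child of an OR may itself be partially converted:
    // a OR (b AND c) can be pushed as (a OR b), which the original implies.
    for {
      l <- convert(left, canPartialPushDown, convertLeaf)
      r <- convert(right, canPartialPushDown, convertLeaf)
    } yield Or(l, r)
  case Not(child) =>
    // Under NOT, a weaker child would make the overall predicate stronger,
    // so partial conversion must be disabled for the whole subtree.
    convert(child, canPartialPushDown = false, convertLeaf).map(c => Not(c))
  case leaf =>
    convertLeaf(leaf)
}
```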

This PR also improves the filter pushdown logic in DataSourceV2Strategy. With partial filter pushdown in the Or operator, the result of pushedFilters() might not exist in the mapping translatedFilterToExpr. To fix this, the PR changes translatedFilterToExpr to map only leaf sources.Filter nodes to catalyst filter expressions, and later rebuilds the whole expression from that mapping.
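For the DataSourceV2Strategy part, here is a hypothetical sketch of the rebuild step (the name rebuildExpression is illustrative, not the actual method in this PR): because the map holds only leaf filters, a partially pushed And/Or returned by pushedFilters() can still be mapped back to a catalyst expression by recombining its leaves.

```scala
import scala.collection.mutable
import org.apache.spark.sql.catalyst.expressions
import org.apache.spark.sql.sources

// Rebuild a catalyst Expression from a (possibly partially pushed) source
// filter, using a map that records only leaf filters.
def rebuildExpression(
    filter: sources.Filter,
    leafToExpr: mutable.HashMap[sources.Filter, expressions.Expression])
  : Option[expressions.Expression] = filter match {
  case sources.And(left, right) =>
    for {
      l <- rebuildExpression(left, leafToExpr)
      r <- rebuildExpression(right, leafToExpr)
    } yield expressions.And(l, r)
  case sources.Or(left, right) =>
    for {
      l <- rebuildExpression(left, leafToExpr)
      r <- rebuildExpression(right, leafToExpr)
    } yield expressions.Or(l, r)
  case sources.Not(child) =>
    rebuildExpression(child, leafToExpr).map(e => expressions.Not(e))
  case leaf =>
    leafToExpr.get(leaf) // recorded when the leaf was translated
}
```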

How was this patch tested?

Unit test

@SparkQA commented May 14, 2019

Test build #105380 has finished for PR 24598 at commit 490e00c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gengliangwang (Member Author)

@cloud-fan (Contributor)

looks reasonable to me.

@dongjoon-hyun (Member)

Thank you for pinging me, @gengliangwang. I'll take a look today~

@@ -529,9 +529,9 @@ private[parquet] class ParquetFilters(
case sources.Or(lhs, rhs) =>
for {
lhsFilter <-
createFilterHelper(nameToParquetField, lhs, canPartialPushDownConjuncts = false)
Member

Can we add some comments like we did in AND?

_ <- createBuilder(dataTypeMap, left, newBuilder, canPartialPushDownConjuncts = false)
_ <- createBuilder(dataTypeMap, right, newBuilder, canPartialPushDownConjuncts = false)
_ <- createBuilder(dataTypeMap, left, newBuilder, canPartialPushDownConjuncts = true)
_ <- createBuilder(dataTypeMap, right, newBuilder, canPartialPushDownConjuncts = true)
Member

Hi, @gengliangwang.
Is there a reason not to update the other OrcFilters? We should keep both consistent in order to support Hadoop 3 in Spark 3.0.0.

  • sql/core/v2.3.4/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala

Member

Also, cc @wangyum since he is actively working on Hadoop 3.2 support.

@@ -398,6 +387,55 @@ class OrcFilterSuite extends OrcTest with SharedSQLContext {
}
}

test("SPARK-27699 Converting disjunctions into ORC SearchArguments") {
Member

ditto for this test case.

@SparkQA commented May 15, 2019

Test build #105397 has finished for PR 24598 at commit 38519d3.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gengliangwang (Member Author)

retest this please.

// (a1 AND a2) OR (b1 AND b2),
// a1 and b1 are convertible, while a2 and b2 are not.
// The predicate can be converted to
// (a1 OR b1) AND (a1 OR b2) AND (a2 OR b1) AND (a2 OR b2)
Member

The rule is correct, but it looks a little dangerous to set true blindly without considering the given parameter canPartialPushDownConjuncts. Could you add a test case for a complex example like OR under NOT in a deep predicate tree?

Also, it would be great to add a higher-level test case in SQLQuerySuite.scala to show the benefit of this additional predicate pushdown a1 OR b1. Could you add that, too?

@gengliangwang (Member Author) commented May 15, 2019

> The rule is correct, but it looks a little dangerous to set true blindly without considering the given parameter canPartialPushDownConjuncts. Could you add a test case for a complex example like OR under NOT in a deep predicate tree?

@dongjoon-hyun nice catch. However, here the Not predicate won't have an Or or And child, because of the BooleanSimplification optimizer rule.
I have updated the code, but a new test case seems unnecessary: Not(a Or b) is rewritten as Not(a) And Not(b).
Also, I previously created a PR for pushing down the Not operator (as extra insurance), but it made things too complex: #22687
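For illustration, a quick way to observe this behaviour (assuming an active SparkSession named spark; the predicate is arbitrary):

```scala
// Illustrative only: the optimized logical plan should show the NOT pushed
// to the leaves (De Morgan), so Not never has an Or/And child by the time
// data source filters are generated.
val q = spark.range(10).where("NOT (id > 7 OR id < 2)")
q.explain(true)
```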

> Also, it would be great to add a higher-level test case in SQLQuerySuite.scala to show the benefit of this additional predicate pushdown a1 OR b1. Could you add that, too?

How can we verify that the predicate is pushed down? By matching the OrcScan and checking its pushedFilters? Currently only ORC V2 can be checked this way.

Contributor

The conversion from (a1 AND a2) OR (b1 AND b2) to (a1 OR b1) AND (a1 OR b2) AND (a2 OR b1) AND (a2 OR b2) always works, whether or not it is a top-level boolean expression.
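For reference, the equivalence is easy to check exhaustively; this snippet is only an illustration, not test code from the PR:

```scala
// Verify (a1 AND a2) OR (b1 AND b2) == (a1 OR b1) AND (a1 OR b2) AND
// (a2 OR b1) AND (a2 OR b2) over all 16 truth assignments.
val equivalent = (for {
  a1 <- Seq(true, false); a2 <- Seq(true, false)
  b1 <- Seq(true, false); b2 <- Seq(true, false)
} yield {
  val original  = (a1 && a2) || (b1 && b2)
  val rewritten = (a1 || b1) && (a1 || b2) && (a2 || b1) && (a2 || b2)
  original == rewritten
}).forall(identity)
assert(equivalent)
```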

Member

@gengliangwang. You can use explain with Console.withOut. What we need is to check PushedFilters: in the plan.

> How can we verify that the predicate is pushed down? By matching the OrcScan and checking its pushedFilters? Currently only ORC V2 can be checked this way.

@cloud-fan and @gengliangwang. Yes, of course, I already agreed that the rule is correct. I want a test case in complete end-to-end query form that shows the new benefit clearly. Could you please add a real use case you have met?
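A sketch of such a check (assuming a DataFrame df over the test data; the filter expression is an arbitrary example): explain prints with println, so Console.withOut can capture the output and the test can assert on the PushedFilters: entry.

```scala
import java.io.ByteArrayOutputStream

// Capture the explain output and look for the pushed filters.
val output = new ByteArrayOutputStream()
Console.withOut(output) {
  df.filter("id < 2 or (id > 5 and s = '7')").explain(true)
}
val planText = output.toString("UTF-8")
assert(planText.contains("PushedFilters:"))
```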

Contributor

+1 for an end-to-end test

@viirya (Member) left a comment

Could we apply this to org.apache.spark.sql.hive.orc.OrcFilters.createBuilder, too?

@SparkQA commented May 15, 2019

Test build #105406 has finished for PR 24598 at commit 38519d3.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented May 15, 2019

Test build #105409 has finished for PR 24598 at commit 3b88687.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gengliangwang force-pushed the pushdownDisjunctivePredicates branch from c65e026 to caeb64d on May 15, 2019 12:23
@SparkQA commented May 15, 2019

Test build #105415 has finished for PR 24598 at commit c65e026.

  • This patch passes all tests.
  • This patch does not merge cleanly.
  • This patch adds no public classes.

@SparkQA commented May 15, 2019

Test build #105417 has finished for PR 24598 at commit caeb64d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Some(sources.IsNull(a.name))
case expressions.IsNotNull(a: Attribute) =>
Some(sources.IsNotNull(a.name))
translateFilterWithMapping(predicate, None)
Member

Could you add more explanation of why this PR needs translateFilterWithMapping and translateLeafNodeFilter? Is this refactoring inevitable?

Member Author

This is inevitable. See my comment below or the last part of the PR description.

withSQLConf(SQLConf.USE_V1_SOURCE_READER_LIST.key -> "") {
withTempPath { dir =>
spark.range(10).map(i => (i, i.toString)).toDF("id", "s").write.orc(dir.getCanonicalPath)
val df = spark.read.orc(dir.getCanonicalPath)
@dongjoon-hyun (Member) commented May 16, 2019

It would be better to have a Parquet test because Parquet is the default format in Apache Spark. But yes, I get it: you want to test it in a DSv2 way.

Member Author

We can have a follow-up after Parquet V2 is merged.

if (translated.isDefined) {
translatedFilterToExpr(translated.get) = filterExpr
} else {
val translated =
Member Author

With partial filter pushdown in the Or operator, the result of pushedFilters() might not exist in the mapping translatedFilterToExpr. To fix this, the PR changes translatedFilterToExpr to map only leaf sources.Filter nodes to catalyst expressions, and later rebuilds the whole expression from that mapping.

@dongjoon-hyun (Member) commented May 16, 2019

Yep. Actually, when I tested your PR before, I also noticed that the new Or didn't work in the end-to-end case. That's why I asked. I'll test more.

Thank you for making this PR work, @gengliangwang!

Member

BTW, it would be great to add a real comment in the code. It's non-trivial to figure out.

Member Author

Thanks for the testing. Appreciate it!

@SparkQA commented May 16, 2019

Test build #105458 has finished for PR 24598 at commit 5f412bd.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented May 16, 2019

Test build #105463 has finished for PR 24598 at commit b8cb843.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

// A map from translated data source filters to original catalyst filter expressions.
// A map from translated data source leaf node filters to original catalyst filter
// expressions. For a `And`/`Or` predicate, it is possible that the predicate is partially
// pushed down. This map can used to construct a catalyst filter expression from the input
Member

can used -> can be used?

@SparkQA commented May 17, 2019

Test build #105476 has finished for PR 24598 at commit 4d84060.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented May 17, 2019

Test build #105477 has finished for PR 24598 at commit 90b0b69.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

} yield filter
import org.apache.spark.sql.sources._

def convertibleFiltersHelper(
Contributor

It's weird to call a method "helper"; what does it do?

Member Author

This is a helper method for converting Filter to Expression recursively. It could also be named _convertibleFilters or convertibleFilters0...
Here it follows the naming of createFilterHelper in ParquetFilters.

@cloud-fan (Contributor)

thanks, merging to master!

@cloud-fan cloud-fan closed this in e39e97b May 17, 2019
cloud-fan pushed a commit that referenced this pull request Jul 20, 2020
… Join/Partitions

### What changes were proposed in this pull request?

In #28733 and #28805, CNF conversion is used to push down disjunctive predicates through joins and for partition pruning.

It's a good improvement; however, converting all the predicates to CNF can lead to a very long result, even with grouping functions over expressions. For example, the following predicate
```
(p0 = '1' AND p1 = '1') OR (p0 = '2' AND p1 = '2') OR (p0 = '3' AND p1 = '3') OR (p0 = '4' AND p1 = '4') OR (p0 = '5' AND p1 = '5') OR (p0 = '6' AND p1 = '6') OR (p0 = '7' AND p1 = '7') OR (p0 = '8' AND p1 = '8') OR (p0 = '9' AND p1 = '9') OR (p0 = '10' AND p1 = '10') OR (p0 = '11' AND p1 = '11') OR (p0 = '12' AND p1 = '12') OR (p0 = '13' AND p1 = '13') OR (p0 = '14' AND p1 = '14') OR (p0 = '15' AND p1 = '15') OR (p0 = '16' AND p1 = '16') OR (p0 = '17' AND p1 = '17') OR (p0 = '18' AND p1 = '18') OR (p0 = '19' AND p1 = '19') OR (p0 = '20' AND p1 = '20')
```
is converted into a very long query (130K characters) against the Hive metastore, and it fails with an error:
```
javax.jdo.JDOException: Exception thrown when executing query : SELECT DISTINCT 'org.apache.hadoop.hive.metastore.model.MPartition' AS NUCLEUS_TYPE,A0.CREATE_TIME,A0.LAST_ACCESS_TIME,A0.PART_NAME,A0.PART_ID,A0.PART_NAME AS NUCORDER0 FROM PARTITIONS A0 LEFT OUTER JOIN TBLS B0 ON A0.TBL_ID = B0.TBL_ID LEFT OUTER JOIN DBS C0 ON B0.DB_ID = C0.DB_ID WHERE B0.TBL_NAME = ? AND C0."NAME" = ? AND ((((((A0.PART_NAME LIKE '%/p1=1' ESCAPE '\' ) OR (A0.PART_NAME LIKE '%/p1=2' ESCAPE '\' )) OR (A0.PART_NAME LIKE '%/p1=3' ESCAPE '\' )) OR ((A0.PART_NAME LIKE '%/p1=4' ESCAPE '\' ) O ...
```
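As a rough illustration of the blow-up (not code from this PR): naively distributing an OR of n conjunctive clauses, each with k conjuncts, produces k^n conjuncts before any grouping or deduplication.

```scala
val n = 20 // disjuncts in the example predicate above
val k = 2  // conjuncts per disjunct (p0 = ... AND p1 = ...)
println(s"naive CNF size: ${math.pow(k, n).toLong} conjuncts") // 1048576
```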

Essentially, we just need to traverse the predicate and extract the convertible sub-predicates, as we did in #24598. There is no need to maintain the CNF result set.

### Why are the changes needed?

A better implementation for pushing down disjunctive and complex predicates. The pushed-down predicate is always equal to or shorter than the CNF result.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Unit tests

Closes #29101 from gengliangwang/pushJoin.

Authored-by: Gengliang Wang <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>