Support Aggregate push down for incremental scan #7636
Conversation
if (snapshot == null) {
  LOG.info("Skipping aggregate pushdown: table snapshot is null");
  return false;
org.apache.iceberg.Scan scan;
minor style thought?
Could we do something like
Scan scan = null;
if (readConf.startSnapshotId() == null) {
  TableScan tableScan = table.newScan();
  Snapshot snapshot = readSnapshot();
  if (snapshot == null) {
    LOG.info("Skipping aggregate pushdown: table snapshot is null");
    return false;
  }
  tableScan = tableScan.useSnapshot(snapshot.snapshotId());
  scan = tableScan;
} else {
  IncrementalAppendScan incrementalScan = table.newIncrementalAppendScan();
  incrementalScan = incrementalScan.fromSnapshotExclusive(readConf.startSnapshotId());
  Long endSnapshotId = readConf.endSnapshotId();
  if (endSnapshotId != null) {
    incrementalScan = incrementalScan.toSnapshot(endSnapshotId);
  }
  scan = incrementalScan;
}
scan = scan.filter(filterExpression()).includeColumnStats();
I could also see extracting that branching part into another function. My main suggestion is basically to structure it more like
if (tableScan) {
  TableScanStuff
} else {
  IncrementalStuff
}
Common Stuff
or
Scan getScan() {
  ...
}

getScan()
    .filter()
    .includeColumnStats()
That way we can remove all the casting and keep all the common code in one place, just in case we need to add things in the future.
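A toy model of that shape, for illustration only: the stand-in types below are not the real Iceberg API, just enough to show "all branching in one helper, common configuration applied once, no casting at the call site".

```java
// Stand-ins for the Iceberg scan types; hypothetical, for shape only.
interface Scan {
  Scan filter(String expression);

  Scan includeColumnStats();
}

class TableScan implements Scan {
  @Override
  public Scan filter(String expression) {
    return this;
  }

  @Override
  public Scan includeColumnStats() {
    return this;
  }
}

class IncrementalAppendScan implements Scan {
  IncrementalAppendScan fromSnapshotExclusive(long snapshotId) {
    return this;
  }

  @Override
  public Scan filter(String expression) {
    return this;
  }

  @Override
  public Scan includeColumnStats() {
    return this;
  }
}

public class ScanShape {
  // Stand-in for readConf.startSnapshotId(): null means a regular table scan.
  static Long startSnapshotId = null;

  // All the branching (and therefore all the casting) lives in one helper.
  static Scan buildScan() {
    if (startSnapshotId == null) {
      return new TableScan();
    }
    return new IncrementalAppendScan().fromSnapshotExclusive(startSnapshotId);
  }

  public static void main(String[] args) {
    // The common configuration is chained in exactly one place.
    Scan scan = buildScan().filter("expr").includeColumnStats();
    System.out.println(scan.getClass().getSimpleName()); // prints "TableScan"
  }
}
```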
Looking further, we already have a scan-building procedure for normal scans; why don't we reuse those private functions, or extract that logic?
Like buildBatchScan
Thank you very much for your comment! I have changed the code to reuse the buildBatchScan logic. Could you please take a look again when you have a moment?
scan = scan.useSnapshot(snapshot.snapshotId());
scan = configureSplitPlanning(scan);
scan = scan.filter(filterExpression());
org.apache.iceberg.Scan scan = buildIcebergBatchScan(true);
Please annotate the boolean parameters you are passing with an inline comment, to follow the Contributing Style Guidelines on Boolean Arguments.
Fixed. Thanks
@@ -188,6 +187,7 @@ public Filter[] pushedFilters() {
}

@Override
@SuppressWarnings("checkstyle:CyclomaticComplexity")
You have removed a decent amount of the branch points that were added in the prior commit and made this SuppressWarnings necessary. Can you see if it's still needed after the most recent refactoring?
Removed. Thanks!
spark, table, buildIcebergBatchScan(false), readConf, expectedSchema, filterExpressions);
}

private org.apache.iceberg.Scan buildIcebergBatchScan(boolean withStats) {
nit: Do we need the fully qualified type?
Yes, we need the fully qualified type here because we already import org.apache.spark.sql.connector.read.Scan.
.collectAsList();
Assert.assertEquals("Records should match", expectedRecords.subList(1, 4), result);
.load(tableLocation);
List<SimpleRecord> result1 =
Is there any way we can check that pushdown is being used? I know this test passes now, but I just want to make sure we are also using pushdown in future test scenarios.
Yes, we can check whether pushdown is being used; I normally inspect the explain string to find out. For example:
SELECT min(data), max(data), count(data) FROM table;
If aggregate is pushed down, the physical plan has
LocalTableScan [min(data)#4461, max(data)#4462, count(data)#4463L]
If aggregate is not pushed down, the physical plan has
BatchScan default.table[data#4471]
I didn't check the explain string in this test to see if the aggregate is pushed down, but I added a test for incremental scan in TestAggregatePushDown, and I have checked the explain string in that test to make sure the aggregate is pushed down.
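A minimal sketch of that explain-string check. The plan fragments are copied from the examples above; isPushedDown is a hypothetical helper, not an existing test utility, and in a real test the plan text would come from df.queryExecution().explainString(...).

```java
public class ExplainCheck {
  // Plan fragments copied from the examples in this thread.
  static final String PUSHED_PLAN =
      "LocalTableScan [min(data)#4461, max(data)#4462, count(data)#4463L]";
  static final String UNPUSHED_PLAN = "BatchScan default.table[data#4471]";

  // Pushed-down aggregates surface as a LocalTableScan over pre-computed values,
  // while an unpushed aggregate still reads the data column through a BatchScan.
  static boolean isPushedDown(String explain) {
    return explain.contains("LocalTableScan");
  }

  public static void main(String[] args) {
    System.out.println(isPushedDown(PUSHED_PLAN)); // true
    System.out.println(isPushedDown(UNPUSHED_PLAN)); // false
  }
}
```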
@@ -432,6 +426,10 @@ private Scan buildBatchScan(Long snapshotId, Long asOfTimestamp, String branch,
    .filter(filterExpression())
    .project(expectedSchema);

if (withStats) {
  scan = scan.includeColumnStats();
Unrelated to this PR, but something we should think about is how this can be overly expensive on extremely wide tables. @pvary is actually dealing with a similar issue; we may want to extract just the columns relating to our aggregation to avoid memory issues. Not an issue for this PR, though, just wanted to bring it up.
This looks much cleaner! Thanks so much @huaxingao. I think once the remaining nits are cleaned up this is good to go. Let's just make sure we have a test where aggregates work successfully both with and without aggregate pushdown, in both incremental and batch scans.
@RussellSpitzer I have addressed the comments and added a new test.
.agg(functions.min("data"), functions.max("data"), functions.count("data"));
String explain2 =
    dfWithoutAggPushdown1.queryExecution().explainString(ExplainMode.fromString("simple"));
explainContainsPushDownAggregates1 = false;
We have a StringAssert.contains(String...) you can use (which will also give a nicer exception):
Assertions.assertThat("str").contains(...)
Also how do we make sure that the function expressions are in the TableRelation and not somewhere else in the plan?
I don't have a good way to make sure that the expressions are in the TableRelation.
Here is the pushed-down plan:
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[], functions=[min(agg_func_0#51), max(agg_func_1#52), sum(agg_func_2#53L)])
+- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=65]
+- HashAggregate(keys=[], functions=[partial_min(agg_func_0#51), partial_max(agg_func_1#52), partial_sum(agg_func_2#53L)])
+- Project [min(data)#54 AS agg_func_0#51, max(data)#55 AS agg_func_1#52, count(data)#56L AS agg_func_2#53L]
+- LocalTableScan [min(data)#54, max(data)#55, count(data)#56L]
Here is the non-pushed-down plan:
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[], functions=[min(data#414), max(data#414), count(data#414)])
+- HashAggregate(keys=[], functions=[partial_min(data#414), partial_max(data#414), partial_count(data#414)])
+- BatchScan spark_catalog.default.table[data#414] spark_catalog.default.table (branch=null) [filters=, groupedBy=] RuntimeFilters: []
I think it might actually be easier to check for LocalTableScan instead. I have changed this test to check LocalTableScan, and I will have a follow-up to change all the other tests to also check LocalTableScan.
The assertions at 588, 567, and 611 can all be converted to AssertJ-style contains with multiple strings. Other than that, I think this is probably fine for now.
There are still a few AssertJ conversions left to do in the tests, but I think this is basically good to go. If I thought the underlying API would be more stable I would also push for testing the actual plans rather than the explain strings, but I think both are probably equally brittle in this case in terms of Spark upgrades.
I'll be on board to merge once the remaining tests are cleaned up.
@@ -288,29 +289,37 @@ public void testIncrementalScanOptions() throws IOException {
  });

// test (1st snapshot, current snapshot] incremental scan.
List<SimpleRecord> result =
Dataset<Row> resultDf1 =
nit: resultDf1 => currentSnapshotResult
// test (2nd snapshot, 3rd snapshot] incremental scan.
Dataset<Row> resultDf =
Dataset<Row> resultDf2 =
resultDf2 => incrementalResult
long snapshotId3 = validationCatalog.loadTable(tableIdent).currentSnapshot().snapshotId();
sql("INSERT INTO %s VALUES (8, 7777), (9, 9999)", tableName);

Dataset<Row> dfWithAggPushdown1 =
pushdownResult
Assert.assertTrue("aggregate pushed down", explainContainsPushDownAggregates1);

Dataset<Row> dfWithoutAggPushdown1 =
noPushdownResult
rowsToJava(dfWithAggPushdown1.collectAsList()),
rowsToJava(dfWithoutAggPushdown1.collectAsList()));

Dataset<Row> dfWithAggPushdown2 =
unboundedPushdownResult
Assert.assertTrue("aggregate pushed down", explainContainsPushDownAggregates2);

Dataset<Row> dfWithoutAggPushdown2 =
unboundedNoPushdownResult
@@ -535,6 +540,106 @@ public void testAggregatePushDownForTimeTravel() {
  assertEquals("count push down", expected2, actual2);
}

@Test
@SuppressWarnings("checkstyle:CyclomaticComplexity")
Added some suggested renames, but it would also be good to split the use cases into separate tests to avoid complicated variable names.
@@ -285,29 +286,37 @@ public void testIncrementalScanOptions() throws IOException {
  "Cannot set only end-snapshot-id for incremental scans. Please, set start-snapshot-id too.");

// test (1st snapshot, current snapshot] incremental scan.
List<SimpleRecord> result =
Dataset<Row> resultDf1 =
batchResult
// test (2nd snapshot, 3rd snapshot] incremental scan.
Dataset<Row> resultDf =
Dataset<Row> resultDf2 =
incrementalResult (again, this could just be two test cases; not picky about that or the naming here, but we should choose one)
Approved, though there are merge conflicts so this PR will likely need to be rebased.
Force-pushed from d13be87 to 3c362a8.
@RussellSpitzer was this close, and do you think it should go to 1.3.1? Edit: I realize it's already disabled, so I guess it should not go into 1.3.1, as it's a new feature and not a bug fix.
Branch has conflicts again and can't be merged. Please fix when you have the chance. Would be nice if we could get this into 1.4.
The conflict is very small, so I think we can just merge after we get that fixed.
Sorry, I somehow missed the conversation thread for this PR and never resolved the conflicts; I noticed this when I recently checked my Iceberg PRs. It's a bit easier to open a new PR than to resolve the conflicts in this one. Here is the new PR: Enable Aggregate push down for incremental scan. I will add support in Spark 3.5 first, and then add back the changes in 3.4 and 3.3 afterwards. I will close this PR for now.