-
Notifications
You must be signed in to change notification settings - Fork 1.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Support user defined ParquetAccessPlan
in ParquetExec
, validation to ParquetAccessPlan::select
#10813
Conversation
…ation to ParquetAccessPlan::select
3e839ef
to
38fa45a
Compare
ParquetAccessPlan
to be passed in to ParquetExec
, add validation to ParquetAccessPlan::select
ParquetAccessPlan
in ParquetExec
, validation to ParquetAccessPlan::select
@@ -182,6 +183,11 @@ impl ParquetAccessPlan { | |||
/// is returned for *all* the rows in the row groups that are not skipped. | |||
/// Thus it includes a `Select` selection for any [`RowGroupAccess::Scan`]. | |||
/// | |||
/// # Errors |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since users can now provide a ParquetAccessPlan
it is important to do validation on the contents.
While technically we could avoid doing this validation when the selections came from the page pruning, I think it would be a good check to have to catch future bugs rather than subtle wrong results so I chose to always validate
@@ -366,7 +410,8 @@ mod test { | |||
RowSelector::select(10), | |||
// selectors from the second row group | |||
RowSelector::select(5), | |||
RowSelector::skip(7) | |||
RowSelector::skip(7), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
turns out that some of the existing unit tests were actually invalid. However, I think the issues are actually test problem, not actual code problems. All the actual parquet reader tests passed
@@ -139,7 +139,8 @@ impl FileOpener for ParquetOpener { | |||
let predicate = pruning_predicate.as_ref().map(|p| p.as_ref()); | |||
let rg_metadata = file_metadata.row_groups(); | |||
// track which row groups to actually read | |||
let access_plan = ParquetAccessPlan::new_all(rg_metadata.len()); | |||
let access_plan = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is the required plumbing, which I am quite pleased with -- it is quite straightforward now
let mut finder = MetricsFinder { metrics: None }; | ||
accept(physical_plan.as_ref(), &mut finder).unwrap(); | ||
let parquet_metrics = finder.metrics.unwrap(); | ||
let parquet_metrics = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pulled into a new file so I could reuse it
/// | ||
/// For a complete example, see the [`parquet_index_advanced` example]). | ||
/// | ||
/// [`parquet_index_advanced` example]: https://github.com/apache/datafusion/blob/main/datafusion-examples/examples/parquet_index_advanced.rs |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This will be added in #10701
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
BTW #10701 (example of how to use this API) is ready for review
} | ||
|
||
// validate all Selections | ||
for (idx, (rg, rg_meta)) in self |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is new checking added as users can pass in ParquetAccessPlan
and the semantics are quite subtle
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Very good explanation, makes sense to me coming in with no context on the original issue 👍
datafusion/core/src/datasource/physical_plan/parquet/access_plan.rs
Outdated
Show resolved
Hide resolved
Co-authored-by: Jeffrey Vo <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you @Jefffrey for your review
BTW I am happy to make additional corrections as follow on PRs if anyone has additional notes cc @advancedxy @thinkharderdev @crepererum @NGA-TRAN and @Ted-Jiang @xinlifoobar and @hengfeiyang who reviewed the original PR to create |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @alamb, this is great work. I think the code is pretty good.
Sorry for being late to the party, left some minor style issues comment.
@@ -348,14 +384,22 @@ mod test { | |||
let access_plan = ParquetAccessPlan::new(vec![ | |||
RowGroupAccess::Scan, | |||
RowGroupAccess::Selection( | |||
vec![RowSelector::select(5), RowSelector::skip(7)].into(), | |||
// select / skip all 20 rows in row group 1 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: to be consistent with L427 in this file, it would be better to call it as
specifies all 20 rows in row group
.
fn test_invalid_too_few() { | ||
let access_plan = ParquetAccessPlan::new(vec![ | ||
RowGroupAccess::Scan, | ||
// select 12 rows, but row group 1 has 20 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: -> specifies 12 rows
?
I think the select is referred to as selection, which the following code also includes a skip.
fn test_invalid_too_many() { | ||
let access_plan = ParquetAccessPlan::new(vec![ | ||
RowGroupAccess::Scan, | ||
// select 22 rows, but row group 1 has only 20 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto.
/// The `ParquetExec` will try and further reduce any provided | ||
/// `ParquetAccessPlan` further based on the contents of `ParquetMetadata` and |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: there are two further
in this sentence. How about:
/// The `ParquetExec` will try and reduce any provided
/// `ParquetAccessPlan` further based on the contents ...
|
||
// check row group count matches the plan | ||
return Ok(access_plan.clone()); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: is it better to add a logging in the else branch?
No worries -- thank you for the comments. I will make a PR to address them shortly |
PR with comments: #10896 |
… to `ParquetAccessPlan::select` (apache#10813) * Allow `ParquetAccessPlan` to be passed in to `ParquetExec`, add validation to ParquetAccessPlan::select * Add test for filtering and user supplied access plan * fix on windows * Apply suggestions from code review Co-authored-by: Jeffrey Vo <[email protected]> --------- Co-authored-by: Jeffrey Vo <[email protected]>
… to `ParquetAccessPlan::select` (apache#10813) (#8) * Allow `ParquetAccessPlan` to be passed in to `ParquetExec`, add validation to ParquetAccessPlan::select * Add test for filtering and user supplied access plan * fix on windows * Apply suggestions from code review --------- Co-authored-by: Andrew Lamb <[email protected]> Co-authored-by: Jeffrey Vo <[email protected]>
… to `ParquetAccessPlan::select` (apache#10813) * Allow `ParquetAccessPlan` to be passed in to `ParquetExec`, add validation to ParquetAccessPlan::select * Add test for filtering and user supplied access plan * fix on windows * Apply suggestions from code review Co-authored-by: Jeffrey Vo <[email protected]> --------- Co-authored-by: Jeffrey Vo <[email protected]>
This PR looks big but the 85% of it is tests and documentation
Which issue does this PR close?
Closes #9929
Rationale for this change
Many query engines / use cases have some sort of a specialized index for data stored in parquet. This index can be used to determine which row groups / selections within a file are needed
However, the DataFusion
ParquetExec
has no way for users to pass this information in. Instead it tries to prune row groups based on the min/max statistics and other information in the file's metadata.This PR makes it possible for users to pass in a
ParquetAccessPlan
added in #10738 toParquetExec
with a starting plan, which is then further pruned based on the file's metadata.What changes are included in this PR?
ParquetAccessPlan
for eachPartitionedFile
read byParquetExec
ParquetAccessPlan
now that it can be specified by usersAre these changes tested?
Yes, new tests are added
Are there any user-facing changes?
a new API
Here is a complete end to end example of using this API: #10701