feat:Add function for row alignment with page mask #1791

Merged
merged 13 commits into from
Jun 6, 2022

Conversation

Ted-Jiang
Member

@Ted-Jiang Ted-Jiang commented Jun 5, 2022

Which issue does this PR close?

Closes #1790.

Rationale for this change

Currently, the row group filter in DataFusion passes a closure to arrow-rs:

fn build_row_group_predicate(
    pruning_predicate: &PruningPredicate,
    metrics: ParquetFileMetrics,
) -> Box<dyn FnMut(&RowGroupMetaData, usize) -> bool> {

https://github.com/apache/arrow-datafusion/blob/585bc3a629b92ea7a86ebfe8bf762dbef4155710/datafusion/core/src/physical_plan/file_format/parquet.rs#L559-L562

For the page filter in DataFusion, define a similar filter_predicate:

 Box<dyn FnMut(&[pageIndex], &[pageLocation], usize) -> &[bool]>

DataFusion will send a mask (&[bool]) to arrow-rs, which then uses the mask to call compute_row_ranges and construct RowRanges: the row ranges selected within a row group for one column. If the column is sorted, the resulting vec will have size 1.
To combine multiple filters:
if two filters are joined with AND, use RowRanges::intersection to get the final row ranges; if two filters are joined with OR, use RowRanges::union to get the final row ranges.
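
A minimal sketch of how the AND/OR combination described above could work. This is not the code from this PR: it assumes each set of row ranges is a sorted, non-overlapping list of half-open `std::ops::Range<usize>` values, and the free functions `intersection` and `union` are hypothetical stand-ins for `RowRanges::intersection` and `RowRanges::union`.

```rust
use std::ops::Range;

/// Rows selected by both inputs (two filters joined with AND).
/// Assumes both inputs are sorted by start and non-overlapping.
fn intersection(a: &[Range<usize>], b: &[Range<usize>]) -> Vec<Range<usize>> {
    let (mut i, mut j) = (0, 0);
    let mut out = Vec::new();
    while i < a.len() && j < b.len() {
        let start = a[i].start.max(b[j].start);
        let end = a[i].end.min(b[j].end);
        if start < end {
            out.push(start..end);
        }
        // Advance whichever range ends first; the other may still overlap
        // with the next range on the opposite side.
        if a[i].end < b[j].end {
            i += 1;
        } else {
            j += 1;
        }
    }
    out
}

/// Rows selected by either input (two filters joined with OR).
fn union(a: &[Range<usize>], b: &[Range<usize>]) -> Vec<Range<usize>> {
    let mut all: Vec<Range<usize>> = a.iter().chain(b.iter()).cloned().collect();
    all.sort_by_key(|r| r.start);
    let mut out: Vec<Range<usize>> = Vec::new();
    for r in all {
        if let Some(last) = out.last_mut() {
            // Overlapping or adjacent: extend the previous range.
            if r.start <= last.end {
                last.end = last.end.max(r.end);
                continue;
            }
        }
        out.push(r);
    }
    out
}
```

Both functions run in a single pass over the inputs (plus one sort for `union`), which keeps the cost proportional to the number of ranges rather than the number of rows.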

After this PR, I will work on column_page_reader to enable reading records from specific row ranges.

What changes are included in this PR?

Are there any user-facing changes?

@github-actions github-actions bot added the parquet Changes to the parquet crate label Jun 5, 2022
@codecov-commenter

codecov-commenter commented Jun 5, 2022

Codecov Report

Merging #1791 (adc48cf) into master (c1a91dc) will increase coverage by 0.02%.
The diff coverage is 91.44%.

@@            Coverage Diff             @@
##           master    #1791      +/-   ##
==========================================
+ Coverage   83.39%   83.42%   +0.02%     
==========================================
  Files         198      199       +1     
  Lines       56142    56427     +285     
==========================================
+ Hits        46821    47075     +254     
- Misses       9321     9352      +31     
Impacted Files Coverage Δ
parquet/src/file/metadata.rs 95.12% <ø> (ø)
parquet/src/file/serialized_reader.rs 94.46% <ø> (-1.17%) ⬇️
parquet/src/file/page_index/range.rs 91.44% <91.44%> (ø)
parquet/src/util/cursor.rs 62.18% <0.00%> (-1.69%) ⬇️
parquet_derive/src/parquet_field.rs 65.98% <0.00%> (-0.23%) ⬇️
arrow/src/ipc/reader.rs 90.73% <0.00%> (-0.11%) ⬇️
parquet/src/file/writer.rs 92.84% <0.00%> (-0.02%) ⬇️
parquet/src/arrow/arrow_writer.rs 97.76% <0.00%> (-0.02%) ⬇️
parquet/src/arrow/schema.rs 96.81% <0.00%> (-0.01%) ⬇️
... and 14 more

Δ = absolute <relative> (impact), ø = not affected, ? = missing data

@Ted-Jiang
Member Author

@tustvold @alamb @viirya PTAL 😊

@tustvold
Contributor

tustvold commented Jun 5, 2022

Thank you, I will review this tomorrow (GMT)

}

/// Return the row ranges `Vec(start, len)` of all the selected pages
pub fn compute_row_ranges(
Member

How will this be used?

Do you need to make something like with_predicate in ReadOptionsBuilder to take a closure for filtering pages based on the ranges?

Member Author

Yes, basically add a closure called page_predicates to ReadOptionsBuilder, like

Box<dyn FnMut(&[pageIndex], &[pageLocation], usize) -> &[bool]>

to generate the mask, then call this function, compute_row_ranges.

I'm thinking about doing this at the end for testing with datafusion.
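
A rough sketch of the mask-to-ranges step discussed here, under stated assumptions: the `PageLocation` struct below is a simplified, hypothetical stand-in that keeps only the first row index of each page, the signature is not the one merged in this PR, and the result uses half-open ranges rather than `(start, len)` pairs.

```rust
use std::ops::Range;

/// Hypothetical, simplified stand-in for a Parquet offset-index entry.
struct PageLocation {
    /// Index of the page's first row within its row group.
    first_row_index: usize,
}

/// Turns a per-page mask into the row ranges covered by the selected pages.
/// `total_rows` is the row count of the row group, needed to close the last page.
fn compute_row_ranges(
    mask: &[bool],
    pages: &[PageLocation],
    total_rows: usize,
) -> Result<Vec<Range<usize>>, String> {
    if mask.len() != pages.len() {
        return Err(format!(
            "mask has {} entries but there are {} pages",
            mask.len(),
            pages.len()
        ));
    }
    let mut ranges: Vec<Range<usize>> = Vec::new();
    for (i, page) in pages.iter().enumerate() {
        if !mask[i] {
            continue;
        }
        let start = page.first_row_index;
        // A page ends where the next one begins, or at the end of the row group.
        let end = pages
            .get(i + 1)
            .map_or(total_rows, |next| next.first_row_index);
        if let Some(last) = ranges.last_mut() {
            // Merge with the previous range when the pages are contiguous.
            if last.end == start {
                last.end = end;
                continue;
            }
        }
        ranges.push(start..end);
    }
    Ok(ranges)
}
```

Merging contiguous pages means a mask that selects one unbroken run of pages yields a single range, which matches the remark in the PR description about sorted columns.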

Contributor

@tustvold tustvold left a comment

Looking good, left some high level comments. Will review more thoroughly tomorrow

@@ -223,6 +223,7 @@ pub struct RowGroupMetaData {
num_rows: i64,
total_byte_size: i64,
schema_descr: SchemaDescPtr,
// Todo add filter result -> row range
Contributor

The more I think about this the more I wonder whether the metadata structs are the right place to put the index information. They're parsed and interpreted separately from the main metadata, and so I think it makes sense for them to be stored separately?

Member Author

Yes, you are right. The page index is stored at the file-meta level.
My thought is to read fewer page indexes after the row group filter:

let mut filtered_row_groups = Vec::<RowGroupMetaData>::new();
for (i, rg_meta) in row_groups.into_iter().enumerate() {
    let mut keep = true;
    for predicate in &mut predicates {
        if !predicate(&rg_meta, i) {
            keep = false;
            break;
        }
    }
    if keep {
        filtered_row_groups.push(rg_meta);
    }
}

metadata: ParquetMetaData::new(
    metadata.file_metadata().clone(),
    filtered_row_groups,
),

So I want to read the index here and insert it into RowGroupMetaData.
It was just a simple idea at first; maybe we can find a better way during implementation.

Contributor

page index stored in file-meta level.

It isn't even file-meta level, it isn't part of the footer but stored as separate pages 😅

It was just a simple idea at first, maybe we can find a better way in the process of implementation

Provided we take care to ensure we keep things pub(crate) so we don't break APIs, this seems like a good strategy 👍

Member Author

@Ted-Jiang Ted-Jiang Jun 6, 2022

It isn't even file-meta level, it isn't part of the footer but stored as separate pages 😅

Yes, separately from the RowGroup, before the footer! 😂

Contributor

@tustvold tustvold left a comment

I think this looks good, I am, however, a bit confused as to how it will be used... I had expected something like the following.

For each not-pruned row group:

  • Use Index to identify covering set of rows based on predicates
  • Pass row selection down to RecordReader
  • Add a skip_next_page to PageReader
  • Add a skip_values to ColumnValueDecoder
  • Have RecordReader use a combination of the above to skip pages and rows during decode based on the row selection

This would also naturally extend to #1191

I'm not entirely sure where the datastructure added in this PR would fit into this?

Edit: is the intention to use this for the first step?
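
To make the proposed flow concrete, here is a small sketch of how a row selection could be turned into per-page decisions. Everything in it is hypothetical: `PageAction` and `plan_pages` are illustrative names, pages and selections are modelled as half-open row ranges, and the real `PageReader` / `ColumnValueDecoder` changes mentioned above are not shown.

```rust
use std::ops::Range;

/// What a reader could do with one page, given the row selection.
#[derive(Debug, PartialEq)]
enum PageAction {
    /// No selected row falls in this page: a candidate for `skip_next_page`.
    SkipPage,
    /// Decode only these rows; the rest of the page would go through `skip_values`.
    Decode(Vec<Range<usize>>),
}

/// For each page (given as its row range), clip the selection to the page.
fn plan_pages(pages: &[Range<usize>], selection: &[Range<usize>]) -> Vec<PageAction> {
    pages
        .iter()
        .map(|page| {
            let keep: Vec<Range<usize>> = selection
                .iter()
                .filter_map(|sel| {
                    let start = sel.start.max(page.start);
                    let end = sel.end.min(page.end);
                    // Keep only non-empty overlaps.
                    (start < end).then(|| start..end)
                })
                .collect();
            if keep.is_empty() {
                PageAction::SkipPage
            } else {
                PageAction::Decode(keep)
            }
        })
        .collect()
}
```

The sketch scans every selection range for every page for simplicity; since both lists are sorted, a merge-style walk would avoid the quadratic scan.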

Ted-Jiang and others added 2 commits June 6, 2022 20:27
@Ted-Jiang
Member Author

Ted-Jiang commented Jun 6, 2022

Edit: is the intention to use this for the first step?

Basically, yes! 👍

Use Index to identify covering set of rows based on predicates
Pass row selection down to RecordReader
Add a skip_next_page to PageReader
Add a skip_values to ColumnValueDecoder
Have RecordReader use a combination of the above to skip pages and rows during decode based on the row selection

First step contains:

  1. Use SerializedFileReader to read the file metadata and get file_metadata (reading the page_index is optional)
  2. Use min/max statistics in the row group filter to prune useless row groups (already exists)
  3. Read the page_index of the specific row groups and construct the Index (Support reading PageIndex from column metadata #1761)
  4. Use both the index and the filter to generate the page mask (this one)
    ....
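
As an illustration of step 4, a page mask could be derived from per-page min/max values roughly as follows. This is only a sketch: `PageStats`, `build_page_mask`, and `may_contain` are hypothetical names, and a single `i64` column with an equality filter is assumed.

```rust
/// Hypothetical per-page statistics, standing in for one column-index entry.
struct PageStats {
    min: i64,
    max: i64,
}

/// Builds a page mask: `true` means the page may contain matching rows
/// and has to be read; `false` means it can be skipped.
fn build_page_mask(pages: &[PageStats], may_match: impl Fn(&PageStats) -> bool) -> Vec<bool> {
    pages.iter().map(|p| may_match(p)).collect()
}

/// Example predicate for `col = value`: a page can only match when
/// `value` lies within its [min, max] interval.
fn may_contain(value: i64) -> impl Fn(&PageStats) -> bool {
    move |p| p.min <= value && value <= p.max
}
```

The mask is conservative: a `true` entry only means the page cannot be ruled out, so the rows still need to be filtered after decoding.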

@tustvold I will follow up in PageReader.
Hopefully this will bring a big performance improvement for selective queries!

@Ted-Jiang Ted-Jiang changed the title feat:Implement page filtering with Row Alignment feat:Add function for row alignment with page mask Jun 6, 2022