PARQUET-2366: Optimize random seek during rewriting #1174
Conversation
Hi @wgtmac @gszadovszky, please help review this when you are free. Thanks a lot.
```diff
@@ -265,6 +265,10 @@ private void processBlocksFromReader() throws IOException {
   BlockMetaData blockMetaData = meta.getBlocks().get(blockId);
   List<ColumnChunkMetaData> columnsInOrder = blockMetaData.getColumns();

+  List<ColumnIndex> columnIndexes = readAllColumnIndexes(reader, columnsInOrder, descriptorsMap);
```
I could add an option for it if someone is concerned about memory usage. This caches the metadata for only one block, which should be smaller than file writing, which needs to cache all blocks' metadata.
Thanks for adding this! The change looks reasonable to me. I would suggest adding a new class specifically to cache and read these indexes. The new class would have methods like `readBloomFilter()`, `readColumnIndex()`, and `readOffsetIndex()` for a specific column path, and could be configured to cache required columns in advance. With this new class, we can do more optimizations, including evicting consumed items from the cache and using async I/O prefetch to load items. We can split them into separate patches. For the first one, we may simply add the new class without any caching (i.e. no behavior change). WDYT?
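A minimal sketch of what such a class could look like. All names here are hypothetical, not the final API, and the index payloads are modeled as plain `String`s to keep the example self-contained (the real `ColumnIndex`/`OffsetIndex`/`BloomFilter` types live in parquet-column):

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the suggested index-reading abstraction.
public class IndexReaderSketch {
  // Pre-populated per-column caches; a real implementation would lazily
  // read from the file, or prefetch configured columns in advance.
  private final Map<String, String> columnIndexes = new HashMap<>();
  private final Map<String, String> offsetIndexes = new HashMap<>();
  private final Map<String, String> bloomFilters = new HashMap<>();

  public String readColumnIndex(String columnPath) { return columnIndexes.get(columnPath); }
  public String readOffsetIndex(String columnPath) { return offsetIndexes.get(columnPath); }
  public String readBloomFilter(String columnPath) { return bloomFilters.get(columnPath); }

  // Stand-in for the prefetch path that would fill the caches from the file.
  public void cacheColumnIndex(String columnPath, String index) {
    columnIndexes.put(columnPath, index);
  }

  public static void main(String[] args) {
    IndexReaderSketch reader = new IndexReaderSketch();
    reader.cacheColumnIndex("a.b", "ci(a.b)");
    System.out.println(reader.readColumnIndex("a.b")); // ci(a.b)
  }
}
```

Because callers only see the three `readXXX()` methods, the caching and prefetch strategy stays an implementation detail behind the interface.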
@wgtmac thanks for your suggestions. Do you mean reading the indexes column by column to reduce the memory footprint? That way should use less memory. From my understanding, the indexes are stored as follows:
```
// column index
block1_col1_column_index
...
block1_coln_column_index
block2_col1_column_index
...
block2_coln_column_index
...

// offset index
block1_col1_offset_index
...
block1_coln_offset_index
block2_col1_offset_index
...
block2_coln_offset_index
...

// bloom index
block1_col1_bloom_index
...
block1_coln_bloom_index
block2_col1_bloom_index
...
block2_coln_bloom_index
...
```
So the problem is that we would still need to do random seeks within a single row group (3 × the number of columns). Async I/O should help with random seek performance. With this PR, we only need 3 random seeks per row group (aside from column pruning).
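To make the seek arithmetic concrete, a back-of-the-envelope comparison under the layout described above, assuming ~1000 columns as mentioned in the PR description:

```java
// Rough seek-count comparison for reading the indexes of one row group.
// The numbers are illustrative only.
public class SeekCountEstimate {
  public static void main(String[] args) {
    int columns = 1000;  // ~1000 columns, per the PR description
    int indexKinds = 3;  // column index, offset index, bloom filter

    // Reading each index individually: one seek per column per index kind.
    int perColumnSeeks = indexKinds * columns;

    // Reading each index section sequentially for the whole row group:
    // one seek per index kind.
    int perRowGroupSeeks = indexKinds;

    System.out.println(perColumnSeeks);   // 3000
    System.out.println(perRowGroupSeeks); // 3
  }
}
```

This is why the savings grow with column count: the per-column approach scales linearly with the number of columns, while the cached approach stays constant per row group.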
> Do you mean to read the indexes column by column to reduce memory footprint?
No, my suggested interface does not restrict any implementation detail; the indexes just need to be ready at the `readXXX()` call. You can still read all indexes at once (controlled by a config). We can also configure it to release any consumed index object to reduce the memory footprint.
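One way the release-on-consume behavior could look. This is a hypothetical sketch, not the real parquet-hadoop API; the class name, the `freeOnRead` flag, and the `String` payloads are all stand-ins:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: a cache that can evict an index once it has been
// consumed, so memory stays bounded to the not-yet-rewritten columns.
public class ConsumeOnceIndexCache {
  private final Map<String, String> cache = new HashMap<>();
  private final boolean freeOnRead;  // config: release entries after first read

  public ConsumeOnceIndexCache(boolean freeOnRead) {
    this.freeOnRead = freeOnRead;
  }

  public void put(String columnPath, String index) {
    cache.put(columnPath, index);
  }

  public String read(String columnPath) {
    // remove() both returns and evicts the entry in one step.
    return freeOnRead ? cache.remove(columnPath) : cache.get(columnPath);
  }

  public static void main(String[] args) {
    ConsumeOnceIndexCache c = new ConsumeOnceIndexCache(true);
    c.put("col", "index-bytes");
    System.out.println(c.read("col")); // index-bytes
    System.out.println(c.read("col")); // null (released after first read)
  }
}
```

During a rewrite, each column chunk's indexes are read exactly once, so evicting on first read loses nothing while shrinking the cache as the row group is processed.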
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the quick response! I have left some comments. Let me know what you think.
Thanks for the change! This is looking good to me.
…hIndexCache.java Co-authored-by: Gang Wu <[email protected]>
Thanks! LGTM.
cc @gszadovszky @ggershinsky @shangxinli if this interests you.
Thanks @wgtmac @gszadovszky
Make sure you have checked all steps below.
- Jira
- Tests
- Commits
- Documentation
The `ColumnIndex`, `OffsetIndex`, and `BloomFilter` are stored at the end of the file, so we need to randomly seek 4 times when rewriting a column chunk. We found this could heavily impact rewrite performance for files with a large number of columns (~1000). In this PR, we read the `ColumnIndex`, `OffsetIndex`, and `BloomFilter` into a cache to avoid the random seeks. We observed about a 60x performance improvement in production environments for files with about one thousand columns.