
GH-39676: [C++][Parquet] Fast Random Rowgroup Reads #39677

Closed
corwinjoy wants to merge 27 commits

Conversation

@corwinjoy commented Jan 17, 2024

Rationale for this change

Improve performance when reading a small sample of rows and columns from a wide Parquet table with many rows and columns. (See #39676.)

What changes are included in this PR?

A fast method for reading sub-samples of a large Parquet file. This is implemented by adding methods that skip expensive metadata reads and use the Parquet page index to jump directly to a desired RowGroup.
Sample benchmark, parquet file:

Number of Cols=6000, Number of Rows=1000, chunk_size=10 (this means 100 RowGroups) 
wall time with fast reader using index=0.930297s, time with full metadata=1.88833s

Are these changes tested?

These changes are tested by adding a new indexed test to src/parquet/page_index_test.cc, along with a benchmark test.

Are there any user-facing changes?

Right now, the user-facing changes are additional features, such as an IndexTo method for the metadata class and an option to read only minimal metadata when opening the file. This is a preliminary implementation, and it may make sense to improve the interface.
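For a concrete picture, here is a hypothetical usage sketch. Only IndexTo and the minimal-metadata option are named in this PR; the helper function, the flag name, and the commented-out calls below are illustrative assumptions, not the final API.

#include <memory>
#include <string>
#include "parquet/file_reader.h"
#include "parquet/properties.h"

// Sketch: open a Parquet file while reading only minimal metadata, then use
// the page index to jump straight to one row group.
void ReadOneRowGroup(const std::string& path, int row_group) {
  parquet::ReaderProperties props = parquet::default_reader_properties();
  // props.set_read_minimal_metadata(true);   // illustrative option name
  std::unique_ptr<parquet::ParquetFileReader> reader =
      parquet::ParquetFileReader::OpenFile(path, /*memory_map=*/false, props);
  // reader->metadata()->IndexTo(row_group);  // method proposed by this PR
  std::shared_ptr<parquet::RowGroupReader> rg = reader->RowGroup(row_group);
  // ... read only the needed columns from rg ...
}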

Corwin Joy added 27 commits December 7, 2023 16:36
…ffsets to use dictionary offsets and fix bug with chunk sizes.
⚠️ GitHub issue #39676 has been automatically assigned in GitHub to PR creator.

@@ -10,6 +10,7 @@
#include <ostream>

#include <thrift/TToString.h>
#include <thrift/transport/TBufferTransports.h>
Author

I realize that this is a generated file and we don't want to make changes here. However, I am unsure how this is compiled and/or how to make changes at the generation level to do a "minimal" read.

public:
  void *vtbl;  // layout stand-in for the protocol object's vtable pointer
  ::apache::thrift::transport::TBufferBase *trans_;  // exposes the protocol's underlying transport
};
Author

I realize the above is a hack, but it is more of a proof of concept. See the discussion below.

// Skip the remaining rowgroups, assuming each serialized rowgroup
// occupies the same number of bytes as the first one read.
uint32_t skip_len = (_size326 - 1) * rowgroup_size;
skip_bytes(iprot, skip_len);
xfer += skip_len;
Author

This is probably unsafe, since the compact thrift protocol may encode each rowgroup with a variable number of bytes (though this works in the simple test cases).
I can think of a few other options and would appreciate suggestions:

  1. Just exit after reading the first rowgroup and fill out the remaining fields with defaults. (This would be a bummer because then it could not support encryption).
  2. Reorder the items in the thrift file so that the (long) rowgroup section comes last.
  3. Change the parquet format so that metadata is encoded in a binary rather than a compact format to allow random access / skipping sections.

Author

(Background on the compact thrift protocol and why it may give variable size encodings for rowgroups: https://github.com/apache/thrift/blob/master/doc/specs/thrift-compact-protocol.md).
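To illustrate the variable-size issue: the compact protocol writes integers as ULEB128 varints (signed values are zigzag-encoded first), so a rowgroup whose file offsets are numerically larger serializes to more bytes. A minimal sketch of the encoding:

#include <cstdint>
#include <vector>

// ULEB128 varint writer, as used by the Thrift compact protocol for integers.
std::vector<uint8_t> WriteVarint(uint64_t v) {
  std::vector<uint8_t> out;
  while (v >= 0x80) {
    out.push_back(static_cast<uint8_t>(v & 0x7F) | 0x80);  // 7 payload bits + continuation bit
    v >>= 7;
  }
  out.push_back(static_cast<uint8_t>(v));  // final byte, continuation bit clear
  return out;
}

// WriteVarint(100).size() == 1, but WriteVarint(1000000).size() == 3, so two
// otherwise-identical rowgroup entries can occupy different byte counts and
// cannot be skipped with a fixed stride.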

/// adjusted via OffsetIndexBuilder::Finish() after BufferedPageWriter
/// has flushed all data pages.
offset_index_builder_->AddPage(start_pos, static_cast<int32_t>(compressed_size), -1);
}
Author

I think this may be a bug or oversight in the existing implementation? When decoding data for a rowgroup, if a dictionary is enabled you need to read the dictionary page. Currently the dictionary page is not written to the OffsetIndex section and, without it, there is no reliable way to know where the dictionary page can be found (due to variable sizes). Therefore, and because it is a type of data page, I believe it should be written to the OffsetIndex collection. In any case, the current implementation needs it to decode dictionary data.

Contributor

I don't think this is an oversight. The dictionary page offset is stored in the column metadata, and the first data page offset is also stored; the length should be the difference of these two. CC @wgtmac to confirm.

Member

Yes, the difference between offsets to dict page and 1st data page can be used as the upper-bound (exact in most cases) size of the dict page.
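As a sketch of that calculation against the existing C++ metadata API (ColumnChunkMetaData already exposes both offsets; the helper name here is illustrative):

#include <cstdint>
#include <optional>
#include "parquet/metadata.h"

// Upper bound (usually exact) on the on-disk size of a column chunk's
// dictionary page, derived from the two offsets in the column metadata.
std::optional<int64_t> DictPageSizeUpperBound(
    const parquet::ColumnChunkMetaData& col) {
  if (!col.has_dictionary_page()) return std::nullopt;
  return col.data_page_offset() - col.dictionary_page_offset();
}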

Author

@emkornfield @wgtmac
So, what you are saying is correct, but it seems I have not clearly explained my reasoning here.
The idea behind PageIndex is to make "point lookups I/O efficient". You can look at a ColumnIndex to find a range of values for a particular column, then use the OffsetIndex to efficiently load that data. For directly encoded data the OffsetIndex is great: you can directly load and examine column data. For dictionary-encoded data there is a problem: you can't read the data without having the dictionary. So, I believe that the OffsetIndex should contain the dictionary page for two reasons:

  1. Without the dictionary page you cannot decode the data in subsequent data pages. Instead, you now have to go retrieve that information from the metadata. This violates the principle that the PageIndex should be a way to directly access data.
  2. The dictionary page is, in fact, a type of data page. So including it should be backwards compatible, since readers should recognize its type in the list of pages provided by an OffsetIndex.

Sadly, the spec seems to be silent on what should happen in the case of dictionary encodings. It seems that most writers do not provide this information. I wonder if @mkornacker or @lekv considered dictionary encodings or would be willing to comment?

Author

At any rate, in this PR I wanted to be able to decode data directly without having to read the full metadata header. For dictionary pages I was unable to guess the location just from the data page (because dictionary pages can have varying sizes). So, I added this information to the OffsetIndex since I felt it made sense to have it there, but this may not be within the PageIndex spec.

Contributor

We should clarify the spec, but I believe dictionary pages were intended to be retrieved via column metadata (which is in the footer). I think it would have been mentioned if that were not the case, and I think the spec always assumes the footer is parsed; for page indexes there is a copy of the metadata there. The OffsetIndex is meant to directly parallel the ColumnIndex, which only keeps stats for data pages.

The dictionary page, is in fact, a type of data page. So including it should be backwards compatible since readers should recognize its type in the list of pages provided by an OffsetIndex.

In the sense that everything is "data", that is correct. But in Parquet, dictionaries are modeled as a separate concept. The actual type of a page is not stored in either index; it is therefore not really possible to make this a compatible change, because old readers would likely error out here. Parquet-mr does not touch the index when writing a dictionary page either; it does for data pages.

Author (@corwinjoy), Jan 20, 2024

@emkornfield To be specific, I mean dictionary page in the precise sense used by the spec:

enum PageType {
  DATA_PAGE = 0;
  INDEX_PAGE = 1;
  DICTIONARY_PAGE = 2;
  DATA_PAGE_V2 = 3;
}

And this is reflected in the PageHeader

struct PageHeader {
  /** the type of the page: indicates which of the *_header fields is set **/
  1: required PageType type

  /** Uncompressed page size in bytes (not including this header) **/
  2: required i32 uncompressed_page_size

  /** Compressed (and potentially encrypted) page size in bytes, not including this header **/
  3: required i32 compressed_page_size

  /** The 32-bit CRC checksum for the page, to be calculated as follows:
   ...
   */
  4: optional i32 crc

  // Headers for page specific data.  One only will be set.
  5: optional DataPageHeader data_page_header;
  6: optional IndexPageHeader index_page_header;
  7: optional DictionaryPageHeader dictionary_page_header;
  8: optional DataPageHeaderV2 data_page_header_v2;
}

So the dictionary page is just one kind of page. This is reflected in the reader classes, as in column_page.h:150:

class DictionaryPage : public Page {
...
}

So, when dictionary-encoded data is written to the file, first a dictionary page is written, then data pages are written. The parquet reader enforces and leverages this fact: it reads from a single collection of pages, where it requires that the first page be a dictionary page for dictionary-encoded columns. So, including the dictionary page in the OffsetIndex seems pretty natural to me.
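A minimal sketch of that read pattern against the C++ reader types (PageReader and the PageType enum are existing API; the loop body is illustrative):

#include <memory>
#include "parquet/column_page.h"
#include "parquet/column_reader.h"

// One stream of pages per column chunk; for a dictionary-encoded chunk the
// first page in the stream must be the dictionary page.
void ConsumeChunk(parquet::PageReader& pages) {
  std::shared_ptr<parquet::Page> page = pages.NextPage();
  if (page != nullptr && page->type() == parquet::PageType::DICTIONARY_PAGE) {
    // decode the dictionary before touching any data pages
    page = pages.NextPage();
  }
  while (page != nullptr) {
    // decode DATA_PAGE / DATA_PAGE_V2, using the dictionary if one was read
    page = pages.NextPage();
  }
}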

Contributor

I think we are in agreement on "a dictionary page is a type of page". The original statement I was responding to was:

The dictionary page, is in fact, a type of data page.

Which, as highlighted above, isn't quite accurate: data pages have distinct enums and header metadata. It could be that PageIndex was originally intended for all page types, but in actual implementations, and given the existing metadata the index captures, it is only for data pages. Any changes to these semantics pose challenges for compatibility.

format::RowGroup& row_group = metadata_->row_groups[0];
int idx = 0;
for (format::ColumnChunk& chunk : row_group.columns) {
// Assume a chunk has only 1 page for now
Author

I'm actually not quite sure how this should go when a ColumnChunk has multiple pages or if that is even possible.

Contributor

A column chunk will very frequently have multiple pages.

}
return rowgroup_offsets;
}

Author

What I would love to do is read only the OffsetIndex entries that we need, that is, just the rowgroups and column indexes that are required for a random-access read. Sadly, I don't think that is possible, because the OffsetIndex entries are written in the thrift Compact protocol. This means that encoded page addresses may be of variable size. (ARGH!!) I think it would make a lot of sense to propose a parquet format enhancement where the index entries are written using the Binary protocol with a fixed size. This would allow for random access and make data access A LOT faster. A sketch of such a layout follows.
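Purely as an illustration of the idea (this struct is not part of any Parquet spec; the fields mirror format::PageLocation):

#include <cstdint>

// Hypothetical fixed-width index entry. With a constant record size, the
// entry for page i lives at index_start + i * sizeof(FixedPageLocation),
// so it can be fetched with one positioned read instead of a sequential
// Thrift decode of everything before it.
struct FixedPageLocation {
  int64_t offset;                // file offset of the page
  int32_t compressed_page_size;  // size of the page on disk, in bytes
  int64_t first_row_index;       // index of the page's first row in the row group
};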

const ReaderProperties& properties,
Decryptor* decryptor) {
format::OffsetIndex offset_index;
ThriftDeserializer deserializer(properties);
deserializer.DeserializeMessage(reinterpret_cast<const uint8_t*>(serialized_index),
-                               &index_len, &offset_index, decryptor);
+                               index_len, &offset_index, decryptor);
return std::make_unique<OffsetIndexImpl>(offset_index);
}
Author

I probably don't need this anymore. Originally I had to keep track of the bytes used as I moved through the buffer. But then, I added an improved method to the deserializer itself for speed.

@corwinjoy (Author)
This is a somewhat preliminary PR, as I wanted to gauge community interest in a fast random-access reader for parquet files. Initial results look promising, but I could use feedback / help in a couple of areas, as noted above. I am tagging a few members who I think may be interested. @huberylee @emkornfield @binmahone

@corwinjoy (Author)
@m4rs-mt @marcin-krystianc

@github-actions bot added the awaiting changes label and removed the awaiting review label Jan 18, 2024
@emkornfield (Contributor)
I think we should probably discuss on the original issue before getting too much into code specifics.

/**
* Flag to read only the first row group
*/
bool read_only_rowgroup_0;
Member (@mapleFU), Jan 18, 2024

Why change the generated thrift? And why is only the first row group read?

Author

@mapleFU Please see the discussion I tagged you on in the related GitHub issue. As discussed there, the main bottleneck is reading huge metadata footers. I needed a way to quickly read minimal metadata. Eventually, this will likely become some derived "fast metadata" class with a thrift spec that reads minimal information. But I am not all that familiar with the generator and don't know how to do that yet, so this is a proof-of-concept / performance-benchmark modification of the generated file for now.

@kou changed the title from "GH-39676: Fast Random Rowgroup Reads" to "GH-39676: [C++][Parquet] Fast Random Rowgroup Reads" Jan 18, 2024
@emkornfield (Contributor)
I'm going to close this PR for now. I think we should focus on the design space, the API surface area changes, and an implementation direction that doesn't involve hacks. I think we can revisit this when we come to a consensus. If you want to pursue parquet spec changes, please start a conversation on the parquet dev mailing list.

@corwinjoy (Author)
Fair enough; this is only an exploratory PR to show what this approach could look like, and for performance profiling.
