GH-39676: [C++][Parquet] Fast Random Rowgroup Reads #39677

Closed
wants to merge 27 commits into from

Changes from all commits (27 commits)
4b712fd
Add initial code to update metadata with OffsetIndex
Dec 8, 2023
fcb8c7d
Fix glitches.
Dec 8, 2023
5759219
Updated code to read from modified index. All pages work except for l…
Dec 8, 2023
aba996e
Get basic dictionary offset calculation working.
Dec 19, 2023
b566085
Try to generalize the dictionary offset calculation so that it will s…
Dec 19, 2023
e104546
Add offset test.
Dec 19, 2023
9555d90
Get basic index read test working inside project.
Dec 20, 2023
f0db140
Streamline test and test for multiple rowgroups.
Dec 20, 2023
e82e745
Cleanup test code.
Dec 20, 2023
fa7df1b
Add IndexedTo method to metadata class to create a subset via the pag…
Dec 20, 2023
69cf241
Add hackish code to be able to read only row group 0.
Dec 21, 2023
740e0e4
Bubble up option to read only rowgroup 0. Modify tests to use this me…
Dec 21, 2023
9b104fc
Add more comments for skip_bytes
Dec 21, 2023
bbb291b
Add more comments.
Dec 21, 2023
4a47b23
Get OffsetIndex reader working.
Dec 23, 2023
550c947
Streamline GetAllOffsets code.
Jan 8, 2024
db18d3b
Add code to benchmark indexed read.
Jan 9, 2024
2d8fb1f
Tune index reader.
Jan 9, 2024
f6cb980
Update performance notes.
Jan 10, 2024
703e28b
Upgrade internal thrift deserializer to support multiple reads effici…
Jan 12, 2024
c2850a6
Tweak code for performance and better benchmarking.
Jan 12, 2024
d972989
Slight performance tweak.
Jan 12, 2024
01081ab
Improve docs.
Jan 12, 2024
e9433f7
Update benchmark results.
Jan 12, 2024
e6ab5cf
Update code to write dictionary pages to offsets. Update set_column_o…
Jan 12, 2024
641db01
Merge branch 'main' into offset_reader
Jan 17, 2024
b88c414
Remove whitespace changes.
Jan 17, 2024
38 changes: 37 additions & 1 deletion cpp/src/generated/parquet_types.cpp

Some generated files are not rendered by default.

8 changes: 7 additions & 1 deletion cpp/src/generated/parquet_types.h

Some generated files are not rendered by default.

13 changes: 13 additions & 0 deletions cpp/src/parquet/column_writer.cc
@@ -323,6 +323,19 @@ class SerializedPageWriter : public PageWriter {

PARQUET_THROW_NOT_OK(sink_->Write(output_data_buffer, output_data_len));

// Add the dictionary page to the offsets
if (offset_index_builder_ != nullptr) {
const int64_t compressed_size = output_data_len + header_size;
if (compressed_size > std::numeric_limits<int32_t>::max()) {
throw ParquetException("Compressed dictionary page size overflows to INT32_MAX.");
}

/// start_pos is a relative offset in the buffered mode. It should be
/// adjusted via OffsetIndexBuilder::Finish() after BufferedPageWriter
/// has flushed all data pages.
offset_index_builder_->AddPage(start_pos, static_cast<int32_t>(compressed_size), -1);
}
Author
I think this may be a bug or oversight in the existing implementation. When decoding data for a rowgroup, if dictionary encoding is enabled you need to read the dictionary page. Currently this dictionary page is not written to the OffsetIndex section and, without it, there is no reliable way to know where the dictionary page can be found (since dictionary pages have variable sizes). Therefore, and since it is a type of data page, I believe it should be written to the OffsetIndex collection. In any case, the implementation in this PR needs it to decode dictionary data.

Contributor

I don't think this is an oversight. The dictionary page offset is stored in the column metadata and the first data page offset is also stored; the length should be the difference of these two. CC @wgtmac to confirm.

Member

Yes, the difference between the offsets of the dict page and the 1st data page can be used as an upper bound (exact in most cases) on the size of the dict page.
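For illustration, here is a minimal reader-side sketch of that approach (not part of this PR; DictionaryPageRange is a hypothetical helper built on the existing ColumnChunkMetaData accessors):

#include <optional>
#include <utility>

#include "parquet/metadata.h"

// Hypothetical helper: return the byte range [offset, offset + size_bound)
// that contains a chunk's dictionary page, or std::nullopt if the chunk has
// no dictionary page. The size is an upper bound (usually exact) because the
// page header sits between the two recorded offsets.
std::optional<std::pair<int64_t, int64_t>> DictionaryPageRange(
    const parquet::ColumnChunkMetaData& chunk) {
  if (!chunk.has_dictionary_page()) {
    return std::nullopt;
  }
  const int64_t dict_offset = chunk.dictionary_page_offset();
  const int64_t first_data_offset = chunk.data_page_offset();
  return std::make_pair(dict_offset, first_data_offset - dict_offset);
}

The catch, noted further down in the thread, is that this path requires the column chunk metadata from the footer, which is exactly what this PR is trying to avoid reading in full.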

Author

@emkornfield @wgtmac
So, what you are saying is correct, but it seems I have not clearly explained my reasoning here.
The idea behind PageIndex is to make "point lookups I/O efficient". So, you can look at a ColumnIndex to find a range of values for a particular column, then use the OffsetIndex to efficiently load that data (a minimal sketch of this flow is shown after this comment). For directly encoded data the OffsetIndex is great: you can directly load and examine column data. For dictionary encoded data there is a problem: you can't read the data without having the dictionary. So, I believe that the OffsetIndex should contain the dictionary page for two reasons:

  1. Without the dictionary page you cannot decode the data in subsequent data pages. Instead, you now have to go retrieve the information from the metadata. This violates the principle that the PageIndex should be a way to directly access data.
  2. The dictionary page is, in fact, a type of data page. So including it should be backwards compatible, since readers should recognize its type in the list of pages provided by an OffsetIndex.

Sadly, the spec seems to be silent on what should happen in the case of dictionary encodings. It seems that most writers do not provide this information. I wonder if @mkornacker or @lekv considered dictionary encodings or would be willing to comment?
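For context, a minimal sketch of the point-lookup flow described above, written against the existing page-index reader API (the target row is relative to the row group, and error handling is omitted; this is an illustrative assumption, not code from this PR):

#include <memory>

#include "arrow/io/interfaces.h"
#include "parquet/file_reader.h"
#include "parquet/page_index.h"

// Sketch: use the OffsetIndex to find and read just the data page that holds
// a given row of one column. For a dictionary-encoded column this is not
// sufficient on its own, because the dictionary page's location is not in
// the OffsetIndex today.
void ReadOnePage(std::shared_ptr<::arrow::io::RandomAccessFile> source,
                 int row_group, int column, int64_t target_row) {
  std::unique_ptr<parquet::ParquetFileReader> reader =
      parquet::ParquetFileReader::Open(source);
  auto offset_index =
      reader->GetPageIndexReader()->RowGroup(row_group)->GetOffsetIndex(column);

  // Pick the last page whose first_row_index is <= target_row.
  const auto& pages = offset_index->page_locations();
  size_t page_id = 0;
  for (size_t i = 0; i < pages.size(); ++i) {
    if (pages[i].first_row_index <= target_row) page_id = i;
  }
  const parquet::PageLocation& loc = pages[page_id];

  // Direct, I/O-efficient read of exactly this page's bytes.
  auto page_bytes =
      source->ReadAt(loc.offset, loc.compressed_page_size).ValueOrDie();
  (void)page_bytes;  // decoding the page is out of scope for this sketch
}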

Author

At any rate, in this PR I wanted to be able to decode data directly without having to read the full metadata header. For dictionary pages I was unable to guess the location just from the data page (because dictionary pages can have varying sizes). So, I added this information to the PageOffset since I felt it made sense to have it there, but this may not be within the PageIndex spec.

Contributor

We should clarify the spec, but I believe dictionary pages were intended to be retrieved via column metadata (which would be in the footer). I think it would have been mentioned if that were the intent, and I think the spec always assumes the footer is parsed; for page indexes there is a copy of the metadata there. The OffsetIndex is meant to directly parallel the ColumnIndex, which only keeps stats for data pages.

The dictionary page is, in fact, a type of data page.

In the sense that everything is "data", that is correct. But in Parquet, dictionaries are modeled as a separate concept. The actual type of page is not stored in either index, so it is not really possible to make this a compatible change: old readers would likely error out here. Parquet-mr does not touch the index when writing a dictionary page either; it only does so for data pages.

Author (@corwinjoy, Jan 20, 2024)

@emkornfield To be clear, I do mean dictionary page in a very specific way, as seen in the spec:

enum PageType {
  DATA_PAGE = 0;
  INDEX_PAGE = 1;
  DICTIONARY_PAGE = 2;
  DATA_PAGE_V2 = 3;
}

And this is reflected in the PageHeader

struct PageHeader {
  /** the type of the page: indicates which of the *_header fields is set **/
  1: required PageType type

  /** Uncompressed page size in bytes (not including this header) **/
  2: required i32 uncompressed_page_size

  /** Compressed (and potentially encrypted) page size in bytes, not including this header **/
  3: required i32 compressed_page_size

  /** The 32-bit CRC checksum for the page, to be calculated as follows:
   ...
   */
  4: optional i32 crc

  // Headers for page specific data.  One only will be set.
  5: optional DataPageHeader data_page_header;
  6: optional IndexPageHeader index_page_header;
  7: optional DictionaryPageHeader dictionary_page_header;
  8: optional DataPageHeaderV2 data_page_header_v2;
}

So the dictionary page is just one kind of page. This is reflected in the reader classes, as in column_page.h:150:

class DictionaryPage : public Page {
...
}

So, when dictionary encoded data is written to the file, first a dictionary page is written, then data pages are written. The parquet reader enforces and leverages this fact: it reads from a single collection of pages and requires that the first page be a dictionary page for dictionary encoded columns. So, including the dictionary page in the OffsetIndex seems pretty natural to me.
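As a rough illustration of that ordering, here is a sketch against the current reader API (not code from this PR): the page stream of a dictionary-encoded column chunk starts with a DICTIONARY_PAGE, and later data pages cannot be decoded without it.

#include <memory>

#include "parquet/column_page.h"
#include "parquet/column_reader.h"
#include "parquet/file_reader.h"

// Sketch: walk the page stream of one column chunk. For dictionary-encoded
// chunks the first page delivered is a DictionaryPage; data pages that use
// RLE_DICTIONARY encoding are only decodable after it has been seen.
void WalkPages(parquet::ParquetFileReader* reader, int row_group, int column) {
  std::unique_ptr<parquet::PageReader> pager =
      reader->RowGroup(row_group)->GetColumnPageReader(column);
  bool saw_dictionary = false;
  std::shared_ptr<parquet::Page> page;
  while ((page = pager->NextPage()) != nullptr) {
    if (page->type() == parquet::PageType::DICTIONARY_PAGE) {
      saw_dictionary = true;  // must precede any dictionary-encoded data page
    }
    // A data page encoded with RLE_DICTIONARY is undecodable unless
    // saw_dictionary is already true at this point.
  }
  (void)saw_dictionary;
}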

Contributor

I think we are in agreement on "a dictionary page is a type of page". The original statement I was responding to was:

The dictionary page is, in fact, a type of data page.

As highlighted above, data pages have distinct enums and header metadata. The PageIndex may have been originally intended for all page types, but in actual implementations, and given the existing metadata the index captures, it covers only data pages. Any change to these semantics poses compatibility challenges.


total_uncompressed_size_ += uncompressed_size + header_size;
total_compressed_size_ += output_data_len + header_size;
++dict_encoding_stats_[page.encoding()];
5 changes: 3 additions & 2 deletions cpp/src/parquet/file_reader.cc
@@ -909,8 +909,9 @@ ::arrow::Future<> ParquetFileReader::WhenBuffered(
// File metadata helpers

std::shared_ptr<FileMetaData> ReadMetaData(
-    const std::shared_ptr<::arrow::io::RandomAccessFile>& source) {
-  return ParquetFileReader::Open(source)->metadata();
+    const std::shared_ptr<::arrow::io::RandomAccessFile>& source,
+    const ReaderProperties& props) {
+  return ParquetFileReader::Open(source, props)->metadata();
}

// ----------------------------------------------------------------------
2 changes: 1 addition & 1 deletion cpp/src/parquet/file_reader.h
@@ -217,7 +217,7 @@ class PARQUET_EXPORT ParquetFileReader {

// Read only Parquet file metadata
std::shared_ptr<FileMetaData> PARQUET_EXPORT
-ReadMetaData(const std::shared_ptr<::arrow::io::RandomAccessFile>& source);
+ReadMetaData(const std::shared_ptr<::arrow::io::RandomAccessFile>& source, const ReaderProperties& props = default_reader_properties());

/// \brief Scan all values in file. Useful for performance testing
/// \param[in] columns the column numbers to scan. If empty scans all
49 changes: 49 additions & 0 deletions cpp/src/parquet/metadata.cc
@@ -35,6 +35,7 @@
#include "parquet/schema.h"
#include "parquet/schema_internal.h"
#include "parquet/thrift_internal.h"
#include "parquet/page_index.h"

namespace parquet {

@@ -610,6 +611,10 @@ class FileMetaData::FileMetaDataImpl {
: properties_(std::move(properties)), file_decryptor_(std::move(file_decryptor)) {
metadata_ = std::make_unique<format::FileMetaData>();

if(properties_.read_only_rowgroup_0()) {
metadata_->read_only_rowgroup_0 = true;
}

auto footer_decryptor =
file_decryptor_ != nullptr ? file_decryptor_->GetFooterDecryptor() : nullptr;

@@ -826,6 +831,32 @@ class FileMetaData::FileMetaDataImpl {
file_decryptor_ = std::move(file_decryptor);
}

void set_column_offsets(const std::vector<std::shared_ptr<OffsetIndex>>& column_offsets, int64_t expected_num_rows) {
if (num_row_groups() != 1) {
throw ParquetException(
"This operation can only be applied to metadata with a single row group");
}

format::RowGroup& row_group = metadata_->row_groups[0];
int idx = 0;
for (format::ColumnChunk& chunk : row_group.columns) {
// Assume a chunk has only 1 page for now
Author

I'm actually not quite sure how this should go when a ColumnChunk has multiple pages or if that is even possible.

Contributor
A column chunk will very frequently have multiple pages.

auto pages = column_offsets[idx++].get()->page_locations();
for(PageLocation& page: pages) {
if(chunk.meta_data.dictionary_page_offset > 0 && page.first_row_index < 0) {
// Offset is a dictionary page
// Assumes OffsetIndex code has been updated to output dictionary offsets
chunk.meta_data.__set_dictionary_page_offset(page.offset);
} else {
chunk.meta_data.__set_data_page_offset(page.offset);
chunk.meta_data.__set_num_values(expected_num_rows);
// The compressed size can be set too large
// Use the value in row 0
}
}
}
}

private:
friend FileMetaDataBuilder;
uint32_t metadata_len_ = 0;
@@ -985,6 +1016,24 @@ std::shared_ptr<FileMetaData> FileMetaData::Subset(
return impl_->Subset(row_groups);
}

void FileMetaData::IndexTo(int row_group, const std::vector<ColumnOffsets> &rowgroup_offsets){
std::vector<int> row_groups = {row_group};
auto target_column_offsets = rowgroup_offsets[row_group];
int64_t total_rows = this->num_rows();
int64_t chunk_rows = this->RowGroup(0)->num_rows();
int64_t num_values = chunk_rows;
if (row_group >= total_rows / chunk_rows) {
// last page, set num_values to remainder
num_values = total_rows % chunk_rows;
}
this->set_column_offsets(target_column_offsets, num_values);
}

void FileMetaData::set_column_offsets(const ColumnOffsets& column_offsets, int64_t expected_num_rows) {

impl_->set_column_offsets(column_offsets, expected_num_rows);
}

void FileMetaData::WriteTo(::arrow::io::OutputStream* dst,
const std::shared_ptr<Encryptor>& encryptor) const {
return impl_->WriteTo(dst, encryptor);
9 changes: 9 additions & 0 deletions cpp/src/parquet/metadata.h
@@ -263,6 +263,8 @@ class PARQUET_EXPORT RowGroupMetaData {
};

class FileMetaDataBuilder;
class OffsetIndex;
typedef std::vector<std::shared_ptr<OffsetIndex>> ColumnOffsets;

/// \brief FileMetaData is a proxy around format::FileMetaData.
class PARQUET_EXPORT FileMetaData {
@@ -396,6 +398,13 @@ class PARQUET_EXPORT FileMetaData {
/// FileMetaData.
std::shared_ptr<FileMetaData> Subset(const std::vector<int>& row_groups) const;

/// \brief Return FileMetaData pointing to a single group
/// as created via page offsets
void IndexTo(int row_group, const std::vector<ColumnOffsets> &rowgroup_offsets);

/// \brief Override column chunk offsets with provided page offsets
void set_column_offsets(const ColumnOffsets& column_offsets, int64_t expected_num_rows);

private:
friend FileMetaDataBuilder;
friend class SerializedFile;
104 changes: 101 additions & 3 deletions cpp/src/parquet/page_index.cc
@@ -278,7 +278,7 @@ class RowGroupPageIndexReaderImpl : public RowGroupPageIndexReader {
encryption::kOffsetIndex);
}

-  return OffsetIndex::Make(offset_index_buffer_->data() + buffer_offset, length,
+  return OffsetIndex::Make(offset_index_buffer_->data() + buffer_offset, &length,
properties_, decryptor.get());
}

@@ -344,6 +344,54 @@ class RowGroupPageIndexReaderImpl : public RowGroupPageIndexReader {
std::shared_ptr<::arrow::Buffer> offset_index_buffer_;
};

/// Read offset index of a single column.
class OffsetIndexReader {
public:
OffsetIndexReader(
std::shared_ptr<RowGroupMetaData> row_group_metadata,
const ReaderProperties &properties,
InternalFileDecryptor *file_decryptor,
std::shared_ptr<::arrow::Buffer> offset_index_buffer)
:
row_group_metadata_(std::move(row_group_metadata)),
properties_(properties),
file_decryptor_(file_decryptor),
offset_index_buffer_(std::move(offset_index_buffer)){}

std::shared_ptr<OffsetIndex> GetOffsetIndex(int32_t col, int64_t buffer_offset, uint32_t estimated_length,
uint32_t *actual_length) {
auto col_chunk = row_group_metadata_->ColumnChunk(col);
uint32_t actual_len(estimated_length);

// Get decryptor of offset index if encrypted.
std::shared_ptr<Decryptor> decryptor =
GetColumnMetaDecryptor(col_chunk->crypto_metadata().get(), file_decryptor_);
if (decryptor != nullptr) {
UpdateDecryptor(decryptor, 0, /*column_ordinal=*/col,
encryption::kOffsetIndex);
}

auto offset_index = OffsetIndex::Make(offset_index_buffer_->data() + buffer_offset, &actual_len,
properties_, decryptor.get());

*actual_length = actual_len;
return offset_index;
}

private:
/// The row group metadata to get column chunk metadata.
std::shared_ptr<RowGroupMetaData> row_group_metadata_;

/// Reader properties used to deserialize thrift object.
const ReaderProperties &properties_;

/// File-level decryptor.
InternalFileDecryptor *file_decryptor_;

/// Buffer to hold the raw bytes of the page index.
std::shared_ptr<::arrow::Buffer> offset_index_buffer_;
};

class PageIndexReaderImpl : public PageIndexReader {
public:
PageIndexReaderImpl(::arrow::io::RandomAccessFile* input,
@@ -388,6 +436,56 @@ class PageIndexReaderImpl : public PageIndexReader {
return nullptr;
}

/// Method to read full set of OffsetIndex pages
/// Key feature is that this does not require a full set of metadata
/// Only rowgroup 0 metadata is needed.
std::vector<ColumnOffsets> GetAllOffsets() override{
std::shared_ptr<RowGroupMetaData> row_group_metadata = file_metadata_->RowGroup(0);
int32_t rowgroup_len = 0; // This rowgroup length is just an estimate, may vary by rowgroup
int64_t offset_index_start = -1;
int64_t total_rows = file_metadata_->num_rows();
int64_t chunk_rows = row_group_metadata->num_rows();
// Don't use row_group count from metadata since may be dummy with only rowgroup 0
int num_row_groups = ceil(static_cast<double>(total_rows) / static_cast<double>(chunk_rows));
int num_columns = file_metadata_->num_columns();
// TODO add methods to get offset_index_start and rowgroup_len directly
// This is because ColumnChunk creation is super expensive.
auto col_chunk = row_group_metadata->ColumnChunk(0);
auto offset_index_location = col_chunk->GetOffsetIndexLocation();
offset_index_start = offset_index_location->offset;
rowgroup_len = offset_index_location->length * num_columns;

// Retrieve 1.5x the estimated size to allow for variation in storing pages
// This is just a guess, but we can go over because metadata comes after offsets
// So, we can retrieve a slightly larger buffer here
float overhead_factor = 1.5;
int32_t est_offset_index_size = num_row_groups * rowgroup_len * overhead_factor;
std::shared_ptr<::arrow::Buffer> offset_index_buffer;
PARQUET_ASSIGN_OR_THROW(offset_index_buffer,
input_->ReadAt(offset_index_start,
est_offset_index_size));

// Perform a direct read against the buffer for performance
ThriftDeserializer deserializer(properties_);
uint32_t len_used(est_offset_index_size);
deserializer.SetInternalBuffer(const_cast<uint8_t*>(offset_index_buffer->data()), &len_used);

std::vector<ColumnOffsets> rowgroup_offsets;
rowgroup_offsets.reserve(num_row_groups);
format::OffsetIndex offset_index;
for (int rg = 0; rg < num_row_groups; ++rg) {
ColumnOffsets offset_indexes;
offset_indexes.reserve(num_columns);
for (int col = 0; col < num_columns; ++col) {
deserializer.DeserializeUnencryptedMessageUsingInternalBuffer(&offset_index);
auto offset_index_ptr = std::make_shared<OffsetIndexImpl>(offset_index);
offset_indexes.emplace_back(std::move(offset_index_ptr));
}
rowgroup_offsets.emplace_back(std::move(offset_indexes));
}
return rowgroup_offsets;
}

Author

What I would love to do is only read the OffsetIndex entries that we need. That is, just the rowgroups and column indexes that are required for a random access read. Sadly, I don't think that is possible because the OffsetIndex entries are written in thrift Compact protocol. This means that encoded page addresses may be of variable size. (ARGH!!). I think it would make a lot of sense to propose a parquet format enhancement where the index entries are written using Binary protocol with a fixed size. This would allow for random access and make data access A LOT faster.
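To make the variable-size problem concrete, here is a small standalone sketch (not part of this PR) of the zigzag-plus-varint encoding the Thrift Compact protocol uses for i64 fields such as PageLocation.offset. Because the wire size depends on the value, the k-th OffsetIndex entry cannot be located without decoding everything before it:

#include <cstdint>
#include <cstdio>

// Zigzag + LEB128 varint size, as the Thrift Compact protocol uses for i64
// fields. Returns how many bytes the value occupies on the wire.
int VarintSize(int64_t value) {
  uint64_t zz =
      (static_cast<uint64_t>(value) << 1) ^ static_cast<uint64_t>(value >> 63);
  int bytes = 1;
  while (zz >= 0x80) {
    zz >>= 7;
    ++bytes;
  }
  return bytes;
}

int main() {
  // A page offset near the start of a file vs. one deep inside a large file:
  std::printf("%d\n", VarintSize(4));             // prints 1
  std::printf("%d\n", VarintSize(3000000000LL));  // prints 5
  return 0;
}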

void WillNeed(const std::vector<int32_t>& row_group_indices,
const std::vector<int32_t>& column_indices,
const PageIndexSelection& selection) override {
@@ -908,13 +1006,13 @@ std::unique_ptr<ColumnIndex> ColumnIndex::Make(const ColumnDescriptor& descr,
}

std::unique_ptr<OffsetIndex> OffsetIndex::Make(const void* serialized_index,
-                                               uint32_t index_len,
+                                               uint32_t *index_len,
                                               const ReaderProperties& properties,
                                               Decryptor* decryptor) {
  format::OffsetIndex offset_index;
  ThriftDeserializer deserializer(properties);
  deserializer.DeserializeMessage(reinterpret_cast<const uint8_t*>(serialized_index),
-                                  &index_len, &offset_index, decryptor);
+                                  index_len, &offset_index, decryptor);
return std::make_unique<OffsetIndexImpl>(offset_index);
}
Author
I probably don't need this anymore. Originally I had to keep track of the bytes used as I moved through the buffer. But then, I added an improved method to the deserializer itself for speed.

