From ec0ee529285e91e93104c550c036b10312986e27 Mon Sep 17 00:00:00 2001 From: Rylan Dmello Date: Fri, 4 Jan 2019 19:16:44 -0500 Subject: [PATCH] PARQUET-1482: [C++] Add branch to TypedRecordReader::ReadNewPage for PageType::DATA_PAGE_V2 to address incompatibility with parquetjs. Tests This commit doesn't include tests; I am working on them now. I may need to use an actual file generated by parquetjs to test this issue, so I wonder if adding feeds1kMicros.parquet from the JIRA task to the parquet-testing repository is an option. Description parquetjs seems to be writing Parquet V2 files with DataPageV2 pages, while parquet-cpp writes Parquet V2 files with DataPage pages. Since TypedRecordReader::ReadNewPage() only had a branch for PageType::DATA_PAGE, the reader would return without reading any data for records that have DATA_PAGE_V2 pages. This explains the behavior observed in PARQUET-1482. This commit adds a new if-else branch for the DataPageV2 case in TypedRecordReader::ReadNewPage(). Since the DataPageV2 branch needed to reuse the code from the DataPage case, I refactored the repetition/definition level decoder initialization and the data decoder initialization to two new methods in the TypedRecordReader class. These new methods are now called by the DataPage and DataPageV2 initialization branches in TypedRecordReader::ReadNewPage(). There is an alternate implementation possible (with a smaller diff) by sharing the same else-if branch between DataPage and DataPageV2 using a pointer-to-derived shared_ptr. However, since the Page superclass doesn't have the necessary encoding() or num_values() methods, I would need to add a common superclass to both DataPage and DataPageV2 that defined these methods. I didn't do this because I was hesitant to modify the Page class hierarchy for this commit. --- cpp/src/parquet/arrow/record_reader.cc | 183 +++++++++++++++---------- 1 file changed, 108 insertions(+), 75 deletions(-) diff --git a/cpp/src/parquet/arrow/record_reader.cc b/cpp/src/parquet/arrow/record_reader.cc index 4a988dacdd9aa..a8a2270b8b7bf 100644 --- a/cpp/src/parquet/arrow/record_reader.cc +++ b/cpp/src/parquet/arrow/record_reader.cc @@ -570,6 +570,16 @@ class TypedRecordReader : public RecordReader::RecordReaderImpl { DecoderType* current_decoder_; + // Initialize repetition and definition level decoders on the next data page. + template + int64_t InitializeLevelDecoders(const std::shared_ptr page, + const Encoding::type repetition_level_encoding, + const Encoding::type definition_level_encoding); + + template + void InitializeDataDecoder(const std::shared_ptr page, + const int64_t levels_bytes); + // Advance to the next data page bool ReadNewPage() override; @@ -724,11 +734,95 @@ inline void TypedRecordReader::ConfigureDictionary(const DictionaryPage* current_decoder_ = decoders_[encoding].get(); } +// If the data page includes repetition and definition levels, we +// initialize the level decoders and return the number of encoded level bytes. +// The return value helps determine the number of bytes in the encoded data. +template +template +int64_t TypedRecordReader::InitializeLevelDecoders( + const std::shared_ptr page, const Encoding::type repetition_level_encoding, + const Encoding::type definition_level_encoding) { + // Read a data page. + num_buffered_values_ = page->num_values(); + + // Have not decoded any values from the data page yet + num_decoded_values_ = 0; + + const uint8_t* buffer = page->data(); + int64_t levels_byte_size = 0; + + // Data page Layout: Repetition Levels - Definition Levels - encoded values. + // Levels are encoded as rle or bit-packed. + // Init repetition levels + if (descr_->max_repetition_level() > 0) { + int64_t rep_levels_bytes = repetition_level_decoder_.SetData( + repetition_level_encoding, descr_->max_repetition_level(), + static_cast(num_buffered_values_), buffer); + buffer += rep_levels_bytes; + levels_byte_size += rep_levels_bytes; + } + // TODO figure a way to set max_definition_level_ to 0 + // if the initial value is invalid + + // Init definition levels + if (descr_->max_definition_level() > 0) { + int64_t def_levels_bytes = definition_level_decoder_.SetData( + definition_level_encoding, descr_->max_definition_level(), + static_cast(num_buffered_values_), buffer); + levels_byte_size += def_levels_bytes; + } + + return levels_byte_size; +} + +// Get a decoder object for this page or create a new decoder if this is the +// first page with this encoding. +template +template +void TypedRecordReader::InitializeDataDecoder(const std::shared_ptr page, + const int64_t levels_byte_size) { + const uint8_t* buffer = page->data() + levels_byte_size; + const int64_t data_size = page->size() - levels_byte_size; + + Encoding::type encoding = page->encoding(); + + if (IsDictionaryIndexEncoding(encoding)) { + encoding = Encoding::RLE_DICTIONARY; + } + + auto it = decoders_.find(static_cast(encoding)); + if (it != decoders_.end()) { + if (encoding == Encoding::RLE_DICTIONARY) { + DCHECK(current_decoder_->encoding() == Encoding::RLE_DICTIONARY); + } + current_decoder_ = it->second.get(); + } else { + switch (encoding) { + case Encoding::PLAIN: { + std::shared_ptr decoder(new PlainDecoder(descr_)); + decoders_[static_cast(encoding)] = decoder; + current_decoder_ = decoder.get(); + break; + } + case Encoding::RLE_DICTIONARY: + throw ParquetException("Dictionary page must be before data page."); + + case Encoding::DELTA_BINARY_PACKED: + case Encoding::DELTA_LENGTH_BYTE_ARRAY: + case Encoding::DELTA_BYTE_ARRAY: + ParquetException::NYI("Unsupported encoding"); + + default: + throw ParquetException("Unknown encoding type."); + } + } + current_decoder_->SetData(static_cast(num_buffered_values_), buffer, + static_cast(data_size)); +} + template bool TypedRecordReader::ReadNewPage() { // Loop until we find the next data page. - const uint8_t* buffer; - while (true) { current_page_ = pager_->NextPage(); if (!current_page_) { @@ -740,79 +834,18 @@ bool TypedRecordReader::ReadNewPage() { ConfigureDictionary(static_cast(current_page_.get())); continue; } else if (current_page_->type() == PageType::DATA_PAGE) { - const DataPage* page = static_cast(current_page_.get()); - - // Read a data page. - num_buffered_values_ = page->num_values(); - - // Have not decoded any values from the data page yet - num_decoded_values_ = 0; - - buffer = page->data(); - - // If the data page includes repetition and definition levels, we - // initialize the level decoder and subtract the encoded level bytes from - // the page size to determine the number of bytes in the encoded data. - int64_t data_size = page->size(); - - // Data page Layout: Repetition Levels - Definition Levels - encoded values. - // Levels are encoded as rle or bit-packed. - // Init repetition levels - if (descr_->max_repetition_level() > 0) { - int64_t rep_levels_bytes = repetition_level_decoder_.SetData( - page->repetition_level_encoding(), descr_->max_repetition_level(), - static_cast(num_buffered_values_), buffer); - buffer += rep_levels_bytes; - data_size -= rep_levels_bytes; - } - // TODO figure a way to set max_definition_level_ to 0 - // if the initial value is invalid - - // Init definition levels - if (descr_->max_definition_level() > 0) { - int64_t def_levels_bytes = definition_level_decoder_.SetData( - page->definition_level_encoding(), descr_->max_definition_level(), - static_cast(num_buffered_values_), buffer); - buffer += def_levels_bytes; - data_size -= def_levels_bytes; - } - - // Get a decoder object for this page or create a new decoder if this is the - // first page with this encoding. - Encoding::type encoding = page->encoding(); - - if (IsDictionaryIndexEncoding(encoding)) { - encoding = Encoding::RLE_DICTIONARY; - } - - auto it = decoders_.find(static_cast(encoding)); - if (it != decoders_.end()) { - if (encoding == Encoding::RLE_DICTIONARY) { - DCHECK(current_decoder_->encoding() == Encoding::RLE_DICTIONARY); - } - current_decoder_ = it->second.get(); - } else { - switch (encoding) { - case Encoding::PLAIN: { - std::shared_ptr decoder(new PlainDecoder(descr_)); - decoders_[static_cast(encoding)] = decoder; - current_decoder_ = decoder.get(); - break; - } - case Encoding::RLE_DICTIONARY: - throw ParquetException("Dictionary page must be before data page."); - - case Encoding::DELTA_BINARY_PACKED: - case Encoding::DELTA_LENGTH_BYTE_ARRAY: - case Encoding::DELTA_BYTE_ARRAY: - ParquetException::NYI("Unsupported encoding"); - - default: - throw ParquetException("Unknown encoding type."); - } - } - current_decoder_->SetData(static_cast(num_buffered_values_), buffer, - static_cast(data_size)); + const auto page = std::static_pointer_cast(current_page_); + const int64_t levels_byte_size = InitializeLevelDecoders( + page, page->repetition_level_encoding(), page->definition_level_encoding()); + InitializeDataDecoder(page, levels_byte_size); + return true; + } else if (current_page_->type() == PageType::DATA_PAGE_V2) { + const auto page = std::static_pointer_cast(current_page_); + // Repetition and definition levels are always encoded using RLE encoding + // in the DataPageV2 format. + const int64_t levels_byte_size = + InitializeLevelDecoders(page, Encoding::RLE, Encoding::RLE); + InitializeDataDecoder(page, levels_byte_size); return true; } else { // We don't know what this page type is. We're allowed to skip non-data