From 7190cf54e6c7c179b0746ef9bcae92706e5d927f Mon Sep 17 00:00:00 2001 From: Fatemah Panahi Date: Wed, 2 Aug 2023 23:53:13 +0000 Subject: [PATCH 1/7] Add api to get a RecordReader for a column from a RowGroupReader. --- cpp/src/parquet/column_writer.cc | 19 +------------------ cpp/src/parquet/file_reader.cc | 22 ++++++++++++++++++++++ cpp/src/parquet/file_reader.h | 7 +++++++ cpp/src/parquet/level_conversion.h | 18 ++++++++++++++++++ cpp/src/parquet/reader_test.cc | 15 +++++++++++++++ 5 files changed, 63 insertions(+), 18 deletions(-) diff --git a/cpp/src/parquet/column_writer.cc b/cpp/src/parquet/column_writer.cc index 33e9f8f6658ae..08a7b6ccc4e67 100644 --- a/cpp/src/parquet/column_writer.cc +++ b/cpp/src/parquet/column_writer.cc @@ -140,23 +140,6 @@ struct ValueBufferSlicer { MemoryPool* pool_; }; -internal::LevelInfo ComputeLevelInfo(const ColumnDescriptor* descr) { - internal::LevelInfo level_info; - level_info.def_level = descr->max_definition_level(); - level_info.rep_level = descr->max_repetition_level(); - - int16_t min_spaced_def_level = descr->max_definition_level(); - const ::parquet::schema::Node* node = descr->schema_node().get(); - while (node != nullptr && !node->is_repeated()) { - if (node->is_optional()) { - min_spaced_def_level--; - } - node = node->parent(); - } - level_info.repeated_ancestor_def_level = min_spaced_def_level; - return level_info; -} - template inline const T* AddIfNotNull(const T* base, int64_t offset) { if (base != nullptr) { @@ -726,7 +709,7 @@ class ColumnWriterImpl { Encoding::type encoding, const WriterProperties* properties) : metadata_(metadata), descr_(metadata->descr()), - level_info_(ComputeLevelInfo(metadata->descr())), + level_info_(internal::LevelInfo::ComputeLevelInfo(metadata->descr())), pager_(std::move(pager)), has_dictionary_(use_dictionary), encoding_(encoding), diff --git a/cpp/src/parquet/file_reader.cc b/cpp/src/parquet/file_reader.cc index 488c3b95927eb..98d77382f7b9c 100644 --- a/cpp/src/parquet/file_reader.cc +++ b/cpp/src/parquet/file_reader.cc @@ -80,6 +80,28 @@ std::shared_ptr RowGroupReader::Column(int i) { const_cast(contents_->properties())->memory_pool()); } +std::shared_ptr RowGroupReader::RecordReader( + int i, bool read_dictionary, bool read_dense_for_nullable) { + if (i >= metadata()->num_columns()) { + std::stringstream ss; + ss << "Trying to read column index " << i << " but row group metadata has only " + << metadata()->num_columns() << " columns"; + throw ParquetException(ss.str()); + } + const ColumnDescriptor* descr = metadata()->schema()->Column(i); + + std::unique_ptr page_reader = contents_->GetColumnPageReader(i); + + internal::LevelInfo level_info = internal::LevelInfo::ComputeLevelInfo(descr); + + auto reader = internal::RecordReader::Make( + descr, level_info, + const_cast(contents_->properties())->memory_pool(), + read_dictionary, read_dense_for_nullable); + reader->SetPageReader(std::move(page_reader)); + return reader; +} + std::shared_ptr RowGroupReader::ColumnWithExposeEncoding( int i, ExposedEncoding encoding_to_expose) { std::shared_ptr reader = Column(i); diff --git a/cpp/src/parquet/file_reader.h b/cpp/src/parquet/file_reader.h index 13fcd196c4459..e7b8b3c771c65 100644 --- a/cpp/src/parquet/file_reader.h +++ b/cpp/src/parquet/file_reader.h @@ -22,6 +22,7 @@ #include #include +#include "parquet/column_reader.h" #include "arrow/io/caching.h" #include "arrow/util/type_fwd.h" #include "parquet/metadata.h" // IWYU pragma: keep @@ -58,6 +59,12 @@ class PARQUET_EXPORT RowGroupReader { // column. Ownership is shared with the RowGroupReader. std::shared_ptr Column(int i); + // Construct a RecordReader for the indicated row group-relative column i. + // Ownership is shared with the RowGroupReader. + // Set read_dense_for_nullable to true if reading dense and not leaving space for null. + std::shared_ptr<::parquet::internal::RecordReader> RecordReader( + int i, bool read_dictionary = false, bool read_dense_for_nullable = false); + // Construct a ColumnReader, trying to enable exposed encoding. // // For dictionary encoding, currently we only support column chunks that are fully diff --git a/cpp/src/parquet/level_conversion.h b/cpp/src/parquet/level_conversion.h index 480d82ed0d81a..5ba15e336d41b 100644 --- a/cpp/src/parquet/level_conversion.h +++ b/cpp/src/parquet/level_conversion.h @@ -122,6 +122,24 @@ struct PARQUET_EXPORT LevelInfo { return last_repeated_ancestor; } + // Calculates and returns LevelInfo for a column descriptor. + static LevelInfo ComputeLevelInfo(const ColumnDescriptor* descr) { + LevelInfo level_info; + level_info.def_level = descr->max_definition_level(); + level_info.rep_level = descr->max_repetition_level(); + + int16_t min_spaced_def_level = descr->max_definition_level(); + const ::parquet::schema::Node* node = descr->schema_node().get(); + while (node != nullptr && !node->is_repeated()) { + if (node->is_optional()) { + min_spaced_def_level--; + } + node = node->parent(); + } + level_info.repeated_ancestor_def_level = min_spaced_def_level; + return level_info; + } + friend std::ostream& operator<<(std::ostream& os, const LevelInfo& levels) { // This print method is to silence valgrind issues. What's printed // is not important because all asserts happen directly on diff --git a/cpp/src/parquet/reader_test.cc b/cpp/src/parquet/reader_test.cc index 0a73002846add..6c0de71b6eeb1 100644 --- a/cpp/src/parquet/reader_test.cc +++ b/cpp/src/parquet/reader_test.cc @@ -502,6 +502,21 @@ TEST_F(TestAllTypesPlain, ColumnSelectionOutOfRange) { ASSERT_THROW(printer2.DebugPrint(ss, columns), ParquetException); } +// Tests getting a record reader from a row group reader. +TEST_F(TestAllTypesPlain, TestRecordReader) { + std::shared_ptr group = reader_->RowGroup(0); + + std::shared_ptr col_record_reader_ = + group->RecordReader(0, /*read_dictionary=*/false, /*read_dense_for_null=*/false); + + ASSERT_TRUE(col_record_reader_->HasMoreData()); + auto records_read = col_record_reader_->ReadRecords(4); + ASSERT_EQ(records_read, 4); + ASSERT_EQ(4, col_record_reader_->values_written()); + ASSERT_EQ(4, col_record_reader_->levels_position()); + ASSERT_EQ(8, col_record_reader_->levels_written()); +} + class TestLocalFile : public ::testing::Test { public: void SetUp() { From 1b726c311a5075afe6dfdd89a0caa13658851b79 Mon Sep 17 00:00:00 2001 From: Fatemah Panahi Date: Thu, 3 Aug 2023 00:04:12 +0000 Subject: [PATCH 2/7] Minor edit. --- cpp/src/parquet/file_reader.h | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/cpp/src/parquet/file_reader.h b/cpp/src/parquet/file_reader.h index e7b8b3c771c65..641368b7f32c6 100644 --- a/cpp/src/parquet/file_reader.h +++ b/cpp/src/parquet/file_reader.h @@ -61,9 +61,8 @@ class PARQUET_EXPORT RowGroupReader { // Construct a RecordReader for the indicated row group-relative column i. // Ownership is shared with the RowGroupReader. - // Set read_dense_for_nullable to true if reading dense and not leaving space for null. - std::shared_ptr<::parquet::internal::RecordReader> RecordReader( - int i, bool read_dictionary = false, bool read_dense_for_nullable = false); + std::shared_ptr RecordReader( + int i, bool read_dictionary = false, bool read_dense_for_nullable = false); // Construct a ColumnReader, trying to enable exposed encoding. // From 7d6e8dbf3f058600b1ea517b1f68280c2f717bb1 Mon Sep 17 00:00:00 2001 From: Fatemah Panahi Date: Thu, 3 Aug 2023 16:41:26 +0000 Subject: [PATCH 3/7] Fix lint errors --- cpp/src/parquet/file_reader.h | 2 +- cpp/src/parquet/level_conversion.h | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/cpp/src/parquet/file_reader.h b/cpp/src/parquet/file_reader.h index 641368b7f32c6..8b1e0e394e9f9 100644 --- a/cpp/src/parquet/file_reader.h +++ b/cpp/src/parquet/file_reader.h @@ -22,9 +22,9 @@ #include #include -#include "parquet/column_reader.h" #include "arrow/io/caching.h" #include "arrow/util/type_fwd.h" +#include "parquet/column_reader.h" #include "parquet/metadata.h" // IWYU pragma: keep #include "parquet/platform.h" #include "parquet/properties.h" diff --git a/cpp/src/parquet/level_conversion.h b/cpp/src/parquet/level_conversion.h index 3a42c15b28500..2c6f628319fc4 100644 --- a/cpp/src/parquet/level_conversion.h +++ b/cpp/src/parquet/level_conversion.h @@ -129,7 +129,7 @@ struct PARQUET_EXPORT LevelInfo { int16_t min_spaced_def_level = descr->max_definition_level(); const ::parquet::schema::Node* node = descr->schema_node().get(); - while (node != nullptr && !node->is_repeated()) { + while (node && !node->is_repeated()) { if (node->is_optional()) { min_spaced_def_level--; } From 2d073bf45926b3f8a24a29cd228d2e818da0ef71 Mon Sep 17 00:00:00 2001 From: Fatemah Panahi Date: Thu, 3 Aug 2023 16:58:13 +0000 Subject: [PATCH 4/7] Remove const_cast --- cpp/src/parquet/file_reader.cc | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/cpp/src/parquet/file_reader.cc b/cpp/src/parquet/file_reader.cc index 5c1acd1b2066d..45494e9498149 100644 --- a/cpp/src/parquet/file_reader.cc +++ b/cpp/src/parquet/file_reader.cc @@ -96,10 +96,9 @@ std::shared_ptr RowGroupReader::RecordReader( internal::LevelInfo level_info = internal::LevelInfo::ComputeLevelInfo(descr); - auto reader = internal::RecordReader::Make( - descr, level_info, - const_cast(contents_->properties())->memory_pool(), - read_dictionary, read_dense_for_nullable); + auto reader = internal::RecordReader::Make(descr, level_info, + contents_->properties()->memory_pool(), + read_dictionary, read_dense_for_nullable); reader->SetPageReader(std::move(page_reader)); return reader; } From c315dab1552fdeddcb7ede98092f942f66a98a55 Mon Sep 17 00:00:00 2001 From: Fatemah Panahi Date: Tue, 3 Oct 2023 23:23:52 +0000 Subject: [PATCH 5/7] Add read_dense_for_nullable to ReaderProperties. --- cpp/src/parquet/file_reader.cc | 9 ++++----- cpp/src/parquet/file_reader.h | 3 +-- cpp/src/parquet/properties.h | 6 ++++++ cpp/src/parquet/reader_test.cc | 12 ++++++++---- 4 files changed, 19 insertions(+), 11 deletions(-) diff --git a/cpp/src/parquet/file_reader.cc b/cpp/src/parquet/file_reader.cc index a7dbd2cd4814d..1306459d71e90 100644 --- a/cpp/src/parquet/file_reader.cc +++ b/cpp/src/parquet/file_reader.cc @@ -82,8 +82,7 @@ std::shared_ptr RowGroupReader::Column(int i) { const_cast(contents_->properties())->memory_pool()); } -std::shared_ptr RowGroupReader::RecordReader( - int i, bool read_dictionary, bool read_dense_for_nullable) { +std::shared_ptr RowGroupReader::RecordReader(int i) { if (i >= metadata()->num_columns()) { std::stringstream ss; ss << "Trying to read column index " << i << " but row group metadata has only " @@ -96,9 +95,9 @@ std::shared_ptr RowGroupReader::RecordReader( internal::LevelInfo level_info = internal::LevelInfo::ComputeLevelInfo(descr); - auto reader = internal::RecordReader::Make(descr, level_info, - contents_->properties()->memory_pool(), - read_dictionary, read_dense_for_nullable); + auto reader = internal::RecordReader::Make( + descr, level_info, contents_->properties()->memory_pool(), + /* read_dictionary = */ false, contents_->properties()->read_dense_for_nullable()); reader->SetPageReader(std::move(page_reader)); return reader; } diff --git a/cpp/src/parquet/file_reader.h b/cpp/src/parquet/file_reader.h index 8b1e0e394e9f9..6edc4a3b41269 100644 --- a/cpp/src/parquet/file_reader.h +++ b/cpp/src/parquet/file_reader.h @@ -61,8 +61,7 @@ class PARQUET_EXPORT RowGroupReader { // Construct a RecordReader for the indicated row group-relative column i. // Ownership is shared with the RowGroupReader. - std::shared_ptr RecordReader( - int i, bool read_dictionary = false, bool read_dense_for_nullable = false); + std::shared_ptr RecordReader(int i); // Construct a ColumnReader, trying to enable exposed encoding. // diff --git a/cpp/src/parquet/properties.h b/cpp/src/parquet/properties.h index bdc5b15332d09..6bdb21c7fda80 100644 --- a/cpp/src/parquet/properties.h +++ b/cpp/src/parquet/properties.h @@ -79,6 +79,10 @@ class PARQUET_EXPORT ReaderProperties { /// Disable buffered stream reading. void disable_buffered_stream() { buffered_stream_enabled_ = false; } + bool read_dense_for_nullable() const { return read_dense_for_nullable_; } + void enable_read_dense_for_nullable() { read_dense_for_nullable_ = true; } + void disable_read_dense_for_nullable() { read_dense_for_nullable_ = false; } + /// Return the size of the buffered stream buffer. int64_t buffer_size() const { return buffer_size_; } /// Set the size of the buffered stream buffer in bytes. @@ -123,6 +127,8 @@ class PARQUET_EXPORT ReaderProperties { int32_t thrift_container_size_limit_ = kDefaultThriftContainerSizeLimit; bool buffered_stream_enabled_ = false; bool page_checksum_verification_ = false; + // Used with a RecordReader. + bool read_dense_for_nullable_ = false; std::shared_ptr file_decryption_properties_; }; diff --git a/cpp/src/parquet/reader_test.cc b/cpp/src/parquet/reader_test.cc index 6c0de71b6eeb1..b981d8e069252 100644 --- a/cpp/src/parquet/reader_test.cc +++ b/cpp/src/parquet/reader_test.cc @@ -503,12 +503,16 @@ TEST_F(TestAllTypesPlain, ColumnSelectionOutOfRange) { } // Tests getting a record reader from a row group reader. -TEST_F(TestAllTypesPlain, TestRecordReader) { - std::shared_ptr group = reader_->RowGroup(0); +TEST(TestFileReader, GetRecordReader) { + ReaderProperties reader_props; + reader_props.enable_read_dense_for_nullable(); + std::unique_ptr file_reader = ParquetFileReader::OpenFile( + alltypes_plain(), /* memory_map = */ false, reader_props); + std::shared_ptr group = file_reader->RowGroup(0); - std::shared_ptr col_record_reader_ = - group->RecordReader(0, /*read_dictionary=*/false, /*read_dense_for_null=*/false); + std::shared_ptr col_record_reader_ = group->RecordReader(0); + ASSERT_TRUE(col_record_reader_->read_dense_for_nullable()); ASSERT_TRUE(col_record_reader_->HasMoreData()); auto records_read = col_record_reader_->ReadRecords(4); ASSERT_EQ(records_read, 4); From bcf29bbb37d9b35d645b2eb66cca5172fc342e81 Mon Sep 17 00:00:00 2001 From: Fatemah Panahi Date: Wed, 4 Oct 2023 20:00:54 +0000 Subject: [PATCH 6/7] Add a test that checks that read_dense_for_nullable is passed down to the underlying record reader. --- cpp/src/parquet/file_reader.h | 5 +++- cpp/src/parquet/reader_test.cc | 45 ++++++++++++++++++++++++++++++++-- 2 files changed, 47 insertions(+), 3 deletions(-) diff --git a/cpp/src/parquet/file_reader.h b/cpp/src/parquet/file_reader.h index 6edc4a3b41269..5047580fd938c 100644 --- a/cpp/src/parquet/file_reader.h +++ b/cpp/src/parquet/file_reader.h @@ -24,7 +24,6 @@ #include "arrow/io/caching.h" #include "arrow/util/type_fwd.h" -#include "parquet/column_reader.h" #include "parquet/metadata.h" // IWYU pragma: keep #include "parquet/platform.h" #include "parquet/properties.h" @@ -38,6 +37,10 @@ class BloomFilterReader; class PageReader; class RowGroupMetaData; +namespace internal { +class RecordReader; +} + class PARQUET_EXPORT RowGroupReader { public: // Forward declare a virtual class 'Contents' to aid dependency injection and more diff --git a/cpp/src/parquet/reader_test.cc b/cpp/src/parquet/reader_test.cc index b981d8e069252..067a2075a0adc 100644 --- a/cpp/src/parquet/reader_test.cc +++ b/cpp/src/parquet/reader_test.cc @@ -502,17 +502,58 @@ TEST_F(TestAllTypesPlain, ColumnSelectionOutOfRange) { ASSERT_THROW(printer2.DebugPrint(ss, columns), ParquetException); } +// Tests that read_dense_for_nullable is passed down to the record +// reader. The functionality of read_dense_for_nullable is tested +// elsewhere. +TEST(TestFileReader, RecordReaderReadDenseForNullable) { + // Default is false. + { + ReaderProperties reader_props; + std::unique_ptr file_reader = ParquetFileReader::OpenFile( + alltypes_plain(), /* memory_map = */ false, reader_props); + std::shared_ptr group = file_reader->RowGroup(0); + + std::shared_ptr col_record_reader_ = group->RecordReader(0); + + ASSERT_FALSE(col_record_reader_->read_dense_for_nullable()); + } + // Test enabling it. + { + ReaderProperties reader_props; + reader_props.enable_read_dense_for_nullable(); + std::unique_ptr file_reader = ParquetFileReader::OpenFile( + alltypes_plain(), /* memory_map = */ false, reader_props); + std::shared_ptr group = file_reader->RowGroup(0); + + std::shared_ptr col_record_reader_ = group->RecordReader(0); + + ASSERT_TRUE(col_record_reader_->read_dense_for_nullable()); + } + // Test disabling it. + { + ReaderProperties reader_props; + // We tested that enabling it works above. + reader_props.enable_read_dense_for_nullable(); + reader_props.disable_read_dense_for_nullable(); + std::unique_ptr file_reader = ParquetFileReader::OpenFile( + alltypes_plain(), /* memory_map = */ false, reader_props); + std::shared_ptr group = file_reader->RowGroup(0); + + std::shared_ptr col_record_reader_ = group->RecordReader(0); + + ASSERT_FALSE(col_record_reader_->read_dense_for_nullable()); + } +} + // Tests getting a record reader from a row group reader. TEST(TestFileReader, GetRecordReader) { ReaderProperties reader_props; - reader_props.enable_read_dense_for_nullable(); std::unique_ptr file_reader = ParquetFileReader::OpenFile( alltypes_plain(), /* memory_map = */ false, reader_props); std::shared_ptr group = file_reader->RowGroup(0); std::shared_ptr col_record_reader_ = group->RecordReader(0); - ASSERT_TRUE(col_record_reader_->read_dense_for_nullable()); ASSERT_TRUE(col_record_reader_->HasMoreData()); auto records_read = col_record_reader_->ReadRecords(4); ASSERT_EQ(records_read, 4); From 7b4f8964c6ebe44f4c7be141351271770d47f078 Mon Sep 17 00:00:00 2001 From: Fatemah Panahi Date: Mon, 9 Oct 2023 21:19:57 +0000 Subject: [PATCH 7/7] Address reviewer comments. --- cpp/src/parquet/file_reader.h | 2 +- cpp/src/parquet/reader_test.cc | 42 ++++++++-------------------------- 2 files changed, 10 insertions(+), 34 deletions(-) diff --git a/cpp/src/parquet/file_reader.h b/cpp/src/parquet/file_reader.h index 5047580fd938c..da85b73fc2dfe 100644 --- a/cpp/src/parquet/file_reader.h +++ b/cpp/src/parquet/file_reader.h @@ -62,7 +62,7 @@ class PARQUET_EXPORT RowGroupReader { // column. Ownership is shared with the RowGroupReader. std::shared_ptr Column(int i); - // Construct a RecordReader for the indicated row group-relative column i. + // EXPERIMENTAL: Construct a RecordReader for the indicated column of the row group. // Ownership is shared with the RowGroupReader. std::shared_ptr RecordReader(int i); diff --git a/cpp/src/parquet/reader_test.cc b/cpp/src/parquet/reader_test.cc index 067a2075a0adc..8fe12d3de0b6c 100644 --- a/cpp/src/parquet/reader_test.cc +++ b/cpp/src/parquet/reader_test.cc @@ -506,42 +506,18 @@ TEST_F(TestAllTypesPlain, ColumnSelectionOutOfRange) { // reader. The functionality of read_dense_for_nullable is tested // elsewhere. TEST(TestFileReader, RecordReaderReadDenseForNullable) { - // Default is false. - { - ReaderProperties reader_props; + // We test the default which is false, and also test enabling and disabling + // read_dense_for_nullable. + std::vector reader_properties(3); + reader_properties[1].enable_read_dense_for_nullable(); + reader_properties[2].disable_read_dense_for_nullable(); + for (const auto& reader_props : reader_properties) { std::unique_ptr file_reader = ParquetFileReader::OpenFile( alltypes_plain(), /* memory_map = */ false, reader_props); std::shared_ptr group = file_reader->RowGroup(0); - - std::shared_ptr col_record_reader_ = group->RecordReader(0); - - ASSERT_FALSE(col_record_reader_->read_dense_for_nullable()); - } - // Test enabling it. - { - ReaderProperties reader_props; - reader_props.enable_read_dense_for_nullable(); - std::unique_ptr file_reader = ParquetFileReader::OpenFile( - alltypes_plain(), /* memory_map = */ false, reader_props); - std::shared_ptr group = file_reader->RowGroup(0); - - std::shared_ptr col_record_reader_ = group->RecordReader(0); - - ASSERT_TRUE(col_record_reader_->read_dense_for_nullable()); - } - // Test disabling it. - { - ReaderProperties reader_props; - // We tested that enabling it works above. - reader_props.enable_read_dense_for_nullable(); - reader_props.disable_read_dense_for_nullable(); - std::unique_ptr file_reader = ParquetFileReader::OpenFile( - alltypes_plain(), /* memory_map = */ false, reader_props); - std::shared_ptr group = file_reader->RowGroup(0); - - std::shared_ptr col_record_reader_ = group->RecordReader(0); - - ASSERT_FALSE(col_record_reader_->read_dense_for_nullable()); + std::shared_ptr col_record_reader = group->RecordReader(0); + ASSERT_EQ(reader_props.read_dense_for_nullable(), + col_record_reader->read_dense_for_nullable()); } }