From e2e3a9df28db03b09b5a83a60fa8293dfef6d0d8 Mon Sep 17 00:00:00 2001
From: mwish
Date: Tue, 16 May 2023 00:32:07 +0800
Subject: [PATCH] GH-35423: [C++][Parquet] Force PageReader to resize the
 decompression buffer for smaller pages (#35428)

### Rationale for this change

See the issue for details.

1. `SerializedPageReader` reuses the same decompression buffer for every page, **and did not resize it when the next page was smaller than the previous one**, so `buffer->size()` might not equal `page.uncompressed_size()`.
2. As a result, `data_size` in the decoder would still be the previous, larger page size.
3. When such a page reaches the `BYTE_STREAM_SPLIT` decoder, that decoder checks that the size is not too large, so the stale size makes it throw an exception. The other decoders show no bug, because the uninitialized trailing memory is never accessed.
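To make the failure mode concrete, here is a minimal standalone sketch of the old grow-only logic (illustration only, not code from this patch; the 640/64 page sizes are made up, and it assumes only Arrow's public `AllocateResizableBuffer` and `ResizableBuffer::Resize` APIs):

```cpp
// Sketch of the pre-fix behavior: the buffer grows for a 640-byte page, but a
// following 64-byte page leaves buffer->size() at 640 instead of 64.
#include <iostream>

#include "arrow/api.h"

int main() {
  std::shared_ptr<arrow::ResizableBuffer> buffer =
      arrow::AllocateResizableBuffer(0).ValueOrDie();

  for (int64_t uncompressed_len : {640, 64}) {  // a large page, then a small one
    // Grow-only resize, as SerializedPageReader did before this fix.
    if (uncompressed_len > buffer->size()) {
      if (!buffer->Resize(uncompressed_len, /*shrink_to_fit=*/false).ok()) return 1;
    }
    // A decoder that takes data_size from buffer->size() sees 640 for the
    // second page instead of 64.
    std::cout << "page: " << uncompressed_len
              << ", buffer->size(): " << buffer->size() << std::endl;
  }
  return 0;
}
```

Calling `Resize` unconditionally fixes this: with `shrink_to_fit=false` the underlying allocation is kept, so shrinking only updates the logical size and does not reallocate.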
### What changes are included in this PR?

Change the page reader to always resize the decompression buffer, and add a regression test.

### Are these changes tested?

Yes

### Are there any user-facing changes?

No

**This PR contains a "Critical Fix".**

* Closes: #35423

Authored-by: mwish
Signed-off-by: Antoine Pitrou
---
 cpp/src/parquet/column_reader.cc         |  6 ++----
 cpp/src/parquet/file_deserialize_test.cc | 35 ++++++++++++++++++++++++++++-------
 2 files changed, 30 insertions(+), 11 deletions(-)

diff --git a/cpp/src/parquet/column_reader.cc b/cpp/src/parquet/column_reader.cc
index abcf83a899e47..3294aaaf283f1 100644
--- a/cpp/src/parquet/column_reader.cc
+++ b/cpp/src/parquet/column_reader.cc
@@ -587,10 +587,8 @@ std::shared_ptr<Buffer> SerializedPageReader::DecompressIfNeeded(
   }
 
   // Grow the uncompressed buffer if we need to.
-  if (uncompressed_len > static_cast<int>(decompression_buffer_->size())) {
-    PARQUET_THROW_NOT_OK(
-        decompression_buffer_->Resize(uncompressed_len, /*shrink_to_fit=*/false));
-  }
+  PARQUET_THROW_NOT_OK(
+      decompression_buffer_->Resize(uncompressed_len, /*shrink_to_fit=*/false));
 
   if (levels_byte_len > 0) {
     // First copy the levels as-is
diff --git a/cpp/src/parquet/file_deserialize_test.cc b/cpp/src/parquet/file_deserialize_test.cc
index a0467e2a33b2a..b2a918915e4e6 100644
--- a/cpp/src/parquet/file_deserialize_test.cc
+++ b/cpp/src/parquet/file_deserialize_test.cc
@@ -196,6 +196,8 @@ class TestPageSerde : public ::testing::Test {
                               bool verification_checksum, bool has_dictionary = false,
                               bool write_data_page_v2 = false);
 
+  void TestPageCompressionRoundTrip(const std::vector<int>& page_sizes);
+
  protected:
   std::shared_ptr<::arrow::io::BufferOutputStream> out_stream_;
   std::shared_ptr<Buffer> out_buffer_;
@@ -704,20 +706,17 @@ TEST_F(TestPageSerde, TestFailLargePageHeaders) {
   ASSERT_THROW(page_reader_->NextPage(), ParquetException);
 }
 
-TEST_F(TestPageSerde, Compression) {
+void TestPageSerde::TestPageCompressionRoundTrip(const std::vector<int>& page_sizes) {
   auto codec_types = GetSupportedCodecTypes();
 
   const int32_t num_rows = 32;  // dummy value
   data_page_header_.num_values = num_rows;
 
-  const int num_pages = 10;
-
+  int num_pages = static_cast<int>(page_sizes.size());
   std::vector<std::vector<uint8_t>> faux_data;
   faux_data.resize(num_pages);
   for (int i = 0; i < num_pages; ++i) {
-    // The pages keep getting larger
-    int page_size = (i + 1) * 64;
-    test::random_bytes(page_size, 0, &faux_data[i]);
+    test::random_bytes(page_sizes[i], 0, &faux_data[i]);
   }
   for (auto codec_type : codec_types) {
     auto codec = GetCodec(codec_type);
@@ -753,7 +752,29 @@ TEST_F(TestPageSerde, Compression) {
 
     ResetStream();
   }
-}  // namespace parquet
+}
+
+TEST_F(TestPageSerde, Compression) {
+  std::vector<int> page_sizes;
+  page_sizes.reserve(10);
+  for (int i = 0; i < 10; ++i) {
+    // The pages keep getting larger
+    page_sizes.push_back((i + 1) * 64);
+  }
+  this->TestPageCompressionRoundTrip(page_sizes);
+}
+
+TEST_F(TestPageSerde, PageSizeResetWhenRead) {
+  // GH-35423: Parquet SerializedPageReader needs to
+  // reset the size after reading a smaller page.
+  std::vector<int> page_sizes;
+  page_sizes.reserve(10);
+  for (int i = 0; i < 10; ++i) {
+    // The pages keep getting smaller
+    page_sizes.push_back((10 - i) * 64);
+  }
+  this->TestPageCompressionRoundTrip(page_sizes);
+}
 
 TEST_F(TestPageSerde, LZONotSupported) {
   // Must await PARQUET-530