GH-35423: [C++][Parquet] Parquet PageReader: Force decompression buffer resize smaller (#35428)

### Rationale for this change

See the linked issue (#35423) for details.

1. `SerializedPageReader` reuses the same decompression buffer across pages, **and did not resize it when the next page was smaller than the previous one**, so `buffer->size()` might not equal `page.uncompressed_size()` (see the sketch after this list).
2. As a result, the `data_size` passed to the decoder would equal the previous, larger page size.
3. When the encoding is BYTE_STREAM_SPLIT, the decoder checks that the size is not too large, and the stale size makes this check throw an exception. Other decoders show no visible bug, because the uninitialized trailing memory is never accessed.
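A minimal sketch of the stale-size problem, using Arrow's `ResizableBuffer` directly. The 640/64-byte page sizes, the `ResizeNoShrink` helper, and the guard below are illustrative stand-ins for the reader's logic, not the actual `DecompressIfNeeded` code:

```cpp
#include <arrow/buffer.h>
#include <arrow/result.h>

#include <cassert>
#include <cstdint>
#include <cstdlib>
#include <memory>

// Sketch-only helper: resize without shrinking the allocation, abort on error.
void ResizeNoShrink(arrow::ResizableBuffer* buf, int64_t new_size) {
  if (!buf->Resize(new_size, /*shrink_to_fit=*/false).ok()) std::abort();
}

int main() {
  // Simulate the reader's reusable decompression buffer.
  std::unique_ptr<arrow::ResizableBuffer> buffer =
      arrow::AllocateResizableBuffer(0).ValueOrDie();

  // Page 1: 640 bytes uncompressed. The old guard resizes because the
  // buffer is currently smaller than the page.
  int64_t uncompressed_len = 640;
  if (uncompressed_len > buffer->size()) {
    ResizeNoShrink(buffer.get(), uncompressed_len);
  }
  assert(buffer->size() == 640);

  // Page 2: only 64 bytes uncompressed. The old guard skips the resize,
  // so buffer->size() keeps reporting 640 and the decoder is handed a
  // data_size that belongs to the previous, larger page.
  uncompressed_len = 64;
  if (uncompressed_len > buffer->size()) {
    ResizeNoShrink(buffer.get(), uncompressed_len);
  }
  assert(buffer->size() == 640);  // stale size, larger than the current page

  // The fix: always resize. shrink_to_fit=false keeps the existing
  // allocation, but size() now matches the current page exactly.
  ResizeNoShrink(buffer.get(), uncompressed_len);
  assert(buffer->size() == 64);
  return 0;
}
```

The diff below removes exactly this kind of guard in `SerializedPageReader::DecompressIfNeeded`, so the buffer is resized on every page, while `shrink_to_fit=false` avoids reallocating when a page shrinks.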

### What changes are included in this PR?

Make `SerializedPageReader::DecompressIfNeeded` always resize the decompression buffer to the current page's uncompressed size, and add a page-compression round-trip test that covers pages of decreasing size.

### Are these changes tested?

Yes

### Are there any user-facing changes?

No

**This PR contains a "Critical Fix".**
* Closes: #35423

Authored-by: mwish <[email protected]>
Signed-off-by: Antoine Pitrou <[email protected]>
mapleFU authored May 15, 2023
1 parent 660cb6e commit e2e3a9d
Showing 2 changed files with 30 additions and 11 deletions.
6 changes: 2 additions & 4 deletions cpp/src/parquet/column_reader.cc
@@ -587,10 +587,8 @@ std::shared_ptr<Buffer> SerializedPageReader::DecompressIfNeeded(
}

// Grow the uncompressed buffer if we need to.
-  if (uncompressed_len > static_cast<int>(decompression_buffer_->size())) {
-    PARQUET_THROW_NOT_OK(
-        decompression_buffer_->Resize(uncompressed_len, /*shrink_to_fit=*/false));
-  }
+  PARQUET_THROW_NOT_OK(
+      decompression_buffer_->Resize(uncompressed_len, /*shrink_to_fit=*/false));

if (levels_byte_len > 0) {
// First copy the levels as-is
35 changes: 28 additions & 7 deletions cpp/src/parquet/file_deserialize_test.cc
@@ -196,6 +196,8 @@ class TestPageSerde : public ::testing::Test {
bool verification_checksum, bool has_dictionary = false,
bool write_data_page_v2 = false);

+  void TestPageCompressionRoundTrip(const std::vector<int>& page_sizes);

protected:
std::shared_ptr<::arrow::io::BufferOutputStream> out_stream_;
std::shared_ptr<Buffer> out_buffer_;
@@ -704,20 +706,17 @@ TEST_F(TestPageSerde, TestFailLargePageHeaders) {
ASSERT_THROW(page_reader_->NextPage(), ParquetException);
}

-TEST_F(TestPageSerde, Compression) {
+void TestPageSerde::TestPageCompressionRoundTrip(const std::vector<int>& page_sizes) {
auto codec_types = GetSupportedCodecTypes();

const int32_t num_rows = 32; // dummy value
data_page_header_.num_values = num_rows;

-  const int num_pages = 10;
-
std::vector<std::vector<uint8_t>> faux_data;
+  int num_pages = static_cast<int>(page_sizes.size());
faux_data.resize(num_pages);
for (int i = 0; i < num_pages; ++i) {
-    // The pages keep getting larger
-    int page_size = (i + 1) * 64;
-    test::random_bytes(page_size, 0, &faux_data[i]);
+    test::random_bytes(page_sizes[i], 0, &faux_data[i]);
}
for (auto codec_type : codec_types) {
auto codec = GetCodec(codec_type);
@@ -753,7 +752,29 @@

ResetStream();
}
-}  // namespace parquet
+}

TEST_F(TestPageSerde, Compression) {
std::vector<int> page_sizes;
page_sizes.reserve(10);
for (int i = 0; i < 10; ++i) {
// The pages keep getting larger
page_sizes.push_back((i + 1) * 64);
}
this->TestPageCompressionRoundTrip(page_sizes);
}

TEST_F(TestPageSerde, PageSizeResetWhenRead) {
// GH-35423: Parquet SerializedPageReader needs to
// reset the size after getting a smaller page.
std::vector<int> page_sizes;
page_sizes.reserve(10);
for (int i = 0; i < 10; ++i) {
// The pages keep getting smaller
page_sizes.push_back((10 - i) * 64);
}
this->TestPageCompressionRoundTrip(page_sizes);
}

TEST_F(TestPageSerde, LZONotSupported) {
// Must await PARQUET-530
