From dca69af1b5b61ae93cc992dfda1631ebc4edf694 Mon Sep 17 00:00:00 2001 From: Hongbin Ma Date: Thu, 30 Nov 2023 12:18:27 +0800 Subject: [PATCH] fix bug --- cpp/src/parquet/arrow/reader.cc | 17 ++++----- cpp/src/parquet/range_reader_test.cc | 57 +++++++++++++++++++++++----- 2 files changed, 55 insertions(+), 19 deletions(-) diff --git a/cpp/src/parquet/arrow/reader.cc b/cpp/src/parquet/arrow/reader.cc index 10c731a6a8b9e..1606c60d64e38 100644 --- a/cpp/src/parquet/arrow/reader.cc +++ b/cpp/src/parquet/arrow/reader.cc @@ -476,22 +476,18 @@ struct RowRangesPageFilter { bool operator()(const DataPageStats& stats) { ++page_range_idx; - if (row_range_idx >= row_ranges->getRanges().size()) { - return true; - } - Range current_page_range = (*page_ranges)[page_range_idx]; - if (current_page_range.isBefore((*row_ranges)[row_range_idx])) { - return true; - } - while (row_range_idx < row_ranges->getRanges().size() && current_page_range.isAfter((*row_ranges)[row_range_idx])) { row_range_idx++; } - return row_range_idx >= row_ranges->getRanges().size(); + if (row_range_idx >= row_ranges->getRanges().size()) { + return true; + } + + return current_page_range.isBefore((*row_ranges)[row_range_idx]); } size_t row_range_idx = 0; @@ -642,7 +638,8 @@ class LeafReader : public ColumnReaderImpl { return; } } - // Else iff row_ranges_map exists but no row_ranges is found for this RG key, this RG will be read + // Else iff row_ranges_map exists but no row_ranges is found for this RG key, this + // RG will be read } record_reader_->reset_current_rg_processed_records(); diff --git a/cpp/src/parquet/range_reader_test.cc b/cpp/src/parquet/range_reader_test.cc index 7a7c7e001bb71..5bccaaa0c0f60 100644 --- a/cpp/src/parquet/range_reader_test.cc +++ b/cpp/src/parquet/range_reader_test.cc @@ -30,6 +30,22 @@ #include #include +#include +#include + +std::string random_string(std::string::size_type length) { + static auto& chrs = "ABCDEFGHIJKLMNOPQRSTUVWXYZ"; + + static std::mt19937 rg{std::random_device{}()}; + static std::uniform_int_distribution pick(0, sizeof(chrs) - 2); + + std::string s; + s.reserve(length); + while (length--) s += chrs[pick(rg)]; + + return s; +} + /// The table looks like (with_nulls = false): // { // { a: {x: 0, y: 0}, b: {0, 0, 0}, c: "0", d: 0}, @@ -97,7 +113,8 @@ arrow::Result> GetTable(bool with_nulls = false) { std::vector strs; uint8_t valid_bytes[100]; for (size_t i = 0; i < 100; i++) { - strs.push_back(std::to_string(i)); + // add more chars to make this column unaligned with other columns' page + strs.push_back(std::to_string(i) + random_string(20)); valid_bytes[i] = flags[i]; } ARROW_RETURN_NOT_OK(string_builder.AppendValues(strs, &valid_bytes[0])); @@ -130,8 +147,9 @@ arrow::Result> WriteFullFile( WriterProperties::Builder() .max_row_group_length(30) ->enable_write_page_index() - ->write_batch_size(13) - ->data_pagesize(1) // this will cause every batch creating a page + ->disable_dictionary() + ->write_batch_size(1) + ->data_pagesize(30) // small pages ->compression(arrow::Compression::SNAPPY) ->build(); @@ -203,7 +221,9 @@ void check_rb(std::shared_ptr rb_reader, auto c_array = std::dynamic_pointer_cast(batch->GetColumnByName("c")); for (auto iter = c_array->begin(); iter != c_array->end(); ++iter) { - sum_c += std::stoi(std::string((*iter).has_value() ? (*iter).value() : "0")); + sum_c += std::stoi(std::string( + (*iter).has_value() ? (*iter).value().substr(0, (*iter).value().size() - 20) + : "0")); } } @@ -252,7 +272,7 @@ class TestRecordBatchReaderWithRanges : public testing::Test { std::unique_ptr arrow_reader; }; -TEST_F(TestRecordBatchReaderWithRanges, SelectSomePageForEachRG) { +TEST_F(TestRecordBatchReaderWithRanges, SelectOnePageForEachRG) { std::shared_ptr rb_reader; const auto row_ranges_map = std::make_shared>(); row_ranges_map->insert({0, std::make_shared(parquet::Range{0, 9})}); @@ -270,6 +290,24 @@ TEST_F(TestRecordBatchReaderWithRanges, SelectSomePageForEachRG) { check_rb(rb_reader, 40, 2280); } +TEST_F(TestRecordBatchReaderWithRanges, SelectSomePageForOneRG) { + std::shared_ptr rb_reader; + const auto row_ranges_map = std::make_shared>(); + row_ranges_map->insert( + {0, std::make_shared( + std::vector{parquet::Range{0, 7}, parquet::Range{16, 23}})}); + row_ranges_map->insert({1, nullptr}); + row_ranges_map->insert({2, nullptr}); + row_ranges_map->insert({3, nullptr}); + + const std::vector column_indices{0, 1, 2, 3, 4}; + ASSERT_OK(arrow_reader->GetRecordBatchReader({0, 1, 2, 3}, column_indices, + row_ranges_map, &rb_reader)); + + // (0+...+7) + (16+...+23) = 184 + check_rb(rb_reader, 16, 184); +} + TEST_F(TestRecordBatchReaderWithRanges, SelectAllRange) { std::shared_ptr rb_reader; const auto row_ranges_map = std::make_shared>(); @@ -299,7 +337,8 @@ TEST_F(TestRecordBatchReaderWithRanges, SelectEmptyRange) { row_ranges_map->insert( {2, std::make_shared( std::vector())}); // value is empty -> will skip - row_ranges_map->insert({3, std::make_shared()}); // value is empty -> will skip + row_ranges_map->insert( + {3, std::make_shared()}); // value is empty -> will skip const std::vector column_indices{0, 1, 2, 3, 4}; const auto status = arrow_reader->GetRecordBatchReader({0, 1, 2, 3}, column_indices, @@ -384,9 +423,9 @@ TEST_F(TestRecordBatchReaderWithRanges, InvalidRanges) { const auto status = arrow_reader->GetRecordBatchReader({0, 1, 2, 3}, column_indices, row_ranges_map, &rb_reader); ASSERT_NOT_OK(status); - EXPECT_TRUE(status.message().find( - "The provided row range [(0, 30)] exceeds last page :[26, 29]") != - std::string::npos); + EXPECT_TRUE( + status.message().find("The provided row range [(0, 30)] exceeds last page :") != + std::string::npos); } }