
Commit

fix bug
binmahone committed Nov 30, 2023
1 parent 53ea5e5 commit dca69af
Showing 2 changed files with 55 additions and 19 deletions.
17 changes: 7 additions & 10 deletions cpp/src/parquet/arrow/reader.cc
@@ -476,22 +476,18 @@ struct RowRangesPageFilter {
bool operator()(const DataPageStats& stats) {
++page_range_idx;

if (row_range_idx >= row_ranges->getRanges().size()) {
return true;
}

Range current_page_range = (*page_ranges)[page_range_idx];

if (current_page_range.isBefore((*row_ranges)[row_range_idx])) {
return true;
}

while (row_range_idx < row_ranges->getRanges().size() &&
current_page_range.isAfter((*row_ranges)[row_range_idx])) {
row_range_idx++;
}

return row_range_idx >= row_ranges->getRanges().size();
if (row_range_idx >= row_ranges->getRanges().size()) {
return true;
}

return current_page_range.isBefore((*row_ranges)[row_range_idx]);
}

size_t row_range_idx = 0;
@@ -642,7 +638,8 @@ class LeafReader : public ColumnReaderImpl {
return;
}
}
// Else iff row_ranges_map exists but no row_ranges is found for this RG key, this RG will be read
// Else iff row_ranges_map exists but no row_ranges is found for this RG key, this
// RG will be read
}

record_reader_->reset_current_rg_processed_records();
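Note (editorial, not part of the commit): the reader.cc hunk above reorders RowRangesPageFilter::operator() so that the row-range cursor is first advanced past every range the current page lies after, the exhaustion check runs only after that advance, and the final isBefore comparison therefore never indexes past the end of the ranges. A minimal standalone sketch of that corrected predicate, assuming a true result means "skip this page"; the Range helper and SkipPage name below are hypothetical stand-ins, not the library types.

#include <cstddef>
#include <vector>

struct Range {
  long from;
  long to;  // inclusive
  bool isBefore(const Range& other) const { return to < other.from; }
  bool isAfter(const Range& other) const { return from > other.to; }
};

// Returns true when the page spanning page_range can be skipped, given sorted,
// non-overlapping row_ranges; row_range_idx advances monotonically as
// successive pages are visited.
bool SkipPage(const Range& page_range, const std::vector<Range>& row_ranges,
              std::size_t& row_range_idx) {
  // Move past row ranges that end before this page begins.
  while (row_range_idx < row_ranges.size() &&
         page_range.isAfter(row_ranges[row_range_idx])) {
    ++row_range_idx;
  }
  // No requested rows remain, so this page (and every later one) can be skipped.
  if (row_range_idx >= row_ranges.size()) {
    return true;
  }
  // Skip the page if it ends before the next requested range starts.
  return page_range.isBefore(row_ranges[row_range_idx]);
}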
57 changes: 48 additions & 9 deletions cpp/src/parquet/range_reader_test.cc
@@ -30,6 +30,22 @@
#include <gtest/gtest.h>
#include <iostream>

#include <random>
#include <string>

std::string random_string(std::string::size_type length) {
static auto& chrs = "ABCDEFGHIJKLMNOPQRSTUVWXYZ";

static std::mt19937 rg{std::random_device{}()};
static std::uniform_int_distribution<std::string::size_type> pick(0, sizeof(chrs) - 2);

std::string s;
s.reserve(length);
while (length--) s += chrs[pick(rg)];

return s;
}

/// The table looks like (with_nulls = false):
// {
// { a: {x: 0, y: 0}, b: {0, 0, 0}, c: "0", d: 0},
@@ -97,7 +113,8 @@ arrow::Result<std::shared_ptr<arrow::Table>> GetTable(bool with_nulls = false) {
std::vector<std::string> strs;
uint8_t valid_bytes[100];
for (size_t i = 0; i < 100; i++) {
strs.push_back(std::to_string(i));
// add more chars to make this column unaligned with other columns' page
strs.push_back(std::to_string(i) + random_string(20));
valid_bytes[i] = flags[i];
}
ARROW_RETURN_NOT_OK(string_builder.AppendValues(strs, &valid_bytes[0]));
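Aside (not from the commit): with the padding above, a "c" value for row 7 looks like "7" followed by 20 random uppercase letters, so a consumer has to drop the fixed-length suffix before parsing, which is what the updated check_rb further down does. A minimal sketch of that recovery step, assuming every value was built as std::to_string(i) + random_string(20); ParseCValue is a hypothetical helper, not part of the test:

#include <string>

int ParseCValue(const std::string& value) {
  // The last 20 characters are the random padding appended in GetTable.
  return std::stoi(value.substr(0, value.size() - 20));
}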
@@ -130,8 +147,9 @@ arrow::Result<std::shared_ptr<arrow::Buffer>> WriteFullFile(
WriterProperties::Builder()
.max_row_group_length(30)
->enable_write_page_index()
->write_batch_size(13)
->data_pagesize(1) // this will cause every batch creating a page
->disable_dictionary()
->write_batch_size(1)
->data_pagesize(30) // small pages
->compression(arrow::Compression::SNAPPY)
->build();

@@ -203,7 +221,9 @@ void check_rb(std::shared_ptr<arrow::RecordBatchReader> rb_reader,
auto c_array =
std::dynamic_pointer_cast<arrow::StringArray>(batch->GetColumnByName("c"));
for (auto iter = c_array->begin(); iter != c_array->end(); ++iter) {
sum_c += std::stoi(std::string((*iter).has_value() ? (*iter).value() : "0"));
sum_c += std::stoi(std::string(
(*iter).has_value() ? (*iter).value().substr(0, (*iter).value().size() - 20)
: "0"));
}
}

@@ -252,7 +272,7 @@ class TestRecordBatchReaderWithRanges : public testing::Test {
std::unique_ptr<parquet::arrow::FileReader> arrow_reader;
};

TEST_F(TestRecordBatchReaderWithRanges, SelectSomePageForEachRG) {
TEST_F(TestRecordBatchReaderWithRanges, SelectOnePageForEachRG) {
std::shared_ptr<arrow::RecordBatchReader> rb_reader;
const auto row_ranges_map = std::make_shared<std::map<int, parquet::RowRangesPtr>>();
row_ranges_map->insert({0, std::make_shared<parquet::RowRanges>(parquet::Range{0, 9})});
@@ -270,6 +290,24 @@
check_rb(rb_reader, 40, 2280);
}

TEST_F(TestRecordBatchReaderWithRanges, SelectSomePageForOneRG) {
std::shared_ptr<arrow::RecordBatchReader> rb_reader;
const auto row_ranges_map = std::make_shared<std::map<int, parquet::RowRangesPtr>>();
row_ranges_map->insert(
{0, std::make_shared<parquet::RowRanges>(
std::vector<parquet::Range>{parquet::Range{0, 7}, parquet::Range{16, 23}})});
row_ranges_map->insert({1, nullptr});
row_ranges_map->insert({2, nullptr});
row_ranges_map->insert({3, nullptr});

const std::vector column_indices{0, 1, 2, 3, 4};
ASSERT_OK(arrow_reader->GetRecordBatchReader({0, 1, 2, 3}, column_indices,
row_ranges_map, &rb_reader));

// (0+...+7) + (16+...+23) = 184
check_rb(rb_reader, 16, 184);
}

TEST_F(TestRecordBatchReaderWithRanges, SelectAllRange) {
std::shared_ptr<arrow::RecordBatchReader> rb_reader;
const auto row_ranges_map = std::make_shared<std::map<int, parquet::RowRangesPtr>>();
@@ -299,7 +337,8 @@ TEST_F(TestRecordBatchReaderWithRanges, SelectEmptyRange) {
row_ranges_map->insert(
{2, std::make_shared<parquet::RowRanges>(
std::vector<parquet::Range>())}); // value is empty -> will skip
row_ranges_map->insert({3, std::make_shared<parquet::RowRanges>()}); // value is empty -> will skip
row_ranges_map->insert(
{3, std::make_shared<parquet::RowRanges>()}); // value is empty -> will skip

const std::vector column_indices{0, 1, 2, 3, 4};
const auto status = arrow_reader->GetRecordBatchReader({0, 1, 2, 3}, column_indices,
@@ -384,9 +423,9 @@ TEST_F(TestRecordBatchReaderWithRanges, InvalidRanges) {
const auto status = arrow_reader->GetRecordBatchReader({0, 1, 2, 3}, column_indices,
row_ranges_map, &rb_reader);
ASSERT_NOT_OK(status);
EXPECT_TRUE(status.message().find(
"The provided row range [(0, 30)] exceeds last page :[26, 29]") !=
std::string::npos);
EXPECT_TRUE(
status.message().find("The provided row range [(0, 30)] exceeds last page :") !=
std::string::npos);
}
}

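Aside (not from the commit): a quick standalone check of the constants asserted by the new SelectSomePageForOneRG test. Ranges [0, 7] and [16, 23] within row group 0 select 8 + 8 = 16 rows whose indices sum to 28 + 156 = 184, matching check_rb(rb_reader, 16, 184).

#include <iostream>

int main() {
  int rows = 0;
  int sum = 0;
  for (int i = 0; i <= 7; ++i) { ++rows; sum += i; }    // parquet::Range{0, 7}
  for (int i = 16; i <= 23; ++i) { ++rows; sum += i; }  // parquet::Range{16, 23}
  std::cout << rows << " rows, sum = " << sum << std::endl;  // 16 rows, sum = 184
  return 0;
}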
