Skip to content

Commit

Permalink
fix comments
Browse files Browse the repository at this point in the history
  • Loading branch information
binmahone committed Jan 22, 2024
1 parent b75abdf commit e361c66
Show file tree
Hide file tree
Showing 9 changed files with 461 additions and 360 deletions.
1 change: 1 addition & 0 deletions cpp/src/parquet/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ set(PARQUET_SRCS
arrow/writer.cc
bloom_filter.cc
bloom_filter_reader.cc
row_range.cc
column_reader.cc
column_scanner.cc
column_writer.cc
Expand Down
105 changes: 52 additions & 53 deletions cpp/src/parquet/arrow/reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -222,13 +222,14 @@ class FileReaderImpl : public FileReader {
ctx->iterator_factory = SomeRowGroupsFactory(row_groups);
ctx->filter_leaves = true;
ctx->included_leaves = included_leaves;
ctx->row_ranges_per_rg = row_ranges_per_rg; // copy the shared pointer to extend its lifecycle
ctx->row_ranges_per_rg =
row_ranges_per_rg; // copy the shared pointer to extend its lifecycle
return GetReader(manifest_.schema_fields[i], ctx, out);
}

Status GetFieldReaders(
const std::vector<int>& column_indices, const std::vector<int>& row_groups,
const std::shared_ptr<std::vector<std::unique_ptr<RowRanges>>> & row_ranges_per_rg,
const std::shared_ptr<std::vector<std::unique_ptr<RowRanges>>>& row_ranges_per_rg,
std::vector<std::shared_ptr<ColumnReaderImpl>>* out,
std::shared_ptr<::arrow::Schema>* out_schema) {
// We only need to read schema fields which have columns indicated
Expand Down Expand Up @@ -342,44 +343,43 @@ class FileReaderImpl : public FileReader {
}

// This is a internal API owned by FileReaderImpl, not exposed in FileReader
Status GetRecordBatchReaderWithRowRanges(const std::vector<int>& row_group_indices,
const std::vector<int>& column_indices,
const std::shared_ptr<std::vector<std::unique_ptr<RowRanges>>> & row_ranges_per_rg,
std::unique_ptr<RecordBatchReader>* out);
Status GetRecordBatchReaderWithRowRanges(
const std::vector<int>& row_group_indices, const std::vector<int>& column_indices,
const std::shared_ptr<std::vector<std::unique_ptr<RowRanges>>>& row_ranges_per_rg,
std::unique_ptr<RecordBatchReader>* out);

Status GetRecordBatchReader(const RowRanges& rows_to_return,
const std::vector<int>& column_indices,
std::unique_ptr<RecordBatchReader>* out) override {
const auto metadata = reader_->metadata();
// check if the row ranges are valid
if (!rows_to_return.IsValid()) {
return Status::Invalid("The provided row range is invalid, keep it monotone and non-interleaving: " +
rows_to_return.ToString());
}
// check if the row ranges are within the row group boundaries
if (rows_to_return.RowCount() != 0 && rows_to_return.LastRow() >= metadata->num_rows()) {
if (rows_to_return.num_rows() != 0 &&
rows_to_return.last_row() >= metadata->num_rows()) {
return Status::Invalid("The provided row range " + rows_to_return.ToString() +
" exceeds the number of rows in the file: " +
std::to_string(metadata->num_rows()));
}
if (rows_to_return.RowCount() == 0) {
if (rows_to_return.num_rows() == 0) {
return GetRecordBatchReaderWithRowRanges({}, column_indices, {}, out);
}

std::vector<int64_t> rows_per_rg;
for (int i = 0 ; i < metadata->num_row_groups(); i++) {
rows_per_rg.push_back( metadata->RowGroup(i)->num_rows());
for (int i = 0; i < metadata->num_row_groups(); i++) {
rows_per_rg.push_back(metadata->RowGroup(i)->num_rows());
}
// We'll assign a RowRanges for each RG, even if it's not required to return any rows
std::vector<std::unique_ptr<RowRanges>> row_ranges_per_rg = rows_to_return.SplitByRowGroups(rows_per_rg);
std::vector<std::unique_ptr<RowRanges>> row_ranges_per_rg =
rows_to_return.SplitByRowRange(rows_per_rg);
std::vector<int> row_group_indices;
for (int i = 0 ; i < metadata->num_row_groups(); i++) {
if (row_ranges_per_rg.at(i)->RowCount() > 0)
row_group_indices.push_back(i);
for (int i = 0; i < metadata->num_row_groups(); i++) {
if (row_ranges_per_rg.at(i)->num_rows() > 0) row_group_indices.push_back(i);
}

return GetRecordBatchReaderWithRowRanges(row_group_indices, column_indices,
std::make_shared<std::vector<std::unique_ptr<RowRanges>>>(std::move(row_ranges_per_rg)), out);
return GetRecordBatchReaderWithRowRanges(
row_group_indices, column_indices,
std::make_shared<std::vector<std::unique_ptr<RowRanges>>>(
std::move(row_ranges_per_rg)),
out);
}

Status GetRecordBatchReader(const std::vector<int>& row_group_indices,
Expand All @@ -390,13 +390,13 @@ class FileReaderImpl : public FileReader {

Status GetRecordBatchReader(const std::vector<int>& row_group_indices,
std::unique_ptr<RecordBatchReader>* out) override {
return GetRecordBatchReaderWithRowRanges(row_group_indices,
Iota(reader_->metadata()->num_columns()), {}, out);
return GetRecordBatchReaderWithRowRanges(
row_group_indices, Iota(reader_->metadata()->num_columns()), {}, out);
}

Status GetRecordBatchReader(std::unique_ptr<RecordBatchReader>* out) override {
return GetRecordBatchReaderWithRowRanges(Iota(num_row_groups()),
Iota(reader_->metadata()->num_columns()), {}, out);
return GetRecordBatchReaderWithRowRanges(
Iota(num_row_groups()), Iota(reader_->metadata()->num_columns()), {}, out);
}

::arrow::Result<::arrow::AsyncGenerator<std::shared_ptr<::arrow::RecordBatch>>>
Expand Down Expand Up @@ -499,51 +499,52 @@ class RowGroupReaderImpl : public RowGroupReader {
// ----------------------------------------------------------------------
// Column reader implementations

// This class is used to skip decompressing & decoding unnecessary pages by comparing user-specified row_ranges
// and page_ranges from metadata.
// Only support IntervalRange case for now.
// This class is used to skip decompressing & decoding unnecessary pages by comparing
// user-specified row_ranges and page_ranges from metadata. Only support IntervalRange
// case for now.
class RowRangesPageFilter {
public:
RowRangesPageFilter(const RowRanges& row_ranges, const std::shared_ptr<RowRanges>& page_ranges)
: row_ranges_(row_ranges), page_ranges_(page_ranges) {
}
RowRangesPageFilter(const RowRanges& row_ranges,
const std::shared_ptr<RowRanges>& page_ranges)
: row_ranges_(row_ranges), page_ranges_(page_ranges) {}

// To avoid error "std::function target must be copy-constructible", we must define copy constructor
// To avoid error "std::function target must be copy-constructible", we must define copy
// constructor
RowRangesPageFilter(const RowRangesPageFilter& other)
: row_ranges_(other.row_ranges_), page_ranges_(other.page_ranges_) {
}
: row_ranges_(other.row_ranges_), page_ranges_(other.page_ranges_) {}

bool operator()(const DataPageStats& stats) {

if (!initted) {
row_ranges_itr_ = row_ranges_.NewIterator();
page_ranges_itr_ = page_ranges_->NewIterator();

current_row_range_ = row_ranges_itr_->NextRange();

if (current_row_range_.index() != 0) {
throw ParquetException("RowRangesPageFilter expects first NextRange() to be a IntervalRange");
throw ParquetException(
"RowRangesPageFilter expects first NextRange() to be a IntervalRange");
}
initted = true;
}

current_page_range_ = page_ranges_itr_->NextRange();
if (current_page_range_.index() != 0) {
throw ParquetException("RowRangesPageFilter expects first NextRange() to be a IntervalRange");
throw ParquetException(
"RowRangesPageFilter expects first NextRange() to be a IntervalRange");
}

while (current_row_range_.index() == 0 &&
std::get<IntervalRange>(current_page_range_).IsAfter(
std::get<IntervalRange>(current_row_range_))) {
IntervalRangeUtils::IsAfter(std::get<IntervalRange>(current_page_range_),
std::get<IntervalRange>(current_row_range_))) {
current_row_range_ = row_ranges_itr_->NextRange();
}

if (current_row_range_.index() != 0) {
return true;
}

return std::get<IntervalRange>(current_page_range_).IsBefore(
std::get<IntervalRange>(current_row_range_));
return IntervalRangeUtils::IsBefore(std::get<IntervalRange>(current_page_range_),
std::get<IntervalRange>(current_row_range_));
}

private:
Expand Down Expand Up @@ -652,11 +653,11 @@ class LeafReader : public ColumnReaderImpl {
1});
}

if (row_ranges.RowCount() > 0) {
if (row_ranges.LastRow() > page_ranges->LastRow()) {
if (row_ranges.num_rows() > 0) {
if (row_ranges.last_row() > page_ranges->last_row()) {
throw ParquetException(
"The provided row range " + row_ranges.ToString() +
" exceeds last page :" + page_ranges->GetRanges().back().ToString());
"The provided row range " + row_ranges.ToString() + " exceeds last page :" +
IntervalRangeUtils::ToString(page_ranges->GetRanges().back()));
}
}
}
Expand All @@ -667,23 +668,21 @@ class LeafReader : public ColumnReaderImpl {
/// using page index to reduce cost
if (page_reader != nullptr && ctx_->row_ranges_per_rg) {
// reset skipper
record_reader_->set_record_skipper(NULLPTR);
record_reader_->reset_record_skipper();

const auto & row_ranges = (*ctx_->row_ranges_per_rg)[input_->current_row_group()];
const auto& row_ranges = (*ctx_->row_ranges_per_rg)[input_->current_row_group()];
// if specific row range is provided for this rg
if (row_ranges->RowCount() != 0) {

if (row_ranges->num_rows() != 0) {
// Use IntervalRanges to represent pages
std::shared_ptr<IntervalRanges> page_ranges;
checkAndGetPageRanges(*row_ranges, page_ranges);

// part 1, skip decompressing & decoding unnecessary pages
page_reader->set_data_page_filter(
RowRangesPageFilter(*row_ranges, page_ranges));
page_reader->set_data_page_filter(RowRangesPageFilter(*row_ranges, page_ranges));

// part 2, skip unnecessary rows in necessary pages
record_reader_->set_record_skipper(
std::make_shared<parquet::internal::RecordSkipper>(*page_ranges,
std::make_unique<parquet::internal::RecordSkipper>(*page_ranges,
*row_ranges));
} else {
NextRowGroup();
Expand Down Expand Up @@ -1162,7 +1161,7 @@ Status GetReader(const SchemaField& field, const std::shared_ptr<ReaderContext>&

Status FileReaderImpl::GetRecordBatchReaderWithRowRanges(
const std::vector<int>& row_groups, const std::vector<int>& column_indices,
const std::shared_ptr<std::vector<std::unique_ptr<RowRanges>>> & row_ranges_per_rg,
const std::shared_ptr<std::vector<std::unique_ptr<RowRanges>>>& row_ranges_per_rg,
std::unique_ptr<RecordBatchReader>* out) {
RETURN_NOT_OK(BoundsCheck(row_groups, column_indices));

Expand Down
9 changes: 5 additions & 4 deletions cpp/src/parquet/arrow/reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -191,14 +191,15 @@ class PARQUET_EXPORT FileReader {
/// \brief Return a RecordBatchReader of row groups selected from
/// rows_to_return, whose columns are selected by column_indices.
///
/// Notice that rows_to_return is file based, it not only decides which row groups to read,
/// but also which rows to read in each row group.
/// Notice that rows_to_return is file based, it not only decides which row groups to
/// read, but also which rows to read in each row group.
///
///
/// \returns error Status if either rows_to_return or column_indices
/// contains an invalid index
virtual ::arrow::Status GetRecordBatchReader(const RowRanges& rows_to_return,
const std::vector<int>& column_indices, std::unique_ptr<::arrow::RecordBatchReader>* out) = 0;
virtual ::arrow::Status GetRecordBatchReader(
const RowRanges& rows_to_return, const std::vector<int>& column_indices,
std::unique_ptr<::arrow::RecordBatchReader>* out) = 0;

/// \brief Return a RecordBatchReader of row groups selected from
/// row_group_indices, whose columns are selected by column_indices.
Expand Down
Loading

0 comments on commit e361c66

Please sign in to comment.