Skip to content

Commit

Permalink
separete definition 3
Browse files Browse the repository at this point in the history
  • Loading branch information
binmahone committed Jan 15, 2024
1 parent 8f5a88a commit 09286d7
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 66 deletions.
1 change: 0 additions & 1 deletion cpp/src/parquet/arrow/reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1398,7 +1398,6 @@ Status FileReaderImpl::GetColumn(int i, FileColumnIteratorFactory iterator_facto
ctx->pool = pool_;
ctx->iterator_factory = iterator_factory;
ctx->filter_leaves = false;

std::unique_ptr<ColumnReaderImpl> result;
RETURN_NOT_OK(GetReader(manifest_.schema_fields[i], ctx, &result));
*out = std::move(result);
Expand Down
67 changes: 67 additions & 0 deletions cpp/src/parquet/column_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2447,5 +2447,72 @@ std::shared_ptr<RecordReader> RecordReader::Make(const ColumnDescriptor* descr,
return nullptr;
}

RecordSkipper::RecordSkipper(IntervalRanges& pages, const RowRanges& orig_row_ranges) {
// copy row_ranges
IntervalRanges skip_pages;
for (auto& page : pages.GetRanges()) {
if (!orig_row_ranges.IsOverlapping(page)) {
skip_pages.Add(page);
}
}

AdjustRanges(skip_pages, orig_row_ranges, row_ranges_);
range_iter_ = row_ranges_->NewIterator();
current_range_variant = range_iter_->NextRange();

total_rows_to_process_ = pages.RowCount() - skip_pages.RowCount();
}


int64_t RecordSkipper::AdviseNext(const int64_t current_rg_processed) {
if (current_range_variant.index() == 2) {
return 0;
}

auto& current_range = std::get<IntervalRange>(current_range_variant);

if (current_range.end < current_rg_processed) {
current_range_variant = range_iter_->NextRange();
if (current_range_variant.index() == 2) {
// negative, skip the ramaining rows
return current_rg_processed - total_rows_to_process_;
}
}

current_range = std::get<IntervalRange>(current_range_variant);

if (current_range.start > current_rg_processed) {
// negative, skip
return current_rg_processed - current_range.start;
}

const auto ret = current_range.end - current_rg_processed + 1;
return ret;
}

void RecordSkipper::AdjustRanges(IntervalRanges& skip_pages,
const RowRanges& orig_row_ranges,
std::unique_ptr<RowRanges>& ret) {
std::unique_ptr<IntervalRanges> temp = std::make_unique<IntervalRanges>();

size_t skipped_rows = 0;
const auto orig_range_iter = orig_row_ranges.NewIterator();
auto orig_range_variant = orig_range_iter->NextRange();
auto skip_iter = skip_pages.GetRanges().begin();
while (orig_range_variant.index() != 2) {
const auto& origin_range = std::get<IntervalRange>(orig_range_variant);
while (skip_iter != skip_pages.GetRanges().end() &&
skip_iter->IsBefore(origin_range)) {
skipped_rows += skip_iter->Count();
++skip_iter;
}

temp->Add(IntervalRange(origin_range.start - skipped_rows,
origin_range.end - skipped_rows));
orig_range_variant = orig_range_iter->NextRange();
}
ret = std::move(temp);
}

} // namespace internal
} // namespace parquet
74 changes: 9 additions & 65 deletions cpp/src/parquet/column_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -426,75 +426,19 @@ namespace internal {
// A RecordSkipper is used to skip uncessary rows within each pages.
class PARQUET_EXPORT RecordSkipper {
public:
RecordSkipper(IntervalRanges& pages, const RowRanges& orig_row_ranges) {
// copy row_ranges
IntervalRanges skip_pages;
for (auto& page : pages.GetRanges()) {
if (!orig_row_ranges.IsOverlapping(page)) {
skip_pages.Add(page);
}
}

/// Since the skipped pages will be silently skipped without updating
/// current_rg_processed_records or records_read_, we need to pre-process the row
/// ranges as if these skipped pages never existed
AdjustRanges(skip_pages, orig_row_ranges, row_ranges_);
range_iter_ = row_ranges_->NewIterator();
current_range_variant = range_iter_->NextRange();

total_rows_to_process_ = pages.RowCount() - skip_pages.RowCount();
}

/// \brief Return the number of records to read or to skip
RecordSkipper(IntervalRanges& pages, const RowRanges& orig_row_ranges);
/// Return the number of records to read or to skip
/// if return values is positive, it means to read N records
/// if return values is negative, it means to skip N records
/// if return values is 0, it means end of RG
int64_t AdviseNext(const int64_t current_rg_processed) {
if (current_range_variant.index() == 2) {
return 0;
}

auto & current_range = std::get<IntervalRange>(current_range_variant);

if (current_range.end < current_rg_processed) {
current_range_variant = range_iter_->NextRange();
if (current_range_variant.index() == 2) {
// negative, skip the ramaining rows
return current_rg_processed - total_rows_to_process_;
}
}
int64_t AdviseNext(const int64_t current_rg_processed);

current_range = std::get<IntervalRange>(current_range_variant);

if (current_range.start > current_rg_processed) {
// negative, skip
return current_rg_processed - current_range.start;
}

const auto ret = current_range.end - current_rg_processed + 1;
return ret;
}

private:
void AdjustRanges(IntervalRanges& skip_pages, const RowRanges& orig_row_ranges, std::unique_ptr<RowRanges>& ret) {
std::unique_ptr<IntervalRanges> temp = std::make_unique<IntervalRanges>();

size_t skipped_rows = 0;
const auto orig_range_iter = orig_row_ranges.NewIterator();
auto orig_range_variant = orig_range_iter->NextRange();
auto skip_iter = skip_pages.GetRanges().begin();
while (orig_range_variant.index() != 2) {
const auto & origin_range = std::get<IntervalRange>(orig_range_variant);
while (skip_iter != skip_pages.GetRanges().end() && skip_iter->IsBefore(origin_range)) {
skipped_rows += skip_iter->Count();
++skip_iter;
}

temp->Add(IntervalRange(origin_range.start - skipped_rows, origin_range.end - skipped_rows));
orig_range_variant = orig_range_iter->NextRange();
}
ret = std::move(temp);
}
private:
/// Since the skipped pages will be silently skipped without updating
/// current_rg_processed_records or records_read_, we need to pre-process the row
/// ranges as if these skipped pages never existed
static void AdjustRanges(IntervalRanges& skip_pages, const RowRanges& orig_row_ranges,
std::unique_ptr<RowRanges>& ret);

std::unique_ptr<RowRanges> row_ranges_;
std::unique_ptr<RowRanges::Iterator> range_iter_;
Expand Down

0 comments on commit 09286d7

Please sign in to comment.