Skip to content

Commit

Permalink
support rowrange interface
Browse files Browse the repository at this point in the history
  • Loading branch information
binmahone committed Nov 29, 2023
1 parent ba5c679 commit bfee4b7
Show file tree
Hide file tree
Showing 7 changed files with 976 additions and 24 deletions.
1 change: 1 addition & 0 deletions cpp/src/parquet/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,7 @@ add_parquet_test(reader-test
level_conversion_test.cc
column_scanner_test.cc
reader_test.cc
range_reader_test.cc
stream_reader_test.cc
test_util.cc)

Expand Down
185 changes: 166 additions & 19 deletions cpp/src/parquet/arrow/reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

#include "parquet/arrow/reader.h"

#include <parquet/page_index.h>

#include <algorithm>
#include <cstring>
#include <memory>
Expand Down Expand Up @@ -72,6 +74,8 @@ using arrow::internal::Iota;
// Help reduce verbosity
using ParquetReader = parquet::ParquetFileReader;

using parquet::Range;
using parquet::RowRangesPtr;
using parquet::internal::RecordReader;

namespace bit_util = arrow::bit_util;
Expand Down Expand Up @@ -200,10 +204,11 @@ class FileReaderImpl : public FileReader {
return ReadRowGroups(Iota(reader_->metadata()->num_row_groups()), indices, out);
}

Status GetFieldReader(int i,
const std::shared_ptr<std::unordered_set<int>>& included_leaves,
const std::vector<int>& row_groups,
std::unique_ptr<ColumnReaderImpl>* out) {
Status GetFieldReader(
int i, const std::shared_ptr<std::unordered_set<int>>& included_leaves,
const std::vector<int>& row_groups,
const std::shared_ptr<std::map<int, RowRangesPtr>>& row_ranges_map,
std::unique_ptr<ColumnReaderImpl>* out) {
// Should be covered by GetRecordBatchReader checks but
// manifest_.schema_fields is a separate variable so be extra careful.
if (ARROW_PREDICT_FALSE(i < 0 ||
Expand All @@ -219,13 +224,15 @@ class FileReaderImpl : public FileReader {
ctx->iterator_factory = SomeRowGroupsFactory(row_groups);
ctx->filter_leaves = true;
ctx->included_leaves = included_leaves;
ctx->row_ranges_map = row_ranges_map;
return GetReader(manifest_.schema_fields[i], ctx, out);
}

Status GetFieldReaders(const std::vector<int>& column_indices,
const std::vector<int>& row_groups,
std::vector<std::shared_ptr<ColumnReaderImpl>>* out,
std::shared_ptr<::arrow::Schema>* out_schema) {
Status GetFieldReaders(
const std::vector<int>& column_indices, const std::vector<int>& row_groups,
const std::shared_ptr<std::map<int, RowRangesPtr>>& row_ranges_map,
std::vector<std::shared_ptr<ColumnReaderImpl>>* out,
std::shared_ptr<::arrow::Schema>* out_schema) {
// We only need to read schema fields which have columns indicated
// in the indices vector
ARROW_ASSIGN_OR_RAISE(std::vector<int> field_indices,
Expand All @@ -237,8 +244,8 @@ class FileReaderImpl : public FileReader {
::arrow::FieldVector out_fields(field_indices.size());
for (size_t i = 0; i < out->size(); ++i) {
std::unique_ptr<ColumnReaderImpl> reader;
RETURN_NOT_OK(
GetFieldReader(field_indices[i], included_leaves, row_groups, &reader));
RETURN_NOT_OK(GetFieldReader(field_indices[i], included_leaves, row_groups,
row_ranges_map, &reader));

out_fields[i] = reader->field();
out->at(i) = std::move(reader);
Expand All @@ -265,7 +272,7 @@ class FileReaderImpl : public FileReader {
std::vector<int> row_groups = Iota(reader_->metadata()->num_row_groups());

std::unique_ptr<ColumnReaderImpl> reader;
RETURN_NOT_OK(GetFieldReader(i, included_leaves, row_groups, &reader));
RETURN_NOT_OK(GetFieldReader(i, included_leaves, row_groups, NULLPTR, &reader));

return ReadColumn(i, row_groups, reader.get(), out);
}
Expand Down Expand Up @@ -336,19 +343,26 @@ class FileReaderImpl : public FileReader {
return ReadRowGroup(i, Iota(reader_->metadata()->num_columns()), table);
}

Status GetRecordBatchReader(
const std::vector<int>& row_group_indices, const std::vector<int>& column_indices,
const std::shared_ptr<std::map<int, RowRangesPtr>>& row_ranges_map,
std::unique_ptr<RecordBatchReader>* out) override;

Status GetRecordBatchReader(const std::vector<int>& row_group_indices,
const std::vector<int>& column_indices,
std::unique_ptr<RecordBatchReader>* out) override;
std::unique_ptr<RecordBatchReader>* out) override {
return GetRecordBatchReader(row_group_indices, column_indices, NULLPTR, out);
}

Status GetRecordBatchReader(const std::vector<int>& row_group_indices,
std::unique_ptr<RecordBatchReader>* out) override {
return GetRecordBatchReader(row_group_indices,
Iota(reader_->metadata()->num_columns()), out);
Iota(reader_->metadata()->num_columns()), NULLPTR, out);
}

Status GetRecordBatchReader(std::unique_ptr<RecordBatchReader>* out) override {
return GetRecordBatchReader(Iota(num_row_groups()),
Iota(reader_->metadata()->num_columns()), out);
Iota(reader_->metadata()->num_columns()), NULLPTR, out);
}

::arrow::Result<::arrow::AsyncGenerator<std::shared_ptr<::arrow::RecordBatch>>>
Expand Down Expand Up @@ -451,6 +465,42 @@ class RowGroupReaderImpl : public RowGroupReader {
// ----------------------------------------------------------------------
// Column reader implementations

struct RowRangesPageFilter {
explicit RowRangesPageFilter(const RowRangesPtr& row_ranges_,
const RowRangesPtr& page_ranges_)
: row_ranges(row_ranges_), page_ranges(page_ranges_) {
assert(page_ranges != nullptr);
assert(page_ranges->getRanges().size() > 0);
}

bool operator()(const DataPageStats& stats) {
++page_range_idx;

if (row_range_idx >= row_ranges->getRanges().size()) {
return true;
}

Range current_page_range = (*page_ranges)[page_range_idx];

if (current_page_range.isBefore((*row_ranges)[row_range_idx])) {
return true;
}

while (row_range_idx < row_ranges->getRanges().size() &&
current_page_range.isAfter((*row_ranges)[row_range_idx])) {
row_range_idx++;
}

return row_range_idx >= row_ranges->getRanges().size();
}

size_t row_range_idx = 0;
const RowRangesPtr row_ranges;

int page_range_idx = -1;
const RowRangesPtr page_ranges;
};

// Leaf reader is for primitive arrays and primitive children of nested arrays
class LeafReader : public ColumnReaderImpl {
public:
Expand Down Expand Up @@ -512,8 +562,90 @@ class LeafReader : public ColumnReaderImpl {

private:
std::shared_ptr<ChunkedArray> out_;

void checkAndGetPageRanges(const std::shared_ptr<RowRanges>& row_ranges,
std::shared_ptr<RowRanges>& page_ranges) {
// check offset exists
const auto rg_pg_index_reader =
ctx_->reader->GetPageIndexReader()->RowGroup(input_->current_row_group());

if (!rg_pg_index_reader) {
throw ParquetException(
"Attempting to read with Ranges but Page Index is not found for Row "
"Group: " +
std::to_string(input_->current_row_group()));
}
const auto offset_index = rg_pg_index_reader->GetOffsetIndex(input_->column_index());

if (!offset_index) {
throw ParquetException(
"Attempting to read with Ranges but Offset index is not found for "
"column: " +
field_->name());
}

if (!row_ranges->isValid()) {
throw ParquetException(
"The provided row range is invalid, keep it monotone and non-interleaving: " +
row_ranges->toString());
}

const auto page_locations = offset_index->page_locations();
page_ranges = std::make_shared<RowRanges>();
for (size_t i = 0; i < page_locations.size() - 1; i++) {
page_ranges->add(
{page_locations[i].first_row_index, page_locations[i + 1].first_row_index - 1},
false);
}
if (page_locations.size() >= 1) {
page_ranges->add(
{page_locations[page_locations.size() - 1].first_row_index,
ctx_->reader->metadata()->RowGroup(input_->current_row_group())->num_rows() -
1},
false);
}

if (row_ranges->getRanges().size() > 0) {
if ((*row_ranges).getRanges().back().to > page_ranges->getRanges().back().to) {
throw ParquetException(
"The provided row range " + row_ranges->toString() +
" exceeds last page :" + page_ranges->getRanges().back().toString());
}
}
}

void NextRowGroup() {
std::unique_ptr<PageReader> page_reader = input_->NextChunk();

/// using page index to reduce cost
if (page_reader != nullptr && ctx_->row_ranges_map) {
// reset skipper
record_reader_->set_record_skipper(NULLPTR);

// if specific row range is provided for this rg
if (const auto iter = ctx_->row_ranges_map->find(input_->current_row_group());
iter != ctx_->row_ranges_map->end()) {
if (iter->second != nullptr && iter->second->rowCount() != 0) {
std::shared_ptr<RowRanges> page_ranges;
checkAndGetPageRanges(iter->second, page_ranges);

// part 1, skip decompressing & decoding unnecessary pages
page_reader->set_data_page_filter(
RowRangesPageFilter(iter->second, page_ranges));

// part 2, skip unnecessary rows in necessary pages
record_reader_->set_record_skipper(
std::make_shared<parquet::internal::RecordSkipper>(*page_ranges,
*iter->second));
} else {
NextRowGroup();
return;
}
}
// Else iff row_ranges_map exists but no row_ranges is found for this RG key, this RG will be read
}

record_reader_->reset_current_rg_processed_records();
record_reader_->SetPageReader(std::move(page_reader));
}

Expand Down Expand Up @@ -982,9 +1114,10 @@ Status GetReader(const SchemaField& field, const std::shared_ptr<ReaderContext>&

} // namespace

Status FileReaderImpl::GetRecordBatchReader(const std::vector<int>& row_groups,
const std::vector<int>& column_indices,
std::unique_ptr<RecordBatchReader>* out) {
Status FileReaderImpl::GetRecordBatchReader(
const std::vector<int>& row_groups, const std::vector<int>& column_indices,
const std::shared_ptr<std::map<int, RowRangesPtr>>& row_ranges_map,
std::unique_ptr<RecordBatchReader>* out) {
RETURN_NOT_OK(BoundsCheck(row_groups, column_indices));

if (reader_properties_.pre_buffer()) {
Expand All @@ -997,7 +1130,8 @@ Status FileReaderImpl::GetRecordBatchReader(const std::vector<int>& row_groups,

std::vector<std::shared_ptr<ColumnReaderImpl>> readers;
std::shared_ptr<::arrow::Schema> batch_schema;
RETURN_NOT_OK(GetFieldReaders(column_indices, row_groups, &readers, &batch_schema));
RETURN_NOT_OK(GetFieldReaders(column_indices, row_groups, row_ranges_map, &readers,
&batch_schema));

if (readers.empty()) {
// Just generate all batches right now; they're cheap since they have no columns.
Expand Down Expand Up @@ -1218,6 +1352,7 @@ Status FileReaderImpl::GetColumn(int i, FileColumnIteratorFactory iterator_facto
ctx->pool = pool_;
ctx->iterator_factory = iterator_factory;
ctx->filter_leaves = false;

std::unique_ptr<ColumnReaderImpl> result;
RETURN_NOT_OK(GetReader(manifest_.schema_fields[i], ctx, &result));
*out = std::move(result);
Expand Down Expand Up @@ -1251,7 +1386,8 @@ Future<std::shared_ptr<Table>> FileReaderImpl::DecodeRowGroups(
// in a sync context too so use `this` over `self`
std::vector<std::shared_ptr<ColumnReaderImpl>> readers;
std::shared_ptr<::arrow::Schema> result_schema;
RETURN_NOT_OK(GetFieldReaders(column_indices, row_groups, &readers, &result_schema));
RETURN_NOT_OK(
GetFieldReaders(column_indices, row_groups, NULLPTR, &readers, &result_schema));
// OptionalParallelForAsync requires an executor
if (!cpu_executor) cpu_executor = ::arrow::internal::GetCpuThreadPool();

Expand Down Expand Up @@ -1314,6 +1450,17 @@ Status FileReader::GetRecordBatchReader(const std::vector<int>& row_group_indice
return Status::OK();
}

Status FileReader::GetRecordBatchReader(
const std::vector<int>& row_group_indices, const std::vector<int>& column_indices,
const std::shared_ptr<std::map<int, RowRangesPtr>>& row_ranges_map,
std::shared_ptr<RecordBatchReader>* out) {
std::unique_ptr<RecordBatchReader> tmp;
RETURN_NOT_OK(
GetRecordBatchReader(row_group_indices, column_indices, row_ranges_map, &tmp));
out->reset(tmp.release());
return Status::OK();
}

Status FileReader::Make(::arrow::MemoryPool* pool,
std::unique_ptr<ParquetFileReader> reader,
const ArrowReaderProperties& properties,
Expand Down
10 changes: 10 additions & 0 deletions cpp/src/parquet/arrow/reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include <memory>
#include <vector>

#include "parquet/column_reader.h"
#include "parquet/file_reader.h"
#include "parquet/platform.h"
#include "parquet/properties.h"
Expand Down Expand Up @@ -187,6 +188,11 @@ class PARQUET_EXPORT FileReader {
const std::vector<int>& row_group_indices, const std::vector<int>& column_indices,
std::unique_ptr<::arrow::RecordBatchReader>* out) = 0;

virtual ::arrow::Status GetRecordBatchReader(
const std::vector<int>& row_group_indices, const std::vector<int>& column_indices,
const std::shared_ptr<std::map<int, RowRangesPtr>>& row_ranges_map,
std::unique_ptr<::arrow::RecordBatchReader>* out) = 0;

/// \brief Return a RecordBatchReader of row groups selected from
/// row_group_indices, whose columns are selected by column_indices.
///
Expand All @@ -199,6 +205,10 @@ class PARQUET_EXPORT FileReader {
///
/// \returns error Status if either row_group_indices or column_indices
/// contains an invalid index
::arrow::Status GetRecordBatchReader(
const std::vector<int>& row_group_indices, const std::vector<int>& column_indices,
const std::shared_ptr<std::map<int, RowRangesPtr>>& row_ranges_map,
std::shared_ptr<::arrow::RecordBatchReader>* out);
::arrow::Status GetRecordBatchReader(const std::vector<int>& row_group_indices,
const std::vector<int>& column_indices,
std::shared_ptr<::arrow::RecordBatchReader>* out);
Expand Down
5 changes: 5 additions & 0 deletions cpp/src/parquet/arrow/reader_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ class FileColumnIterator {
}

auto row_group_reader = reader_->RowGroup(row_groups_.front());
current_rg = row_groups_.front();
row_groups_.pop_front();
return row_group_reader->GetColumnPageReader(column_index_);
}
Expand All @@ -88,11 +89,14 @@ class FileColumnIterator {

int column_index() const { return column_index_; }

int current_row_group() const { return current_rg; }

protected:
int column_index_;
ParquetFileReader* reader_;
const SchemaDescriptor* schema_;
std::deque<int> row_groups_;
int current_rg = 0;
};

using FileColumnIteratorFactory =
Expand All @@ -109,6 +113,7 @@ struct ReaderContext {
FileColumnIteratorFactory iterator_factory;
bool filter_leaves;
std::shared_ptr<std::unordered_set<int>> included_leaves;
std::shared_ptr<std::map<int, RowRangesPtr>> row_ranges_map;

bool IncludesLeaf(int leaf_index) const {
if (this->filter_leaves) {
Expand Down
Loading

0 comments on commit bfee4b7

Please sign in to comment.