diff --git a/cpp/src/parquet/CMakeLists.txt b/cpp/src/parquet/CMakeLists.txt index 9f9a7f2336aa3..7d12d87e5d9c5 100644 --- a/cpp/src/parquet/CMakeLists.txt +++ b/cpp/src/parquet/CMakeLists.txt @@ -156,6 +156,7 @@ set(PARQUET_SRCS arrow/writer.cc bloom_filter.cc bloom_filter_reader.cc + row_range.cc column_reader.cc column_scanner.cc column_writer.cc diff --git a/cpp/src/parquet/arrow/reader.cc b/cpp/src/parquet/arrow/reader.cc index e471696a401d1..cb15145b8a783 100644 --- a/cpp/src/parquet/arrow/reader.cc +++ b/cpp/src/parquet/arrow/reader.cc @@ -222,13 +222,14 @@ class FileReaderImpl : public FileReader { ctx->iterator_factory = SomeRowGroupsFactory(row_groups); ctx->filter_leaves = true; ctx->included_leaves = included_leaves; - ctx->row_ranges_per_rg = row_ranges_per_rg; // copy the shared pointer to extend its lifecycle + ctx->row_ranges_per_rg = + row_ranges_per_rg; // copy the shared pointer to extend its lifecycle return GetReader(manifest_.schema_fields[i], ctx, out); } Status GetFieldReaders( const std::vector& column_indices, const std::vector& row_groups, - const std::shared_ptr>> & row_ranges_per_rg, + const std::shared_ptr>>& row_ranges_per_rg, std::vector>* out, std::shared_ptr<::arrow::Schema>* out_schema) { // We only need to read schema fields which have columns indicated @@ -342,44 +343,43 @@ class FileReaderImpl : public FileReader { } // This is a internal API owned by FileReaderImpl, not exposed in FileReader - Status GetRecordBatchReaderWithRowRanges(const std::vector& row_group_indices, - const std::vector& column_indices, - const std::shared_ptr>> & row_ranges_per_rg, - std::unique_ptr* out); + Status GetRecordBatchReaderWithRowRanges( + const std::vector& row_group_indices, const std::vector& column_indices, + const std::shared_ptr>>& row_ranges_per_rg, + std::unique_ptr* out); Status GetRecordBatchReader(const RowRanges& rows_to_return, const std::vector& column_indices, std::unique_ptr* out) override { const auto metadata = reader_->metadata(); - // check if the row ranges are valid - if (!rows_to_return.IsValid()) { - return Status::Invalid("The provided row range is invalid, keep it monotone and non-interleaving: " + - rows_to_return.ToString()); - } // check if the row ranges are within the row group boundaries - if (rows_to_return.RowCount() != 0 && rows_to_return.LastRow() >= metadata->num_rows()) { + if (rows_to_return.num_rows() != 0 && + rows_to_return.last_row() >= metadata->num_rows()) { return Status::Invalid("The provided row range " + rows_to_return.ToString() + " exceeds the number of rows in the file: " + std::to_string(metadata->num_rows())); } - if (rows_to_return.RowCount() == 0) { + if (rows_to_return.num_rows() == 0) { return GetRecordBatchReaderWithRowRanges({}, column_indices, {}, out); } std::vector rows_per_rg; - for (int i = 0 ; i < metadata->num_row_groups(); i++) { - rows_per_rg.push_back( metadata->RowGroup(i)->num_rows()); + for (int i = 0; i < metadata->num_row_groups(); i++) { + rows_per_rg.push_back(metadata->RowGroup(i)->num_rows()); } // We'll assign a RowRanges for each RG, even if it's not required to return any rows - std::vector> row_ranges_per_rg = rows_to_return.SplitByRowGroups(rows_per_rg); + std::vector> row_ranges_per_rg = + rows_to_return.SplitByRowRange(rows_per_rg); std::vector row_group_indices; - for (int i = 0 ; i < metadata->num_row_groups(); i++) { - if (row_ranges_per_rg.at(i)->RowCount() > 0) - row_group_indices.push_back(i); + for (int i = 0; i < metadata->num_row_groups(); i++) { + if (row_ranges_per_rg.at(i)->num_rows() > 0) row_group_indices.push_back(i); } - return GetRecordBatchReaderWithRowRanges(row_group_indices, column_indices, - std::make_shared>>(std::move(row_ranges_per_rg)), out); + return GetRecordBatchReaderWithRowRanges( + row_group_indices, column_indices, + std::make_shared>>( + std::move(row_ranges_per_rg)), + out); } Status GetRecordBatchReader(const std::vector& row_group_indices, @@ -390,13 +390,13 @@ class FileReaderImpl : public FileReader { Status GetRecordBatchReader(const std::vector& row_group_indices, std::unique_ptr* out) override { - return GetRecordBatchReaderWithRowRanges(row_group_indices, - Iota(reader_->metadata()->num_columns()), {}, out); + return GetRecordBatchReaderWithRowRanges( + row_group_indices, Iota(reader_->metadata()->num_columns()), {}, out); } Status GetRecordBatchReader(std::unique_ptr* out) override { - return GetRecordBatchReaderWithRowRanges(Iota(num_row_groups()), - Iota(reader_->metadata()->num_columns()), {}, out); + return GetRecordBatchReaderWithRowRanges( + Iota(num_row_groups()), Iota(reader_->metadata()->num_columns()), {}, out); } ::arrow::Result<::arrow::AsyncGenerator>> @@ -499,22 +499,21 @@ class RowGroupReaderImpl : public RowGroupReader { // ---------------------------------------------------------------------- // Column reader implementations -// This class is used to skip decompressing & decoding unnecessary pages by comparing user-specified row_ranges -// and page_ranges from metadata. -// Only support IntervalRange case for now. +// This class is used to skip decompressing & decoding unnecessary pages by comparing +// user-specified row_ranges and page_ranges from metadata. Only support IntervalRange +// case for now. class RowRangesPageFilter { public: - RowRangesPageFilter(const RowRanges& row_ranges, const std::shared_ptr& page_ranges) - : row_ranges_(row_ranges), page_ranges_(page_ranges) { - } + RowRangesPageFilter(const RowRanges& row_ranges, + const std::shared_ptr& page_ranges) + : row_ranges_(row_ranges), page_ranges_(page_ranges) {} - // To avoid error "std::function target must be copy-constructible", we must define copy constructor + // To avoid error "std::function target must be copy-constructible", we must define copy + // constructor RowRangesPageFilter(const RowRangesPageFilter& other) - : row_ranges_(other.row_ranges_), page_ranges_(other.page_ranges_) { - } + : row_ranges_(other.row_ranges_), page_ranges_(other.page_ranges_) {} bool operator()(const DataPageStats& stats) { - if (!initted) { row_ranges_itr_ = row_ranges_.NewIterator(); page_ranges_itr_ = page_ranges_->NewIterator(); @@ -522,19 +521,21 @@ class RowRangesPageFilter { current_row_range_ = row_ranges_itr_->NextRange(); if (current_row_range_.index() != 0) { - throw ParquetException("RowRangesPageFilter expects first NextRange() to be a IntervalRange"); + throw ParquetException( + "RowRangesPageFilter expects first NextRange() to be a IntervalRange"); } initted = true; } current_page_range_ = page_ranges_itr_->NextRange(); if (current_page_range_.index() != 0) { - throw ParquetException("RowRangesPageFilter expects first NextRange() to be a IntervalRange"); + throw ParquetException( + "RowRangesPageFilter expects first NextRange() to be a IntervalRange"); } while (current_row_range_.index() == 0 && - std::get(current_page_range_).IsAfter( - std::get(current_row_range_))) { + IntervalRangeUtils::IsAfter(std::get(current_page_range_), + std::get(current_row_range_))) { current_row_range_ = row_ranges_itr_->NextRange(); } @@ -542,8 +543,8 @@ class RowRangesPageFilter { return true; } - return std::get(current_page_range_).IsBefore( - std::get(current_row_range_)); + return IntervalRangeUtils::IsBefore(std::get(current_page_range_), + std::get(current_row_range_)); } private: @@ -652,11 +653,11 @@ class LeafReader : public ColumnReaderImpl { 1}); } - if (row_ranges.RowCount() > 0) { - if (row_ranges.LastRow() > page_ranges->LastRow()) { + if (row_ranges.num_rows() > 0) { + if (row_ranges.last_row() > page_ranges->last_row()) { throw ParquetException( - "The provided row range " + row_ranges.ToString() + - " exceeds last page :" + page_ranges->GetRanges().back().ToString()); + "The provided row range " + row_ranges.ToString() + " exceeds last page :" + + IntervalRangeUtils::ToString(page_ranges->GetRanges().back())); } } } @@ -667,23 +668,21 @@ class LeafReader : public ColumnReaderImpl { /// using page index to reduce cost if (page_reader != nullptr && ctx_->row_ranges_per_rg) { // reset skipper - record_reader_->set_record_skipper(NULLPTR); + record_reader_->reset_record_skipper(); - const auto & row_ranges = (*ctx_->row_ranges_per_rg)[input_->current_row_group()]; + const auto& row_ranges = (*ctx_->row_ranges_per_rg)[input_->current_row_group()]; // if specific row range is provided for this rg - if (row_ranges->RowCount() != 0) { - + if (row_ranges->num_rows() != 0) { // Use IntervalRanges to represent pages std::shared_ptr page_ranges; checkAndGetPageRanges(*row_ranges, page_ranges); // part 1, skip decompressing & decoding unnecessary pages - page_reader->set_data_page_filter( - RowRangesPageFilter(*row_ranges, page_ranges)); + page_reader->set_data_page_filter(RowRangesPageFilter(*row_ranges, page_ranges)); // part 2, skip unnecessary rows in necessary pages record_reader_->set_record_skipper( - std::make_shared(*page_ranges, + std::make_unique(*page_ranges, *row_ranges)); } else { NextRowGroup(); @@ -1162,7 +1161,7 @@ Status GetReader(const SchemaField& field, const std::shared_ptr& Status FileReaderImpl::GetRecordBatchReaderWithRowRanges( const std::vector& row_groups, const std::vector& column_indices, - const std::shared_ptr>> & row_ranges_per_rg, + const std::shared_ptr>>& row_ranges_per_rg, std::unique_ptr* out) { RETURN_NOT_OK(BoundsCheck(row_groups, column_indices)); diff --git a/cpp/src/parquet/arrow/reader.h b/cpp/src/parquet/arrow/reader.h index 98ea6f5c1a055..1bcf04ee867e4 100644 --- a/cpp/src/parquet/arrow/reader.h +++ b/cpp/src/parquet/arrow/reader.h @@ -191,14 +191,15 @@ class PARQUET_EXPORT FileReader { /// \brief Return a RecordBatchReader of row groups selected from /// rows_to_return, whose columns are selected by column_indices. /// - /// Notice that rows_to_return is file based, it not only decides which row groups to read, - /// but also which rows to read in each row group. + /// Notice that rows_to_return is file based, it not only decides which row groups to + /// read, but also which rows to read in each row group. /// /// /// \returns error Status if either rows_to_return or column_indices /// contains an invalid index - virtual ::arrow::Status GetRecordBatchReader(const RowRanges& rows_to_return, - const std::vector& column_indices, std::unique_ptr<::arrow::RecordBatchReader>* out) = 0; + virtual ::arrow::Status GetRecordBatchReader( + const RowRanges& rows_to_return, const std::vector& column_indices, + std::unique_ptr<::arrow::RecordBatchReader>* out) = 0; /// \brief Return a RecordBatchReader of row groups selected from /// row_group_indices, whose columns are selected by column_indices. diff --git a/cpp/src/parquet/column_reader.cc b/cpp/src/parquet/column_reader.cc index 4ba8243f696e5..76fad7a754868 100644 --- a/cpp/src/parquet/column_reader.cc +++ b/cpp/src/parquet/column_reader.cc @@ -1319,147 +1319,6 @@ std::shared_ptr ColumnReader::Make(const ColumnDescriptor* descr, return std::shared_ptr(nullptr); } -// ---------------------------------------------------------------------- -// RowRanges and ins implementations - -IntervalRanges::IntervalRanges() = default; - -IntervalRanges::IntervalRanges(const IntervalRange& range) { ranges_.push_back(range); } - -IntervalRanges::IntervalRanges(const std::vector& ranges) { - this->ranges_ = ranges; -} - -std::unique_ptr IntervalRanges::NewIterator() const { - return std::make_unique(ranges_); -} - -size_t IntervalRanges::RowCount() const { - size_t cnt = 0; - for (const IntervalRange& range : ranges_) { - cnt += range.Count(); - } - return cnt; -} - -int64_t IntervalRanges::LastRow() const { return ranges_.back().end; } - -bool IntervalRanges::IsValid() const { - if (ranges_.size() == 0) return true; - if (ranges_[0].start < 0) { - return false; - } - for (size_t i = 0; i < ranges_.size(); i++) { - if (!ranges_[i].IsValid()) { - return false; - } - } - for (size_t i = 1; i < ranges_.size(); i++) { - if (ranges_[i].start <= ranges_[i - 1].end) { - return false; - } - } - return true; -} - -bool IntervalRanges::IsOverlapping(const IntervalRange& searchRange) const { - auto it = std::lower_bound( - ranges_.begin(), ranges_.end(), searchRange, - [](const IntervalRange& r1, const IntervalRange& r2) { return r1.IsBefore(r2); }); - return it != ranges_.end() && !(*it).IsAfter(searchRange); -} - -std::string IntervalRanges::ToString() const { - std::string result = "["; - for (const IntervalRange& range : ranges_) { - result += range.ToString() + ", "; - } - if (!ranges_.empty()) { - result = result.substr(0, result.size() - 2); - } - result += "]"; - return result; -} - -std::vector> IntervalRanges::SplitByRowGroups( - const std::vector& rows_per_rg) const { - if (rows_per_rg.size() <= 1) { - std::unique_ptr single = - std::make_unique(*this); // return a copy of itself - auto ret = std::vector>(); - ret.push_back(std::move(single)); - return ret; - } - - std::vector> result; - - IntervalRanges spaces; - int64_t rows_so_far = 0; - for (size_t i = 0; i < rows_per_rg.size(); ++i) { - auto start = rows_so_far; - rows_so_far += rows_per_rg[i]; - auto end = rows_so_far - 1; - spaces.Add({start, end}); - } - - // each RG's row range forms a space, we need to adjust RowRanges in each space to - // zero based. - for (IntervalRange space : spaces.GetRanges()) { - auto intersection = Intersection(IntervalRanges(space), *this); - - std::unique_ptr zero_based_ranges = - std::make_unique(); - for (const IntervalRange& range : intersection.GetRanges()) { - zero_based_ranges->Add({range.start - space.start, range.end - space.start}); - } - result.push_back(std::move(zero_based_ranges)); - } - - return result; -} - -IntervalRanges IntervalRanges::Intersection(const IntervalRanges& left, - const IntervalRanges& right) { - IntervalRanges result; - - size_t rightIndex = 0; - for (const IntervalRange& l : left.ranges_) { - for (size_t i = rightIndex, n = right.ranges_.size(); i < n; ++i) { - const IntervalRange& r = right.ranges_[i]; - if (l.IsBefore(r)) { - break; - } else if (l.IsAfter(r)) { - rightIndex = i + 1; - continue; - } - result.Add(IntervalRange::Intersection(l, r)); - } - } - - return result; -} - -void IntervalRanges::Add(const IntervalRange& range) { - const IntervalRange rangeToAdd = range; - if (ranges_.size() > 1 && rangeToAdd.start <= ranges_.back().end) { - throw ParquetException("Ranges must be added in order"); - } - ranges_.push_back(rangeToAdd); -} - -const std::vector& IntervalRanges::GetRanges() const { return ranges_; } - -IntervalRowRangesIterator::IntervalRowRangesIterator( - const std::vector& ranges) - : ranges_(ranges) {} -IntervalRowRangesIterator::~IntervalRowRangesIterator() {} - -std::variant IntervalRowRangesIterator::NextRange() { - if (current_index_ >= ranges_.size()) return End(); - - return ranges_[current_index_++]; -} - // ---------------------------------------------------------------------- // RecordReader @@ -2451,7 +2310,7 @@ RecordSkipper::RecordSkipper(IntervalRanges& pages, const RowRanges& orig_row_ra // copy row_ranges IntervalRanges skip_pages; for (auto& page : pages.GetRanges()) { - if (!orig_row_ranges.IsOverlapping(page)) { + if (!orig_row_ranges.IsOverlapping(page.start, page.end)) { skip_pages.Add(page); } } @@ -2460,10 +2319,9 @@ RecordSkipper::RecordSkipper(IntervalRanges& pages, const RowRanges& orig_row_ra range_iter_ = row_ranges_->NewIterator(); current_range_variant = range_iter_->NextRange(); - total_rows_to_process_ = pages.RowCount() - skip_pages.RowCount(); + total_rows_to_process_ = pages.num_rows() - skip_pages.num_rows(); } - int64_t RecordSkipper::AdviseNext(const int64_t current_rg_processed) { if (current_range_variant.index() == 2) { return 0; @@ -2502,8 +2360,8 @@ void RecordSkipper::AdjustRanges(IntervalRanges& skip_pages, while (orig_range_variant.index() != 2) { const auto& origin_range = std::get(orig_range_variant); while (skip_iter != skip_pages.GetRanges().end() && - skip_iter->IsBefore(origin_range)) { - skipped_rows += skip_iter->Count(); + IntervalRangeUtils::IsBefore(*skip_iter, origin_range)) { + skipped_rows += IntervalRangeUtils::Count(*skip_iter); ++skip_iter; } diff --git a/cpp/src/parquet/column_reader.h b/cpp/src/parquet/column_reader.h index cf11c8975dc52..f41995a0138f6 100644 --- a/cpp/src/parquet/column_reader.h +++ b/cpp/src/parquet/column_reader.h @@ -22,12 +22,12 @@ #include #include -#include "page_index.h" #include "parquet/exception.h" #include "parquet/level_conversion.h" #include "parquet/metadata.h" #include "parquet/platform.h" #include "parquet/properties.h" +#include "parquet/row_range.h" #include "parquet/schema.h" #include "parquet/types.h" @@ -303,124 +303,6 @@ class TypedColumnReader : public ColumnReader { int32_t* dict_len) = 0; }; -// Represent a range to read. The range is inclusive on both ends. -struct IntervalRange { - static IntervalRange Intersection(const IntervalRange& left, - const IntervalRange& right) { - if (left.start <= right.start) { - if (left.end >= right.start) { - return {right.start, std::min(left.end, right.end)}; - } - } else if (right.end >= left.start) { - return {left.start, std::min(left.end, right.end)}; - } - return {-1, -1}; // Return a default Range object if no intersection range found - } - - IntervalRange(const int64_t start_, const int64_t end_) : start(start_), end(end_) { - if (start > end) { - throw ParquetException("Invalid range with start: " + std::to_string(start) + - " and end: " + std::to_string(end)); - } - } - - size_t Count() const { - if(!IsValid()) { - throw ParquetException("Invalid range with start: " + std::to_string(start) + - " and end: " + std::to_string(end)); - } - return end - start + 1; - } - - bool IsBefore(const IntervalRange& other) const { return end < other.start; } - - bool IsAfter(const IntervalRange& other) const { return start > other.end; } - - bool IsOverlap(const IntervalRange& other) const { - return !IsBefore(other) && !IsAfter(other); - } - - bool IsValid() const { return start >= 0 && end >= 0 && end >= start; } - - std::string ToString() const { - return "(" + std::to_string(start) + ", " + std::to_string(end) + ")"; - } - - // inclusive - int64_t start = -1; - // inclusive - int64_t end = -1; -}; - -struct BitmapRange { - int64_t offset; - // zero added to, if there are less than 64 elements left in the column. - uint64_t bitmap; -}; - -struct End {}; - -// Represent a set of ranges to read. The ranges are sorted and non-overlapping. -class RowRanges { - public: - RowRanges() = default; - virtual ~RowRanges() = default; - virtual size_t RowCount() const = 0; - virtual int64_t LastRow() const = 0; - virtual bool IsValid() const = 0; - virtual bool IsOverlapping(const IntervalRange& searchRange) const = 0; - // Given a RowRanges with rows accross all RGs, split it into N RowRanges, where N = number of RGs - // e.g.: suppose we have 2 RGs: [0-99] and [100-199], and user is interested in RowRanges [90-110], then - // this function will return 2 RowRanges: [90-99] and [0-10] - virtual std::vector> SplitByRowGroups(const std::vector& rows_per_rg) const = 0; - virtual std::string ToString() const = 0; - - // Returns a vector of PageLocations that must be read all to get values for - // all included in this range virtual std::vector - // PageIndexesToInclude(const std::vector& all_pages) = 0; - - class Iterator { - public: - virtual std::variant NextRange() = 0; - virtual ~Iterator() = default; - }; - virtual std::unique_ptr NewIterator() const = 0; - -}; - -class IntervalRanges : public RowRanges { - public: - IntervalRanges(); - explicit IntervalRanges(const IntervalRange& range); - explicit IntervalRanges(const std::vector& ranges); - std::unique_ptr NewIterator() const override; - size_t RowCount() const override; - int64_t LastRow() const override; - bool IsValid() const override; - bool IsOverlapping(const IntervalRange& searchRange) const override; - std::string ToString() const override; - std::vector> SplitByRowGroups( - const std::vector& rows_per_rg) const override; - static IntervalRanges Intersection(const IntervalRanges& left, - const IntervalRanges& right); - void Add(const IntervalRange& range); - const std::vector& GetRanges() const; - - private: - std::vector ranges_; -}; - -class IntervalRowRangesIterator : public RowRanges::Iterator { - public: - IntervalRowRangesIterator(const std::vector& ranges); - ~IntervalRowRangesIterator() override; - std::variant NextRange() override; - - private: - const std::vector& ranges_; - size_t current_index_ = 0; -}; - namespace internal { // A RecordSkipper is used to skip uncessary rows within each pages. @@ -559,7 +441,11 @@ class PARQUET_EXPORT RecordReader { void reset_current_rg_processed_records() { current_rg_processed_records_ = 0; } - void set_record_skipper(const std::shared_ptr& skipper) { skipper_ = skipper; } + void set_record_skipper(std::unique_ptr skipper) { + skipper_ = std::move(skipper); + } + + void reset_record_skipper() { skipper_.reset(); } protected: /// \brief Indicates if we can have nullable values. Note that repeated fields @@ -613,7 +499,7 @@ class PARQUET_EXPORT RecordReader { // vector. bool read_dense_for_nullable_ = false; - std::shared_ptr skipper_ = NULLPTR; + std::unique_ptr skipper_ = NULLPTR; }; class BinaryRecordReader : virtual public RecordReader { diff --git a/cpp/src/parquet/range_reader_test.cc b/cpp/src/parquet/range_reader_test.cc index cde60c583f507..04510143e54c0 100644 --- a/cpp/src/parquet/range_reader_test.cc +++ b/cpp/src/parquet/range_reader_test.cc @@ -39,7 +39,7 @@ using parquet::IntervalRanges; std::string random_string(std::string::size_type length) { static auto& chrs = "ABCDEFGHIJKLMNOPQRSTUVWXYZ"; - static std::mt19937 rg{std::random_device{}()}; + static std::mt19937 rg = std::mt19937(std::random_device()()); static std::uniform_int_distribution pick(0, sizeof(chrs) - 2); std::string s; @@ -240,10 +240,18 @@ void check_rb(std::unique_ptr rb_reader, } ASSERT_EQ(expected_rows, total_rows); - if (checking_col("a", column_names)) ASSERT_EQ(expected_sum * 2, sum_a); - if (checking_col("b", column_names)) ASSERT_EQ(expected_sum * 3, sum_b); - if (checking_col("c", column_names)) ASSERT_EQ(expected_sum, sum_c); - if (checking_col("d", column_names)) ASSERT_EQ(expected_sum, sum_d); + if (checking_col("a", column_names)) { + ASSERT_EQ(expected_sum * 2, sum_a); + } + if (checking_col("b", column_names)) { + ASSERT_EQ(expected_sum * 3, sum_b); + } + if (checking_col("c", column_names)) { + ASSERT_EQ(expected_sum, sum_c); + } + if (checking_col("d", column_names)) { + ASSERT_EQ(expected_sum, sum_d); + } } class TestRecordBatchReaderWithRanges : public testing::Test { @@ -279,7 +287,7 @@ TEST_F(TestRecordBatchReaderWithRanges, TestRangesSplit) {} TEST_F(TestRecordBatchReaderWithRanges, SelectOnePageForEachRG) { std::unique_ptr rb_reader; - IntervalRanges rows{{IntervalRange{0, 9}, IntervalRange{40, 49}, IntervalRange{80, 89}, IntervalRange{90, 99}}}; + IntervalRanges rows{{{0, 9}, {40, 49}, {80, 89}, {90, 99}}}; const std::vector column_indices{0, 1, 2, 3, 4}; ASSERT_OK(arrow_reader->GetRecordBatchReader(rows, column_indices, &rb_reader)); @@ -301,7 +309,8 @@ TEST_F(TestRecordBatchReaderWithRanges, SelectSomePageForOneRG) { TEST_F(TestRecordBatchReaderWithRanges, SelectAllRange) { std::unique_ptr rb_reader; - IntervalRanges rows{{IntervalRange{0, 29}, IntervalRange{30, 59}, IntervalRange{60, 89}, IntervalRange{90, 99}}}; + IntervalRanges rows{{IntervalRange{0, 29}, IntervalRange{30, 59}, IntervalRange{60, 89}, + IntervalRange{90, 99}}}; const std::vector column_indices{0, 1, 2, 3, 4}; ASSERT_OK(arrow_reader->GetRecordBatchReader(rows, column_indices, &rb_reader)); @@ -341,11 +350,15 @@ TEST_F(TestRecordBatchReaderWithRanges, SelectOneRowSkipOneRow) { std::unique_ptr rb_reader; std::vector ranges; for (int64_t i = 0; i < 30; i++) { - if (i % 2 == 0) ranges.push_back({i, i}); + if (i % 2 == 0) { + ranges.push_back({i, i}); + } } for (int64_t i = 60; i < 90; i++) { - if (i % 2 == 0) ranges.push_back({i, i}); + if (i % 2 == 0) { + ranges.push_back({i, i}); + } } const std::vector column_indices{0, 1, 2, 3, 4}; ASSERT_OK(arrow_reader->GetRecordBatchReader(IntervalRanges(ranges), column_indices, @@ -359,25 +372,17 @@ TEST_F(TestRecordBatchReaderWithRanges, SelectOneRowSkipOneRow) { TEST_F(TestRecordBatchReaderWithRanges, InvalidRanges) { std::unique_ptr rb_reader; { - IntervalRanges rows{{IntervalRange{-1, 5}}}; - const std::vector column_indices{0, 1, 2, 3, 4}; - const auto status = - arrow_reader->GetRecordBatchReader(rows, column_indices, &rb_reader); - ASSERT_NOT_OK(status); - EXPECT_TRUE(status.message().find("The provided row range is invalid, keep it " - "monotone and non-interleaving: [(-1, 5)]") != - std::string::npos); + auto create_ranges = []() -> IntervalRanges { + return IntervalRanges{{IntervalRange{-1, 5}}}; + }; + EXPECT_THROW(create_ranges(), parquet::ParquetException); } { - IntervalRanges rows{{IntervalRange{0, 4}, {2, 5}}}; - const std::vector column_indices{0, 1, 2, 3, 4}; - const auto status = - arrow_reader->GetRecordBatchReader(rows, column_indices, &rb_reader); - ASSERT_NOT_OK(status); - EXPECT_TRUE( - status.message().find("The provided row range is invalid, keep it monotone and " - "non-interleaving: [(0, 4), (2, 5)]") != std::string::npos); + auto create_ranges = []() -> IntervalRanges { + return IntervalRanges{{{0, 4}, {2, 5}}}; + }; + EXPECT_THROW(create_ranges(), parquet::ParquetException); } { // will treat as {0,99} @@ -472,11 +477,15 @@ TEST_F(TestRecordBatchReaderWithRangesWithNulls, SelectOneRowSkipOneRow) { std::unique_ptr rb_reader; std::vector ranges; for (int64_t i = 0; i < 30; i++) { - if (i % 2 == 0) ranges.push_back({i, i}); + if (i % 2 == 0) { + ranges.push_back({i, i}); + } } for (int64_t i = 60; i < 90; i++) { - if (i % 2 == 0) ranges.push_back({i, i}); + if (i % 2 == 0) { + ranges.push_back({i, i}); + } } const std::vector column_indices{0, 1, 2, 3, 4}; ASSERT_OK(arrow_reader->GetRecordBatchReader(IntervalRanges(ranges), column_indices, @@ -486,4 +495,4 @@ TEST_F(TestRecordBatchReaderWithRangesWithNulls, SelectOneRowSkipOneRow) { // (10 + 12 + ... + 28) + (60 + 62 ... + 88) = 1320 check_rb(std::move(rb_reader), 30, 1300); } -} \ No newline at end of file +} diff --git a/cpp/src/parquet/row_range.cc b/cpp/src/parquet/row_range.cc new file mode 100644 index 0000000000000..fa996a198f432 --- /dev/null +++ b/cpp/src/parquet/row_range.cc @@ -0,0 +1,190 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "parquet/row_range.h" + +#include + +#include "parquet/exception.h" + +namespace parquet { +// ---------------------------------------------------------------------- +// RowRanges and ins implementations +bool IsValid(const std::vector& ranges) { + if (ranges.size() == 0) return true; + if (ranges[0].start < 0) { + return false; + } + for (size_t i = 0; i < ranges.size(); i++) { + if (!IntervalRangeUtils::IsValid(ranges[i])) { + return false; + } + } + for (size_t i = 1; i < ranges.size(); i++) { + if (ranges[i].start <= ranges[i - 1].end) { + return false; + } + } + return true; +} + +IntervalRanges::IntervalRanges() = default; + +IntervalRanges::IntervalRanges(const IntervalRange& range) { + ranges_.push_back(range); + if (!IsValid(ranges_)) { + throw ParquetException("Invalid range with start: " + std::to_string(range.start) + + " and end: " + std::to_string(range.end) + + ", keep it monotone and non-interleaving"); + } +} + +IntervalRanges::IntervalRanges(const std::vector& ranges) { + this->ranges_ = ranges; + if (!IsValid(ranges_)) { + throw ParquetException("Invalid ranges: " + this->IntervalRanges::ToString() + + ", keep it monotone and non-interleaving"); + } +} + +std::unique_ptr IntervalRanges::NewIterator() const { + return std::make_unique(ranges_); +} + +size_t IntervalRanges::num_rows() const { + size_t cnt = 0; + for (const IntervalRange& range : ranges_) { + cnt += IntervalRangeUtils::Count(range); + } + return cnt; +} + +int64_t IntervalRanges::first_row() const { + if (ranges_.empty()) { + throw ParquetException("first_row() called on empty IntervalRanges"); + } + return ranges_.front().start; +} + +int64_t IntervalRanges::last_row() const { + if (ranges_.empty()) { + throw ParquetException("last_row() called on empty IntervalRanges"); + } + return ranges_.back().end; +} + +bool IntervalRanges::IsOverlapping(const int64_t start, const int64_t end) const { + auto searchRange = IntervalRange{start, end}; + auto it = std::lower_bound(ranges_.begin(), ranges_.end(), searchRange, + [](const IntervalRange& r1, const IntervalRange& r2) { + return IntervalRangeUtils::IsBefore(r1, r2); + }); + return it != ranges_.end() && !IntervalRangeUtils::IsAfter(*it, searchRange); +} + +std::string IntervalRanges::ToString() const { + std::string result = "["; + for (const IntervalRange& range : ranges_) { + result += IntervalRangeUtils::ToString(range) + ", "; + } + if (!ranges_.empty()) { + result = result.substr(0, result.size() - 2); + } + result += "]"; + return result; +} + +std::vector> IntervalRanges::SplitByRowRange( + const std::vector& num_rows_per_sub_ranges) const { + if (num_rows_per_sub_ranges.size() <= 1) { + std::unique_ptr single = + std::make_unique(*this); // return a copy of itself + auto ret = std::vector>(); + ret.push_back(std::move(single)); + return ret; + } + + std::vector> result; + + IntervalRanges spaces; + int64_t rows_so_far = 0; + for (size_t i = 0; i < num_rows_per_sub_ranges.size(); ++i) { + auto start = rows_so_far; + rows_so_far += num_rows_per_sub_ranges[i]; + auto end = rows_so_far - 1; + spaces.Add({start, end}); + } + + // each RG's row range forms a space, we need to adjust RowRanges in each space to + // zero based. + for (IntervalRange space : spaces.GetRanges()) { + auto intersection = Intersection(IntervalRanges(space), *this); + + std::unique_ptr zero_based_ranges = + std::make_unique(); + for (const IntervalRange& range : intersection.GetRanges()) { + zero_based_ranges->Add({range.start - space.start, range.end - space.start}); + } + result.push_back(std::move(zero_based_ranges)); + } + + return result; +} + +IntervalRanges IntervalRanges::Intersection(const IntervalRanges& left, + const IntervalRanges& right) { + IntervalRanges result; + + size_t rightIndex = 0; + for (const IntervalRange& l : left.ranges_) { + for (size_t i = rightIndex, n = right.ranges_.size(); i < n; ++i) { + const IntervalRange& r = right.ranges_[i]; + if (IntervalRangeUtils::IsBefore(l, r)) { + break; + } else if (IntervalRangeUtils::IsAfter(l, r)) { + rightIndex = i + 1; + continue; + } + result.Add(IntervalRangeUtils::Intersection(l, r)); + } + } + + return result; +} + +void IntervalRanges::Add(const IntervalRange& range) { + const IntervalRange rangeToAdd = range; + if (ranges_.size() > 1 && rangeToAdd.start <= ranges_.back().end) { + throw ParquetException("Ranges must be added in order"); + } + ranges_.push_back(rangeToAdd); +} + +const std::vector& IntervalRanges::GetRanges() const { return ranges_; } + +IntervalRowRangesIterator::IntervalRowRangesIterator( + const std::vector& ranges) + : ranges_(ranges) {} + +IntervalRowRangesIterator::~IntervalRowRangesIterator() {} + +std::variant IntervalRowRangesIterator::NextRange() { + if (current_index_ >= ranges_.size()) return End(); + + return ranges_[current_index_++]; +} +} // namespace parquet diff --git a/cpp/src/parquet/row_range.h b/cpp/src/parquet/row_range.h new file mode 100644 index 0000000000000..4e7c2631eb6a1 --- /dev/null +++ b/cpp/src/parquet/row_range.h @@ -0,0 +1,156 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// This module contains the logical parquet-cpp types (independent of Thrift +// structures), schema nodes, and related type tools + +#pragma once +#include + +#include "parquet/exception.h" + +namespace parquet { + +// Represent a range to read. The range is inclusive on both ends. +struct IntervalRange { + IntervalRange(const int64_t start_, const int64_t end_) : start(start_), end(end_) { + if (start > end) { + throw ParquetException("Invalid range with start: " + std::to_string(start) + + " bigger than end: " + std::to_string(end)); + } + } + + // inclusive + int64_t start = -1; + // inclusive + int64_t end = -1; +}; + +class IntervalRangeUtils { + public: + static IntervalRange Intersection(const IntervalRange& left, + const IntervalRange& right) { + if (left.start <= right.start) { + if (left.end >= right.start) { + return {right.start, std::min(left.end, right.end)}; + } + } else if (right.end >= left.start) { + return {left.start, std::min(left.end, right.end)}; + } + return {-1, -1}; // Return a default Range object if no intersection range found + } + + static std::string ToString(const IntervalRange& range) { + return "(" + std::to_string(range.start) + ", " + std::to_string(range.end) + ")"; + } + + static bool IsValid(const IntervalRange& range) { + return range.start >= 0 && range.end >= 0 && range.end >= range.start; + } + + static size_t Count(const IntervalRange& range) { + if (!IsValid(range)) { + throw ParquetException("Invalid range: " + ToString(range)); + } + return range.end - range.start + 1; + } + + static bool IsBefore(const IntervalRange& self, const IntervalRange& other) { + return self.end < other.start; + } + + static bool IsAfter(const IntervalRange& self, const IntervalRange& other) { + return self.start > other.end; + } + + static bool IsOverlap(const IntervalRange& self, const IntervalRange& other) { + return !IsBefore(self, other) && !IsAfter(self, other); + } +}; + +struct BitmapRange { + int64_t offset; + // zero added to, if there are less than 64 elements left in the column. + uint64_t bitmap; +}; + +struct End {}; + +// Represent a set of ranges to read. The ranges are sorted and non-overlapping. +class RowRanges { + public: + virtual ~RowRanges() = default; + /// \brief Total number of rows in the row ranges. + virtual size_t num_rows() const = 0; + /// \brief First row in the ranges + virtual int64_t first_row() const = 0; + /// \brief Last row in the ranges + virtual int64_t last_row() const = 0; + /// \brief Whether the given range from start to end overlaps with the row ranges. + virtual bool IsOverlapping(int64_t start, int64_t end) const = 0; + /// \brief Split the row ranges into sub row ranges according to the + /// specified number of rows per sub row ranges. A typical use case is + /// to convert file based RowRanges to row group based RowRanges. + /// + /// \param num_rows_per_sub_ranges number of rows per sub row range. + virtual std::vector> SplitByRowRange( + const std::vector& num_rows_per_sub_ranges) const = 0; + /// \brief Readable string representation + virtual std::string ToString() const = 0; + + class Iterator { + public: + virtual std::variant NextRange() = 0; + virtual ~Iterator() = default; + }; + /// \brief Create an iterator to iterate over the ranges + virtual std::unique_ptr NewIterator() const = 0; +}; + +class IntervalRanges : public RowRanges { + public: + IntervalRanges(); + explicit IntervalRanges(const IntervalRange& range); + explicit IntervalRanges(const std::vector& ranges); + std::unique_ptr NewIterator() const override; + size_t num_rows() const override; + int64_t first_row() const override; + int64_t last_row() const override; + bool IsOverlapping(int64_t start, int64_t end) const override; + std::string ToString() const override; + std::vector> SplitByRowRange( + const std::vector& num_rows_per_sub_ranges) const override; + static IntervalRanges Intersection(const IntervalRanges& left, + const IntervalRanges& right); + void Add(const IntervalRange& range); + const std::vector& GetRanges() const; + + private: + std::vector ranges_; +}; + +class IntervalRowRangesIterator : public RowRanges::Iterator { + public: + explicit IntervalRowRangesIterator(const std::vector& ranges); + ~IntervalRowRangesIterator() override; + std::variant NextRange() override; + + private: + const std::vector& ranges_; + size_t current_index_ = 0; +}; +} // namespace parquet diff --git a/cpp/src/parquet/row_range_test.cc b/cpp/src/parquet/row_range_test.cc index 44327baab04ce..bf0563211b8e6 100644 --- a/cpp/src/parquet/row_range_test.cc +++ b/cpp/src/parquet/row_range_test.cc @@ -17,7 +17,8 @@ #include #include "parquet/column_reader.h" -using namespace parquet; +using parquet::IntervalRange; +using parquet::IntervalRanges; class RowRangesTest : public ::testing::Test { protected: @@ -28,7 +29,7 @@ TEST_F(RowRangesTest, EmptyRG_ReturnsOriginalRowRanges) { row_ranges.Add(IntervalRange(0, 10)); std::vector rows_per_rg; - auto result = row_ranges.SplitByRowGroups(rows_per_rg); + auto result = row_ranges.SplitByRowRange(rows_per_rg); ASSERT_EQ(result.size(), 1); auto iter = result[0]->NewIterator(); @@ -42,7 +43,7 @@ TEST_F(RowRangesTest, SingleRG_ReturnsOriginalRowRanges2) { row_ranges.Add(IntervalRange(0, 10)); std::vector rows_per_rg = {11}; - auto result = row_ranges.SplitByRowGroups(rows_per_rg); + auto result = row_ranges.SplitByRowRange(rows_per_rg); ASSERT_EQ(result.size(), 1); auto iter = result[0]->NewIterator(); @@ -56,7 +57,7 @@ TEST_F(RowRangesTest, ReturnsTwoRowRanges) { row_ranges.Add(IntervalRange(0, 10)); std::vector rows_per_rg = {5, 6}; - auto result = row_ranges.SplitByRowGroups(rows_per_rg); + auto result = row_ranges.SplitByRowRange(rows_per_rg); ASSERT_EQ(result.size(), 2); { auto iter = result[0]->NewIterator(); @@ -78,7 +79,7 @@ TEST_F(RowRangesTest, ReturnsMultipleRowRanges) { row_ranges.Add(IntervalRange(0, 11)); std::vector rows_per_rg = {3, 4, 100}; - auto result = row_ranges.SplitByRowGroups(rows_per_rg); + auto result = row_ranges.SplitByRowRange(rows_per_rg); ASSERT_EQ(result.size(), 3); { auto iter = result[0]->NewIterator(); @@ -110,7 +111,7 @@ TEST_F(RowRangesTest, MultipleInputRange) { std::vector rows_per_rg = {100, 100}; - auto result = row_ranges.SplitByRowGroups(rows_per_rg); + auto result = row_ranges.SplitByRowRange(rows_per_rg); ASSERT_EQ(result.size(), 2); { auto iter = result[0]->NewIterator(); @@ -142,7 +143,7 @@ TEST_F(RowRangesTest, MultipleSplitPoints_ReturnWithEmptyRowRanges) { row_ranges.Add(IntervalRange(11, 18)); std::vector rows_per_rg = {5, 5, 5, 5, 5}; - auto result = row_ranges.SplitByRowGroups(rows_per_rg); + auto result = row_ranges.SplitByRowRange(rows_per_rg); ASSERT_EQ(result.size(), 5); { auto iter = result[0]->NewIterator(); @@ -176,7 +177,7 @@ TEST_F(RowRangesTest, RangeExceedRG) { row_ranges.Add(IntervalRange(0, 10)); std::vector rows_per_rg = {5, 3}; - auto result = row_ranges.SplitByRowGroups(rows_per_rg); + auto result = row_ranges.SplitByRowRange(rows_per_rg); ASSERT_EQ(result.size(), 2); { auto iter = result[0]->NewIterator();