From 047be8e91a92e0ee20fc677c06e8b8a28e544acb Mon Sep 17 00:00:00 2001 From: Hongbin Ma Date: Thu, 18 Jan 2024 18:21:54 +0800 Subject: [PATCH] skip io coding done one thread for each logical range fix prebuffer conflict with datappagefilter fix coalesce problem ading tests fix style --- cpp/src/arrow/io/caching.cc | 185 ++++++++++++++++++++---- cpp/src/arrow/io/caching.h | 3 +- cpp/src/arrow/io/interfaces.cc | 15 ++ cpp/src/arrow/io/interfaces.h | 4 + cpp/src/parquet/CMakeLists.txt | 7 +- cpp/src/parquet/arrow/reader.cc | 15 +- cpp/src/parquet/arrow/reader_internal.h | 1 + cpp/src/parquet/file_reader.cc | 79 ++++++++-- cpp/src/parquet/file_reader.h | 10 +- cpp/src/parquet/range_reader_test.cc | 117 ++++++++++++--- 10 files changed, 363 insertions(+), 73 deletions(-) diff --git a/cpp/src/arrow/io/caching.cc b/cpp/src/arrow/io/caching.cc index bd61c40693a27..75a3c47a3063f 100644 --- a/cpp/src/arrow/io/caching.cc +++ b/cpp/src/arrow/io/caching.cc @@ -24,6 +24,11 @@ #include "arrow/buffer.h" #include "arrow/io/caching.h" + +#include +#include +#include + #include "arrow/io/util_internal.h" #include "arrow/result.h" #include "arrow/util/future.h" @@ -132,13 +137,56 @@ CacheOptions CacheOptions::MakeFromNetworkMetrics(int64_t time_to_first_byte_mil namespace internal { +std::vector GetReadRangesExcludingHoles(const ReadRange& read_range, + const std::vector& holes) { + std::vector ranges; + int64_t offset = read_range.offset; + for (const auto& hole : holes) { + if (hole.offset >= read_range.offset + read_range.length || + hole.offset + hole.length <= read_range.offset) { + throw parquet::ParquetException("Parquet error: holes not subset of read range"); + } + if (hole.offset > offset) { + ranges.push_back({offset, hole.offset - offset}); + } + offset = hole.offset + hole.length; + } + if (offset < read_range.offset + read_range.length) { + ranges.push_back({offset, read_range.offset + read_range.length - offset}); + } + return ranges; +} + +int64_t GetActualBufferOffset(const int64_t offset, const int64_t buffer_start_offset, + const std::vector& holes) { + int padding = 0; + for (const auto& hole : holes) { + if (hole.offset >= offset) { + break; + } + if (hole.offset + hole.length <= offset) { + padding += hole.length; + } else { + padding += offset - hole.offset; + } + } + return offset - padding - buffer_start_offset; +} + struct RangeCacheEntry { - ReadRange range; - Future> future; + ReadRange range; // nominal range for this entry + std::vector holes; // nominal range - holes = actual read ranges + Future future; // the future for actual read ranges + std::shared_ptr buffer; // actual read ranges are read into this buffer with + // pre-calculated position RangeCacheEntry() = default; - RangeCacheEntry(const ReadRange& range_, Future> future_) - : range(range_), future(std::move(future_)) {} + RangeCacheEntry(const ReadRange& range, std::vector& holes, + Future& future, std::unique_ptr& buffer) + : range(range), + holes(std::move(holes)), + future(std::move(future)), + buffer(std::move(buffer)) {} friend bool operator<(const RangeCacheEntry& left, const RangeCacheEntry& right) { return left.range.offset < right.range.offset; @@ -156,28 +204,87 @@ struct ReadRangeCache::Impl { virtual ~Impl() = default; - // Get the future corresponding to a range - virtual Future> MaybeRead(RangeCacheEntry* entry) { - return entry->future; + virtual Future MaybeRead(RangeCacheEntry* entry) { return entry->future; } + + Future DoAsyncRead(const ReadRange& range, const std::vector& holes, + std::unique_ptr& buffer) const { + int64_t total_size = range.length; + for (const auto& hole : holes) { + total_size -= hole.length; + } + + buffer = *AllocateBuffer(total_size, 64, ctx.pool()); + + auto actual_read_ranges = GetReadRangesExcludingHoles(range, holes); + return file->ReadAsync(ctx, actual_read_ranges, buffer->mutable_data()); } // Make cache entries for ranges virtual std::vector MakeCacheEntries( - const std::vector& ranges) { + const std::vector& ranges, + std::vector>& holes_foreach_range) { std::vector new_entries; new_entries.reserve(ranges.size()); - for (const auto& range : ranges) { - new_entries.emplace_back(range, file->ReadAsync(ctx, range.offset, range.length)); + + for (size_t i = 0; i < ranges.size(); i++) { + std::unique_ptr buffer; + auto future = DoAsyncRead(ranges[i], holes_foreach_range[i], buffer); + new_entries.emplace_back(ranges[i], holes_foreach_range[i], future, buffer); } return new_entries; } + void CoalesceRanges( + std::vector& ranges, + const std::vector>& holes_foreach_range_orig, + std::vector>& holes_foreach_range_new) const { + auto result = CoalesceReadRanges(std::move(ranges), options.hole_size_limit, + options.range_size_limit); + if (!result.ok()) { + throw parquet::ParquetException("Failed to coalesce ranges: " + + result.status().message()); + } + ranges = std::move(result.ValueOrDie()); + holes_foreach_range_new.resize(ranges.size()); + + std::vector flatten_holes; + size_t index = 0; + flatten_holes.reserve( + std::accumulate(holes_foreach_range_orig.begin(), holes_foreach_range_orig.end(), + 0, [](int sum, const auto& v) { return sum + v.size(); })); + for (const auto& v : holes_foreach_range_orig) { + std::move(v.begin(), v.end(), std::back_inserter(flatten_holes)); + } + for (size_t i = 0; i < ranges.size(); ++i) { + std::vector current_range_holes; + const auto& range = ranges.at(i); + for (; index < flatten_holes.size(); ++index) { + const auto& hole = flatten_holes.at(index); + if (hole.offset >= range.offset + range.length) { + break; + } + if (!(hole.offset >= range.offset && + hole.offset + hole.length <= range.offset + range.length)) { + throw parquet::ParquetException( + "Parquet error: holes not subset of read range"); + } + current_range_holes.push_back({hole.offset, hole.length}); + } + holes_foreach_range_new.at(i) = std::move(current_range_holes); + } + if (ranges.size() != holes_foreach_range_new.size()) { + throw parquet::ParquetException("ranges.size() != holes_foreach_range_new.size()"); + } + } + // Add the given ranges to the cache, coalescing them where possible - virtual Status Cache(std::vector ranges) { - ARROW_ASSIGN_OR_RAISE( - ranges, internal::CoalesceReadRanges(std::move(ranges), options.hole_size_limit, - options.range_size_limit)); - std::vector new_entries = MakeCacheEntries(ranges); + virtual Status Cache(std::vector ranges, + std::vector> holes_foreach_range_orig) { + std::vector> holes_foreach_range_new; + CoalesceRanges(ranges, holes_foreach_range_orig, holes_foreach_range_new); + + std::vector new_entries = + MakeCacheEntries(ranges, holes_foreach_range_new); // Add new entries, themselves ordered by offset if (entries.size() > 0) { std::vector merged(entries.size() + new_entries.size()); @@ -205,21 +312,32 @@ struct ReadRangeCache::Impl { return entry.range.offset + entry.range.length < range.offset + range.length; }); if (it != entries.end() && it->range.Contains(range)) { - auto fut = MaybeRead(&*it); - ARROW_ASSIGN_OR_RAISE(auto buf, fut.result()); + const auto fut = MaybeRead(&*it); + const auto result = fut.result(); + if (!result.ok()) { + throw parquet::ParquetException( + "Parquet error: read failed for one of the sub range"); + } + if (options.lazy && options.prefetch_limit > 0) { int64_t num_prefetched = 0; for (auto next_it = it + 1; next_it != entries.end() && num_prefetched < options.prefetch_limit; ++next_it) { if (!next_it->future.is_valid()) { - next_it->future = - file->ReadAsync(ctx, next_it->range.offset, next_it->range.length); + std::unique_ptr buffer; + next_it->future = DoAsyncRead(next_it->range, next_it->holes, buffer); + next_it->buffer = std::move(buffer); } ++num_prefetched; } } - return SliceBuffer(std::move(buf), range.offset - it->range.offset, range.length); + + const auto actual_start = + GetActualBufferOffset(range.offset, it->range.offset, it->holes); + const auto actual_end = + GetActualBufferOffset(range.offset + range.length, it->range.offset, it->holes); + return SliceBuffer(it->buffer, actual_start, actual_end - actual_start); } return Status::Invalid("ReadRangeCache did not find matching cache entry"); } @@ -264,29 +382,37 @@ struct ReadRangeCache::LazyImpl : public ReadRangeCache::Impl { virtual ~LazyImpl() = default; - Future> MaybeRead(RangeCacheEntry* entry) override { + Future MaybeRead(RangeCacheEntry* entry) override { // Called by superclass Read()/WaitFor() so we have the lock if (!entry->future.is_valid()) { - entry->future = file->ReadAsync(ctx, entry->range.offset, entry->range.length); + std::unique_ptr buffer; + entry->future = DoAsyncRead(entry->range, entry->holes, buffer); + entry->buffer = std::move(buffer); } return entry->future; } std::vector MakeCacheEntries( - const std::vector& ranges) override { + const std::vector& ranges, + std::vector>& holes_foreach_range) override { std::vector new_entries; new_entries.reserve(ranges.size()); - for (const auto& range : ranges) { + for (size_t i = 0; i < ranges.size(); ++i) { + const auto& range = ranges[i]; + auto& holes = holes_foreach_range[i]; + auto temp_buffer = std::make_unique(NULLPTR, 0); + auto temp_future = Future(); // In the lazy variant, don't read data here - later, a call to Read or WaitFor // will call back to MaybeRead (under the lock) which will fill the future. - new_entries.emplace_back(range, Future>()); + new_entries.emplace_back(range, holes, temp_future, temp_buffer); } return new_entries; } - Status Cache(std::vector ranges) override { + Status Cache(std::vector ranges, + std::vector> holes_foreach_range) override { std::unique_lock guard(entry_mutex); - return ReadRangeCache::Impl::Cache(std::move(ranges)); + return ReadRangeCache::Impl::Cache(std::move(ranges), std::move(holes_foreach_range)); } Result> Read(ReadRange range) override { @@ -317,8 +443,9 @@ ReadRangeCache::ReadRangeCache(std::shared_ptr owned_file, ReadRangeCache::~ReadRangeCache() = default; -Status ReadRangeCache::Cache(std::vector ranges) { - return impl_->Cache(std::move(ranges)); +Status ReadRangeCache::Cache(std::vector ranges, + std::vector> holes_foreach_range) { + return impl_->Cache(std::move(ranges), std::move(holes_foreach_range)); } Result> ReadRangeCache::Read(ReadRange range) { diff --git a/cpp/src/arrow/io/caching.h b/cpp/src/arrow/io/caching.h index e2b911fafdbbc..4f5f801826567 100644 --- a/cpp/src/arrow/io/caching.h +++ b/cpp/src/arrow/io/caching.h @@ -131,7 +131,8 @@ class ARROW_EXPORT ReadRangeCache { /// /// The caller must ensure that the ranges do not overlap with each other, /// nor with previously cached ranges. Otherwise, behaviour will be undefined. - Status Cache(std::vector ranges); + Status Cache(std::vector ranges, + std::vector> holes_foreach_range = {}); /// \brief Read a range previously given to Cache(). Result> Read(ReadRange range); diff --git a/cpp/src/arrow/io/interfaces.cc b/cpp/src/arrow/io/interfaces.cc index 1d35549cc4345..cb294db7fa1d2 100644 --- a/cpp/src/arrow/io/interfaces.cc +++ b/cpp/src/arrow/io/interfaces.cc @@ -171,6 +171,21 @@ Future> RandomAccessFile::ReadAsync(const IOContext& ctx ctx, [self, position, nbytes] { return self->ReadAt(position, nbytes); })); } +Future RandomAccessFile::ReadAsync(const IOContext& ctx, + std::vector& ranges, void* out) { + auto self = checked_pointer_cast(shared_from_this()); + return DeferNotOk(internal::SubmitIO( + ctx, [self, ranges = std::move(ranges), out]() mutable -> Result { + int64_t read_size = 0; + for (const auto& r : ranges) { + RETURN_NOT_OK( + self->ReadAt(r.offset, r.length, static_cast(out) + read_size)); + read_size += r.length; + } + return read_size; + })); +} + Future> RandomAccessFile::ReadAsync(int64_t position, int64_t nbytes) { return ReadAsync(io_context(), position, nbytes); diff --git a/cpp/src/arrow/io/interfaces.h b/cpp/src/arrow/io/interfaces.h index b36c38c6d4868..7e29ff1613ad7 100644 --- a/cpp/src/arrow/io/interfaces.h +++ b/cpp/src/arrow/io/interfaces.h @@ -296,6 +296,10 @@ class ARROW_EXPORT RandomAccessFile : public InputStream, public Seekable { virtual Future> ReadAsync(const IOContext&, int64_t position, int64_t nbytes); + /// EXPERIMENTAL: Read data asynchronously. The dest is provided by `out` + virtual Future ReadAsync(const IOContext& ctx, std::vector& ranges, + void* out); + /// EXPERIMENTAL: Read data asynchronously, using the file's IOContext. Future> ReadAsync(int64_t position, int64_t nbytes); diff --git a/cpp/src/parquet/CMakeLists.txt b/cpp/src/parquet/CMakeLists.txt index 19a9ccf58aa3a..39e0794e750be 100644 --- a/cpp/src/parquet/CMakeLists.txt +++ b/cpp/src/parquet/CMakeLists.txt @@ -360,6 +360,11 @@ add_parquet_test(internals-test set_source_files_properties(public_api_test.cc PROPERTIES SKIP_PRECOMPILE_HEADERS ON SKIP_UNITY_BUILD_INCLUSION ON) +add_parquet_test(rowrange-read-test + SOURCES + row_range_test.cc + range_reader_test.cc +) add_parquet_test(reader-test SOURCES @@ -367,8 +372,6 @@ add_parquet_test(reader-test level_conversion_test.cc column_scanner_test.cc reader_test.cc - range_reader_test.cc - row_range_test.cc stream_reader_test.cc test_util.cc) diff --git a/cpp/src/parquet/arrow/reader.cc b/cpp/src/parquet/arrow/reader.cc index e151fb8c4dee6..c258692dc4677 100644 --- a/cpp/src/parquet/arrow/reader.cc +++ b/cpp/src/parquet/arrow/reader.cc @@ -223,6 +223,7 @@ class FileReaderImpl : public FileReader { ctx->included_leaves = included_leaves; ctx->row_ranges_per_rg = row_ranges_per_rg; // copy the shared pointer to extend its lifecycle + ctx->pre_buffer_enabled = properties().pre_buffer(); return GetReader(manifest_.schema_fields[i], ctx, out); } @@ -667,7 +668,13 @@ class LeafReader : public ColumnReaderImpl { checkAndGetPageRanges(*row_ranges, page_ranges); // part 1, skip decompressing & decoding unnecessary pages - page_reader->set_data_page_filter(RowRangesPageFilter(*row_ranges, page_ranges)); + if (!ctx_->pre_buffer_enabled) { + page_reader->set_data_page_filter( + RowRangesPageFilter(*row_ranges, page_ranges)); + } else { + // Pre buffer already skipped useless pages, so should not apply + // data_page_filter here again. + } // part 2, skip unnecessary rows in necessary pages record_reader_->set_record_skipper( @@ -1158,7 +1165,7 @@ Status FileReaderImpl::GetRecordBatchReaderWithRowRanges( // PARQUET-1698/PARQUET-1820: pre-buffer row groups/column chunks if enabled BEGIN_PARQUET_CATCH_EXCEPTIONS reader_->PreBuffer(row_groups, column_indices, reader_properties_.io_context(), - reader_properties_.cache_options()); + reader_properties_.cache_options(), row_ranges_per_rg); END_PARQUET_CATCH_EXCEPTIONS } @@ -1366,7 +1373,7 @@ FileReaderImpl::GetRecordBatchGenerator(std::shared_ptr reader, if (reader_properties_.pre_buffer()) { BEGIN_PARQUET_CATCH_EXCEPTIONS reader_->PreBuffer(row_group_indices, column_indices, reader_properties_.io_context(), - reader_properties_.cache_options()); + reader_properties_.cache_options(), {}); END_PARQUET_CATCH_EXCEPTIONS } ::arrow::AsyncGenerator row_group_generator = @@ -1403,7 +1410,7 @@ Status FileReaderImpl::ReadRowGroups(const std::vector& row_groups, BEGIN_PARQUET_CATCH_EXCEPTIONS parquet_reader()->PreBuffer(row_groups, column_indices, reader_properties_.io_context(), - reader_properties_.cache_options()); + reader_properties_.cache_options(), {}); END_PARQUET_CATCH_EXCEPTIONS } diff --git a/cpp/src/parquet/arrow/reader_internal.h b/cpp/src/parquet/arrow/reader_internal.h index b30aef2691c19..961e26052d3e6 100644 --- a/cpp/src/parquet/arrow/reader_internal.h +++ b/cpp/src/parquet/arrow/reader_internal.h @@ -114,6 +114,7 @@ struct ReaderContext { bool filter_leaves; std::shared_ptr> included_leaves; std::shared_ptr>> row_ranges_per_rg; + bool pre_buffer_enabled; bool IncludesLeaf(int leaf_index) const { if (this->filter_leaves) { diff --git a/cpp/src/parquet/file_reader.cc b/cpp/src/parquet/file_reader.cc index b3dd1d6054ac8..39c16e5bddf6f 100644 --- a/cpp/src/parquet/file_reader.cc +++ b/cpp/src/parquet/file_reader.cc @@ -375,13 +375,39 @@ class SerializedFile : public ParquetFileReader::Contents { file_metadata_ = std::move(metadata); } - void PreBuffer(const std::vector& row_groups, - const std::vector& column_indices, - const ::arrow::io::IOContext& ctx, - const ::arrow::io::CacheOptions& options) { + auto GetRowGroupPageIndexReader(int row_group_index, bool required) { + auto ret = GetPageIndexReader()->RowGroup(row_group_index); + + if (required) { + if (!ret) { + throw ParquetException("Page index is required but not found for row group " + + std::to_string(row_group_index)); + } + } + return ret; + } + + static auto GetColumnOffsetIndex(std::shared_ptr rg_page_idx, + int column_index, bool required) { + auto ret = rg_page_idx->GetOffsetIndex(column_index); + + if (required) { + if (!ret) { + throw ParquetException("Offset index is required but not found for column " + + std::to_string(column_index)); + } + } + return ret; + } + + void PreBuffer( + const std::vector& row_groups, const std::vector& column_indices, + const ::arrow::io::IOContext& ctx, const ::arrow::io::CacheOptions& options, + const std::shared_ptr>>& row_ranges_per_rg) { cached_source_ = std::make_shared<::arrow::io::internal::ReadRangeCache>(source_, ctx, options); std::vector<::arrow::io::ReadRange> ranges; + std::vector> holes_foreach_range; prebuffered_column_chunks_.clear(); int num_cols = file_metadata_->num_columns(); // a bitmap for buffered columns. @@ -395,12 +421,45 @@ class SerializedFile : public ParquetFileReader::Contents { } for (int row : row_groups) { prebuffered_column_chunks_[row] = buffer_columns; + const int64_t num_rows = file_metadata_->RowGroup(row)->num_rows(); + const auto rg_page_idx = GetRowGroupPageIndexReader(row, true); + for (int col : column_indices) { ranges.push_back( ComputeColumnChunkRange(file_metadata_.get(), source_size_, row, col)); + + std::vector<::arrow::io::ReadRange> holes; + const auto offset_idx = GetColumnOffsetIndex(rg_page_idx, col, true); + const auto& page_locations = offset_idx->page_locations(); + for (size_t i = 0; i < page_locations.size(); i++) { + IntervalRange page_row_interval = {-1, -1}; + if (i != page_locations.size() - 1) { + page_row_interval = IntervalRange{page_locations[i].first_row_index, + page_locations[i + 1].first_row_index - 1}; + } else { + page_row_interval = + IntervalRange{page_locations[i].first_row_index, num_rows - 1}; + } + auto page_offset_range = ::arrow::io::ReadRange{ + page_locations[i].offset, page_locations[i].compressed_page_size}; + if (!row_ranges_per_rg->at(row)->IsOverlapping(page_row_interval.start, + page_row_interval.end)) { + if (holes.empty()) { + holes.push_back({page_offset_range.offset, page_offset_range.length}); + } else { + if (holes.back().offset + holes.back().length == page_offset_range.offset) { + holes.back().length += page_offset_range.length; + } else { + holes.push_back({page_offset_range.offset, page_offset_range.length}); + } + } + } + } + holes_foreach_range.emplace_back(std::move(holes)); } } - PARQUET_THROW_NOT_OK(cached_source_->Cache(ranges)); + PARQUET_THROW_NOT_OK( + cached_source_->Cache(std::move(ranges), std::move(holes_foreach_range))); } ::arrow::Future<> WhenBuffered(const std::vector& row_groups, @@ -887,14 +946,14 @@ std::shared_ptr ParquetFileReader::RowGroup(int i) { return contents_->GetRowGroup(i); } -void ParquetFileReader::PreBuffer(const std::vector& row_groups, - const std::vector& column_indices, - const ::arrow::io::IOContext& ctx, - const ::arrow::io::CacheOptions& options) { +void ParquetFileReader::PreBuffer( + const std::vector& row_groups, const std::vector& column_indices, + const ::arrow::io::IOContext& ctx, const ::arrow::io::CacheOptions& options, + const std::shared_ptr>>& row_ranges_per_rg) { // Access private methods here SerializedFile* file = ::arrow::internal::checked_cast(contents_.get()); - file->PreBuffer(row_groups, column_indices, ctx, options); + file->PreBuffer(row_groups, column_indices, ctx, options, row_ranges_per_rg); } ::arrow::Future<> ParquetFileReader::WhenBuffered( diff --git a/cpp/src/parquet/file_reader.h b/cpp/src/parquet/file_reader.h index b59b59f95c2d8..c5e5f13eaa1e7 100644 --- a/cpp/src/parquet/file_reader.h +++ b/cpp/src/parquet/file_reader.h @@ -29,7 +29,7 @@ #include "parquet/properties.h" namespace parquet { - +class RowRanges; class ColumnReader; class FileMetaData; class PageIndexReader; @@ -196,10 +196,10 @@ class PARQUET_EXPORT ParquetFileReader { /// only one row group at a time may be useful. /// /// This method may throw. - void PreBuffer(const std::vector& row_groups, - const std::vector& column_indices, - const ::arrow::io::IOContext& ctx, - const ::arrow::io::CacheOptions& options); + void PreBuffer( + const std::vector& row_groups, const std::vector& column_indices, + const ::arrow::io::IOContext& ctx, const ::arrow::io::CacheOptions& options, + const std::shared_ptr>>& row_ranges_per_rg); /// Wait for the specified row groups and column indices to be pre-buffered. /// diff --git a/cpp/src/parquet/range_reader_test.cc b/cpp/src/parquet/range_reader_test.cc index 04510143e54c0..80f95f5811c5c 100644 --- a/cpp/src/parquet/range_reader_test.cc +++ b/cpp/src/parquet/range_reader_test.cc @@ -30,18 +30,21 @@ #include #include +#include +#include #include #include using parquet::IntervalRange; using parquet::IntervalRanges; -std::string random_string(std::string::size_type length) { +std::string random_string() { static auto& chrs = "ABCDEFGHIJKLMNOPQRSTUVWXYZ"; - - static std::mt19937 rg = std::mt19937(std::random_device()()); + static std::mt19937 rg{std::random_device()()}; static std::uniform_int_distribution pick(0, sizeof(chrs) - 2); + int length = std::rand() % 100; + std::string s; s.reserve(length); while (length--) s += chrs[pick(rg)]; @@ -51,10 +54,10 @@ std::string random_string(std::string::size_type length) { /// The table looks like (with_nulls = false): // { -// { a: {x: 0, y: 0}, b: {0, 0, 0}, c: "0", d: 0}, -// { a: {x: 1, y: 1}, b: {1, 1, 1}, c: "1", d: 1}, +// { a: {x: 0, y: 0}, b: {0, 0, 0}, c: "0", d: 0}, +// { a: {x: 1, y: 1}, b: {1, 1, 1}, c: "1", d: 1}, // ... -// { a: {x: 99, y: 99}, b: {99, 99, 99}, c: "99", d: 99} +// { a: {x: 99, y: 99}, b: {99, 99, 99}, c: "99", d: 99} // } arrow::Result> GetTable(bool with_nulls = false) { // if with_nulls, the generated table should null values @@ -117,7 +120,7 @@ arrow::Result> GetTable(bool with_nulls = false) { uint8_t valid_bytes[100]; for (size_t i = 0; i < 100; i++) { // add more chars to make this column unaligned with other columns' page - strs.push_back(std::to_string(i) + random_string(20)); + strs.push_back(std::to_string(i) + random_string()); valid_bytes[i] = flags[i]; } ARROW_RETURN_NOT_OK(string_builder.AppendValues(strs, &valid_bytes[0])); @@ -225,8 +228,13 @@ void check_rb(std::unique_ptr rb_reader, std::dynamic_pointer_cast(batch->GetColumnByName("c")); for (auto iter = c_array->begin(); iter != c_array->end(); ++iter) { sum_c += std::stoi(std::string( - (*iter).has_value() ? (*iter).value().substr(0, (*iter).value().size() - 20) - : "0")); + (*iter).has_value() + ? (*iter).value().substr( + 0, std::distance( + (*iter).value().begin(), + std::find_if((*iter).value().begin(), (*iter).value().end(), + [](char c) { return !std::isdigit(c); }))) + : "0")); } } @@ -254,9 +262,37 @@ void check_rb(std::unique_ptr rb_reader, } } -class TestRecordBatchReaderWithRanges : public testing::Test { +class CountingBytesBufferReader : public arrow::io::BufferReader { + public: + using BufferReader::BufferReader; + + arrow::Future> ReadAsync( + const arrow::io::IOContext& context, int64_t position, int64_t nbytes) override { + read_bytes_ += nbytes; + return BufferReader::ReadAsync(context, position, nbytes); + } + + arrow::Future ReadAsync(const arrow::io::IOContext& ctx, + std::vector& ranges, + void* out) override { + read_bytes_ += std::accumulate(ranges.begin(), ranges.end(), 0, + [](int64_t sum, const arrow::io::ReadRange& range) { + return sum + range.length; + }); + return RandomAccessFile::ReadAsync(ctx, ranges, out); + } + + int64_t read_bytes() const { return read_bytes_; } + + private: + int64_t read_bytes_ = 0; +}; + +class TestRecordBatchReaderWithRanges : public testing::TestWithParam { public: void SetUp() { + int mode = GetParam(); + ASSERT_OK_AND_ASSIGN(auto buffer, WriteFullFile()); arrow::MemoryPool* pool = arrow::default_memory_pool(); @@ -267,9 +303,20 @@ class TestRecordBatchReaderWithRanges : public testing::Test { auto arrow_reader_props = parquet::ArrowReaderProperties(); arrow_reader_props.set_batch_size(10); // default 64 * 1024 + if (mode != 0) { + arrow_reader_props.set_pre_buffer(true); + } + + if (mode == 2) { + arrow::io::CacheOptions cache_options = arrow::io::CacheOptions::Defaults(); + cache_options.hole_size_limit = 0; + cache_options.lazy = true; + cache_options.prefetch_limit = 2; + arrow_reader_props.set_cache_options(cache_options); + } parquet::arrow::FileReaderBuilder reader_builder; - const auto in_file = std::make_shared(buffer); + in_file = std::make_shared(buffer); ASSERT_OK(reader_builder.Open(in_file, /*memory_map=*/reader_properties)); reader_builder.memory_pool(pool); reader_builder.properties(arrow_reader_props); @@ -281,11 +328,10 @@ class TestRecordBatchReaderWithRanges : public testing::Test { protected: std::unique_ptr arrow_reader; + std::shared_ptr in_file; }; -TEST_F(TestRecordBatchReaderWithRanges, TestRangesSplit) {} - -TEST_F(TestRecordBatchReaderWithRanges, SelectOnePageForEachRG) { +TEST_P(TestRecordBatchReaderWithRanges, SelectOnePageForEachRG) { std::unique_ptr rb_reader; IntervalRanges rows{{{0, 9}, {40, 49}, {80, 89}, {90, 99}}}; @@ -296,7 +342,7 @@ TEST_F(TestRecordBatchReaderWithRanges, SelectOnePageForEachRG) { check_rb(std::move(rb_reader), 40, 2280); } -TEST_F(TestRecordBatchReaderWithRanges, SelectSomePageForOneRG) { +TEST_P(TestRecordBatchReaderWithRanges, SelectSomePageForOneRG) { std::unique_ptr rb_reader; IntervalRanges rows{{IntervalRange{0, 7}, IntervalRange{16, 23}}}; @@ -307,7 +353,7 @@ TEST_F(TestRecordBatchReaderWithRanges, SelectSomePageForOneRG) { check_rb(std::move(rb_reader), 16, 184); } -TEST_F(TestRecordBatchReaderWithRanges, SelectAllRange) { +TEST_P(TestRecordBatchReaderWithRanges, SelectAllRange) { std::unique_ptr rb_reader; IntervalRanges rows{{IntervalRange{0, 29}, IntervalRange{30, 59}, IntervalRange{60, 89}, IntervalRange{90, 99}}}; @@ -319,7 +365,24 @@ TEST_F(TestRecordBatchReaderWithRanges, SelectAllRange) { check_rb(std::move(rb_reader), 100, 4950); } -TEST_F(TestRecordBatchReaderWithRanges, SelectEmptyRange) { +TEST_P(TestRecordBatchReaderWithRanges, CheckSkipIOEffective) { + std::unique_ptr rb_reader; + IntervalRanges rows{{IntervalRange{3, 3}}}; + + const std::vector column_indices{0, 1, 2, 3, 4}; + ASSERT_OK(arrow_reader->GetRecordBatchReader(rows, column_indices, &rb_reader)); + + check_rb(std::move(rb_reader), 1, 3); + + // only one page should be touched when we enable pre_buffer + // the total read bytes should be small (the first RG is about 3000 bytes) + auto mode = GetParam(); + if (mode == 1 || mode == 2) { + ASSERT_LT(in_file->read_bytes(), 1000); + } +} + +TEST_P(TestRecordBatchReaderWithRanges, SelectEmptyRange) { std::unique_ptr rb_reader; IntervalRanges rows{}; @@ -330,13 +393,15 @@ TEST_F(TestRecordBatchReaderWithRanges, SelectEmptyRange) { check_rb(std::move(rb_reader), 0, 0); } -TEST_F(TestRecordBatchReaderWithRanges, SelectOneRowSkipOneRow) { +TEST_P(TestRecordBatchReaderWithRanges, SelectOneRowSkipOneRow) { // case 1: only care about RG 0 { std::unique_ptr rb_reader; std::vector ranges; for (int64_t i = 0; i < 30; i++) { - if (i % 2 == 0) ranges.push_back({i, i}); + if (i % 2 == 0) { + ranges.push_back({i, i}); + } } const std::vector column_indices{0, 1, 2, 3, 4}; ASSERT_OK(arrow_reader->GetRecordBatchReader(IntervalRanges(ranges), column_indices, @@ -369,7 +434,7 @@ TEST_F(TestRecordBatchReaderWithRanges, SelectOneRowSkipOneRow) { } } -TEST_F(TestRecordBatchReaderWithRanges, InvalidRanges) { +TEST_P(TestRecordBatchReaderWithRanges, InvalidRanges) { std::unique_ptr rb_reader; { auto create_ranges = []() -> IntervalRanges { @@ -397,6 +462,12 @@ TEST_F(TestRecordBatchReaderWithRanges, InvalidRanges) { } } +// mode 0: normal read with pre_buffer = false +// mode 1: normal read with pre_buffer = true +// mode 2: with pre_buffer = true and set cache options +INSTANTIATE_TEST_SUITE_P(ParameterizedTestRecordBatchReaderWithRanges, + TestRecordBatchReaderWithRanges, testing::Values(0, 1, 2)); + TEST(TestRecordBatchReaderWithRangesBadCases, NoPageIndex) { using parquet::ArrowWriterProperties; using parquet::WriterProperties; @@ -439,8 +510,10 @@ TEST(TestRecordBatchReaderWithRangesBadCases, NoPageIndex) { std::vector column_indices{0, 1, 2, 3, 4}; auto status = arrow_reader->GetRecordBatchReader(rows, column_indices, &rb_reader); ASSERT_NOT_OK(status); - EXPECT_TRUE(status.message().find("Attempting to read with Ranges but Page Index is " - "not found for Row Group: 0") != std::string::npos); + + EXPECT_TRUE( + status.message().find("Page index is required but not found for row group 0") != + std::string::npos); } class TestRecordBatchReaderWithRangesWithNulls : public testing::Test {