skip io coding done
one thread for each logical range

fix prebuffer conflict with data page filter

fix coalesce problem

adding tests

fix style
binmahone committed Jan 22, 2024
1 parent 60db7df commit 047be8e
Showing 10 changed files with 363 additions and 73 deletions.
185 changes: 156 additions & 29 deletions cpp/src/arrow/io/caching.cc
@@ -24,6 +24,11 @@

#include "arrow/buffer.h"
#include "arrow/io/caching.h"

#include <arrow/memory_pool.h>
#include <parquet/exception.h>
#include <numeric>

#include "arrow/io/util_internal.h"
#include "arrow/result.h"
#include "arrow/util/future.h"
@@ -132,13 +137,56 @@ CacheOptions CacheOptions::MakeFromNetworkMetrics(int64_t time_to_first_byte_mil

namespace internal {

std::vector<ReadRange> GetReadRangesExcludingHoles(const ReadRange& read_range,
                                                   const std::vector<ReadRange>& holes) {
  std::vector<ReadRange> ranges;
  int64_t offset = read_range.offset;
  for (const auto& hole : holes) {
    if (hole.offset >= read_range.offset + read_range.length ||
        hole.offset + hole.length <= read_range.offset) {
      throw parquet::ParquetException("Parquet error: holes not subset of read range");
    }
    if (hole.offset > offset) {
      ranges.push_back({offset, hole.offset - offset});
    }
    offset = hole.offset + hole.length;
  }
  if (offset < read_range.offset + read_range.length) {
    ranges.push_back({offset, read_range.offset + read_range.length - offset});
  }
  return ranges;
}
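// Worked example (illustrative values, not from the commit): for
// read_range = {offset: 0, length: 100} and holes = {{20, 10}, {50, 5}},
// the returned sub-ranges are {{0, 20}, {30, 20}, {55, 45}}.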

int64_t GetActualBufferOffset(const int64_t offset, const int64_t buffer_start_offset,
                              const std::vector<ReadRange>& holes) {
  int64_t padding = 0;  // total bytes of holes before `offset`, absent from the buffer
  for (const auto& hole : holes) {
    if (hole.offset >= offset) {
      break;
    }
    if (hole.offset + hole.length <= offset) {
      padding += hole.length;
    } else {
      padding += offset - hole.offset;
    }
  }
  return offset - padding - buffer_start_offset;
}
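// Worked example (illustrative values): with buffer_start_offset = 0 and
// holes = {{20, 10}, {50, 5}}, file offset 30 maps to buffer offset 20 and
// file offset 55 maps to buffer offset 40, because hole bytes are never
// stored in the buffer.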

struct RangeCacheEntry {
-  ReadRange range;
-  Future<std::shared_ptr<Buffer>> future;
+  ReadRange range;                 // nominal range for this entry
+  std::vector<ReadRange> holes;    // nominal range - holes = actual read ranges
+  Future<int64_t> future;          // the future for the actual read ranges
+  std::shared_ptr<Buffer> buffer;  // actual read ranges are read into this buffer
+                                   // at pre-calculated positions

  RangeCacheEntry() = default;
-  RangeCacheEntry(const ReadRange& range_, Future<std::shared_ptr<Buffer>> future_)
-      : range(range_), future(std::move(future_)) {}
+  RangeCacheEntry(const ReadRange& range, std::vector<ReadRange>& holes,
+                  Future<int64_t>& future, std::unique_ptr<Buffer>& buffer)
+      : range(range),
+        holes(std::move(holes)),
+        future(std::move(future)),
+        buffer(std::move(buffer)) {}

friend bool operator<(const RangeCacheEntry& left, const RangeCacheEntry& right) {
return left.range.offset < right.range.offset;
@@ -156,28 +204,87 @@ struct ReadRangeCache::Impl {

virtual ~Impl() = default;

  // Get the future corresponding to a range
-  virtual Future<std::shared_ptr<Buffer>> MaybeRead(RangeCacheEntry* entry) {
-    return entry->future;
-  }
+  virtual Future<int64_t> MaybeRead(RangeCacheEntry* entry) { return entry->future; }

  Future<int64_t> DoAsyncRead(const ReadRange& range, const std::vector<ReadRange>& holes,
                              std::unique_ptr<Buffer>& buffer) const {
    int64_t total_size = range.length;
    for (const auto& hole : holes) {
      total_size -= hole.length;
    }

    buffer = *AllocateBuffer(total_size, 64, ctx.pool());

    auto actual_read_ranges = GetReadRangesExcludingHoles(range, holes);
    return file->ReadAsync(ctx, actual_read_ranges, buffer->mutable_data());
  }
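  // Note: the buffer is allocated hole-free (total_size bytes, 64-byte aligned);
  // the surviving sub-ranges are written into it back-to-back, in offset order.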

  // Make cache entries for ranges
  virtual std::vector<RangeCacheEntry> MakeCacheEntries(
-      const std::vector<ReadRange>& ranges) {
+      const std::vector<ReadRange>& ranges,
+      std::vector<std::vector<ReadRange>>& holes_foreach_range) {
    std::vector<RangeCacheEntry> new_entries;
    new_entries.reserve(ranges.size());
-    for (const auto& range : ranges) {
-      new_entries.emplace_back(range, file->ReadAsync(ctx, range.offset, range.length));
+
+    for (size_t i = 0; i < ranges.size(); i++) {
+      std::unique_ptr<Buffer> buffer;
+      auto future = DoAsyncRead(ranges[i], holes_foreach_range[i], buffer);
+      new_entries.emplace_back(ranges[i], holes_foreach_range[i], future, buffer);
    }
    return new_entries;
  }

  void CoalesceRanges(
      std::vector<ReadRange>& ranges,
      const std::vector<std::vector<ReadRange>>& holes_foreach_range_orig,
      std::vector<std::vector<ReadRange>>& holes_foreach_range_new) const {
    auto result = CoalesceReadRanges(std::move(ranges), options.hole_size_limit,
                                     options.range_size_limit);
    if (!result.ok()) {
      throw parquet::ParquetException("Failed to coalesce ranges: " +
                                      result.status().message());
    }
    ranges = std::move(result.ValueOrDie());
    holes_foreach_range_new.resize(ranges.size());

    std::vector<ReadRange> flatten_holes;
    size_t index = 0;
    flatten_holes.reserve(
        std::accumulate(holes_foreach_range_orig.begin(), holes_foreach_range_orig.end(),
                        0, [](int sum, const auto& v) { return sum + v.size(); }));
    for (const auto& v : holes_foreach_range_orig) {
      std::move(v.begin(), v.end(), std::back_inserter(flatten_holes));
    }
    for (size_t i = 0; i < ranges.size(); ++i) {
      std::vector<ReadRange> current_range_holes;
      const auto& range = ranges.at(i);
      for (; index < flatten_holes.size(); ++index) {
        const auto& hole = flatten_holes.at(index);
        if (hole.offset >= range.offset + range.length) {
          break;
        }
        if (!(hole.offset >= range.offset &&
              hole.offset + hole.length <= range.offset + range.length)) {
          throw parquet::ParquetException(
              "Parquet error: holes not subset of read range");
        }
        current_range_holes.push_back({hole.offset, hole.length});
      }
      holes_foreach_range_new.at(i) = std::move(current_range_holes);
    }
    if (ranges.size() != holes_foreach_range_new.size()) {
      throw parquet::ParquetException("ranges.size() != holes_foreach_range_new.size()");
    }
  }
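  // Worked example (illustrative values): adjacent ranges {{0, 100}, {100, 100}}
  // with per-range holes {{20, 10}} and {{150, 10}} may be coalesced (subject to
  // hole_size_limit and range_size_limit) into the single range {0, 200}, whose
  // redistributed hole list becomes {{20, 10}, {150, 10}}.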

  // Add the given ranges to the cache, coalescing them where possible
-  virtual Status Cache(std::vector<ReadRange> ranges) {
-    ARROW_ASSIGN_OR_RAISE(
-        ranges, internal::CoalesceReadRanges(std::move(ranges), options.hole_size_limit,
-                                             options.range_size_limit));
-    std::vector<RangeCacheEntry> new_entries = MakeCacheEntries(ranges);
+  virtual Status Cache(std::vector<ReadRange> ranges,
+                       std::vector<std::vector<ReadRange>> holes_foreach_range_orig) {
+    std::vector<std::vector<ReadRange>> holes_foreach_range_new;
+    CoalesceRanges(ranges, holes_foreach_range_orig, holes_foreach_range_new);
+
+    std::vector<RangeCacheEntry> new_entries =
+        MakeCacheEntries(ranges, holes_foreach_range_new);
// Add new entries, themselves ordered by offset
if (entries.size() > 0) {
std::vector<RangeCacheEntry> merged(entries.size() + new_entries.size());
@@ -205,21 +312,32 @@ struct ReadRangeCache::Impl {
return entry.range.offset + entry.range.length < range.offset + range.length;
});
    if (it != entries.end() && it->range.Contains(range)) {
-      auto fut = MaybeRead(&*it);
-      ARROW_ASSIGN_OR_RAISE(auto buf, fut.result());
+      const auto fut = MaybeRead(&*it);
+      const auto result = fut.result();
+      if (!result.ok()) {
+        throw parquet::ParquetException(
+            "Parquet error: read failed for one of the sub-ranges");
+      }

      if (options.lazy && options.prefetch_limit > 0) {
        int64_t num_prefetched = 0;
        for (auto next_it = it + 1;
             next_it != entries.end() && num_prefetched < options.prefetch_limit;
             ++next_it) {
          if (!next_it->future.is_valid()) {
-            next_it->future =
-                file->ReadAsync(ctx, next_it->range.offset, next_it->range.length);
+            std::unique_ptr<Buffer> buffer;
+            next_it->future = DoAsyncRead(next_it->range, next_it->holes, buffer);
+            next_it->buffer = std::move(buffer);
          }
          ++num_prefetched;
        }
      }
-      return SliceBuffer(std::move(buf), range.offset - it->range.offset, range.length);
+
+      const auto actual_start =
+          GetActualBufferOffset(range.offset, it->range.offset, it->holes);
+      const auto actual_end = GetActualBufferOffset(range.offset + range.length,
+                                                    it->range.offset, it->holes);
+      return SliceBuffer(it->buffer, actual_start, actual_end - actual_start);
}
return Status::Invalid("ReadRangeCache did not find matching cache entry");
}
@@ -264,29 +382,37 @@ struct ReadRangeCache::LazyImpl : public ReadRangeCache::Impl {

virtual ~LazyImpl() = default;

-  Future<std::shared_ptr<Buffer>> MaybeRead(RangeCacheEntry* entry) override {
+  Future<int64_t> MaybeRead(RangeCacheEntry* entry) override {
    // Called by superclass Read()/WaitFor() so we have the lock
    if (!entry->future.is_valid()) {
-      entry->future = file->ReadAsync(ctx, entry->range.offset, entry->range.length);
+      std::unique_ptr<Buffer> buffer;
+      entry->future = DoAsyncRead(entry->range, entry->holes, buffer);
+      entry->buffer = std::move(buffer);
    }
    return entry->future;
  }

  std::vector<RangeCacheEntry> MakeCacheEntries(
-      const std::vector<ReadRange>& ranges) override {
+      const std::vector<ReadRange>& ranges,
+      std::vector<std::vector<ReadRange>>& holes_foreach_range) override {
    std::vector<RangeCacheEntry> new_entries;
    new_entries.reserve(ranges.size());
-    for (const auto& range : ranges) {
+    for (size_t i = 0; i < ranges.size(); ++i) {
+      const auto& range = ranges[i];
+      auto& holes = holes_foreach_range[i];
+      auto temp_buffer = std::make_unique<Buffer>(NULLPTR, 0);
+      auto temp_future = Future<int64_t>();
      // In the lazy variant, don't read data here - later, a call to Read or WaitFor
      // will call back to MaybeRead (under the lock) which will fill the future.
-      new_entries.emplace_back(range, Future<std::shared_ptr<Buffer>>());
+      new_entries.emplace_back(range, holes, temp_future, temp_buffer);
    }
    return new_entries;
  }
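  // Note: the invalid Future and the empty placeholder Buffer act as sentinels;
  // MaybeRead() swaps in a real read and buffer on first access, which is what
  // defers the IO in lazy mode.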

-  Status Cache(std::vector<ReadRange> ranges) override {
+  Status Cache(std::vector<ReadRange> ranges,
+               std::vector<std::vector<ReadRange>> holes_foreach_range) override {
    std::unique_lock<std::mutex> guard(entry_mutex);
-    return ReadRangeCache::Impl::Cache(std::move(ranges));
+    return ReadRangeCache::Impl::Cache(std::move(ranges),
+                                       std::move(holes_foreach_range));
  }

Result<std::shared_ptr<Buffer>> Read(ReadRange range) override {
@@ -317,8 +443,9 @@ ReadRangeCache::ReadRangeCache(std::shared_ptr<RandomAccessFile> owned_file,

ReadRangeCache::~ReadRangeCache() = default;

-Status ReadRangeCache::Cache(std::vector<ReadRange> ranges) {
-  return impl_->Cache(std::move(ranges));
+Status ReadRangeCache::Cache(std::vector<ReadRange> ranges,
+                             std::vector<std::vector<ReadRange>> holes_foreach_range) {
+  return impl_->Cache(std::move(ranges), std::move(holes_foreach_range));
}

Result<std::shared_ptr<Buffer>> ReadRangeCache::Read(ReadRange range) {
3 changes: 2 additions & 1 deletion cpp/src/arrow/io/caching.h
@@ -131,7 +131,8 @@ class ARROW_EXPORT ReadRangeCache {
///
/// The caller must ensure that the ranges do not overlap with each other,
/// nor with previously cached ranges. Otherwise, behaviour will be undefined.
-  Status Cache(std::vector<ReadRange> ranges);
+  Status Cache(std::vector<ReadRange> ranges,
+               std::vector<std::vector<ReadRange>> holes_foreach_range = {});

/// \brief Read a range previously given to Cache().
Result<std::shared_ptr<Buffer>> Read(ReadRange range);
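A minimal usage sketch of the extended `Cache()` API (hypothetical code: `CacheWithHoles` and all offsets are illustrative, and the constructor is assumed to be the one declared in caching.h; each hole must lie inside its corresponding range):

```cpp
#include "arrow/io/caching.h"
#include "arrow/io/interfaces.h"
#include "arrow/result.h"
#include "arrow/status.h"

// Hypothetical sketch: cache two ranges, carving a 10-byte hole out of the
// first so those bytes are never fetched, then read back a sub-range that
// stays clear of the hole.
arrow::Status CacheWithHoles(std::shared_ptr<arrow::io::RandomAccessFile> file) {
  arrow::io::internal::ReadRangeCache cache(std::move(file), arrow::io::IOContext{},
                                            arrow::io::CacheOptions::Defaults());
  std::vector<arrow::io::ReadRange> ranges = {{0, 100}, {200, 50}};
  // One hole list per range; every hole must be a sub-range of its range.
  std::vector<std::vector<arrow::io::ReadRange>> holes = {{{20, 10}}, {}};
  ARROW_RETURN_NOT_OK(cache.Cache(std::move(ranges), std::move(holes)));
  // {30, 20} lies inside the first cached range and avoids its hole [20, 30),
  // so it is served from the compacted buffer.
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Buffer> buf, cache.Read({30, 20}));
  return buf->size() == 20 ? arrow::Status::OK()
                           : arrow::Status::Invalid("unexpected slice size");
}
```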
15 changes: 15 additions & 0 deletions cpp/src/arrow/io/interfaces.cc
@@ -171,6 +171,21 @@ Future<std::shared_ptr<Buffer>> RandomAccessFile::ReadAsync(const IOContext& ctx
ctx, [self, position, nbytes] { return self->ReadAt(position, nbytes); }));
}

Future<int64_t> RandomAccessFile::ReadAsync(const IOContext& ctx,
                                            std::vector<ReadRange>& ranges, void* out) {
  auto self = checked_pointer_cast<RandomAccessFile>(shared_from_this());
  return DeferNotOk(internal::SubmitIO(
      ctx, [self, ranges = std::move(ranges), out]() mutable -> Result<int64_t> {
        int64_t read_size = 0;
        for (const auto& r : ranges) {
          RETURN_NOT_OK(
              self->ReadAt(r.offset, r.length, static_cast<char*>(out) + read_size));
          read_size += r.length;
        }
        return read_size;
      }));
}

Future<std::shared_ptr<Buffer>> RandomAccessFile::ReadAsync(int64_t position,
int64_t nbytes) {
return ReadAsync(io_context(), position, nbytes);
4 changes: 4 additions & 0 deletions cpp/src/arrow/io/interfaces.h
@@ -296,6 +296,10 @@ class ARROW_EXPORT RandomAccessFile : public InputStream, public Seekable {
virtual Future<std::shared_ptr<Buffer>> ReadAsync(const IOContext&, int64_t position,
int64_t nbytes);

  /// EXPERIMENTAL: Read the given ranges asynchronously into the
  /// caller-provided buffer `out`, packing them back-to-back; the returned
  /// future yields the total number of bytes read.
virtual Future<int64_t> ReadAsync(const IOContext& ctx, std::vector<ReadRange>& ranges,
void* out);

/// EXPERIMENTAL: Read data asynchronously, using the file's IOContext.
Future<std::shared_ptr<Buffer>> ReadAsync(int64_t position, int64_t nbytes);

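A hypothetical caller of the new scatter-read overload (sketch only: `ScatterRead` and the sizes are illustrative; the back-to-back packing contract follows the interfaces.cc implementation above, and the overload moves from `ranges`):

```cpp
#include "arrow/buffer.h"
#include "arrow/io/interfaces.h"
#include "arrow/result.h"
#include "arrow/status.h"

// Hypothetical sketch: read two 16-byte ranges into one packed buffer.
arrow::Status ScatterRead(const std::shared_ptr<arrow::io::RandomAccessFile>& file) {
  std::vector<arrow::io::ReadRange> ranges = {{0, 16}, {64, 16}};
  int64_t total = 0;
  for (const auto& r : ranges) total += r.length;  // the caller sizes `out`
  ARROW_ASSIGN_OR_RAISE(std::unique_ptr<arrow::Buffer> out,
                        arrow::AllocateBuffer(total));
  // Ranges land back-to-back in `out`; the future yields the bytes read.
  auto fut = file->ReadAsync(file->io_context(), ranges, out->mutable_data());
  ARROW_ASSIGN_OR_RAISE(int64_t nread, fut.result());
  return nread == total ? arrow::Status::OK() : arrow::Status::IOError("short read");
}
```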
7 changes: 5 additions & 2 deletions cpp/src/parquet/CMakeLists.txt
@@ -360,15 +360,18 @@ add_parquet_test(internals-test

set_source_files_properties(public_api_test.cc PROPERTIES SKIP_PRECOMPILE_HEADERS ON
SKIP_UNITY_BUILD_INCLUSION ON)
+add_parquet_test(rowrange-read-test
+                 SOURCES
+                 row_range_test.cc
+                 range_reader_test.cc
+)

add_parquet_test(reader-test
                 SOURCES
                 column_reader_test.cc
                 level_conversion_test.cc
                 column_scanner_test.cc
                 reader_test.cc
-                 range_reader_test.cc
-                 row_range_test.cc
                 stream_reader_test.cc
                 test_util.cc)

15 changes: 11 additions & 4 deletions cpp/src/parquet/arrow/reader.cc
@@ -223,6 +223,7 @@ class FileReaderImpl : public FileReader {
ctx->included_leaves = included_leaves;
ctx->row_ranges_per_rg =
row_ranges_per_rg; // copy the shared pointer to extend its lifecycle
ctx->pre_buffer_enabled = properties().pre_buffer();
return GetReader(manifest_.schema_fields[i], ctx, out);
}

@@ -667,7 +668,13 @@ class LeafReader : public ColumnReaderImpl {
checkAndGetPageRanges(*row_ranges, page_ranges);

// part 1, skip decompressing & decoding unnecessary pages
-      page_reader->set_data_page_filter(RowRangesPageFilter(*row_ranges, page_ranges));
+      if (!ctx_->pre_buffer_enabled) {
+        page_reader->set_data_page_filter(
+            RowRangesPageFilter(*row_ranges, page_ranges));
+      } else {
+        // Pre-buffering has already skipped the unneeded pages, so the
+        // data_page_filter must not be applied a second time here.
+      }

// part 2, skip unnecessary rows in necessary pages
record_reader_->set_record_skipper(
@@ -1158,7 +1165,7 @@ Status FileReaderImpl::GetRecordBatchReaderWithRowRanges(
// PARQUET-1698/PARQUET-1820: pre-buffer row groups/column chunks if enabled
BEGIN_PARQUET_CATCH_EXCEPTIONS
reader_->PreBuffer(row_groups, column_indices, reader_properties_.io_context(),
-                       reader_properties_.cache_options());
+                       reader_properties_.cache_options(), row_ranges_per_rg);
END_PARQUET_CATCH_EXCEPTIONS
}

@@ -1366,7 +1373,7 @@ FileReaderImpl::GetRecordBatchGenerator(std::shared_ptr<FileReader> reader,
if (reader_properties_.pre_buffer()) {
BEGIN_PARQUET_CATCH_EXCEPTIONS
reader_->PreBuffer(row_group_indices, column_indices, reader_properties_.io_context(),
-                       reader_properties_.cache_options());
+                       reader_properties_.cache_options(), {});
END_PARQUET_CATCH_EXCEPTIONS
}
::arrow::AsyncGenerator<RowGroupGenerator::RecordBatchGenerator> row_group_generator =
@@ -1403,7 +1410,7 @@ Status FileReaderImpl::ReadRowGroups(const std::vector<int>& row_groups,
BEGIN_PARQUET_CATCH_EXCEPTIONS
parquet_reader()->PreBuffer(row_groups, column_indices,
reader_properties_.io_context(),
-                              reader_properties_.cache_options());
+                              reader_properties_.cache_options(), {});
END_PARQUET_CATCH_EXCEPTIONS
}

1 change: 1 addition & 0 deletions cpp/src/parquet/arrow/reader_internal.h
@@ -114,6 +114,7 @@ struct ReaderContext {
bool filter_leaves;
std::shared_ptr<std::unordered_set<int>> included_leaves;
std::shared_ptr<std::vector<std::unique_ptr<RowRanges>>> row_ranges_per_rg;
  bool pre_buffer_enabled = false;

bool IncludesLeaf(int leaf_index) const {
if (this->filter_leaves) {
Expand Down