From b73bcf0a0ddacd5adc80389b10b8c1b3820ee97a Mon Sep 17 00:00:00 2001
From: David Li
Date: Tue, 15 Jun 2021 11:22:25 -0400
Subject: [PATCH] ARROW-12597: [C++] Enable per-row-group parallelism in async
Parquet reader
This adds `OptionalParallelForAsync`, which gives the async Parquet reader per-row-group parallelism (columns of a row group are decoded in parallel when `use_threads` is enabled) without nested parallelism. It also switches to `TransferAlways`, which takes care of ARROW-12916. `enable_parallel_column_conversion` is kept because it still affects the threaded scanner.
Closes #10482 from lidavidm/arrow-12597
Authored-by: David Li
Signed-off-by: David Li
---
cpp/src/arrow/dataset/file_parquet.cc | 6 +-
cpp/src/arrow/dataset/file_parquet.h | 3 +-
cpp/src/arrow/dataset/test_util.h | 14 +++-
cpp/src/arrow/ipc/reader.cc | 9 +--
cpp/src/arrow/util/parallel.h | 37 +++++++++
cpp/src/arrow/util/vector.h | 13 +++
cpp/src/parquet/arrow/reader.cc | 110 ++++++++++++++------------
7 files changed, 128 insertions(+), 64 deletions(-)
diff --git a/cpp/src/arrow/dataset/file_parquet.cc b/cpp/src/arrow/dataset/file_parquet.cc
index 8c325d21da19b..0ebbd0a53335e 100644
--- a/cpp/src/arrow/dataset/file_parquet.cc
+++ b/cpp/src/arrow/dataset/file_parquet.cc
@@ -362,11 +362,7 @@ Future> ParquetFileFormat::GetReader
parquet_scan_options->arrow_reader_properties->cache_options());
arrow_properties.set_io_context(
parquet_scan_options->arrow_reader_properties->io_context());
- // TODO: ARROW-12597 will let us enable parallel conversion
- if (!options->use_threads) {
- arrow_properties.set_use_threads(
- parquet_scan_options->enable_parallel_column_conversion);
- }
+ arrow_properties.set_use_threads(options->use_threads);
std::unique_ptr arrow_reader;
RETURN_NOT_OK(parquet::arrow::FileReader::Make(options->pool, std::move(reader),
std::move(arrow_properties),
diff --git a/cpp/src/arrow/dataset/file_parquet.h b/cpp/src/arrow/dataset/file_parquet.h
index 8286e2776cbeb..347f40320469e 100644
--- a/cpp/src/arrow/dataset/file_parquet.h
+++ b/cpp/src/arrow/dataset/file_parquet.h
@@ -222,7 +222,8 @@ class ARROW_DS_EXPORT ParquetFragmentScanOptions : public FragmentScanOptions {
/// EXPERIMENTAL: Parallelize conversion across columns. This option is ignored if a
/// scan is already parallelized across input files to avoid thread contention. This
/// option will be removed after support is added for simultaneous parallelization
- /// across files and columns.
+ /// across files and columns. Only affects the threaded reader; the async reader
+ /// will parallelize across columns if use_threads is enabled.
bool enable_parallel_column_conversion = false;
};
diff --git a/cpp/src/arrow/dataset/test_util.h b/cpp/src/arrow/dataset/test_util.h
index 1e4222eec8c33..39223eba35b89 100644
--- a/cpp/src/arrow/dataset/test_util.h
+++ b/cpp/src/arrow/dataset/test_util.h
@@ -310,6 +310,7 @@ class DatasetFixtureMixinWithParam : public DatasetFixtureMixin,
struct TestFormatParams {
bool use_async;
+ bool use_threads;
int num_batches;
int items_per_batch;
@@ -318,7 +319,8 @@ struct TestFormatParams {
std::string ToString() const {
// GTest requires this to be alphanumeric
std::stringstream ss;
- ss << (use_async ? "Async" : "Sync") << num_batches << "b" << items_per_batch << "r";
+ ss << (use_async ? "Async" : "Sync") << (use_threads ? "Threaded" : "Serial")
+ << num_batches << "b" << items_per_batch << "r";
return ss.str();
}
@@ -328,8 +330,12 @@ struct TestFormatParams {
}
static std::vector Values() {
- std::vector values{{/*async=*/false, 16, 1024},
- {/*async=*/true, 16, 1024}};
+ std::vector values;
+ for (const bool async : std::vector{true, false}) {
+ for (const bool use_threads : std::vector{true, false}) {
+ values.push_back(TestFormatParams{async, use_threads, 16, 1024});
+ }
+ }
return values;
}
};
@@ -511,6 +517,7 @@ class FileFormatScanMixin : public FileFormatFixtureMixin,
auto dataset = std::make_shared(schema, FragmentVector{fragment});
ScannerBuilder builder(dataset, opts_);
ARROW_EXPECT_OK(builder.UseAsync(GetParam().use_async));
+ ARROW_EXPECT_OK(builder.UseThreads(GetParam().use_threads));
EXPECT_OK_AND_ASSIGN(auto scanner, builder.Finish());
EXPECT_OK_AND_ASSIGN(auto batch_it, scanner->ScanBatches());
return MakeMapIterator([](TaggedRecordBatch tagged) { return tagged.record_batch; },
@@ -519,6 +526,7 @@ class FileFormatScanMixin : public FileFormatFixtureMixin,
// Scan the fragment directly, without using the scanner.
RecordBatchIterator PhysicalBatches(std::shared_ptr fragment) {
+ opts_->use_threads = GetParam().use_threads;
if (GetParam().use_async) {
EXPECT_OK_AND_ASSIGN(auto batch_gen, fragment->ScanBatchesAsync(opts_));
EXPECT_OK_AND_ASSIGN(auto batch_it, MakeGeneratorIterator(std::move(batch_gen)));
diff --git a/cpp/src/arrow/ipc/reader.cc b/cpp/src/arrow/ipc/reader.cc
index 7c26bce913df8..a3c345cc440e5 100644
--- a/cpp/src/arrow/ipc/reader.cc
+++ b/cpp/src/arrow/ipc/reader.cc
@@ -54,6 +54,7 @@
#include "arrow/util/string.h"
#include "arrow/util/thread_pool.h"
#include "arrow/util/ubsan.h"
+#include "arrow/util/vector.h"
#include "arrow/visitor_inline.h"
#include "generated/File_generated.h" // IWYU pragma: export
@@ -1368,12 +1369,10 @@ Future IpcFileRecordBatchGenerator::operator(
auto read_messages = All(std::move(messages));
if (executor_) read_messages = executor_->Transfer(read_messages);
read_dictionaries_ = read_messages.Then(
- [=](const std::vector>> maybe_messages)
+ [=](const std::vector>>& maybe_messages)
-> Status {
- std::vector> messages(state->num_dictionaries());
- for (size_t i = 0; i < messages.size(); i++) {
- ARROW_ASSIGN_OR_RAISE(messages[i], maybe_messages[i]);
- }
+ ARROW_ASSIGN_OR_RAISE(auto messages,
+ arrow::internal::UnwrapOrRaise(maybe_messages));
return ReadDictionaries(state.get(), std::move(messages));
});
}
diff --git a/cpp/src/arrow/util/parallel.h b/cpp/src/arrow/util/parallel.h
index e56a71b91af72..80f60fbdb3676 100644
--- a/cpp/src/arrow/util/parallel.h
+++ b/cpp/src/arrow/util/parallel.h
@@ -21,7 +21,9 @@
#include
#include "arrow/status.h"
+#include "arrow/util/functional.h"
#include "arrow/util/thread_pool.h"
+#include "arrow/util/vector.h"
namespace arrow {
namespace internal {
@@ -44,6 +46,21 @@ Status ParallelFor(int num_tasks, FUNCTION&& func,
return st;
}
+// Submits `func(i, std::move(inputs[i]))` to `executor` once per input and returns
+// a future for the vector of results, in input order.
+//
+// `func` is expected to return a `Result<R>`; `R` is deduced from its return type.
+// If submitting a task fails, ARROW_ASSIGN_OR_RAISE returns that error right away.
+// Once all tasks have finished, UnwrapOrRaise yields either every value or the
+// first error encountered in input order.
+template <class FUNCTION, typename T,
+          typename R = typename internal::call_traits::return_type<FUNCTION>::ValueType>
+Future<std::vector<R>> ParallelForAsync(
+    std::vector<T> inputs, FUNCTION&& func,
+    Executor* executor = internal::GetCpuThreadPool()) {
+  std::vector<Future<R>> futures(inputs.size());
+  for (size_t i = 0; i < inputs.size(); ++i) {
+    ARROW_ASSIGN_OR_RAISE(futures[i], executor->Submit(func, i, std::move(inputs[i])));
+  }
+  return All(std::move(futures))
+      .Then([](const std::vector<Result<R>>& results) -> Result<std::vector<R>> {
+        return UnwrapOrRaise(results);
+      });
+}
+
// A parallelizer that takes a `Status(int)` function and calls it with
// arguments between 0 and `num_tasks - 1`, in sequence or in parallel,
// depending on the input boolean.
@@ -61,5 +78,25 @@ Status OptionalParallelFor(bool use_threads, int num_tasks, FUNCTION&& func,
}
}
+// A parallelizer that takes a `Result<R>(size_t index, T item)` function and
+// calls it with each item from the input vector, in sequence or in parallel,
+// depending on the input boolean.
+//
+// With `use_threads` set, the work is delegated to ParallelForAsync on `executor`.
+// Otherwise `func` is invoked inline on the calling thread, `executor` is unused,
+// and the first error stops the loop and is returned.
+
+template <class FUNCTION, typename T,
+          typename R = typename internal::call_traits::return_type<FUNCTION>::ValueType>
+Future<std::vector<R>> OptionalParallelForAsync(
+    bool use_threads, std::vector<T> inputs, FUNCTION&& func,
+    Executor* executor = internal::GetCpuThreadPool()) {
+  if (use_threads) {
+    return ParallelForAsync(std::move(inputs), std::forward<FUNCTION>(func), executor);
+  } else {
+    std::vector<R> result(inputs.size());
+    for (size_t i = 0; i < inputs.size(); ++i) {
+      ARROW_ASSIGN_OR_RAISE(result[i], func(i, inputs[i]));
+    }
+    return result;
+  }
+}
+
} // namespace internal
} // namespace arrow
diff --git a/cpp/src/arrow/util/vector.h b/cpp/src/arrow/util/vector.h
index b9f2e2a45aa18..3ef0074aa9de8 100644
--- a/cpp/src/arrow/util/vector.h
+++ b/cpp/src/arrow/util/vector.h
@@ -133,5 +133,18 @@ Result> UnwrapOrRaise(std::vector>&& results) {
return std::move(out);
}
+// Copying overload of UnwrapOrRaise for results that cannot be moved from.
+// Returns the status of the first failed element, or a vector holding a copy
+// of every value when all elements are OK.
+template <typename T>
+Result<std::vector<T>> UnwrapOrRaise(const std::vector<Result<T>>& results) {
+  std::vector<T> out;
+  out.reserve(results.size());
+  for (const auto& result : results) {
+    if (!result.ok()) {
+      return result.status();
+    }
+    // ok() was checked above, so the unchecked accessor is safe here.
+    out.push_back(result.ValueUnsafe());
+  }
+  return std::move(out);
+}
+
} // namespace internal
} // namespace arrow
diff --git a/cpp/src/parquet/arrow/reader.cc b/cpp/src/parquet/arrow/reader.cc
index 14eb74958058c..4f5f79c964a1b 100644
--- a/cpp/src/parquet/arrow/reader.cc
+++ b/cpp/src/parquet/arrow/reader.cc
@@ -293,10 +293,12 @@ class FileReaderImpl : public FileReader {
const std::vector& indices,
std::shared_ptr* table) override;
- // Helper method used by ReadRowGroups/Generator - read the given row groups/columns,
- // skipping bounds checks and pre-buffering.
- Status DecodeRowGroups(const std::vector& row_groups,
- const std::vector& indices, std::shared_ptr* table);
+ // Helper method used by ReadRowGroups - read the given row groups/columns, skipping
+ // bounds checks and pre-buffering. Takes a shared_ptr to self to keep the reader
+ // alive in async contexts.
+ Future> DecodeRowGroups(
+ std::shared_ptr self, const std::vector& row_groups,
+ const std::vector& column_indices, ::arrow::internal::Executor* cpu_executor);
Status ReadRowGroups(const std::vector& row_groups,
std::shared_ptr* table) override {
@@ -1007,10 +1009,9 @@ class RowGroupGenerator {
return SubmitRead(cpu_executor_, reader, row_group, column_indices);
}
auto ready = reader->parquet_reader()->WhenBuffered({row_group}, column_indices);
- // TODO(ARROW-12916): always transfer here
- if (cpu_executor_) ready = cpu_executor_->Transfer(ready);
- return ready.Then([=]() -> ::arrow::Result {
- return ReadOneRowGroup(reader, row_group, column_indices);
+ if (cpu_executor_) ready = cpu_executor_->TransferAlways(ready);
+ return ready.Then([=]() -> ::arrow::Future {
+ return ReadOneRowGroup(cpu_executor_, reader, row_group, column_indices);
});
}
@@ -1024,31 +1025,25 @@ class RowGroupGenerator {
::arrow::internal::Executor* cpu_executor, std::shared_ptr self,
const int row_group, const std::vector& column_indices) {
if (!cpu_executor) {
- return Future::MakeFinished(
- ReadOneRowGroup(self, row_group, column_indices));
+ return ReadOneRowGroup(cpu_executor, self, row_group, column_indices);
}
// If we have an executor, then force transfer (even if I/O was complete)
- return ::arrow::DeferNotOk(
- cpu_executor->Submit(ReadOneRowGroup, self, row_group, column_indices));
+ return ::arrow::DeferNotOk(cpu_executor->Submit(ReadOneRowGroup, cpu_executor, self,
+ row_group, column_indices));
}
- static ::arrow::Result ReadOneRowGroup(
- std::shared_ptr self, const int row_group,
- const std::vector& column_indices) {
- std::shared_ptr<::arrow::Table> table;
+ static ::arrow::Future ReadOneRowGroup(
+ ::arrow::internal::Executor* cpu_executor, std::shared_ptr self,
+ const int row_group, const std::vector& column_indices) {
// Skips bound checks/pre-buffering, since we've done that already
- RETURN_NOT_OK(self->DecodeRowGroups({row_group}, column_indices, &table));
- auto table_reader = std::make_shared<::arrow::TableBatchReader>(*table);
- ::arrow::RecordBatchVector batches;
- while (true) {
- std::shared_ptr<::arrow::RecordBatch> batch;
- RETURN_NOT_OK(table_reader->ReadNext(&batch));
- if (!batch) {
- break;
- }
- batches.push_back(batch);
- }
- return ::arrow::MakeVectorGenerator(std::move(batches));
+ return self->DecodeRowGroups(self, {row_group}, column_indices, cpu_executor)
+ .Then([](const std::shared_ptr& table)
+ -> ::arrow::Result {
+ ::arrow::TableBatchReader table_reader(*table);
+ ::arrow::RecordBatchVector batches;
+ RETURN_NOT_OK(table_reader.ReadAll(&batches));
+ return ::arrow::MakeVectorGenerator(std::move(batches));
+ });
}
std::shared_ptr arrow_reader_;
@@ -1104,34 +1099,49 @@ Status FileReaderImpl::ReadRowGroups(const std::vector& row_groups,
END_PARQUET_CATCH_EXCEPTIONS
}
- return DecodeRowGroups(row_groups, column_indices, out);
+ auto fut = DecodeRowGroups(/*self=*/nullptr, row_groups, column_indices,
+ /*cpu_executor=*/nullptr);
+ ARROW_ASSIGN_OR_RAISE(*out, fut.MoveResult());
+ return Status::OK();
}
-// Also used by RowGroupGenerator - skip bounds check/pre-buffer to avoid doing that twice
-Status FileReaderImpl::DecodeRowGroups(const std::vector& row_groups,
- const std::vector& column_indices,
- std::shared_ptr* out) {
+Future> FileReaderImpl::DecodeRowGroups(
+ std::shared_ptr self, const std::vector& row_groups,
+ const std::vector& column_indices, ::arrow::internal::Executor* cpu_executor) {
+ // `self` is used solely to keep `this` alive in an async context - but we use this
+ // in a sync context too so use `this` over `self`
std::vector> readers;
std::shared_ptr<::arrow::Schema> result_schema;
RETURN_NOT_OK(GetFieldReaders(column_indices, row_groups, &readers, &result_schema));
-
- ::arrow::ChunkedArrayVector columns(readers.size());
- RETURN_NOT_OK(::arrow::internal::OptionalParallelFor(
- reader_properties_.use_threads(), static_cast(readers.size()), [&](int i) {
- return ReadColumn(static_cast(i), row_groups, readers[i].get(), &columns[i]);
- }));
-
- int64_t num_rows = 0;
- if (!columns.empty()) {
- num_rows = columns[0]->length();
- } else {
- for (int i : row_groups) {
- num_rows += parquet_reader()->metadata()->RowGroup(i)->num_rows();
+ // OptionalParallelForAsync requires an executor
+ if (!cpu_executor) cpu_executor = ::arrow::internal::GetCpuThreadPool();
+
+ auto read_column = [row_groups, self, this](size_t i,
+ std::shared_ptr reader)
+ -> ::arrow::Result> {
+ std::shared_ptr<::arrow::ChunkedArray> column;
+ RETURN_NOT_OK(ReadColumn(static_cast(i), row_groups, reader.get(), &column));
+ return column;
+ };
+ auto make_table = [result_schema, row_groups, self,
+ this](const ::arrow::ChunkedArrayVector& columns)
+ -> ::arrow::Result> {
+ int64_t num_rows = 0;
+ if (!columns.empty()) {
+ num_rows = columns[0]->length();
+ } else {
+ for (int i : row_groups) {
+ num_rows += parquet_reader()->metadata()->RowGroup(i)->num_rows();
+ }
}
- }
-
- *out = Table::Make(std::move(result_schema), std::move(columns), num_rows);
- return (*out)->Validate();
+ auto table = Table::Make(std::move(result_schema), columns, num_rows);
+ RETURN_NOT_OK(table->Validate());
+ return table;
+ };
+ return ::arrow::internal::OptionalParallelForAsync(reader_properties_.use_threads(),
+ std::move(readers), read_column,
+ cpu_executor)
+ .Then(std::move(make_table));
}
std::shared_ptr FileReaderImpl::RowGroup(int row_group_index) {