diff --git a/cpp/src/arrow/dataset/file_parquet.cc b/cpp/src/arrow/dataset/file_parquet.cc
index 8c325d21da19b..0ebbd0a53335e 100644
--- a/cpp/src/arrow/dataset/file_parquet.cc
+++ b/cpp/src/arrow/dataset/file_parquet.cc
@@ -362,11 +362,7 @@ Future<std::shared_ptr<parquet::arrow::FileReader>> ParquetFileFormat::GetReader
       parquet_scan_options->arrow_reader_properties->cache_options());
   arrow_properties.set_io_context(
       parquet_scan_options->arrow_reader_properties->io_context());
-  // TODO: ARROW-12597 will let us enable parallel conversion
-  if (!options->use_threads) {
-    arrow_properties.set_use_threads(
-        parquet_scan_options->enable_parallel_column_conversion);
-  }
+  arrow_properties.set_use_threads(options->use_threads);
   std::unique_ptr<parquet::arrow::FileReader> arrow_reader;
   RETURN_NOT_OK(parquet::arrow::FileReader::Make(options->pool, std::move(reader),
                                                  std::move(arrow_properties),
diff --git a/cpp/src/arrow/dataset/file_parquet.h b/cpp/src/arrow/dataset/file_parquet.h
index 8286e2776cbeb..347f40320469e 100644
--- a/cpp/src/arrow/dataset/file_parquet.h
+++ b/cpp/src/arrow/dataset/file_parquet.h
@@ -222,7 +222,8 @@ class ARROW_DS_EXPORT ParquetFragmentScanOptions : public FragmentScanOptions {
   /// EXPERIMENTAL: Parallelize conversion across columns. This option is ignored if a
   /// scan is already parallelized across input files to avoid thread contention. This
   /// option will be removed after support is added for simultaneous parallelization
-  /// across files and columns.
+  /// across files and columns. Only affects the threaded reader; the async reader
+  /// will parallelize across columns if use_threads is enabled.
   bool enable_parallel_column_conversion = false;
 };
 
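For dataset users, the effect of the file_parquet.cc change is that `ScanOptions::use_threads` now also governs column-level conversion parallelism in the async Parquet reader. A minimal sketch of turning both knobs on from user code (not part of the patch; `ScanAllThreaded` and the pre-built `dataset` are hypothetical, and `UseAsync` is the experimental builder option exercised by the tests below):

```cpp
#include "arrow/dataset/scanner.h"
#include "arrow/result.h"
#include "arrow/table.h"

// Hypothetical helper: scan a dataset with the async scanner and with
// use_threads enabled, so Parquet column conversion is parallelized too.
arrow::Result<std::shared_ptr<arrow::Table>> ScanAllThreaded(
    std::shared_ptr<arrow::dataset::Dataset> dataset) {
  arrow::dataset::ScannerBuilder builder(std::move(dataset));
  ARROW_RETURN_NOT_OK(builder.UseThreads(true));
  ARROW_RETURN_NOT_OK(builder.UseAsync(true));
  ARROW_ASSIGN_OR_RAISE(auto scanner, builder.Finish());
  return scanner->ToTable();
}
```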
diff --git a/cpp/src/arrow/dataset/test_util.h b/cpp/src/arrow/dataset/test_util.h
index 1e4222eec8c33..39223eba35b89 100644
--- a/cpp/src/arrow/dataset/test_util.h
+++ b/cpp/src/arrow/dataset/test_util.h
@@ -310,6 +310,7 @@ class DatasetFixtureMixinWithParam : public DatasetFixtureMixin,
 
 struct TestFormatParams {
   bool use_async;
+  bool use_threads;
   int num_batches;
   int items_per_batch;
 
@@ -318,7 +319,8 @@ struct TestFormatParams {
   std::string ToString() const {
     // GTest requires this to be alphanumeric
     std::stringstream ss;
-    ss << (use_async ? "Async" : "Sync") << num_batches << "b" << items_per_batch << "r";
+    ss << (use_async ? "Async" : "Sync") << (use_threads ? "Threaded" : "Serial")
+       << num_batches << "b" << items_per_batch << "r";
     return ss.str();
   }
 
@@ -328,8 +330,12 @@ struct TestFormatParams {
   }
 
   static std::vector<TestFormatParams> Values() {
-    std::vector<TestFormatParams> values{{/*async=*/false, 16, 1024},
-                                         {/*async=*/true, 16, 1024}};
+    std::vector<TestFormatParams> values;
+    for (const bool async : std::vector<bool>{true, false}) {
+      for (const bool use_threads : std::vector<bool>{true, false}) {
+        values.push_back(TestFormatParams{async, use_threads, 16, 1024});
+      }
+    }
     return values;
   }
 };
@@ -511,6 +517,7 @@ class FileFormatScanMixin : public FileFormatFixtureMixin,
     auto dataset = std::make_shared<FragmentDataset>(schema, FragmentVector{fragment});
     ScannerBuilder builder(dataset, opts_);
     ARROW_EXPECT_OK(builder.UseAsync(GetParam().use_async));
+    ARROW_EXPECT_OK(builder.UseThreads(GetParam().use_threads));
     EXPECT_OK_AND_ASSIGN(auto scanner, builder.Finish());
     EXPECT_OK_AND_ASSIGN(auto batch_it, scanner->ScanBatches());
     return MakeMapIterator([](TaggedRecordBatch tagged) { return tagged.record_batch; },
@@ -519,6 +526,7 @@ class FileFormatScanMixin : public FileFormatFixtureMixin,
 
   // Scan the fragment directly, without using the scanner.
   RecordBatchIterator PhysicalBatches(std::shared_ptr<Fragment> fragment) {
+    opts_->use_threads = GetParam().use_threads;
     if (GetParam().use_async) {
       EXPECT_OK_AND_ASSIGN(auto batch_gen, fragment->ScanBatchesAsync(opts_));
       EXPECT_OK_AND_ASSIGN(auto batch_it, MakeGeneratorIterator(std::move(batch_gen)));
diff --git a/cpp/src/arrow/ipc/reader.cc b/cpp/src/arrow/ipc/reader.cc
index 7c26bce913df8..a3c345cc440e5 100644
--- a/cpp/src/arrow/ipc/reader.cc
+++ b/cpp/src/arrow/ipc/reader.cc
@@ -54,6 +54,7 @@
 #include "arrow/util/string.h"
 #include "arrow/util/thread_pool.h"
 #include "arrow/util/ubsan.h"
+#include "arrow/util/vector.h"
 #include "arrow/visitor_inline.h"
 
 #include "generated/File_generated.h"  // IWYU pragma: export
@@ -1368,12 +1369,10 @@ Future<std::shared_ptr<RecordBatch>> IpcFileRecordBatchGenerator::operator()()
     auto read_messages = All(std::move(messages));
     if (executor_) read_messages = executor_->Transfer(read_messages);
     read_dictionaries_ = read_messages.Then(
-        [=](const std::vector<Result<std::shared_ptr<Message>>> maybe_messages)
+        [=](const std::vector<Result<std::shared_ptr<Message>>>& maybe_messages)
            -> Status {
-          std::vector<std::shared_ptr<Message>> messages(state->num_dictionaries());
-          for (size_t i = 0; i < messages.size(); i++) {
-            ARROW_ASSIGN_OR_RAISE(messages[i], maybe_messages[i]);
-          }
+          ARROW_ASSIGN_OR_RAISE(auto messages,
+                                arrow::internal::UnwrapOrRaise(maybe_messages));
           return ReadDictionaries(state.get(), std::move(messages));
         });
   }
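The ipc/reader.cc hunk swaps a hand-rolled unwrap loop for the `UnwrapOrRaise` overload added to `arrow/util/vector.h` further down. The `All(...).Then(UnwrapOrRaise)` shape in isolation looks roughly like this (a sketch with illustrative names, not code from the patch):

```cpp
#include <vector>

#include "arrow/result.h"
#include "arrow/util/future.h"
#include "arrow/util/vector.h"

// Wait for a batch of futures, then collapse vector<Result<int>> into
// Result<vector<int>>, failing on the first error like the removed loop did.
arrow::Future<std::vector<int>> GatherAll(std::vector<arrow::Future<int>> futures) {
  return arrow::All(std::move(futures))
      .Then([](const std::vector<arrow::Result<int>>& results)
                -> arrow::Result<std::vector<int>> {
        return arrow::internal::UnwrapOrRaise(results);
      });
}
```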
diff --git a/cpp/src/arrow/util/parallel.h b/cpp/src/arrow/util/parallel.h
index e56a71b91af72..80f60fbdb3676 100644
--- a/cpp/src/arrow/util/parallel.h
+++ b/cpp/src/arrow/util/parallel.h
@@ -21,7 +21,9 @@
 #include <vector>
 
 #include "arrow/status.h"
+#include "arrow/util/functional.h"
 #include "arrow/util/thread_pool.h"
+#include "arrow/util/vector.h"
 
 namespace arrow {
 namespace internal {
@@ -44,6 +46,21 @@ Status ParallelFor(int num_tasks, FUNCTION&& func,
   return st;
 }
 
+template <class FUNCTION, typename T,
+          typename R = typename internal::call_traits::return_type<FUNCTION>::ValueType>
+Future<std::vector<R>> ParallelForAsync(
+    std::vector<T> inputs, FUNCTION&& func,
+    Executor* executor = internal::GetCpuThreadPool()) {
+  std::vector<Future<R>> futures(inputs.size());
+  for (size_t i = 0; i < inputs.size(); ++i) {
+    ARROW_ASSIGN_OR_RAISE(futures[i], executor->Submit(func, i, std::move(inputs[i])));
+  }
+  return All(std::move(futures))
+      .Then([](const std::vector<Result<R>>& results) -> Result<std::vector<R>> {
+        return UnwrapOrRaise(results);
+      });
+}
+
 // A parallelizer that takes a `Status(int)` function and calls it with
 // arguments between 0 and `num_tasks - 1`, in sequence or in parallel,
 // depending on the input boolean.
@@ -61,5 +78,25 @@ Status OptionalParallelFor(bool use_threads, int num_tasks, FUNCTION&& func,
   }
 }
 
+// A parallelizer that takes a `Result<R>(int index, T item)` function and
+// calls it with each item from the input array, in sequence or in parallel,
+// depending on the input boolean.
+
+template <class FUNCTION, typename T,
+          typename R = typename internal::call_traits::return_type<FUNCTION>::ValueType>
+Future<std::vector<R>> OptionalParallelForAsync(
+    bool use_threads, std::vector<T> inputs, FUNCTION&& func,
+    Executor* executor = internal::GetCpuThreadPool()) {
+  if (use_threads) {
+    return ParallelForAsync(std::move(inputs), std::forward<FUNCTION>(func), executor);
+  } else {
+    std::vector<R> result(inputs.size());
+    for (size_t i = 0; i < inputs.size(); ++i) {
+      ARROW_ASSIGN_OR_RAISE(result[i], func(i, inputs[i]));
+    }
+    return result;
+  }
+}
+
 }  // namespace internal
 }  // namespace arrow
diff --git a/cpp/src/arrow/util/vector.h b/cpp/src/arrow/util/vector.h
index b9f2e2a45aa18..3ef0074aa9de8 100644
--- a/cpp/src/arrow/util/vector.h
+++ b/cpp/src/arrow/util/vector.h
@@ -133,5 +133,18 @@ Result<std::vector<T>> UnwrapOrRaise(std::vector<Result<T>>&& results) {
   return std::move(out);
 }
 
+template <typename T>
+Result<std::vector<T>> UnwrapOrRaise(const std::vector<Result<T>>& results) {
+  std::vector<T> out;
+  out.reserve(results.size());
+  for (const auto& result : results) {
+    if (!result.ok()) {
+      return result.status();
+    }
+    out.push_back(result.ValueUnsafe());
+  }
+  return std::move(out);
+}
+
 }  // namespace internal
 }  // namespace arrow
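A minimal usage sketch for the new helpers (illustrative names, not from the patch; these live in internal headers, so it assumes in-tree code): the callable takes the element index plus the element and returns a `Result`, and the caller gets a future for the collected outputs whether or not threads are used.

```cpp
#include <vector>

#include "arrow/result.h"
#include "arrow/util/future.h"
#include "arrow/util/parallel.h"

arrow::Future<std::vector<int>> SquareAll(std::vector<int> values, bool use_threads) {
  auto square = [](size_t /*index*/, int value) -> arrow::Result<int> {
    return value * value;
  };
  // Runs on the default CPU thread pool when use_threads is true, inline otherwise;
  // either way the result arrives as a Future<std::vector<int>>.
  return arrow::internal::OptionalParallelForAsync(use_threads, std::move(values),
                                                   std::move(square));
}
```

This mirrors how `DecodeRowGroups` below passes a per-column lambda and `reader_properties_.use_threads()` to `OptionalParallelForAsync`.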
diff --git a/cpp/src/parquet/arrow/reader.cc b/cpp/src/parquet/arrow/reader.cc
index 14eb74958058c..4f5f79c964a1b 100644
--- a/cpp/src/parquet/arrow/reader.cc
+++ b/cpp/src/parquet/arrow/reader.cc
@@ -293,10 +293,12 @@ class FileReaderImpl : public FileReader {
                        const std::vector<int>& indices,
                        std::shared_ptr<Table>* table) override;
 
-  // Helper method used by ReadRowGroups/Generator - read the given row groups/columns,
-  // skipping bounds checks and pre-buffering.
-  Status DecodeRowGroups(const std::vector<int>& row_groups,
-                         const std::vector<int>& indices, std::shared_ptr<Table>* table);
+  // Helper method used by ReadRowGroups - read the given row groups/columns, skipping
+  // bounds checks and pre-buffering. Takes a shared_ptr to self to keep the reader
+  // alive in async contexts.
+  Future<std::shared_ptr<Table>> DecodeRowGroups(
+      std::shared_ptr<FileReaderImpl> self, const std::vector<int>& row_groups,
+      const std::vector<int>& column_indices, ::arrow::internal::Executor* cpu_executor);
 
   Status ReadRowGroups(const std::vector<int>& row_groups,
                        std::shared_ptr<Table>* table) override {
@@ -1007,10 +1009,9 @@ class RowGroupGenerator {
       return SubmitRead(cpu_executor_, reader, row_group, column_indices);
     }
     auto ready = reader->parquet_reader()->WhenBuffered({row_group}, column_indices);
-    // TODO(ARROW-12916): always transfer here
-    if (cpu_executor_) ready = cpu_executor_->Transfer(ready);
-    return ready.Then([=]() -> ::arrow::Result<RecordBatchGenerator> {
-      return ReadOneRowGroup(reader, row_group, column_indices);
+    if (cpu_executor_) ready = cpu_executor_->TransferAlways(ready);
+    return ready.Then([=]() -> ::arrow::Future<RecordBatchGenerator> {
+      return ReadOneRowGroup(cpu_executor_, reader, row_group, column_indices);
     });
   }
 
@@ -1024,31 +1025,25 @@
       ::arrow::internal::Executor* cpu_executor, std::shared_ptr<FileReaderImpl> self,
       const int row_group, const std::vector<int>& column_indices) {
     if (!cpu_executor) {
-      return Future<RecordBatchGenerator>::MakeFinished(
-          ReadOneRowGroup(self, row_group, column_indices));
+      return ReadOneRowGroup(cpu_executor, self, row_group, column_indices);
     }
     // If we have an executor, then force transfer (even if I/O was complete)
-    return ::arrow::DeferNotOk(
-        cpu_executor->Submit(ReadOneRowGroup, self, row_group, column_indices));
+    return ::arrow::DeferNotOk(cpu_executor->Submit(ReadOneRowGroup, cpu_executor, self,
+                                                    row_group, column_indices));
   }
 
-  static ::arrow::Result<RecordBatchGenerator> ReadOneRowGroup(
-      std::shared_ptr<FileReaderImpl> self, const int row_group,
-      const std::vector<int>& column_indices) {
-    std::shared_ptr<::arrow::Table> table;
+  static ::arrow::Future<RecordBatchGenerator> ReadOneRowGroup(
+      ::arrow::internal::Executor* cpu_executor, std::shared_ptr<FileReaderImpl> self,
+      const int row_group, const std::vector<int>& column_indices) {
     // Skips bound checks/pre-buffering, since we've done that already
-    RETURN_NOT_OK(self->DecodeRowGroups({row_group}, column_indices, &table));
-    auto table_reader = std::make_shared<::arrow::TableBatchReader>(*table);
-    ::arrow::RecordBatchVector batches;
-    while (true) {
-      std::shared_ptr<::arrow::RecordBatch> batch;
-      RETURN_NOT_OK(table_reader->ReadNext(&batch));
-      if (!batch) {
-        break;
-      }
-      batches.push_back(batch);
-    }
-    return ::arrow::MakeVectorGenerator(std::move(batches));
+    return self->DecodeRowGroups(self, {row_group}, column_indices, cpu_executor)
+        .Then([](const std::shared_ptr<Table>& table)
+                  -> ::arrow::Result<RecordBatchGenerator> {
+          ::arrow::TableBatchReader table_reader(*table);
+          ::arrow::RecordBatchVector batches;
+          RETURN_NOT_OK(table_reader.ReadAll(&batches));
+          return ::arrow::MakeVectorGenerator(std::move(batches));
+        });
   }
 
   std::shared_ptr<FileReaderImpl> arrow_reader_;
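One small simplification in the hunk above: the manual `ReadNext` loop becomes a single `ReadAll` call, which `TableBatchReader` inherits from `RecordBatchReader`. Stand-alone, the pattern is roughly (illustrative sketch, not code from the patch):

```cpp
#include "arrow/record_batch.h"
#include "arrow/result.h"
#include "arrow/table.h"

arrow::Result<arrow::RecordBatchVector> TableToBatches(const arrow::Table& table) {
  arrow::TableBatchReader reader(table);
  arrow::RecordBatchVector batches;
  // ReadAll drains the reader into the vector, replacing the old ReadNext loop.
  ARROW_RETURN_NOT_OK(reader.ReadAll(&batches));
  return batches;
}
```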
@@ -1104,34 +1099,49 @@ Status FileReaderImpl::ReadRowGroups(const std::vector<int>& row_groups,
     END_PARQUET_CATCH_EXCEPTIONS
   }
 
-  return DecodeRowGroups(row_groups, column_indices, out);
+  auto fut = DecodeRowGroups(/*self=*/nullptr, row_groups, column_indices,
+                             /*cpu_executor=*/nullptr);
+  ARROW_ASSIGN_OR_RAISE(*out, fut.MoveResult());
+  return Status::OK();
 }
 
-// Also used by RowGroupGenerator - skip bounds check/pre-buffer to avoid doing that twice
-Status FileReaderImpl::DecodeRowGroups(const std::vector<int>& row_groups,
-                                       const std::vector<int>& column_indices,
-                                       std::shared_ptr<Table>* out) {
+Future<std::shared_ptr<Table>> FileReaderImpl::DecodeRowGroups(
+    std::shared_ptr<FileReaderImpl> self, const std::vector<int>& row_groups,
+    const std::vector<int>& column_indices, ::arrow::internal::Executor* cpu_executor) {
+  // `self` is used solely to keep `this` alive in an async context - but we use this
+  // in a sync context too so use `this` over `self`
   std::vector<std::shared_ptr<ColumnReaderImpl>> readers;
   std::shared_ptr<::arrow::Schema> result_schema;
   RETURN_NOT_OK(GetFieldReaders(column_indices, row_groups, &readers, &result_schema));
-
-  ::arrow::ChunkedArrayVector columns(readers.size());
-  RETURN_NOT_OK(::arrow::internal::OptionalParallelFor(
-      reader_properties_.use_threads(), static_cast<int>(readers.size()), [&](int i) {
-        return ReadColumn(static_cast<int>(i), row_groups, readers[i].get(), &columns[i]);
-      }));
-
-  int64_t num_rows = 0;
-  if (!columns.empty()) {
-    num_rows = columns[0]->length();
-  } else {
-    for (int i : row_groups) {
-      num_rows += parquet_reader()->metadata()->RowGroup(i)->num_rows();
+  // OptionalParallelForAsync requires an executor
+  if (!cpu_executor) cpu_executor = ::arrow::internal::GetCpuThreadPool();
+
+  auto read_column = [row_groups, self, this](size_t i,
+                                              std::shared_ptr<ColumnReaderImpl> reader)
+      -> ::arrow::Result<std::shared_ptr<::arrow::ChunkedArray>> {
+    std::shared_ptr<::arrow::ChunkedArray> column;
+    RETURN_NOT_OK(ReadColumn(static_cast<int>(i), row_groups, reader.get(), &column));
+    return column;
+  };
+  auto make_table = [result_schema, row_groups, self,
+                     this](const ::arrow::ChunkedArrayVector& columns)
+      -> ::arrow::Result<std::shared_ptr<Table>> {
+    int64_t num_rows = 0;
+    if (!columns.empty()) {
+      num_rows = columns[0]->length();
+    } else {
+      for (int i : row_groups) {
+        num_rows += parquet_reader()->metadata()->RowGroup(i)->num_rows();
+      }
     }
-  }
-
-  *out = Table::Make(std::move(result_schema), std::move(columns), num_rows);
-  return (*out)->Validate();
+    auto table = Table::Make(std::move(result_schema), columns, num_rows);
+    RETURN_NOT_OK(table->Validate());
+    return table;
+  };
+  return ::arrow::internal::OptionalParallelForAsync(reader_properties_.use_threads(),
+                                                     std::move(readers), read_column,
+                                                     cpu_executor)
+      .Then(std::move(make_table));
 }
 
 std::shared_ptr<RowGroupReader> FileReaderImpl::RowGroup(int row_group_index) {
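Callers of the synchronous `ReadRowGroups` are unaffected: it now simply blocks on the future returned by `DecodeRowGroups` via `MoveResult()`. The same bridge works anywhere a `Future` has to be consumed synchronously, roughly (hypothetical helper, not part of the patch):

```cpp
#include "arrow/result.h"
#include "arrow/table.h"
#include "arrow/util/future.h"

arrow::Status WaitForTable(arrow::Future<std::shared_ptr<arrow::Table>> fut,
                           std::shared_ptr<arrow::Table>* out) {
  // MoveResult() blocks until completion and takes ownership of the value,
  // mirroring what ReadRowGroups now does with the DecodeRowGroups future.
  ARROW_ASSIGN_OR_RAISE(*out, fut.MoveResult());
  return arrow::Status::OK();
}
```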