From 8aa0e0c592fe8d39a728e56f8d45e38b80f8f4c0 Mon Sep 17 00:00:00 2001
From: David Li
Date: Thu, 29 Apr 2021 11:11:34 -0400
Subject: [PATCH 1/6] ARROW-12597: [C++] Avoid nested parallelism in Parquet
reader
---
cpp/src/arrow/util/parallel.h | 40 +++++++++++++++++
cpp/src/parquet/arrow/reader.cc | 80 +++++++++++++++++++++++----------
2 files changed, 97 insertions(+), 23 deletions(-)
diff --git a/cpp/src/arrow/util/parallel.h b/cpp/src/arrow/util/parallel.h
index e56a71b91af72..a04011e004ff5 100644
--- a/cpp/src/arrow/util/parallel.h
+++ b/cpp/src/arrow/util/parallel.h
@@ -21,6 +21,7 @@
#include
#include "arrow/status.h"
+#include "arrow/util/functional.h"
#include "arrow/util/thread_pool.h"
namespace arrow {
@@ -44,6 +45,25 @@ Status ParallelFor(int num_tasks, FUNCTION&& func,
return st;
}
+template ::ValueType>
+Future> ParallelForAsync(
+ std::vector inputs, FUNCTION&& func,
+ Executor* executor = internal::GetCpuThreadPool()) {
+ std::vector> futures(inputs.size());
+ for (size_t i = 0; i < inputs.size(); ++i) {
+ ARROW_ASSIGN_OR_RAISE(futures[i], executor->Submit(func, i, std::move(inputs[i])));
+ }
+ return All(std::move(futures))
+ .Then([](const std::vector>& results) -> Result> {
+ std::vector result(results.size());
+ for (size_t i = 0; i < results.size(); i++) {
+ ARROW_ASSIGN_OR_RAISE(result[i], results[i]);
+ }
+ return result;
+ });
+}
+
// A parallelizer that takes a `Status(int)` function and calls it with
// arguments between 0 and `num_tasks - 1`, in sequence or in parallel,
// depending on the input boolean.
@@ -61,5 +81,25 @@ Status OptionalParallelFor(bool use_threads, int num_tasks, FUNCTION&& func,
}
}
+// A parallelizer that takes a `Result<R>(int index, T item)` function and
+// calls it with each item from the input vector, in sequence or in parallel,
+// depending on the input boolean.
+
+template ::ValueType>
+Future> OptionalParallelForAsync(
+ bool use_threads, std::vector inputs, FUNCTION&& func,
+ Executor* executor = internal::GetCpuThreadPool()) {
+ if (use_threads) {
+ return ParallelForAsync(std::move(inputs), std::forward(func), executor);
+ } else {
+ std::vector result(inputs.size());
+ for (size_t i = 0; i < inputs.size(); ++i) {
+ ARROW_ASSIGN_OR_RAISE(result[i], func(i, inputs[i]));
+ }
+ return result;
+ }
+}
+
} // namespace internal
} // namespace arrow
diff --git a/cpp/src/parquet/arrow/reader.cc b/cpp/src/parquet/arrow/reader.cc
index 14eb74958058c..e341e40820118 100644
--- a/cpp/src/parquet/arrow/reader.cc
+++ b/cpp/src/parquet/arrow/reader.cc
@@ -293,10 +293,14 @@ class FileReaderImpl : public FileReader {
const std::vector& indices,
std::shared_ptr* table) override;
- // Helper method used by ReadRowGroups/Generator - read the given row groups/columns,
+ // Helper method used by ReadRowGroups - read the given row groups/columns,
// skipping bounds checks and pre-buffering.
Status DecodeRowGroups(const std::vector& row_groups,
const std::vector& indices, std::shared_ptr* table);
+ // Async equivalent helper for the generator reader.
+ Future> DecodeRowGroups(std::shared_ptr self,
+ const std::vector& row_groups,
+ const std::vector& column_indices);
Status ReadRowGroups(const std::vector& row_groups,
std::shared_ptr* table) override {
@@ -1009,7 +1013,7 @@ class RowGroupGenerator {
auto ready = reader->parquet_reader()->WhenBuffered({row_group}, column_indices);
// TODO(ARROW-12916): always transfer here
if (cpu_executor_) ready = cpu_executor_->Transfer(ready);
- return ready.Then([=]() -> ::arrow::Result {
+ return ready.Then([=]() -> ::arrow::Future {
return ReadOneRowGroup(reader, row_group, column_indices);
});
}
@@ -1023,32 +1027,29 @@ class RowGroupGenerator {
static ::arrow::Future SubmitRead(
::arrow::internal::Executor* cpu_executor, std::shared_ptr self,
const int row_group, const std::vector& column_indices) {
- if (!cpu_executor) {
- return Future::MakeFinished(
- ReadOneRowGroup(self, row_group, column_indices));
- }
- // If we have an executor, then force transfer (even if I/O was complete)
- return ::arrow::DeferNotOk(
- cpu_executor->Submit(ReadOneRowGroup, self, row_group, column_indices));
+ // TODO: cpu_executor needs to get passed all the way through
+ return ReadOneRowGroup(self, row_group, column_indices);
}
- static ::arrow::Result ReadOneRowGroup(
+ static ::arrow::Future ReadOneRowGroup(
std::shared_ptr self, const int row_group,
const std::vector& column_indices) {
- std::shared_ptr<::arrow::Table> table;
// Skips bound checks/pre-buffering, since we've done that already
- RETURN_NOT_OK(self->DecodeRowGroups({row_group}, column_indices, &table));
- auto table_reader = std::make_shared<::arrow::TableBatchReader>(*table);
- ::arrow::RecordBatchVector batches;
- while (true) {
- std::shared_ptr<::arrow::RecordBatch> batch;
- RETURN_NOT_OK(table_reader->ReadNext(&batch));
- if (!batch) {
- break;
- }
- batches.push_back(batch);
- }
- return ::arrow::MakeVectorGenerator(std::move(batches));
+ return self->DecodeRowGroups(self, {row_group}, column_indices)
+ .Then([](const std::shared_ptr& table)
+ -> ::arrow::Result {
+ auto table_reader = std::make_shared<::arrow::TableBatchReader>(*table);
+ ::arrow::RecordBatchVector batches;
+ while (true) {
+ std::shared_ptr<::arrow::RecordBatch> batch;
+ RETURN_NOT_OK(table_reader->ReadNext(&batch));
+ if (!batch) {
+ break;
+ }
+ batches.push_back(batch);
+ }
+ return ::arrow::MakeVectorGenerator(std::move(batches));
+ });
}
std::shared_ptr arrow_reader_;
@@ -1134,6 +1135,39 @@ Status FileReaderImpl::DecodeRowGroups(const std::vector& row_groups,
return (*out)->Validate();
}
+Future> FileReaderImpl::DecodeRowGroups(
+ std::shared_ptr self, const std::vector& row_groups,
+ const std::vector& column_indices) {
+ std::vector> readers;
+ std::shared_ptr<::arrow::Schema> result_schema;
+ RETURN_NOT_OK(GetFieldReaders(column_indices, row_groups, &readers, &result_schema));
+
+ return ::arrow::internal::OptionalParallelForAsync(
+ reader_properties_.use_threads(), std::move(readers),
+ [row_groups, self](int i, std::shared_ptr reader)
+ -> ::arrow::Result> {
+ std::shared_ptr<::arrow::ChunkedArray> column;
+ RETURN_NOT_OK(self->ReadColumn(static_cast(i), row_groups,
+ reader.get(), &column));
+ return column;
+ })
+ .Then([result_schema, row_groups, self](const ::arrow::ChunkedArrayVector& columns)
+ -> ::arrow::Result> {
+ int64_t num_rows = 0;
+ if (!columns.empty()) {
+ num_rows = columns[0]->length();
+ } else {
+ for (int i : row_groups) {
+ num_rows += self->parquet_reader()->metadata()->RowGroup(i)->num_rows();
+ }
+ }
+
+ auto table = Table::Make(std::move(result_schema), columns, num_rows);
+ RETURN_NOT_OK(table->Validate());
+ return table;
+ });
+}
+
std::shared_ptr FileReaderImpl::RowGroup(int row_group_index) {
return std::make_shared(this, row_group_index);
}
From 78f47ad740385a1f97ac1893f311b71caa6cb496 Mon Sep 17 00:00:00 2001
From: David Li
Date: Fri, 4 Jun 2021 08:26:41 -0500
Subject: [PATCH 2/6] ARROW-12597: [C++] Enable per-batch fanout when scanning
Parquet
---
cpp/src/arrow/dataset/file_parquet.cc | 6 +-----
cpp/src/arrow/dataset/file_parquet.h | 3 ++-
cpp/src/arrow/dataset/test_util.h | 14 ++++++++++---
cpp/src/parquet/arrow/reader.cc | 29 +++++++++++++++++----------
4 files changed, 32 insertions(+), 20 deletions(-)
diff --git a/cpp/src/arrow/dataset/file_parquet.cc b/cpp/src/arrow/dataset/file_parquet.cc
index 8c325d21da19b..0ebbd0a53335e 100644
--- a/cpp/src/arrow/dataset/file_parquet.cc
+++ b/cpp/src/arrow/dataset/file_parquet.cc
@@ -362,11 +362,7 @@ Future> ParquetFileFormat::GetReader
parquet_scan_options->arrow_reader_properties->cache_options());
arrow_properties.set_io_context(
parquet_scan_options->arrow_reader_properties->io_context());
- // TODO: ARROW-12597 will let us enable parallel conversion
- if (!options->use_threads) {
- arrow_properties.set_use_threads(
- parquet_scan_options->enable_parallel_column_conversion);
- }
+ arrow_properties.set_use_threads(options->use_threads);
std::unique_ptr arrow_reader;
RETURN_NOT_OK(parquet::arrow::FileReader::Make(options->pool, std::move(reader),
std::move(arrow_properties),
diff --git a/cpp/src/arrow/dataset/file_parquet.h b/cpp/src/arrow/dataset/file_parquet.h
index 8286e2776cbeb..347f40320469e 100644
--- a/cpp/src/arrow/dataset/file_parquet.h
+++ b/cpp/src/arrow/dataset/file_parquet.h
@@ -222,7 +222,8 @@ class ARROW_DS_EXPORT ParquetFragmentScanOptions : public FragmentScanOptions {
/// EXPERIMENTAL: Parallelize conversion across columns. This option is ignored if a
/// scan is already parallelized across input files to avoid thread contention. This
/// option will be removed after support is added for simultaneous parallelization
- /// across files and columns.
+ /// across files and columns. Only affects the threaded reader; the async reader
+ /// will parallelize across columns if use_threads is enabled.
bool enable_parallel_column_conversion = false;
};
diff --git a/cpp/src/arrow/dataset/test_util.h b/cpp/src/arrow/dataset/test_util.h
index 1e4222eec8c33..39223eba35b89 100644
--- a/cpp/src/arrow/dataset/test_util.h
+++ b/cpp/src/arrow/dataset/test_util.h
@@ -310,6 +310,7 @@ class DatasetFixtureMixinWithParam : public DatasetFixtureMixin,
struct TestFormatParams {
bool use_async;
+ bool use_threads;
int num_batches;
int items_per_batch;
@@ -318,7 +319,8 @@ struct TestFormatParams {
std::string ToString() const {
// GTest requires this to be alphanumeric
std::stringstream ss;
- ss << (use_async ? "Async" : "Sync") << num_batches << "b" << items_per_batch << "r";
+ ss << (use_async ? "Async" : "Sync") << (use_threads ? "Threaded" : "Serial")
+ << num_batches << "b" << items_per_batch << "r";
return ss.str();
}
@@ -328,8 +330,12 @@ struct TestFormatParams {
}
static std::vector Values() {
- std::vector values{{/*async=*/false, 16, 1024},
- {/*async=*/true, 16, 1024}};
+ std::vector values;
+ for (const bool async : std::vector{true, false}) {
+ for (const bool use_threads : std::vector{true, false}) {
+ values.push_back(TestFormatParams{async, use_threads, 16, 1024});
+ }
+ }
return values;
}
};
@@ -511,6 +517,7 @@ class FileFormatScanMixin : public FileFormatFixtureMixin,
auto dataset = std::make_shared(schema, FragmentVector{fragment});
ScannerBuilder builder(dataset, opts_);
ARROW_EXPECT_OK(builder.UseAsync(GetParam().use_async));
+ ARROW_EXPECT_OK(builder.UseThreads(GetParam().use_threads));
EXPECT_OK_AND_ASSIGN(auto scanner, builder.Finish());
EXPECT_OK_AND_ASSIGN(auto batch_it, scanner->ScanBatches());
return MakeMapIterator([](TaggedRecordBatch tagged) { return tagged.record_batch; },
@@ -519,6 +526,7 @@ class FileFormatScanMixin : public FileFormatFixtureMixin,
// Scan the fragment directly, without using the scanner.
RecordBatchIterator PhysicalBatches(std::shared_ptr fragment) {
+ opts_->use_threads = GetParam().use_threads;
if (GetParam().use_async) {
EXPECT_OK_AND_ASSIGN(auto batch_gen, fragment->ScanBatchesAsync(opts_));
EXPECT_OK_AND_ASSIGN(auto batch_it, MakeGeneratorIterator(std::move(batch_gen)));
diff --git a/cpp/src/parquet/arrow/reader.cc b/cpp/src/parquet/arrow/reader.cc
index e341e40820118..a35c9dcbb98c5 100644
--- a/cpp/src/parquet/arrow/reader.cc
+++ b/cpp/src/parquet/arrow/reader.cc
@@ -298,9 +298,9 @@ class FileReaderImpl : public FileReader {
Status DecodeRowGroups(const std::vector& row_groups,
const std::vector& indices, std::shared_ptr* table);
// Async equivalent helper for the generator reader.
- Future> DecodeRowGroups(std::shared_ptr self,
- const std::vector& row_groups,
- const std::vector& column_indices);
+ Future> DecodeRowGroups(
+ std::shared_ptr self, const std::vector& row_groups,
+ const std::vector& column_indices, ::arrow::internal::Executor* cpu_executor);
Status ReadRowGroups(const std::vector& row_groups,
std::shared_ptr* table) override {
@@ -1014,7 +1014,7 @@ class RowGroupGenerator {
// TODO(ARROW-12916): always transfer here
if (cpu_executor_) ready = cpu_executor_->Transfer(ready);
return ready.Then([=]() -> ::arrow::Future {
- return ReadOneRowGroup(reader, row_group, column_indices);
+ return ReadOneRowGroup(cpu_executor_, reader, row_group, column_indices);
});
}
@@ -1027,15 +1027,19 @@ class RowGroupGenerator {
static ::arrow::Future SubmitRead(
::arrow::internal::Executor* cpu_executor, std::shared_ptr self,
const int row_group, const std::vector& column_indices) {
- // TODO: cpu_executor needs to get passed all the way through
- return ReadOneRowGroup(self, row_group, column_indices);
+ if (!cpu_executor) {
+ return ReadOneRowGroup(cpu_executor, self, row_group, column_indices);
+ }
+ // If we have an executor, then force transfer (even if I/O was complete)
+ return ::arrow::DeferNotOk(cpu_executor->Submit(ReadOneRowGroup, cpu_executor, self,
+ row_group, column_indices));
}
static ::arrow::Future ReadOneRowGroup(
- std::shared_ptr self, const int row_group,
- const std::vector& column_indices) {
+ ::arrow::internal::Executor* cpu_executor, std::shared_ptr self,
+ const int row_group, const std::vector& column_indices) {
// Skips bound checks/pre-buffering, since we've done that already
- return self->DecodeRowGroups(self, {row_group}, column_indices)
+ return self->DecodeRowGroups(self, {row_group}, column_indices, cpu_executor)
.Then([](const std::shared_ptr& table)
-> ::arrow::Result {
auto table_reader = std::make_shared<::arrow::TableBatchReader>(*table);
@@ -1137,11 +1141,13 @@ Status FileReaderImpl::DecodeRowGroups(const std::vector& row_groups,
Future> FileReaderImpl::DecodeRowGroups(
std::shared_ptr self, const std::vector& row_groups,
- const std::vector& column_indices) {
+ const std::vector& column_indices, ::arrow::internal::Executor* cpu_executor) {
std::vector> readers;
std::shared_ptr<::arrow::Schema> result_schema;
RETURN_NOT_OK(GetFieldReaders(column_indices, row_groups, &readers, &result_schema));
+ // OptionalParallelForAsync requires an executor
+ if (!cpu_executor) cpu_executor = ::arrow::internal::GetCpuThreadPool();
return ::arrow::internal::OptionalParallelForAsync(
reader_properties_.use_threads(), std::move(readers),
[row_groups, self](int i, std::shared_ptr reader)
@@ -1150,7 +1156,8 @@ Future> FileReaderImpl::DecodeRowGroups(
RETURN_NOT_OK(self->ReadColumn(static_cast(i), row_groups,
reader.get(), &column));
return column;
- })
+ },
+ cpu_executor)
.Then([result_schema, row_groups, self](const ::arrow::ChunkedArrayVector& columns)
-> ::arrow::Result> {
int64_t num_rows = 0;
From 90f799571203e01f7d01e665e6eab8d8002d6774 Mon Sep 17 00:00:00 2001
From: David Li
Date: Tue, 8 Jun 2021 09:05:08 -0400
Subject: [PATCH 3/6] ARROW-12916: [C++] Always transfer to executor
---
cpp/src/parquet/arrow/reader.cc | 3 +--
1 file changed, 1 insertion(+), 2 deletions(-)
diff --git a/cpp/src/parquet/arrow/reader.cc b/cpp/src/parquet/arrow/reader.cc
index a35c9dcbb98c5..93da66f9c4cff 100644
--- a/cpp/src/parquet/arrow/reader.cc
+++ b/cpp/src/parquet/arrow/reader.cc
@@ -1011,8 +1011,7 @@ class RowGroupGenerator {
return SubmitRead(cpu_executor_, reader, row_group, column_indices);
}
auto ready = reader->parquet_reader()->WhenBuffered({row_group}, column_indices);
- // TODO(ARROW-12916): always transfer here
- if (cpu_executor_) ready = cpu_executor_->Transfer(ready);
+ if (cpu_executor_) ready = cpu_executor_->TransferAlways(ready);
return ready.Then([=]() -> ::arrow::Future {
return ReadOneRowGroup(cpu_executor_, reader, row_group, column_indices);
});
From f6e88ea3dfc4f31228eff7783a79def8aa6013e1 Mon Sep 17 00:00:00 2001
From: David Li
Date: Thu, 10 Jun 2021 12:36:41 -0400
Subject: [PATCH 4/6] ARROW-12597: [C++] Address review feedback
---
cpp/src/arrow/ipc/reader.cc | 9 +++--
cpp/src/arrow/util/parallel.h | 7 ++--
cpp/src/arrow/util/vector.h | 13 +++++++
cpp/src/parquet/arrow/reader.cc | 62 +++++++++++++++------------------
4 files changed, 47 insertions(+), 44 deletions(-)
diff --git a/cpp/src/arrow/ipc/reader.cc b/cpp/src/arrow/ipc/reader.cc
index 7c26bce913df8..a3c345cc440e5 100644
--- a/cpp/src/arrow/ipc/reader.cc
+++ b/cpp/src/arrow/ipc/reader.cc
@@ -54,6 +54,7 @@
#include "arrow/util/string.h"
#include "arrow/util/thread_pool.h"
#include "arrow/util/ubsan.h"
+#include "arrow/util/vector.h"
#include "arrow/visitor_inline.h"
#include "generated/File_generated.h" // IWYU pragma: export
@@ -1368,12 +1369,10 @@ Future IpcFileRecordBatchGenerator::operator(
auto read_messages = All(std::move(messages));
if (executor_) read_messages = executor_->Transfer(read_messages);
read_dictionaries_ = read_messages.Then(
- [=](const std::vector>> maybe_messages)
+ [=](const std::vector>>& maybe_messages)
-> Status {
- std::vector> messages(state->num_dictionaries());
- for (size_t i = 0; i < messages.size(); i++) {
- ARROW_ASSIGN_OR_RAISE(messages[i], maybe_messages[i]);
- }
+ ARROW_ASSIGN_OR_RAISE(auto messages,
+ arrow::internal::UnwrapOrRaise(maybe_messages));
return ReadDictionaries(state.get(), std::move(messages));
});
}
diff --git a/cpp/src/arrow/util/parallel.h b/cpp/src/arrow/util/parallel.h
index a04011e004ff5..80f60fbdb3676 100644
--- a/cpp/src/arrow/util/parallel.h
+++ b/cpp/src/arrow/util/parallel.h
@@ -23,6 +23,7 @@
#include "arrow/status.h"
#include "arrow/util/functional.h"
#include "arrow/util/thread_pool.h"
+#include "arrow/util/vector.h"
namespace arrow {
namespace internal {
@@ -56,11 +57,7 @@ Future> ParallelForAsync(
}
return All(std::move(futures))
.Then([](const std::vector>& results) -> Result> {
- std::vector result(results.size());
- for (size_t i = 0; i < results.size(); i++) {
- ARROW_ASSIGN_OR_RAISE(result[i], results[i]);
- }
- return result;
+ return UnwrapOrRaise(results);
});
}
diff --git a/cpp/src/arrow/util/vector.h b/cpp/src/arrow/util/vector.h
index b9f2e2a45aa18..3ef0074aa9de8 100644
--- a/cpp/src/arrow/util/vector.h
+++ b/cpp/src/arrow/util/vector.h
@@ -133,5 +133,18 @@ Result> UnwrapOrRaise(std::vector>&& results) {
return std::move(out);
}
+template
+Result> UnwrapOrRaise(const std::vector>& results) {
+ std::vector out;
+ out.reserve(results.size());
+ for (const auto& result : results) {
+ if (!result.ok()) {
+ return result.status();
+ }
+ out.push_back(result.ValueUnsafe());
+ }
+ return std::move(out);
+}
+
} // namespace internal
} // namespace arrow
diff --git a/cpp/src/parquet/arrow/reader.cc b/cpp/src/parquet/arrow/reader.cc
index 93da66f9c4cff..d52f0b1794b18 100644
--- a/cpp/src/parquet/arrow/reader.cc
+++ b/cpp/src/parquet/arrow/reader.cc
@@ -1041,16 +1041,9 @@ class RowGroupGenerator {
return self->DecodeRowGroups(self, {row_group}, column_indices, cpu_executor)
.Then([](const std::shared_ptr& table)
-> ::arrow::Result {
- auto table_reader = std::make_shared<::arrow::TableBatchReader>(*table);
+ ::arrow::TableBatchReader table_reader(*table);
::arrow::RecordBatchVector batches;
- while (true) {
- std::shared_ptr<::arrow::RecordBatch> batch;
- RETURN_NOT_OK(table_reader->ReadNext(&batch));
- if (!batch) {
- break;
- }
- batches.push_back(batch);
- }
+ RETURN_NOT_OK(table_reader.ReadAll(&batches));
return ::arrow::MakeVectorGenerator(std::move(batches));
});
}
@@ -1144,34 +1137,35 @@ Future> FileReaderImpl::DecodeRowGroups(
std::vector> readers;
std::shared_ptr<::arrow::Schema> result_schema;
RETURN_NOT_OK(GetFieldReaders(column_indices, row_groups, &readers, &result_schema));
-
// OptionalParallelForAsync requires an executor
if (!cpu_executor) cpu_executor = ::arrow::internal::GetCpuThreadPool();
- return ::arrow::internal::OptionalParallelForAsync(
- reader_properties_.use_threads(), std::move(readers),
- [row_groups, self](int i, std::shared_ptr reader)
- -> ::arrow::Result> {
- std::shared_ptr<::arrow::ChunkedArray> column;
- RETURN_NOT_OK(self->ReadColumn(static_cast(i), row_groups,
- reader.get(), &column));
- return column;
- },
- cpu_executor)
- .Then([result_schema, row_groups, self](const ::arrow::ChunkedArrayVector& columns)
- -> ::arrow::Result> {
- int64_t num_rows = 0;
- if (!columns.empty()) {
- num_rows = columns[0]->length();
- } else {
- for (int i : row_groups) {
- num_rows += self->parquet_reader()->metadata()->RowGroup(i)->num_rows();
- }
- }
- auto table = Table::Make(std::move(result_schema), columns, num_rows);
- RETURN_NOT_OK(table->Validate());
- return table;
- });
+ auto read_column = [row_groups, self](int i, std::shared_ptr reader)
+ -> ::arrow::Result> {
+ std::shared_ptr<::arrow::ChunkedArray> column;
+ RETURN_NOT_OK(
+ self->ReadColumn(static_cast(i), row_groups, reader.get(), &column));
+ return column;
+ };
+ auto make_table = [result_schema, row_groups,
+ self](const ::arrow::ChunkedArrayVector& columns)
+ -> ::arrow::Result> {
+ int64_t num_rows = 0;
+ if (!columns.empty()) {
+ num_rows = columns[0]->length();
+ } else {
+ for (int i : row_groups) {
+ num_rows += self->parquet_reader()->metadata()->RowGroup(i)->num_rows();
+ }
+ }
+ auto table = Table::Make(std::move(result_schema), columns, num_rows);
+ RETURN_NOT_OK(table->Validate());
+ return table;
+ };
+ return ::arrow::internal::OptionalParallelForAsync(reader_properties_.use_threads(),
+ std::move(readers), read_column,
+ cpu_executor)
+ .Then(std::move(make_table));
}
std::shared_ptr FileReaderImpl::RowGroup(int row_group_index) {
From 39fd5f5047fb78e912dbdfb8388d7366a2f8454b Mon Sep 17 00:00:00 2001
From: David Li
Date: Thu, 10 Jun 2021 12:48:40 -0400
Subject: [PATCH 5/6] ARROW-12597: [C++] Combine ReadRowGroups
---
cpp/src/parquet/arrow/reader.cc | 54 +++++++++------------------------
1 file changed, 15 insertions(+), 39 deletions(-)
diff --git a/cpp/src/parquet/arrow/reader.cc b/cpp/src/parquet/arrow/reader.cc
index d52f0b1794b18..879a9010ab68d 100644
--- a/cpp/src/parquet/arrow/reader.cc
+++ b/cpp/src/parquet/arrow/reader.cc
@@ -293,11 +293,9 @@ class FileReaderImpl : public FileReader {
const std::vector& indices,
std::shared_ptr* table) override;
- // Helper method used by ReadRowGroups - read the given row groups/columns,
- // skipping bounds checks and pre-buffering.
- Status DecodeRowGroups(const std::vector& row_groups,
- const std::vector& indices, std::shared_ptr* table);
- // Async equivalent helper for the generator reader.
+ // Helper method used by ReadRowGroups and RowGroupGenerator - read the given row
+ // groups/columns, skipping bounds checks and pre-buffering. `self` keeps the reader
+ // alive in async contexts and may be null when called synchronously.
Future> DecodeRowGroups(
std::shared_ptr self, const std::vector& row_groups,
const std::vector& column_indices, ::arrow::internal::Executor* cpu_executor);
@@ -1101,61 +1099,39 @@ Status FileReaderImpl::ReadRowGroups(const std::vector& row_groups,
END_PARQUET_CATCH_EXCEPTIONS
}
- return DecodeRowGroups(row_groups, column_indices, out);
-}
-
-// Also used by RowGroupGenerator - skip bounds check/pre-buffer to avoid doing that twice
-Status FileReaderImpl::DecodeRowGroups(const std::vector& row_groups,
- const std::vector& column_indices,
- std::shared_ptr* out) {
- std::vector> readers;
- std::shared_ptr<::arrow::Schema> result_schema;
- RETURN_NOT_OK(GetFieldReaders(column_indices, row_groups, &readers, &result_schema));
-
- ::arrow::ChunkedArrayVector columns(readers.size());
- RETURN_NOT_OK(::arrow::internal::OptionalParallelFor(
- reader_properties_.use_threads(), static_cast(readers.size()), [&](int i) {
- return ReadColumn(static_cast(i), row_groups, readers[i].get(), &columns[i]);
- }));
-
- int64_t num_rows = 0;
- if (!columns.empty()) {
- num_rows = columns[0]->length();
- } else {
- for (int i : row_groups) {
- num_rows += parquet_reader()->metadata()->RowGroup(i)->num_rows();
- }
- }
-
- *out = Table::Make(std::move(result_schema), std::move(columns), num_rows);
- return (*out)->Validate();
+ auto fut = DecodeRowGroups(/*self=*/nullptr, row_groups, column_indices,
+ /*cpu_executor=*/nullptr);
+ ARROW_ASSIGN_OR_RAISE(*out, fut.MoveResult());
+ return Status::OK();
}
Future> FileReaderImpl::DecodeRowGroups(
std::shared_ptr self, const std::vector& row_groups,
const std::vector& column_indices, ::arrow::internal::Executor* cpu_executor) {
+ // `self` is used solely to keep `this` alive in an async context. This method is
+ // also called synchronously with a null `self`, so access members via `this`.
std::vector> readers;
std::shared_ptr<::arrow::Schema> result_schema;
RETURN_NOT_OK(GetFieldReaders(column_indices, row_groups, &readers, &result_schema));
// OptionalParallelForAsync requires an executor
if (!cpu_executor) cpu_executor = ::arrow::internal::GetCpuThreadPool();
- auto read_column = [row_groups, self](int i, std::shared_ptr reader)
+ auto read_column = [row_groups, self, this](int i,
+ std::shared_ptr reader)
-> ::arrow::Result> {
std::shared_ptr<::arrow::ChunkedArray> column;
- RETURN_NOT_OK(
- self->ReadColumn(static_cast(i), row_groups, reader.get(), &column));
+ RETURN_NOT_OK(ReadColumn(static_cast(i), row_groups, reader.get(), &column));
return column;
};
- auto make_table = [result_schema, row_groups,
- self](const ::arrow::ChunkedArrayVector& columns)
+ auto make_table = [result_schema, row_groups, self,
+ this](const ::arrow::ChunkedArrayVector& columns)
-> ::arrow::Result> {
int64_t num_rows = 0;
if (!columns.empty()) {
num_rows = columns[0]->length();
} else {
for (int i : row_groups) {
- num_rows += self->parquet_reader()->metadata()->RowGroup(i)->num_rows();
+ num_rows += parquet_reader()->metadata()->RowGroup(i)->num_rows();
}
}
auto table = Table::Make(std::move(result_schema), columns, num_rows);
From dc6320beffc6d9510a2b91608ed5e204dc32c9a4 Mon Sep 17 00:00:00 2001
From: David Li
Date: Thu, 10 Jun 2021 13:58:40 -0400
Subject: [PATCH 6/6] ARROW-12597: [C++] Fix MSVC warning
---
cpp/src/parquet/arrow/reader.cc | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/cpp/src/parquet/arrow/reader.cc b/cpp/src/parquet/arrow/reader.cc
index 879a9010ab68d..4f5f79c964a1b 100644
--- a/cpp/src/parquet/arrow/reader.cc
+++ b/cpp/src/parquet/arrow/reader.cc
@@ -1116,7 +1116,7 @@ Future> FileReaderImpl::DecodeRowGroups(
// OptionalParallelForAsync requires an executor
if (!cpu_executor) cpu_executor = ::arrow::internal::GetCpuThreadPool();
- auto read_column = [row_groups, self, this](int i,
+ auto read_column = [row_groups, self, this](size_t i,
std::shared_ptr reader)
-> ::arrow::Result> {
std::shared_ptr<::arrow::ChunkedArray> column;