Skip to content

Commit

Permalink
ARROW-12597: [C++] Enable per-row-group parallelism in async Parquet …
Browse files Browse the repository at this point in the history
…reader

This adds an OptionalParallelForAsync which lets us have per-row-group parallelism without nested parallelism in the async Parquet reader. This also uses TransferAlways, taking care of ARROW-12916. `enable_parallel_column_conversion` is kept as it still affects the threaded scanner.

Closes #10482 from lidavidm/arrow-12597

Authored-by: David Li <[email protected]>
Signed-off-by: David Li <[email protected]>
  • Loading branch information
lidavidm committed Jun 15, 2021
1 parent 889291b commit b73bcf0
Show file tree
Hide file tree
Showing 7 changed files with 128 additions and 64 deletions.
6 changes: 1 addition & 5 deletions cpp/src/arrow/dataset/file_parquet.cc
Original file line number Diff line number Diff line change
Expand Up @@ -362,11 +362,7 @@ Future<std::shared_ptr<parquet::arrow::FileReader>> ParquetFileFormat::GetReader
parquet_scan_options->arrow_reader_properties->cache_options());
arrow_properties.set_io_context(
parquet_scan_options->arrow_reader_properties->io_context());
// TODO: ARROW-12597 will let us enable parallel conversion
if (!options->use_threads) {
arrow_properties.set_use_threads(
parquet_scan_options->enable_parallel_column_conversion);
}
arrow_properties.set_use_threads(options->use_threads);
std::unique_ptr<parquet::arrow::FileReader> arrow_reader;
RETURN_NOT_OK(parquet::arrow::FileReader::Make(options->pool, std::move(reader),
std::move(arrow_properties),
Expand Down
3 changes: 2 additions & 1 deletion cpp/src/arrow/dataset/file_parquet.h
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,8 @@ class ARROW_DS_EXPORT ParquetFragmentScanOptions : public FragmentScanOptions {
/// EXPERIMENTAL: Parallelize conversion across columns. This option is ignored if a
/// scan is already parallelized across input files to avoid thread contention. This
/// option will be removed after support is added for simultaneous parallelization
/// across files and columns.
/// across files and columns. Only affects the threaded reader; the async reader
/// will parallelize across columns if use_threads is enabled.
bool enable_parallel_column_conversion = false;
};

Expand Down
14 changes: 11 additions & 3 deletions cpp/src/arrow/dataset/test_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,7 @@ class DatasetFixtureMixinWithParam : public DatasetFixtureMixin,

struct TestFormatParams {
bool use_async;
bool use_threads;
int num_batches;
int items_per_batch;

Expand All @@ -318,7 +319,8 @@ struct TestFormatParams {
std::string ToString() const {
// GTest requires this to be alphanumeric
std::stringstream ss;
ss << (use_async ? "Async" : "Sync") << num_batches << "b" << items_per_batch << "r";
ss << (use_async ? "Async" : "Sync") << (use_threads ? "Threaded" : "Serial")
<< num_batches << "b" << items_per_batch << "r";
return ss.str();
}

Expand All @@ -328,8 +330,12 @@ struct TestFormatParams {
}

static std::vector<TestFormatParams> Values() {
std::vector<TestFormatParams> values{{/*async=*/false, 16, 1024},
{/*async=*/true, 16, 1024}};
std::vector<TestFormatParams> values;
for (const bool async : std::vector<bool>{true, false}) {
for (const bool use_threads : std::vector<bool>{true, false}) {
values.push_back(TestFormatParams{async, use_threads, 16, 1024});
}
}
return values;
}
};
Expand Down Expand Up @@ -511,6 +517,7 @@ class FileFormatScanMixin : public FileFormatFixtureMixin<FormatHelper>,
auto dataset = std::make_shared<FragmentDataset>(schema, FragmentVector{fragment});
ScannerBuilder builder(dataset, opts_);
ARROW_EXPECT_OK(builder.UseAsync(GetParam().use_async));
ARROW_EXPECT_OK(builder.UseThreads(GetParam().use_threads));
EXPECT_OK_AND_ASSIGN(auto scanner, builder.Finish());
EXPECT_OK_AND_ASSIGN(auto batch_it, scanner->ScanBatches());
return MakeMapIterator([](TaggedRecordBatch tagged) { return tagged.record_batch; },
Expand All @@ -519,6 +526,7 @@ class FileFormatScanMixin : public FileFormatFixtureMixin<FormatHelper>,

// Scan the fragment directly, without using the scanner.
RecordBatchIterator PhysicalBatches(std::shared_ptr<Fragment> fragment) {
opts_->use_threads = GetParam().use_threads;
if (GetParam().use_async) {
EXPECT_OK_AND_ASSIGN(auto batch_gen, fragment->ScanBatchesAsync(opts_));
EXPECT_OK_AND_ASSIGN(auto batch_it, MakeGeneratorIterator(std::move(batch_gen)));
Expand Down
9 changes: 4 additions & 5 deletions cpp/src/arrow/ipc/reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
#include "arrow/util/string.h"
#include "arrow/util/thread_pool.h"
#include "arrow/util/ubsan.h"
#include "arrow/util/vector.h"
#include "arrow/visitor_inline.h"

#include "generated/File_generated.h" // IWYU pragma: export
Expand Down Expand Up @@ -1368,12 +1369,10 @@ Future<IpcFileRecordBatchGenerator::Item> IpcFileRecordBatchGenerator::operator(
auto read_messages = All(std::move(messages));
if (executor_) read_messages = executor_->Transfer(read_messages);
read_dictionaries_ = read_messages.Then(
[=](const std::vector<Result<std::shared_ptr<Message>>> maybe_messages)
[=](const std::vector<Result<std::shared_ptr<Message>>>& maybe_messages)
-> Status {
std::vector<std::shared_ptr<Message>> messages(state->num_dictionaries());
for (size_t i = 0; i < messages.size(); i++) {
ARROW_ASSIGN_OR_RAISE(messages[i], maybe_messages[i]);
}
ARROW_ASSIGN_OR_RAISE(auto messages,
arrow::internal::UnwrapOrRaise(maybe_messages));
return ReadDictionaries(state.get(), std::move(messages));
});
}
Expand Down
37 changes: 37 additions & 0 deletions cpp/src/arrow/util/parallel.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@
#include <vector>

#include "arrow/status.h"
#include "arrow/util/functional.h"
#include "arrow/util/thread_pool.h"
#include "arrow/util/vector.h"

namespace arrow {
namespace internal {
Expand All @@ -44,6 +46,21 @@ Status ParallelFor(int num_tasks, FUNCTION&& func,
return st;
}

template <class FUNCTION, typename T,
typename R = typename internal::call_traits::return_type<FUNCTION>::ValueType>
Future<std::vector<R>> ParallelForAsync(
std::vector<T> inputs, FUNCTION&& func,
Executor* executor = internal::GetCpuThreadPool()) {
std::vector<Future<R>> futures(inputs.size());
for (size_t i = 0; i < inputs.size(); ++i) {
ARROW_ASSIGN_OR_RAISE(futures[i], executor->Submit(func, i, std::move(inputs[i])));
}
return All(std::move(futures))
.Then([](const std::vector<Result<R>>& results) -> Result<std::vector<R>> {
return UnwrapOrRaise(results);
});
}

// A parallelizer that takes a `Status(int)` function and calls it with
// arguments between 0 and `num_tasks - 1`, in sequence or in parallel,
// depending on the input boolean.
Expand All @@ -61,5 +78,25 @@ Status OptionalParallelFor(bool use_threads, int num_tasks, FUNCTION&& func,
}
}

// A parallelizer that takes a `Result<R>(int index, T item)` function and
// calls it with each item from the input array, in sequence or in parallel,
// depending on the input boolean.

template <class FUNCTION, typename T,
typename R = typename internal::call_traits::return_type<FUNCTION>::ValueType>
Future<std::vector<R>> OptionalParallelForAsync(
bool use_threads, std::vector<T> inputs, FUNCTION&& func,
Executor* executor = internal::GetCpuThreadPool()) {
if (use_threads) {
return ParallelForAsync(std::move(inputs), std::forward<FUNCTION>(func), executor);
} else {
std::vector<R> result(inputs.size());
for (size_t i = 0; i < inputs.size(); ++i) {
ARROW_ASSIGN_OR_RAISE(result[i], func(i, inputs[i]));
}
return result;
}
}

} // namespace internal
} // namespace arrow
13 changes: 13 additions & 0 deletions cpp/src/arrow/util/vector.h
Original file line number Diff line number Diff line change
Expand Up @@ -133,5 +133,18 @@ Result<std::vector<T>> UnwrapOrRaise(std::vector<Result<T>>&& results) {
return std::move(out);
}

template <typename T>
Result<std::vector<T>> UnwrapOrRaise(const std::vector<Result<T>>& results) {
std::vector<T> out;
out.reserve(results.size());
for (const auto& result : results) {
if (!result.ok()) {
return result.status();
}
out.push_back(result.ValueUnsafe());
}
return std::move(out);
}

} // namespace internal
} // namespace arrow
110 changes: 60 additions & 50 deletions cpp/src/parquet/arrow/reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -293,10 +293,12 @@ class FileReaderImpl : public FileReader {
const std::vector<int>& indices,
std::shared_ptr<Table>* table) override;

// Helper method used by ReadRowGroups/Generator - read the given row groups/columns,
// skipping bounds checks and pre-buffering.
Status DecodeRowGroups(const std::vector<int>& row_groups,
const std::vector<int>& indices, std::shared_ptr<Table>* table);
// Helper method used by ReadRowGroups - read the given row groups/columns, skipping
// bounds checks and pre-buffering. Takes a shared_ptr to self to keep the reader
// alive in async contexts.
Future<std::shared_ptr<Table>> DecodeRowGroups(
std::shared_ptr<FileReaderImpl> self, const std::vector<int>& row_groups,
const std::vector<int>& column_indices, ::arrow::internal::Executor* cpu_executor);

Status ReadRowGroups(const std::vector<int>& row_groups,
std::shared_ptr<Table>* table) override {
Expand Down Expand Up @@ -1007,10 +1009,9 @@ class RowGroupGenerator {
return SubmitRead(cpu_executor_, reader, row_group, column_indices);
}
auto ready = reader->parquet_reader()->WhenBuffered({row_group}, column_indices);
// TODO(ARROW-12916): always transfer here
if (cpu_executor_) ready = cpu_executor_->Transfer(ready);
return ready.Then([=]() -> ::arrow::Result<RecordBatchGenerator> {
return ReadOneRowGroup(reader, row_group, column_indices);
if (cpu_executor_) ready = cpu_executor_->TransferAlways(ready);
return ready.Then([=]() -> ::arrow::Future<RecordBatchGenerator> {
return ReadOneRowGroup(cpu_executor_, reader, row_group, column_indices);
});
}

Expand All @@ -1024,31 +1025,25 @@ class RowGroupGenerator {
::arrow::internal::Executor* cpu_executor, std::shared_ptr<FileReaderImpl> self,
const int row_group, const std::vector<int>& column_indices) {
if (!cpu_executor) {
return Future<RecordBatchGenerator>::MakeFinished(
ReadOneRowGroup(self, row_group, column_indices));
return ReadOneRowGroup(cpu_executor, self, row_group, column_indices);
}
// If we have an executor, then force transfer (even if I/O was complete)
return ::arrow::DeferNotOk(
cpu_executor->Submit(ReadOneRowGroup, self, row_group, column_indices));
return ::arrow::DeferNotOk(cpu_executor->Submit(ReadOneRowGroup, cpu_executor, self,
row_group, column_indices));
}

static ::arrow::Result<RecordBatchGenerator> ReadOneRowGroup(
std::shared_ptr<FileReaderImpl> self, const int row_group,
const std::vector<int>& column_indices) {
std::shared_ptr<::arrow::Table> table;
static ::arrow::Future<RecordBatchGenerator> ReadOneRowGroup(
::arrow::internal::Executor* cpu_executor, std::shared_ptr<FileReaderImpl> self,
const int row_group, const std::vector<int>& column_indices) {
// Skips bound checks/pre-buffering, since we've done that already
RETURN_NOT_OK(self->DecodeRowGroups({row_group}, column_indices, &table));
auto table_reader = std::make_shared<::arrow::TableBatchReader>(*table);
::arrow::RecordBatchVector batches;
while (true) {
std::shared_ptr<::arrow::RecordBatch> batch;
RETURN_NOT_OK(table_reader->ReadNext(&batch));
if (!batch) {
break;
}
batches.push_back(batch);
}
return ::arrow::MakeVectorGenerator(std::move(batches));
return self->DecodeRowGroups(self, {row_group}, column_indices, cpu_executor)
.Then([](const std::shared_ptr<Table>& table)
-> ::arrow::Result<RecordBatchGenerator> {
::arrow::TableBatchReader table_reader(*table);
::arrow::RecordBatchVector batches;
RETURN_NOT_OK(table_reader.ReadAll(&batches));
return ::arrow::MakeVectorGenerator(std::move(batches));
});
}

std::shared_ptr<FileReaderImpl> arrow_reader_;
Expand Down Expand Up @@ -1104,34 +1099,49 @@ Status FileReaderImpl::ReadRowGroups(const std::vector<int>& row_groups,
END_PARQUET_CATCH_EXCEPTIONS
}

return DecodeRowGroups(row_groups, column_indices, out);
auto fut = DecodeRowGroups(/*self=*/nullptr, row_groups, column_indices,
/*cpu_executor=*/nullptr);
ARROW_ASSIGN_OR_RAISE(*out, fut.MoveResult());
return Status::OK();
}

// Also used by RowGroupGenerator - skip bounds check/pre-buffer to avoid doing that twice
Status FileReaderImpl::DecodeRowGroups(const std::vector<int>& row_groups,
const std::vector<int>& column_indices,
std::shared_ptr<Table>* out) {
Future<std::shared_ptr<Table>> FileReaderImpl::DecodeRowGroups(
std::shared_ptr<FileReaderImpl> self, const std::vector<int>& row_groups,
const std::vector<int>& column_indices, ::arrow::internal::Executor* cpu_executor) {
// `self` is used solely to keep `this` alive in an async context - but we use this
// in a sync context too so use `this` over `self`
std::vector<std::shared_ptr<ColumnReaderImpl>> readers;
std::shared_ptr<::arrow::Schema> result_schema;
RETURN_NOT_OK(GetFieldReaders(column_indices, row_groups, &readers, &result_schema));

::arrow::ChunkedArrayVector columns(readers.size());
RETURN_NOT_OK(::arrow::internal::OptionalParallelFor(
reader_properties_.use_threads(), static_cast<int>(readers.size()), [&](int i) {
return ReadColumn(static_cast<int>(i), row_groups, readers[i].get(), &columns[i]);
}));

int64_t num_rows = 0;
if (!columns.empty()) {
num_rows = columns[0]->length();
} else {
for (int i : row_groups) {
num_rows += parquet_reader()->metadata()->RowGroup(i)->num_rows();
// OptionalParallelForAsync requires an executor
if (!cpu_executor) cpu_executor = ::arrow::internal::GetCpuThreadPool();

auto read_column = [row_groups, self, this](size_t i,
std::shared_ptr<ColumnReaderImpl> reader)
-> ::arrow::Result<std::shared_ptr<::arrow::ChunkedArray>> {
std::shared_ptr<::arrow::ChunkedArray> column;
RETURN_NOT_OK(ReadColumn(static_cast<int>(i), row_groups, reader.get(), &column));
return column;
};
auto make_table = [result_schema, row_groups, self,
this](const ::arrow::ChunkedArrayVector& columns)
-> ::arrow::Result<std::shared_ptr<Table>> {
int64_t num_rows = 0;
if (!columns.empty()) {
num_rows = columns[0]->length();
} else {
for (int i : row_groups) {
num_rows += parquet_reader()->metadata()->RowGroup(i)->num_rows();
}
}
}

*out = Table::Make(std::move(result_schema), std::move(columns), num_rows);
return (*out)->Validate();
auto table = Table::Make(std::move(result_schema), columns, num_rows);
RETURN_NOT_OK(table->Validate());
return table;
};
return ::arrow::internal::OptionalParallelForAsync(reader_properties_.use_threads(),
std::move(readers), read_column,
cpu_executor)
.Then(std::move(make_table));
}

std::shared_ptr<RowGroupReader> FileReaderImpl::RowGroup(int row_group_index) {
Expand Down

0 comments on commit b73bcf0

Please sign in to comment.