Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ARROW-12597: [C++] Enable per-row-group parallelism in async Parquet reader #10482

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 1 addition & 5 deletions cpp/src/arrow/dataset/file_parquet.cc
Original file line number Diff line number Diff line change
Expand Up @@ -362,11 +362,7 @@ Future<std::shared_ptr<parquet::arrow::FileReader>> ParquetFileFormat::GetReader
parquet_scan_options->arrow_reader_properties->cache_options());
arrow_properties.set_io_context(
parquet_scan_options->arrow_reader_properties->io_context());
// TODO: ARROW-12597 will let us enable parallel conversion
if (!options->use_threads) {
arrow_properties.set_use_threads(
parquet_scan_options->enable_parallel_column_conversion);
}
arrow_properties.set_use_threads(options->use_threads);
std::unique_ptr<parquet::arrow::FileReader> arrow_reader;
RETURN_NOT_OK(parquet::arrow::FileReader::Make(options->pool, std::move(reader),
std::move(arrow_properties),
Expand Down
3 changes: 2 additions & 1 deletion cpp/src/arrow/dataset/file_parquet.h
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,8 @@ class ARROW_DS_EXPORT ParquetFragmentScanOptions : public FragmentScanOptions {
/// EXPERIMENTAL: Parallelize conversion across columns. This option is ignored if a
/// scan is already parallelized across input files to avoid thread contention. This
/// option will be removed after support is added for simultaneous parallelization
/// across files and columns.
/// across files and columns. Only affects the threaded reader; the async reader
/// will parallelize across columns if use_threads is enabled.
bool enable_parallel_column_conversion = false;
};

Expand Down
14 changes: 11 additions & 3 deletions cpp/src/arrow/dataset/test_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,7 @@ class DatasetFixtureMixinWithParam : public DatasetFixtureMixin,

struct TestFormatParams {
bool use_async;
bool use_threads;
int num_batches;
int items_per_batch;

Expand All @@ -318,7 +319,8 @@ struct TestFormatParams {
std::string ToString() const {
// GTest requires this to be alphanumeric
std::stringstream ss;
ss << (use_async ? "Async" : "Sync") << num_batches << "b" << items_per_batch << "r";
ss << (use_async ? "Async" : "Sync") << (use_threads ? "Threaded" : "Serial")
<< num_batches << "b" << items_per_batch << "r";
return ss.str();
}

Expand All @@ -328,8 +330,12 @@ struct TestFormatParams {
}

static std::vector<TestFormatParams> Values() {
std::vector<TestFormatParams> values{{/*async=*/false, 16, 1024},
{/*async=*/true, 16, 1024}};
std::vector<TestFormatParams> values;
for (const bool async : std::vector<bool>{true, false}) {
for (const bool use_threads : std::vector<bool>{true, false}) {
values.push_back(TestFormatParams{async, use_threads, 16, 1024});
}
}
return values;
}
};
Expand Down Expand Up @@ -511,6 +517,7 @@ class FileFormatScanMixin : public FileFormatFixtureMixin<FormatHelper>,
auto dataset = std::make_shared<FragmentDataset>(schema, FragmentVector{fragment});
ScannerBuilder builder(dataset, opts_);
ARROW_EXPECT_OK(builder.UseAsync(GetParam().use_async));
ARROW_EXPECT_OK(builder.UseThreads(GetParam().use_threads));
EXPECT_OK_AND_ASSIGN(auto scanner, builder.Finish());
EXPECT_OK_AND_ASSIGN(auto batch_it, scanner->ScanBatches());
return MakeMapIterator([](TaggedRecordBatch tagged) { return tagged.record_batch; },
Expand All @@ -519,6 +526,7 @@ class FileFormatScanMixin : public FileFormatFixtureMixin<FormatHelper>,

// Scan the fragment directly, without using the scanner.
RecordBatchIterator PhysicalBatches(std::shared_ptr<Fragment> fragment) {
opts_->use_threads = GetParam().use_threads;
if (GetParam().use_async) {
EXPECT_OK_AND_ASSIGN(auto batch_gen, fragment->ScanBatchesAsync(opts_));
EXPECT_OK_AND_ASSIGN(auto batch_it, MakeGeneratorIterator(std::move(batch_gen)));
Expand Down
9 changes: 4 additions & 5 deletions cpp/src/arrow/ipc/reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
#include "arrow/util/string.h"
#include "arrow/util/thread_pool.h"
#include "arrow/util/ubsan.h"
#include "arrow/util/vector.h"
#include "arrow/visitor_inline.h"

#include "generated/File_generated.h" // IWYU pragma: export
Expand Down Expand Up @@ -1368,12 +1369,10 @@ Future<IpcFileRecordBatchGenerator::Item> IpcFileRecordBatchGenerator::operator(
auto read_messages = All(std::move(messages));
if (executor_) read_messages = executor_->Transfer(read_messages);
read_dictionaries_ = read_messages.Then(
[=](const std::vector<Result<std::shared_ptr<Message>>> maybe_messages)
[=](const std::vector<Result<std::shared_ptr<Message>>>& maybe_messages)
-> Status {
std::vector<std::shared_ptr<Message>> messages(state->num_dictionaries());
for (size_t i = 0; i < messages.size(); i++) {
ARROW_ASSIGN_OR_RAISE(messages[i], maybe_messages[i]);
}
ARROW_ASSIGN_OR_RAISE(auto messages,
arrow::internal::UnwrapOrRaise(maybe_messages));
return ReadDictionaries(state.get(), std::move(messages));
});
}
Expand Down
37 changes: 37 additions & 0 deletions cpp/src/arrow/util/parallel.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@
#include <vector>

#include "arrow/status.h"
#include "arrow/util/functional.h"
#include "arrow/util/thread_pool.h"
#include "arrow/util/vector.h"

namespace arrow {
namespace internal {
Expand All @@ -44,6 +46,21 @@ Status ParallelFor(int num_tasks, FUNCTION&& func,
return st;
}

// An async parallelizer that takes a `Result<R>(int index, T item)` function
// and submits one task per input item to the given executor. Returns a future
// that resolves to the vector of results (in input order) once every task has
// completed, or to the first error status encountered.
template <class FUNCTION, typename T,
          typename R = typename internal::call_traits::return_type<FUNCTION>::ValueType>
Future<std::vector<R>> ParallelForAsync(
    std::vector<T> inputs, FUNCTION&& func,
    Executor* executor = internal::GetCpuThreadPool()) {
  std::vector<Future<R>> futures(inputs.size());
  for (size_t i = 0; i < inputs.size(); ++i) {
    // Each input is moved into its task; `inputs` elements must not be
    // reused after submission.
    ARROW_ASSIGN_OR_RAISE(futures[i], executor->Submit(func, i, std::move(inputs[i])));
  }
  return All(std::move(futures))
      .Then([](const std::vector<Result<R>>& results) -> Result<std::vector<R>> {
        // Collapse the per-task results, raising the first error if any
        // task failed.
        return UnwrapOrRaise(results);
      });
}

// A parallelizer that takes a `Status(int)` function and calls it with
// arguments between 0 and `num_tasks - 1`, in sequence or in parallel,
// depending on the input boolean.
Expand All @@ -61,5 +78,25 @@ Status OptionalParallelFor(bool use_threads, int num_tasks, FUNCTION&& func,
}
}

// A parallelizer that takes a `Result<R>(int index, T item)` function and
// calls it with each item from the input vector, in sequence or in parallel,
// depending on the input boolean.

template <class FUNCTION, typename T,
          typename R = typename internal::call_traits::return_type<FUNCTION>::ValueType>
Future<std::vector<R>> OptionalParallelForAsync(
    bool use_threads, std::vector<T> inputs, FUNCTION&& func,
    Executor* executor = internal::GetCpuThreadPool()) {
  if (use_threads) {
    return ParallelForAsync(std::move(inputs), std::forward<FUNCTION>(func), executor);
  } else {
    std::vector<R> result(inputs.size());
    for (size_t i = 0; i < inputs.size(); ++i) {
      // Move each input into the callable so the serial path has the same
      // ownership semantics as the threaded path (ParallelForAsync also
      // consumes `inputs`), and to avoid a needless copy.
      ARROW_ASSIGN_OR_RAISE(result[i], func(i, std::move(inputs[i])));
    }
    return result;
  }
}

} // namespace internal
} // namespace arrow
13 changes: 13 additions & 0 deletions cpp/src/arrow/util/vector.h
Original file line number Diff line number Diff line change
Expand Up @@ -133,5 +133,18 @@ Result<std::vector<T>> UnwrapOrRaise(std::vector<Result<T>>&& results) {
return std::move(out);
}

// Return the vector of unwrapped values if every result is OK, otherwise the
// status of the first failed result. This overload copies values out of the
// input; prefer the rvalue overload above when the results can be consumed.
template <typename T>
Result<std::vector<T>> UnwrapOrRaise(const std::vector<Result<T>>& results) {
  std::vector<T> values;
  values.reserve(results.size());
  for (size_t i = 0; i < results.size(); ++i) {
    if (!results[i].ok()) {
      return results[i].status();
    }
    // ValueUnsafe is safe here: ok() was just checked.
    values.push_back(results[i].ValueUnsafe());
  }
  return std::move(values);
}

} // namespace internal
} // namespace arrow
110 changes: 60 additions & 50 deletions cpp/src/parquet/arrow/reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -293,10 +293,12 @@ class FileReaderImpl : public FileReader {
const std::vector<int>& indices,
std::shared_ptr<Table>* table) override;

// Helper method used by ReadRowGroups/Generator - read the given row groups/columns,
// skipping bounds checks and pre-buffering.
Status DecodeRowGroups(const std::vector<int>& row_groups,
const std::vector<int>& indices, std::shared_ptr<Table>* table);
// Helper method used by ReadRowGroups - read the given row groups/columns, skipping
// bounds checks and pre-buffering. Takes a shared_ptr to self to keep the reader
// alive in async contexts.
Future<std::shared_ptr<Table>> DecodeRowGroups(
std::shared_ptr<FileReaderImpl> self, const std::vector<int>& row_groups,
const std::vector<int>& column_indices, ::arrow::internal::Executor* cpu_executor);

Status ReadRowGroups(const std::vector<int>& row_groups,
std::shared_ptr<Table>* table) override {
Expand Down Expand Up @@ -1007,10 +1009,9 @@ class RowGroupGenerator {
return SubmitRead(cpu_executor_, reader, row_group, column_indices);
}
auto ready = reader->parquet_reader()->WhenBuffered({row_group}, column_indices);
// TODO(ARROW-12916): always transfer here
if (cpu_executor_) ready = cpu_executor_->Transfer(ready);
return ready.Then([=]() -> ::arrow::Result<RecordBatchGenerator> {
return ReadOneRowGroup(reader, row_group, column_indices);
if (cpu_executor_) ready = cpu_executor_->TransferAlways(ready);
return ready.Then([=]() -> ::arrow::Future<RecordBatchGenerator> {
return ReadOneRowGroup(cpu_executor_, reader, row_group, column_indices);
});
}

Expand All @@ -1024,31 +1025,25 @@ class RowGroupGenerator {
::arrow::internal::Executor* cpu_executor, std::shared_ptr<FileReaderImpl> self,
const int row_group, const std::vector<int>& column_indices) {
if (!cpu_executor) {
return Future<RecordBatchGenerator>::MakeFinished(
ReadOneRowGroup(self, row_group, column_indices));
return ReadOneRowGroup(cpu_executor, self, row_group, column_indices);
}
// If we have an executor, then force transfer (even if I/O was complete)
return ::arrow::DeferNotOk(
cpu_executor->Submit(ReadOneRowGroup, self, row_group, column_indices));
return ::arrow::DeferNotOk(cpu_executor->Submit(ReadOneRowGroup, cpu_executor, self,
row_group, column_indices));
}

static ::arrow::Result<RecordBatchGenerator> ReadOneRowGroup(
std::shared_ptr<FileReaderImpl> self, const int row_group,
const std::vector<int>& column_indices) {
std::shared_ptr<::arrow::Table> table;
static ::arrow::Future<RecordBatchGenerator> ReadOneRowGroup(
::arrow::internal::Executor* cpu_executor, std::shared_ptr<FileReaderImpl> self,
const int row_group, const std::vector<int>& column_indices) {
// Skips bound checks/pre-buffering, since we've done that already
RETURN_NOT_OK(self->DecodeRowGroups({row_group}, column_indices, &table));
auto table_reader = std::make_shared<::arrow::TableBatchReader>(*table);
::arrow::RecordBatchVector batches;
while (true) {
std::shared_ptr<::arrow::RecordBatch> batch;
RETURN_NOT_OK(table_reader->ReadNext(&batch));
if (!batch) {
break;
}
batches.push_back(batch);
}
return ::arrow::MakeVectorGenerator(std::move(batches));
return self->DecodeRowGroups(self, {row_group}, column_indices, cpu_executor)
.Then([](const std::shared_ptr<Table>& table)
-> ::arrow::Result<RecordBatchGenerator> {
pitrou marked this conversation as resolved.
Show resolved Hide resolved
::arrow::TableBatchReader table_reader(*table);
::arrow::RecordBatchVector batches;
RETURN_NOT_OK(table_reader.ReadAll(&batches));
return ::arrow::MakeVectorGenerator(std::move(batches));
});
}

std::shared_ptr<FileReaderImpl> arrow_reader_;
Expand Down Expand Up @@ -1104,34 +1099,49 @@ Status FileReaderImpl::ReadRowGroups(const std::vector<int>& row_groups,
END_PARQUET_CATCH_EXCEPTIONS
}

return DecodeRowGroups(row_groups, column_indices, out);
auto fut = DecodeRowGroups(/*self=*/nullptr, row_groups, column_indices,
/*cpu_executor=*/nullptr);
ARROW_ASSIGN_OR_RAISE(*out, fut.MoveResult());
return Status::OK();
}

// Also used by RowGroupGenerator - skip bounds check/pre-buffer to avoid doing that twice
Status FileReaderImpl::DecodeRowGroups(const std::vector<int>& row_groups,
const std::vector<int>& column_indices,
std::shared_ptr<Table>* out) {
Future<std::shared_ptr<Table>> FileReaderImpl::DecodeRowGroups(
pitrou marked this conversation as resolved.
Show resolved Hide resolved
std::shared_ptr<FileReaderImpl> self, const std::vector<int>& row_groups,
const std::vector<int>& column_indices, ::arrow::internal::Executor* cpu_executor) {
// `self` is used solely to keep `this` alive in an async context - but we use this
// in a sync context too so use `this` over `self`
std::vector<std::shared_ptr<ColumnReaderImpl>> readers;
std::shared_ptr<::arrow::Schema> result_schema;
RETURN_NOT_OK(GetFieldReaders(column_indices, row_groups, &readers, &result_schema));

::arrow::ChunkedArrayVector columns(readers.size());
RETURN_NOT_OK(::arrow::internal::OptionalParallelFor(
reader_properties_.use_threads(), static_cast<int>(readers.size()), [&](int i) {
return ReadColumn(static_cast<int>(i), row_groups, readers[i].get(), &columns[i]);
}));

int64_t num_rows = 0;
if (!columns.empty()) {
num_rows = columns[0]->length();
} else {
for (int i : row_groups) {
num_rows += parquet_reader()->metadata()->RowGroup(i)->num_rows();
// OptionalParallelForAsync requires an executor
if (!cpu_executor) cpu_executor = ::arrow::internal::GetCpuThreadPool();

auto read_column = [row_groups, self, this](size_t i,
std::shared_ptr<ColumnReaderImpl> reader)
-> ::arrow::Result<std::shared_ptr<::arrow::ChunkedArray>> {
std::shared_ptr<::arrow::ChunkedArray> column;
RETURN_NOT_OK(ReadColumn(static_cast<int>(i), row_groups, reader.get(), &column));
return column;
};
auto make_table = [result_schema, row_groups, self,
this](const ::arrow::ChunkedArrayVector& columns)
-> ::arrow::Result<std::shared_ptr<Table>> {
int64_t num_rows = 0;
if (!columns.empty()) {
num_rows = columns[0]->length();
} else {
for (int i : row_groups) {
num_rows += parquet_reader()->metadata()->RowGroup(i)->num_rows();
}
}
}

*out = Table::Make(std::move(result_schema), std::move(columns), num_rows);
return (*out)->Validate();
auto table = Table::Make(std::move(result_schema), columns, num_rows);
RETURN_NOT_OK(table->Validate());
return table;
};
return ::arrow::internal::OptionalParallelForAsync(reader_properties_.use_threads(),
std::move(readers), read_column,
cpu_executor)
.Then(std::move(make_table));
}

std::shared_ptr<RowGroupReader> FileReaderImpl::RowGroup(int row_group_index) {
Expand Down