Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ARROW-12208: [C++] Add the ability to run async tasks without using the CPU thread pool #9892

Closed
wants to merge 9 commits into from
23 changes: 12 additions & 11 deletions cpp/src/arrow/csv/reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1001,9 +1001,8 @@ Result<std::shared_ptr<TableReader>> MakeTableReader(

Future<std::shared_ptr<StreamingReader>> MakeStreamingReader(
io::IOContext io_context, std::shared_ptr<io::InputStream> input,
const ReadOptions& read_options, const ParseOptions& parse_options,
const ConvertOptions& convert_options) {
auto cpu_executor = internal::GetCpuThreadPool();
internal::Executor* cpu_executor, const ReadOptions& read_options,
const ParseOptions& parse_options, const ConvertOptions& convert_options) {
std::shared_ptr<BaseStreamingReader> reader;
reader = std::make_shared<SerialStreamingReader>(
io_context, cpu_executor, input, read_options, parse_options, convert_options);
Expand Down Expand Up @@ -1036,8 +1035,9 @@ Result<std::shared_ptr<StreamingReader>> StreamingReader::Make(
const ReadOptions& read_options, const ParseOptions& parse_options,
const ConvertOptions& convert_options) {
auto io_context = io::IOContext(pool);
auto reader_fut = MakeStreamingReader(io_context, std::move(input), read_options,
parse_options, convert_options);
auto cpu_executor = internal::GetCpuThreadPool();
auto reader_fut = MakeStreamingReader(io_context, std::move(input), cpu_executor,
read_options, parse_options, convert_options);
auto reader_result = reader_fut.result();
ARROW_ASSIGN_OR_RAISE(auto reader, reader_result);
return reader;
Expand All @@ -1047,19 +1047,20 @@ Result<std::shared_ptr<StreamingReader>> StreamingReader::Make(
io::IOContext io_context, std::shared_ptr<io::InputStream> input,
const ReadOptions& read_options, const ParseOptions& parse_options,
const ConvertOptions& convert_options) {
auto reader_fut = MakeStreamingReader(io_context, std::move(input), read_options,
parse_options, convert_options);
auto cpu_executor = internal::GetCpuThreadPool();
auto reader_fut = MakeStreamingReader(io_context, std::move(input), cpu_executor,
read_options, parse_options, convert_options);
auto reader_result = reader_fut.result();
ARROW_ASSIGN_OR_RAISE(auto reader, reader_result);
return reader;
}

Future<std::shared_ptr<StreamingReader>> StreamingReader::MakeAsync(
io::IOContext io_context, std::shared_ptr<io::InputStream> input,
const ReadOptions& read_options, const ParseOptions& parse_options,
const ConvertOptions& convert_options) {
return MakeStreamingReader(io_context, std::move(input), read_options, parse_options,
convert_options);
internal::Executor* cpu_executor, const ReadOptions& read_options,
const ParseOptions& parse_options, const ConvertOptions& convert_options) {
return MakeStreamingReader(io_context, std::move(input), cpu_executor, read_options,
parse_options, convert_options);
}

} // namespace csv
Expand Down
4 changes: 3 additions & 1 deletion cpp/src/arrow/csv/reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include "arrow/type.h"
#include "arrow/type_fwd.h"
#include "arrow/util/future.h"
#include "arrow/util/thread_pool.h"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

arrow/util/type_fwd.h was probably sufficient.

#include "arrow/util/visibility.h"

namespace arrow {
Expand Down Expand Up @@ -72,7 +73,8 @@ class ARROW_EXPORT StreamingReader : public RecordBatchReader {
/// parsing (see ARROW-11889)
static Future<std::shared_ptr<StreamingReader>> MakeAsync(
io::IOContext io_context, std::shared_ptr<io::InputStream> input,
const ReadOptions&, const ParseOptions&, const ConvertOptions&);
internal::Executor* cpu_executor, const ReadOptions&, const ParseOptions&,
const ConvertOptions&);

static Result<std::shared_ptr<StreamingReader>> Make(
io::IOContext io_context, std::shared_ptr<io::InputStream> input,
Expand Down
75 changes: 42 additions & 33 deletions cpp/src/arrow/dataset/file_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -418,14 +418,46 @@ Status WriteNextBatch(WriteState& state, const std::shared_ptr<ScanTask>& scan_t
return Status::OK();
}

// Feeds the batches of every scan task to WriteNextBatch() and returns a future that
// completes once all of the scan tasks have been consumed.
//
// Tasks that report supports_async() are run through ExecuteAsync(cpu_executor) and
// their batches are visited asynchronously. The remaining tasks are appended to the
// TaskGroup obtained from `scan_options`, where they iterate over Execute() instead.
//
// NOTE(review): the lambdas below capture `state` by reference, so it looks like the
// caller has to keep `state` alive until the returned future completes -- confirm.
Future<> WriteInternal(const ScanOptions& scan_options, WriteState& state,
ScanTaskVector scan_tasks, internal::Executor* cpu_executor) {
// Store a mapping from partitions (represented by their formatted partition expressions)
// to a WriteQueue which flushes batches into that partition's output file. In principle
// any thread could produce a batch for any partition, so each task alternates between
// pushing batches and flushing them to disk.
// One future per asynchronous scan task; the task group's future is added at the end.
std::vector<Future<>> scan_futs;
auto task_group = scan_options.TaskGroup();

for (const auto& scan_task : scan_tasks) {
if (scan_task->supports_async()) {
// Asynchronous path: visit each batch of the generator as it is produced.
ARROW_ASSIGN_OR_RAISE(auto batches_gen, scan_task->ExecuteAsync(cpu_executor));
std::function<Status(std::shared_ptr<RecordBatch> batch)> batch_visitor =
[&, scan_task](std::shared_ptr<RecordBatch> batch) {
return WriteNextBatch(state, scan_task, std::move(batch));
};
scan_futs.push_back(VisitAsyncGenerator(batches_gen, batch_visitor));
} else {
// Synchronous path: the task group runs the scan task and writes its batches one
// by one, stopping at the first error.
task_group->Append([&, scan_task] {
ARROW_ASSIGN_OR_RAISE(auto batches, scan_task->Execute());

for (auto maybe_batch : batches) {
ARROW_ASSIGN_OR_RAISE(auto batch, maybe_batch);
RETURN_NOT_OK(WriteNextBatch(state, scan_task, std::move(batch)));
}

return Status::OK();
});
}
}
// Fold the synchronous tasks into the same list so that a single AllComplete() call
// covers both the asynchronous and the task-group paths.
scan_futs.push_back(task_group->FinishAsync());
return AllComplete(scan_futs);
}

} // namespace

Status FileSystemDataset::Write(const FileSystemDatasetWriteOptions& write_options,
std::shared_ptr<Scanner> scanner) {
RETURN_NOT_OK(ValidateBasenameTemplate(write_options.basename_template));

auto task_group = scanner->options()->TaskGroup();

// Things we'll un-lazy for the sake of simplicity, with the tradeoff they represent:
//
// - Fragment iteration. Keeping this lazy would allow us to start partitioning/writing
Expand All @@ -440,7 +472,6 @@ Status FileSystemDataset::Write(const FileSystemDatasetWriteOptions& write_optio
ARROW_ASSIGN_OR_RAISE(auto fragment_it, scanner->GetFragments());
ARROW_ASSIGN_OR_RAISE(FragmentVector fragments, fragment_it.ToVector());
ScanTaskVector scan_tasks;
std::vector<Future<>> scan_futs;

for (const auto& fragment : fragments) {
auto options = std::make_shared<ScanOptions>(*scanner->options());
Expand All @@ -454,38 +485,16 @@ Status FileSystemDataset::Write(const FileSystemDatasetWriteOptions& write_optio
}
}

// Store a mapping from partitions (represented by their formatted partition expressions)
// to a WriteQueue which flushes batches into that partition's output file. In principle
// any thread could produce a batch for any partition, so each task alternates between
// pushing batches and flushing them to disk.
WriteState state(write_options);
auto res = internal::RunSynchronously<arrow::detail::Empty>(
[&](internal::Executor* cpu_executor) -> Future<> {
return WriteInternal(*scanner->options(), state, std::move(scan_tasks),
cpu_executor);
},
scanner->options()->use_threads);
RETURN_NOT_OK(res);

for (const auto& scan_task : scan_tasks) {
if (scan_task->supports_async()) {
ARROW_ASSIGN_OR_RAISE(auto batches_gen, scan_task->ExecuteAsync());
std::function<Status(std::shared_ptr<RecordBatch> batch)> batch_visitor =
[&, scan_task](std::shared_ptr<RecordBatch> batch) {
return WriteNextBatch(state, scan_task, std::move(batch));
};
scan_futs.push_back(VisitAsyncGenerator(batches_gen, batch_visitor));
} else {
task_group->Append([&, scan_task] {
ARROW_ASSIGN_OR_RAISE(auto batches, scan_task->Execute());

for (auto maybe_batch : batches) {
ARROW_ASSIGN_OR_RAISE(auto batch, maybe_batch);
RETURN_NOT_OK(WriteNextBatch(state, scan_task, std::move(batch)));
}

return Status::OK();
});
}
}
RETURN_NOT_OK(task_group->Finish());
auto scan_futs_all_done = AllComplete(scan_futs);
RETURN_NOT_OK(scan_futs_all_done.status());

task_group = scanner->options()->TaskGroup();
auto task_group = scanner->options()->TaskGroup();
for (const auto& part_queue : state.queues) {
task_group->Append([&] { return part_queue.second->writer()->Finish(); });
}
Expand Down
23 changes: 16 additions & 7 deletions cpp/src/arrow/dataset/file_csv.cc
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ namespace dataset {

using internal::checked_cast;
using internal::checked_pointer_cast;
using internal::Executor;
using internal::SerialExecutor;
using RecordBatchGenerator = AsyncGenerator<std::shared_ptr<RecordBatch>>;

Result<std::unordered_set<std::string>> GetColumnNames(
Expand Down Expand Up @@ -107,13 +109,14 @@ static inline Result<csv::ReadOptions> GetReadOptions(
auto read_options = csv_scan_options->read_options;
// Multithreaded conversion of individual files would lead to excessive thread
// contention when ScanTasks are also executed in multiple threads, so we disable it
// here.
// here. Also, this is a no-op since the streaming CSV reader is currently serial
read_options.use_threads = false;
return read_options;
}

static inline Future<std::shared_ptr<csv::StreamingReader>> OpenReaderAsync(
const FileSource& source, const CsvFileFormat& format,
internal::Executor* cpu_executor,
const std::shared_ptr<ScanOptions>& scan_options = nullptr,
MemoryPool* pool = default_memory_pool()) {
ARROW_ASSIGN_OR_RAISE(auto reader_options, GetReadOptions(format, scan_options));
Expand All @@ -136,7 +139,8 @@ static inline Future<std::shared_ptr<csv::StreamingReader>> OpenReaderAsync(
}

return csv::StreamingReader::MakeAsync(io::default_io_context(), std::move(input),
reader_options, parse_options, convert_options)
cpu_executor, reader_options, parse_options,
convert_options)
.Then(
[](const std::shared_ptr<csv::StreamingReader>& maybe_reader)
-> Result<std::shared_ptr<csv::StreamingReader>> { return maybe_reader; },
Expand All @@ -151,8 +155,12 @@ static inline Result<std::shared_ptr<csv::StreamingReader>> OpenReader(
const FileSource& source, const CsvFileFormat& format,
const std::shared_ptr<ScanOptions>& scan_options = nullptr,
MemoryPool* pool = default_memory_pool()) {
auto open_reader_fut = OpenReaderAsync(source, format, scan_options, pool);
return open_reader_fut.result();
bool use_threads = (scan_options != nullptr && scan_options->use_threads);
return internal::RunSynchronously<std::shared_ptr<csv::StreamingReader>>(
[&](Executor* executor) {
return OpenReaderAsync(source, format, executor, scan_options, pool);
},
use_threads);
}

/// \brief A ScanTask backed by a CSV file.
Expand All @@ -166,14 +174,15 @@ class CsvScanTask : public ScanTask {
source_(fragment->source()) {}

Result<RecordBatchIterator> Execute() override {
ARROW_ASSIGN_OR_RAISE(auto gen, ExecuteAsync());
ARROW_ASSIGN_OR_RAISE(auto gen, ExecuteAsync(internal::GetCpuThreadPool()));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't you look up ScanOptions::use_threads? Or am I missing something?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a bit awkward. This method would not be used by either Scanner::ToTable or FileSystemDataset::Write (they will call ExecuteAsync). The only way this could be called is if the user called Scan() directly.

In that case we cannot really use RunInSerialExecutor here because it is returning an iterator.

I suppose that means there is no truly serial way to call Scan() on CSV. It wouldn't be parallel exactly, it would just be juggling the work between the calling thread, the I/O executor, and the CPU executor.

return MakeGeneratorIterator(std::move(gen));
}

bool supports_async() const override { return true; }

Result<RecordBatchGenerator> ExecuteAsync() override {
auto reader_fut = OpenReaderAsync(source_, *format_, options(), options()->pool);
Result<RecordBatchGenerator> ExecuteAsync(internal::Executor* cpu_executor) override {
auto reader_fut =
OpenReaderAsync(source_, *format_, cpu_executor, options(), options()->pool);
auto generator_fut = reader_fut.Then(
[](const std::shared_ptr<csv::StreamingReader>& reader) -> RecordBatchGenerator {
return [reader]() { return reader->ReadNextAsync(); };
Expand Down
32 changes: 22 additions & 10 deletions cpp/src/arrow/dataset/scanner.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include "arrow/dataset/dataset_internal.h"
#include "arrow/dataset/scanner_internal.h"
#include "arrow/table.h"
#include "arrow/util/async_generator.h"
#include "arrow/util/iterator.h"
#include "arrow/util/logging.h"
#include "arrow/util/task_group.h"
Expand All @@ -47,6 +48,8 @@ std::vector<std::string> ScanOptions::MaterializedFields() const {
return fields;
}

using arrow::internal::Executor;
using arrow::internal::SerialExecutor;
using arrow::internal::TaskGroup;

std::shared_ptr<TaskGroup> ScanOptions::TaskGroup() const {
Expand All @@ -61,7 +64,7 @@ Result<RecordBatchIterator> InMemoryScanTask::Execute() {
return MakeVectorIterator(record_batches_);
}

Result<RecordBatchGenerator> ScanTask::ExecuteAsync() {
Result<RecordBatchGenerator> ScanTask::ExecuteAsync(internal::Executor*) {
return Status::NotImplemented("Async is not implemented for this scan task yet");
}

Expand Down Expand Up @@ -200,6 +203,13 @@ struct TableAssemblyState {
};

// Blocking entry point: runs ToTableInternal() through internal::RunSynchronously and
// returns what that call produces. The scan options' `use_threads` flag is forwarded
// as the second argument.
// NOTE(review): presumably `use_threads` selects between a threaded and a serial
// executor -- confirm against RunSynchronously.
Result<std::shared_ptr<Table>> Scanner::ToTable() {
return internal::RunSynchronously<std::shared_ptr<Table>>(
[this](Executor* executor) { return ToTableInternal(executor); },
scan_options_->use_threads);
}

Future<std::shared_ptr<Table>> Scanner::ToTableInternal(
internal::Executor* cpu_executor) {
ARROW_ASSIGN_OR_RAISE(auto scan_task_it, Scan());
auto task_group = scan_options_->TaskGroup();

Expand All @@ -215,7 +225,7 @@ Result<std::shared_ptr<Table>> Scanner::ToTable() {

auto id = scan_task_id++;
if (scan_task->supports_async()) {
ARROW_ASSIGN_OR_RAISE(auto scan_gen, scan_task->ExecuteAsync());
ARROW_ASSIGN_OR_RAISE(auto scan_gen, scan_task->ExecuteAsync(cpu_executor));
auto scan_fut = CollectAsyncGenerator(std::move(scan_gen))
.Then([state, id](const RecordBatchVector& rbs) {
state->Emplace(rbs, id);
Expand All @@ -230,14 +240,16 @@ Result<std::shared_ptr<Table>> Scanner::ToTable() {
});
}
}
// Wait for all async tasks to complete, or the first error
RETURN_NOT_OK(AllComplete(scan_futures).status());

// Wait for all sync tasks to complete, or the first error.
RETURN_NOT_OK(task_group->Finish());

return Table::FromRecordBatches(scan_options_->projected_schema,
FlattenRecordBatchVector(std::move(state->batches)));
auto scan_options = scan_options_;
scan_futures.push_back(task_group->FinishAsync());
// Wait for all tasks to complete, or the first error
return AllComplete(scan_futures)
.Then(
[scan_options, state](const detail::Empty&) -> Result<std::shared_ptr<Table>> {
return Table::FromRecordBatches(
scan_options->projected_schema,
FlattenRecordBatchVector(std::move(state->batches)));
});
}

} // namespace dataset
Expand Down
10 changes: 7 additions & 3 deletions cpp/src/arrow/dataset/scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

#pragma once

#include <functional>
#include <memory>
#include <string>
#include <utility>
Expand All @@ -31,11 +32,12 @@
#include "arrow/dataset/visibility.h"
#include "arrow/memory_pool.h"
#include "arrow/type_fwd.h"
#include "arrow/util/async_generator.h"
#include "arrow/util/type_fwd.h"

namespace arrow {
using RecordBatchGenerator = AsyncGenerator<std::shared_ptr<RecordBatch>>;

using RecordBatchGenerator = std::function<Future<std::shared_ptr<RecordBatch>>()>;

namespace dataset {

constexpr int64_t kDefaultBatchSize = 1 << 20;
Expand Down Expand Up @@ -103,7 +105,7 @@ class ARROW_DS_EXPORT ScanTask {
/// resulting from the Scan. Execution semantics are encapsulated in the
/// particular ScanTask implementation
virtual Result<RecordBatchIterator> Execute() = 0;
virtual Result<RecordBatchGenerator> ExecuteAsync();
virtual Result<RecordBatchGenerator> ExecuteAsync(internal::Executor* cpu_executor);
virtual bool supports_async() const;

virtual ~ScanTask() = default;
Expand Down Expand Up @@ -175,6 +177,8 @@ class ARROW_DS_EXPORT Scanner {
const std::shared_ptr<ScanOptions>& options() const { return scan_options_; }

protected:
Future<std::shared_ptr<Table>> ToTableInternal(internal::Executor* cpu_executor);

std::shared_ptr<Dataset> dataset_;
// TODO(ARROW-8065) remove fragment_ after a Dataset is constructible from fragments
std::shared_ptr<Fragment> fragment_;
Expand Down
15 changes: 5 additions & 10 deletions cpp/src/arrow/dataset/scanner_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,13 @@
#include "arrow/dataset/dataset_internal.h"
#include "arrow/dataset/partition.h"
#include "arrow/dataset/scanner.h"
#include "arrow/util/async_generator.h"
#include "arrow/util/logging.h"

namespace arrow {

using internal::checked_cast;
using internal::Executor;

namespace dataset {

Expand Down Expand Up @@ -171,22 +173,15 @@ class FilterAndProjectScanTask : public ScanTask {
options_->pool);
}

Result<RecordBatchIterator> Execute() override {
if (task_->supports_async()) {
ARROW_ASSIGN_OR_RAISE(auto gen, ExecuteAsync());
return MakeGeneratorIterator(std::move(gen));
} else {
return ExecuteSync();
}
}
Result<RecordBatchIterator> Execute() override { return ExecuteSync(); }

Result<RecordBatchGenerator> ExecuteAsync() override {
Result<RecordBatchGenerator> ExecuteAsync(Executor* cpu_executor) override {
if (!task_->supports_async()) {
return Status::Invalid(
"ExecuteAsync should not have been called on FilterAndProjectScanTask if the "
"source task did not support async");
}
ARROW_ASSIGN_OR_RAISE(auto gen, task_->ExecuteAsync());
ARROW_ASSIGN_OR_RAISE(auto gen, task_->ExecuteAsync(cpu_executor));

ARROW_ASSIGN_OR_RAISE(Expression simplified_filter,
SimplifyWithGuarantee(options()->filter, partition_));
Expand Down
Loading