Skip to content

Commit

Permalink
ARROW-12208: [C++] Add the ability to run async tasks without using t…
Browse files Browse the repository at this point in the history
…he CPU thread pool

Added a serial executor which can run async code serially without needing to create threads.  In addition, modifies the Scanner::ToTable and FileSystemDataset::Write to execute serially.  Scanner::Scan is still non-serial but I think this is OK.  The I/O thread pool is still being used however.  To truly remove all instances of std::thread we'd need to change the I/O context to use the serial executor but let's see if this makes R happy enough first.

Closes apache#9892 from westonpace/feature/arrow-12208

Lead-authored-by: Weston Pace <[email protected]>
Co-authored-by: Antoine Pitrou <[email protected]>
Signed-off-by: Neal Richardson <[email protected]>
  • Loading branch information
2 people authored and pull[bot] committed Dec 14, 2021
1 parent 889d671 commit 7000172
Show file tree
Hide file tree
Showing 11 changed files with 374 additions and 81 deletions.
23 changes: 12 additions & 11 deletions cpp/src/arrow/csv/reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1001,9 +1001,8 @@ Result<std::shared_ptr<TableReader>> MakeTableReader(

Future<std::shared_ptr<StreamingReader>> MakeStreamingReader(
io::IOContext io_context, std::shared_ptr<io::InputStream> input,
const ReadOptions& read_options, const ParseOptions& parse_options,
const ConvertOptions& convert_options) {
auto cpu_executor = internal::GetCpuThreadPool();
internal::Executor* cpu_executor, const ReadOptions& read_options,
const ParseOptions& parse_options, const ConvertOptions& convert_options) {
std::shared_ptr<BaseStreamingReader> reader;
reader = std::make_shared<SerialStreamingReader>(
io_context, cpu_executor, input, read_options, parse_options, convert_options);
Expand Down Expand Up @@ -1036,8 +1035,9 @@ Result<std::shared_ptr<StreamingReader>> StreamingReader::Make(
const ReadOptions& read_options, const ParseOptions& parse_options,
const ConvertOptions& convert_options) {
auto io_context = io::IOContext(pool);
auto reader_fut = MakeStreamingReader(io_context, std::move(input), read_options,
parse_options, convert_options);
auto cpu_executor = internal::GetCpuThreadPool();
auto reader_fut = MakeStreamingReader(io_context, std::move(input), cpu_executor,
read_options, parse_options, convert_options);
auto reader_result = reader_fut.result();
ARROW_ASSIGN_OR_RAISE(auto reader, reader_result);
return reader;
Expand All @@ -1047,19 +1047,20 @@ Result<std::shared_ptr<StreamingReader>> StreamingReader::Make(
io::IOContext io_context, std::shared_ptr<io::InputStream> input,
const ReadOptions& read_options, const ParseOptions& parse_options,
const ConvertOptions& convert_options) {
auto reader_fut = MakeStreamingReader(io_context, std::move(input), read_options,
parse_options, convert_options);
auto cpu_executor = internal::GetCpuThreadPool();
auto reader_fut = MakeStreamingReader(io_context, std::move(input), cpu_executor,
read_options, parse_options, convert_options);
auto reader_result = reader_fut.result();
ARROW_ASSIGN_OR_RAISE(auto reader, reader_result);
return reader;
}

Future<std::shared_ptr<StreamingReader>> StreamingReader::MakeAsync(
io::IOContext io_context, std::shared_ptr<io::InputStream> input,
const ReadOptions& read_options, const ParseOptions& parse_options,
const ConvertOptions& convert_options) {
return MakeStreamingReader(io_context, std::move(input), read_options, parse_options,
convert_options);
internal::Executor* cpu_executor, const ReadOptions& read_options,
const ParseOptions& parse_options, const ConvertOptions& convert_options) {
return MakeStreamingReader(io_context, std::move(input), cpu_executor, read_options,
parse_options, convert_options);
}

} // namespace csv
Expand Down
4 changes: 3 additions & 1 deletion cpp/src/arrow/csv/reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include "arrow/type.h"
#include "arrow/type_fwd.h"
#include "arrow/util/future.h"
#include "arrow/util/thread_pool.h"
#include "arrow/util/visibility.h"

namespace arrow {
Expand Down Expand Up @@ -72,7 +73,8 @@ class ARROW_EXPORT StreamingReader : public RecordBatchReader {
/// parsing (see ARROW-11889)
static Future<std::shared_ptr<StreamingReader>> MakeAsync(
io::IOContext io_context, std::shared_ptr<io::InputStream> input,
const ReadOptions&, const ParseOptions&, const ConvertOptions&);
internal::Executor* cpu_executor, const ReadOptions&, const ParseOptions&,
const ConvertOptions&);

static Result<std::shared_ptr<StreamingReader>> Make(
io::IOContext io_context, std::shared_ptr<io::InputStream> input,
Expand Down
75 changes: 42 additions & 33 deletions cpp/src/arrow/dataset/file_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -418,14 +418,46 @@ Status WriteNextBatch(WriteState& state, const std::shared_ptr<ScanTask>& scan_t
return Status::OK();
}

Future<> WriteInternal(const ScanOptions& scan_options, WriteState& state,
ScanTaskVector scan_tasks, internal::Executor* cpu_executor) {
// Store a mapping from partitions (represened by their formatted partition expressions)
// to a WriteQueue which flushes batches into that partition's output file. In principle
// any thread could produce a batch for any partition, so each task alternates between
// pushing batches and flushing them to disk.
std::vector<Future<>> scan_futs;
auto task_group = scan_options.TaskGroup();

for (const auto& scan_task : scan_tasks) {
if (scan_task->supports_async()) {
ARROW_ASSIGN_OR_RAISE(auto batches_gen, scan_task->ExecuteAsync(cpu_executor));
std::function<Status(std::shared_ptr<RecordBatch> batch)> batch_visitor =
[&, scan_task](std::shared_ptr<RecordBatch> batch) {
return WriteNextBatch(state, scan_task, std::move(batch));
};
scan_futs.push_back(VisitAsyncGenerator(batches_gen, batch_visitor));
} else {
task_group->Append([&, scan_task] {
ARROW_ASSIGN_OR_RAISE(auto batches, scan_task->Execute());

for (auto maybe_batch : batches) {
ARROW_ASSIGN_OR_RAISE(auto batch, maybe_batch);
RETURN_NOT_OK(WriteNextBatch(state, scan_task, std::move(batch)));
}

return Status::OK();
});
}
}
scan_futs.push_back(task_group->FinishAsync());
return AllComplete(scan_futs);
}

} // namespace

Status FileSystemDataset::Write(const FileSystemDatasetWriteOptions& write_options,
std::shared_ptr<Scanner> scanner) {
RETURN_NOT_OK(ValidateBasenameTemplate(write_options.basename_template));

auto task_group = scanner->options()->TaskGroup();

// Things we'll un-lazy for the sake of simplicity, with the tradeoff they represent:
//
// - Fragment iteration. Keeping this lazy would allow us to start partitioning/writing
Expand All @@ -440,7 +472,6 @@ Status FileSystemDataset::Write(const FileSystemDatasetWriteOptions& write_optio
ARROW_ASSIGN_OR_RAISE(auto fragment_it, scanner->GetFragments());
ARROW_ASSIGN_OR_RAISE(FragmentVector fragments, fragment_it.ToVector());
ScanTaskVector scan_tasks;
std::vector<Future<>> scan_futs;

for (const auto& fragment : fragments) {
auto options = std::make_shared<ScanOptions>(*scanner->options());
Expand All @@ -454,38 +485,16 @@ Status FileSystemDataset::Write(const FileSystemDatasetWriteOptions& write_optio
}
}

// Store a mapping from partitions (represened by their formatted partition expressions)
// to a WriteQueue which flushes batches into that partition's output file. In principle
// any thread could produce a batch for any partition, so each task alternates between
// pushing batches and flushing them to disk.
WriteState state(write_options);
auto res = internal::RunSynchronously<arrow::detail::Empty>(
[&](internal::Executor* cpu_executor) -> Future<> {
return WriteInternal(*scanner->options(), state, std::move(scan_tasks),
cpu_executor);
},
scanner->options()->use_threads);
RETURN_NOT_OK(res);

for (const auto& scan_task : scan_tasks) {
if (scan_task->supports_async()) {
ARROW_ASSIGN_OR_RAISE(auto batches_gen, scan_task->ExecuteAsync());
std::function<Status(std::shared_ptr<RecordBatch> batch)> batch_visitor =
[&, scan_task](std::shared_ptr<RecordBatch> batch) {
return WriteNextBatch(state, scan_task, std::move(batch));
};
scan_futs.push_back(VisitAsyncGenerator(batches_gen, batch_visitor));
} else {
task_group->Append([&, scan_task] {
ARROW_ASSIGN_OR_RAISE(auto batches, scan_task->Execute());

for (auto maybe_batch : batches) {
ARROW_ASSIGN_OR_RAISE(auto batch, maybe_batch);
RETURN_NOT_OK(WriteNextBatch(state, scan_task, std::move(batch)));
}

return Status::OK();
});
}
}
RETURN_NOT_OK(task_group->Finish());
auto scan_futs_all_done = AllComplete(scan_futs);
RETURN_NOT_OK(scan_futs_all_done.status());

task_group = scanner->options()->TaskGroup();
auto task_group = scanner->options()->TaskGroup();
for (const auto& part_queue : state.queues) {
task_group->Append([&] { return part_queue.second->writer()->Finish(); });
}
Expand Down
23 changes: 16 additions & 7 deletions cpp/src/arrow/dataset/file_csv.cc
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ namespace dataset {

using internal::checked_cast;
using internal::checked_pointer_cast;
using internal::Executor;
using internal::SerialExecutor;
using RecordBatchGenerator = AsyncGenerator<std::shared_ptr<RecordBatch>>;

Result<std::unordered_set<std::string>> GetColumnNames(
Expand Down Expand Up @@ -107,13 +109,14 @@ static inline Result<csv::ReadOptions> GetReadOptions(
auto read_options = csv_scan_options->read_options;
// Multithreaded conversion of individual files would lead to excessive thread
// contention when ScanTasks are also executed in multiple threads, so we disable it
// here.
// here. Also, this is a no-op since the streaming CSV reader is currently serial
read_options.use_threads = false;
return read_options;
}

static inline Future<std::shared_ptr<csv::StreamingReader>> OpenReaderAsync(
const FileSource& source, const CsvFileFormat& format,
internal::Executor* cpu_executor,
const std::shared_ptr<ScanOptions>& scan_options = nullptr,
MemoryPool* pool = default_memory_pool()) {
ARROW_ASSIGN_OR_RAISE(auto reader_options, GetReadOptions(format, scan_options));
Expand All @@ -136,7 +139,8 @@ static inline Future<std::shared_ptr<csv::StreamingReader>> OpenReaderAsync(
}

return csv::StreamingReader::MakeAsync(io::default_io_context(), std::move(input),
reader_options, parse_options, convert_options)
cpu_executor, reader_options, parse_options,
convert_options)
.Then(
[](const std::shared_ptr<csv::StreamingReader>& maybe_reader)
-> Result<std::shared_ptr<csv::StreamingReader>> { return maybe_reader; },
Expand All @@ -151,8 +155,12 @@ static inline Result<std::shared_ptr<csv::StreamingReader>> OpenReader(
const FileSource& source, const CsvFileFormat& format,
const std::shared_ptr<ScanOptions>& scan_options = nullptr,
MemoryPool* pool = default_memory_pool()) {
auto open_reader_fut = OpenReaderAsync(source, format, scan_options, pool);
return open_reader_fut.result();
bool use_threads = (scan_options != nullptr && scan_options->use_threads);
return internal::RunSynchronously<std::shared_ptr<csv::StreamingReader>>(
[&](Executor* executor) {
return OpenReaderAsync(source, format, executor, scan_options, pool);
},
use_threads);
}

/// \brief A ScanTask backed by an Csv file.
Expand All @@ -166,14 +174,15 @@ class CsvScanTask : public ScanTask {
source_(fragment->source()) {}

Result<RecordBatchIterator> Execute() override {
ARROW_ASSIGN_OR_RAISE(auto gen, ExecuteAsync());
ARROW_ASSIGN_OR_RAISE(auto gen, ExecuteAsync(internal::GetCpuThreadPool()));
return MakeGeneratorIterator(std::move(gen));
}

bool supports_async() const override { return true; }

Result<RecordBatchGenerator> ExecuteAsync() override {
auto reader_fut = OpenReaderAsync(source_, *format_, options(), options()->pool);
Result<RecordBatchGenerator> ExecuteAsync(internal::Executor* cpu_executor) override {
auto reader_fut =
OpenReaderAsync(source_, *format_, cpu_executor, options(), options()->pool);
auto generator_fut = reader_fut.Then(
[](const std::shared_ptr<csv::StreamingReader>& reader) -> RecordBatchGenerator {
return [reader]() { return reader->ReadNextAsync(); };
Expand Down
32 changes: 22 additions & 10 deletions cpp/src/arrow/dataset/scanner.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include "arrow/dataset/dataset_internal.h"
#include "arrow/dataset/scanner_internal.h"
#include "arrow/table.h"
#include "arrow/util/async_generator.h"
#include "arrow/util/iterator.h"
#include "arrow/util/logging.h"
#include "arrow/util/task_group.h"
Expand All @@ -47,6 +48,8 @@ std::vector<std::string> ScanOptions::MaterializedFields() const {
return fields;
}

using arrow::internal::Executor;
using arrow::internal::SerialExecutor;
using arrow::internal::TaskGroup;

std::shared_ptr<TaskGroup> ScanOptions::TaskGroup() const {
Expand All @@ -61,7 +64,7 @@ Result<RecordBatchIterator> InMemoryScanTask::Execute() {
return MakeVectorIterator(record_batches_);
}

Result<RecordBatchGenerator> ScanTask::ExecuteAsync() {
Result<RecordBatchGenerator> ScanTask::ExecuteAsync(internal::Executor*) {
return Status::NotImplemented("Async is not implemented for this scan task yet");
}

Expand Down Expand Up @@ -200,6 +203,13 @@ struct TableAssemblyState {
};

Result<std::shared_ptr<Table>> Scanner::ToTable() {
return internal::RunSynchronously<std::shared_ptr<Table>>(
[this](Executor* executor) { return ToTableInternal(executor); },
scan_options_->use_threads);
}

Future<std::shared_ptr<Table>> Scanner::ToTableInternal(
internal::Executor* cpu_executor) {
ARROW_ASSIGN_OR_RAISE(auto scan_task_it, Scan());
auto task_group = scan_options_->TaskGroup();

Expand All @@ -215,7 +225,7 @@ Result<std::shared_ptr<Table>> Scanner::ToTable() {

auto id = scan_task_id++;
if (scan_task->supports_async()) {
ARROW_ASSIGN_OR_RAISE(auto scan_gen, scan_task->ExecuteAsync());
ARROW_ASSIGN_OR_RAISE(auto scan_gen, scan_task->ExecuteAsync(cpu_executor));
auto scan_fut = CollectAsyncGenerator(std::move(scan_gen))
.Then([state, id](const RecordBatchVector& rbs) {
state->Emplace(rbs, id);
Expand All @@ -230,14 +240,16 @@ Result<std::shared_ptr<Table>> Scanner::ToTable() {
});
}
}
// Wait for all async tasks to complete, or the first error
RETURN_NOT_OK(AllComplete(scan_futures).status());

// Wait for all sync tasks to complete, or the first error.
RETURN_NOT_OK(task_group->Finish());

return Table::FromRecordBatches(scan_options_->projected_schema,
FlattenRecordBatchVector(std::move(state->batches)));
auto scan_options = scan_options_;
scan_futures.push_back(task_group->FinishAsync());
// Wait for all tasks to complete, or the first error
return AllComplete(scan_futures)
.Then(
[scan_options, state](const detail::Empty&) -> Result<std::shared_ptr<Table>> {
return Table::FromRecordBatches(
scan_options->projected_schema,
FlattenRecordBatchVector(std::move(state->batches)));
});
}

} // namespace dataset
Expand Down
10 changes: 7 additions & 3 deletions cpp/src/arrow/dataset/scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

#pragma once

#include <functional>
#include <memory>
#include <string>
#include <utility>
Expand All @@ -31,11 +32,12 @@
#include "arrow/dataset/visibility.h"
#include "arrow/memory_pool.h"
#include "arrow/type_fwd.h"
#include "arrow/util/async_generator.h"
#include "arrow/util/type_fwd.h"

namespace arrow {
using RecordBatchGenerator = AsyncGenerator<std::shared_ptr<RecordBatch>>;

using RecordBatchGenerator = std::function<Future<std::shared_ptr<RecordBatch>>()>;

namespace dataset {

constexpr int64_t kDefaultBatchSize = 1 << 20;
Expand Down Expand Up @@ -103,7 +105,7 @@ class ARROW_DS_EXPORT ScanTask {
/// resulting from the Scan. Execution semantics are encapsulated in the
/// particular ScanTask implementation
virtual Result<RecordBatchIterator> Execute() = 0;
virtual Result<RecordBatchGenerator> ExecuteAsync();
virtual Result<RecordBatchGenerator> ExecuteAsync(internal::Executor* cpu_executor);
virtual bool supports_async() const;

virtual ~ScanTask() = default;
Expand Down Expand Up @@ -175,6 +177,8 @@ class ARROW_DS_EXPORT Scanner {
const std::shared_ptr<ScanOptions>& options() const { return scan_options_; }

protected:
Future<std::shared_ptr<Table>> ToTableInternal(internal::Executor* cpu_executor);

std::shared_ptr<Dataset> dataset_;
// TODO(ARROW-8065) remove fragment_ after a Dataset is constuctible from fragments
std::shared_ptr<Fragment> fragment_;
Expand Down
15 changes: 5 additions & 10 deletions cpp/src/arrow/dataset/scanner_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,13 @@
#include "arrow/dataset/dataset_internal.h"
#include "arrow/dataset/partition.h"
#include "arrow/dataset/scanner.h"
#include "arrow/util/async_generator.h"
#include "arrow/util/logging.h"

namespace arrow {

using internal::checked_cast;
using internal::Executor;

namespace dataset {

Expand Down Expand Up @@ -171,22 +173,15 @@ class FilterAndProjectScanTask : public ScanTask {
options_->pool);
}

Result<RecordBatchIterator> Execute() override {
if (task_->supports_async()) {
ARROW_ASSIGN_OR_RAISE(auto gen, ExecuteAsync());
return MakeGeneratorIterator(std::move(gen));
} else {
return ExecuteSync();
}
}
Result<RecordBatchIterator> Execute() override { return ExecuteSync(); }

Result<RecordBatchGenerator> ExecuteAsync() override {
Result<RecordBatchGenerator> ExecuteAsync(Executor* cpu_executor) override {
if (!task_->supports_async()) {
return Status::Invalid(
"ExecuteAsync should not have been called on FilterAndProjectScanTask if the "
"source task did not support async");
}
ARROW_ASSIGN_OR_RAISE(auto gen, task_->ExecuteAsync());
ARROW_ASSIGN_OR_RAISE(auto gen, task_->ExecuteAsync(cpu_executor));

ARROW_ASSIGN_OR_RAISE(Expression simplified_filter,
SimplifyWithGuarantee(options()->filter, partition_));
Expand Down
Loading

0 comments on commit 7000172

Please sign in to comment.