Skip to content

Commit

Permalink
apacheGH-33212: [C++][Python] Add use_threads to pyarrow.substrait.ru…
Browse files Browse the repository at this point in the history
…n_query (apache#33623)

Also adds memory_pool and & function_registry to the various DeclarationToXyz methods.  Converts `ExecuteSerializedPlan` to `DeclarationToReader` instead of the bespoke thing it was doing before.
* Closes: apache#33212

Lead-authored-by: Weston Pace <[email protected]>
Co-authored-by: Vibhatha Lakmal Abeykoon <[email protected]>
Signed-off-by: Weston Pace <[email protected]>
  • Loading branch information
westonpace and vibhatha authored Jan 13, 2023
1 parent 97998d8 commit e1027dc
Show file tree
Hide file tree
Showing 9 changed files with 185 additions and 172 deletions.
87 changes: 52 additions & 35 deletions cpp/src/arrow/compute/exec/exec_plan.cc
Original file line number Diff line number Diff line change
Expand Up @@ -563,23 +563,28 @@ Future<std::shared_ptr<Table>> DeclarationToTableAsync(Declaration declaration,
return exec_plan->finished().Then([exec_plan, output_table] { return *output_table; });
}

Future<std::shared_ptr<Table>> DeclarationToTableAsync(Declaration declaration,
bool use_threads) {
Future<std::shared_ptr<Table>> DeclarationToTableAsync(
Declaration declaration, bool use_threads, MemoryPool* memory_pool,
FunctionRegistry* function_registry) {
if (use_threads) {
return DeclarationToTableAsync(std::move(declaration), *threaded_exec_context());
ExecContext ctx(memory_pool, ::arrow::internal::GetCpuThreadPool(),
function_registry);
return DeclarationToTableAsync(std::move(declaration), ctx);
} else {
ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ThreadPool> tpool, ThreadPool::Make(1));
ExecContext ctx(default_memory_pool(), tpool.get());
ExecContext ctx(memory_pool, tpool.get(), function_registry);
return DeclarationToTableAsync(std::move(declaration), ctx)
.Then([tpool](const std::shared_ptr<Table>& table) { return table; });
}
}

Result<std::shared_ptr<Table>> DeclarationToTable(Declaration declaration,
bool use_threads) {
bool use_threads,
MemoryPool* memory_pool,
FunctionRegistry* function_registry) {
return ::arrow::internal::RunSynchronously<Future<std::shared_ptr<Table>>>(
[declaration = std::move(declaration)](::arrow::internal::Executor* executor) {
ExecContext ctx(default_memory_pool(), executor);
[=, declaration = std::move(declaration)](::arrow::internal::Executor* executor) {
ExecContext ctx(memory_pool, executor, function_registry);
return DeclarationToTableAsync(std::move(declaration), ctx);
},
use_threads);
Expand All @@ -594,12 +599,15 @@ Future<std::vector<std::shared_ptr<RecordBatch>>> DeclarationToBatchesAsync(
}

Future<std::vector<std::shared_ptr<RecordBatch>>> DeclarationToBatchesAsync(
Declaration declaration, bool use_threads) {
Declaration declaration, bool use_threads, MemoryPool* memory_pool,
FunctionRegistry* function_registry) {
if (use_threads) {
return DeclarationToBatchesAsync(std::move(declaration), *threaded_exec_context());
ExecContext ctx(memory_pool, ::arrow::internal::GetCpuThreadPool(),
function_registry);
return DeclarationToBatchesAsync(std::move(declaration), ctx);
} else {
ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ThreadPool> tpool, ThreadPool::Make(1));
ExecContext ctx(default_memory_pool(), tpool.get());
ExecContext ctx(memory_pool, tpool.get(), function_registry);
return DeclarationToBatchesAsync(std::move(declaration), ctx)
.Then([tpool](const std::vector<std::shared_ptr<RecordBatch>>& batches) {
return batches;
Expand All @@ -608,11 +616,12 @@ Future<std::vector<std::shared_ptr<RecordBatch>>> DeclarationToBatchesAsync(
}

Result<std::vector<std::shared_ptr<RecordBatch>>> DeclarationToBatches(
Declaration declaration, bool use_threads) {
Declaration declaration, bool use_threads, MemoryPool* memory_pool,
FunctionRegistry* function_registry) {
return ::arrow::internal::RunSynchronously<
Future<std::vector<std::shared_ptr<RecordBatch>>>>(
[declaration = std::move(declaration)](::arrow::internal::Executor* executor) {
ExecContext ctx(default_memory_pool(), executor);
[=, declaration = std::move(declaration)](::arrow::internal::Executor* executor) {
ExecContext ctx(memory_pool, executor, function_registry);
return DeclarationToBatchesAsync(std::move(declaration), ctx);
},
use_threads);
Expand Down Expand Up @@ -641,24 +650,27 @@ Future<BatchesWithCommonSchema> DeclarationToExecBatchesAsync(Declaration declar
});
}

Future<BatchesWithCommonSchema> DeclarationToExecBatchesAsync(Declaration declaration,
bool use_threads) {
Future<BatchesWithCommonSchema> DeclarationToExecBatchesAsync(
Declaration declaration, bool use_threads, MemoryPool* memory_pool,
FunctionRegistry* function_registry) {
if (use_threads) {
return DeclarationToExecBatchesAsync(std::move(declaration),
*threaded_exec_context());
ExecContext ctx(memory_pool, ::arrow::internal::GetCpuThreadPool(),
function_registry);
return DeclarationToExecBatchesAsync(std::move(declaration), ctx);
} else {
ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ThreadPool> tpool, ThreadPool::Make(1));
ExecContext ctx(default_memory_pool(), tpool.get());
ExecContext ctx(memory_pool, tpool.get(), function_registry);
return DeclarationToExecBatchesAsync(std::move(declaration), ctx)
.Then([tpool](const BatchesWithCommonSchema& batches) { return batches; });
}
}

Result<BatchesWithCommonSchema> DeclarationToExecBatches(Declaration declaration,
bool use_threads) {
Result<BatchesWithCommonSchema> DeclarationToExecBatches(
Declaration declaration, bool use_threads, MemoryPool* memory_pool,
FunctionRegistry* function_registry) {
return ::arrow::internal::RunSynchronously<Future<BatchesWithCommonSchema>>(
[declaration = std::move(declaration)](::arrow::internal::Executor* executor) {
ExecContext ctx(default_memory_pool(), executor);
[=, declaration = std::move(declaration)](::arrow::internal::Executor* executor) {
ExecContext ctx(memory_pool, executor, function_registry);
return DeclarationToExecBatchesAsync(std::move(declaration), ctx);
},
use_threads);
Expand All @@ -680,20 +692,25 @@ Future<> DeclarationToStatusAsync(Declaration declaration, ExecContext exec_cont
return exec_plan->finished().Then([exec_plan]() {});
}

Future<> DeclarationToStatusAsync(Declaration declaration, bool use_threads) {
Future<> DeclarationToStatusAsync(Declaration declaration, bool use_threads,
MemoryPool* memory_pool,
FunctionRegistry* function_registry) {
if (use_threads) {
return DeclarationToStatusAsync(std::move(declaration), *threaded_exec_context());
ExecContext ctx(memory_pool, ::arrow::internal::GetCpuThreadPool(),
function_registry);
return DeclarationToStatusAsync(std::move(declaration), ctx);
} else {
ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ThreadPool> tpool, ThreadPool::Make(1));
ExecContext ctx(default_memory_pool(), tpool.get());
ExecContext ctx(memory_pool, tpool.get(), function_registry);
return DeclarationToStatusAsync(std::move(declaration), ctx).Then([tpool]() {});
}
}

Status DeclarationToStatus(Declaration declaration, bool use_threads) {
Status DeclarationToStatus(Declaration declaration, bool use_threads,
MemoryPool* memory_pool, FunctionRegistry* function_registry) {
return ::arrow::internal::RunSynchronously<Future<>>(
[declaration = std::move(declaration)](::arrow::internal::Executor* executor) {
ExecContext ctx(default_memory_pool(), executor);
[=, declaration = std::move(declaration)](::arrow::internal::Executor* executor) {
ExecContext ctx(memory_pool, executor, function_registry);
return DeclarationToStatusAsync(std::move(declaration), ctx);
},
use_threads);
Expand Down Expand Up @@ -738,11 +755,9 @@ struct BatchConverter {
};

Result<AsyncGenerator<std::shared_ptr<RecordBatch>>> DeclarationToRecordBatchGenerator(
Declaration declaration, ::arrow::internal::Executor* executor,
std::shared_ptr<Schema>* out_schema) {
Declaration declaration, ExecContext exec_ctx, std::shared_ptr<Schema>* out_schema) {
auto converter = std::make_shared<BatchConverter>();
ExecContext exec_context(default_memory_pool(), executor);
ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ExecPlan> plan, ExecPlan::Make(exec_context));
ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ExecPlan> plan, ExecPlan::Make(exec_ctx));
Declaration with_sink = Declaration::Sequence(
{declaration,
{"sink", SinkNodeOptions(&converter->exec_batch_gen, &converter->schema)}});
Expand All @@ -754,14 +769,16 @@ Result<AsyncGenerator<std::shared_ptr<RecordBatch>>> DeclarationToRecordBatchGen
}
} // namespace

Result<std::unique_ptr<RecordBatchReader>> DeclarationToReader(Declaration declaration,
bool use_threads) {
Result<std::unique_ptr<RecordBatchReader>> DeclarationToReader(
Declaration declaration, bool use_threads, MemoryPool* memory_pool,
FunctionRegistry* function_registry) {
std::shared_ptr<Schema> schema;
auto batch_iterator = std::make_unique<Iterator<std::shared_ptr<RecordBatch>>>(
::arrow::internal::IterateSynchronously<std::shared_ptr<RecordBatch>>(
[&](::arrow::internal::Executor* executor)
-> Result<AsyncGenerator<std::shared_ptr<RecordBatch>>> {
return DeclarationToRecordBatchGenerator(declaration, executor, &schema);
ExecContext exec_ctx(memory_pool, executor, function_registry);
return DeclarationToRecordBatchGenerator(declaration, exec_ctx, &schema);
},
use_threads));

Expand Down
68 changes: 49 additions & 19 deletions cpp/src/arrow/compute/exec/exec_plan.h
Original file line number Diff line number Diff line change
Expand Up @@ -426,24 +426,36 @@ struct ARROW_EXPORT Declaration {

/// \brief Utility method to run a declaration and collect the results into a table
///
/// \param use_threads If `use_threads` is false then all CPU work will be done on the
/// calling thread. I/O tasks will still happen on the I/O executor
/// and may be multi-threaded (but should not use significant CPU
/// resources).
/// \param memory_pool The memory pool to use for allocations made while running the plan.
/// \param function_registry The function registry to use for function execution. If null
/// then the default function registry will be used.
///
/// This method will add a sink node to the declaration to collect results into a
/// table. It will then create an ExecPlan from the declaration, start the exec plan,
/// block until the plan has finished, and return the created table.
///
/// If `use_threads` is false then all CPU work will be done on the calling thread. I/O
/// tasks will still happen on the I/O executor and may be multi-threaded (but should
/// not use significant CPU resources)
ARROW_EXPORT Result<std::shared_ptr<Table>> DeclarationToTable(Declaration declaration,
bool use_threads = true);
ARROW_EXPORT Result<std::shared_ptr<Table>> DeclarationToTable(
Declaration declaration, bool use_threads = true,
MemoryPool* memory_pool = default_memory_pool(),
FunctionRegistry* function_registry = NULLPTR);

/// \brief Asynchronous version of \see DeclarationToTable
///
/// The behavior of use_threads is slightly different than the synchronous version since
/// we cannot run synchronously on the calling thread. Instead, if use_threads=false then
/// a new thread pool will be created with a single thread and this will be used for all
/// compute work.
/// \param use_threads The behavior of use_threads is slightly different than the
/// synchronous version since we cannot run synchronously on the
/// calling thread. Instead, if use_threads=false then a new thread
/// pool will be created with a single thread and this will be used for
/// all compute work.
/// \param memory_pool The memory pool to use for allocations made while running the plan.
/// \param function_registry The function registry to use for function execution. If null
/// then the default function registry will be used.
ARROW_EXPORT Future<std::shared_ptr<Table>> DeclarationToTableAsync(
Declaration declaration, bool use_threads = true);
Declaration declaration, bool use_threads = true,
MemoryPool* memory_pool = default_memory_pool(),
FunctionRegistry* function_registry = NULLPTR);

/// \brief Overload of \see DeclarationToTableAsync accepting a custom exec context
///
Expand All @@ -463,13 +475,17 @@ struct BatchesWithCommonSchema {
///
/// \see DeclarationToTable for details on threading & execution
ARROW_EXPORT Result<BatchesWithCommonSchema> DeclarationToExecBatches(
Declaration declaration, bool use_threads = true);
Declaration declaration, bool use_threads = true,
MemoryPool* memory_pool = default_memory_pool(),
FunctionRegistry* function_registry = NULLPTR);

/// \brief Asynchronous version of \see DeclarationToExecBatches
///
/// \see DeclarationToTableAsync for details on threading & execution
ARROW_EXPORT Future<BatchesWithCommonSchema> DeclarationToExecBatchesAsync(
Declaration declaration, bool use_threads = true);
Declaration declaration, bool use_threads = true,
MemoryPool* memory_pool = default_memory_pool(),
FunctionRegistry* function_registry = NULLPTR);

/// \brief Overload of \see DeclarationToExecBatchesAsync accepting a custom exec context
///
Expand All @@ -481,13 +497,17 @@ ARROW_EXPORT Future<BatchesWithCommonSchema> DeclarationToExecBatchesAsync(
///
/// \see DeclarationToTable for details on threading & execution
ARROW_EXPORT Result<std::vector<std::shared_ptr<RecordBatch>>> DeclarationToBatches(
Declaration declaration, bool use_threads = true);
Declaration declaration, bool use_threads = true,
MemoryPool* memory_pool = default_memory_pool(),
FunctionRegistry* function_registry = NULLPTR);

/// \brief Asynchronous version of \see DeclarationToBatches
///
/// \see DeclarationToTableAsync for details on threading & execution
ARROW_EXPORT Future<std::vector<std::shared_ptr<RecordBatch>>> DeclarationToBatchesAsync(
Declaration declaration, bool use_threads = true);
Declaration declaration, bool use_threads = true,
MemoryPool* memory_pool = default_memory_pool(),
FunctionRegistry* function_registry = NULLPTR);

/// \brief Overload of \see DeclarationToBatchesAsync accepting a custom exec context
///
Expand All @@ -511,24 +531,34 @@ ARROW_EXPORT Future<std::vector<std::shared_ptr<RecordBatch>>> DeclarationToBatc
///
/// If a custom exec context is provided then the value of `use_threads` will be ignored.
ARROW_EXPORT Result<std::unique_ptr<RecordBatchReader>> DeclarationToReader(
Declaration declaration, bool use_threads = true);
Declaration declaration, bool use_threads = true,
MemoryPool* memory_pool = default_memory_pool(),
FunctionRegistry* function_registry = NULLPTR);

/// \brief Overload of \see DeclarationToReader accepting a custom exec context
ARROW_EXPORT Result<std::unique_ptr<RecordBatchReader>> DeclarationToReader(
Declaration declaration, ExecContext exec_context);

/// \brief Utility method to run a declaration and ignore results
///
/// This can be useful when the data are consumed as part of the plan itself, for
/// example, when the plan ends with a write node.
///
/// \see DeclarationToTable for details on threading & execution
ARROW_EXPORT Status DeclarationToStatus(Declaration declaration, bool use_threads = true);
ARROW_EXPORT Status DeclarationToStatus(Declaration declaration, bool use_threads = true,
MemoryPool* memory_pool = default_memory_pool(),
FunctionRegistry* function_registry = NULLPTR);

/// \brief Asynchronous version of \see DeclarationToStatus
///
/// This can be useful when the data are consumed as part of the plan itself, for
/// example, when the plan ends with a write node.
///
/// \see DeclarationToTableAsync for details on threading & execution
ARROW_EXPORT Future<> DeclarationToStatusAsync(Declaration declaration,
bool use_threads = true);
ARROW_EXPORT Future<> DeclarationToStatusAsync(
Declaration declaration, bool use_threads = true,
MemoryPool* memory_pool = default_memory_pool(),
FunctionRegistry* function_registry = NULLPTR);

/// \brief Overload of \see DeclarationToStatusAsync accepting a custom exec context
///
Expand Down
22 changes: 22 additions & 0 deletions cpp/src/arrow/engine/substrait/serde.cc
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,13 @@ DeclarationFactory MakeWriteDeclarationFactory(
};
}

DeclarationFactory MakeNoSinkDeclarationFactory() {
return [](compute::Declaration input,
std::vector<std::string> names) -> Result<compute::Declaration> {
return input;
};
}

// FIXME - Replace with actual version that includes the change
constexpr uint32_t kMinimumMajorVersion = 0;
constexpr uint32_t kMinimumMinorVersion = 19;
Expand Down Expand Up @@ -188,6 +195,21 @@ Result<std::vector<compute::Declaration>> DeserializePlans(
registry, ext_set_out, conversion_options);
}

ARROW_ENGINE_EXPORT Result<compute::Declaration> DeserializePlan(
const Buffer& buf, const ExtensionIdRegistry* registry, ExtensionSet* ext_set_out,
const ConversionOptions& conversion_options) {
ARROW_ASSIGN_OR_RAISE(std::vector<compute::Declaration> top_level_decls,
DeserializePlans(buf, MakeNoSinkDeclarationFactory(), registry,
ext_set_out, conversion_options));
if (top_level_decls.empty()) {
return Status::Invalid("No RelRoot in plan");
}
if (top_level_decls.size() != 1) {
return Status::Invalid("Multiple top level declarations found in Substrait plan");
}
return top_level_decls[0];
}

namespace {

Result<std::shared_ptr<compute::ExecPlan>> MakeSingleDeclarationPlan(
Expand Down
17 changes: 17 additions & 0 deletions cpp/src/arrow/engine/substrait/serde.h
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,23 @@ ARROW_ENGINE_EXPORT Result<std::shared_ptr<compute::ExecPlan>> DeserializePlan(
const ExtensionIdRegistry* registry = NULLPTR, ExtensionSet* ext_set_out = NULLPTR,
const ConversionOptions& conversion_options = {});

/// \brief Deserializes a Substrait Plan message to a Declaration
///
/// The plan will not contain any sink nodes and will be suitable for use in any
/// of the arrow::compute::DeclarationToXyz methods.
///
/// \param[in] buf a buffer containing the protobuf serialization of a Substrait Plan
/// message
/// \param[in] registry an extension-id-registry to use, or null for the default one.
/// \param[out] ext_set_out if non-null, the extension mapping used by the Substrait
/// Plan is returned here.
/// \param[in] conversion_options options to control how the conversion is to be done.
/// \return A declaration representing the Substrait plan
ARROW_ENGINE_EXPORT Result<compute::Declaration> DeserializePlan(
const Buffer& buf, const ExtensionIdRegistry* registry = NULLPTR,
ExtensionSet* ext_set_out = NULLPTR,
const ConversionOptions& conversion_options = {});

/// \brief Deserializes a Substrait Type message to the corresponding Arrow type
///
/// \param[in] buf a buffer containing the protobuf serialization of a Substrait Type
Expand Down
Loading

0 comments on commit e1027dc

Please sign in to comment.