Skip to content

Commit

Permalink
ARROW-15732: Do not use any CPU threads in execution plan when use_th…
Browse files Browse the repository at this point in the history
…reads is false
  • Loading branch information
westonpace committed Oct 18, 2022
1 parent 5f5ea7b commit ba6fa88
Show file tree
Hide file tree
Showing 35 changed files with 246 additions and 309 deletions.
2 changes: 1 addition & 1 deletion cpp/examples/arrow/engine_substrait_consumption.cc
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ arrow::Status RunSubstraitConsumer(int argc, char** argv) {

// Start the plan...
std::cout << std::string(50, '#') << " consuming batches:" << std::endl;
ARROW_RETURN_NOT_OK(plan->StartProducing());
ARROW_RETURN_NOT_OK(plan->StartProducing(arrow::internal::GetCpuThreadPool()));

// ... and wait for it to finish
ARROW_RETURN_NOT_OK(plan->finished().status());
Expand Down
50 changes: 18 additions & 32 deletions cpp/examples/arrow/execution_plan_documentation_examples.cc
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ arrow::Status ExecutePlanAndCollectAsTable(
ARROW_RETURN_NOT_OK(plan->Validate());
std::cout << "ExecPlan created : " << plan->ToString() << std::endl;
// start the ExecPlan
ARROW_RETURN_NOT_OK(plan->StartProducing());
ARROW_RETURN_NOT_OK(plan->StartProducing(exec_context.executor()));

// collect sink_reader into a Table
std::shared_ptr<arrow::Table> response_table;
Expand Down Expand Up @@ -297,8 +297,7 @@ arrow::Status ExecutePlanAndCollectAsTable(
/// via the sink node.
arrow::Status ScanSinkExample(cp::ExecContext& exec_context) {
// Execution plan created
ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
cp::ExecPlan::Make(&exec_context));
ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan, cp::ExecPlan::Make());

ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::dataset::Dataset> dataset, GetDataset());

Expand Down Expand Up @@ -332,8 +331,7 @@ arrow::Status ScanSinkExample(cp::ExecContext& exec_context) {
/// and the sink node emits the data as an output represented in
/// a table.
arrow::Status SourceSinkExample(cp::ExecContext& exec_context) {
ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
cp::ExecPlan::Make(&exec_context));
ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan, cp::ExecPlan::Make());

ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());

Expand Down Expand Up @@ -362,8 +360,7 @@ arrow::Status SourceSinkExample(cp::ExecContext& exec_context) {
/// receiving data from a table and the sink node emits
/// the data to a generator which we collect into a table.
arrow::Status TableSourceSinkExample(cp::ExecContext& exec_context) {
ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
cp::ExecPlan::Make(&exec_context));
ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan, cp::ExecPlan::Make());

ARROW_ASSIGN_OR_RAISE(auto table, GetTable());

Expand Down Expand Up @@ -392,8 +389,7 @@ arrow::Status TableSourceSinkExample(cp::ExecContext& exec_context) {
/// along with the source and sink operations. The output from the
/// exeuction plan is obtained as a table via the sink node.
arrow::Status ScanFilterSinkExample(cp::ExecContext& exec_context) {
ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
cp::ExecPlan::Make(&exec_context));
ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan, cp::ExecPlan::Make());

ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::dataset::Dataset> dataset, GetDataset());

Expand Down Expand Up @@ -446,8 +442,7 @@ arrow::Status ScanFilterSinkExample(cp::ExecContext& exec_context) {
/// into the execution plan, how project operation can be applied on the
/// data stream and how the output is obtained as a table via the sink node.
arrow::Status ScanProjectSinkExample(cp::ExecContext& exec_context) {
ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
cp::ExecPlan::Make(&exec_context));
ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan, cp::ExecPlan::Make());

ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::dataset::Dataset> dataset, GetDataset());

Expand Down Expand Up @@ -491,8 +486,7 @@ arrow::Status ScanProjectSinkExample(cp::ExecContext& exec_context) {
/// data and the aggregation (counting unique types in column 'a')
/// is applied on this data. The output is obtained from the sink node as a table.
arrow::Status SourceScalarAggregateSinkExample(cp::ExecContext& exec_context) {
ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
cp::ExecPlan::Make(&exec_context));
ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan, cp::ExecPlan::Make());

ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());

Expand Down Expand Up @@ -527,8 +521,7 @@ arrow::Status SourceScalarAggregateSinkExample(cp::ExecContext& exec_context) {
/// data and the aggregation (counting unique types in column 'a') is
/// applied on this data. The output is obtained from the sink node as a table.
arrow::Status SourceGroupAggregateSinkExample(cp::ExecContext& exec_context) {
ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
cp::ExecPlan::Make(&exec_context));
ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan, cp::ExecPlan::Make());

ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());

Expand Down Expand Up @@ -566,8 +559,7 @@ arrow::Status SourceGroupAggregateSinkExample(cp::ExecContext& exec_context) {
/// This example shows how the data can be consumed within the execution plan
/// by using a ConsumingSink node. There is no data output from this execution plan.
arrow::Status SourceConsumingSinkExample(cp::ExecContext& exec_context) {
ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
cp::ExecPlan::Make(&exec_context));
ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan, cp::ExecPlan::Make());

ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());

Expand Down Expand Up @@ -611,7 +603,7 @@ arrow::Status SourceConsumingSinkExample(cp::ExecContext& exec_context) {
ARROW_RETURN_NOT_OK(plan->Validate());
std::cout << "Exec Plan created: " << plan->ToString() << std::endl;
// plan start producing
ARROW_RETURN_NOT_OK(plan->StartProducing());
ARROW_RETURN_NOT_OK(plan->StartProducing(::arrow::internal::GetCpuThreadPool()));
// Source should finish fairly quickly
ARROW_RETURN_NOT_OK(source->finished().status());
std::cout << "Source Finished!" << std::endl;
Expand All @@ -633,8 +625,7 @@ arrow::Status SourceConsumingSinkExample(cp::ExecContext& exec_context) {
/// ASCENDING or DESCENDING and it is configurable. The output
/// is obtained as a table from the sink node.
arrow::Status SourceOrderBySinkExample(cp::ExecContext& exec_context) {
ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
cp::ExecPlan::Make(&exec_context));
ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan, cp::ExecPlan::Make());

ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeSortTestBasicBatches());

Expand Down Expand Up @@ -667,8 +658,7 @@ arrow::Status SourceOrderBySinkExample(cp::ExecContext& exec_context) {
/// is obtained as a table via the sink node.
arrow::Status SourceHashJoinSinkExample(cp::ExecContext& exec_context) {
ARROW_ASSIGN_OR_RAISE(auto input, MakeGroupableBatches());
ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
cp::ExecPlan::Make(&exec_context));
ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan, cp::ExecPlan::Make());

arrow::AsyncGenerator<std::optional<cp::ExecBatch>> sink_gen;

Expand Down Expand Up @@ -712,8 +702,7 @@ arrow::Status SourceHashJoinSinkExample(cp::ExecContext& exec_context) {
/// sink node where output can be obtained as a table.
arrow::Status SourceKSelectExample(cp::ExecContext& exec_context) {
ARROW_ASSIGN_OR_RAISE(auto input, MakeGroupableBatches());
ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
cp::ExecPlan::Make(&exec_context));
ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan, cp::ExecPlan::Make());
arrow::AsyncGenerator<std::optional<cp::ExecBatch>> sink_gen;

ARROW_ASSIGN_OR_RAISE(
Expand Down Expand Up @@ -745,8 +734,7 @@ arrow::Status SourceKSelectExample(cp::ExecContext& exec_context) {
/// and after processing how it can be written to disk.
arrow::Status ScanFilterWriteExample(cp::ExecContext& exec_context,
const std::string& file_path) {
ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
cp::ExecPlan::Make(&exec_context));
ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan, cp::ExecPlan::Make());

ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::dataset::Dataset> dataset, GetDataset());

Expand Down Expand Up @@ -797,7 +785,7 @@ arrow::Status ScanFilterWriteExample(cp::ExecContext& exec_context,
ARROW_RETURN_NOT_OK(plan->Validate());
std::cout << "Execution Plan Created : " << plan->ToString() << std::endl;
// // // start the ExecPlan
ARROW_RETURN_NOT_OK(plan->StartProducing());
ARROW_RETURN_NOT_OK(plan->StartProducing(::arrow::internal::GetCpuThreadPool()));
auto future = plan->finished();
ARROW_RETURN_NOT_OK(future.status());
future.Wait();
Expand All @@ -818,8 +806,7 @@ arrow::Status ScanFilterWriteExample(cp::ExecContext& exec_context,
arrow::Status SourceUnionSinkExample(cp::ExecContext& exec_context) {
ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());

ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
cp::ExecPlan::Make(&exec_context));
ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan, cp::ExecPlan::Make());
arrow::AsyncGenerator<std::optional<cp::ExecBatch>> sink_gen;

cp::Declaration union_node{"union", cp::ExecNodeOptions{}};
Expand Down Expand Up @@ -859,8 +846,7 @@ arrow::Status SourceUnionSinkExample(cp::ExecContext& exec_context) {
/// receiving data as batches and the table sink node
/// which emits the output as a table.
arrow::Status TableSinkExample(cp::ExecContext& exec_context) {
ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
cp::ExecPlan::Make(&exec_context));
ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan, cp::ExecPlan::Make());

ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());

Expand All @@ -878,7 +864,7 @@ arrow::Status TableSinkExample(cp::ExecContext& exec_context) {
ARROW_RETURN_NOT_OK(plan->Validate());
std::cout << "ExecPlan created : " << plan->ToString() << std::endl;
// start the ExecPlan
ARROW_RETURN_NOT_OK(plan->StartProducing());
ARROW_RETURN_NOT_OK(plan->StartProducing(::arrow::internal::GetCpuThreadPool()));

// Wait for the plan to finish
auto finished = plan->finished();
Expand Down
9 changes: 3 additions & 6 deletions cpp/examples/arrow/join_example.cc
Original file line number Diff line number Diff line change
Expand Up @@ -82,12 +82,9 @@ arrow::Result<std::shared_ptr<arrow::dataset::Dataset>> CreateDataSetFromCSVData
}

arrow::Status DoHashJoin() {
cp::ExecContext exec_context;

arrow::dataset::internal::Initialize();

ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
cp::ExecPlan::Make(&exec_context));
ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan, cp::ExecPlan::Make());

arrow::AsyncGenerator<std::optional<cp::ExecBatch>> sink_gen;

Expand Down Expand Up @@ -131,12 +128,12 @@ arrow::Status DoHashJoin() {
cp::SinkNodeOptions{&sink_gen}));
// expected columns l_a, l_b
std::shared_ptr<arrow::RecordBatchReader> sink_reader = cp::MakeGeneratorReader(
hashjoin->output_schema(), std::move(sink_gen), exec_context.memory_pool());
hashjoin->output_schema(), std::move(sink_gen), arrow::default_memory_pool());

// validate the ExecPlan
ARROW_RETURN_NOT_OK(plan->Validate());
// start the ExecPlan
ARROW_RETURN_NOT_OK(plan->StartProducing());
ARROW_RETURN_NOT_OK(plan->StartProducing(arrow::internal::GetCpuThreadPool()));

// collect sink_reader into a Table
std::shared_ptr<arrow::Table> response_table;
Expand Down
1 change: 1 addition & 0 deletions cpp/src/arrow/compute/exec.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1262,6 +1262,7 @@ int64_t InferBatchLength(const std::vector<Datum>& values, bool* all_same) {
ExecContext::ExecContext(MemoryPool* pool, ::arrow::internal::Executor* executor,
FunctionRegistry* func_registry)
: pool_(pool), executor_(executor) {
DCHECK_NE(executor, nullptr);
this->func_registry_ = func_registry == nullptr ? GetFunctionRegistry() : func_registry;
}

Expand Down
17 changes: 5 additions & 12 deletions cpp/src/arrow/compute/exec.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
#include "arrow/result.h"
#include "arrow/type_fwd.h"
#include "arrow/util/macros.h"
#include "arrow/util/thread_pool.h"
#include "arrow/util/type_fwd.h"
#include "arrow/util/visibility.h"

Expand All @@ -61,9 +62,10 @@ static constexpr int64_t kDefaultExecChunksize = UINT16_MAX;
class ARROW_EXPORT ExecContext {
public:
// If no function registry passed, the default is used.
explicit ExecContext(MemoryPool* pool = default_memory_pool(),
::arrow::internal::Executor* executor = NULLPTR,
FunctionRegistry* func_registry = NULLPTR);
explicit ExecContext(
MemoryPool* pool = default_memory_pool(),
::arrow::internal::Executor* executor = ::arrow::internal::GetCpuThreadPool(),
FunctionRegistry* func_registry = NULLPTR);

/// \brief The MemoryPool used for allocations, default is
/// default_memory_pool().
Expand All @@ -90,14 +92,6 @@ class ARROW_EXPORT ExecContext {
// smaller chunks.
int64_t exec_chunksize() const { return exec_chunksize_; }

/// \brief Set whether to use multiple threads for function execution. This
/// is not yet used.
void set_use_threads(bool use_threads = true) { use_threads_ = use_threads; }

/// \brief If true, then utilize multiple threads where relevant for function
/// execution. This is not yet used.
bool use_threads() const { return use_threads_; }

// Set the preallocation strategy for kernel execution as it relates to
// chunked execution. For chunked execution, whether via ChunkedArray inputs
// or splitting larger Array arguments into smaller pieces, contiguous
Expand All @@ -124,7 +118,6 @@ class ARROW_EXPORT ExecContext {
FunctionRegistry* func_registry_;
int64_t exec_chunksize_ = std::numeric_limits<int64_t>::max();
bool preallocate_contiguous_ = true;
bool use_threads_ = true;
};

// TODO: Consider standardizing on uint16 selection vectors and only use them
Expand Down
7 changes: 3 additions & 4 deletions cpp/src/arrow/compute/exec/asof_join_benchmark.cc
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ static void TableJoinOverhead(benchmark::State& state,
TableGenerationProperties right_table_properties,
int batch_size, int num_right_tables,
std::string factory_name, ExecNodeOptions& options) {
ExecContext ctx(default_memory_pool(), nullptr);
left_table_properties.column_prefix = "lt";
left_table_properties.seed = 0;
ASSERT_OK_AND_ASSIGN(TableStats left_table_stats, MakeTable(left_table_properties));
Expand All @@ -76,7 +75,7 @@ static void TableJoinOverhead(benchmark::State& state,
for (auto _ : state) {
state.PauseTiming();
ASSERT_OK_AND_ASSIGN(std::shared_ptr<arrow::compute::ExecPlan> plan,
ExecPlan::Make(&ctx));
ExecPlan::Make());
std::vector<ExecNode*> input_nodes = {*arrow::compute::MakeExecNode(
"table_source", plan.get(), {},
arrow::compute::TableSourceNodeOptions(left_table_stats.table, batch_size))};
Expand All @@ -91,7 +90,7 @@ static void TableJoinOverhead(benchmark::State& state,
AsyncGenerator<std::optional<ExecBatch>> sink_gen;
ASSERT_OK(MakeExecNode("sink", plan.get(), {join_node}, SinkNodeOptions{&sink_gen}));
state.ResumeTiming();
ASSERT_FINISHES_OK(StartAndCollect(plan.get(), sink_gen));
ASSERT_FINISHES_OK(StartAndCollect(plan.get(), sink_gen, /*use_threads=*/false));
}

state.counters["input_rows_per_second"] = benchmark::Counter(
Expand All @@ -104,7 +103,7 @@ static void TableJoinOverhead(benchmark::State& state,
benchmark::Counter::kIsRate);

state.counters["maximum_peak_memory"] =
benchmark::Counter(static_cast<double>(ctx.memory_pool()->max_memory()));
benchmark::Counter(static_cast<double>(default_memory_pool()->max_memory()));
}

static void AsOfJoinOverhead(benchmark::State& state) {
Expand Down
15 changes: 7 additions & 8 deletions cpp/src/arrow/compute/exec/asof_join_node_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -209,8 +209,7 @@ void CheckRunOutput(const BatchesWithSchema& l_batches,
const BatchesWithSchema& r1_batches,
const BatchesWithSchema& exp_batches,
const AsofJoinNodeOptions join_options) {
auto exec_ctx = std::make_unique<ExecContext>(default_memory_pool(), nullptr);
ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make(exec_ctx.get()));
ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make());

Declaration join{"asofjoin", join_options};

Expand All @@ -226,7 +225,8 @@ void CheckRunOutput(const BatchesWithSchema& l_batches,
ASSERT_OK(Declaration::Sequence({join, {"sink", SinkNodeOptions{&sink_gen}}})
.AddToPlan(plan.get()));

ASSERT_FINISHES_OK_AND_ASSIGN(auto res, StartAndCollect(plan.get(), sink_gen));
ASSERT_FINISHES_OK_AND_ASSIGN(
auto res, StartAndCollect(plan.get(), sink_gen, /*use_threads=*/false));
for (auto batch : res) {
ASSERT_EQ(exp_batches.schema->num_fields(), batch.values.size());
}
Expand Down Expand Up @@ -256,8 +256,7 @@ void DoInvalidPlanTest(const BatchesWithSchema& l_batches,
const AsofJoinNodeOptions& join_options,
const std::string& expected_error_str,
bool fail_on_plan_creation = false) {
ExecContext exec_ctx;
ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make(&exec_ctx));
ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make());

Declaration join{"asofjoin", join_options};
join.inputs.emplace_back(Declaration{
Expand All @@ -269,9 +268,9 @@ void DoInvalidPlanTest(const BatchesWithSchema& l_batches,
AsyncGenerator<std::optional<ExecBatch>> sink_gen;
ASSERT_OK(Declaration::Sequence({join, {"sink", SinkNodeOptions{&sink_gen}}})
.AddToPlan(plan.get()));
EXPECT_FINISHES_AND_RAISES_WITH_MESSAGE_THAT(Invalid,
::testing::HasSubstr(expected_error_str),
StartAndCollect(plan.get(), sink_gen));
EXPECT_FINISHES_AND_RAISES_WITH_MESSAGE_THAT(
Invalid, ::testing::HasSubstr(expected_error_str),
StartAndCollect(plan.get(), sink_gen, /*use_threads=*/false));
} else {
EXPECT_RAISES_WITH_MESSAGE_THAT(Invalid, ::testing::HasSubstr(expected_error_str),
join.AddToPlan(plan.get()));
Expand Down
9 changes: 4 additions & 5 deletions cpp/src/arrow/compute/exec/benchmark_util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ namespace compute {
// calling InputFinished and InputReceived.

Status BenchmarkIsolatedNodeOverhead(benchmark::State& state,
arrow::compute::ExecContext ctx,
arrow::compute::Expression expr, int32_t num_batches,
int32_t batch_size,
arrow::compute::BatchesWithSchema data,
Expand All @@ -46,7 +45,7 @@ Status BenchmarkIsolatedNodeOverhead(benchmark::State& state,
AsyncGenerator<std::optional<arrow::compute::ExecBatch>> sink_gen;

ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::compute::ExecPlan> plan,
arrow::compute::ExecPlan::Make(&ctx));
arrow::compute::ExecPlan::Make());
// Source and sink nodes have no effect on the benchmark.
// Used for dummy purposes as they are referenced in InputReceived and InputFinished.
ARROW_ASSIGN_OR_RAISE(arrow::compute::ExecNode * source_node,
Expand Down Expand Up @@ -113,13 +112,13 @@ Status BenchmarkIsolatedNodeOverhead(benchmark::State& state,
// a source -> node_declarations -> sink sequence.

Status BenchmarkNodeOverhead(
benchmark::State& state, arrow::compute::ExecContext ctx, int32_t num_batches,
int32_t batch_size, arrow::compute::BatchesWithSchema data,
benchmark::State& state, int32_t num_batches, int32_t batch_size,
arrow::compute::BatchesWithSchema data,
std::vector<arrow::compute::Declaration>& node_declarations) {
for (auto _ : state) {
state.PauseTiming();
ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::compute::ExecPlan> plan,
arrow::compute::ExecPlan::Make(&ctx));
arrow::compute::ExecPlan::Make());
AsyncGenerator<std::optional<arrow::compute::ExecBatch>> sink_gen;
arrow::compute::Declaration source = arrow::compute::Declaration(
{"source",
Expand Down
Loading

0 comments on commit ba6fa88

Please sign in to comment.