[WIP] ARROW-11928: [C++] Execution engine API #9742

Closed · wants to merge 1 commit

4 changes: 4 additions & 0 deletions cpp/CMakeLists.txt
@@ -342,6 +342,10 @@ if(ARROW_CUDA
set(ARROW_IPC ON)
endif()

if(ARROW_ENGINE)
set(ARROW_COMPUTE ON)
endif()

if(ARROW_DATASET)
set(ARROW_COMPUTE ON)
set(ARROW_FILESYSTEM ON)
2 changes: 2 additions & 0 deletions cpp/cmake_modules/DefineOptions.cmake
@@ -211,6 +211,8 @@ if("${CMAKE_SOURCE_DIR}" STREQUAL "${CMAKE_CURRENT_SOURCE_DIR}")

define_option(ARROW_DATASET "Build the Arrow Dataset Modules" OFF)

define_option(ARROW_ENGINE "Build the Arrow Execution Engine" OFF)

define_option(ARROW_FILESYSTEM "Build the Arrow Filesystem Layer" OFF)

define_option(ARROW_FLIGHT
8 changes: 8 additions & 0 deletions cpp/src/arrow/CMakeLists.txt
@@ -409,6 +409,10 @@ if(ARROW_COMPUTE)
endif()
endif()

if(ARROW_ENGINE)
list(APPEND ARROW_SRCS engine/exec_plan.cc engine/query_plan.cc)
endif()

if(ARROW_FILESYSTEM)
if(ARROW_HDFS)
add_definitions(-DARROW_HDFS)
@@ -671,6 +675,10 @@ if(ARROW_DATASET)
add_subdirectory(dataset)
endif()

if(ARROW_ENGINE)
add_subdirectory(engine)
endif()

if(ARROW_FILESYSTEM)
add_subdirectory(filesystem)
endif()
1 change: 1 addition & 0 deletions cpp/src/arrow/compute/type_fwd.h
@@ -29,6 +29,7 @@ struct FunctionOptions;

struct CastOptions;

class ExecBatch;
class ExecContext;
class KernelContext;

19 changes: 19 additions & 0 deletions cpp/src/arrow/engine/CMakeLists.txt
@@ -0,0 +1,19 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

# Headers: top level
arrow_install_all_headers("arrow/engine")
20 changes: 20 additions & 0 deletions cpp/src/arrow/engine/api.h
@@ -0,0 +1,20 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include "arrow/engine/query_plan.h" // IWYU pragma: export
131 changes: 131 additions & 0 deletions cpp/src/arrow/engine/exec_plan.cc
@@ -0,0 +1,131 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "arrow/engine/exec_plan.h"

#include "arrow/compute/exec.h"
#include "arrow/engine/query_plan.h"
#include "arrow/result.h"
#include "arrow/status.h"
#include "arrow/util/checked_cast.h"
#include "arrow/util/future.h"
#include "arrow/util/logging.h"
#include "arrow/util/thread_pool.h"

namespace arrow {

using internal::checked_cast;

namespace engine {

ExecNode::InputBatch::InputBatch(std::vector<util::optional<compute::ExecBatch>> batches)
: batches(std::move(batches)), ready_fut(Future<>::Make()) {}

ExecNode::ExecNode(ExecPlan* plan, const QueryNode* query_node)
: plan_(plan),
query_node_(query_node),
num_inputs_(query_node->num_inputs()),
finished_fut_(Future<>::Make()) {}

ExecNode::~ExecNode() = default;

void ExecNode::ReserveBatches(int32_t num_batches) {
// Should be called with mutex locked
if (static_cast<size_t>(num_batches) > input_batches_.size()) {
input_batches_.resize(num_batches);
}
}

ExecNode::InputBatch* ExecNode::EnsureBatch(int32_t batch_index) {
// Should be called with mutex locked
if (input_batches_[batch_index] == nullptr) {
input_batches_[batch_index].reset(
new InputBatch{std::vector<util::optional<compute::ExecBatch>>(num_inputs_)});
}
return input_batches_[batch_index].get();
}

Status ExecNode::InputReceived(int32_t input_index, int32_t batch_index,
compute::ExecBatch batch) {
auto lock = mutex_.Lock();

if (input_index >= num_inputs_) {
return Status::Invalid("Invalid input index");
}
if (finish_at_ >= 0 && batch_index >= finish_at_) {
return Status::Invalid("Input batch index out of bounds");
}

ReserveBatches(batch_index + 1);
auto* input_batch = EnsureBatch(batch_index);

if (input_batch->batches[input_index].has_value()) {
return Status::Invalid("Batch #", batch_index, " for input #", input_index,
" already received");
}
input_batch->batches[input_index] = std::move(batch);
if (++input_batch->num_ready == num_inputs_) {
input_batch->ready_fut.MarkFinished();
}
return Status::OK();
}

Status ExecNode::InputFinished(int32_t num_batches) {
auto lock = mutex_.Lock();

if (finish_at_ >= 0) {
return Status::Invalid("InputFinished already called");
}
finish_at_ = num_batches;
ReserveBatches(num_batches);

std::vector<Future<>> batch_futures;
batch_futures.reserve(num_batches);
for (int32_t i = 0; i < num_batches; ++i) {
auto* input_batch = EnsureBatch(i);
batch_futures.push_back(input_batch->ready_fut);
}

// TODO lifetime
AllComplete(std::move(batch_futures))
.AddCallback([this](const Result<detail::Empty>& res) {
finished_fut_.MarkFinished(res.status());
});
return Status::OK();
}

Future<> ExecNode::RunAsync(int32_t batch_index, internal::Executor* executor) {
auto lock = mutex_.Lock();

ReserveBatches(batch_index + 1);
auto* input_batch = EnsureBatch(batch_index);

// TODO lifetime (take strong ref to ExecPlan?)
return executor->Transfer(input_batch->ready_fut)
.Then([this, batch_index](...) -> Status { return RunSyncInternal(batch_index); });
}

Member:
Would different nodes run on different executors? If not, it might be simpler to avoid the transfer. Also, do I have to call RunAsync on every node for every batch index? Couldn't calling RunAsync on the root node(s) be enough?

Member Author:
I'm not sure what "root nodes" are in this context. You mean the sources? Indeed, we may simply want to start executing as soon as all inputs are ready, though this will depend on the node: some can execute as soon as one batch is received, while others need the whole input before producing anything.
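
A note on the Transfer call discussed above: its role is to move the continuation onto the supplied executor, so the per-batch work runs on a pool thread rather than on whichever thread happened to mark ready_fut finished. Below is a minimal self-contained sketch of that pattern, independent of ExecNode; TransferExample is an illustrative name, and the (...) continuation style mirrors the one used in this patch.

#include "arrow/result.h"
#include "arrow/status.h"
#include "arrow/util/future.h"
#include "arrow/util/thread_pool.h"

// Sketch: the callback attached after Transfer runs on `pool`,
// not on the thread that calls MarkFinished.
arrow::Status TransferExample() {
  ARROW_ASSIGN_OR_RAISE(auto pool, arrow::internal::ThreadPool::Make(4));
  auto ready = arrow::Future<>::Make();
  auto done = pool->Transfer(ready).Then([](...) -> arrow::Status {
    // ... per-batch work would happen here, on a pool thread ...
    return arrow::Status::OK();
  });
  ready.MarkFinished();
  return done.status();  // blocks until the continuation has run
}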

Future<> ExecNode::FinishAsync(internal::Executor* executor) {
auto lock = mutex_.Lock();

// TODO lifetime
return executor->Transfer(finished_fut_).Then([this](...) -> Status {
return FinishSyncInternal();
});
}

} // namespace engine
} // namespace arrow
131 changes: 131 additions & 0 deletions cpp/src/arrow/engine/exec_plan.h
@@ -0,0 +1,131 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include <memory>
#include <string>
#include <vector>

#include "arrow/compute/type_fwd.h"
#include "arrow/type_fwd.h"
#include "arrow/util/future.h"
#include "arrow/util/macros.h"
#include "arrow/util/mutex.h"
#include "arrow/util/optional.h"
#include "arrow/util/type_fwd.h"
#include "arrow/util/visibility.h"

namespace arrow {
namespace engine {

class ExecNode;
class QueryNode;
class QueryPlan;

class ARROW_EXPORT ExecPlan {
public:
Member:
How do I actually run this? There doesn't appear to be any public interface.

Member Author:
Sorry, this is just a WIP PR. I decided to post it to make sure I'm not going in an entirely bad direction.

using NodeVector = std::vector<ExecNode*>;

virtual ~ExecPlan() = default;

/// The query plan this ExecPlan is an instance of
const QueryPlan& query_plan() const { return *query_plan_; }

compute::ExecContext* context() { return context_; }

protected:
friend class QueryPlan;
Member:
Shouldn't this be the other way around? The QueryPlan doesn't hold any reference to the ExecPlan (and if my understanding of the relationship is correct, it shouldn't). What value is there in this declaration?

Member Author:
The query plan is supposed to instantiate a corresponding exec plan when QueryPlan::MakeExecPlan is called. Presumably that will need access to non-public APIs or members, but this is all just a sketch.
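
The factory relationship described here can be sketched in isolation. The following is a minimal, self-contained illustration of the friend-factory pattern the author describes; Query and Plan are illustrative stand-ins, not Arrow types.

#include <memory>

// Minimal stand-in for the intended QueryPlan -> ExecPlan factory
// relationship (illustrative names only, not Arrow API).
class Plan;

class Query {
 public:
  std::shared_ptr<Plan> MakePlan() const;
};

class Plan {
 protected:
  friend class Query;  // lets the factory reach the protected constructor
  Plan() = default;
};

inline std::shared_ptr<Plan> Query::MakePlan() const {
  // std::make_shared cannot call a protected constructor, so construct directly.
  return std::shared_ptr<Plan>(new Plan());
}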


ExecPlan() = default;

const std::shared_ptr<const QueryPlan> query_plan_;
compute::ExecContext* context_;
};

class ARROW_EXPORT ExecNode {
public:
using NodeVector = std::vector<ExecNode*>;

virtual ~ExecNode();

/// The query node this ExecNode is an instance of
const QueryNode& query_node() const { return *query_node_; }

/// This node's exec plan
ExecPlan* plan() { return plan_; }

/// Transfer input batch to ExecNode
///
/// When all inputs are received for a given batch_index, the batch is ready
/// for execution.
Status InputReceived(int32_t input_index, int32_t batch_index,
compute::ExecBatch batch);
Member:
What is "batch_index"?

Member Author:
The index of the batch in the batch stream.

Member:
I don't think this parameter is important, at least at this stage. Could you clarify what "when all inputs are received for a given batch_index, the batch is ready for execution" means? In general, inputs from different parents won't necessarily correspond to each other.

For example, suppose we are joining a 1,000,000-row input with a 100,000-row input. One input might yield 100 batches while the other might only yield 5 or 10. They may be of differing lengths, in an unpredictable order (since scans won't necessarily yield a deterministic order), and the batches won't correspond to each other.

Member Author:
"In general, inputs from different parents won't necessarily correspond to each other." Ah, I see. This is a misunderstanding on my part, then. Sorry.
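
For concreteness, here is what the contract as originally written expects from a producer: positionally aligned input streams, where batch #i becomes executable once every input has delivered its batch #i. This is exactly the assumption the thread above identifies as unrealistic for joins. FeedNode is a hypothetical driver, not part of this diff.

#include <cstdint>
#include <vector>
#include "arrow/compute/exec.h"
#include "arrow/engine/exec_plan.h"
#include "arrow/status.h"

// Hypothetical driver for a two-input node with positionally aligned streams.
arrow::Status FeedNode(arrow::engine::ExecNode* node,
                       std::vector<arrow::compute::ExecBatch> left,
                       std::vector<arrow::compute::ExecBatch> right) {
  for (int32_t i = 0; i < static_cast<int32_t>(left.size()); ++i) {
    // Batch #i becomes ready only once *both* inputs have delivered it.
    ARROW_RETURN_NOT_OK(node->InputReceived(/*input_index=*/0, i, std::move(left[i])));
    ARROW_RETURN_NOT_OK(node->InputReceived(/*input_index=*/1, i, std::move(right[i])));
  }
  return arrow::Status::OK();
}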


/// Mark the inputs finished after the given number of batches.
///
/// This may be called before all inputs are received. This simply fixes
/// the total number of incoming batches so that the ExecNode knows when
/// it has received all input.
Status InputFinished(int32_t num_batches);
Member:
  • In general, nodes will not know how many batches they are going to produce, so the situations where this API would be used seem rare to me.
  • When a node has multiple inputs (e.g. joins, unions), the parent nodes will finish producing outputs at different times. How are they supposed to independently communicate that they are done?

Member Author:
As the docstring says, this may or may not be called before all inputs are received. But it must be called at some point so that the node knows that input is finished.

Member Author:
As for the second question, it seems I should change this declaration to Status InputFinished(int32_t input_index, int32_t num_batches). Does that sound right?

Member:
I would nix the "num_batches" argument, but otherwise yes, InputFinished(input_index) sounds right.
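
Under the signature as written, a producer that knows its batch count would drive a single-input node as in the hedged sketch below; per the discussion above, the signature is likely to become a per-input InputFinished(input_index) instead. FeedSingleInput is an illustrative name.

#include <cstdint>
#include <vector>
#include "arrow/compute/exec.h"
#include "arrow/engine/exec_plan.h"
#include "arrow/status.h"

// Hypothetical single-input producer with a known batch count.
arrow::Status FeedSingleInput(arrow::engine::ExecNode* node,
                              std::vector<arrow::compute::ExecBatch> batches) {
  const int32_t n = static_cast<int32_t>(batches.size());
  // InputFinished only fixes the expected total; it may equally be called
  // after the batches have been delivered.
  ARROW_RETURN_NOT_OK(node->InputFinished(n));
  for (int32_t i = 0; i < n; ++i) {
    ARROW_RETURN_NOT_OK(node->InputReceived(/*input_index=*/0, i, std::move(batches[i])));
  }
  return arrow::Status::OK();
}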


/// Schedule batch execution once all inputs are received for the given batch_index
///
/// The returned Future is finished once execution of the batch is finished.
/// Note that execution doesn't necessarily mean that any outputs are produced.
/// Depending on the ExecNode type, outputs may be produced on the fly,
/// or only at the end when all inputs have been received.
Future<> RunAsync(int32_t batch_index, internal::Executor* executor);
Member:
I guess I don't understand what "batch_index" is yet.
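
Reading the declaration and docstring together: the caller is expected to schedule one RunAsync per anticipated batch index, each returned future completing when that batch has been executed. A sketch of such a caller follows; RunAllBatches is an illustrative name, while GetCpuThreadPool and AllComplete are existing Arrow utilities.

#include <cstdint>
#include <vector>
#include "arrow/engine/exec_plan.h"
#include "arrow/util/future.h"
#include "arrow/util/thread_pool.h"

// Hypothetical: schedule execution of every expected batch on the CPU pool.
arrow::Future<> RunAllBatches(arrow::engine::ExecNode* node, int32_t num_batches) {
  auto* executor = arrow::internal::GetCpuThreadPool();
  std::vector<arrow::Future<>> futures;
  futures.reserve(num_batches);
  for (int32_t i = 0; i < num_batches; ++i) {
    futures.push_back(node->RunAsync(i, executor));  // completes after batch #i runs
  }
  return arrow::AllComplete(std::move(futures));
}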


/// Schedule finalization once all batches are executed
///
/// Return a Future that will be marked finished once all inputs are received
/// and all computation has finished.
///
/// RunAsync and FinishAsync can be thought of as exposing a map/reduce pipeline.
Future<> FinishAsync(internal::Executor* executor);
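
Taking the map/reduce analogy literally, a complete driver for a single-input node might look like the following hedged sketch. DriveNode is an illustrative name; it combines the calls above and blocks until both per-batch execution and finalization have completed.

#include <cstdint>
#include <vector>
#include "arrow/compute/exec.h"
#include "arrow/engine/exec_plan.h"
#include "arrow/status.h"
#include "arrow/util/future.h"
#include "arrow/util/thread_pool.h"

// Hypothetical end-to-end driver: "map" each batch, then "reduce" at the end.
arrow::Status DriveNode(arrow::engine::ExecNode* node,
                        std::vector<arrow::compute::ExecBatch> batches) {
  auto* executor = arrow::internal::GetCpuThreadPool();
  const int32_t n = static_cast<int32_t>(batches.size());

  std::vector<arrow::Future<>> futures;
  futures.reserve(n + 1);
  for (int32_t i = 0; i < n; ++i) {
    futures.push_back(node->RunAsync(i, executor));  // "map" step, per batch
  }
  futures.push_back(node->FinishAsync(executor));    // "reduce" step

  // Deliver the input; each RunAsync future unblocks as its batch arrives.
  for (int32_t i = 0; i < n; ++i) {
    ARROW_RETURN_NOT_OK(node->InputReceived(/*input_index=*/0, i, std::move(batches[i])));
  }
  ARROW_RETURN_NOT_OK(node->InputFinished(n));

  return arrow::AllComplete(std::move(futures)).status();
}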

protected:
struct InputBatch {
std::vector<util::optional<compute::ExecBatch>> batches;
int32_t num_ready = 0;
Future<> ready_fut;

explicit InputBatch(std::vector<util::optional<compute::ExecBatch>> batches);
};

ExecNode(ExecPlan* plan, const QueryNode* query_node);

virtual Status RunSyncInternal(int32_t batch_index) = 0;
virtual Status FinishSyncInternal() = 0;

InputBatch* EnsureBatch(int32_t batch_index);
void ReserveBatches(int32_t num_batches);

ExecPlan* plan_;
const QueryNode* const query_node_;
const int32_t num_inputs_;

// XXX also use a per-input batch mutex?
util::Mutex mutex_;
std::vector<std::unique_ptr<InputBatch>> input_batches_;
int32_t finish_at_ = -1;
Future<> finished_fut_;
};

} // namespace engine
} // namespace arrow