-
Notifications
You must be signed in to change notification settings - Fork 3.6k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
ARROW-11928: [C++] Execution engine API
- Loading branch information
Showing
10 changed files
with
481 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,19 @@ | ||
# Licensed to the Apache Software Foundation (ASF) under one | ||
# or more contributor license agreements. See the NOTICE file | ||
# distributed with this work for additional information | ||
# regarding copyright ownership. The ASF licenses this file | ||
# to you under the Apache License, Version 2.0 (the | ||
# "License"); you may not use this file except in compliance | ||
# with the License. You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, | ||
# software distributed under the License is distributed on an | ||
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
# KIND, either express or implied. See the License for the | ||
# specific language governing permissions and limitations | ||
# under the License. | ||
|
||
# Headers: top level | ||
# NOTE(review): arrow_install_all_headers is defined elsewhere in the build
# system; presumably it installs the public headers of the "arrow/engine"
# directory -- confirm against the macro definition.
arrow_install_all_headers("arrow/engine") |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,20 @@ | ||
// Licensed to the Apache Software Foundation (ASF) under one | ||
// or more contributor license agreements. See the NOTICE file | ||
// distributed with this work for additional information | ||
// regarding copyright ownership. The ASF licenses this file | ||
// to you under the Apache License, Version 2.0 (the | ||
// "License"); you may not use this file except in compliance | ||
// with the License. You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, | ||
// software distributed under the License is distributed on an | ||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
// KIND, either express or implied. See the License for the | ||
// specific language governing permissions and limitations | ||
// under the License. | ||
|
||
#pragma once | ||
|
||
#include "arrow/engine/query_plan.h" // IWYU pragma: export |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,131 @@ | ||
// Licensed to the Apache Software Foundation (ASF) under one | ||
// or more contributor license agreements. See the NOTICE file | ||
// distributed with this work for additional information | ||
// regarding copyright ownership. The ASF licenses this file | ||
// to you under the Apache License, Version 2.0 (the | ||
// "License"); you may not use this file except in compliance | ||
// with the License. You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, | ||
// software distributed under the License is distributed on an | ||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
// KIND, either express or implied. See the License for the | ||
// specific language governing permissions and limitations | ||
// under the License. | ||
|
||
#include "arrow/engine/exec_plan.h" | ||
|
||
#include "arrow/compute/exec.h" | ||
#include "arrow/engine/query_plan.h" | ||
#include "arrow/result.h" | ||
#include "arrow/status.h" | ||
#include "arrow/util/checked_cast.h" | ||
#include "arrow/util/future.h" | ||
#include "arrow/util/logging.h" | ||
#include "arrow/util/thread_pool.h" | ||
|
||
namespace arrow { | ||
|
||
using internal::checked_cast; | ||
|
||
namespace engine { | ||
|
||
// Build the bookkeeping record for one batch index.
//
// Takes ownership of the per-input slots passed in `batches` and creates the
// future that InputReceived marks finished once every slot has been filled.
ExecNode::InputBatch::InputBatch(std::vector<util::optional<compute::ExecBatch>> batches)
    : batches(std::move(batches)),
      ready_fut(Future<>::Make()) {}
|
||
// Bind a new node to its owning plan and to the query node it instantiates.
//
// The number of inputs is read once from `query_node` (which is dereferenced
// here, so it must not be null), and the completion future is created up
// front; it is marked finished later from the callback installed by
// InputFinished.
ExecNode::ExecNode(ExecPlan* plan, const QueryNode* query_node)
    : plan_(plan),
      query_node_(query_node),
      num_inputs_(query_node->num_inputs()),
      finished_fut_(Future<>::Make()) {}
|
||
// Out-of-line definition of the virtual destructor declared in the header;
// the compiler-generated member-wise destruction is all that is needed.
ExecNode::~ExecNode() = default; | ||
|
||
void ExecNode::ReserveBatches(int32_t num_batches) { | ||
// Should be called with mutex locked | ||
if (static_cast<size_t>(num_batches) > input_batches_.size()) { | ||
input_batches_.resize(num_batches); | ||
} | ||
} | ||
|
||
// Return the InputBatch record for `batch_index`, creating it on first use.
//
// A freshly created record has one empty optional per input. The slot for
// `batch_index` must already exist (see ReserveBatches). Must be called with
// `mutex_` held.
ExecNode::InputBatch* ExecNode::EnsureBatch(int32_t batch_index) {
  using InputSlots = std::vector<util::optional<compute::ExecBatch>>;

  auto& entry = input_batches_[batch_index];
  if (!entry) {
    entry.reset(new InputBatch{InputSlots(num_inputs_)});
  }
  return entry.get();
}
|
||
// Record the batch delivered by input `input_index` for slot `batch_index`.
//
// Returns Status::Invalid if either index is out of range (negative, not a
// valid input, or at/after the batch count fixed by InputFinished), or if the
// same (input, batch) pair was already delivered. Once every input has
// delivered its batch for `batch_index`, that slot's ready future is marked
// finished.
Status ExecNode::InputReceived(int32_t input_index, int32_t batch_index,
                               compute::ExecBatch batch) {
  auto lock = mutex_.Lock();

  // Negative indices must be rejected too: they would otherwise be used to
  // index `batches` / `input_batches_` out of bounds below.
  if (input_index < 0 || input_index >= num_inputs_) {
    return Status::Invalid("Invalid input index");
  }
  if (batch_index < 0 || (finish_at_ >= 0 && batch_index >= finish_at_)) {
    return Status::Invalid("Input batch index out of bounds");
  }

  ReserveBatches(batch_index + 1);
  auto* input_batch = EnsureBatch(batch_index);

  auto& slot = input_batch->batches[input_index];
  if (slot.has_value()) {
    return Status::Invalid("Batch #", batch_index, " for input #", input_index,
                           " already received");
  }
  slot = std::move(batch);
  // The last input to arrive for this batch index flips the ready future.
  if (++input_batch->num_ready == num_inputs_) {
    input_batch->ready_fut.MarkFinished();
  }
  return Status::OK();
}
|
||
// Fix the total number of input batches at `num_batches`.
//
// May be called before all batches have been received. Returns
// Status::Invalid if `num_batches` is negative or if InputFinished was
// already called. On success, `finished_fut_` is wired to complete once the
// ready future of every batch index in [0, num_batches) has completed.
Status ExecNode::InputFinished(int32_t num_batches) {
  auto lock = mutex_.Lock();

  // A negative count would leave finish_at_ negative, i.e. indistinguishable
  // from "not finished yet", so reject it before touching any state.
  if (num_batches < 0) {
    return Status::Invalid("Invalid number of batches: ", num_batches);
  }
  if (finish_at_ >= 0) {
    return Status::Invalid("InputFinished already called");
  }
  finish_at_ = num_batches;
  ReserveBatches(num_batches);

  // Collect one ready future per batch index. The vector starts empty, so the
  // futures are appended rather than assigned by index.
  std::vector<Future<>> batch_futures;
  batch_futures.reserve(static_cast<size_t>(num_batches));
  for (int32_t i = 0; i < num_batches; ++i) {
    batch_futures.push_back(EnsureBatch(i)->ready_fut);
  }

  // TODO lifetime
  AllComplete(std::move(batch_futures))
      .AddCallback([this](const Result<detail::Empty>& res) {
        finished_fut_.MarkFinished(res.status());
      });
  return Status::OK();
}
|
||
// Schedule RunSyncInternal(batch_index) to run once the batch is ready.
//
// The slot for `batch_index` is created if it does not exist yet. Its ready
// future is transferred to `executor`, and the continuation calls
// RunSyncInternal for that batch index; the continuation's future is
// returned.
Future<> ExecNode::RunAsync(int32_t batch_index, internal::Executor* executor) {
  auto lock = mutex_.Lock();

  ReserveBatches(batch_index + 1);
  const Future<>& batch_ready = EnsureBatch(batch_index)->ready_fut;

  // TODO lifetime (take strong ref to ExecPlan?)
  auto transferred = executor->Transfer(batch_ready);
  return transferred.Then(
      [this, batch_index](...) -> Status { return RunSyncInternal(batch_index); });
}
|
||
// Schedule FinishSyncInternal() to run once `finished_fut_` completes.
//
// `finished_fut_` is transferred to `executor`, and the returned future is
// that of the continuation calling FinishSyncInternal.
Future<> ExecNode::FinishAsync(internal::Executor* executor) {
  auto lock = mutex_.Lock();

  // TODO lifetime
  auto transferred = executor->Transfer(finished_fut_);
  return transferred.Then([this](...) -> Status { return FinishSyncInternal(); });
}
|
||
} // namespace engine | ||
} // namespace arrow |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,131 @@ | ||
// Licensed to the Apache Software Foundation (ASF) under one | ||
// or more contributor license agreements. See the NOTICE file | ||
// distributed with this work for additional information | ||
// regarding copyright ownership. The ASF licenses this file | ||
// to you under the Apache License, Version 2.0 (the | ||
// "License"); you may not use this file except in compliance | ||
// with the License. You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, | ||
// software distributed under the License is distributed on an | ||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
// KIND, either express or implied. See the License for the | ||
// specific language governing permissions and limitations | ||
// under the License. | ||
|
||
#pragma once | ||
|
||
#include <memory> | ||
#include <string> | ||
#include <vector> | ||
|
||
#include "arrow/compute/type_fwd.h" | ||
#include "arrow/type_fwd.h" | ||
#include "arrow/util/future.h" | ||
#include "arrow/util/macros.h" | ||
#include "arrow/util/mutex.h" | ||
#include "arrow/util/optional.h" | ||
#include "arrow/util/type_fwd.h" | ||
#include "arrow/util/visibility.h" | ||
|
||
namespace arrow { | ||
namespace engine { | ||
|
||
class ExecNode; | ||
class QueryNode; | ||
class QueryPlan; | ||
|
||
class ARROW_EXPORT ExecPlan { | ||
public: | ||
using NodeVector = std::vector<ExecNode*>; | ||
|
||
virtual ~ExecPlan() = default; | ||
|
||
/// The query plan this ExecPlan is an instance of | ||
const QueryPlan& query_plan() const { return *query_plan_; } | ||
|
||
compute::ExecContext* context() { return context_; } | ||
|
||
protected: | ||
friend class QueryPlan; | ||
|
||
ExecPlan() = default; | ||
|
||
const std::shared_ptr<const QueryPlan> query_plan_; | ||
compute::ExecContext* context_; | ||
}; | ||
|
||
/// Abstract base class for a node of an ExecPlan.
///
/// A node collects, for each batch index, one ExecBatch per input
/// (InputReceived), learns the total number of batches (InputFinished), and
/// exposes asynchronous entry points (RunAsync / FinishAsync) that invoke the
/// subclass hooks RunSyncInternal / FinishSyncInternal.
class ARROW_EXPORT ExecNode { | ||
public: | ||
using NodeVector = std::vector<ExecNode*>; | ||
|
||
virtual ~ExecNode(); | ||
|
||
/// The query node this ExecNode is an instance of | ||
const QueryNode& query_node() const { return *query_node_; } | ||
|
||
/// This node's exec plan | ||
ExecPlan* plan() { return plan_; } | ||
|
||
/// Transfer input batch to ExecNode | ||
/// | ||
/// When all inputs are received for a given batch_index, the batch is ready | ||
/// for execution. | ||
///
/// Returns an Invalid status if the input index is too large, if the batch
/// index is beyond the count fixed by InputFinished, or if the same
/// (input_index, batch_index) pair was already received.
Status InputReceived(int32_t input_index, int32_t batch_index, | ||
compute::ExecBatch batch); | ||
|
||
/// Mark the inputs finished after the given number of batches. | ||
/// | ||
/// This may be called before all inputs are received. This simply fixes | ||
/// the total number of incoming batches so that the ExecNode knows when | ||
/// it has received all input. | ||
///
/// Returns an Invalid status if it was already called.
Status InputFinished(int32_t num_batches); | ||
|
||
/// Schedule batch execution once all inputs are received for the given batch_index | ||
/// | ||
/// The returned Future is finished once execution of the batch is finished. | ||
/// Note that execution doesn't necessarily mean that any outputs are produced. | ||
/// Depending on the ExecNode type, outputs may be produced on the fly, | ||
/// or only at the end when all inputs have been received. | ||
Future<> RunAsync(int32_t batch_index, internal::Executor* executor); | ||
|
||
/// Schedule finalization once all batches are executed | ||
/// | ||
/// Return a Future that will be marked finished once all inputs are received | ||
/// and all computation has finished. | ||
/// | ||
/// RunAsync and FinishAsync can be thought of as exposing a map/reduce pipeline. | ||
Future<> FinishAsync(internal::Executor* executor); | ||
|
||
protected: | ||
// Per-batch-index bookkeeping: one optional slot per input, the number of
// slots filled so far, and a future marked finished when all are filled.
struct InputBatch { | ||
std::vector<util::optional<compute::ExecBatch>> batches; | ||
int32_t num_ready = 0; | ||
Future<> ready_fut; | ||
|
||
explicit InputBatch(std::vector<util::optional<compute::ExecBatch>> batches); | ||
}; | ||
|
||
ExecNode(ExecPlan* plan, const QueryNode* query_node); | ||
|
||
// Subclass hooks invoked from the continuations set up by RunAsync and
// FinishAsync respectively.
virtual Status RunSyncInternal(int32_t batch_index) = 0; | ||
virtual Status FinishSyncInternal() = 0; | ||
|
||
// Helpers operating on input_batches_; the implementation notes that both
// should be called with mutex_ locked.
InputBatch* EnsureBatch(int32_t batch_index); | ||
void ReserveBatches(int32_t num_batches); | ||
|
||
ExecPlan* plan_; | ||
const QueryNode* const query_node_; | ||
// Read once from the query node at construction.
const int32_t num_inputs_; | ||
|
||
// XXX also use a per-input batch mutex? | ||
// Locked by InputReceived, InputFinished, RunAsync and FinishAsync.
util::Mutex mutex_; | ||
// Indexed by batch index; entries are null until EnsureBatch creates them.
std::vector<std::unique_ptr<InputBatch>> input_batches_; | ||
// Total number of batches as set by InputFinished; -1 until then.
int32_t finish_at_ = -1; | ||
// Marked finished by the callback installed in InputFinished.
Future<> finished_fut_; | ||
}; | ||
|
||
} // namespace engine | ||
} // namespace arrow |
Oops, something went wrong.