Skip to content

Commit

Permalink
ARROW-11928: [C++] Execution engine API
Browse files Browse the repository at this point in the history
  • Loading branch information
pitrou committed Mar 17, 2021
1 parent 3decc46 commit 66b9b60
Show file tree
Hide file tree
Showing 10 changed files with 483 additions and 0 deletions.
4 changes: 4 additions & 0 deletions cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,10 @@ if(ARROW_CUDA
set(ARROW_IPC ON)
endif()

if(ARROW_ENGINE)
set(ARROW_COMPUTE ON)
endif()

if(ARROW_DATASET)
set(ARROW_COMPUTE ON)
set(ARROW_FILESYSTEM ON)
Expand Down
2 changes: 2 additions & 0 deletions cpp/cmake_modules/DefineOptions.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,8 @@ if("${CMAKE_SOURCE_DIR}" STREQUAL "${CMAKE_CURRENT_SOURCE_DIR}")

define_option(ARROW_DATASET "Build the Arrow Dataset Modules" OFF)

define_option(ARROW_ENGINE "Build the Arrow Execution Engine" OFF)

define_option(ARROW_FILESYSTEM "Build the Arrow Filesystem Layer" OFF)

define_option(ARROW_FLIGHT
Expand Down
8 changes: 8 additions & 0 deletions cpp/src/arrow/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -409,6 +409,10 @@ if(ARROW_COMPUTE)
endif()
endif()

if(ARROW_ENGINE)
list(APPEND ARROW_SRCS engine/exec_plan.cc engine/query_plan.cc)
endif()

if(ARROW_FILESYSTEM)
if(ARROW_HDFS)
add_definitions(-DARROW_HDFS)
Expand Down Expand Up @@ -671,6 +675,10 @@ if(ARROW_DATASET)
add_subdirectory(dataset)
endif()

if(ARROW_ENGINE)
add_subdirectory(engine)
endif()

if(ARROW_FILESYSTEM)
add_subdirectory(filesystem)
endif()
Expand Down
1 change: 1 addition & 0 deletions cpp/src/arrow/compute/type_fwd.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ struct FunctionOptions;

struct CastOptions;

class ExecBatch;
class ExecContext;
class KernelContext;

Expand Down
19 changes: 19 additions & 0 deletions cpp/src/arrow/engine/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

# Headers: top level
arrow_install_all_headers("arrow/engine")
20 changes: 20 additions & 0 deletions cpp/src/arrow/engine/api.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include "arrow/engine/query_plan.h" // IWYU pragma: export
131 changes: 131 additions & 0 deletions cpp/src/arrow/engine/exec_plan.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "arrow/engine/exec_plan.h"

#include "arrow/compute/exec.h"
#include "arrow/engine/query_plan.h"
#include "arrow/result.h"
#include "arrow/status.h"
#include "arrow/util/checked_cast.h"
#include "arrow/util/future.h"
#include "arrow/util/logging.h"
#include "arrow/util/thread_pool.h"

namespace arrow {

using internal::checked_cast;

namespace engine {

ExecNode::InputBatch::InputBatch(std::vector<util::optional<compute::ExecBatch>> batches)
: batches(std::move(batches)), ready_fut(Future<>::Make()) {}

ExecNode::ExecNode(ExecPlan* plan, const QueryNode* query_node)
: plan_(plan),
query_node_(query_node),
num_inputs_(query_node->num_inputs()),
finished_fut_(Future<>::Make()) {}

ExecNode::~ExecNode() = default;

void ExecNode::ReserveBatches(int32_t num_batches) {
// Should be called with mutex locked
if (static_cast<size_t>(num_batches) > input_batches_.size()) {
input_batches_.resize(num_batches);
}
}

ExecNode::InputBatch* ExecNode::EnsureBatch(int32_t batch_index) {
// Should be called with mutex locked
if (input_batches_[batch_index] == nullptr) {
input_batches_[batch_index].reset(
new InputBatch{std::vector<util::optional<compute::ExecBatch>>(num_inputs_)});
}
return input_batches_[batch_index].get();
}

Status ExecNode::InputReceived(int32_t input_index, int32_t batch_index,
compute::ExecBatch batch) {
auto lock = mutex_.Lock();

if (input_index >= num_inputs_) {
return Status::Invalid("Invalid input index");
}
if (finish_at_ >= 0 && batch_index >= finish_at_) {
return Status::Invalid("Input batch index out of bounds");
}

ReserveBatches(batch_index + 1);
auto* input_batch = EnsureBatch(batch_index);

if (input_batch->batches[input_index].has_value()) {
return Status::Invalid("Batch #", batch_index, " for input #", input_index,
" already received");
}
input_batch->batches[input_index] = std::move(batch);
if (++input_batch->num_ready == num_inputs_) {
input_batch->ready_fut.MarkFinished();
}
return Status::OK();
}

Status ExecNode::InputFinished(int32_t num_batches) {
auto lock = mutex_.Lock();

if (finish_at_ >= 0) {
return Status::Invalid("InputFinished already called");
}
finish_at_ = num_batches;
ReserveBatches(num_batches);

std::vector<Future<>> batch_futures;
for (int32_t i = 0; i < num_batches; ++i) {
auto* input_batch = EnsureBatch(i);
batch_futures[i] = input_batch->ready_fut;
}

// TODO lifetime
AllComplete(std::move(batch_futures))
.AddCallback([this](const Result<detail::Empty>& res) {
finished_fut_.MarkFinished(res.status());
});
return Status::OK();
}

Future<> ExecNode::RunAsync(int32_t batch_index, internal::Executor* executor) {
auto lock = mutex_.Lock();

ReserveBatches(batch_index + 1);
auto* input_batch = EnsureBatch(batch_index);

// TODO lifetime (take strong ref to ExecPlan?)
return executor->Transfer(input_batch->ready_fut)
.Then([this, batch_index](...) -> Status { return RunSyncInternal(batch_index); });
}

Future<> ExecNode::FinishAsync(internal::Executor* executor) {
auto lock = mutex_.Lock();

// TODO lifetime
return executor->Transfer(finished_fut_).Then([this](...) -> Status {
return FinishSyncInternal();
});
}

} // namespace engine
} // namespace arrow
131 changes: 131 additions & 0 deletions cpp/src/arrow/engine/exec_plan.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include <memory>
#include <string>
#include <vector>

#include "arrow/compute/type_fwd.h"
#include "arrow/type_fwd.h"
#include "arrow/util/future.h"
#include "arrow/util/macros.h"
#include "arrow/util/mutex.h"
#include "arrow/util/optional.h"
#include "arrow/util/type_fwd.h"
#include "arrow/util/visibility.h"

namespace arrow {
namespace engine {

class ExecNode;
class QueryNode;
class QueryPlan;

class ARROW_EXPORT ExecPlan {
public:
using NodeVector = std::vector<ExecNode*>;

virtual ~ExecPlan() = default;

/// The query plan this ExecPlan is an instance of
const QueryPlan& query_plan() const { return *query_plan_; }

compute::ExecContext* context() { return context_; }

protected:
friend class QueryPlan;

ExecPlan() = default;

const std::shared_ptr<const QueryPlan> query_plan_;
compute::ExecContext* context_;
};

class ARROW_EXPORT ExecNode {
public:
using NodeVector = std::vector<ExecNode*>;

virtual ~ExecNode();

/// The query node this ExecNode is an instance of
const QueryNode& query_node() const { return *query_node_; }

/// This node's exec plan
ExecPlan* plan() { return plan_; }

/// Transfer input batch to ExecNode
///
/// When all inputs are received for a given batch_index, the batch is ready
/// for execution.
Status InputReceived(int32_t input_index, int32_t batch_index,
compute::ExecBatch batch);

/// Mark the inputs finished after the given number of batches.
///
/// This may be called before all inputs are received. This simply fixes
/// the total number of incoming batches so that the ExecNode knows when
/// it has received all input.
Status InputFinished(int32_t num_batches);

/// Schedule batch execution once all inputs are received for the given batch_index
///
/// The returned Future is finished once execution of the batch is finished.
/// Note that execution doesn't necessarily mean that any outputs are produced.
/// Depending on the ExecNode type, outputs may be produced on the fly,
/// or only at the end when all inputs have been received.
Future<> RunAsync(int32_t batch_index, internal::Executor* executor);

/// Schedule finalization once all batches are executed
///
/// Return a Future that will be marked finished once all inputs are received
/// and all computation has finished.
///
/// RunAsync and FinishAsync can be though of as exposing a map/reduce pipeline.
Future<> FinishAsync(internal::Executor* executor);

protected:
struct InputBatch {
std::vector<util::optional<compute::ExecBatch>> batches;
int32_t num_ready = 0;
Future<> ready_fut;

explicit InputBatch(std::vector<util::optional<compute::ExecBatch>> batches);
};

ExecNode(ExecPlan* plan, const QueryNode* query_node);

virtual Status RunSyncInternal(int32_t batch_index) = 0;
virtual Status FinishSyncInternal() = 0;

InputBatch* EnsureBatch(int32_t batch_index);
void ReserveBatches(int32_t num_batches);

ExecPlan* plan_;
const QueryNode* const query_node_;
const int32_t num_inputs_;

// XXX also use a per-input batch mutex?
util::Mutex mutex_;
std::vector<std::unique_ptr<InputBatch>> input_batches_;
int32_t finish_at_ = -1;
Future<> finished_fut_;
};

} // namespace engine
} // namespace arrow
Loading

0 comments on commit 66b9b60

Please sign in to comment.