Skip to content

Commit

Permalink
Allow to add/remove nodes thread safely in rclcpp::Executor (#1505)
Browse files Browse the repository at this point in the history
Signed-off-by: Ivan Santiago Paunovic <[email protected]>
  • Loading branch information
ivanpauno authored Jan 14, 2021
1 parent 8d2c682 commit 7ccd64c
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 19 deletions.
32 changes: 19 additions & 13 deletions rclcpp/include/rclcpp/executor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -448,7 +448,7 @@ class Executor
RCLCPP_PUBLIC
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr
get_node_by_group(
WeakCallbackGroupsToNodesMap weak_groups_to_nodes,
const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes,
rclcpp::CallbackGroup::SharedPtr group);

/// Return true if the node has been added to this executor.
Expand All @@ -460,7 +460,7 @@ class Executor
bool
has_node(
const rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr,
WeakCallbackGroupsToNodesMap weak_groups_to_nodes) const;
const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes) const;

RCLCPP_PUBLIC
rclcpp::CallbackGroup::SharedPtr
Expand All @@ -476,7 +476,7 @@ class Executor
rclcpp::CallbackGroup::SharedPtr group_ptr,
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr,
WeakCallbackGroupsToNodesMap & weak_groups_to_nodes,
bool notify = true);
bool notify = true) RCPPUTILS_TSA_REQUIRES(mutex_);

/// Remove a callback group from the executor.
/**
Expand All @@ -487,7 +487,7 @@ class Executor
remove_callback_group_from_map(
rclcpp::CallbackGroup::SharedPtr group_ptr,
WeakCallbackGroupsToNodesMap & weak_groups_to_nodes,
bool notify = true);
bool notify = true) RCPPUTILS_TSA_REQUIRES(mutex_);

RCLCPP_PUBLIC
bool
Expand All @@ -497,7 +497,7 @@ class Executor
bool
get_next_ready_executable_from_map(
AnyExecutable & any_executable,
WeakCallbackGroupsToNodesMap weak_groups_to_nodes);
const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes);

RCLCPP_PUBLIC
bool
Expand All @@ -518,7 +518,7 @@ class Executor
*/
RCLCPP_PUBLIC
virtual void
add_callback_groups_from_nodes_associated_to_executor();
add_callback_groups_from_nodes_associated_to_executor() RCPPUTILS_TSA_REQUIRES(mutex_);

/// Spinning state, used to prevent multi threaded calls to spin and to cancel blocking spins.
std::atomic_bool spinning;
Expand All @@ -532,10 +532,11 @@ class Executor
rcl_wait_set_t wait_set_ = rcl_get_zero_initialized_wait_set();

// Mutex to protect the subsequent memory_strategy_.
std::mutex memory_strategy_mutex_;
mutable std::mutex mutex_;

/// The memory strategy: an interface for handling user-defined memory allocation strategies.
memory_strategy::MemoryStrategy::SharedPtr memory_strategy_;
memory_strategy::MemoryStrategy::SharedPtr
memory_strategy_ RCPPUTILS_TSA_PT_GUARDED_BY(mutex_);

/// The context associated with this executor.
std::shared_ptr<rclcpp::Context> context_;
Expand All @@ -552,19 +553,24 @@ class Executor
WeakNodesToGuardConditionsMap;

/// maps nodes to guard conditions
WeakNodesToGuardConditionsMap weak_nodes_to_guard_conditions_;
WeakNodesToGuardConditionsMap
weak_nodes_to_guard_conditions_ RCPPUTILS_TSA_GUARDED_BY(mutex_);

/// maps callback groups associated to nodes
WeakCallbackGroupsToNodesMap weak_groups_associated_with_executor_to_nodes_;
WeakCallbackGroupsToNodesMap
weak_groups_associated_with_executor_to_nodes_ RCPPUTILS_TSA_GUARDED_BY(mutex_);

/// maps callback groups to nodes associated with executor
WeakCallbackGroupsToNodesMap weak_groups_to_nodes_associated_with_executor_;
WeakCallbackGroupsToNodesMap
weak_groups_to_nodes_associated_with_executor_ RCPPUTILS_TSA_GUARDED_BY(mutex_);

/// maps all callback groups to nodes
WeakCallbackGroupsToNodesMap weak_groups_to_nodes_;
WeakCallbackGroupsToNodesMap
weak_groups_to_nodes_ RCPPUTILS_TSA_GUARDED_BY(mutex_);

/// nodes that are associated with the executor
std::list<rclcpp::node_interfaces::NodeBaseInterface::WeakPtr> weak_nodes_;
std::list<rclcpp::node_interfaces::NodeBaseInterface::WeakPtr>
weak_nodes_ RCPPUTILS_TSA_GUARDED_BY(mutex_);
};

namespace executor
Expand Down
24 changes: 18 additions & 6 deletions rclcpp/src/rclcpp/executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ std::vector<rclcpp::CallbackGroup::WeakPtr>
Executor::get_all_callback_groups()
{
std::vector<rclcpp::CallbackGroup::WeakPtr> groups;
std::lock_guard<std::mutex> guard{mutex_};
for (const auto & group_node_ptr : weak_groups_associated_with_executor_to_nodes_) {
groups.push_back(group_node_ptr.first);
}
Expand All @@ -157,6 +158,7 @@ std::vector<rclcpp::CallbackGroup::WeakPtr>
Executor::get_manually_added_callback_groups()
{
std::vector<rclcpp::CallbackGroup::WeakPtr> groups;
std::lock_guard<std::mutex> guard{mutex_};
for (auto const & group_node_ptr : weak_groups_associated_with_executor_to_nodes_) {
groups.push_back(group_node_ptr.first);
}
Expand All @@ -167,6 +169,7 @@ std::vector<rclcpp::CallbackGroup::WeakPtr>
Executor::get_automatically_added_callback_groups_from_nodes()
{
std::vector<rclcpp::CallbackGroup::WeakPtr> groups;
std::lock_guard<std::mutex> guard{mutex_};
for (auto const & group_node_ptr : weak_groups_to_nodes_associated_with_executor_) {
groups.push_back(group_node_ptr.first);
}
Expand Down Expand Up @@ -233,7 +236,6 @@ Executor::add_callback_group_to_map(
}
}
// Add the node's notify condition to the guard condition handles
std::unique_lock<std::mutex> lock(memory_strategy_mutex_);
memory_strategy_->add_guard_condition(node_ptr->get_notify_guard_condition());
}
}
Expand All @@ -244,6 +246,7 @@ Executor::add_callback_group(
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr,
bool notify)
{
std::lock_guard<std::mutex> guard{mutex_};
this->add_callback_group_to_map(
group_ptr,
node_ptr,
Expand All @@ -259,6 +262,7 @@ Executor::add_node(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_pt
if (has_executor.exchange(true)) {
throw std::runtime_error("Node has already been added to an executor.");
}
std::lock_guard<std::mutex> guard{mutex_};
for (auto & weak_group : node_ptr->get_callback_groups()) {
auto group_ptr = weak_group.lock();
if (group_ptr != nullptr && !group_ptr->get_associated_with_executor_atomic().load() &&
Expand Down Expand Up @@ -307,7 +311,6 @@ Executor::remove_callback_group_from_map(
throw_from_rcl_error(ret, "Failed to trigger guard condition on callback group remove");
}
}
std::unique_lock<std::mutex> lock(memory_strategy_mutex_);
memory_strategy_->remove_guard_condition(node_ptr->get_notify_guard_condition());
}
}
Expand All @@ -317,6 +320,7 @@ Executor::remove_callback_group(
rclcpp::CallbackGroup::SharedPtr group_ptr,
bool notify)
{
std::lock_guard<std::mutex> guard{mutex_};
this->remove_callback_group_from_map(
group_ptr,
weak_groups_associated_with_executor_to_nodes_,
Expand All @@ -336,6 +340,7 @@ Executor::remove_node(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node
throw std::runtime_error("Node needs to be associated with an executor.");
}

std::lock_guard<std::mutex> guard{mutex_};
bool found_node = false;
auto node_it = weak_nodes_.begin();
while (node_it != weak_nodes_.end()) {
Expand Down Expand Up @@ -489,6 +494,7 @@ Executor::set_memory_strategy(rclcpp::memory_strategy::MemoryStrategy::SharedPtr
if (memory_strategy == nullptr) {
throw std::runtime_error("Received NULL memory strategy in executor.");
}
std::lock_guard<std::mutex> guard{mutex_};
memory_strategy_ = memory_strategy;
}

Expand Down Expand Up @@ -662,7 +668,7 @@ void
Executor::wait_for_work(std::chrono::nanoseconds timeout)
{
{
std::unique_lock<std::mutex> lock(memory_strategy_mutex_);
std::lock_guard<std::mutex> guard(mutex_);

// Check weak_nodes_ to find any callback group that is not owned
// by an executor and add it to the list of callbackgroups for
Expand Down Expand Up @@ -741,12 +747,14 @@ Executor::wait_for_work(std::chrono::nanoseconds timeout)

// check the null handles in the wait set and remove them from the handles in memory strategy
// for callback-based entities
std::lock_guard<std::mutex> guard(mutex_);
memory_strategy_->remove_null_handles(&wait_set_);
}

rclcpp::node_interfaces::NodeBaseInterface::SharedPtr
Executor::get_node_by_group(
rclcpp::memory_strategy::MemoryStrategy::WeakCallbackGroupsToNodesMap weak_groups_to_nodes,
const rclcpp::memory_strategy::MemoryStrategy::WeakCallbackGroupsToNodesMap &
weak_groups_to_nodes,
rclcpp::CallbackGroup::SharedPtr group)
{
if (!group) {
Expand All @@ -764,6 +772,7 @@ Executor::get_node_by_group(
rclcpp::CallbackGroup::SharedPtr
Executor::get_group_by_timer(rclcpp::TimerBase::SharedPtr timer)
{
std::lock_guard<std::mutex> guard{mutex_};
for (const auto & pair : weak_groups_associated_with_executor_to_nodes_) {
auto group = pair.first.lock();
if (!group) {
Expand Down Expand Up @@ -804,9 +813,11 @@ Executor::get_next_ready_executable(AnyExecutable & any_executable)
bool
Executor::get_next_ready_executable_from_map(
AnyExecutable & any_executable,
rclcpp::memory_strategy::MemoryStrategy::WeakCallbackGroupsToNodesMap weak_groups_to_nodes)
const rclcpp::memory_strategy::MemoryStrategy::WeakCallbackGroupsToNodesMap &
weak_groups_to_nodes)
{
bool success = false;
std::lock_guard<std::mutex> guard{mutex_};
// Check the timers to see if there are any that are ready
memory_strategy_->get_next_timer(any_executable, weak_groups_to_nodes);
if (any_executable.timer) {
Expand Down Expand Up @@ -893,7 +904,8 @@ Executor::get_next_executable(AnyExecutable & any_executable, std::chrono::nanos
bool
Executor::has_node(
const rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr,
rclcpp::memory_strategy::MemoryStrategy::WeakCallbackGroupsToNodesMap weak_groups_to_nodes) const
const rclcpp::memory_strategy::MemoryStrategy::WeakCallbackGroupsToNodesMap &
weak_groups_to_nodes) const
{
return std::find_if(
weak_groups_to_nodes.begin(),
Expand Down
31 changes: 31 additions & 0 deletions rclcpp/test/rclcpp/test_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

#include "rclcpp/executor.hpp"
#include "rclcpp/memory_strategy.hpp"
#include "rclcpp/executors/single_threaded_executor.hpp"
#include "rclcpp/strategies/allocator_memory_strategy.hpp"

#include "../mocking_utils/patch.hpp"
Expand Down Expand Up @@ -54,6 +55,7 @@ class DummyExecutor : public rclcpp::Executor
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr local_get_node_by_group(
rclcpp::CallbackGroup::SharedPtr group)
{
std::lock_guard<std::mutex> guard_{mutex_}; // only to make the TSA happy
return get_node_by_group(weak_groups_to_nodes_, group);
}

Expand Down Expand Up @@ -87,6 +89,35 @@ MOCKING_UTILS_BOOL_OPERATOR_RETURNS_FALSE(rcutils_allocator_t, !=)
MOCKING_UTILS_BOOL_OPERATOR_RETURNS_FALSE(rcutils_allocator_t, <)
MOCKING_UTILS_BOOL_OPERATOR_RETURNS_FALSE(rcutils_allocator_t, >)

TEST_F(TestExecutor, add_remove_node_thread_safe) {
using namespace std::chrono_literals;

// Create an Executor
rclcpp::executors::SingleThreadedExecutor executor;

auto future = std::async(std::launch::async, [&executor] {executor.spin();});

// Add and remove nodes repeatedly
// Test that this does not cause a segfault
size_t num_nodes = 100;
for (size_t i = 0; i < num_nodes; ++i) {
std::ostringstream name;
name << "node_" << i;
auto node = std::make_shared<rclcpp::Node>(name.str());
executor.add_node(node);
// Sleeping here helps exaggerate the issue
std::this_thread::sleep_for(std::chrono::milliseconds(1));
executor.remove_node(node);
}
std::future_status future_status = std::future_status::timeout;
do {
executor.cancel();
future_status = future.wait_for(1s);
} while (future_status == std::future_status::timeout);
EXPECT_EQ(future_status, std::future_status::ready);
future.get();
}

TEST_F(TestExecutor, constructor_bad_guard_condition_init) {
auto mock = mocking_utils::patch_and_return(
"lib:rclcpp", rcl_guard_condition_init, RCL_RET_ERROR);
Expand Down

0 comments on commit 7ccd64c

Please sign in to comment.