Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow to add/remove nodes thread safely in rclcpp::Executor #1505

Merged
merged 6 commits into from
Jan 14, 2021
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 19 additions & 13 deletions rclcpp/include/rclcpp/executor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -448,7 +448,7 @@ class Executor
RCLCPP_PUBLIC
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr
get_node_by_group(
WeakCallbackGroupsToNodesMap weak_groups_to_nodes,
const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes,
rclcpp::CallbackGroup::SharedPtr group);

/// Return true if the node has been added to this executor.
Expand All @@ -460,7 +460,7 @@ class Executor
bool
has_node(
const rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr,
WeakCallbackGroupsToNodesMap weak_groups_to_nodes) const;
const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes) const;

RCLCPP_PUBLIC
rclcpp::CallbackGroup::SharedPtr
Expand All @@ -476,7 +476,7 @@ class Executor
rclcpp::CallbackGroup::SharedPtr group_ptr,
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr,
WeakCallbackGroupsToNodesMap & weak_groups_to_nodes,
bool notify = true);
bool notify = true) RCPPUTILS_TSA_REQUIRES(mutex_);

/// Remove a callback group from the executor.
/**
Expand All @@ -487,7 +487,7 @@ class Executor
remove_callback_group_from_map(
rclcpp::CallbackGroup::SharedPtr group_ptr,
WeakCallbackGroupsToNodesMap & weak_groups_to_nodes,
bool notify = true);
bool notify = true) RCPPUTILS_TSA_REQUIRES(mutex_);

RCLCPP_PUBLIC
bool
Expand All @@ -497,7 +497,7 @@ class Executor
bool
get_next_ready_executable_from_map(
AnyExecutable & any_executable,
WeakCallbackGroupsToNodesMap weak_groups_to_nodes);
const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes);

RCLCPP_PUBLIC
bool
Expand All @@ -518,7 +518,7 @@ class Executor
*/
RCLCPP_PUBLIC
virtual void
add_callback_groups_from_nodes_associated_to_executor();
add_callback_groups_from_nodes_associated_to_executor() RCPPUTILS_TSA_REQUIRES(mutex_);

/// Spinning state, used to prevent multi threaded calls to spin and to cancel blocking spins.
std::atomic_bool spinning;
Expand All @@ -532,10 +532,11 @@ class Executor
rcl_wait_set_t wait_set_ = rcl_get_zero_initialized_wait_set();

// Mutex to protect the subsequent memory_strategy_.
std::mutex memory_strategy_mutex_;
mutable std::mutex mutex_;

/// The memory strategy: an interface for handling user-defined memory allocation strategies.
memory_strategy::MemoryStrategy::SharedPtr memory_strategy_;
memory_strategy::MemoryStrategy::SharedPtr
memory_strategy_ RCPPUTILS_TSA_PT_GUARDED_BY(mutex_);

/// The context associated with this executor.
std::shared_ptr<rclcpp::Context> context_;
Expand All @@ -552,19 +553,24 @@ class Executor
WeakNodesToGuardConditionsMap;

/// maps nodes to guard conditions
WeakNodesToGuardConditionsMap weak_nodes_to_guard_conditions_;
WeakNodesToGuardConditionsMap
weak_nodes_to_guard_conditions_ RCPPUTILS_TSA_GUARDED_BY(mutex_);

/// maps callback groups associated to nodes
WeakCallbackGroupsToNodesMap weak_groups_associated_with_executor_to_nodes_;
WeakCallbackGroupsToNodesMap
weak_groups_associated_with_executor_to_nodes_ RCPPUTILS_TSA_GUARDED_BY(mutex_);

/// maps callback groups to nodes associated with executor
WeakCallbackGroupsToNodesMap weak_groups_to_nodes_associated_with_executor_;
WeakCallbackGroupsToNodesMap
weak_groups_to_nodes_associated_with_executor_ RCPPUTILS_TSA_GUARDED_BY(mutex_);

/// maps all callback groups to nodes
WeakCallbackGroupsToNodesMap weak_groups_to_nodes_;
WeakCallbackGroupsToNodesMap
weak_groups_to_nodes_ RCPPUTILS_TSA_GUARDED_BY(mutex_);

/// nodes that are associated with the executor
std::list<rclcpp::node_interfaces::NodeBaseInterface::WeakPtr> weak_nodes_;
std::list<rclcpp::node_interfaces::NodeBaseInterface::WeakPtr>
weak_nodes_ RCPPUTILS_TSA_GUARDED_BY(mutex_);
};

namespace executor
Expand Down
21 changes: 15 additions & 6 deletions rclcpp/src/rclcpp/executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ std::vector<rclcpp::CallbackGroup::WeakPtr>
Executor::get_all_callback_groups()
{
std::vector<rclcpp::CallbackGroup::WeakPtr> groups;
std::lock_guard<std::mutex> guard{mutex_};
for (const auto & group_node_ptr : weak_groups_associated_with_executor_to_nodes_) {
groups.push_back(group_node_ptr.first);
}
Expand All @@ -157,6 +158,7 @@ std::vector<rclcpp::CallbackGroup::WeakPtr>
Executor::get_manually_added_callback_groups()
{
std::vector<rclcpp::CallbackGroup::WeakPtr> groups;
std::lock_guard<std::mutex> guard{mutex_};
for (auto const & group_node_ptr : weak_groups_associated_with_executor_to_nodes_) {
groups.push_back(group_node_ptr.first);
}
Expand All @@ -167,6 +169,7 @@ std::vector<rclcpp::CallbackGroup::WeakPtr>
Executor::get_automatically_added_callback_groups_from_nodes()
{
std::vector<rclcpp::CallbackGroup::WeakPtr> groups;
std::lock_guard<std::mutex> guard{mutex_};
for (auto const & group_node_ptr : weak_groups_to_nodes_associated_with_executor_) {
groups.push_back(group_node_ptr.first);
}
Expand Down Expand Up @@ -233,7 +236,6 @@ Executor::add_callback_group_to_map(
}
}
// Add the node's notify condition to the guard condition handles
std::unique_lock<std::mutex> lock(memory_strategy_mutex_);
memory_strategy_->add_guard_condition(node_ptr->get_notify_guard_condition());
}
}
Expand All @@ -244,6 +246,7 @@ Executor::add_callback_group(
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr,
bool notify)
{
std::lock_guard<std::mutex> guard{mutex_};
this->add_callback_group_to_map(
group_ptr,
node_ptr,
Expand All @@ -259,6 +262,7 @@ Executor::add_node(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_pt
if (has_executor.exchange(true)) {
throw std::runtime_error("Node has already been added to an executor.");
}
std::lock_guard<std::mutex> guard{mutex_};
for (auto & weak_group : node_ptr->get_callback_groups()) {
auto group_ptr = weak_group.lock();
if (group_ptr != nullptr && !group_ptr->get_associated_with_executor_atomic().load() &&
Expand Down Expand Up @@ -307,7 +311,6 @@ Executor::remove_callback_group_from_map(
throw_from_rcl_error(ret, "Failed to trigger guard condition on callback group remove");
}
}
std::unique_lock<std::mutex> lock(memory_strategy_mutex_);
memory_strategy_->remove_guard_condition(node_ptr->get_notify_guard_condition());
}
}
Expand All @@ -317,6 +320,7 @@ Executor::remove_callback_group(
rclcpp::CallbackGroup::SharedPtr group_ptr,
bool notify)
{
std::lock_guard<std::mutex> guard{mutex_};
this->remove_callback_group_from_map(
group_ptr,
weak_groups_associated_with_executor_to_nodes_,
Expand All @@ -336,6 +340,7 @@ Executor::remove_node(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node
throw std::runtime_error("Node needs to be associated with an executor.");
}

std::lock_guard<std::mutex> guard{mutex_};
bool found_node = false;
auto node_it = weak_nodes_.begin();
while (node_it != weak_nodes_.end()) {
Expand Down Expand Up @@ -489,6 +494,7 @@ Executor::set_memory_strategy(rclcpp::memory_strategy::MemoryStrategy::SharedPtr
if (memory_strategy == nullptr) {
throw std::runtime_error("Received NULL memory strategy in executor.");
}
std::lock_guard<std::mutex> guard{mutex_};
memory_strategy_ = memory_strategy;
}

Expand Down Expand Up @@ -662,7 +668,7 @@ void
Executor::wait_for_work(std::chrono::nanoseconds timeout)
{
{
std::unique_lock<std::mutex> lock(memory_strategy_mutex_);
std::lock_guard<std::mutex> guard(mutex_);

// Check weak_nodes_ to find any callback group that is not owned
// by an executor and add it to the list of callbackgroups for
Expand Down Expand Up @@ -741,12 +747,13 @@ Executor::wait_for_work(std::chrono::nanoseconds timeout)

// check the null handles in the wait set and remove them from the handles in memory strategy
// for callback-based entities
std::lock_guard<std::mutex> guard(mutex_);
memory_strategy_->remove_null_handles(&wait_set_);
ivanpauno marked this conversation as resolved.
Show resolved Hide resolved
}

rclcpp::node_interfaces::NodeBaseInterface::SharedPtr
Executor::get_node_by_group(
rclcpp::memory_strategy::MemoryStrategy::WeakCallbackGroupsToNodesMap weak_groups_to_nodes,
const rclcpp::memory_strategy::MemoryStrategy::WeakCallbackGroupsToNodesMap & weak_groups_to_nodes,
rclcpp::CallbackGroup::SharedPtr group)
{
if (!group) {
Expand All @@ -764,6 +771,7 @@ Executor::get_node_by_group(
rclcpp::CallbackGroup::SharedPtr
Executor::get_group_by_timer(rclcpp::TimerBase::SharedPtr timer)
{
std::lock_guard<std::mutex> guard{mutex_};
for (const auto & pair : weak_groups_associated_with_executor_to_nodes_) {
auto group = pair.first.lock();
if (!group) {
Expand Down Expand Up @@ -804,9 +812,10 @@ Executor::get_next_ready_executable(AnyExecutable & any_executable)
bool
Executor::get_next_ready_executable_from_map(
AnyExecutable & any_executable,
rclcpp::memory_strategy::MemoryStrategy::WeakCallbackGroupsToNodesMap weak_groups_to_nodes)
const rclcpp::memory_strategy::MemoryStrategy::WeakCallbackGroupsToNodesMap & weak_groups_to_nodes)
{
bool success = false;
std::lock_guard<std::mutex> guard{mutex_};
// Check the timers to see if there are any that are ready
memory_strategy_->get_next_timer(any_executable, weak_groups_to_nodes);
if (any_executable.timer) {
Expand Down Expand Up @@ -893,7 +902,7 @@ Executor::get_next_executable(AnyExecutable & any_executable, std::chrono::nanos
bool
Executor::has_node(
const rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr,
rclcpp::memory_strategy::MemoryStrategy::WeakCallbackGroupsToNodesMap weak_groups_to_nodes) const
const rclcpp::memory_strategy::MemoryStrategy::WeakCallbackGroupsToNodesMap & weak_groups_to_nodes) const
{
return std::find_if(
weak_groups_to_nodes.begin(),
Expand Down
31 changes: 31 additions & 0 deletions rclcpp/test/rclcpp/test_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

#include "rclcpp/executor.hpp"
#include "rclcpp/memory_strategy.hpp"
#include "rclcpp/executors/single_threaded_executor.hpp"
#include "rclcpp/strategies/allocator_memory_strategy.hpp"

#include "../mocking_utils/patch.hpp"
Expand Down Expand Up @@ -54,6 +55,7 @@ class DummyExecutor : public rclcpp::Executor
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr local_get_node_by_group(
rclcpp::CallbackGroup::SharedPtr group)
{
std::lock_guard<std::mutex> guard_{mutex_}; // only to make the TSA happy
return get_node_by_group(weak_groups_to_nodes_, group);
}

Expand Down Expand Up @@ -87,6 +89,35 @@ MOCKING_UTILS_BOOL_OPERATOR_RETURNS_FALSE(rcutils_allocator_t, !=)
MOCKING_UTILS_BOOL_OPERATOR_RETURNS_FALSE(rcutils_allocator_t, <)
MOCKING_UTILS_BOOL_OPERATOR_RETURNS_FALSE(rcutils_allocator_t, >)

TEST_F(TestExecutor, add_remove_node_thread_safe) {
using namespace std::chrono_literals;

// Create an Executor
rclcpp::executors::SingleThreadedExecutor executor;

auto future = std::async(std::launch::async, [&executor] {executor.spin();});

// Add and remove nodes repeatedly
// Test that this does not cause a segfault
size_t num_nodes = 100;
for (size_t i = 0; i < num_nodes; ++i) {
std::ostringstream name;
name << "node_" << i;
auto node = std::make_shared<rclcpp::Node>(name.str());
executor.add_node(node);
// Sleeping here helps exaggerate the issue
std::this_thread::sleep_for(std::chrono::milliseconds(1));
executor.remove_node(node);
}
std::future_status future_status = std::future_status::timeout;
do {
executor.cancel();
future_status = future.wait_for(1s);
} while (future_status == std::future_status::timeout);
EXPECT_EQ(future_status, std::future_status::ready);
future.get();
}

TEST_F(TestExecutor, constructor_bad_guard_condition_init) {
auto mock = mocking_utils::patch_and_return(
"lib:rclcpp", rcl_guard_condition_init, RCL_RET_ERROR);
Expand Down