diff --git a/rclcpp/include/rclcpp/executor.hpp b/rclcpp/include/rclcpp/executor.hpp index 7d93595735..946aa18851 100644 --- a/rclcpp/include/rclcpp/executor.hpp +++ b/rclcpp/include/rclcpp/executor.hpp @@ -448,7 +448,7 @@ class Executor RCLCPP_PUBLIC rclcpp::node_interfaces::NodeBaseInterface::SharedPtr get_node_by_group( - WeakCallbackGroupsToNodesMap weak_groups_to_nodes, + const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes, rclcpp::CallbackGroup::SharedPtr group); /// Return true if the node has been added to this executor. @@ -460,7 +460,7 @@ class Executor bool has_node( const rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, - WeakCallbackGroupsToNodesMap weak_groups_to_nodes) const; + const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes) const; RCLCPP_PUBLIC rclcpp::CallbackGroup::SharedPtr @@ -476,7 +476,7 @@ class Executor rclcpp::CallbackGroup::SharedPtr group_ptr, rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, WeakCallbackGroupsToNodesMap & weak_groups_to_nodes, - bool notify = true); + bool notify = true) RCPPUTILS_TSA_REQUIRES(mutex_); /// Remove a callback group from the executor. /** @@ -487,7 +487,7 @@ class Executor remove_callback_group_from_map( rclcpp::CallbackGroup::SharedPtr group_ptr, WeakCallbackGroupsToNodesMap & weak_groups_to_nodes, - bool notify = true); + bool notify = true) RCPPUTILS_TSA_REQUIRES(mutex_); RCLCPP_PUBLIC bool @@ -497,7 +497,7 @@ class Executor bool get_next_ready_executable_from_map( AnyExecutable & any_executable, - WeakCallbackGroupsToNodesMap weak_groups_to_nodes); + const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes); RCLCPP_PUBLIC bool @@ -518,7 +518,7 @@ class Executor */ RCLCPP_PUBLIC virtual void - add_callback_groups_from_nodes_associated_to_executor(); + add_callback_groups_from_nodes_associated_to_executor() RCPPUTILS_TSA_REQUIRES(mutex_); /// Spinning state, used to prevent multi threaded calls to spin and to cancel blocking spins. std::atomic_bool spinning; @@ -532,10 +532,11 @@ class Executor rcl_wait_set_t wait_set_ = rcl_get_zero_initialized_wait_set(); // Mutex to protect the subsequent memory_strategy_. - std::mutex memory_strategy_mutex_; + mutable std::mutex mutex_; /// The memory strategy: an interface for handling user-defined memory allocation strategies. - memory_strategy::MemoryStrategy::SharedPtr memory_strategy_; + memory_strategy::MemoryStrategy::SharedPtr + memory_strategy_ RCPPUTILS_TSA_PT_GUARDED_BY(mutex_); /// The context associated with this executor. std::shared_ptr context_; @@ -552,19 +553,24 @@ class Executor WeakNodesToGuardConditionsMap; /// maps nodes to guard conditions - WeakNodesToGuardConditionsMap weak_nodes_to_guard_conditions_; + WeakNodesToGuardConditionsMap + weak_nodes_to_guard_conditions_ RCPPUTILS_TSA_GUARDED_BY(mutex_); /// maps callback groups associated to nodes - WeakCallbackGroupsToNodesMap weak_groups_associated_with_executor_to_nodes_; + WeakCallbackGroupsToNodesMap + weak_groups_associated_with_executor_to_nodes_ RCPPUTILS_TSA_GUARDED_BY(mutex_); /// maps callback groups to nodes associated with executor - WeakCallbackGroupsToNodesMap weak_groups_to_nodes_associated_with_executor_; + WeakCallbackGroupsToNodesMap + weak_groups_to_nodes_associated_with_executor_ RCPPUTILS_TSA_GUARDED_BY(mutex_); /// maps all callback groups to nodes - WeakCallbackGroupsToNodesMap weak_groups_to_nodes_; + WeakCallbackGroupsToNodesMap + weak_groups_to_nodes_ RCPPUTILS_TSA_GUARDED_BY(mutex_); /// nodes that are associated with the executor - std::list weak_nodes_; + std::list + weak_nodes_ RCPPUTILS_TSA_GUARDED_BY(mutex_); }; namespace executor diff --git a/rclcpp/src/rclcpp/executor.cpp b/rclcpp/src/rclcpp/executor.cpp index 3b02705b1b..d33df58ee2 100644 --- a/rclcpp/src/rclcpp/executor.cpp +++ b/rclcpp/src/rclcpp/executor.cpp @@ -144,6 +144,7 @@ std::vector Executor::get_all_callback_groups() { std::vector groups; + std::lock_guard guard{mutex_}; for (const auto & group_node_ptr : weak_groups_associated_with_executor_to_nodes_) { groups.push_back(group_node_ptr.first); } @@ -157,6 +158,7 @@ std::vector Executor::get_manually_added_callback_groups() { std::vector groups; + std::lock_guard guard{mutex_}; for (auto const & group_node_ptr : weak_groups_associated_with_executor_to_nodes_) { groups.push_back(group_node_ptr.first); } @@ -167,6 +169,7 @@ std::vector Executor::get_automatically_added_callback_groups_from_nodes() { std::vector groups; + std::lock_guard guard{mutex_}; for (auto const & group_node_ptr : weak_groups_to_nodes_associated_with_executor_) { groups.push_back(group_node_ptr.first); } @@ -233,7 +236,6 @@ Executor::add_callback_group_to_map( } } // Add the node's notify condition to the guard condition handles - std::unique_lock lock(memory_strategy_mutex_); memory_strategy_->add_guard_condition(node_ptr->get_notify_guard_condition()); } } @@ -244,6 +246,7 @@ Executor::add_callback_group( rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, bool notify) { + std::lock_guard guard{mutex_}; this->add_callback_group_to_map( group_ptr, node_ptr, @@ -259,6 +262,7 @@ Executor::add_node(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_pt if (has_executor.exchange(true)) { throw std::runtime_error("Node has already been added to an executor."); } + std::lock_guard guard{mutex_}; for (auto & weak_group : node_ptr->get_callback_groups()) { auto group_ptr = weak_group.lock(); if (group_ptr != nullptr && !group_ptr->get_associated_with_executor_atomic().load() && @@ -307,7 +311,6 @@ Executor::remove_callback_group_from_map( throw_from_rcl_error(ret, "Failed to trigger guard condition on callback group remove"); } } - std::unique_lock lock(memory_strategy_mutex_); memory_strategy_->remove_guard_condition(node_ptr->get_notify_guard_condition()); } } @@ -317,6 +320,7 @@ Executor::remove_callback_group( rclcpp::CallbackGroup::SharedPtr group_ptr, bool notify) { + std::lock_guard guard{mutex_}; this->remove_callback_group_from_map( group_ptr, weak_groups_associated_with_executor_to_nodes_, @@ -336,6 +340,7 @@ Executor::remove_node(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node throw std::runtime_error("Node needs to be associated with an executor."); } + std::lock_guard guard{mutex_}; bool found_node = false; auto node_it = weak_nodes_.begin(); while (node_it != weak_nodes_.end()) { @@ -489,6 +494,7 @@ Executor::set_memory_strategy(rclcpp::memory_strategy::MemoryStrategy::SharedPtr if (memory_strategy == nullptr) { throw std::runtime_error("Received NULL memory strategy in executor."); } + std::lock_guard guard{mutex_}; memory_strategy_ = memory_strategy; } @@ -662,7 +668,7 @@ void Executor::wait_for_work(std::chrono::nanoseconds timeout) { { - std::unique_lock lock(memory_strategy_mutex_); + std::lock_guard guard(mutex_); // Check weak_nodes_ to find any callback group that is not owned // by an executor and add it to the list of callbackgroups for @@ -741,12 +747,14 @@ Executor::wait_for_work(std::chrono::nanoseconds timeout) // check the null handles in the wait set and remove them from the handles in memory strategy // for callback-based entities + std::lock_guard guard(mutex_); memory_strategy_->remove_null_handles(&wait_set_); } rclcpp::node_interfaces::NodeBaseInterface::SharedPtr Executor::get_node_by_group( - rclcpp::memory_strategy::MemoryStrategy::WeakCallbackGroupsToNodesMap weak_groups_to_nodes, + const rclcpp::memory_strategy::MemoryStrategy::WeakCallbackGroupsToNodesMap & + weak_groups_to_nodes, rclcpp::CallbackGroup::SharedPtr group) { if (!group) { @@ -764,6 +772,7 @@ Executor::get_node_by_group( rclcpp::CallbackGroup::SharedPtr Executor::get_group_by_timer(rclcpp::TimerBase::SharedPtr timer) { + std::lock_guard guard{mutex_}; for (const auto & pair : weak_groups_associated_with_executor_to_nodes_) { auto group = pair.first.lock(); if (!group) { @@ -804,9 +813,11 @@ Executor::get_next_ready_executable(AnyExecutable & any_executable) bool Executor::get_next_ready_executable_from_map( AnyExecutable & any_executable, - rclcpp::memory_strategy::MemoryStrategy::WeakCallbackGroupsToNodesMap weak_groups_to_nodes) + const rclcpp::memory_strategy::MemoryStrategy::WeakCallbackGroupsToNodesMap & + weak_groups_to_nodes) { bool success = false; + std::lock_guard guard{mutex_}; // Check the timers to see if there are any that are ready memory_strategy_->get_next_timer(any_executable, weak_groups_to_nodes); if (any_executable.timer) { @@ -893,7 +904,8 @@ Executor::get_next_executable(AnyExecutable & any_executable, std::chrono::nanos bool Executor::has_node( const rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, - rclcpp::memory_strategy::MemoryStrategy::WeakCallbackGroupsToNodesMap weak_groups_to_nodes) const + const rclcpp::memory_strategy::MemoryStrategy::WeakCallbackGroupsToNodesMap & + weak_groups_to_nodes) const { return std::find_if( weak_groups_to_nodes.begin(), diff --git a/rclcpp/test/rclcpp/test_executor.cpp b/rclcpp/test/rclcpp/test_executor.cpp index ee108fe1ec..eec6d6d2e8 100644 --- a/rclcpp/test/rclcpp/test_executor.cpp +++ b/rclcpp/test/rclcpp/test_executor.cpp @@ -21,6 +21,7 @@ #include "rclcpp/executor.hpp" #include "rclcpp/memory_strategy.hpp" +#include "rclcpp/executors/single_threaded_executor.hpp" #include "rclcpp/strategies/allocator_memory_strategy.hpp" #include "../mocking_utils/patch.hpp" @@ -54,6 +55,7 @@ class DummyExecutor : public rclcpp::Executor rclcpp::node_interfaces::NodeBaseInterface::SharedPtr local_get_node_by_group( rclcpp::CallbackGroup::SharedPtr group) { + std::lock_guard guard_{mutex_}; // only to make the TSA happy return get_node_by_group(weak_groups_to_nodes_, group); } @@ -87,6 +89,35 @@ MOCKING_UTILS_BOOL_OPERATOR_RETURNS_FALSE(rcutils_allocator_t, !=) MOCKING_UTILS_BOOL_OPERATOR_RETURNS_FALSE(rcutils_allocator_t, <) MOCKING_UTILS_BOOL_OPERATOR_RETURNS_FALSE(rcutils_allocator_t, >) +TEST_F(TestExecutor, add_remove_node_thread_safe) { + using namespace std::chrono_literals; + + // Create an Executor + rclcpp::executors::SingleThreadedExecutor executor; + + auto future = std::async(std::launch::async, [&executor] {executor.spin();}); + + // Add and remove nodes repeatedly + // Test that this does not cause a segfault + size_t num_nodes = 100; + for (size_t i = 0; i < num_nodes; ++i) { + std::ostringstream name; + name << "node_" << i; + auto node = std::make_shared(name.str()); + executor.add_node(node); + // Sleeping here helps exaggerate the issue + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + executor.remove_node(node); + } + std::future_status future_status = std::future_status::timeout; + do { + executor.cancel(); + future_status = future.wait_for(1s); + } while (future_status == std::future_status::timeout); + EXPECT_EQ(future_status, std::future_status::ready); + future.get(); +} + TEST_F(TestExecutor, constructor_bad_guard_condition_init) { auto mock = mocking_utils::patch_and_return( "lib:rclcpp", rcl_guard_condition_init, RCL_RET_ERROR);