From 5545e04615181f982ed3f545dfd4cfdfae559b5a Mon Sep 17 00:00:00 2001 From: Chen Lihui Date: Fri, 10 Feb 2023 19:39:27 +0800 Subject: [PATCH] Topic node guard condition in executor (#2074) * add a test Signed-off-by: Chen Lihui * rollback the node guard condition for exectuor Signed-off-by: Chen Lihui --------- Signed-off-by: Chen Lihui --- rclcpp/include/rclcpp/executor.hpp | 9 ++++ rclcpp/src/rclcpp/executor.cpp | 19 ++++++++ .../test_add_callback_groups_to_executor.cpp | 47 +++++++++++++++++++ 3 files changed, 75 insertions(+) diff --git a/rclcpp/include/rclcpp/executor.hpp b/rclcpp/include/rclcpp/executor.hpp index 65d0a930cb..7f2071cded 100644 --- a/rclcpp/include/rclcpp/executor.hpp +++ b/rclcpp/include/rclcpp/executor.hpp @@ -560,11 +560,20 @@ class Executor virtual void spin_once_impl(std::chrono::nanoseconds timeout); + typedef std::map> + WeakNodesToGuardConditionsMap; + typedef std::map> WeakCallbackGroupsToGuardConditionsMap; + /// maps nodes to guard conditions + WeakNodesToGuardConditionsMap + weak_nodes_to_guard_conditions_ RCPPUTILS_TSA_GUARDED_BY(mutex_); + /// maps callback groups to guard conditions WeakCallbackGroupsToGuardConditionsMap weak_groups_to_guard_conditions_ RCPPUTILS_TSA_GUARDED_BY(mutex_); diff --git a/rclcpp/src/rclcpp/executor.cpp b/rclcpp/src/rclcpp/executor.cpp index 401beb0a73..32b895c1e3 100644 --- a/rclcpp/src/rclcpp/executor.cpp +++ b/rclcpp/src/rclcpp/executor.cpp @@ -112,6 +112,12 @@ Executor::~Executor() } weak_groups_to_guard_conditions_.clear(); + for (const auto & pair : weak_nodes_to_guard_conditions_) { + auto guard_condition = pair.second; + memory_strategy_->remove_guard_condition(guard_condition); + } + weak_nodes_to_guard_conditions_.clear(); + // Finalize the wait set. if (rcl_wait_set_fini(&wait_set_) != RCL_RET_OK) { RCUTILS_LOG_ERROR_NAMED( @@ -274,6 +280,10 @@ Executor::add_node(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_pt } }); + const auto & gc = node_ptr->get_notify_guard_condition(); + weak_nodes_to_guard_conditions_[node_ptr] = &gc; + // Add the node's notify condition to the guard condition handles + memory_strategy_->add_guard_condition(gc); weak_nodes_.push_back(node_ptr); } @@ -378,6 +388,9 @@ Executor::remove_node(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node } } + memory_strategy_->remove_guard_condition(&node_ptr->get_notify_guard_condition()); + weak_nodes_to_guard_conditions_.erase(node_ptr); + std::atomic_bool & has_executor = node_ptr->get_associated_with_executor_atomic(); has_executor.store(false); } @@ -706,6 +719,12 @@ Executor::wait_for_work(std::chrono::nanoseconds timeout) auto weak_node_ptr = pair.second; if (weak_group_ptr.expired() || weak_node_ptr.expired()) { invalid_group_ptrs.push_back(weak_group_ptr); + auto node_guard_pair = weak_nodes_to_guard_conditions_.find(weak_node_ptr); + if (node_guard_pair != weak_nodes_to_guard_conditions_.end()) { + auto guard_condition = node_guard_pair->second; + weak_nodes_to_guard_conditions_.erase(weak_node_ptr); + memory_strategy_->remove_guard_condition(guard_condition); + } } } std::for_each( diff --git a/rclcpp/test/rclcpp/test_add_callback_groups_to_executor.cpp b/rclcpp/test/rclcpp/test_add_callback_groups_to_executor.cpp index 07ca1e87d8..02fa0b7a94 100644 --- a/rclcpp/test/rclcpp/test_add_callback_groups_to_executor.cpp +++ b/rclcpp/test/rclcpp/test_add_callback_groups_to_executor.cpp @@ -340,6 +340,53 @@ TYPED_TEST(TestAddCallbackGroupsToExecutor, subscriber_triggered_to_receive_mess EXPECT_TRUE(received_message_future.get()); } +/* + * Test callback group created after spin. + * A subscriber with a new callback group that created after executor spin not received a message + * because the executor can't be triggered while a subscriber created, see + * https://github.com/ros2/rclcpp/issues/2067 +*/ +TYPED_TEST(TestAddCallbackGroupsToExecutor, callback_group_create_after_spin) +{ + auto node = std::make_shared("my_node", "/ns"); + + // create a publisher to send data + rclcpp::QoS qos = rclcpp::QoS(1).reliable().transient_local(); + rclcpp::Publisher::SharedPtr publisher = + node->create_publisher("topic_name", qos); + publisher->publish(test_msgs::msg::Empty()); + + // create a thread running an executor + rclcpp::executors::SingleThreadedExecutor executor; + executor.add_node(node); + std::promise received_message_promise; + auto received_message_future = received_message_promise.get_future(); + rclcpp::FutureReturnCode return_code = rclcpp::FutureReturnCode::TIMEOUT; + std::thread executor_thread = std::thread( + [&executor, &received_message_future, &return_code]() { + return_code = executor.spin_until_future_complete(received_message_future, 5s); + }); + + // to create a callback group after spin + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + rclcpp::CallbackGroup::SharedPtr cb_grp = node->create_callback_group( + rclcpp::CallbackGroupType::MutuallyExclusive); + + // expect the subscriber to receive a message + auto sub_callback = [&received_message_promise](test_msgs::msg::Empty::ConstSharedPtr) { + received_message_promise.set_value(true); + }; + // create a subscription using the `cb_grp` callback group + auto options = rclcpp::SubscriptionOptions(); + options.callback_group = cb_grp; + rclcpp::Subscription::SharedPtr subscription = + node->create_subscription("topic_name", qos, sub_callback, options); + + executor_thread.join(); + ASSERT_EQ(rclcpp::FutureReturnCode::SUCCESS, return_code); + EXPECT_TRUE(received_message_future.get()); +} + /* * Test removing callback group from executor that its not associated with. */