Skip to content

Commit

Permalink
Topic node guard condition in executor (ros2#2074)
Browse files Browse the repository at this point in the history
* add a test

Signed-off-by: Chen Lihui <[email protected]>

* rollback the node guard condition for exectuor

Signed-off-by: Chen Lihui <[email protected]>

---------

Signed-off-by: Chen Lihui <[email protected]>
  • Loading branch information
Chen Lihui authored and Alberto Soragna committed Apr 29, 2023
1 parent 5bf9083 commit 5545e04
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 0 deletions.
9 changes: 9 additions & 0 deletions rclcpp/include/rclcpp/executor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -560,11 +560,20 @@ class Executor
virtual void
spin_once_impl(std::chrono::nanoseconds timeout);

typedef std::map<rclcpp::node_interfaces::NodeBaseInterface::WeakPtr,
const rclcpp::GuardCondition *,
std::owner_less<rclcpp::node_interfaces::NodeBaseInterface::WeakPtr>>
WeakNodesToGuardConditionsMap;

typedef std::map<rclcpp::CallbackGroup::WeakPtr,
const rclcpp::GuardCondition *,
std::owner_less<rclcpp::CallbackGroup::WeakPtr>>
WeakCallbackGroupsToGuardConditionsMap;

/// maps nodes to guard conditions
WeakNodesToGuardConditionsMap
weak_nodes_to_guard_conditions_ RCPPUTILS_TSA_GUARDED_BY(mutex_);

/// maps callback groups to guard conditions
WeakCallbackGroupsToGuardConditionsMap
weak_groups_to_guard_conditions_ RCPPUTILS_TSA_GUARDED_BY(mutex_);
Expand Down
19 changes: 19 additions & 0 deletions rclcpp/src/rclcpp/executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,12 @@ Executor::~Executor()
}
weak_groups_to_guard_conditions_.clear();

for (const auto & pair : weak_nodes_to_guard_conditions_) {
auto guard_condition = pair.second;
memory_strategy_->remove_guard_condition(guard_condition);
}
weak_nodes_to_guard_conditions_.clear();

// Finalize the wait set.
if (rcl_wait_set_fini(&wait_set_) != RCL_RET_OK) {
RCUTILS_LOG_ERROR_NAMED(
Expand Down Expand Up @@ -274,6 +280,10 @@ Executor::add_node(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_pt
}
});

const auto & gc = node_ptr->get_notify_guard_condition();
weak_nodes_to_guard_conditions_[node_ptr] = &gc;
// Add the node's notify condition to the guard condition handles
memory_strategy_->add_guard_condition(gc);
weak_nodes_.push_back(node_ptr);
}

Expand Down Expand Up @@ -378,6 +388,9 @@ Executor::remove_node(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node
}
}

memory_strategy_->remove_guard_condition(&node_ptr->get_notify_guard_condition());
weak_nodes_to_guard_conditions_.erase(node_ptr);

std::atomic_bool & has_executor = node_ptr->get_associated_with_executor_atomic();
has_executor.store(false);
}
Expand Down Expand Up @@ -706,6 +719,12 @@ Executor::wait_for_work(std::chrono::nanoseconds timeout)
auto weak_node_ptr = pair.second;
if (weak_group_ptr.expired() || weak_node_ptr.expired()) {
invalid_group_ptrs.push_back(weak_group_ptr);
auto node_guard_pair = weak_nodes_to_guard_conditions_.find(weak_node_ptr);
if (node_guard_pair != weak_nodes_to_guard_conditions_.end()) {
auto guard_condition = node_guard_pair->second;
weak_nodes_to_guard_conditions_.erase(weak_node_ptr);
memory_strategy_->remove_guard_condition(guard_condition);
}
}
}
std::for_each(
Expand Down
47 changes: 47 additions & 0 deletions rclcpp/test/rclcpp/test_add_callback_groups_to_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,53 @@ TYPED_TEST(TestAddCallbackGroupsToExecutor, subscriber_triggered_to_receive_mess
EXPECT_TRUE(received_message_future.get());
}

/*
* Test callback group created after spin.
* A subscriber with a new callback group that created after executor spin not received a message
* because the executor can't be triggered while a subscriber created, see
* https://github.com/ros2/rclcpp/issues/2067
*/
TYPED_TEST(TestAddCallbackGroupsToExecutor, callback_group_create_after_spin)
{
auto node = std::make_shared<rclcpp::Node>("my_node", "/ns");

// create a publisher to send data
rclcpp::QoS qos = rclcpp::QoS(1).reliable().transient_local();
rclcpp::Publisher<test_msgs::msg::Empty>::SharedPtr publisher =
node->create_publisher<test_msgs::msg::Empty>("topic_name", qos);
publisher->publish(test_msgs::msg::Empty());

// create a thread running an executor
rclcpp::executors::SingleThreadedExecutor executor;
executor.add_node(node);
std::promise<bool> received_message_promise;
auto received_message_future = received_message_promise.get_future();
rclcpp::FutureReturnCode return_code = rclcpp::FutureReturnCode::TIMEOUT;
std::thread executor_thread = std::thread(
[&executor, &received_message_future, &return_code]() {
return_code = executor.spin_until_future_complete(received_message_future, 5s);
});

// to create a callback group after spin
std::this_thread::sleep_for(std::chrono::milliseconds(100));
rclcpp::CallbackGroup::SharedPtr cb_grp = node->create_callback_group(
rclcpp::CallbackGroupType::MutuallyExclusive);

// expect the subscriber to receive a message
auto sub_callback = [&received_message_promise](test_msgs::msg::Empty::ConstSharedPtr) {
received_message_promise.set_value(true);
};
// create a subscription using the `cb_grp` callback group
auto options = rclcpp::SubscriptionOptions();
options.callback_group = cb_grp;
rclcpp::Subscription<test_msgs::msg::Empty>::SharedPtr subscription =
node->create_subscription<test_msgs::msg::Empty>("topic_name", qos, sub_callback, options);

executor_thread.join();
ASSERT_EQ(rclcpp::FutureReturnCode::SUCCESS, return_code);
EXPECT_TRUE(received_message_future.get());
}

/*
* Test removing callback group from executor that its not associated with.
*/
Expand Down

0 comments on commit 5545e04

Please sign in to comment.