Skip to content

Commit

Permalink
Use Fast-DDS Waitsets instead of listeners (#619)
Browse files Browse the repository at this point in the history
* Initial commit

Signed-off-by: Ricardo González Moreno <[email protected]>

* Working subscriptions and services

Signed-off-by: Ricardo González Moreno <[email protected]>

* Add event support

Signed-off-by: Ricardo González Moreno <[email protected]>

* Initial new event callback

Signed-off-by: Ricardo González Moreno <[email protected]>

* Use guard_condition with event listeners

Signed-off-by: Ricardo González Moreno <[email protected]>

* Remove unused functions

Signed-off-by: Ricardo González Moreno <[email protected]>

* Fixing tests

Signed-off-by: Ricardo González Moreno <[email protected]>

* Fixing uncrustify

Signed-off-by: Ricardo González Moreno <[email protected]>

* Simplify SubListener's get_unread_messages()

Signed-off-by: Javier Santiago <[email protected]>
Signed-off-by: Ricardo González Moreno <[email protected]>

* Simplified get_unread_requests() and get_unread_responses()

Signed-off-by: Javier Santiago <[email protected]>
Signed-off-by: Ricardo González Moreno <[email protected]>

* Moved Waitset creation/destruction outside loop as suggested

Signed-off-by: Javier Santiago <[email protected]>
Signed-off-by: Ricardo González Moreno <[email protected]>

* Renamed variable. Removed unneded checks. Replaced get_unread_count with get_first_untaken_info

Signed-off-by: Javier Santiago <[email protected]>
Signed-off-by: Ricardo González Moreno <[email protected]>

* Modified oneliners.

Signed-off-by: Javier Santiago <[email protected]>
Signed-off-by: Ricardo González Moreno <[email protected]>

* Cleaned more unneeded checks

Signed-off-by: Javier Santiago <[email protected]>
Signed-off-by: Ricardo González Moreno <[email protected]>

* Added RCPPUTILS_TSA_GUARDED_BY macros to previously atomic booleans

Signed-off-by: Javier Santiago <[email protected]>
Signed-off-by: Ricardo González Moreno <[email protected]>

* Fixed wrong mutex guard. Renamed and removed break; from TERMINATE_THREAD

Signed-off-by: Javier Santiago <[email protected]>
Signed-off-by: Ricardo González Moreno <[email protected]>

* Fix waiting events

Signed-off-by: Ricardo González Moreno <[email protected]>

* Fixing crash

Signed-off-by: Ricardo González Moreno <[email protected]>

* Fix linking error on Windows

Signed-off-by: Ricardo González Moreno <[email protected]>

* Usage of new function get_unread_count(bool)

Signed-off-by: Ricardo González Moreno <[email protected]>

* Fix windows compilation linkage error

Signed-off-by: Ricardo González Moreno <[email protected]>

* Removed unneeded include. Restored some cleanup code. Added some comments.

Signed-off-by: Javier Santiago <[email protected]>
Signed-off-by: Ricardo González Moreno <[email protected]>

* Set nullptr after delete

Signed-off-by: Javier Santiago <[email protected]>
Signed-off-by: Ricardo González Moreno <[email protected]>

* Detach listener

Signed-off-by: Javier Santiago <[email protected]>
Signed-off-by: Ricardo González Moreno <[email protected]>

* Fix creation of datareader for content filter

Signed-off-by: Ricardo González Moreno <[email protected]>

* Fix wrong usage of take_next_sample with read samples

Signed-off-by: Ricardo González Moreno <[email protected]>

* Apply suggestions

Signed-off-by: Ricardo González Moreno <[email protected]>

* Apply suggestion

Signed-off-by: Ricardo González Moreno <[email protected]>

* Fix rosbag2 failure tests

Signed-off-by: Ricardo González Moreno <[email protected]>

* Apply suggestions

Signed-off-by: Ricardo González Moreno <[email protected]>

* Change usage of StatusMask::operator<<

Signed-off-by: Ricardo González Moreno <[email protected]>

* Protecting a member

Signed-off-by: Ricardo González Moreno <[email protected]>

* Apply suggestions

Signed-off-by: Ricardo González Moreno <[email protected]>

* Fix extra space

Signed-off-by: Ricardo González Moreno <[email protected]>

Signed-off-by: Ricardo González Moreno <[email protected]>
Signed-off-by: Javier Santiago <[email protected]>
Co-authored-by: Javier Santiago <[email protected]>
  • Loading branch information
richiware and jsan-rt authored Aug 24, 2022
1 parent dbee45e commit 1dcfe80
Show file tree
Hide file tree
Showing 38 changed files with 1,068 additions and 1,110 deletions.
3 changes: 1 addition & 2 deletions rmw_fastrtps_cpp/include/rmw_fastrtps_cpp/subscription.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,7 @@ create_subscription(
const char * topic_name,
const rmw_qos_profile_t * qos_policies,
const rmw_subscription_options_t * subscription_options,
bool keyed,
bool create_subscription_listener);
bool keyed);
} // namespace rmw_fastrtps_cpp

#endif // RMW_FASTRTPS_CPP__SUBSCRIPTION_HPP_
26 changes: 16 additions & 10 deletions rmw_fastrtps_cpp/src/init_rmw_context_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ using rmw_dds_common::msg::ParticipantEntitiesInfo;

static
rmw_ret_t
init_context_impl(rmw_context_t * context)
init_context_impl(
rmw_context_t * context)
{
rmw_publisher_options_t publisher_options = rmw_get_default_publisher_options();
rmw_subscription_options_t subscription_options = rmw_get_default_subscription_options();
Expand All @@ -65,7 +66,8 @@ init_context_impl(rmw_context_t * context)
(context->options.localhost_only == RMW_LOCALHOST_ONLY_ENABLED) ? 1 : 0,
context->options.enclave,
common_context.get()),
[&](CustomParticipantInfo * participant_info) {
[&](CustomParticipantInfo * participant_info)
{
if (RMW_RET_OK != rmw_fastrtps_shared_cpp::destroy_participant(participant_info)) {
RCUTILS_SAFE_FWRITE_TO_STDERR(
"Failed to destroy participant after function: '"
Expand All @@ -92,9 +94,10 @@ init_context_impl(rmw_context_t * context)
"ros_discovery_info",
&qos,
&publisher_options,
false, // our fastrtps typesupport doesn't support keyed topics
false, // our fastrtps typesupport doesn't support keyed topics
true),
[&](rmw_publisher_t * pub) {
[&](rmw_publisher_t * pub)
{
if (RMW_RET_OK != rmw_fastrtps_shared_cpp::destroy_publisher(
eprosima_fastrtps_identifier,
participant_info.get(),
Expand All @@ -119,9 +122,9 @@ init_context_impl(rmw_context_t * context)
"ros_discovery_info",
&qos,
&subscription_options,
false, // our fastrtps typesupport doesn't support keyed topics
true),
[&](rmw_subscription_t * sub) {
false), // our fastrtps typesupport doesn't support keyed topics
[&](rmw_subscription_t * sub)
{
if (RMW_RET_OK != rmw_fastrtps_shared_cpp::destroy_subscription(
eprosima_fastrtps_identifier,
participant_info.get(),
Expand All @@ -139,7 +142,8 @@ init_context_impl(rmw_context_t * context)
std::unique_ptr<rmw_guard_condition_t, std::function<void(rmw_guard_condition_t *)>>
graph_guard_condition(
rmw_fastrtps_shared_cpp::__rmw_create_guard_condition(eprosima_fastrtps_identifier),
[&](rmw_guard_condition_t * p) {
[&](rmw_guard_condition_t * p)
{
rmw_ret_t ret = rmw_fastrtps_shared_cpp::__rmw_destroy_guard_condition(p);
if (ret != RMW_RET_OK) {
RMW_SAFE_FWRITE_TO_STDERR(
Expand All @@ -166,7 +170,8 @@ init_context_impl(rmw_context_t * context)
}

common_context->graph_cache.set_on_change_callback(
[guard_condition = graph_guard_condition.get()]() {
[guard_condition = graph_guard_condition.get()]()
{
rmw_fastrtps_shared_cpp::__rmw_trigger_guard_condition(
eprosima_fastrtps_identifier,
guard_condition);
Expand All @@ -185,7 +190,8 @@ init_context_impl(rmw_context_t * context)
}

rmw_ret_t
rmw_fastrtps_cpp::increment_context_impl_ref_count(rmw_context_t * context)
rmw_fastrtps_cpp::increment_context_impl_ref_count(
rmw_context_t * context)
{
assert(context);
assert(context->impl);
Expand Down
9 changes: 7 additions & 2 deletions rmw_fastrtps_cpp/src/publisher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -269,17 +269,22 @@ rmw_fastrtps_cpp::create_publisher(
return nullptr;
}

// Creates DataWriter
// Creates DataWriter with a mask enabling publication_matched calls for the listener
info->data_writer_ = publisher->create_datawriter(
topic.topic,
writer_qos,
info->listener_);
info->listener_,
eprosima::fastdds::dds::StatusMask::publication_matched());

if (!info->data_writer_) {
RMW_SET_ERROR_MSG("create_publisher() could not create data writer");
return nullptr;
}

// Set the StatusCondition to none to prevent triggering via WaitSets
info->data_writer_->get_statuscondition().set_enabled_statuses(
eprosima::fastdds::dds::StatusMask::none());

// lambda to delete datawriter
auto cleanup_datawriter = rcpputils::make_scope_exit(
[publisher, info]() {
Expand Down
15 changes: 12 additions & 3 deletions rmw_fastrtps_cpp/src/rmw_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -329,13 +329,17 @@ rmw_create_client(
info->response_reader_ = subscriber->create_datareader(
response_topic_desc,
reader_qos,
info->listener_);
info->listener_,
eprosima::fastdds::dds::StatusMask::subscription_matched());

if (!info->response_reader_) {
RMW_SET_ERROR_MSG("create_client() failed to create response DataReader");
return nullptr;
}

info->response_reader_->get_statuscondition().set_enabled_statuses(
eprosima::fastdds::dds::StatusMask::data_available());

// lambda to delete datareader
auto cleanup_datareader = rcpputils::make_scope_exit(
[subscriber, info]() {
Expand Down Expand Up @@ -375,17 +379,22 @@ rmw_create_client(
return nullptr;
}

// Creates DataWriter
// Creates DataWriter with a mask enabling publication_matched calls for the listener
info->request_writer_ = publisher->create_datawriter(
request_topic.topic,
writer_qos,
info->pub_listener_);
info->pub_listener_,
eprosima::fastdds::dds::StatusMask::publication_matched());

if (!info->request_writer_) {
RMW_SET_ERROR_MSG("create_client() failed to create request DataWriter");
return nullptr;
}

// Set the StatusCondition to none to prevent triggering via WaitSets
info->request_writer_->get_statuscondition().set_enabled_statuses(
eprosima::fastdds::dds::StatusMask::none());

// lambda to delete datawriter
auto cleanup_datawriter = rcpputils::make_scope_exit(
[publisher, info]() {
Expand Down
15 changes: 12 additions & 3 deletions rmw_fastrtps_cpp/src/rmw_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -328,13 +328,17 @@ rmw_create_service(
info->request_reader_ = subscriber->create_datareader(
request_topic_desc,
reader_qos,
info->listener_);
info->listener_,
eprosima::fastdds::dds::StatusMask::subscription_matched());

if (!info->request_reader_) {
RMW_SET_ERROR_MSG("create_service() failed to create request DataReader");
return nullptr;
}

info->request_reader_->get_statuscondition().set_enabled_statuses(
eprosima::fastdds::dds::StatusMask::data_available());

// lambda to delete datareader
auto cleanup_datareader = rcpputils::make_scope_exit(
[subscriber, info]() {
Expand Down Expand Up @@ -378,17 +382,22 @@ rmw_create_service(
return nullptr;
}

// Creates DataWriter
// Creates DataWriter with a mask enabling publication_matched calls for the listener
info->response_writer_ = publisher->create_datawriter(
response_topic.topic,
writer_qos,
info->pub_listener_);
info->pub_listener_,
eprosima::fastdds::dds::StatusMask::publication_matched());

if (!info->response_writer_) {
RMW_SET_ERROR_MSG("create_service() failed to create response DataWriter");
return nullptr;
}

// Set the StatusCondition to none to prevent triggering via WaitSets
info->response_writer_->get_statuscondition().set_enabled_statuses(
eprosima::fastdds::dds::StatusMask::none());

// lambda to delete datawriter
auto cleanup_datawriter = rcpputils::make_scope_exit(
[publisher, info]() {
Expand Down
3 changes: 1 addition & 2 deletions rmw_fastrtps_cpp/src/rmw_subscription.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,7 @@ rmw_create_subscription(
topic_name,
&adapted_qos_policies,
subscription_options,
false, // use no keyed topic
true); // create subscription listener
false); // use no keyed topic
if (!subscription) {
return nullptr;
}
Expand Down
27 changes: 16 additions & 11 deletions rmw_fastrtps_cpp/src/subscription.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,7 @@ create_subscription(
const char * topic_name,
const rmw_qos_profile_t * qos_policies,
const rmw_subscription_options_t * subscription_options,
bool keyed,
bool create_subscription_listener)
bool keyed)
{
/////
// Check input parameters
Expand Down Expand Up @@ -164,7 +163,8 @@ create_subscription(
}

auto cleanup_info = rcpputils::make_scope_exit(
[info, dds_participant]() {
[info, dds_participant]()
{
delete info->listener_;
if (info->type_support_) {
dds_participant->unregister_type(info->type_support_.get_type_name());
Expand Down Expand Up @@ -208,12 +208,10 @@ create_subscription(

/////
// Create Listener
if (create_subscription_listener) {
info->listener_ = new (std::nothrow) SubListener(info, qos_policies->depth);
if (!info->listener_) {
RMW_SET_ERROR_MSG("create_subscription() could not create subscription listener");
return nullptr;
}
info->listener_ = new (std::nothrow) SubListener(info);
if (!info->listener_) {
RMW_SET_ERROR_MSG("create_subscription() could not create subscription listener");
return nullptr;
}

/////
Expand Down Expand Up @@ -297,9 +295,14 @@ create_subscription(
return nullptr;
}

// Initialize DataReader's StatusCondition to be notified when new data is available
info->data_reader_->get_statuscondition().set_enabled_statuses(
eprosima::fastdds::dds::StatusMask::data_available());

// lambda to delete datareader
auto cleanup_datareader = rcpputils::make_scope_exit(
[subscriber, info]() {
[subscriber, info]()
{
subscriber->delete_datareader(info->data_reader_);
});

Expand All @@ -316,7 +319,8 @@ create_subscription(
return nullptr;
}
auto cleanup_rmw_subscription = rcpputils::make_scope_exit(
[rmw_subscription]() {
[rmw_subscription]()
{
rmw_free(const_cast<char *>(rmw_subscription->topic_name));
rmw_subscription_free(rmw_subscription);
});
Expand Down Expand Up @@ -345,4 +349,5 @@ create_subscription(
info->subscription_gid_.data);
return rmw_subscription;
}

} // namespace rmw_fastrtps_cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,7 @@ create_subscription(
const char * topic_name,
const rmw_qos_profile_t * qos_policies,
const rmw_subscription_options_t * subscription_options,
bool keyed,
bool create_subscription_listener);
bool keyed);

} // namespace rmw_fastrtps_dynamic_cpp

Expand Down
26 changes: 16 additions & 10 deletions rmw_fastrtps_dynamic_cpp/src/init_rmw_context_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ using rmw_dds_common::msg::ParticipantEntitiesInfo;

static
rmw_ret_t
init_context_impl(rmw_context_t * context)
init_context_impl(
rmw_context_t * context)
{
rmw_publisher_options_t publisher_options = rmw_get_default_publisher_options();
rmw_subscription_options_t subscription_options = rmw_get_default_subscription_options();
Expand All @@ -65,7 +66,8 @@ init_context_impl(rmw_context_t * context)
(context->options.localhost_only == RMW_LOCALHOST_ONLY_ENABLED) ? 1 : 0,
context->options.enclave,
common_context.get()),
[&](CustomParticipantInfo * participant_info) {
[&](CustomParticipantInfo * participant_info)
{
if (RMW_RET_OK != rmw_fastrtps_shared_cpp::destroy_participant(participant_info)) {
RCUTILS_SAFE_FWRITE_TO_STDERR(
"Failed to destroy participant after function: '"
Expand All @@ -92,9 +94,10 @@ init_context_impl(rmw_context_t * context)
"ros_discovery_info",
&qos,
&publisher_options,
false, // our fastrtps typesupport doesn't support keyed topics
false, // our fastrtps typesupport doesn't support keyed topics
true),
[&](rmw_publisher_t * pub) {
[&](rmw_publisher_t * pub)
{
if (RMW_RET_OK != rmw_fastrtps_shared_cpp::destroy_publisher(
eprosima_fastrtps_identifier,
participant_info.get(),
Expand All @@ -119,9 +122,9 @@ init_context_impl(rmw_context_t * context)
"ros_discovery_info",
&qos,
&subscription_options,
false, // our fastrtps typesupport doesn't support keyed topics
true),
[&](rmw_subscription_t * sub) {
false), // our fastrtps typesupport doesn't support keyed topics
[&](rmw_subscription_t * sub)
{
if (RMW_RET_OK != rmw_fastrtps_shared_cpp::destroy_subscription(
eprosima_fastrtps_identifier,
participant_info.get(),
Expand All @@ -139,7 +142,8 @@ init_context_impl(rmw_context_t * context)
std::unique_ptr<rmw_guard_condition_t, std::function<void(rmw_guard_condition_t *)>>
graph_guard_condition(
rmw_fastrtps_shared_cpp::__rmw_create_guard_condition(eprosima_fastrtps_identifier),
[&](rmw_guard_condition_t * p) {
[&](rmw_guard_condition_t * p)
{
rmw_ret_t ret = rmw_fastrtps_shared_cpp::__rmw_destroy_guard_condition(p);
if (ret != RMW_RET_OK) {
RMW_SAFE_FWRITE_TO_STDERR(
Expand All @@ -166,7 +170,8 @@ init_context_impl(rmw_context_t * context)
}

common_context->graph_cache.set_on_change_callback(
[guard_condition = graph_guard_condition.get()]() {
[guard_condition = graph_guard_condition.get()]()
{
rmw_fastrtps_shared_cpp::__rmw_trigger_guard_condition(
eprosima_fastrtps_identifier,
guard_condition);
Expand All @@ -185,7 +190,8 @@ init_context_impl(rmw_context_t * context)
}

rmw_ret_t
rmw_fastrtps_dynamic_cpp::increment_context_impl_ref_count(rmw_context_t * context)
rmw_fastrtps_dynamic_cpp::increment_context_impl_ref_count(
rmw_context_t * context)
{
assert(context);
assert(context->impl);
Expand Down
6 changes: 5 additions & 1 deletion rmw_fastrtps_dynamic_cpp/src/publisher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -278,13 +278,17 @@ rmw_fastrtps_dynamic_cpp::create_publisher(
info->data_writer_ = publisher->create_datawriter(
topic.topic,
writer_qos,
info->listener_);
info->listener_,
eprosima::fastdds::dds::StatusMask::publication_matched());

if (!info->data_writer_) {
RMW_SET_ERROR_MSG("create_publisher() could not create data writer");
return nullptr;
}

info->data_writer_->get_statuscondition().set_enabled_statuses(
eprosima::fastdds::dds::StatusMask::none());

// lambda to delete datawriter
auto cleanup_datawriter = rcpputils::make_scope_exit(
[publisher, info]() {
Expand Down
Loading

0 comments on commit 1dcfe80

Please sign in to comment.