Skip to content

Commit

Permalink
Use Fast-DDS Waitsets instead of listeners (backport ros2#619)
Browse files Browse the repository at this point in the history
Signed-off-by: Miguel Company <[email protected]>
  • Loading branch information
richiware authored and Jeffery Hsu committed Aug 30, 2022
1 parent bc3c7a7 commit f7c9425
Show file tree
Hide file tree
Showing 38 changed files with 1,109 additions and 1,091 deletions.
3 changes: 1 addition & 2 deletions rmw_fastrtps_cpp/include/rmw_fastrtps_cpp/subscription.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,7 @@ create_subscription(
const char * topic_name,
const rmw_qos_profile_t * qos_policies,
const rmw_subscription_options_t * subscription_options,
bool keyed,
bool create_subscription_listener);
bool keyed);
} // namespace rmw_fastrtps_cpp

#endif // RMW_FASTRTPS_CPP__SUBSCRIPTION_HPP_
26 changes: 16 additions & 10 deletions rmw_fastrtps_cpp/src/init_rmw_context_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ using rmw_dds_common::msg::ParticipantEntitiesInfo;

static
rmw_ret_t
init_context_impl(rmw_context_t * context)
init_context_impl(
rmw_context_t * context)
{
rmw_publisher_options_t publisher_options = rmw_get_default_publisher_options();
rmw_subscription_options_t subscription_options = rmw_get_default_subscription_options();
Expand All @@ -65,7 +66,8 @@ init_context_impl(rmw_context_t * context)
(context->options.localhost_only == RMW_LOCALHOST_ONLY_ENABLED) ? 1 : 0,
context->options.enclave,
common_context.get()),
[&](CustomParticipantInfo * participant_info) {
[&](CustomParticipantInfo * participant_info)
{
if (RMW_RET_OK != rmw_fastrtps_shared_cpp::destroy_participant(participant_info)) {
RCUTILS_SAFE_FWRITE_TO_STDERR(
"Failed to destroy participant after function: '"
Expand All @@ -92,9 +94,10 @@ init_context_impl(rmw_context_t * context)
"ros_discovery_info",
&qos,
&publisher_options,
false, // our fastrtps typesupport doesn't support keyed topics
false, // our fastrtps typesupport doesn't support keyed topics
true),
[&](rmw_publisher_t * pub) {
[&](rmw_publisher_t * pub)
{
if (RMW_RET_OK != rmw_fastrtps_shared_cpp::destroy_publisher(
eprosima_fastrtps_identifier,
participant_info.get(),
Expand All @@ -119,9 +122,9 @@ init_context_impl(rmw_context_t * context)
"ros_discovery_info",
&qos,
&subscription_options,
false, // our fastrtps typesupport doesn't support keyed topics
true),
[&](rmw_subscription_t * sub) {
false), // our fastrtps typesupport doesn't support keyed topics
[&](rmw_subscription_t * sub)
{
if (RMW_RET_OK != rmw_fastrtps_shared_cpp::destroy_subscription(
eprosima_fastrtps_identifier,
participant_info.get(),
Expand All @@ -139,7 +142,8 @@ init_context_impl(rmw_context_t * context)
std::unique_ptr<rmw_guard_condition_t, std::function<void(rmw_guard_condition_t *)>>
graph_guard_condition(
rmw_fastrtps_shared_cpp::__rmw_create_guard_condition(eprosima_fastrtps_identifier),
[&](rmw_guard_condition_t * p) {
[&](rmw_guard_condition_t * p)
{
rmw_ret_t ret = rmw_fastrtps_shared_cpp::__rmw_destroy_guard_condition(p);
if (ret != RMW_RET_OK) {
RMW_SAFE_FWRITE_TO_STDERR(
Expand All @@ -166,7 +170,8 @@ init_context_impl(rmw_context_t * context)
}

common_context->graph_cache.set_on_change_callback(
[guard_condition = graph_guard_condition.get()]() {
[guard_condition = graph_guard_condition.get()]()
{
rmw_fastrtps_shared_cpp::__rmw_trigger_guard_condition(
eprosima_fastrtps_identifier,
guard_condition);
Expand All @@ -185,7 +190,8 @@ init_context_impl(rmw_context_t * context)
}

rmw_ret_t
rmw_fastrtps_cpp::increment_context_impl_ref_count(rmw_context_t * context)
rmw_fastrtps_cpp::increment_context_impl_ref_count(
rmw_context_t * context)
{
assert(context);
assert(context->impl);
Expand Down
9 changes: 7 additions & 2 deletions rmw_fastrtps_cpp/src/publisher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -269,17 +269,22 @@ rmw_fastrtps_cpp::create_publisher(
return nullptr;
}

// Creates DataWriter
// Creates DataWriter with a mask enabling publication_matched calls for the listener
info->data_writer_ = publisher->create_datawriter(
topic.topic,
writer_qos,
info->listener_);
info->listener_,
eprosima::fastdds::dds::StatusMask::publication_matched());

if (!info->data_writer_) {
RMW_SET_ERROR_MSG("create_publisher() could not create data writer");
return nullptr;
}

// Set the StatusCondition to none to prevent triggering via WaitSets
info->data_writer_->get_statuscondition().set_enabled_statuses(
eprosima::fastdds::dds::StatusMask::none());

// lambda to delete datawriter
auto cleanup_datawriter = rcpputils::make_scope_exit(
[publisher, info]() {
Expand Down
15 changes: 12 additions & 3 deletions rmw_fastrtps_cpp/src/rmw_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -324,13 +324,17 @@ rmw_create_client(
info->response_reader_ = subscriber->create_datareader(
response_topic_desc,
reader_qos,
info->listener_);
info->listener_,
eprosima::fastdds::dds::StatusMask::subscription_matched());

if (!info->response_reader_) {
RMW_SET_ERROR_MSG("create_client() failed to create response DataReader");
return nullptr;
}

info->response_reader_->get_statuscondition().set_enabled_statuses(
eprosima::fastdds::dds::StatusMask::data_available());

// lambda to delete datareader
auto cleanup_datareader = rcpputils::make_scope_exit(
[subscriber, info]() {
Expand Down Expand Up @@ -370,17 +374,22 @@ rmw_create_client(
return nullptr;
}

// Creates DataWriter
// Creates DataWriter with a mask enabling publication_matched calls for the listener
info->request_writer_ = publisher->create_datawriter(
request_topic.topic,
writer_qos,
info->pub_listener_);
info->pub_listener_,
eprosima::fastdds::dds::StatusMask::publication_matched());

if (!info->request_writer_) {
RMW_SET_ERROR_MSG("create_client() failed to create request DataWriter");
return nullptr;
}

// Set the StatusCondition to none to prevent triggering via WaitSets
info->request_writer_->get_statuscondition().set_enabled_statuses(
eprosima::fastdds::dds::StatusMask::none());

// lambda to delete datawriter
auto cleanup_datawriter = rcpputils::make_scope_exit(
[publisher, info]() {
Expand Down
15 changes: 12 additions & 3 deletions rmw_fastrtps_cpp/src/rmw_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -323,13 +323,17 @@ rmw_create_service(
info->request_reader_ = subscriber->create_datareader(
request_topic_desc,
reader_qos,
info->listener_);
info->listener_,
eprosima::fastdds::dds::StatusMask::subscription_matched());

if (!info->request_reader_) {
RMW_SET_ERROR_MSG("create_service() failed to create request DataReader");
return nullptr;
}

info->request_reader_->get_statuscondition().set_enabled_statuses(
eprosima::fastdds::dds::StatusMask::data_available());

// lambda to delete datareader
auto cleanup_datareader = rcpputils::make_scope_exit(
[subscriber, info]() {
Expand Down Expand Up @@ -373,17 +377,22 @@ rmw_create_service(
return nullptr;
}

// Creates DataWriter
// Creates DataWriter with a mask enabling publication_matched calls for the listener
info->response_writer_ = publisher->create_datawriter(
response_topic.topic,
writer_qos,
info->pub_listener_);
info->pub_listener_,
eprosima::fastdds::dds::StatusMask::publication_matched());

if (!info->response_writer_) {
RMW_SET_ERROR_MSG("create_service() failed to create response DataWriter");
return nullptr;
}

// Set the StatusCondition to none to prevent triggering via WaitSets
info->response_writer_->get_statuscondition().set_enabled_statuses(
eprosima::fastdds::dds::StatusMask::none());

// lambda to delete datawriter
auto cleanup_datawriter = rcpputils::make_scope_exit(
[publisher, info]() {
Expand Down
3 changes: 1 addition & 2 deletions rmw_fastrtps_cpp/src/rmw_subscription.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,7 @@ rmw_create_subscription(
topic_name,
qos_policies,
subscription_options,
false, // use no keyed topic
true); // create subscription listener
false); // use no keyed topic
if (!subscription) {
return nullptr;
}
Expand Down
27 changes: 16 additions & 11 deletions rmw_fastrtps_cpp/src/subscription.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,7 @@ create_subscription(
const char * topic_name,
const rmw_qos_profile_t * qos_policies,
const rmw_subscription_options_t * subscription_options,
bool keyed,
bool create_subscription_listener)
bool keyed)
{
/////
// Check input parameters
Expand Down Expand Up @@ -164,7 +163,8 @@ create_subscription(
}

auto cleanup_info = rcpputils::make_scope_exit(
[info, dds_participant]() {
[info, dds_participant]()
{
delete info->listener_;
if (info->type_support_) {
dds_participant->unregister_type(info->type_support_.get_type_name());
Expand Down Expand Up @@ -208,12 +208,10 @@ create_subscription(

/////
// Create Listener
if (create_subscription_listener) {
info->listener_ = new (std::nothrow) SubListener(info, qos_policies->depth);
if (!info->listener_) {
RMW_SET_ERROR_MSG("create_subscription() could not create subscription listener");
return nullptr;
}
info->listener_ = new (std::nothrow) SubListener(info);
if (!info->listener_) {
RMW_SET_ERROR_MSG("create_subscription() could not create subscription listener");
return nullptr;
}

/////
Expand Down Expand Up @@ -297,9 +295,14 @@ create_subscription(
return nullptr;
}

// Initialize DataReader's StatusCondition to be notified when new data is available
info->data_reader_->get_statuscondition().set_enabled_statuses(
eprosima::fastdds::dds::StatusMask::data_available());

// lambda to delete datareader
auto cleanup_datareader = rcpputils::make_scope_exit(
[subscriber, info]() {
[subscriber, info]()
{
subscriber->delete_datareader(info->data_reader_);
});

Expand All @@ -316,7 +319,8 @@ create_subscription(
return nullptr;
}
auto cleanup_rmw_subscription = rcpputils::make_scope_exit(
[rmw_subscription]() {
[rmw_subscription]()
{
rmw_free(const_cast<char *>(rmw_subscription->topic_name));
rmw_subscription_free(rmw_subscription);
});
Expand Down Expand Up @@ -345,4 +349,5 @@ create_subscription(
info->subscription_gid_.data);
return rmw_subscription;
}

} // namespace rmw_fastrtps_cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,7 @@ create_subscription(
const char * topic_name,
const rmw_qos_profile_t * qos_policies,
const rmw_subscription_options_t * subscription_options,
bool keyed,
bool create_subscription_listener);
bool keyed);

} // namespace rmw_fastrtps_dynamic_cpp

Expand Down
26 changes: 16 additions & 10 deletions rmw_fastrtps_dynamic_cpp/src/init_rmw_context_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ using rmw_dds_common::msg::ParticipantEntitiesInfo;

static
rmw_ret_t
init_context_impl(rmw_context_t * context)
init_context_impl(
rmw_context_t * context)
{
rmw_publisher_options_t publisher_options = rmw_get_default_publisher_options();
rmw_subscription_options_t subscription_options = rmw_get_default_subscription_options();
Expand All @@ -65,7 +66,8 @@ init_context_impl(rmw_context_t * context)
(context->options.localhost_only == RMW_LOCALHOST_ONLY_ENABLED) ? 1 : 0,
context->options.enclave,
common_context.get()),
[&](CustomParticipantInfo * participant_info) {
[&](CustomParticipantInfo * participant_info)
{
if (RMW_RET_OK != rmw_fastrtps_shared_cpp::destroy_participant(participant_info)) {
RCUTILS_SAFE_FWRITE_TO_STDERR(
"Failed to destroy participant after function: '"
Expand All @@ -92,9 +94,10 @@ init_context_impl(rmw_context_t * context)
"ros_discovery_info",
&qos,
&publisher_options,
false, // our fastrtps typesupport doesn't support keyed topics
false, // our fastrtps typesupport doesn't support keyed topics
true),
[&](rmw_publisher_t * pub) {
[&](rmw_publisher_t * pub)
{
if (RMW_RET_OK != rmw_fastrtps_shared_cpp::destroy_publisher(
eprosima_fastrtps_identifier,
participant_info.get(),
Expand All @@ -119,9 +122,9 @@ init_context_impl(rmw_context_t * context)
"ros_discovery_info",
&qos,
&subscription_options,
false, // our fastrtps typesupport doesn't support keyed topics
true),
[&](rmw_subscription_t * sub) {
false), // our fastrtps typesupport doesn't support keyed topics
[&](rmw_subscription_t * sub)
{
if (RMW_RET_OK != rmw_fastrtps_shared_cpp::destroy_subscription(
eprosima_fastrtps_identifier,
participant_info.get(),
Expand All @@ -139,7 +142,8 @@ init_context_impl(rmw_context_t * context)
std::unique_ptr<rmw_guard_condition_t, std::function<void(rmw_guard_condition_t *)>>
graph_guard_condition(
rmw_fastrtps_shared_cpp::__rmw_create_guard_condition(eprosima_fastrtps_identifier),
[&](rmw_guard_condition_t * p) {
[&](rmw_guard_condition_t * p)
{
rmw_ret_t ret = rmw_fastrtps_shared_cpp::__rmw_destroy_guard_condition(p);
if (ret != RMW_RET_OK) {
RMW_SAFE_FWRITE_TO_STDERR(
Expand All @@ -166,7 +170,8 @@ init_context_impl(rmw_context_t * context)
}

common_context->graph_cache.set_on_change_callback(
[guard_condition = graph_guard_condition.get()]() {
[guard_condition = graph_guard_condition.get()]()
{
rmw_fastrtps_shared_cpp::__rmw_trigger_guard_condition(
eprosima_fastrtps_identifier,
guard_condition);
Expand All @@ -185,7 +190,8 @@ init_context_impl(rmw_context_t * context)
}

rmw_ret_t
rmw_fastrtps_dynamic_cpp::increment_context_impl_ref_count(rmw_context_t * context)
rmw_fastrtps_dynamic_cpp::increment_context_impl_ref_count(
rmw_context_t * context)
{
assert(context);
assert(context->impl);
Expand Down
6 changes: 5 additions & 1 deletion rmw_fastrtps_dynamic_cpp/src/publisher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -278,13 +278,17 @@ rmw_fastrtps_dynamic_cpp::create_publisher(
info->data_writer_ = publisher->create_datawriter(
topic.topic,
writer_qos,
info->listener_);
info->listener_,
eprosima::fastdds::dds::StatusMask::publication_matched());

if (!info->data_writer_) {
RMW_SET_ERROR_MSG("create_publisher() could not create data writer");
return nullptr;
}

info->data_writer_->get_statuscondition().set_enabled_statuses(
eprosima::fastdds::dds::StatusMask::none());

// lambda to delete datawriter
auto cleanup_datawriter = rcpputils::make_scope_exit(
[publisher, info]() {
Expand Down
Loading

0 comments on commit f7c9425

Please sign in to comment.