From 29475a62b61c1b3a68250062be434cf3c4a64b15 Mon Sep 17 00:00:00 2001 From: Chen Lihui Date: Tue, 23 Mar 2021 15:11:06 +0800 Subject: [PATCH 01/33] to support a feature of content filtered topic Signed-off-by: Chen Lihui --- rmw_connextdds/src/rmw_api_impl_ndds.cpp | 22 ++ .../include/rmw_connextdds/dds_api.hpp | 1 + .../include/rmw_connextdds/rmw_api_impl.hpp | 16 + .../include/rmw_connextdds/rmw_impl.hpp | 39 ++- .../src/common/rmw_event.cpp | 16 +- rmw_connextdds_common/src/common/rmw_impl.cpp | 295 +++++++++++++++++- .../src/common/rmw_subscription.cpp | 48 +++ .../src/ndds/dds_api_ndds.cpp | 13 +- .../src/rtime/dds_api_rtime.cpp | 2 + .../src/rmw_api_impl_rtime.cpp | 20 ++ 10 files changed, 447 insertions(+), 25 deletions(-) diff --git a/rmw_connextdds/src/rmw_api_impl_ndds.cpp b/rmw_connextdds/src/rmw_api_impl_ndds.cpp index 81af9ee7..3bf150ef 100644 --- a/rmw_connextdds/src/rmw_api_impl_ndds.cpp +++ b/rmw_connextdds/src/rmw_api_impl_ndds.cpp @@ -637,6 +637,28 @@ rmw_subscription_get_actual_qos( } +rmw_ret_t +rmw_subscription_set_cft_expression_parameters( + rmw_subscription_t * subscription, + const char * filter_expression, + const rcutils_string_array_t * expression_parameters) +{ + return rmw_api_connextdds_subscription_set_cft_expression_parameters( + subscription, filter_expression, expression_parameters); +} + + +rmw_ret_t +rmw_subscription_get_cft_expression_parameters( + const rmw_subscription_t * subscription, + char ** filter_expression, + rcutils_string_array_t * expression_parameters) +{ + return rmw_api_connextdds_subscription_get_cft_expression_parameters( + subscription, filter_expression, expression_parameters); +} + + rmw_ret_t rmw_destroy_subscription( rmw_node_t * node, diff --git a/rmw_connextdds_common/include/rmw_connextdds/dds_api.hpp b/rmw_connextdds_common/include/rmw_connextdds/dds_api.hpp index f342e4b3..d05fe218 100644 --- a/rmw_connextdds_common/include/rmw_connextdds/dds_api.hpp +++ b/rmw_connextdds_common/include/rmw_connextdds/dds_api.hpp @@ -72,6 +72,7 @@ rmw_connextdds_create_contentfilteredtopic( DDS_Topic * const base_topic, const char * const cft_name, const char * const cft_filter, + const rcutils_string_array_t * cft_expression_parameters, DDS_TopicDescription ** const cft_out); rmw_ret_t diff --git a/rmw_connextdds_common/include/rmw_connextdds/rmw_api_impl.hpp b/rmw_connextdds_common/include/rmw_connextdds/rmw_api_impl.hpp index e70dd802..697ade38 100644 --- a/rmw_connextdds_common/include/rmw_connextdds/rmw_api_impl.hpp +++ b/rmw_connextdds_common/include/rmw_connextdds/rmw_api_impl.hpp @@ -430,6 +430,22 @@ rmw_api_connextdds_subscription_get_actual_qos( rmw_qos_profile_t * qos); +RMW_CONNEXTDDS_PUBLIC +rmw_ret_t +rmw_api_connextdds_subscription_set_cft_expression_parameters( + rmw_subscription_t * subscription, + const char * filter_expression, + const rcutils_string_array_t * expression_parameters); + + +RMW_CONNEXTDDS_PUBLIC +rmw_ret_t +rmw_api_connextdds_subscription_get_cft_expression_parameters( + const rmw_subscription_t * subscription, + char ** filter_expression, + rcutils_string_array_t * expression_parameters); + + RMW_CONNEXTDDS_PUBLIC rmw_ret_t rmw_api_connextdds_destroy_subscription( diff --git a/rmw_connextdds_common/include/rmw_connextdds/rmw_impl.hpp b/rmw_connextdds_common/include/rmw_connextdds/rmw_impl.hpp index 19d55c41..ca8aaec1 100644 --- a/rmw_connextdds_common/include/rmw_connextdds/rmw_impl.hpp +++ b/rmw_connextdds_common/include/rmw_connextdds/rmw_impl.hpp @@ -353,7 +353,7 @@ class RMW_Connext_Subscriber RMW_Connext_SubscriberStatusCondition * condition() { - return &this->status_condition; + return this->status_condition.get(); } const rmw_gid_t * gid() const @@ -396,7 +396,7 @@ class RMW_Connext_Subscriber } if (this->internal) { - return this->status_condition.trigger_loan_guard_condition(this->loan_len > 0); + return this->status_condition->trigger_loan_guard_condition(this->loan_len > 0); } return RMW_RET_OK; @@ -437,6 +437,18 @@ class RMW_Connext_Subscriber rmw_message_info_t * const message_info, bool * const taken); + rmw_ret_t + set_cft_expression_parameters( + const char * filter_expression, + const rcutils_string_array_t * expression_parameters + ); + + rmw_ret_t + get_cft_expression_parameters( + char ** filter_expression, + rcutils_string_array_t * expression_parameters + ); + bool has_data() { @@ -467,6 +479,21 @@ class RMW_Connext_Subscriber return this->dds_topic; } + static std::string get_atomic_id() { + static std::atomic_uint64_t id; + return std::to_string(id++); + } + + void set_node(const rmw_node_t * node) + { + this->node = node; + } + + bool is_cft_supported() + { + return nullptr != dds_topic_cft; + } + const bool internal; const bool ignore_local; @@ -478,12 +505,16 @@ class RMW_Connext_Subscriber RMW_Connext_MessageTypeSupport * type_support; rmw_gid_t ros_gid; const bool created_topic; - RMW_Connext_SubscriberStatusCondition status_condition; + std::shared_ptr status_condition; RMW_Connext_UntypedSampleSeq loan_data; DDS_SampleInfoSeq loan_info; - size_t loan_len; + std::atomic_uint64_t loan_len; size_t loan_next; std::mutex loan_mutex; + std::mutex cft_mutex; + const rmw_node_t * node; + std::string fqtopic_name; + rmw_qos_profile_t qos_policies; RMW_Connext_Subscriber( rmw_context_impl_t * const ctx, diff --git a/rmw_connextdds_common/src/common/rmw_event.cpp b/rmw_connextdds_common/src/common/rmw_event.cpp index 23acf6ee..4e124200 100644 --- a/rmw_connextdds_common/src/common/rmw_event.cpp +++ b/rmw_connextdds_common/src/common/rmw_event.cpp @@ -124,12 +124,16 @@ rmw_api_connextdds_take_event( } else { condition = RMW_Connext_Event::publisher(event_handle)->condition(); } - rmw_ret_t rc = condition->get_status(event_handle->event_type, event_info); - if (RMW_RET_OK != rc) { - RMW_CONNEXT_LOG_ERROR_SET("failed to get status from DDS entity") - return rc; + if (nullptr != condition) { + rmw_ret_t rc = condition->get_status(event_handle->event_type, event_info); + if (RMW_RET_OK != rc) { + RMW_CONNEXT_LOG_ERROR_SET("failed to get status from DDS entity") + return rc; + } + + *taken = true; + } else { + RMW_CONNEXT_LOG_WARNING("condition was reset because of resetting cft") } - - *taken = true; return RMW_RET_OK; } diff --git a/rmw_connextdds_common/src/common/rmw_impl.cpp b/rmw_connextdds_common/src/common/rmw_impl.cpp index 29b671d7..4efba0a4 100644 --- a/rmw_connextdds_common/src/common/rmw_impl.cpp +++ b/rmw_connextdds_common/src/common/rmw_impl.cpp @@ -1095,7 +1095,7 @@ RMW_Connext_Subscriber::RMW_Connext_Subscriber( dds_topic_cft(dds_topic_cft), type_support(type_support), created_topic(created_topic), - status_condition(dds_reader, ignore_local, internal) + status_condition(new RMW_Connext_SubscriberStatusCondition(dds_reader, ignore_local, internal)) { rmw_connextdds_get_entity_gid(this->dds_reader, this->ros_gid); @@ -1215,11 +1215,24 @@ RMW_Connext_Subscriber::create( }); DDS_TopicDescription * sub_topic = DDS_Topic_as_topicdescription(topic); - - if (nullptr != cft_name) { - rmw_ret_t cft_rc = - rmw_connextdds_create_contentfilteredtopic( - ctx, dp, topic, cft_name, cft_filter, &cft_topic); + std::string cft_topic_name; + if ( nullptr != cft_name || + nullptr != subscriber_options->filter_expression) { + rmw_ret_t cft_rc = RMW_RET_OK; + + if (nullptr != cft_name) { + cft_rc = + rmw_connextdds_create_contentfilteredtopic( + ctx, dp, topic, cft_name, cft_filter, NULL, &cft_topic); + } else if (nullptr != subscriber_options->filter_expression) { + cft_topic_name = + fqtopic_name+"_ContentFilterTopic"+RMW_Connext_Subscriber::get_atomic_id(); + cft_rc = + rmw_connextdds_create_contentfilteredtopic( + ctx, dp, topic, cft_topic_name.c_str(), + subscriber_options->filter_expression, + subscriber_options->expression_parameters, &cft_topic); + } if (RMW_RET_OK != cft_rc) { if (RMW_RET_UNSUPPORTED != cft_rc) { @@ -1302,6 +1315,9 @@ RMW_Connext_Subscriber::create( RMW_CONNEXT_LOG_ERROR_SET("failed to allocate RMW subscriber") return nullptr; } + rmw_sub_impl->fqtopic_name = fqtopic_name; + rmw_sub_impl->qos_policies = *qos_policies; + scope_exit_type_unregister.cancel(); scope_exit_topic_delete.cancel(); scope_exit_dds_reader_delete.cancel(); @@ -1317,7 +1333,7 @@ RMW_Connext_Subscriber::finalize() (void *)this, this->type_support->type_name()) // Make sure subscriber's condition is detached from any waitset - this->status_condition.invalidate(); + this->status_condition->invalidate(); if (this->loan_len > 0) { this->loan_next = this->loan_len; @@ -1326,7 +1342,7 @@ RMW_Connext_Subscriber::finalize() } } - if (DDS_RETCODE_OK != + if (nullptr != this->dds_reader && DDS_RETCODE_OK != DDS_Subscriber_delete_datareader( this->dds_subscriber(), this->dds_reader)) { @@ -1488,6 +1504,256 @@ RMW_Connext_Subscriber::take_serialized( return rc; } +rmw_ret_t +RMW_Connext_Subscriber::set_cft_expression_parameters( + const char * filter_expression, + const rcutils_string_array_t * expression_parameters) +{ + std::lock_guard lock(this->cft_mutex); + if (nullptr == this->dds_topic_cft) { + // case 1: reset if there is no cft topic exist + if (filter_expression && *filter_expression == '\0') { + RMW_CONNEXT_LOG_DEBUG("current subscriber has no content filter topic") + return RMW_RET_OK; + } + } else { + if (nullptr != filter_expression && *filter_expression != '\0') { + // case 2: normally to set filter expre if cft already support + DDS_ContentFilteredTopic * const cft_topic = + DDS_ContentFilteredTopic_narrow(dds_topic_cft); + + struct DDS_StringSeq cft_parameters; + DDS_StringSeq_initialize(&cft_parameters); + if (expression_parameters) { + DDS_StringSeq_ensure_length( + &cft_parameters, expression_parameters->size, expression_parameters->size); + DDS_StringSeq_from_array(&cft_parameters, + const_cast(expression_parameters->data), + expression_parameters->size); + } + + DDS_ReturnCode_t ret = + DDS_ContentFilteredTopic_set_expression(cft_topic, filter_expression, &cft_parameters); + DDS_StringSeq_finalize(&cft_parameters); + if (DDS_RETCODE_OK != ret) + { + RMW_CONNEXT_LOG_ERROR("failed to set content-filtered topic") + return RMW_RET_ERROR; + } + return RMW_RET_OK; + } + } + + if (this->loan_len != 0) { + RMW_CONNEXT_LOG_ERROR("subscriber can't be reset because a message data loaned") + return RMW_RET_ERROR; + } + if (RMW_RET_OK != + rmw_connextdds_graph_on_subscriber_deleted( + ctx, node, this)) + { + RMW_CONNEXT_LOG_ERROR("failed to update graph for subscriber") + return RMW_RET_ERROR; + } + + this->status_condition->invalidate(); + + if (DDS_RETCODE_OK != + DDS_Subscriber_delete_datareader(this->dds_subscriber(), this->dds_reader)) + { + RMW_CONNEXT_LOG_ERROR_SET( + "failed to delete DDS DataReader") + return RMW_RET_ERROR; + } + + DDS_DomainParticipant * const dp = this->dds_participant(); + DDS_TopicDescription * sub_topic = nullptr; + if (nullptr == this->dds_topic_cft) { + // case 3: create a new cft if there is no cft topic and filter_expression is not empty("") + // remove old reader + // create cft topic and a new reader + std::string cft_topic_name = + fqtopic_name+"_ContentFilterTopic"+RMW_Connext_Subscriber::get_atomic_id(); + + rmw_ret_t cft_rc = + rmw_connextdds_create_contentfilteredtopic( + ctx, dp, dds_topic, cft_topic_name.c_str(), + filter_expression, + expression_parameters, &sub_topic); + + if (RMW_RET_OK != cft_rc) { + RMW_CONNEXT_LOG_ERROR("failed to create content filter topic") + return RMW_RET_ERROR; + } + + this->dds_topic_cft = sub_topic; + } else { + // case 4: delete cft topic and create a normal reader if filter_expression is empty("") + // that means reset + // remove content filter topic + if (nullptr != this->dds_topic_cft) { + rmw_ret_t cft_rc = rmw_connextdds_delete_contentfilteredtopic( + ctx, dp, this->dds_topic_cft); + + if (RMW_RET_OK != cft_rc) { + return cft_rc; + } + } + + this->dds_topic_cft = nullptr; + + // create normal topic and a new reader + sub_topic = DDS_Topic_as_topicdescription(dds_topic); + } + + #if !RMW_CONNEXT_DDS_API_PRO_LEGACY + DDS_DataReaderQos dr_qos = DDS_DataReaderQos_INITIALIZER; + #else + DDS_DataReaderQos dr_qos; + if (DDS_RETCODE_OK != DDS_DataReaderQos_initialize(&dr_qos)) { + RMW_CONNEXT_LOG_ERROR_SET("failed to initialize datareader qos") + return RMW_RET_ERROR; + } + #endif /* !RMW_CONNEXT_DDS_API_PRO_LEGACY */ + + DDS_DataReaderQos * const dr_qos_ptr = &dr_qos; + auto scope_exit_dr_qos_delete = + rcpputils::make_scope_exit( + [dr_qos_ptr]() + { + DDS_DataReaderQos_finalize(dr_qos_ptr); + }); + + if (DDS_RETCODE_OK != + DDS_Subscriber_get_default_datareader_qos(this->dds_subscriber(), &dr_qos)) + { + RMW_CONNEXT_LOG_ERROR_SET("failed to get default reader QoS") + return RMW_RET_ERROR; + } + + DDS_DataReader * dds_reader = + rmw_connextdds_create_datareader( + ctx, + dp, + this->dds_subscriber(), + &qos_policies, + nullptr, + internal, + type_support, + sub_topic, + &dr_qos); + + if (nullptr == dds_reader) { + RMW_CONNEXT_LOG_ERROR_SET("failed to create DDS reader") + return RMW_RET_ERROR; + } + + this->dds_reader = dds_reader; + + this->status_condition.reset( + new RMW_Connext_SubscriberStatusCondition( + this->dds_reader, this->ignore_local, this->internal)); + rmw_connextdds_get_entity_gid(this->dds_reader, this->ros_gid); + + if (!internal) { + if (RMW_RET_OK != this->enable()) { + RMW_CONNEXT_LOG_ERROR("failed to enable subscription") + return RMW_RET_ERROR; + } + + if (RMW_RET_OK != + rmw_connextdds_graph_on_subscriber_created( + ctx, node, this)) + { + RMW_CONNEXT_LOG_ERROR("failed to update graph for subscriber") + return RMW_RET_ERROR; + } + } + + return RMW_RET_OK; +} + +rmw_ret_t +RMW_Connext_Subscriber::get_cft_expression_parameters( + char ** filter_expression, + rcutils_string_array_t * expression_parameters) +{ + std::lock_guard lock(this->cft_mutex); + if (nullptr == this->dds_topic_cft) { + RMW_SET_ERROR_MSG("this subscriber has not created a contentfilteredtopic."); + return RMW_RET_ERROR; + } + DDS_ContentFilteredTopic * const cft_topic = + DDS_ContentFilteredTopic_narrow(dds_topic_cft); + + int parameters_len; + rcutils_allocator_t allocator = rcutils_get_default_allocator(); + + // get filter_expression + const char* expression = DDS_ContentFilteredTopic_get_filter_expression(cft_topic); + if (!expression) { + RMW_SET_ERROR_MSG("failed to get filter expression"); + return RMW_RET_ERROR; + } + + *filter_expression = rcutils_strdup(expression, allocator); + if (NULL == *filter_expression) { + RMW_SET_ERROR_MSG("failed to duplicate string"); + return RMW_RET_BAD_ALLOC; + } + auto scope_exit_filter_expression_delete = + rcpputils::make_scope_exit( + [filter_expression, allocator]() + { + if (*filter_expression) { + allocator.deallocate(*filter_expression, allocator.state); + *filter_expression = nullptr; + } + }); + + // get parameters + struct DDS_StringSeq parameters; + DDS_ReturnCode_t status = + DDS_ContentFilteredTopic_get_expression_parameters(cft_topic, ¶meters); + if (DDS_RETCODE_OK != status) { + RMW_SET_ERROR_MSG("failed to get expression parameters"); + return RMW_RET_ERROR; + } + auto scope_exit_parameters_delete = + rcpputils::make_scope_exit( + [¶meters]() + { + DDS_StringSeq_finalize(¶meters); + }); + + parameters_len = DDS_StringSeq_get_length(¶meters); + rcutils_ret_t rcutils_ret = rcutils_string_array_init(expression_parameters, parameters_len, &allocator); + if (rcutils_ret != RCUTILS_RET_OK) { + RMW_SET_ERROR_MSG("failed to init string array for expression parameters"); + return RMW_RET_ERROR; + } + auto scope_exit_expression_parameters_delete = + rcpputils::make_scope_exit( + [expression_parameters]() + { + if (RCUTILS_RET_OK != rcutils_string_array_fini(expression_parameters)) { + RCUTILS_LOG_ERROR("Error while finalizing expression parameter due to another error"); + } + }); + for (int i = 0; i < parameters_len; ++i) { + char * parameter = rcutils_strdup(DDS_StringSeq_get(¶meters, i), allocator); + if (!parameter) { + RMW_SET_ERROR_MSG("failed to allocate memory for parameter"); + return RMW_RET_BAD_ALLOC; + } + expression_parameters->data[i] = parameter; + } + + scope_exit_filter_expression_delete.cancel(); + scope_exit_expression_parameters_delete.cancel(); + return RMW_RET_OK; +} + rmw_ret_t RMW_Connext_Subscriber::loan_messages() { @@ -1507,7 +1773,7 @@ RMW_Connext_Subscriber::loan_messages() this->type_support->type_name(), this->loan_len) if (this->loan_len > 0) { - return this->status_condition.set_data_available(true); + return this->status_condition->set_data_available(true); } return RMW_RET_OK; @@ -1523,16 +1789,16 @@ RMW_Connext_Subscriber::return_messages() "[%s] return loaned messages: %lu", this->type_support->type_name(), this->loan_len) - this->loan_len = 0; - this->loan_next = 0; - rmw_ret_t rc_result = RMW_RET_OK; rmw_ret_t rc = rmw_connextdds_return_samples(this); if (RMW_RET_OK != rc) { rc_result = rc; } - rc = this->status_condition.set_data_available(false); + this->loan_len = 0; + this->loan_next = 0; + + rc = this->status_condition->set_data_available(false); if (RMW_RET_OK != rc) { rc_result = rc; } @@ -1730,7 +1996,7 @@ rmw_connextdds_create_subscriber( "failed to allocate RMW_Connext_Subscriber") return nullptr; } - + rmw_sub_impl->set_node(node); auto scope_exit_rmw_reader_impl_delete = rcpputils::make_scope_exit( [rmw_sub_impl]() @@ -1774,6 +2040,7 @@ rmw_connextdds_create_subscriber( topic_name_len + 1); rmw_subscriber->options = *subscriber_options; rmw_subscriber->can_loan_messages = false; + rmw_subscriber->is_cft_supported = rmw_sub_impl->is_cft_supported(); if (!internal) { if (RMW_RET_OK != rmw_sub_impl->enable()) { diff --git a/rmw_connextdds_common/src/common/rmw_subscription.cpp b/rmw_connextdds_common/src/common/rmw_subscription.cpp index 378322e9..2e088496 100644 --- a/rmw_connextdds_common/src/common/rmw_subscription.cpp +++ b/rmw_connextdds_common/src/common/rmw_subscription.cpp @@ -160,6 +160,54 @@ rmw_api_connextdds_subscription_get_actual_qos( } +rmw_ret_t +rmw_api_connextdds_subscription_set_cft_expression_parameters( + rmw_subscription_t * subscription, + const char * filter_expression, + const rcutils_string_array_t * expression_parameters) +{ + RMW_CHECK_ARGUMENT_FOR_NULL(subscription, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_TYPE_IDENTIFIERS_MATCH( + subscription, + subscription->implementation_identifier, + RMW_CONNEXTDDS_ID, + return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); + RMW_CHECK_ARGUMENT_FOR_NULL(filter_expression, RMW_RET_INVALID_ARGUMENT); + + RMW_Connext_Subscriber * const sub_impl = + reinterpret_cast(subscription->data); + + rmw_ret_t rc = sub_impl->set_cft_expression_parameters(filter_expression, expression_parameters); + subscription->is_cft_supported = sub_impl->is_cft_supported(); + + return rc; +} + + +rmw_ret_t +rmw_api_connextdds_subscription_get_cft_expression_parameters( + const rmw_subscription_t * subscription, + char ** filter_expression, + rcutils_string_array_t * expression_parameters) +{ + RMW_CHECK_ARGUMENT_FOR_NULL(subscription, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_TYPE_IDENTIFIERS_MATCH( + subscription, + subscription->implementation_identifier, + RMW_CONNEXTDDS_ID, + return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); + RMW_CHECK_ARGUMENT_FOR_NULL(filter_expression, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_ARGUMENT_FOR_NULL(expression_parameters, RMW_RET_INVALID_ARGUMENT); + + RMW_Connext_Subscriber * const sub_impl = + reinterpret_cast(subscription->data); + + rmw_ret_t rc = sub_impl->get_cft_expression_parameters(filter_expression, expression_parameters); + + return rc; +} + + rmw_ret_t rmw_api_connextdds_destroy_subscription( rmw_node_t * node, diff --git a/rmw_connextdds_common/src/ndds/dds_api_ndds.cpp b/rmw_connextdds_common/src/ndds/dds_api_ndds.cpp index 0e4dfc12..e0084fd2 100644 --- a/rmw_connextdds_common/src/ndds/dds_api_ndds.cpp +++ b/rmw_connextdds_common/src/ndds/dds_api_ndds.cpp @@ -217,6 +217,7 @@ rmw_connextdds_create_contentfilteredtopic( DDS_Topic * const base_topic, const char * const cft_name, const char * const cft_filter, + const rcutils_string_array_t * cft_expression_parameters, DDS_TopicDescription ** const cft_out) { UNUSED_ARG(ctx); @@ -224,13 +225,23 @@ rmw_connextdds_create_contentfilteredtopic( RMW_CONNEXT_ASSERT(nullptr != cft_filter) struct DDS_StringSeq cft_parameters = DDS_SEQUENCE_INITIALIZER; - DDS_StringSeq_ensure_length(&cft_parameters, 0, 0); + DDS_StringSeq_initialize(&cft_parameters); + if (cft_expression_parameters) { + DDS_StringSeq_ensure_length( + &cft_parameters, cft_expression_parameters->size, cft_expression_parameters->size); + DDS_StringSeq_from_array(&cft_parameters, + const_cast(cft_expression_parameters->data), + cft_expression_parameters->size); + } else { + DDS_StringSeq_ensure_length(&cft_parameters, 0, 0); + } *cft_out = nullptr; DDS_ContentFilteredTopic * cft_topic = DDS_DomainParticipant_create_contentfilteredtopic( dp, cft_name, base_topic, cft_filter, &cft_parameters); + DDS_StringSeq_finalize(&cft_parameters); if (nullptr == cft_topic) { RMW_CONNEXT_LOG_ERROR_A_SET( "failed to create content-filtered topic: " diff --git a/rmw_connextdds_common/src/rtime/dds_api_rtime.cpp b/rmw_connextdds_common/src/rtime/dds_api_rtime.cpp index d4397ab2..dbc9c178 100644 --- a/rmw_connextdds_common/src/rtime/dds_api_rtime.cpp +++ b/rmw_connextdds_common/src/rtime/dds_api_rtime.cpp @@ -867,6 +867,7 @@ rmw_connextdds_create_contentfilteredtopic( DDS_Topic * const base_topic, const char * const cft_name, const char * const cft_filter, + const rcutils_string_array_t * cft_expression_parameters, DDS_TopicDescription ** const cft_out) { UNUSED_ARG(ctx); @@ -874,6 +875,7 @@ rmw_connextdds_create_contentfilteredtopic( UNUSED_ARG(base_topic); UNUSED_ARG(cft_name); UNUSED_ARG(cft_filter); + UNUSED_ARG(cft_expression_parameters); UNUSED_ARG(cft_out); return RMW_RET_UNSUPPORTED; } diff --git a/rmw_connextddsmicro/src/rmw_api_impl_rtime.cpp b/rmw_connextddsmicro/src/rmw_api_impl_rtime.cpp index 9760208b..3b280046 100644 --- a/rmw_connextddsmicro/src/rmw_api_impl_rtime.cpp +++ b/rmw_connextddsmicro/src/rmw_api_impl_rtime.cpp @@ -637,6 +637,26 @@ rmw_subscription_get_actual_qos( } +rmw_ret_t +rmw_subscription_set_cft_expression_parameters( + rmw_subscription_t * subscription, + const char * filter_expression, + const rcutils_string_array_t * expression_parameters) +{ + return RMW_RET_UNSUPPORTED; +} + + +rmw_ret_t +rmw_subscription_get_cft_expression_parameters( + const rmw_subscription_t * subscription, + char ** filter_expression, + rcutils_string_array_t * expression_parameters) +{ + return RMW_RET_UNSUPPORTED; +} + + rmw_ret_t rmw_destroy_subscription( rmw_node_t * node, From e45098ca51afcbd2bf80d011f02977f8ab396b4a Mon Sep 17 00:00:00 2001 From: Chen Lihui Date: Wed, 24 Mar 2021 13:53:18 +0800 Subject: [PATCH 02/33] fix for unscritify Signed-off-by: Chen Lihui --- .../include/rmw_connextdds/rmw_impl.hpp | 4 +- rmw_connextdds_common/src/common/rmw_impl.cpp | 38 ++++++++++--------- .../src/ndds/dds_api_ndds.cpp | 3 +- 3 files changed, 25 insertions(+), 20 deletions(-) diff --git a/rmw_connextdds_common/include/rmw_connextdds/rmw_impl.hpp b/rmw_connextdds_common/include/rmw_connextdds/rmw_impl.hpp index ca8aaec1..fc886e8c 100644 --- a/rmw_connextdds_common/include/rmw_connextdds/rmw_impl.hpp +++ b/rmw_connextdds_common/include/rmw_connextdds/rmw_impl.hpp @@ -18,6 +18,7 @@ #include #include #include +#include #include #include #include @@ -479,7 +480,8 @@ class RMW_Connext_Subscriber return this->dds_topic; } - static std::string get_atomic_id() { + static std::string get_atomic_id() + { static std::atomic_uint64_t id; return std::to_string(id++); } diff --git a/rmw_connextdds_common/src/common/rmw_impl.cpp b/rmw_connextdds_common/src/common/rmw_impl.cpp index 4efba0a4..917cb2b1 100644 --- a/rmw_connextdds_common/src/common/rmw_impl.cpp +++ b/rmw_connextdds_common/src/common/rmw_impl.cpp @@ -1216,8 +1216,9 @@ RMW_Connext_Subscriber::create( DDS_TopicDescription * sub_topic = DDS_Topic_as_topicdescription(topic); std::string cft_topic_name; - if ( nullptr != cft_name || - nullptr != subscriber_options->filter_expression) { + if (nullptr != cft_name || + nullptr != subscriber_options->filter_expression) + { rmw_ret_t cft_rc = RMW_RET_OK; if (nullptr != cft_name) { @@ -1226,7 +1227,7 @@ RMW_Connext_Subscriber::create( ctx, dp, topic, cft_name, cft_filter, NULL, &cft_topic); } else if (nullptr != subscriber_options->filter_expression) { cft_topic_name = - fqtopic_name+"_ContentFilterTopic"+RMW_Connext_Subscriber::get_atomic_id(); + fqtopic_name + "_ContentFilterTopic" + RMW_Connext_Subscriber::get_atomic_id(); cft_rc = rmw_connextdds_create_contentfilteredtopic( ctx, dp, topic, cft_topic_name.c_str(), @@ -1527,7 +1528,8 @@ RMW_Connext_Subscriber::set_cft_expression_parameters( if (expression_parameters) { DDS_StringSeq_ensure_length( &cft_parameters, expression_parameters->size, expression_parameters->size); - DDS_StringSeq_from_array(&cft_parameters, + DDS_StringSeq_from_array( + &cft_parameters, const_cast(expression_parameters->data), expression_parameters->size); } @@ -1535,8 +1537,7 @@ RMW_Connext_Subscriber::set_cft_expression_parameters( DDS_ReturnCode_t ret = DDS_ContentFilteredTopic_set_expression(cft_topic, filter_expression, &cft_parameters); DDS_StringSeq_finalize(&cft_parameters); - if (DDS_RETCODE_OK != ret) - { + if (DDS_RETCODE_OK != ret) { RMW_CONNEXT_LOG_ERROR("failed to set content-filtered topic") return RMW_RET_ERROR; } @@ -1573,7 +1574,7 @@ RMW_Connext_Subscriber::set_cft_expression_parameters( // remove old reader // create cft topic and a new reader std::string cft_topic_name = - fqtopic_name+"_ContentFilterTopic"+RMW_Connext_Subscriber::get_atomic_id(); + fqtopic_name + "_ContentFilterTopic" + RMW_Connext_Subscriber::get_atomic_id(); rmw_ret_t cft_rc = rmw_connextdds_create_contentfilteredtopic( @@ -1606,15 +1607,15 @@ RMW_Connext_Subscriber::set_cft_expression_parameters( sub_topic = DDS_Topic_as_topicdescription(dds_topic); } - #if !RMW_CONNEXT_DDS_API_PRO_LEGACY - DDS_DataReaderQos dr_qos = DDS_DataReaderQos_INITIALIZER; - #else - DDS_DataReaderQos dr_qos; - if (DDS_RETCODE_OK != DDS_DataReaderQos_initialize(&dr_qos)) { - RMW_CONNEXT_LOG_ERROR_SET("failed to initialize datareader qos") - return RMW_RET_ERROR; - } - #endif /* !RMW_CONNEXT_DDS_API_PRO_LEGACY */ +#if !RMW_CONNEXT_DDS_API_PRO_LEGACY + DDS_DataReaderQos dr_qos = DDS_DataReaderQos_INITIALIZER; +#else + DDS_DataReaderQos dr_qos; + if (DDS_RETCODE_OK != DDS_DataReaderQos_initialize(&dr_qos)) { + RMW_CONNEXT_LOG_ERROR_SET("failed to initialize datareader qos") + return RMW_RET_ERROR; + } +#endif /* !RMW_CONNEXT_DDS_API_PRO_LEGACY */ DDS_DataReaderQos * const dr_qos_ptr = &dr_qos; auto scope_exit_dr_qos_delete = @@ -1690,7 +1691,7 @@ RMW_Connext_Subscriber::get_cft_expression_parameters( rcutils_allocator_t allocator = rcutils_get_default_allocator(); // get filter_expression - const char* expression = DDS_ContentFilteredTopic_get_filter_expression(cft_topic); + const char * expression = DDS_ContentFilteredTopic_get_filter_expression(cft_topic); if (!expression) { RMW_SET_ERROR_MSG("failed to get filter expression"); return RMW_RET_ERROR; @@ -1727,7 +1728,8 @@ RMW_Connext_Subscriber::get_cft_expression_parameters( }); parameters_len = DDS_StringSeq_get_length(¶meters); - rcutils_ret_t rcutils_ret = rcutils_string_array_init(expression_parameters, parameters_len, &allocator); + rcutils_ret_t rcutils_ret = + rcutils_string_array_init(expression_parameters, parameters_len, &allocator); if (rcutils_ret != RCUTILS_RET_OK) { RMW_SET_ERROR_MSG("failed to init string array for expression parameters"); return RMW_RET_ERROR; diff --git a/rmw_connextdds_common/src/ndds/dds_api_ndds.cpp b/rmw_connextdds_common/src/ndds/dds_api_ndds.cpp index e0084fd2..e7f2b6c0 100644 --- a/rmw_connextdds_common/src/ndds/dds_api_ndds.cpp +++ b/rmw_connextdds_common/src/ndds/dds_api_ndds.cpp @@ -229,7 +229,8 @@ rmw_connextdds_create_contentfilteredtopic( if (cft_expression_parameters) { DDS_StringSeq_ensure_length( &cft_parameters, cft_expression_parameters->size, cft_expression_parameters->size); - DDS_StringSeq_from_array(&cft_parameters, + DDS_StringSeq_from_array( + &cft_parameters, const_cast(cft_expression_parameters->data), cft_expression_parameters->size); } else { From 2d5903e19fe88d636d4be40e78c295fa028436f5 Mon Sep 17 00:00:00 2001 From: Chen Lihui Date: Thu, 25 Mar 2021 16:35:35 +0800 Subject: [PATCH 03/33] address review Co-authored-by: Andrea Sorbini Signed-off-by: Chen Lihui --- .../include/rmw_connextdds/dds_api.hpp | 12 + .../include/rmw_connextdds/rmw_impl.hpp | 16 +- rmw_connextdds_common/src/common/rmw_impl.cpp | 374 +++++++----------- .../src/ndds/dds_api_ndds.cpp | 153 ++++++- .../src/rtime/dds_api_rtime.cpp | 24 ++ 5 files changed, 327 insertions(+), 252 deletions(-) diff --git a/rmw_connextdds_common/include/rmw_connextdds/dds_api.hpp b/rmw_connextdds_common/include/rmw_connextdds/dds_api.hpp index d05fe218..d982693b 100644 --- a/rmw_connextdds_common/include/rmw_connextdds/dds_api.hpp +++ b/rmw_connextdds_common/include/rmw_connextdds/dds_api.hpp @@ -208,6 +208,18 @@ rmw_connextdds_enable_security( rmw_context_impl_t * const ctx, DDS_DomainParticipantQos * const qos); +rmw_ret_t +rmw_connextdds_set_cft_filter_expression( + DDS_TopicDescription * const topic_desc, + const char * filter_expression, + const rcutils_string_array_t * expression_parameters); + +rmw_ret_t +rmw_connextdds_get_cft_filter_expression( + DDS_TopicDescription * const topic_desc, + char ** const expr_out, + rcutils_string_array_t * cft_params_out); + // Define some macro aliases for security-related properties #ifndef DDS_SECURITY_PROPERTY_PREFIX #define DDS_SECURITY_PROPERTY_PREFIX \ diff --git a/rmw_connextdds_common/include/rmw_connextdds/rmw_impl.hpp b/rmw_connextdds_common/include/rmw_connextdds/rmw_impl.hpp index fc886e8c..80ce8add 100644 --- a/rmw_connextdds_common/include/rmw_connextdds/rmw_impl.hpp +++ b/rmw_connextdds_common/include/rmw_connextdds/rmw_impl.hpp @@ -282,7 +282,7 @@ class RMW_Connext_Subscriber const char * const cft_filter = nullptr); rmw_ret_t - finalize(); + finalize(const bool reset_cft = false); DDS_DataReader * reader() const @@ -517,6 +517,7 @@ class RMW_Connext_Subscriber const rmw_node_t * node; std::string fqtopic_name; rmw_qos_profile_t qos_policies; + rmw_subscription_options_t subscriber_options; RMW_Connext_Subscriber( rmw_context_impl_t * const ctx, @@ -528,6 +529,19 @@ class RMW_Connext_Subscriber DDS_TopicDescription * const dds_topic_cft, const bool internal); + static + DDS_DataReader * + initialize_datareader( + rmw_context_impl_t * const ctx, + DDS_DomainParticipant * const dp, + DDS_Subscriber * const sub, + const std::string & fqtopic_name, + const rmw_qos_profile_t * const qos_policies, + RMW_Connext_MessageTypeSupport * const type_support, + const rmw_subscription_options_t * const subscriber_options, + const bool internal, + DDS_TopicDescription * sub_topic); + friend class RMW_Connext_SubscriberStatusCondition; }; diff --git a/rmw_connextdds_common/src/common/rmw_impl.cpp b/rmw_connextdds_common/src/common/rmw_impl.cpp index 917cb2b1..4857e8c2 100644 --- a/rmw_connextdds_common/src/common/rmw_impl.cpp +++ b/rmw_connextdds_common/src/common/rmw_impl.cpp @@ -1244,6 +1244,67 @@ RMW_Connext_Subscriber::create( } } + DDS_DataReader * dds_reader = initialize_datareader( + ctx, + dp, + sub, + fqtopic_name, + qos_policies, + type_support, + subscriber_options, + internal, + sub_topic); + + auto scope_exit_dds_reader_delete = + rcpputils::make_scope_exit( + [sub, dds_reader]() + { + if (DDS_RETCODE_OK != + DDS_Subscriber_delete_datareader(sub, dds_reader)) + { + RMW_CONNEXT_LOG_ERROR_SET( + "failed to delete DDS DataWriter") + } + }); + + RMW_Connext_Subscriber * rmw_sub_impl = + new (std::nothrow) RMW_Connext_Subscriber( + ctx, + dds_reader, + topic, + type_support, + subscriber_options->ignore_local_publications, + topic_created, + cft_topic, + internal); + + if (nullptr == rmw_sub_impl) { + RMW_CONNEXT_LOG_ERROR_SET("failed to allocate RMW subscriber") + return nullptr; + } + rmw_sub_impl->fqtopic_name = fqtopic_name; + rmw_sub_impl->qos_policies = *qos_policies; + rmw_sub_impl->subscriber_options = *subscriber_options; + + scope_exit_type_unregister.cancel(); + scope_exit_topic_delete.cancel(); + scope_exit_dds_reader_delete.cancel(); + + return rmw_sub_impl; +} + +DDS_DataReader * +RMW_Connext_Subscriber::initialize_datareader( + rmw_context_impl_t * const ctx, + DDS_DomainParticipant * const dp, + DDS_Subscriber * const sub, + const std::string & fqtopic_name, + const rmw_qos_profile_t * const qos_policies, + RMW_Connext_MessageTypeSupport * const type_support, + const rmw_subscription_options_t * const subscriber_options, + const bool internal, + DDS_TopicDescription * sub_topic) +{ // The following initialization generates warnings when built // with RTI Connext DDS Professional < 6 (e.g. 5.3.1), so use // DDS_DataWriterQos_initialize() for older versions. @@ -1288,46 +1349,11 @@ RMW_Connext_Subscriber::create( RMW_CONNEXT_LOG_ERROR_SET("failed to create DDS reader") return nullptr; } - - auto scope_exit_dds_reader_delete = - rcpputils::make_scope_exit( - [sub, dds_reader]() - { - if (DDS_RETCODE_OK != - DDS_Subscriber_delete_datareader(sub, dds_reader)) - { - RMW_CONNEXT_LOG_ERROR_SET( - "failed to delete DDS DataWriter") - } - }); - - RMW_Connext_Subscriber * rmw_sub_impl = - new (std::nothrow) RMW_Connext_Subscriber( - ctx, - dds_reader, - topic, - type_support, - subscriber_options->ignore_local_publications, - topic_created, - cft_topic, - internal); - - if (nullptr == rmw_sub_impl) { - RMW_CONNEXT_LOG_ERROR_SET("failed to allocate RMW subscriber") - return nullptr; - } - rmw_sub_impl->fqtopic_name = fqtopic_name; - rmw_sub_impl->qos_policies = *qos_policies; - - scope_exit_type_unregister.cancel(); - scope_exit_topic_delete.cancel(); - scope_exit_dds_reader_delete.cancel(); - - return rmw_sub_impl; + return dds_reader; } rmw_ret_t -RMW_Connext_Subscriber::finalize() +RMW_Connext_Subscriber::finalize(const bool reset_cft) { RMW_CONNEXT_LOG_DEBUG_A( "finalizing subscriber: sub=%p, type=%s", @@ -1351,44 +1377,46 @@ RMW_Connext_Subscriber::finalize() return RMW_RET_ERROR; } - DDS_DomainParticipant * const participant = this->dds_participant(); + if (!reset_cft) { + DDS_DomainParticipant * const participant = this->dds_participant(); - if (nullptr != this->dds_topic_cft) { - rmw_ret_t cft_rc = rmw_connextdds_delete_contentfilteredtopic( - ctx, participant, this->dds_topic_cft); + if (nullptr != this->dds_topic_cft) { + rmw_ret_t cft_rc = rmw_connextdds_delete_contentfilteredtopic( + ctx, participant, this->dds_topic_cft); - if (RMW_RET_OK != cft_rc) { - return cft_rc; + if (RMW_RET_OK != cft_rc) { + return cft_rc; + } + this->dds_topic_cft = nullptr; } - } - if (this->created_topic) { - DDS_Topic * const topic = this->dds_topic; + if (this->created_topic) { + DDS_Topic * const topic = this->dds_topic; - RMW_CONNEXT_LOG_DEBUG_A( - "deleting topic: name=%s", - DDS_TopicDescription_get_name( - DDS_Topic_as_topicdescription(topic))) + RMW_CONNEXT_LOG_DEBUG_A( + "deleting topic: name=%s", + DDS_TopicDescription_get_name( + DDS_Topic_as_topicdescription(topic))) - DDS_ReturnCode_t rc = - DDS_DomainParticipant_delete_topic(participant, topic); + DDS_ReturnCode_t rc = + DDS_DomainParticipant_delete_topic(participant, topic); - if (DDS_RETCODE_OK != rc) { - RMW_CONNEXT_LOG_ERROR_SET("failed to delete DDS Topic") - return RMW_RET_ERROR; + if (DDS_RETCODE_OK != rc) { + RMW_CONNEXT_LOG_ERROR_SET("failed to delete DDS Topic") + return RMW_RET_ERROR; + } } - } - - rmw_ret_t rc = RMW_Connext_MessageTypeSupport::unregister_type_support( - this->ctx, participant, this->type_support->type_name()); - if (RMW_RET_OK != rc) { - return rc; - } + rmw_ret_t rc = RMW_Connext_MessageTypeSupport::unregister_type_support( + this->ctx, participant, this->type_support->type_name()); - delete this->type_support; - this->type_support = nullptr; + if (RMW_RET_OK != rc) { + return rc; + } + delete this->type_support; + this->type_support = nullptr; + } return RMW_RET_OK; } @@ -1510,139 +1538,73 @@ RMW_Connext_Subscriber::set_cft_expression_parameters( const char * filter_expression, const rcutils_string_array_t * expression_parameters) { + RMW_CONNEXT_ASSERT(nullptr != filter_expression) + RMW_CONNEXT_ASSERT(!internal) + + rmw_ret_t ret; std::lock_guard lock(this->cft_mutex); + const bool filter_expression_empty = (*filter_expression == '\0'); if (nullptr == this->dds_topic_cft) { - // case 1: reset if there is no cft topic exist - if (filter_expression && *filter_expression == '\0') { + // allow to call set filter_expresson even if cft not exist + if (filter_expression_empty) { RMW_CONNEXT_LOG_DEBUG("current subscriber has no content filter topic") return RMW_RET_OK; } } else { - if (nullptr != filter_expression && *filter_expression != '\0') { - // case 2: normally to set filter expre if cft already support - DDS_ContentFilteredTopic * const cft_topic = - DDS_ContentFilteredTopic_narrow(dds_topic_cft); - - struct DDS_StringSeq cft_parameters; - DDS_StringSeq_initialize(&cft_parameters); - if (expression_parameters) { - DDS_StringSeq_ensure_length( - &cft_parameters, expression_parameters->size, expression_parameters->size); - DDS_StringSeq_from_array( - &cft_parameters, - const_cast(expression_parameters->data), - expression_parameters->size); - } - - DDS_ReturnCode_t ret = - DDS_ContentFilteredTopic_set_expression(cft_topic, filter_expression, &cft_parameters); - DDS_StringSeq_finalize(&cft_parameters); - if (DDS_RETCODE_OK != ret) { - RMW_CONNEXT_LOG_ERROR("failed to set content-filtered topic") - return RMW_RET_ERROR; - } - return RMW_RET_OK; + if (!filter_expression_empty) { + // set filter expre if cft exist + return rmw_connextdds_set_cft_filter_expression( + this->dds_topic_cft, filter_expression, expression_parameters); } } - if (this->loan_len != 0) { - RMW_CONNEXT_LOG_ERROR("subscriber can't be reset because a message data loaned") - return RMW_RET_ERROR; - } - if (RMW_RET_OK != - rmw_connextdds_graph_on_subscriber_deleted( - ctx, node, this)) - { - RMW_CONNEXT_LOG_ERROR("failed to update graph for subscriber") - return RMW_RET_ERROR; + // finalization to remove the old data reader + ret = finalize(true); + if (RMW_RET_OK != ret) { + RMW_CONNEXT_LOG_ERROR_SET("failed to finalize subscriber with resetting cft flag") + return ret; } - - this->status_condition->invalidate(); - - if (DDS_RETCODE_OK != - DDS_Subscriber_delete_datareader(this->dds_subscriber(), this->dds_reader)) - { - RMW_CONNEXT_LOG_ERROR_SET( - "failed to delete DDS DataReader") - return RMW_RET_ERROR; - } - DDS_DomainParticipant * const dp = this->dds_participant(); DDS_TopicDescription * sub_topic = nullptr; if (nullptr == this->dds_topic_cft) { - // case 3: create a new cft if there is no cft topic and filter_expression is not empty("") - // remove old reader - // create cft topic and a new reader + // create a new cft topic, and then use cft topic to create a new reader std::string cft_topic_name = fqtopic_name + "_ContentFilterTopic" + RMW_Connext_Subscriber::get_atomic_id(); - rmw_ret_t cft_rc = - rmw_connextdds_create_contentfilteredtopic( + ret = rmw_connextdds_create_contentfilteredtopic( ctx, dp, dds_topic, cft_topic_name.c_str(), filter_expression, expression_parameters, &sub_topic); - if (RMW_RET_OK != cft_rc) { + if (RMW_RET_OK != ret) { RMW_CONNEXT_LOG_ERROR("failed to create content filter topic") - return RMW_RET_ERROR; + return ret; } this->dds_topic_cft = sub_topic; } else { - // case 4: delete cft topic and create a normal reader if filter_expression is empty("") - // that means reset - // remove content filter topic - if (nullptr != this->dds_topic_cft) { - rmw_ret_t cft_rc = rmw_connextdds_delete_contentfilteredtopic( - ctx, dp, this->dds_topic_cft); + // delete cft, and then use the existing parent topic to create a new reader + ret = rmw_connextdds_delete_contentfilteredtopic( + ctx, dp, this->dds_topic_cft); - if (RMW_RET_OK != cft_rc) { - return cft_rc; - } + if (RMW_RET_OK != ret) { + return ret; } - this->dds_topic_cft = nullptr; - // create normal topic and a new reader sub_topic = DDS_Topic_as_topicdescription(dds_topic); } -#if !RMW_CONNEXT_DDS_API_PRO_LEGACY - DDS_DataReaderQos dr_qos = DDS_DataReaderQos_INITIALIZER; -#else - DDS_DataReaderQos dr_qos; - if (DDS_RETCODE_OK != DDS_DataReaderQos_initialize(&dr_qos)) { - RMW_CONNEXT_LOG_ERROR_SET("failed to initialize datareader qos") - return RMW_RET_ERROR; - } -#endif /* !RMW_CONNEXT_DDS_API_PRO_LEGACY */ - - DDS_DataReaderQos * const dr_qos_ptr = &dr_qos; - auto scope_exit_dr_qos_delete = - rcpputils::make_scope_exit( - [dr_qos_ptr]() - { - DDS_DataReaderQos_finalize(dr_qos_ptr); - }); - - if (DDS_RETCODE_OK != - DDS_Subscriber_get_default_datareader_qos(this->dds_subscriber(), &dr_qos)) - { - RMW_CONNEXT_LOG_ERROR_SET("failed to get default reader QoS") - return RMW_RET_ERROR; - } - - DDS_DataReader * dds_reader = - rmw_connextdds_create_datareader( + DDS_DataReader * dds_reader = initialize_datareader( ctx, dp, this->dds_subscriber(), + fqtopic_name, &qos_policies, - nullptr, - internal, type_support, - sub_topic, - &dr_qos); + &subscriber_options, + internal, + sub_topic); if (nullptr == dds_reader) { RMW_CONNEXT_LOG_ERROR_SET("failed to create DDS reader") @@ -1656,19 +1618,16 @@ RMW_Connext_Subscriber::set_cft_expression_parameters( this->dds_reader, this->ignore_local, this->internal)); rmw_connextdds_get_entity_gid(this->dds_reader, this->ros_gid); - if (!internal) { - if (RMW_RET_OK != this->enable()) { - RMW_CONNEXT_LOG_ERROR("failed to enable subscription") - return RMW_RET_ERROR; - } + ret = this->enable(); + if (RMW_RET_OK != ret) { + RMW_CONNEXT_LOG_ERROR("failed to enable subscription") + return ret; + } - if (RMW_RET_OK != - rmw_connextdds_graph_on_subscriber_created( - ctx, node, this)) - { - RMW_CONNEXT_LOG_ERROR("failed to update graph for subscriber") - return RMW_RET_ERROR; - } + ret = rmw_connextdds_graph_on_subscriber_created(ctx, node, this); + if (RMW_RET_OK != ret) { + RMW_CONNEXT_LOG_ERROR("failed to update graph for subscriber") + return ret; } return RMW_RET_OK; @@ -1684,76 +1643,9 @@ RMW_Connext_Subscriber::get_cft_expression_parameters( RMW_SET_ERROR_MSG("this subscriber has not created a contentfilteredtopic."); return RMW_RET_ERROR; } - DDS_ContentFilteredTopic * const cft_topic = - DDS_ContentFilteredTopic_narrow(dds_topic_cft); - - int parameters_len; - rcutils_allocator_t allocator = rcutils_get_default_allocator(); - - // get filter_expression - const char * expression = DDS_ContentFilteredTopic_get_filter_expression(cft_topic); - if (!expression) { - RMW_SET_ERROR_MSG("failed to get filter expression"); - return RMW_RET_ERROR; - } - - *filter_expression = rcutils_strdup(expression, allocator); - if (NULL == *filter_expression) { - RMW_SET_ERROR_MSG("failed to duplicate string"); - return RMW_RET_BAD_ALLOC; - } - auto scope_exit_filter_expression_delete = - rcpputils::make_scope_exit( - [filter_expression, allocator]() - { - if (*filter_expression) { - allocator.deallocate(*filter_expression, allocator.state); - *filter_expression = nullptr; - } - }); - // get parameters - struct DDS_StringSeq parameters; - DDS_ReturnCode_t status = - DDS_ContentFilteredTopic_get_expression_parameters(cft_topic, ¶meters); - if (DDS_RETCODE_OK != status) { - RMW_SET_ERROR_MSG("failed to get expression parameters"); - return RMW_RET_ERROR; - } - auto scope_exit_parameters_delete = - rcpputils::make_scope_exit( - [¶meters]() - { - DDS_StringSeq_finalize(¶meters); - }); - - parameters_len = DDS_StringSeq_get_length(¶meters); - rcutils_ret_t rcutils_ret = - rcutils_string_array_init(expression_parameters, parameters_len, &allocator); - if (rcutils_ret != RCUTILS_RET_OK) { - RMW_SET_ERROR_MSG("failed to init string array for expression parameters"); - return RMW_RET_ERROR; - } - auto scope_exit_expression_parameters_delete = - rcpputils::make_scope_exit( - [expression_parameters]() - { - if (RCUTILS_RET_OK != rcutils_string_array_fini(expression_parameters)) { - RCUTILS_LOG_ERROR("Error while finalizing expression parameter due to another error"); - } - }); - for (int i = 0; i < parameters_len; ++i) { - char * parameter = rcutils_strdup(DDS_StringSeq_get(¶meters, i), allocator); - if (!parameter) { - RMW_SET_ERROR_MSG("failed to allocate memory for parameter"); - return RMW_RET_BAD_ALLOC; - } - expression_parameters->data[i] = parameter; - } - - scope_exit_filter_expression_delete.cancel(); - scope_exit_expression_parameters_delete.cancel(); - return RMW_RET_OK; + return rmw_connextdds_get_cft_filter_expression( + this->dds_topic_cft, filter_expression, expression_parameters); } rmw_ret_t diff --git a/rmw_connextdds_common/src/ndds/dds_api_ndds.cpp b/rmw_connextdds_common/src/ndds/dds_api_ndds.cpp index e7f2b6c0..d8034fd8 100644 --- a/rmw_connextdds_common/src/ndds/dds_api_ndds.cpp +++ b/rmw_connextdds_common/src/ndds/dds_api_ndds.cpp @@ -210,6 +210,29 @@ rmw_connextdds_initialize_participant_qos_impl( return RMW_RET_OK; } +static +rmw_ret_t +initialize_cft_parameters( + struct DDS_StringSeq * cft_parameters, + const rcutils_string_array_t * cft_expression_parameters) +{ + if (!DDS_StringSeq_ensure_length( + cft_parameters, cft_expression_parameters->size, cft_expression_parameters->size)) + { + RMW_CONNEXT_LOG_ERROR_SET("failed to ensure length for cft parameters sequence") + return RMW_RET_ERROR; + } + if (!DDS_StringSeq_from_array( + cft_parameters, + const_cast(cft_expression_parameters->data), + cft_expression_parameters->size)) + { + RMW_CONNEXT_LOG_ERROR_SET("failed to copy data for cft parameters sequence") + return RMW_RET_ERROR; + } + return RMW_RET_OK; +} + rmw_ret_t rmw_connextdds_create_contentfilteredtopic( rmw_context_impl_t * const ctx, @@ -225,16 +248,17 @@ rmw_connextdds_create_contentfilteredtopic( RMW_CONNEXT_ASSERT(nullptr != cft_filter) struct DDS_StringSeq cft_parameters = DDS_SEQUENCE_INITIALIZER; - DDS_StringSeq_initialize(&cft_parameters); + auto scope_exit_cft_params = rcpputils::make_scope_exit( + [&cft_parameters]() { + if (!DDS_StringSeq_finalize(&cft_parameters)) { + RMW_CONNEXT_LOG_ERROR_SET("failed to finalize cft parameters sequence") + } + }); if (cft_expression_parameters) { - DDS_StringSeq_ensure_length( - &cft_parameters, cft_expression_parameters->size, cft_expression_parameters->size); - DDS_StringSeq_from_array( - &cft_parameters, - const_cast(cft_expression_parameters->data), - cft_expression_parameters->size); - } else { - DDS_StringSeq_ensure_length(&cft_parameters, 0, 0); + if (RMW_RET_OK != initialize_cft_parameters(&cft_parameters, cft_expression_parameters)) { + RMW_CONNEXT_LOG_ERROR_SET("failed to initialize_cft_parameters") + return RMW_RET_ERROR; + } } *cft_out = nullptr; @@ -242,7 +266,6 @@ rmw_connextdds_create_contentfilteredtopic( DDS_ContentFilteredTopic * cft_topic = DDS_DomainParticipant_create_contentfilteredtopic( dp, cft_name, base_topic, cft_filter, &cft_parameters); - DDS_StringSeq_finalize(&cft_parameters); if (nullptr == cft_topic) { RMW_CONNEXT_LOG_ERROR_A_SET( "failed to create content-filtered topic: " @@ -1236,3 +1259,113 @@ rmw_connextdds_enable_security( return RMW_RET_OK; } + +rmw_ret_t +rmw_connextdds_set_cft_filter_expression( + DDS_TopicDescription * const topic_desc, + const char * filter_expression, + const rcutils_string_array_t * expression_parameters) +{ + DDS_ContentFilteredTopic * const cft_topic = + DDS_ContentFilteredTopic_narrow(topic_desc); + + struct DDS_StringSeq cft_parameters = DDS_SEQUENCE_INITIALIZER; + auto scope_exit_cft_parameters = rcpputils::make_scope_exit( + [&cft_parameters]() { + if (!DDS_StringSeq_finalize(&cft_parameters)) { + RMW_CONNEXT_LOG_ERROR_SET("failed to finalize cft parameters sequence") + } + }); + if (expression_parameters) { + if (RMW_RET_OK != initialize_cft_parameters(&cft_parameters, expression_parameters)) { + RMW_CONNEXT_LOG_ERROR_SET("failed to initialize_cft_parameters") + return RMW_RET_ERROR; + } + } + + DDS_ReturnCode_t ret = + DDS_ContentFilteredTopic_set_expression(cft_topic, filter_expression, &cft_parameters); + if (DDS_RETCODE_OK != ret) { + RMW_CONNEXT_LOG_ERROR_SET("failed to set content-filtered topic") + return RMW_RET_ERROR; + } + return RMW_RET_OK; +} + +rmw_ret_t +rmw_connextdds_get_cft_filter_expression( + DDS_TopicDescription * const topic_desc, + char ** const expr_out, + rcutils_string_array_t * cft_params_out) +{ + DDS_ContentFilteredTopic * const cft_topic = + DDS_ContentFilteredTopic_narrow(topic_desc); + + int parameters_len; + rcutils_allocator_t allocator = rcutils_get_default_allocator(); + + // get filter_expression + const char * expression = DDS_ContentFilteredTopic_get_filter_expression(cft_topic); + if (!expression) { + RMW_SET_ERROR_MSG("failed to get filter expression"); + return RMW_RET_ERROR; + } + + *expr_out = rcutils_strdup(expression, allocator); + if (NULL == *expr_out) { + RMW_SET_ERROR_MSG("failed to duplicate string"); + return RMW_RET_BAD_ALLOC; + } + auto scope_exit_filter_expression_delete = + rcpputils::make_scope_exit( + [expr_out, allocator]() + { + if (*expr_out) { + allocator.deallocate(*expr_out, allocator.state); + *expr_out = nullptr; + } + }); + + // get parameters + struct DDS_StringSeq parameters; + DDS_ReturnCode_t status = + DDS_ContentFilteredTopic_get_expression_parameters(cft_topic, ¶meters); + if (DDS_RETCODE_OK != status) { + RMW_SET_ERROR_MSG("failed to get expression parameters"); + return RMW_RET_ERROR; + } + auto scope_exit_parameters_delete = + rcpputils::make_scope_exit( + [¶meters]() + { + DDS_StringSeq_finalize(¶meters); + }); + + parameters_len = DDS_StringSeq_get_length(¶meters); + rcutils_ret_t rcutils_ret = + rcutils_string_array_init(cft_params_out, parameters_len, &allocator); + if (rcutils_ret != RCUTILS_RET_OK) { + RMW_SET_ERROR_MSG("failed to init string array for expression parameters"); + return RMW_RET_ERROR; + } + auto scope_exit_expression_parameters_delete = + rcpputils::make_scope_exit( + [cft_params_out]() + { + if (RCUTILS_RET_OK != rcutils_string_array_fini(cft_params_out)) { + RCUTILS_LOG_ERROR("Error while finalizing expression parameter due to another error"); + } + }); + for (int i = 0; i < parameters_len; ++i) { + char * parameter = rcutils_strdup(DDS_StringSeq_get(¶meters, i), allocator); + if (!parameter) { + RMW_SET_ERROR_MSG("failed to allocate memory for parameter"); + return RMW_RET_BAD_ALLOC; + } + cft_params_out->data[i] = parameter; + } + + scope_exit_filter_expression_delete.cancel(); + scope_exit_expression_parameters_delete.cancel(); + return RMW_RET_OK; +} diff --git a/rmw_connextdds_common/src/rtime/dds_api_rtime.cpp b/rmw_connextdds_common/src/rtime/dds_api_rtime.cpp index dbc9c178..cb6d6566 100644 --- a/rmw_connextdds_common/src/rtime/dds_api_rtime.cpp +++ b/rmw_connextdds_common/src/rtime/dds_api_rtime.cpp @@ -1954,3 +1954,27 @@ rmw_connextdds_enable_security( return RMW_RET_ERROR; #endif /* RMW_CONNEXT_ENABLE_SECURITY */ } + +rmw_ret_t +rmw_connextdds_set_cft_filter_expression( + DDS_TopicDescription * const topic_desc, + const char * filter_expression, + const rcutils_string_array_t * expression_parameters) +{ + UNUSED_ARG(topic_desc); + UNUSED_ARG(filter_expression); + UNUSED_ARG(expression_parameters); + return RMW_RET_UNSUPPORTED; +} + +rmw_ret_t +rmw_connextdds_get_cft_filter_expression( + DDS_TopicDescription * const topic_desc, + char ** const expr_out, + rcutils_string_array_t * cft_params_out); +{ + UNUSED_ARG(topic_desc); + UNUSED_ARG(expr_out); + UNUSED_ARG(cft_params_out); + return RMW_RET_UNSUPPORTED; +} From f1a42174d96f0c5851ce23331a1bc14a4f8bdd06 Mon Sep 17 00:00:00 2001 From: Chen Lihui Date: Fri, 26 Mar 2021 15:40:14 +0800 Subject: [PATCH 04/33] continue to address review add feature macro, some comments and something consistent, etc Co-authored-by: Andrea Sorbini Signed-off-by: Chen Lihui --- rmw_connextdds_common/src/common/rmw_impl.cpp | 29 ++++++++--------- .../src/ndds/dds_api_ndds.cpp | 31 ++++++++++++------- 2 files changed, 33 insertions(+), 27 deletions(-) diff --git a/rmw_connextdds_common/src/common/rmw_impl.cpp b/rmw_connextdds_common/src/common/rmw_impl.cpp index 4857e8c2..b267d4c4 100644 --- a/rmw_connextdds_common/src/common/rmw_impl.cpp +++ b/rmw_connextdds_common/src/common/rmw_impl.cpp @@ -1216,9 +1216,7 @@ RMW_Connext_Subscriber::create( DDS_TopicDescription * sub_topic = DDS_Topic_as_topicdescription(topic); std::string cft_topic_name; - if (nullptr != cft_name || - nullptr != subscriber_options->filter_expression) - { + if (nullptr != cft_name || nullptr != subscriber_options->filter_expression) { rmw_ret_t cft_rc = RMW_RET_OK; if (nullptr != cft_name) { @@ -1240,7 +1238,9 @@ RMW_Connext_Subscriber::create( return nullptr; } } else { - sub_topic = cft_topic; + if (nullptr != cft_topic) { + sub_topic = cft_topic; + } } } @@ -1540,26 +1540,23 @@ RMW_Connext_Subscriber::set_cft_expression_parameters( { RMW_CONNEXT_ASSERT(nullptr != filter_expression) RMW_CONNEXT_ASSERT(!internal) + RMW_CONNEXT_ASSERT(nullptr != node) rmw_ret_t ret; std::lock_guard lock(this->cft_mutex); const bool filter_expression_empty = (*filter_expression == '\0'); - if (nullptr == this->dds_topic_cft) { + if (nullptr == this->dds_topic_cft && filter_expression_empty) { // allow to call set filter_expresson even if cft not exist - if (filter_expression_empty) { - RMW_CONNEXT_LOG_DEBUG("current subscriber has no content filter topic") - return RMW_RET_OK; - } - } else { - if (!filter_expression_empty) { - // set filter expre if cft exist - return rmw_connextdds_set_cft_filter_expression( - this->dds_topic_cft, filter_expression, expression_parameters); - } + RMW_CONNEXT_LOG_DEBUG("current subscriber has no content filter topic") + return RMW_RET_OK; + } else if (nullptr != this->dds_topic_cft && !filter_expression_empty) { + // set filter expre if cft exist + return rmw_connextdds_set_cft_filter_expression( + this->dds_topic_cft, filter_expression, expression_parameters); } // finalization to remove the old data reader - ret = finalize(true); + ret = finalize(true /* reset_cft */); if (RMW_RET_OK != ret) { RMW_CONNEXT_LOG_ERROR_SET("failed to finalize subscriber with resetting cft flag") return ret; diff --git a/rmw_connextdds_common/src/ndds/dds_api_ndds.cpp b/rmw_connextdds_common/src/ndds/dds_api_ndds.cpp index d8034fd8..7c625db8 100644 --- a/rmw_connextdds_common/src/ndds/dds_api_ndds.cpp +++ b/rmw_connextdds_common/src/ndds/dds_api_ndds.cpp @@ -212,7 +212,7 @@ rmw_connextdds_initialize_participant_qos_impl( static rmw_ret_t -initialize_cft_parameters( +rmw_connextdds_initialize_cft_parameters( struct DDS_StringSeq * cft_parameters, const rcutils_string_array_t * cft_expression_parameters) { @@ -255,8 +255,10 @@ rmw_connextdds_create_contentfilteredtopic( } }); if (cft_expression_parameters) { - if (RMW_RET_OK != initialize_cft_parameters(&cft_parameters, cft_expression_parameters)) { - RMW_CONNEXT_LOG_ERROR_SET("failed to initialize_cft_parameters") + if (RMW_RET_OK != + rmw_connextdds_initialize_cft_parameters(&cft_parameters, cft_expression_parameters)) + { + RMW_CONNEXT_LOG_ERROR_SET("failed to rmw_connextdds_initialize_cft_parameters") return RMW_RET_ERROR; } } @@ -1277,8 +1279,10 @@ rmw_connextdds_set_cft_filter_expression( } }); if (expression_parameters) { - if (RMW_RET_OK != initialize_cft_parameters(&cft_parameters, expression_parameters)) { - RMW_CONNEXT_LOG_ERROR_SET("failed to initialize_cft_parameters") + if (RMW_RET_OK != + rmw_connextdds_initialize_cft_parameters(&cft_parameters, expression_parameters)) + { + RMW_CONNEXT_LOG_ERROR_SET("failed to rmw_connextdds_initialize_cft_parameters") return RMW_RET_ERROR; } } @@ -1301,7 +1305,6 @@ rmw_connextdds_get_cft_filter_expression( DDS_ContentFilteredTopic * const cft_topic = DDS_ContentFilteredTopic_narrow(topic_desc); - int parameters_len; rcutils_allocator_t allocator = rcutils_get_default_allocator(); // get filter_expression @@ -1327,7 +1330,7 @@ rmw_connextdds_get_cft_filter_expression( }); // get parameters - struct DDS_StringSeq parameters; + struct DDS_StringSeq parameters = DDS_SEQUENCE_INITIALIZER; DDS_ReturnCode_t status = DDS_ContentFilteredTopic_get_expression_parameters(cft_topic, ¶meters); if (DDS_RETCODE_OK != status) { @@ -1341,9 +1344,9 @@ rmw_connextdds_get_cft_filter_expression( DDS_StringSeq_finalize(¶meters); }); - parameters_len = DDS_StringSeq_get_length(¶meters); + const DDS_Long parameters_len = DDS_StringSeq_get_length(¶meters); rcutils_ret_t rcutils_ret = - rcutils_string_array_init(cft_params_out, parameters_len, &allocator); + rcutils_string_array_init(cft_params_out, static_cast(parameters_len), &allocator); if (rcutils_ret != RCUTILS_RET_OK) { RMW_SET_ERROR_MSG("failed to init string array for expression parameters"); return RMW_RET_ERROR; @@ -1356,8 +1359,14 @@ rmw_connextdds_get_cft_filter_expression( RCUTILS_LOG_ERROR("Error while finalizing expression parameter due to another error"); } }); - for (int i = 0; i < parameters_len; ++i) { - char * parameter = rcutils_strdup(DDS_StringSeq_get(¶meters, i), allocator); + for (DDS_Long i = 0; i < parameters_len; ++i) { + const char * parameter_ref = *DDS_StringSeq_get_reference(¶meters, i); + if (!parameter_ref) { + RMW_SET_ERROR_MSG_WITH_FORMAT_STRING( + "failed to get a reference for parameter with index %d", i); + return RMW_RET_ERROR; + } + char * parameter = rcutils_strdup(parameter_ref, allocator); if (!parameter) { RMW_SET_ERROR_MSG("failed to allocate memory for parameter"); return RMW_RET_BAD_ALLOC; From cdea0df37525ba2fece21d4f3b2680315cbe6d35 Mon Sep 17 00:00:00 2001 From: Chen Lihui Date: Fri, 26 Mar 2021 16:21:39 +0800 Subject: [PATCH 05/33] use RMW_CONNEXT_LOG_ERROR* log Signed-off-by: Chen Lihui --- rmw_connextdds_common/src/common/rmw_impl.cpp | 2 +- rmw_connextdds_common/src/ndds/dds_api_ndds.cpp | 12 ++++++------ 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/rmw_connextdds_common/src/common/rmw_impl.cpp b/rmw_connextdds_common/src/common/rmw_impl.cpp index b267d4c4..077a48a7 100644 --- a/rmw_connextdds_common/src/common/rmw_impl.cpp +++ b/rmw_connextdds_common/src/common/rmw_impl.cpp @@ -1637,7 +1637,7 @@ RMW_Connext_Subscriber::get_cft_expression_parameters( { std::lock_guard lock(this->cft_mutex); if (nullptr == this->dds_topic_cft) { - RMW_SET_ERROR_MSG("this subscriber has not created a contentfilteredtopic."); + RMW_CONNEXT_LOG_ERROR_SET("this subscriber has not created a contentfilteredtopic."); return RMW_RET_ERROR; } diff --git a/rmw_connextdds_common/src/ndds/dds_api_ndds.cpp b/rmw_connextdds_common/src/ndds/dds_api_ndds.cpp index 7c625db8..f6f4068a 100644 --- a/rmw_connextdds_common/src/ndds/dds_api_ndds.cpp +++ b/rmw_connextdds_common/src/ndds/dds_api_ndds.cpp @@ -1310,13 +1310,13 @@ rmw_connextdds_get_cft_filter_expression( // get filter_expression const char * expression = DDS_ContentFilteredTopic_get_filter_expression(cft_topic); if (!expression) { - RMW_SET_ERROR_MSG("failed to get filter expression"); + RMW_CONNEXT_LOG_ERROR_SET("failed to get filter expression"); return RMW_RET_ERROR; } *expr_out = rcutils_strdup(expression, allocator); if (NULL == *expr_out) { - RMW_SET_ERROR_MSG("failed to duplicate string"); + RMW_CONNEXT_LOG_ERROR_SET("failed to duplicate string"); return RMW_RET_BAD_ALLOC; } auto scope_exit_filter_expression_delete = @@ -1334,7 +1334,7 @@ rmw_connextdds_get_cft_filter_expression( DDS_ReturnCode_t status = DDS_ContentFilteredTopic_get_expression_parameters(cft_topic, ¶meters); if (DDS_RETCODE_OK != status) { - RMW_SET_ERROR_MSG("failed to get expression parameters"); + RMW_CONNEXT_LOG_ERROR_SET("failed to get expression parameters"); return RMW_RET_ERROR; } auto scope_exit_parameters_delete = @@ -1348,7 +1348,7 @@ rmw_connextdds_get_cft_filter_expression( rcutils_ret_t rcutils_ret = rcutils_string_array_init(cft_params_out, static_cast(parameters_len), &allocator); if (rcutils_ret != RCUTILS_RET_OK) { - RMW_SET_ERROR_MSG("failed to init string array for expression parameters"); + RMW_CONNEXT_LOG_ERROR_SET("failed to init string array for expression parameters"); return RMW_RET_ERROR; } auto scope_exit_expression_parameters_delete = @@ -1362,13 +1362,13 @@ rmw_connextdds_get_cft_filter_expression( for (DDS_Long i = 0; i < parameters_len; ++i) { const char * parameter_ref = *DDS_StringSeq_get_reference(¶meters, i); if (!parameter_ref) { - RMW_SET_ERROR_MSG_WITH_FORMAT_STRING( + RMW_CONNEXT_LOG_ERROR_A_SET( "failed to get a reference for parameter with index %d", i); return RMW_RET_ERROR; } char * parameter = rcutils_strdup(parameter_ref, allocator); if (!parameter) { - RMW_SET_ERROR_MSG("failed to allocate memory for parameter"); + RMW_CONNEXT_LOG_ERROR_SET("failed to allocate memory for parameter"); return RMW_RET_BAD_ALLOC; } cft_params_out->data[i] = parameter; From 3f093b8edafbfddfd0920b5cbf53a376f1c99b05 Mon Sep 17 00:00:00 2001 From: Chen Lihui Date: Fri, 26 Mar 2021 16:32:22 +0800 Subject: [PATCH 06/33] remove semicolon Signed-off-by: Chen Lihui --- rmw_connextdds_common/src/common/rmw_impl.cpp | 2 +- rmw_connextdds_common/src/ndds/dds_api_ndds.cpp | 12 ++++++------ 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/rmw_connextdds_common/src/common/rmw_impl.cpp b/rmw_connextdds_common/src/common/rmw_impl.cpp index 077a48a7..f2241429 100644 --- a/rmw_connextdds_common/src/common/rmw_impl.cpp +++ b/rmw_connextdds_common/src/common/rmw_impl.cpp @@ -1637,7 +1637,7 @@ RMW_Connext_Subscriber::get_cft_expression_parameters( { std::lock_guard lock(this->cft_mutex); if (nullptr == this->dds_topic_cft) { - RMW_CONNEXT_LOG_ERROR_SET("this subscriber has not created a contentfilteredtopic."); + RMW_CONNEXT_LOG_ERROR_SET("this subscriber has not created a contentfilteredtopic") return RMW_RET_ERROR; } diff --git a/rmw_connextdds_common/src/ndds/dds_api_ndds.cpp b/rmw_connextdds_common/src/ndds/dds_api_ndds.cpp index f6f4068a..54d27eb7 100644 --- a/rmw_connextdds_common/src/ndds/dds_api_ndds.cpp +++ b/rmw_connextdds_common/src/ndds/dds_api_ndds.cpp @@ -1310,13 +1310,13 @@ rmw_connextdds_get_cft_filter_expression( // get filter_expression const char * expression = DDS_ContentFilteredTopic_get_filter_expression(cft_topic); if (!expression) { - RMW_CONNEXT_LOG_ERROR_SET("failed to get filter expression"); + RMW_CONNEXT_LOG_ERROR_SET("failed to get filter expression") return RMW_RET_ERROR; } *expr_out = rcutils_strdup(expression, allocator); if (NULL == *expr_out) { - RMW_CONNEXT_LOG_ERROR_SET("failed to duplicate string"); + RMW_CONNEXT_LOG_ERROR_SET("failed to duplicate string") return RMW_RET_BAD_ALLOC; } auto scope_exit_filter_expression_delete = @@ -1334,7 +1334,7 @@ rmw_connextdds_get_cft_filter_expression( DDS_ReturnCode_t status = DDS_ContentFilteredTopic_get_expression_parameters(cft_topic, ¶meters); if (DDS_RETCODE_OK != status) { - RMW_CONNEXT_LOG_ERROR_SET("failed to get expression parameters"); + RMW_CONNEXT_LOG_ERROR_SET("failed to get expression parameters") return RMW_RET_ERROR; } auto scope_exit_parameters_delete = @@ -1348,7 +1348,7 @@ rmw_connextdds_get_cft_filter_expression( rcutils_ret_t rcutils_ret = rcutils_string_array_init(cft_params_out, static_cast(parameters_len), &allocator); if (rcutils_ret != RCUTILS_RET_OK) { - RMW_CONNEXT_LOG_ERROR_SET("failed to init string array for expression parameters"); + RMW_CONNEXT_LOG_ERROR_SET("failed to init string array for expression parameters") return RMW_RET_ERROR; } auto scope_exit_expression_parameters_delete = @@ -1363,12 +1363,12 @@ rmw_connextdds_get_cft_filter_expression( const char * parameter_ref = *DDS_StringSeq_get_reference(¶meters, i); if (!parameter_ref) { RMW_CONNEXT_LOG_ERROR_A_SET( - "failed to get a reference for parameter with index %d", i); + "failed to get a reference for parameter with index %d", i) return RMW_RET_ERROR; } char * parameter = rcutils_strdup(parameter_ref, allocator); if (!parameter) { - RMW_CONNEXT_LOG_ERROR_SET("failed to allocate memory for parameter"); + RMW_CONNEXT_LOG_ERROR_SET("failed to allocate memory for parameter") return RMW_RET_BAD_ALLOC; } cft_params_out->data[i] = parameter; From e642c50ee0a9158edc6423e0e624ff8fa21ec9d7 Mon Sep 17 00:00:00 2001 From: Chen Lihui Date: Mon, 29 Mar 2021 09:34:28 +0800 Subject: [PATCH 07/33] accessing loan_len in finalize need to be protected by mutex Signed-off-by: Chen Lihui --- .../include/rmw_connextdds/rmw_impl.hpp | 2 +- rmw_connextdds_common/src/common/rmw_impl.cpp | 10 ++++++---- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/rmw_connextdds_common/include/rmw_connextdds/rmw_impl.hpp b/rmw_connextdds_common/include/rmw_connextdds/rmw_impl.hpp index 80ce8add..f1815c01 100644 --- a/rmw_connextdds_common/include/rmw_connextdds/rmw_impl.hpp +++ b/rmw_connextdds_common/include/rmw_connextdds/rmw_impl.hpp @@ -510,7 +510,7 @@ class RMW_Connext_Subscriber std::shared_ptr status_condition; RMW_Connext_UntypedSampleSeq loan_data; DDS_SampleInfoSeq loan_info; - std::atomic_uint64_t loan_len; + size_t loan_len; size_t loan_next; std::mutex loan_mutex; std::mutex cft_mutex; diff --git a/rmw_connextdds_common/src/common/rmw_impl.cpp b/rmw_connextdds_common/src/common/rmw_impl.cpp index f2241429..3eb5a43b 100644 --- a/rmw_connextdds_common/src/common/rmw_impl.cpp +++ b/rmw_connextdds_common/src/common/rmw_impl.cpp @@ -1362,10 +1362,12 @@ RMW_Connext_Subscriber::finalize(const bool reset_cft) // Make sure subscriber's condition is detached from any waitset this->status_condition->invalidate(); - if (this->loan_len > 0) { - this->loan_next = this->loan_len; - if (RMW_RET_OK != this->return_messages()) { - return RMW_RET_ERROR; + { + std::lock_guard lock(this->loan_mutex); + if (this->loan_len > 0) { + if (RMW_RET_OK != this->return_messages()) { + return RMW_RET_ERROR; + } } } From 7d6717fbae4b5f84064904e9305016dfc0972146 Mon Sep 17 00:00:00 2001 From: Chen Lihui Date: Wed, 31 Mar 2021 14:02:34 +0800 Subject: [PATCH 08/33] make source code style consistent and update code structure Signed-off-by: Chen Lihui --- .../include/rmw_connextdds/rmw_impl.hpp | 12 ++- rmw_connextdds_common/src/common/rmw_impl.cpp | 81 ++++++++++++------- 2 files changed, 55 insertions(+), 38 deletions(-) diff --git a/rmw_connextdds_common/include/rmw_connextdds/rmw_impl.hpp b/rmw_connextdds_common/include/rmw_connextdds/rmw_impl.hpp index f1815c01..8df4682c 100644 --- a/rmw_connextdds_common/include/rmw_connextdds/rmw_impl.hpp +++ b/rmw_connextdds_common/include/rmw_connextdds/rmw_impl.hpp @@ -274,6 +274,7 @@ class RMW_Connext_Subscriber const rmw_qos_profile_t * const qos_policies, const rmw_subscription_options_t * const subscriber_options, const bool internal = false, + const rmw_node_t * const node = nullptr, const RMW_Connext_MessageType msg_type = RMW_CONNEXT_MESSAGE_USERDATA, const void * const intro_members = nullptr, const bool intro_members_cpp = false, @@ -486,11 +487,6 @@ class RMW_Connext_Subscriber return std::to_string(id++); } - void set_node(const rmw_node_t * node) - { - this->node = node; - } - bool is_cft_supported() { return nullptr != dds_topic_cft; @@ -515,7 +511,6 @@ class RMW_Connext_Subscriber std::mutex loan_mutex; std::mutex cft_mutex; const rmw_node_t * node; - std::string fqtopic_name; rmw_qos_profile_t qos_policies; rmw_subscription_options_t subscriber_options; @@ -527,7 +522,10 @@ class RMW_Connext_Subscriber const bool ignore_local, const bool created_topic, DDS_TopicDescription * const dds_topic_cft, - const bool internal); + const bool internal, + const rmw_node_t * const node, + const rmw_qos_profile_t * const qos_policies, + const rmw_subscription_options_t * const subscriber_options); static DDS_DataReader * diff --git a/rmw_connextdds_common/src/common/rmw_impl.cpp b/rmw_connextdds_common/src/common/rmw_impl.cpp index 3eb5a43b..ddde8731 100644 --- a/rmw_connextdds_common/src/common/rmw_impl.cpp +++ b/rmw_connextdds_common/src/common/rmw_impl.cpp @@ -27,6 +27,7 @@ const char * const ROS_TOPIC_PREFIX = "rt"; const char * const ROS_SERVICE_REQUESTER_PREFIX = ROS_SERVICE_REQUESTER_PREFIX_STR; const char * const ROS_SERVICE_RESPONSE_PREFIX = ROS_SERVICE_RESPONSE_PREFIX_STR; +const char * const ROS_CFT_TOPIC_NAME_INFIX = "_ContentFilterTopic"; std::string rmw_connextdds_create_topic_name( @@ -1086,7 +1087,10 @@ RMW_Connext_Subscriber::RMW_Connext_Subscriber( const bool ignore_local, const bool created_topic, DDS_TopicDescription * const dds_topic_cft, - const bool internal) + const bool internal, + const rmw_node_t * const node, + const rmw_qos_profile_t * const qos_policies, + const rmw_subscription_options_t * const subscriber_options) : internal(internal), ignore_local(ignore_local), ctx(ctx), @@ -1095,7 +1099,10 @@ RMW_Connext_Subscriber::RMW_Connext_Subscriber( dds_topic_cft(dds_topic_cft), type_support(type_support), created_topic(created_topic), - status_condition(new RMW_Connext_SubscriberStatusCondition(dds_reader, ignore_local, internal)) + status_condition(new RMW_Connext_SubscriberStatusCondition(dds_reader, ignore_local, internal)), + node(node), + qos_policies(*qos_policies), + subscriber_options(*subscriber_options) { rmw_connextdds_get_entity_gid(this->dds_reader, this->ros_gid); @@ -1124,6 +1131,7 @@ RMW_Connext_Subscriber::create( const rmw_qos_profile_t * const qos_policies, const rmw_subscription_options_t * const subscriber_options, const bool internal, + const rmw_node_t * const node, const RMW_Connext_MessageType msg_type, const void * const intro_members, const bool intro_members_cpp, @@ -1225,7 +1233,7 @@ RMW_Connext_Subscriber::create( ctx, dp, topic, cft_name, cft_filter, NULL, &cft_topic); } else if (nullptr != subscriber_options->filter_expression) { cft_topic_name = - fqtopic_name + "_ContentFilterTopic" + RMW_Connext_Subscriber::get_atomic_id(); + fqtopic_name + ROS_CFT_TOPIC_NAME_INFIX + RMW_Connext_Subscriber::get_atomic_id(); cft_rc = rmw_connextdds_create_contentfilteredtopic( ctx, dp, topic, cft_topic_name.c_str(), @@ -1244,7 +1252,7 @@ RMW_Connext_Subscriber::create( } } - DDS_DataReader * dds_reader = initialize_datareader( + DDS_DataReader * dds_reader = RMW_Connext_Subscriber::initialize_datareader( ctx, dp, sub, @@ -1276,15 +1284,15 @@ RMW_Connext_Subscriber::create( subscriber_options->ignore_local_publications, topic_created, cft_topic, - internal); + internal, + node, + qos_policies, + subscriber_options); if (nullptr == rmw_sub_impl) { RMW_CONNEXT_LOG_ERROR_SET("failed to allocate RMW subscriber") return nullptr; } - rmw_sub_impl->fqtopic_name = fqtopic_name; - rmw_sub_impl->qos_policies = *qos_policies; - rmw_sub_impl->subscriber_options = *subscriber_options; scope_exit_type_unregister.cancel(); scope_exit_topic_delete.cancel(); @@ -1371,17 +1379,24 @@ RMW_Connext_Subscriber::finalize(const bool reset_cft) } } - if (nullptr != this->dds_reader && DDS_RETCODE_OK != + if (nullptr == this->dds_reader) { + RMW_CONNEXT_LOG_ERROR_SET("DDS DataReader is invalid") + return RMW_RET_ERROR; + } + + // to get the participant needs a valid dds_reader + DDS_DomainParticipant * const participant = this->dds_participant(); + + if (DDS_RETCODE_OK != DDS_Subscriber_delete_datareader( this->dds_subscriber(), this->dds_reader)) { RMW_CONNEXT_LOG_ERROR_SET("failed to delete DDS DataReader") return RMW_RET_ERROR; } + this->dds_reader = nullptr; if (!reset_cft) { - DDS_DomainParticipant * const participant = this->dds_participant(); - if (nullptr != this->dds_topic_cft) { rmw_ret_t cft_rc = rmw_connextdds_delete_contentfilteredtopic( ctx, participant, this->dds_topic_cft); @@ -1541,8 +1556,8 @@ RMW_Connext_Subscriber::set_cft_expression_parameters( const rcutils_string_array_t * expression_parameters) { RMW_CONNEXT_ASSERT(nullptr != filter_expression) - RMW_CONNEXT_ASSERT(!internal) - RMW_CONNEXT_ASSERT(nullptr != node) + RMW_CONNEXT_ASSERT(!this->internal) + RMW_CONNEXT_ASSERT(nullptr != this->node) rmw_ret_t ret; std::lock_guard lock(this->cft_mutex); @@ -1557,21 +1572,25 @@ RMW_Connext_Subscriber::set_cft_expression_parameters( this->dds_topic_cft, filter_expression, expression_parameters); } + DDS_DomainParticipant * const dp = this->dds_participant(); + DDS_Subscriber * const sub = this->dds_subscriber(); + DDS_TopicDescription * sub_topic = DDS_Topic_as_topicdescription(this->dds_topic); + std::string fqtopic_name = DDS_TopicDescription_get_name(sub_topic); + // finalization to remove the old data reader - ret = finalize(true /* reset_cft */); + ret = this->finalize(true /* reset_cft */); if (RMW_RET_OK != ret) { RMW_CONNEXT_LOG_ERROR_SET("failed to finalize subscriber with resetting cft flag") return ret; } - DDS_DomainParticipant * const dp = this->dds_participant(); - DDS_TopicDescription * sub_topic = nullptr; + if (nullptr == this->dds_topic_cft) { // create a new cft topic, and then use cft topic to create a new reader std::string cft_topic_name = - fqtopic_name + "_ContentFilterTopic" + RMW_Connext_Subscriber::get_atomic_id(); + fqtopic_name + ROS_CFT_TOPIC_NAME_INFIX + RMW_Connext_Subscriber::get_atomic_id(); ret = rmw_connextdds_create_contentfilteredtopic( - ctx, dp, dds_topic, cft_topic_name.c_str(), + this->ctx, dp, this->dds_topic, cft_topic_name.c_str(), filter_expression, expression_parameters, &sub_topic); @@ -1584,25 +1603,23 @@ RMW_Connext_Subscriber::set_cft_expression_parameters( } else { // delete cft, and then use the existing parent topic to create a new reader ret = rmw_connextdds_delete_contentfilteredtopic( - ctx, dp, this->dds_topic_cft); + this->ctx, dp, this->dds_topic_cft); if (RMW_RET_OK != ret) { return ret; } this->dds_topic_cft = nullptr; - - sub_topic = DDS_Topic_as_topicdescription(dds_topic); } - DDS_DataReader * dds_reader = initialize_datareader( - ctx, + DDS_DataReader * dds_reader = RMW_Connext_Subscriber::initialize_datareader( + this->ctx, dp, - this->dds_subscriber(), + sub, fqtopic_name, - &qos_policies, - type_support, - &subscriber_options, - internal, + &this->qos_policies, + this->type_support, + &this->subscriber_options, + this->internal, sub_topic); if (nullptr == dds_reader) { @@ -1623,7 +1640,7 @@ RMW_Connext_Subscriber::set_cft_expression_parameters( return ret; } - ret = rmw_connextdds_graph_on_subscriber_created(ctx, node, this); + ret = rmw_connextdds_graph_on_subscriber_created(this->ctx, this->node, this); if (RMW_RET_OK != ret) { RMW_CONNEXT_LOG_ERROR("failed to update graph for subscriber") return ret; @@ -1882,14 +1899,14 @@ rmw_connextdds_create_subscriber( topic_name, qos_policies, subscriber_options, - internal); + internal, + node); if (nullptr == rmw_sub_impl) { RMW_CONNEXT_LOG_ERROR( "failed to allocate RMW_Connext_Subscriber") return nullptr; } - rmw_sub_impl->set_node(node); auto scope_exit_rmw_reader_impl_delete = rcpputils::make_scope_exit( [rmw_sub_impl]() @@ -2554,6 +2571,7 @@ RMW_Connext_Client::create( qos_policies, &sub_options, false /* internal */, + nullptr /* node */, RMW_CONNEXT_MESSAGE_REPLY, svc_members_res, svc_members_res_cpp, @@ -2846,6 +2864,7 @@ RMW_Connext_Service::create( qos_policies, &sub_options, false /* internal */, + nullptr /* node */, RMW_CONNEXT_MESSAGE_REQUEST, svc_members_req, svc_members_req_cpp, From 5571d49497294b4f56ed6e1304c7c83d43419695 Mon Sep 17 00:00:00 2001 From: Chen Lihui Date: Thu, 1 Apr 2021 09:58:10 +0800 Subject: [PATCH 09/33] add the lost `this->` and adjust location of a check statement Signed-off-by: Chen Lihui --- rmw_connextdds_common/src/common/rmw_impl.cpp | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/rmw_connextdds_common/src/common/rmw_impl.cpp b/rmw_connextdds_common/src/common/rmw_impl.cpp index ddde8731..a263133c 100644 --- a/rmw_connextdds_common/src/common/rmw_impl.cpp +++ b/rmw_connextdds_common/src/common/rmw_impl.cpp @@ -1367,6 +1367,11 @@ RMW_Connext_Subscriber::finalize(const bool reset_cft) "finalizing subscriber: sub=%p, type=%s", (void *)this, this->type_support->type_name()) + if (nullptr == this->dds_reader) { + RMW_CONNEXT_LOG_ERROR_SET("DDS DataReader is invalid") + return RMW_RET_ERROR; + } + // Make sure subscriber's condition is detached from any waitset this->status_condition->invalidate(); @@ -1379,11 +1384,6 @@ RMW_Connext_Subscriber::finalize(const bool reset_cft) } } - if (nullptr == this->dds_reader) { - RMW_CONNEXT_LOG_ERROR_SET("DDS DataReader is invalid") - return RMW_RET_ERROR; - } - // to get the participant needs a valid dds_reader DDS_DomainParticipant * const participant = this->dds_participant(); @@ -1399,7 +1399,7 @@ RMW_Connext_Subscriber::finalize(const bool reset_cft) if (!reset_cft) { if (nullptr != this->dds_topic_cft) { rmw_ret_t cft_rc = rmw_connextdds_delete_contentfilteredtopic( - ctx, participant, this->dds_topic_cft); + this->ctx, participant, this->dds_topic_cft); if (RMW_RET_OK != cft_rc) { return cft_rc; From 2f85b632289176b33cf7fa5c2b18bcb8ade24e12 Mon Sep 17 00:00:00 2001 From: Andrea Sorbini Date: Wed, 31 Mar 2021 19:41:51 -0700 Subject: [PATCH 10/33] Pass pointer arguments as const Signed-off-by: Andrea Sorbini --- rmw_connextdds_common/include/rmw_connextdds/dds_api.hpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rmw_connextdds_common/include/rmw_connextdds/dds_api.hpp b/rmw_connextdds_common/include/rmw_connextdds/dds_api.hpp index d982693b..57ff149b 100644 --- a/rmw_connextdds_common/include/rmw_connextdds/dds_api.hpp +++ b/rmw_connextdds_common/include/rmw_connextdds/dds_api.hpp @@ -72,7 +72,7 @@ rmw_connextdds_create_contentfilteredtopic( DDS_Topic * const base_topic, const char * const cft_name, const char * const cft_filter, - const rcutils_string_array_t * cft_expression_parameters, + const rcutils_string_array_t * const cft_expression_parameters, DDS_TopicDescription ** const cft_out); rmw_ret_t From 61a3bd5be84070019b697889417e51311ebd6e03 Mon Sep 17 00:00:00 2001 From: Andrea Sorbini Date: Wed, 31 Mar 2021 19:42:07 -0700 Subject: [PATCH 11/33] Pass pointer arguments as const Signed-off-by: Andrea Sorbini --- rmw_connextdds_common/include/rmw_connextdds/dds_api.hpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rmw_connextdds_common/include/rmw_connextdds/dds_api.hpp b/rmw_connextdds_common/include/rmw_connextdds/dds_api.hpp index 57ff149b..701da9ba 100644 --- a/rmw_connextdds_common/include/rmw_connextdds/dds_api.hpp +++ b/rmw_connextdds_common/include/rmw_connextdds/dds_api.hpp @@ -211,7 +211,7 @@ rmw_connextdds_enable_security( rmw_ret_t rmw_connextdds_set_cft_filter_expression( DDS_TopicDescription * const topic_desc, - const char * filter_expression, + const char * const filter_expression, const rcutils_string_array_t * expression_parameters); rmw_ret_t From 6d065d1f26a215b093327a1948b38119294b87d8 Mon Sep 17 00:00:00 2001 From: Andrea Sorbini Date: Wed, 31 Mar 2021 19:42:23 -0700 Subject: [PATCH 12/33] Pass pointer arguments as const Signed-off-by: Andrea Sorbini --- rmw_connextdds_common/include/rmw_connextdds/dds_api.hpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rmw_connextdds_common/include/rmw_connextdds/dds_api.hpp b/rmw_connextdds_common/include/rmw_connextdds/dds_api.hpp index 701da9ba..a861c20e 100644 --- a/rmw_connextdds_common/include/rmw_connextdds/dds_api.hpp +++ b/rmw_connextdds_common/include/rmw_connextdds/dds_api.hpp @@ -212,7 +212,7 @@ rmw_ret_t rmw_connextdds_set_cft_filter_expression( DDS_TopicDescription * const topic_desc, const char * const filter_expression, - const rcutils_string_array_t * expression_parameters); + const rcutils_string_array_t * const expression_parameters); rmw_ret_t rmw_connextdds_get_cft_filter_expression( From 11d653f4751497af82e945bbc3e91fe15165d65e Mon Sep 17 00:00:00 2001 From: Andrea Sorbini Date: Wed, 31 Mar 2021 19:42:36 -0700 Subject: [PATCH 13/33] Pass pointer arguments as const Signed-off-by: Andrea Sorbini --- rmw_connextdds_common/include/rmw_connextdds/dds_api.hpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rmw_connextdds_common/include/rmw_connextdds/dds_api.hpp b/rmw_connextdds_common/include/rmw_connextdds/dds_api.hpp index a861c20e..94a3ac81 100644 --- a/rmw_connextdds_common/include/rmw_connextdds/dds_api.hpp +++ b/rmw_connextdds_common/include/rmw_connextdds/dds_api.hpp @@ -218,7 +218,7 @@ rmw_ret_t rmw_connextdds_get_cft_filter_expression( DDS_TopicDescription * const topic_desc, char ** const expr_out, - rcutils_string_array_t * cft_params_out); + rcutils_string_array_t * const cft_params_out); // Define some macro aliases for security-related properties #ifndef DDS_SECURITY_PROPERTY_PREFIX From a297eaa850938efe3c9ff3c3770289547c90d8b0 Mon Sep 17 00:00:00 2001 From: Andrea Sorbini Date: Wed, 31 Mar 2021 19:42:48 -0700 Subject: [PATCH 14/33] Pass pointer arguments as const Signed-off-by: Andrea Sorbini --- rmw_connextdds_common/include/rmw_connextdds/rmw_impl.hpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rmw_connextdds_common/include/rmw_connextdds/rmw_impl.hpp b/rmw_connextdds_common/include/rmw_connextdds/rmw_impl.hpp index 8df4682c..827423a3 100644 --- a/rmw_connextdds_common/include/rmw_connextdds/rmw_impl.hpp +++ b/rmw_connextdds_common/include/rmw_connextdds/rmw_impl.hpp @@ -441,7 +441,7 @@ class RMW_Connext_Subscriber rmw_ret_t set_cft_expression_parameters( - const char * filter_expression, + const char * const filter_expression, const rcutils_string_array_t * expression_parameters ); From 62eae2e94875575b2d3721305a497ccfe6b56a5b Mon Sep 17 00:00:00 2001 From: Andrea Sorbini Date: Wed, 31 Mar 2021 19:43:04 -0700 Subject: [PATCH 15/33] Pass pointer arguments as const Signed-off-by: Andrea Sorbini --- rmw_connextdds_common/include/rmw_connextdds/rmw_impl.hpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rmw_connextdds_common/include/rmw_connextdds/rmw_impl.hpp b/rmw_connextdds_common/include/rmw_connextdds/rmw_impl.hpp index 827423a3..39b7bcff 100644 --- a/rmw_connextdds_common/include/rmw_connextdds/rmw_impl.hpp +++ b/rmw_connextdds_common/include/rmw_connextdds/rmw_impl.hpp @@ -442,7 +442,7 @@ class RMW_Connext_Subscriber rmw_ret_t set_cft_expression_parameters( const char * const filter_expression, - const rcutils_string_array_t * expression_parameters + const rcutils_string_array_t * const expression_parameters ); rmw_ret_t From 1c3ec51027a1fa657628295da892341e86a55867 Mon Sep 17 00:00:00 2001 From: Andrea Sorbini Date: Wed, 31 Mar 2021 19:43:24 -0700 Subject: [PATCH 16/33] Pass pointer arguments as const Signed-off-by: Andrea Sorbini --- rmw_connextdds_common/include/rmw_connextdds/rmw_impl.hpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rmw_connextdds_common/include/rmw_connextdds/rmw_impl.hpp b/rmw_connextdds_common/include/rmw_connextdds/rmw_impl.hpp index 39b7bcff..d392aa53 100644 --- a/rmw_connextdds_common/include/rmw_connextdds/rmw_impl.hpp +++ b/rmw_connextdds_common/include/rmw_connextdds/rmw_impl.hpp @@ -447,7 +447,7 @@ class RMW_Connext_Subscriber rmw_ret_t get_cft_expression_parameters( - char ** filter_expression, + char ** const filter_expression, rcutils_string_array_t * expression_parameters ); From d549fb14001d9afee3688ebb3d55c51834044a85 Mon Sep 17 00:00:00 2001 From: Andrea Sorbini Date: Wed, 31 Mar 2021 19:43:46 -0700 Subject: [PATCH 17/33] Pass pointer arguments as const Signed-off-by: Andrea Sorbini --- rmw_connextdds_common/include/rmw_connextdds/rmw_impl.hpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rmw_connextdds_common/include/rmw_connextdds/rmw_impl.hpp b/rmw_connextdds_common/include/rmw_connextdds/rmw_impl.hpp index d392aa53..d865fb93 100644 --- a/rmw_connextdds_common/include/rmw_connextdds/rmw_impl.hpp +++ b/rmw_connextdds_common/include/rmw_connextdds/rmw_impl.hpp @@ -448,7 +448,7 @@ class RMW_Connext_Subscriber rmw_ret_t get_cft_expression_parameters( char ** const filter_expression, - rcutils_string_array_t * expression_parameters + rcutils_string_array_t * const expression_parameters ); bool From 80b0d9d5640e4563231b0dbc253ec82932dd4b7a Mon Sep 17 00:00:00 2001 From: Andrea Sorbini Date: Wed, 31 Mar 2021 19:44:01 -0700 Subject: [PATCH 18/33] Pass pointer arguments as const Signed-off-by: Andrea Sorbini --- rmw_connextdds_common/include/rmw_connextdds/rmw_impl.hpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rmw_connextdds_common/include/rmw_connextdds/rmw_impl.hpp b/rmw_connextdds_common/include/rmw_connextdds/rmw_impl.hpp index d865fb93..bd6c63f5 100644 --- a/rmw_connextdds_common/include/rmw_connextdds/rmw_impl.hpp +++ b/rmw_connextdds_common/include/rmw_connextdds/rmw_impl.hpp @@ -510,7 +510,7 @@ class RMW_Connext_Subscriber size_t loan_next; std::mutex loan_mutex; std::mutex cft_mutex; - const rmw_node_t * node; + const rmw_node_t * const node; rmw_qos_profile_t qos_policies; rmw_subscription_options_t subscriber_options; From 784bacf6430700d48e8926215f196e1b7046c162 Mon Sep 17 00:00:00 2001 From: Andrea Sorbini Date: Wed, 31 Mar 2021 19:44:20 -0700 Subject: [PATCH 19/33] Pass pointer arguments as const Signed-off-by: Andrea Sorbini --- rmw_connextdds_common/include/rmw_connextdds/rmw_impl.hpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rmw_connextdds_common/include/rmw_connextdds/rmw_impl.hpp b/rmw_connextdds_common/include/rmw_connextdds/rmw_impl.hpp index bd6c63f5..c040e46f 100644 --- a/rmw_connextdds_common/include/rmw_connextdds/rmw_impl.hpp +++ b/rmw_connextdds_common/include/rmw_connextdds/rmw_impl.hpp @@ -538,7 +538,7 @@ class RMW_Connext_Subscriber RMW_Connext_MessageTypeSupport * const type_support, const rmw_subscription_options_t * const subscriber_options, const bool internal, - DDS_TopicDescription * sub_topic); + DDS_TopicDescription * const sub_topic); friend class RMW_Connext_SubscriberStatusCondition; }; From f200086dbfea711d5d8170341650f0c414896d3d Mon Sep 17 00:00:00 2001 From: Andrea Sorbini Date: Wed, 31 Mar 2021 19:44:36 -0700 Subject: [PATCH 20/33] Pass pointer arguments as const Signed-off-by: Andrea Sorbini --- rmw_connextdds_common/src/common/rmw_impl.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rmw_connextdds_common/src/common/rmw_impl.cpp b/rmw_connextdds_common/src/common/rmw_impl.cpp index a263133c..087f5a74 100644 --- a/rmw_connextdds_common/src/common/rmw_impl.cpp +++ b/rmw_connextdds_common/src/common/rmw_impl.cpp @@ -1311,7 +1311,7 @@ RMW_Connext_Subscriber::initialize_datareader( RMW_Connext_MessageTypeSupport * const type_support, const rmw_subscription_options_t * const subscriber_options, const bool internal, - DDS_TopicDescription * sub_topic) + DDS_TopicDescription * const sub_topic) { // The following initialization generates warnings when built // with RTI Connext DDS Professional < 6 (e.g. 5.3.1), so use From 1fa0c92ae0a6530d2b629ab3f40ef522d3fa40a2 Mon Sep 17 00:00:00 2001 From: Andrea Sorbini Date: Wed, 31 Mar 2021 19:44:52 -0700 Subject: [PATCH 21/33] Pass pointer arguments as const Signed-off-by: Andrea Sorbini --- rmw_connextdds_common/src/common/rmw_impl.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rmw_connextdds_common/src/common/rmw_impl.cpp b/rmw_connextdds_common/src/common/rmw_impl.cpp index 087f5a74..83569c4a 100644 --- a/rmw_connextdds_common/src/common/rmw_impl.cpp +++ b/rmw_connextdds_common/src/common/rmw_impl.cpp @@ -1552,7 +1552,7 @@ RMW_Connext_Subscriber::take_serialized( rmw_ret_t RMW_Connext_Subscriber::set_cft_expression_parameters( - const char * filter_expression, + const char * const filter_expression, const rcutils_string_array_t * expression_parameters) { RMW_CONNEXT_ASSERT(nullptr != filter_expression) From 10f73bf9d9d1e0480f155853dc423ff3d421bcd3 Mon Sep 17 00:00:00 2001 From: Andrea Sorbini Date: Wed, 31 Mar 2021 19:45:11 -0700 Subject: [PATCH 22/33] Pass pointer arguments as const Signed-off-by: Andrea Sorbini --- rmw_connextdds_common/src/common/rmw_impl.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rmw_connextdds_common/src/common/rmw_impl.cpp b/rmw_connextdds_common/src/common/rmw_impl.cpp index 83569c4a..99603081 100644 --- a/rmw_connextdds_common/src/common/rmw_impl.cpp +++ b/rmw_connextdds_common/src/common/rmw_impl.cpp @@ -1553,7 +1553,7 @@ RMW_Connext_Subscriber::take_serialized( rmw_ret_t RMW_Connext_Subscriber::set_cft_expression_parameters( const char * const filter_expression, - const rcutils_string_array_t * expression_parameters) + const rcutils_string_array_t * const expression_parameters) { RMW_CONNEXT_ASSERT(nullptr != filter_expression) RMW_CONNEXT_ASSERT(!this->internal) From bcfe6a4bab56784911df2a3faec65dfe58eb61ac Mon Sep 17 00:00:00 2001 From: Andrea Sorbini Date: Wed, 31 Mar 2021 19:45:26 -0700 Subject: [PATCH 23/33] Pass pointer arguments as const Signed-off-by: Andrea Sorbini --- rmw_connextdds_common/src/rtime/dds_api_rtime.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rmw_connextdds_common/src/rtime/dds_api_rtime.cpp b/rmw_connextdds_common/src/rtime/dds_api_rtime.cpp index cb6d6566..95af0ae6 100644 --- a/rmw_connextdds_common/src/rtime/dds_api_rtime.cpp +++ b/rmw_connextdds_common/src/rtime/dds_api_rtime.cpp @@ -1971,7 +1971,7 @@ rmw_ret_t rmw_connextdds_get_cft_filter_expression( DDS_TopicDescription * const topic_desc, char ** const expr_out, - rcutils_string_array_t * cft_params_out); + rcutils_string_array_t * const cft_params_out); { UNUSED_ARG(topic_desc); UNUSED_ARG(expr_out); From 9869a73931ab093aee7193e5a830221061680d81 Mon Sep 17 00:00:00 2001 From: Andrea Sorbini Date: Wed, 31 Mar 2021 19:45:41 -0700 Subject: [PATCH 24/33] Pass pointer arguments as const Signed-off-by: Andrea Sorbini --- rmw_connextdds_common/src/rtime/dds_api_rtime.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rmw_connextdds_common/src/rtime/dds_api_rtime.cpp b/rmw_connextdds_common/src/rtime/dds_api_rtime.cpp index 95af0ae6..09a2b382 100644 --- a/rmw_connextdds_common/src/rtime/dds_api_rtime.cpp +++ b/rmw_connextdds_common/src/rtime/dds_api_rtime.cpp @@ -1959,7 +1959,7 @@ rmw_ret_t rmw_connextdds_set_cft_filter_expression( DDS_TopicDescription * const topic_desc, const char * filter_expression, - const rcutils_string_array_t * expression_parameters) + const rcutils_string_array_t * const expression_parameters) { UNUSED_ARG(topic_desc); UNUSED_ARG(filter_expression); From f6559440bcb7535c2e13f1b28b183b2ac773d84b Mon Sep 17 00:00:00 2001 From: Andrea Sorbini Date: Wed, 31 Mar 2021 19:45:52 -0700 Subject: [PATCH 25/33] Pass pointer arguments as const Signed-off-by: Andrea Sorbini --- rmw_connextdds_common/src/rtime/dds_api_rtime.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rmw_connextdds_common/src/rtime/dds_api_rtime.cpp b/rmw_connextdds_common/src/rtime/dds_api_rtime.cpp index 09a2b382..81fc1100 100644 --- a/rmw_connextdds_common/src/rtime/dds_api_rtime.cpp +++ b/rmw_connextdds_common/src/rtime/dds_api_rtime.cpp @@ -1958,7 +1958,7 @@ rmw_connextdds_enable_security( rmw_ret_t rmw_connextdds_set_cft_filter_expression( DDS_TopicDescription * const topic_desc, - const char * filter_expression, + const char * const filter_expression, const rcutils_string_array_t * const expression_parameters) { UNUSED_ARG(topic_desc); From 836224a798420b1a1eab856153b18fec0084dd3c Mon Sep 17 00:00:00 2001 From: Andrea Sorbini Date: Wed, 31 Mar 2021 19:46:04 -0700 Subject: [PATCH 26/33] Pass pointer arguments as const Signed-off-by: Andrea Sorbini --- rmw_connextdds_common/src/rtime/dds_api_rtime.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rmw_connextdds_common/src/rtime/dds_api_rtime.cpp b/rmw_connextdds_common/src/rtime/dds_api_rtime.cpp index 81fc1100..7b4d602c 100644 --- a/rmw_connextdds_common/src/rtime/dds_api_rtime.cpp +++ b/rmw_connextdds_common/src/rtime/dds_api_rtime.cpp @@ -867,7 +867,7 @@ rmw_connextdds_create_contentfilteredtopic( DDS_Topic * const base_topic, const char * const cft_name, const char * const cft_filter, - const rcutils_string_array_t * cft_expression_parameters, + const rcutils_string_array_t * const cft_expression_parameters, DDS_TopicDescription ** const cft_out) { UNUSED_ARG(ctx); From 2b72132b671223a9030b4ff3445fb58b20787b05 Mon Sep 17 00:00:00 2001 From: Andrea Sorbini Date: Wed, 31 Mar 2021 19:46:16 -0700 Subject: [PATCH 27/33] Pass pointer arguments as const Signed-off-by: Andrea Sorbini --- rmw_connextdds_common/src/ndds/dds_api_ndds.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rmw_connextdds_common/src/ndds/dds_api_ndds.cpp b/rmw_connextdds_common/src/ndds/dds_api_ndds.cpp index 54d27eb7..e581b04c 100644 --- a/rmw_connextdds_common/src/ndds/dds_api_ndds.cpp +++ b/rmw_connextdds_common/src/ndds/dds_api_ndds.cpp @@ -1266,7 +1266,7 @@ rmw_ret_t rmw_connextdds_set_cft_filter_expression( DDS_TopicDescription * const topic_desc, const char * filter_expression, - const rcutils_string_array_t * expression_parameters) + const rcutils_string_array_t * const expression_parameters) { DDS_ContentFilteredTopic * const cft_topic = DDS_ContentFilteredTopic_narrow(topic_desc); From c56c7f96a922880071a2ad66c7f7f22468455e56 Mon Sep 17 00:00:00 2001 From: Andrea Sorbini Date: Wed, 31 Mar 2021 19:46:30 -0700 Subject: [PATCH 28/33] Pass pointer arguments as const Signed-off-by: Andrea Sorbini --- rmw_connextdds_common/src/common/rmw_impl.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rmw_connextdds_common/src/common/rmw_impl.cpp b/rmw_connextdds_common/src/common/rmw_impl.cpp index 99603081..e1887746 100644 --- a/rmw_connextdds_common/src/common/rmw_impl.cpp +++ b/rmw_connextdds_common/src/common/rmw_impl.cpp @@ -1651,7 +1651,7 @@ RMW_Connext_Subscriber::set_cft_expression_parameters( rmw_ret_t RMW_Connext_Subscriber::get_cft_expression_parameters( - char ** filter_expression, + char ** const filter_expression, rcutils_string_array_t * expression_parameters) { std::lock_guard lock(this->cft_mutex); From b342ba6e668be238d833c2faadc532afdd98ebe8 Mon Sep 17 00:00:00 2001 From: Andrea Sorbini Date: Wed, 31 Mar 2021 19:46:46 -0700 Subject: [PATCH 29/33] Pass pointer arguments as const Signed-off-by: Andrea Sorbini --- rmw_connextdds_common/src/common/rmw_impl.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rmw_connextdds_common/src/common/rmw_impl.cpp b/rmw_connextdds_common/src/common/rmw_impl.cpp index e1887746..d85e4c4a 100644 --- a/rmw_connextdds_common/src/common/rmw_impl.cpp +++ b/rmw_connextdds_common/src/common/rmw_impl.cpp @@ -1652,7 +1652,7 @@ RMW_Connext_Subscriber::set_cft_expression_parameters( rmw_ret_t RMW_Connext_Subscriber::get_cft_expression_parameters( char ** const filter_expression, - rcutils_string_array_t * expression_parameters) + rcutils_string_array_t * const expression_parameters) { std::lock_guard lock(this->cft_mutex); if (nullptr == this->dds_topic_cft) { From a177eda9cfce8c8ea9ddef3f433eeeb64b2dd45a Mon Sep 17 00:00:00 2001 From: Andrea Sorbini Date: Wed, 31 Mar 2021 19:47:06 -0700 Subject: [PATCH 30/33] Pass pointer arguments as const Signed-off-by: Andrea Sorbini --- rmw_connextdds_common/src/ndds/dds_api_ndds.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rmw_connextdds_common/src/ndds/dds_api_ndds.cpp b/rmw_connextdds_common/src/ndds/dds_api_ndds.cpp index e581b04c..97e82ae7 100644 --- a/rmw_connextdds_common/src/ndds/dds_api_ndds.cpp +++ b/rmw_connextdds_common/src/ndds/dds_api_ndds.cpp @@ -213,7 +213,7 @@ rmw_connextdds_initialize_participant_qos_impl( static rmw_ret_t rmw_connextdds_initialize_cft_parameters( - struct DDS_StringSeq * cft_parameters, + struct DDS_StringSeq * const cft_parameters, const rcutils_string_array_t * cft_expression_parameters) { if (!DDS_StringSeq_ensure_length( From 1d8d20a103cfbdf3191632fc6ba8a177f49bc938 Mon Sep 17 00:00:00 2001 From: Andrea Sorbini Date: Wed, 31 Mar 2021 19:47:21 -0700 Subject: [PATCH 31/33] Pass pointer arguments as const Signed-off-by: Andrea Sorbini --- rmw_connextdds_common/src/ndds/dds_api_ndds.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rmw_connextdds_common/src/ndds/dds_api_ndds.cpp b/rmw_connextdds_common/src/ndds/dds_api_ndds.cpp index 97e82ae7..7775717d 100644 --- a/rmw_connextdds_common/src/ndds/dds_api_ndds.cpp +++ b/rmw_connextdds_common/src/ndds/dds_api_ndds.cpp @@ -214,7 +214,7 @@ static rmw_ret_t rmw_connextdds_initialize_cft_parameters( struct DDS_StringSeq * const cft_parameters, - const rcutils_string_array_t * cft_expression_parameters) + const rcutils_string_array_t * const cft_expression_parameters) { if (!DDS_StringSeq_ensure_length( cft_parameters, cft_expression_parameters->size, cft_expression_parameters->size)) From ac63580f19d49ac97fbc878a195ea52de46d1926 Mon Sep 17 00:00:00 2001 From: Andrea Sorbini Date: Wed, 31 Mar 2021 19:47:36 -0700 Subject: [PATCH 32/33] Pass pointer arguments as const Signed-off-by: Andrea Sorbini --- rmw_connextdds_common/src/ndds/dds_api_ndds.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rmw_connextdds_common/src/ndds/dds_api_ndds.cpp b/rmw_connextdds_common/src/ndds/dds_api_ndds.cpp index 7775717d..6436297f 100644 --- a/rmw_connextdds_common/src/ndds/dds_api_ndds.cpp +++ b/rmw_connextdds_common/src/ndds/dds_api_ndds.cpp @@ -240,7 +240,7 @@ rmw_connextdds_create_contentfilteredtopic( DDS_Topic * const base_topic, const char * const cft_name, const char * const cft_filter, - const rcutils_string_array_t * cft_expression_parameters, + const rcutils_string_array_t * const cft_expression_parameters, DDS_TopicDescription ** const cft_out) { UNUSED_ARG(ctx); From 3cc9b05c715c68ea803592ee2fe3332b51ac89e6 Mon Sep 17 00:00:00 2001 From: Andrea Sorbini Date: Wed, 31 Mar 2021 19:47:51 -0700 Subject: [PATCH 33/33] Pass pointer arguments as const Signed-off-by: Andrea Sorbini --- rmw_connextdds_common/src/ndds/dds_api_ndds.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rmw_connextdds_common/src/ndds/dds_api_ndds.cpp b/rmw_connextdds_common/src/ndds/dds_api_ndds.cpp index 6436297f..ba7ede4b 100644 --- a/rmw_connextdds_common/src/ndds/dds_api_ndds.cpp +++ b/rmw_connextdds_common/src/ndds/dds_api_ndds.cpp @@ -1265,7 +1265,7 @@ rmw_connextdds_enable_security( rmw_ret_t rmw_connextdds_set_cft_filter_expression( DDS_TopicDescription * const topic_desc, - const char * filter_expression, + const char * const filter_expression, const rcutils_string_array_t * const expression_parameters) { DDS_ContentFilteredTopic * const cft_topic =