diff --git a/rmw_connextdds_common/include/rmw_connextdds/dds_api.hpp b/rmw_connextdds_common/include/rmw_connextdds/dds_api.hpp index 6875de79..e9e85238 100644 --- a/rmw_connextdds_common/include/rmw_connextdds/dds_api.hpp +++ b/rmw_connextdds_common/include/rmw_connextdds/dds_api.hpp @@ -220,6 +220,18 @@ rmw_connextdds_enable_security( rmw_context_impl_t * const ctx, DDS_DomainParticipantQos * const qos); +rmw_ret_t +rmw_connextdds_set_cft_filter_expression( + DDS_TopicDescription * const topic_desc, + const char * filter_expression, + const rcutils_string_array_t * expression_parameters); + +rmw_ret_t +rmw_connextdds_get_cft_filter_expression( + DDS_TopicDescription * const topic_desc, + char ** const expr_out, + rcutils_string_array_t * cft_params_out); + // Define some macro aliases for security-related properties #ifndef DDS_SECURITY_PROPERTY_PREFIX #define DDS_SECURITY_PROPERTY_PREFIX \ diff --git a/rmw_connextdds_common/include/rmw_connextdds/rmw_impl.hpp b/rmw_connextdds_common/include/rmw_connextdds/rmw_impl.hpp index 322065dd..c36f41a9 100644 --- a/rmw_connextdds_common/include/rmw_connextdds/rmw_impl.hpp +++ b/rmw_connextdds_common/include/rmw_connextdds/rmw_impl.hpp @@ -361,7 +361,7 @@ class RMW_Connext_Subscriber const char * const cft_filter = nullptr); rmw_ret_t - finalize(); + finalize(const bool reset_cft = false); DDS_DataReader * reader() const @@ -598,6 +598,7 @@ class RMW_Connext_Subscriber const rmw_node_t * node; std::string fqtopic_name; rmw_qos_profile_t qos_policies; + rmw_subscription_options_t subscriber_options; RMW_Connext_Subscriber( rmw_context_impl_t * const ctx, @@ -609,6 +610,21 @@ class RMW_Connext_Subscriber DDS_TopicDescription * const dds_topic_cft, const bool internal); + static + DDS_DataReader * + initialize_datareader( + rmw_context_impl_t * const ctx, + DDS_DomainParticipant * const dp, + DDS_Subscriber * const sub, + const std::string & fqtopic_name, + const rmw_qos_profile_t * const qos_policies, + RMW_Connext_MessageTypeSupport * const type_support, +#if RMW_CONNEXT_HAVE_OPTIONS_PUBSUB + const rmw_subscription_options_t * const subscriber_options, +#endif /* RMW_CONNEXT_HAVE_OPTIONS_PUBSUB */ + const bool internal, + DDS_TopicDescription * sub_topic); + friend class RMW_Connext_SubscriberStatusCondition; }; diff --git a/rmw_connextdds_common/src/common/rmw_impl.cpp b/rmw_connextdds_common/src/common/rmw_impl.cpp index 1fcfc9d2..a76eb19f 100644 --- a/rmw_connextdds_common/src/common/rmw_impl.cpp +++ b/rmw_connextdds_common/src/common/rmw_impl.cpp @@ -1278,52 +1278,18 @@ RMW_Connext_Subscriber::create( } } - // The following initialization generates warnings when built - // with RTI Connext DDS Professional < 6 (e.g. 5.3.1), so use - // DDS_DataWriterQos_initialize() for older versions. -#if !RMW_CONNEXT_DDS_API_PRO_LEGACY - DDS_DataReaderQos dr_qos = DDS_DataReaderQos_INITIALIZER; -#else - DDS_DataReaderQos dr_qos; - if (DDS_RETCODE_OK != DDS_DataReaderQos_initialize(&dr_qos)) { - RMW_CONNEXT_LOG_ERROR_SET("failed to initialize datareader qos") - return nullptr; - } -#endif /* !RMW_CONNEXT_DDS_API_PRO_LEGACY */ - - DDS_DataReaderQos * const dr_qos_ptr = &dr_qos; - auto scope_exit_dr_qos_delete = - rcpputils::make_scope_exit( - [dr_qos_ptr]() - { - DDS_DataReaderQos_finalize(dr_qos_ptr); - }); - - if (DDS_RETCODE_OK != - DDS_Subscriber_get_default_datareader_qos_w_topic_name(sub, &dr_qos, fqtopic_name.c_str())) - { - RMW_CONNEXT_LOG_ERROR_SET("failed to get default reader QoS") - return nullptr; - } - - DDS_DataReader * dds_reader = - rmw_connextdds_create_datareader( + DDS_DataReader * dds_reader = initialize_datareader( ctx, dp, sub, + fqtopic_name, qos_policies, + type_support, #if RMW_CONNEXT_HAVE_OPTIONS_PUBSUB subscriber_options, #endif /* RMW_CONNEXT_HAVE_OPTIONS_PUBSUB */ internal, - type_support, - sub_topic, - &dr_qos); - - if (nullptr == dds_reader) { - RMW_CONNEXT_LOG_ERROR_SET("failed to create DDS reader") - return nullptr; - } + sub_topic); auto scope_exit_dds_reader_delete = rcpputils::make_scope_exit( @@ -1358,6 +1324,7 @@ RMW_Connext_Subscriber::create( } rmw_sub_impl->fqtopic_name = fqtopic_name; rmw_sub_impl->qos_policies = *qos_policies; + rmw_sub_impl->subscriber_options = *subscriber_options; scope_exit_type_unregister.cancel(); scope_exit_topic_delete.cancel(); @@ -1366,8 +1333,71 @@ RMW_Connext_Subscriber::create( return rmw_sub_impl; } +DDS_DataReader * +RMW_Connext_Subscriber::initialize_datareader( + rmw_context_impl_t * const ctx, + DDS_DomainParticipant * const dp, + DDS_Subscriber * const sub, + const std::string & fqtopic_name, + const rmw_qos_profile_t * const qos_policies, + RMW_Connext_MessageTypeSupport * const type_support, +#if RMW_CONNEXT_HAVE_OPTIONS_PUBSUB + const rmw_subscription_options_t * const subscriber_options, +#endif /* RMW_CONNEXT_HAVE_OPTIONS_PUBSUB */ + const bool internal, + DDS_TopicDescription * sub_topic) +{ + // The following initialization generates warnings when built + // with RTI Connext DDS Professional < 6 (e.g. 5.3.1), so use + // DDS_DataWriterQos_initialize() for older versions. +#if !RMW_CONNEXT_DDS_API_PRO_LEGACY + DDS_DataReaderQos dr_qos = DDS_DataReaderQos_INITIALIZER; +#else + DDS_DataReaderQos dr_qos; + if (DDS_RETCODE_OK != DDS_DataReaderQos_initialize(&dr_qos)) { + RMW_CONNEXT_LOG_ERROR_SET("failed to initialize datareader qos") + return nullptr; + } +#endif /* !RMW_CONNEXT_DDS_API_PRO_LEGACY */ + + DDS_DataReaderQos * const dr_qos_ptr = &dr_qos; + auto scope_exit_dr_qos_delete = + rcpputils::make_scope_exit( + [dr_qos_ptr]() + { + DDS_DataReaderQos_finalize(dr_qos_ptr); + }); + + if (DDS_RETCODE_OK != + DDS_Subscriber_get_default_datareader_qos_w_topic_name(sub, &dr_qos, fqtopic_name.c_str())) + { + RMW_CONNEXT_LOG_ERROR_SET("failed to get default reader QoS") + return nullptr; + } + + DDS_DataReader * dds_reader = + rmw_connextdds_create_datareader( + ctx, + dp, + sub, + qos_policies, +#if RMW_CONNEXT_HAVE_OPTIONS_PUBSUB + subscriber_options, +#endif /* RMW_CONNEXT_HAVE_OPTIONS_PUBSUB */ + internal, + type_support, + sub_topic, + &dr_qos); + + if (nullptr == dds_reader) { + RMW_CONNEXT_LOG_ERROR_SET("failed to create DDS reader") + return nullptr; + } + return dds_reader; +} + rmw_ret_t -RMW_Connext_Subscriber::finalize() +RMW_Connext_Subscriber::finalize(const bool reset_cft) { RMW_CONNEXT_LOG_DEBUG_A( "finalizing subscriber: sub=%p, type=%s", @@ -1391,44 +1421,46 @@ RMW_Connext_Subscriber::finalize() return RMW_RET_ERROR; } - DDS_DomainParticipant * const participant = this->dds_participant(); + if (!reset_cft) { + DDS_DomainParticipant * const participant = this->dds_participant(); - if (nullptr != this->dds_topic_cft) { - rmw_ret_t cft_rc = rmw_connextdds_delete_contentfilteredtopic( - ctx, participant, this->dds_topic_cft); + if (nullptr != this->dds_topic_cft) { + rmw_ret_t cft_rc = rmw_connextdds_delete_contentfilteredtopic( + ctx, participant, this->dds_topic_cft); - if (RMW_RET_OK != cft_rc) { - return cft_rc; + if (RMW_RET_OK != cft_rc) { + return cft_rc; + } + this->dds_topic_cft = nullptr; } - } - if (this->created_topic) { - DDS_Topic * const topic = this->dds_topic; + if (this->created_topic) { + DDS_Topic * const topic = this->dds_topic; - RMW_CONNEXT_LOG_DEBUG_A( - "deleting topic: name=%s", - DDS_TopicDescription_get_name( - DDS_Topic_as_topicdescription(topic))) + RMW_CONNEXT_LOG_DEBUG_A( + "deleting topic: name=%s", + DDS_TopicDescription_get_name( + DDS_Topic_as_topicdescription(topic))) - DDS_ReturnCode_t rc = - DDS_DomainParticipant_delete_topic(participant, topic); + DDS_ReturnCode_t rc = + DDS_DomainParticipant_delete_topic(participant, topic); - if (DDS_RETCODE_OK != rc) { - RMW_CONNEXT_LOG_ERROR_SET("failed to delete DDS Topic") - return RMW_RET_ERROR; + if (DDS_RETCODE_OK != rc) { + RMW_CONNEXT_LOG_ERROR_SET("failed to delete DDS Topic") + return RMW_RET_ERROR; + } } - } - rmw_ret_t rc = RMW_Connext_MessageTypeSupport::unregister_type_support( - this->ctx, participant, this->type_support->type_name()); + rmw_ret_t rc = RMW_Connext_MessageTypeSupport::unregister_type_support( + this->ctx, participant, this->type_support->type_name()); - if (RMW_RET_OK != rc) { - return rc; - } - - delete this->type_support; - this->type_support = nullptr; + if (RMW_RET_OK != rc) { + return rc; + } + delete this->type_support; + this->type_support = nullptr; + } return RMW_RET_OK; } @@ -1552,141 +1584,75 @@ RMW_Connext_Subscriber::set_cft_expression_parameters( const char * filter_expression, const rcutils_string_array_t * expression_parameters) { + RMW_CONNEXT_ASSERT(nullptr != filter_expression) + RMW_CONNEXT_ASSERT(!internal) + + rmw_ret_t ret; std::lock_guard lock(this->cft_mutex); + const bool filter_expression_empty = (*filter_expression == '\0'); if (nullptr == this->dds_topic_cft) { - // case 1: reset if there is no cft topic exist - if (filter_expression && *filter_expression == '\0') { + // allow to call set filter_expresson even if cft not exist + if (filter_expression_empty) { RMW_CONNEXT_LOG_DEBUG("current subscriber has no content filter topic") return RMW_RET_OK; } } else { - if (nullptr != filter_expression && *filter_expression != '\0') { - // case 2: normally to set filter expre if cft already support - DDS_ContentFilteredTopic * const cft_topic = - DDS_ContentFilteredTopic_narrow(dds_topic_cft); - - struct DDS_StringSeq cft_parameters; - DDS_StringSeq_initialize(&cft_parameters); - if (expression_parameters) { - DDS_StringSeq_ensure_length( - &cft_parameters, expression_parameters->size, expression_parameters->size); - DDS_StringSeq_from_array( - &cft_parameters, - const_cast(expression_parameters->data), - expression_parameters->size); - } - - DDS_ReturnCode_t ret = - DDS_ContentFilteredTopic_set_expression(cft_topic, filter_expression, &cft_parameters); - DDS_StringSeq_finalize(&cft_parameters); - if (DDS_RETCODE_OK != ret) { - RMW_CONNEXT_LOG_ERROR("failed to set content-filtered topic") - return RMW_RET_ERROR; - } - return RMW_RET_OK; + if (!filter_expression_empty) { + // set filter expre if cft exist + return rmw_connextdds_set_cft_filter_expression( + this->dds_topic_cft, filter_expression, expression_parameters); } } - if (this->loan_len != 0) { - RMW_CONNEXT_LOG_ERROR("subscriber can't be reset because a message data loaned") - return RMW_RET_ERROR; + // finalization to remove the old data reader + ret = finalize(true); + if (RMW_RET_OK != ret) { + RMW_CONNEXT_LOG_ERROR_SET("failed to finalize subscriber with resetting cft flag") + return ret; } - if (RMW_RET_OK != - rmw_connextdds_graph_on_subscriber_deleted( - ctx, node, this)) - { - RMW_CONNEXT_LOG_ERROR("failed to update graph for subscriber") - return RMW_RET_ERROR; - } - - this->status_condition->invalidate(); - - if (DDS_RETCODE_OK != - DDS_Subscriber_delete_datareader(this->dds_subscriber(), this->dds_reader)) - { - RMW_CONNEXT_LOG_ERROR_SET( - "failed to delete DDS DataReader") - return RMW_RET_ERROR; - } - DDS_DomainParticipant * const dp = this->dds_participant(); DDS_TopicDescription * sub_topic = nullptr; if (nullptr == this->dds_topic_cft) { - // case 3: create a new cft if there is no cft topic and filter_expression is not empty("") - // remove old reader - // create cft topic and a new reader + // create a new cft topic, and then use cft topic to create a new reader std::string cft_topic_name = fqtopic_name + "_ContentFilterTopic" + RMW_Connext_Subscriber::get_atomic_id(); - rmw_ret_t cft_rc = - rmw_connextdds_create_contentfilteredtopic( + ret = rmw_connextdds_create_contentfilteredtopic( ctx, dp, dds_topic, cft_topic_name.c_str(), filter_expression, expression_parameters, &sub_topic); - if (RMW_RET_OK != cft_rc) { + if (RMW_RET_OK != ret) { RMW_CONNEXT_LOG_ERROR("failed to create content filter topic") - return RMW_RET_ERROR; + return ret; } this->dds_topic_cft = sub_topic; } else { - // case 4: delete cft topic and create a normal reader if filter_expression is empty("") - // that means reset - // remove content filter topic - if (nullptr != this->dds_topic_cft) { - rmw_ret_t cft_rc = rmw_connextdds_delete_contentfilteredtopic( - ctx, dp, this->dds_topic_cft); + // delete cft, and then use the existing parent topic to create a new reader + ret = rmw_connextdds_delete_contentfilteredtopic( + ctx, dp, this->dds_topic_cft); - if (RMW_RET_OK != cft_rc) { - return cft_rc; - } + if (RMW_RET_OK != ret) { + return ret; } - this->dds_topic_cft = nullptr; - // create normal topic and a new reader sub_topic = DDS_Topic_as_topicdescription(dds_topic); } -#if !RMW_CONNEXT_DDS_API_PRO_LEGACY - DDS_DataReaderQos dr_qos = DDS_DataReaderQos_INITIALIZER; -#else - DDS_DataReaderQos dr_qos; - if (DDS_RETCODE_OK != DDS_DataReaderQos_initialize(&dr_qos)) { - RMW_CONNEXT_LOG_ERROR_SET("failed to initialize datareader qos") - return RMW_RET_ERROR; - } -#endif /* !RMW_CONNEXT_DDS_API_PRO_LEGACY */ - - DDS_DataReaderQos * const dr_qos_ptr = &dr_qos; - auto scope_exit_dr_qos_delete = - rcpputils::make_scope_exit( - [dr_qos_ptr]() - { - DDS_DataReaderQos_finalize(dr_qos_ptr); - }); - - if (DDS_RETCODE_OK != - DDS_Subscriber_get_default_datareader_qos(this->dds_subscriber(), &dr_qos)) - { - RMW_CONNEXT_LOG_ERROR_SET("failed to get default reader QoS") - return RMW_RET_ERROR; - } - - DDS_DataReader * dds_reader = - rmw_connextdds_create_datareader( + DDS_DataReader * dds_reader = initialize_datareader( ctx, dp, this->dds_subscriber(), + fqtopic_name, &qos_policies, + type_support, #if RMW_CONNEXT_HAVE_OPTIONS_PUBSUB - nullptr, + &subscriber_options, #endif /* RMW_CONNEXT_HAVE_OPTIONS_PUBSUB */ internal, - type_support, - sub_topic, - &dr_qos); + sub_topic); if (nullptr == dds_reader) { RMW_CONNEXT_LOG_ERROR_SET("failed to create DDS reader") @@ -1700,19 +1666,16 @@ RMW_Connext_Subscriber::set_cft_expression_parameters( this->dds_reader, this->ignore_local, this->internal)); rmw_connextdds_get_entity_gid(this->dds_reader, this->ros_gid); - if (!internal) { - if (RMW_RET_OK != this->enable()) { - RMW_CONNEXT_LOG_ERROR("failed to enable subscription") - return RMW_RET_ERROR; - } + ret = this->enable(); + if (RMW_RET_OK != ret) { + RMW_CONNEXT_LOG_ERROR("failed to enable subscription") + return ret; + } - if (RMW_RET_OK != - rmw_connextdds_graph_on_subscriber_created( - ctx, node, this)) - { - RMW_CONNEXT_LOG_ERROR("failed to update graph for subscriber") - return RMW_RET_ERROR; - } + ret = rmw_connextdds_graph_on_subscriber_created(ctx, node, this); + if (RMW_RET_OK != ret) { + RMW_CONNEXT_LOG_ERROR("failed to update graph for subscriber") + return ret; } return RMW_RET_OK; @@ -1728,76 +1691,9 @@ RMW_Connext_Subscriber::get_cft_expression_parameters( RMW_SET_ERROR_MSG("this subscriber has not created a contentfilteredtopic."); return RMW_RET_ERROR; } - DDS_ContentFilteredTopic * const cft_topic = - DDS_ContentFilteredTopic_narrow(dds_topic_cft); - - int parameters_len; - rcutils_allocator_t allocator = rcutils_get_default_allocator(); - - // get filter_expression - const char * expression = DDS_ContentFilteredTopic_get_filter_expression(cft_topic); - if (!expression) { - RMW_SET_ERROR_MSG("failed to get filter expression"); - return RMW_RET_ERROR; - } - - *filter_expression = rcutils_strdup(expression, allocator); - if (NULL == *filter_expression) { - RMW_SET_ERROR_MSG("failed to duplicate string"); - return RMW_RET_BAD_ALLOC; - } - auto scope_exit_filter_expression_delete = - rcpputils::make_scope_exit( - [filter_expression, allocator]() - { - if (*filter_expression) { - allocator.deallocate(*filter_expression, allocator.state); - *filter_expression = nullptr; - } - }); - - // get parameters - struct DDS_StringSeq parameters; - DDS_ReturnCode_t status = - DDS_ContentFilteredTopic_get_expression_parameters(cft_topic, ¶meters); - if (DDS_RETCODE_OK != status) { - RMW_SET_ERROR_MSG("failed to get expression parameters"); - return RMW_RET_ERROR; - } - auto scope_exit_parameters_delete = - rcpputils::make_scope_exit( - [¶meters]() - { - DDS_StringSeq_finalize(¶meters); - }); - - parameters_len = DDS_StringSeq_get_length(¶meters); - rcutils_ret_t rcutils_ret = - rcutils_string_array_init(expression_parameters, parameters_len, &allocator); - if (rcutils_ret != RCUTILS_RET_OK) { - RMW_SET_ERROR_MSG("failed to init string array for expression parameters"); - return RMW_RET_ERROR; - } - auto scope_exit_expression_parameters_delete = - rcpputils::make_scope_exit( - [expression_parameters]() - { - if (RCUTILS_RET_OK != rcutils_string_array_fini(expression_parameters)) { - RCUTILS_LOG_ERROR("Error while finalizing expression parameter due to another error"); - } - }); - for (int i = 0; i < parameters_len; ++i) { - char * parameter = rcutils_strdup(DDS_StringSeq_get(¶meters, i), allocator); - if (!parameter) { - RMW_SET_ERROR_MSG("failed to allocate memory for parameter"); - return RMW_RET_BAD_ALLOC; - } - expression_parameters->data[i] = parameter; - } - scope_exit_filter_expression_delete.cancel(); - scope_exit_expression_parameters_delete.cancel(); - return RMW_RET_OK; + return rmw_connextdds_get_cft_filter_expression( + this->dds_topic_cft, filter_expression, expression_parameters); } rmw_ret_t diff --git a/rmw_connextdds_common/src/ndds/dds_api_ndds.cpp b/rmw_connextdds_common/src/ndds/dds_api_ndds.cpp index 3c4f20ea..4d509766 100644 --- a/rmw_connextdds_common/src/ndds/dds_api_ndds.cpp +++ b/rmw_connextdds_common/src/ndds/dds_api_ndds.cpp @@ -212,6 +212,29 @@ rmw_connextdds_initialize_participant_qos_impl( return RMW_RET_OK; } +static +rmw_ret_t +initialize_cft_parameters( + struct DDS_StringSeq * cft_parameters, + const rcutils_string_array_t * cft_expression_parameters) +{ + if (!DDS_StringSeq_ensure_length( + cft_parameters, cft_expression_parameters->size, cft_expression_parameters->size)) + { + RMW_CONNEXT_LOG_ERROR_SET("failed to ensure length for cft parameters sequence") + return RMW_RET_ERROR; + } + if (!DDS_StringSeq_from_array( + cft_parameters, + const_cast(cft_expression_parameters->data), + cft_expression_parameters->size)) + { + RMW_CONNEXT_LOG_ERROR_SET("failed to copy data for cft parameters sequence") + return RMW_RET_ERROR; + } + return RMW_RET_OK; +} + rmw_ret_t rmw_connextdds_create_contentfilteredtopic( rmw_context_impl_t * const ctx, @@ -227,16 +250,17 @@ rmw_connextdds_create_contentfilteredtopic( RMW_CONNEXT_ASSERT(nullptr != cft_filter) struct DDS_StringSeq cft_parameters = DDS_SEQUENCE_INITIALIZER; - DDS_StringSeq_initialize(&cft_parameters); + auto scope_exit_cft_params = rcpputils::make_scope_exit( + [&cft_parameters]() { + if (!DDS_StringSeq_finalize(&cft_parameters)) { + RMW_CONNEXT_LOG_ERROR_SET("failed to finalize cft parameters sequence") + } + }); if (cft_expression_parameters) { - DDS_StringSeq_ensure_length( - &cft_parameters, cft_expression_parameters->size, cft_expression_parameters->size); - DDS_StringSeq_from_array( - &cft_parameters, - const_cast(cft_expression_parameters->data), - cft_expression_parameters->size); - } else { - DDS_StringSeq_ensure_length(&cft_parameters, 0, 0); + if (RMW_RET_OK != initialize_cft_parameters(&cft_parameters, cft_expression_parameters)) { + RMW_CONNEXT_LOG_ERROR_SET("failed to initialize_cft_parameters") + return RMW_RET_ERROR; + } } *cft_out = nullptr; @@ -244,7 +268,6 @@ rmw_connextdds_create_contentfilteredtopic( DDS_ContentFilteredTopic * cft_topic = DDS_DomainParticipant_create_contentfilteredtopic( dp, cft_name, base_topic, cft_filter, &cft_parameters); - DDS_StringSeq_finalize(&cft_parameters); if (nullptr == cft_topic) { RMW_CONNEXT_LOG_ERROR_A_SET( "failed to create content-filtered topic: " @@ -1287,3 +1310,113 @@ rmw_connextdds_enable_security( return RMW_RET_OK; } + +rmw_ret_t +rmw_connextdds_set_cft_filter_expression( + DDS_TopicDescription * const topic_desc, + const char * filter_expression, + const rcutils_string_array_t * expression_parameters) +{ + DDS_ContentFilteredTopic * const cft_topic = + DDS_ContentFilteredTopic_narrow(topic_desc); + + struct DDS_StringSeq cft_parameters = DDS_SEQUENCE_INITIALIZER; + auto scope_exit_cft_parameters = rcpputils::make_scope_exit( + [&cft_parameters]() { + if (!DDS_StringSeq_finalize(&cft_parameters)) { + RMW_CONNEXT_LOG_ERROR_SET("failed to finalize cft parameters sequence") + } + }); + if (expression_parameters) { + if (RMW_RET_OK != initialize_cft_parameters(&cft_parameters, expression_parameters)) { + RMW_CONNEXT_LOG_ERROR_SET("failed to initialize_cft_parameters") + return RMW_RET_ERROR; + } + } + + DDS_ReturnCode_t ret = + DDS_ContentFilteredTopic_set_expression(cft_topic, filter_expression, &cft_parameters); + if (DDS_RETCODE_OK != ret) { + RMW_CONNEXT_LOG_ERROR_SET("failed to set content-filtered topic") + return RMW_RET_ERROR; + } + return RMW_RET_OK; +} + +rmw_ret_t +rmw_connextdds_get_cft_filter_expression( + DDS_TopicDescription * const topic_desc, + char ** const expr_out, + rcutils_string_array_t * cft_params_out) +{ + DDS_ContentFilteredTopic * const cft_topic = + DDS_ContentFilteredTopic_narrow(topic_desc); + + int parameters_len; + rcutils_allocator_t allocator = rcutils_get_default_allocator(); + + // get filter_expression + const char * expression = DDS_ContentFilteredTopic_get_filter_expression(cft_topic); + if (!expression) { + RMW_SET_ERROR_MSG("failed to get filter expression"); + return RMW_RET_ERROR; + } + + *expr_out = rcutils_strdup(expression, allocator); + if (NULL == *expr_out) { + RMW_SET_ERROR_MSG("failed to duplicate string"); + return RMW_RET_BAD_ALLOC; + } + auto scope_exit_filter_expression_delete = + rcpputils::make_scope_exit( + [expr_out, allocator]() + { + if (*expr_out) { + allocator.deallocate(*expr_out, allocator.state); + *expr_out = nullptr; + } + }); + + // get parameters + struct DDS_StringSeq parameters; + DDS_ReturnCode_t status = + DDS_ContentFilteredTopic_get_expression_parameters(cft_topic, ¶meters); + if (DDS_RETCODE_OK != status) { + RMW_SET_ERROR_MSG("failed to get expression parameters"); + return RMW_RET_ERROR; + } + auto scope_exit_parameters_delete = + rcpputils::make_scope_exit( + [¶meters]() + { + DDS_StringSeq_finalize(¶meters); + }); + + parameters_len = DDS_StringSeq_get_length(¶meters); + rcutils_ret_t rcutils_ret = + rcutils_string_array_init(cft_params_out, parameters_len, &allocator); + if (rcutils_ret != RCUTILS_RET_OK) { + RMW_SET_ERROR_MSG("failed to init string array for expression parameters"); + return RMW_RET_ERROR; + } + auto scope_exit_expression_parameters_delete = + rcpputils::make_scope_exit( + [cft_params_out]() + { + if (RCUTILS_RET_OK != rcutils_string_array_fini(cft_params_out)) { + RCUTILS_LOG_ERROR("Error while finalizing expression parameter due to another error"); + } + }); + for (int i = 0; i < parameters_len; ++i) { + char * parameter = rcutils_strdup(DDS_StringSeq_get(¶meters, i), allocator); + if (!parameter) { + RMW_SET_ERROR_MSG("failed to allocate memory for parameter"); + return RMW_RET_BAD_ALLOC; + } + cft_params_out->data[i] = parameter; + } + + scope_exit_filter_expression_delete.cancel(); + scope_exit_expression_parameters_delete.cancel(); + return RMW_RET_OK; +} diff --git a/rmw_connextdds_common/src/rtime/dds_api_rtime.cpp b/rmw_connextdds_common/src/rtime/dds_api_rtime.cpp index 99a32277..f075f143 100644 --- a/rmw_connextdds_common/src/rtime/dds_api_rtime.cpp +++ b/rmw_connextdds_common/src/rtime/dds_api_rtime.cpp @@ -2003,3 +2003,27 @@ rmw_connextdds_enable_security( return RMW_RET_ERROR; #endif /* RMW_CONNEXT_ENABLE_SECURITY */ } + +rmw_ret_t +rmw_connextdds_set_cft_filter_expression( + DDS_TopicDescription * const topic_desc, + const char * filter_expression, + const rcutils_string_array_t * expression_parameters) +{ + UNUSED_ARG(topic_desc); + UNUSED_ARG(filter_expression); + UNUSED_ARG(expression_parameters); + return RMW_RET_UNSUPPORTED; +} + +rmw_ret_t +rmw_connextdds_get_cft_filter_expression( + DDS_TopicDescription * const topic_desc, + char ** const expr_out, + rcutils_string_array_t * cft_params_out); +{ + UNUSED_ARG(topic_desc); + UNUSED_ARG(expr_out); + UNUSED_ARG(cft_params_out); + return RMW_RET_UNSUPPORTED; +}