diff --git a/rmw_connextdds/src/rmw_api_impl_ndds.cpp b/rmw_connextdds/src/rmw_api_impl_ndds.cpp index 81af9ee7..3bf150ef 100644 --- a/rmw_connextdds/src/rmw_api_impl_ndds.cpp +++ b/rmw_connextdds/src/rmw_api_impl_ndds.cpp @@ -637,6 +637,28 @@ rmw_subscription_get_actual_qos( } +rmw_ret_t +rmw_subscription_set_cft_expression_parameters( + rmw_subscription_t * subscription, + const char * filter_expression, + const rcutils_string_array_t * expression_parameters) +{ + return rmw_api_connextdds_subscription_set_cft_expression_parameters( + subscription, filter_expression, expression_parameters); +} + + +rmw_ret_t +rmw_subscription_get_cft_expression_parameters( + const rmw_subscription_t * subscription, + char ** filter_expression, + rcutils_string_array_t * expression_parameters) +{ + return rmw_api_connextdds_subscription_get_cft_expression_parameters( + subscription, filter_expression, expression_parameters); +} + + rmw_ret_t rmw_destroy_subscription( rmw_node_t * node, diff --git a/rmw_connextdds_common/include/rmw_connextdds/dds_api.hpp b/rmw_connextdds_common/include/rmw_connextdds/dds_api.hpp index f342e4b3..94a3ac81 100644 --- a/rmw_connextdds_common/include/rmw_connextdds/dds_api.hpp +++ b/rmw_connextdds_common/include/rmw_connextdds/dds_api.hpp @@ -72,6 +72,7 @@ rmw_connextdds_create_contentfilteredtopic( DDS_Topic * const base_topic, const char * const cft_name, const char * const cft_filter, + const rcutils_string_array_t * const cft_expression_parameters, DDS_TopicDescription ** const cft_out); rmw_ret_t @@ -207,6 +208,18 @@ rmw_connextdds_enable_security( rmw_context_impl_t * const ctx, DDS_DomainParticipantQos * const qos); +rmw_ret_t +rmw_connextdds_set_cft_filter_expression( + DDS_TopicDescription * const topic_desc, + const char * const filter_expression, + const rcutils_string_array_t * const expression_parameters); + +rmw_ret_t +rmw_connextdds_get_cft_filter_expression( + DDS_TopicDescription * const topic_desc, + char ** const expr_out, + rcutils_string_array_t * const cft_params_out); + // Define some macro aliases for security-related properties #ifndef DDS_SECURITY_PROPERTY_PREFIX #define DDS_SECURITY_PROPERTY_PREFIX \ diff --git a/rmw_connextdds_common/include/rmw_connextdds/rmw_api_impl.hpp b/rmw_connextdds_common/include/rmw_connextdds/rmw_api_impl.hpp index e70dd802..697ade38 100644 --- a/rmw_connextdds_common/include/rmw_connextdds/rmw_api_impl.hpp +++ b/rmw_connextdds_common/include/rmw_connextdds/rmw_api_impl.hpp @@ -430,6 +430,22 @@ rmw_api_connextdds_subscription_get_actual_qos( rmw_qos_profile_t * qos); +RMW_CONNEXTDDS_PUBLIC +rmw_ret_t +rmw_api_connextdds_subscription_set_cft_expression_parameters( + rmw_subscription_t * subscription, + const char * filter_expression, + const rcutils_string_array_t * expression_parameters); + + +RMW_CONNEXTDDS_PUBLIC +rmw_ret_t +rmw_api_connextdds_subscription_get_cft_expression_parameters( + const rmw_subscription_t * subscription, + char ** filter_expression, + rcutils_string_array_t * expression_parameters); + + RMW_CONNEXTDDS_PUBLIC rmw_ret_t rmw_api_connextdds_destroy_subscription( diff --git a/rmw_connextdds_common/include/rmw_connextdds/rmw_impl.hpp b/rmw_connextdds_common/include/rmw_connextdds/rmw_impl.hpp index 19d55c41..c040e46f 100644 --- a/rmw_connextdds_common/include/rmw_connextdds/rmw_impl.hpp +++ b/rmw_connextdds_common/include/rmw_connextdds/rmw_impl.hpp @@ -18,6 +18,7 @@ #include #include #include +#include #include #include #include @@ -273,6 +274,7 @@ class RMW_Connext_Subscriber const rmw_qos_profile_t * const qos_policies, const rmw_subscription_options_t * const subscriber_options, const bool internal = false, + const rmw_node_t * const node = nullptr, const RMW_Connext_MessageType msg_type = RMW_CONNEXT_MESSAGE_USERDATA, const void * const intro_members = nullptr, const bool intro_members_cpp = false, @@ -281,7 +283,7 @@ class RMW_Connext_Subscriber const char * const cft_filter = nullptr); rmw_ret_t - finalize(); + finalize(const bool reset_cft = false); DDS_DataReader * reader() const @@ -353,7 +355,7 @@ class RMW_Connext_Subscriber RMW_Connext_SubscriberStatusCondition * condition() { - return &this->status_condition; + return this->status_condition.get(); } const rmw_gid_t * gid() const @@ -396,7 +398,7 @@ class RMW_Connext_Subscriber } if (this->internal) { - return this->status_condition.trigger_loan_guard_condition(this->loan_len > 0); + return this->status_condition->trigger_loan_guard_condition(this->loan_len > 0); } return RMW_RET_OK; @@ -437,6 +439,18 @@ class RMW_Connext_Subscriber rmw_message_info_t * const message_info, bool * const taken); + rmw_ret_t + set_cft_expression_parameters( + const char * const filter_expression, + const rcutils_string_array_t * const expression_parameters + ); + + rmw_ret_t + get_cft_expression_parameters( + char ** const filter_expression, + rcutils_string_array_t * const expression_parameters + ); + bool has_data() { @@ -467,6 +481,17 @@ class RMW_Connext_Subscriber return this->dds_topic; } + static std::string get_atomic_id() + { + static std::atomic_uint64_t id; + return std::to_string(id++); + } + + bool is_cft_supported() + { + return nullptr != dds_topic_cft; + } + const bool internal; const bool ignore_local; @@ -478,12 +503,16 @@ class RMW_Connext_Subscriber RMW_Connext_MessageTypeSupport * type_support; rmw_gid_t ros_gid; const bool created_topic; - RMW_Connext_SubscriberStatusCondition status_condition; + std::shared_ptr status_condition; RMW_Connext_UntypedSampleSeq loan_data; DDS_SampleInfoSeq loan_info; size_t loan_len; size_t loan_next; std::mutex loan_mutex; + std::mutex cft_mutex; + const rmw_node_t * const node; + rmw_qos_profile_t qos_policies; + rmw_subscription_options_t subscriber_options; RMW_Connext_Subscriber( rmw_context_impl_t * const ctx, @@ -493,7 +522,23 @@ class RMW_Connext_Subscriber const bool ignore_local, const bool created_topic, DDS_TopicDescription * const dds_topic_cft, - const bool internal); + const bool internal, + const rmw_node_t * const node, + const rmw_qos_profile_t * const qos_policies, + const rmw_subscription_options_t * const subscriber_options); + + static + DDS_DataReader * + initialize_datareader( + rmw_context_impl_t * const ctx, + DDS_DomainParticipant * const dp, + DDS_Subscriber * const sub, + const std::string & fqtopic_name, + const rmw_qos_profile_t * const qos_policies, + RMW_Connext_MessageTypeSupport * const type_support, + const rmw_subscription_options_t * const subscriber_options, + const bool internal, + DDS_TopicDescription * const sub_topic); friend class RMW_Connext_SubscriberStatusCondition; }; diff --git a/rmw_connextdds_common/src/common/rmw_event.cpp b/rmw_connextdds_common/src/common/rmw_event.cpp index 23acf6ee..4e124200 100644 --- a/rmw_connextdds_common/src/common/rmw_event.cpp +++ b/rmw_connextdds_common/src/common/rmw_event.cpp @@ -124,12 +124,16 @@ rmw_api_connextdds_take_event( } else { condition = RMW_Connext_Event::publisher(event_handle)->condition(); } - rmw_ret_t rc = condition->get_status(event_handle->event_type, event_info); - if (RMW_RET_OK != rc) { - RMW_CONNEXT_LOG_ERROR_SET("failed to get status from DDS entity") - return rc; + if (nullptr != condition) { + rmw_ret_t rc = condition->get_status(event_handle->event_type, event_info); + if (RMW_RET_OK != rc) { + RMW_CONNEXT_LOG_ERROR_SET("failed to get status from DDS entity") + return rc; + } + + *taken = true; + } else { + RMW_CONNEXT_LOG_WARNING("condition was reset because of resetting cft") } - - *taken = true; return RMW_RET_OK; } diff --git a/rmw_connextdds_common/src/common/rmw_impl.cpp b/rmw_connextdds_common/src/common/rmw_impl.cpp index 29b671d7..d85e4c4a 100644 --- a/rmw_connextdds_common/src/common/rmw_impl.cpp +++ b/rmw_connextdds_common/src/common/rmw_impl.cpp @@ -27,6 +27,7 @@ const char * const ROS_TOPIC_PREFIX = "rt"; const char * const ROS_SERVICE_REQUESTER_PREFIX = ROS_SERVICE_REQUESTER_PREFIX_STR; const char * const ROS_SERVICE_RESPONSE_PREFIX = ROS_SERVICE_RESPONSE_PREFIX_STR; +const char * const ROS_CFT_TOPIC_NAME_INFIX = "_ContentFilterTopic"; std::string rmw_connextdds_create_topic_name( @@ -1086,7 +1087,10 @@ RMW_Connext_Subscriber::RMW_Connext_Subscriber( const bool ignore_local, const bool created_topic, DDS_TopicDescription * const dds_topic_cft, - const bool internal) + const bool internal, + const rmw_node_t * const node, + const rmw_qos_profile_t * const qos_policies, + const rmw_subscription_options_t * const subscriber_options) : internal(internal), ignore_local(ignore_local), ctx(ctx), @@ -1095,7 +1099,10 @@ RMW_Connext_Subscriber::RMW_Connext_Subscriber( dds_topic_cft(dds_topic_cft), type_support(type_support), created_topic(created_topic), - status_condition(dds_reader, ignore_local, internal) + status_condition(new RMW_Connext_SubscriberStatusCondition(dds_reader, ignore_local, internal)), + node(node), + qos_policies(*qos_policies), + subscriber_options(*subscriber_options) { rmw_connextdds_get_entity_gid(this->dds_reader, this->ros_gid); @@ -1124,6 +1131,7 @@ RMW_Connext_Subscriber::create( const rmw_qos_profile_t * const qos_policies, const rmw_subscription_options_t * const subscriber_options, const bool internal, + const rmw_node_t * const node, const RMW_Connext_MessageType msg_type, const void * const intro_members, const bool intro_members_cpp, @@ -1215,21 +1223,96 @@ RMW_Connext_Subscriber::create( }); DDS_TopicDescription * sub_topic = DDS_Topic_as_topicdescription(topic); - - if (nullptr != cft_name) { - rmw_ret_t cft_rc = - rmw_connextdds_create_contentfilteredtopic( - ctx, dp, topic, cft_name, cft_filter, &cft_topic); + std::string cft_topic_name; + if (nullptr != cft_name || nullptr != subscriber_options->filter_expression) { + rmw_ret_t cft_rc = RMW_RET_OK; + + if (nullptr != cft_name) { + cft_rc = + rmw_connextdds_create_contentfilteredtopic( + ctx, dp, topic, cft_name, cft_filter, NULL, &cft_topic); + } else if (nullptr != subscriber_options->filter_expression) { + cft_topic_name = + fqtopic_name + ROS_CFT_TOPIC_NAME_INFIX + RMW_Connext_Subscriber::get_atomic_id(); + cft_rc = + rmw_connextdds_create_contentfilteredtopic( + ctx, dp, topic, cft_topic_name.c_str(), + subscriber_options->filter_expression, + subscriber_options->expression_parameters, &cft_topic); + } if (RMW_RET_OK != cft_rc) { if (RMW_RET_UNSUPPORTED != cft_rc) { return nullptr; } } else { - sub_topic = cft_topic; + if (nullptr != cft_topic) { + sub_topic = cft_topic; + } } } + DDS_DataReader * dds_reader = RMW_Connext_Subscriber::initialize_datareader( + ctx, + dp, + sub, + fqtopic_name, + qos_policies, + type_support, + subscriber_options, + internal, + sub_topic); + + auto scope_exit_dds_reader_delete = + rcpputils::make_scope_exit( + [sub, dds_reader]() + { + if (DDS_RETCODE_OK != + DDS_Subscriber_delete_datareader(sub, dds_reader)) + { + RMW_CONNEXT_LOG_ERROR_SET( + "failed to delete DDS DataWriter") + } + }); + + RMW_Connext_Subscriber * rmw_sub_impl = + new (std::nothrow) RMW_Connext_Subscriber( + ctx, + dds_reader, + topic, + type_support, + subscriber_options->ignore_local_publications, + topic_created, + cft_topic, + internal, + node, + qos_policies, + subscriber_options); + + if (nullptr == rmw_sub_impl) { + RMW_CONNEXT_LOG_ERROR_SET("failed to allocate RMW subscriber") + return nullptr; + } + + scope_exit_type_unregister.cancel(); + scope_exit_topic_delete.cancel(); + scope_exit_dds_reader_delete.cancel(); + + return rmw_sub_impl; +} + +DDS_DataReader * +RMW_Connext_Subscriber::initialize_datareader( + rmw_context_impl_t * const ctx, + DDS_DomainParticipant * const dp, + DDS_Subscriber * const sub, + const std::string & fqtopic_name, + const rmw_qos_profile_t * const qos_policies, + RMW_Connext_MessageTypeSupport * const type_support, + const rmw_subscription_options_t * const subscriber_options, + const bool internal, + DDS_TopicDescription * const sub_topic) +{ // The following initialization generates warnings when built // with RTI Connext DDS Professional < 6 (e.g. 5.3.1), so use // DDS_DataWriterQos_initialize() for older versions. @@ -1274,58 +1357,36 @@ RMW_Connext_Subscriber::create( RMW_CONNEXT_LOG_ERROR_SET("failed to create DDS reader") return nullptr; } - - auto scope_exit_dds_reader_delete = - rcpputils::make_scope_exit( - [sub, dds_reader]() - { - if (DDS_RETCODE_OK != - DDS_Subscriber_delete_datareader(sub, dds_reader)) - { - RMW_CONNEXT_LOG_ERROR_SET( - "failed to delete DDS DataWriter") - } - }); - - RMW_Connext_Subscriber * rmw_sub_impl = - new (std::nothrow) RMW_Connext_Subscriber( - ctx, - dds_reader, - topic, - type_support, - subscriber_options->ignore_local_publications, - topic_created, - cft_topic, - internal); - - if (nullptr == rmw_sub_impl) { - RMW_CONNEXT_LOG_ERROR_SET("failed to allocate RMW subscriber") - return nullptr; - } - scope_exit_type_unregister.cancel(); - scope_exit_topic_delete.cancel(); - scope_exit_dds_reader_delete.cancel(); - - return rmw_sub_impl; + return dds_reader; } rmw_ret_t -RMW_Connext_Subscriber::finalize() +RMW_Connext_Subscriber::finalize(const bool reset_cft) { RMW_CONNEXT_LOG_DEBUG_A( "finalizing subscriber: sub=%p, type=%s", (void *)this, this->type_support->type_name()) + if (nullptr == this->dds_reader) { + RMW_CONNEXT_LOG_ERROR_SET("DDS DataReader is invalid") + return RMW_RET_ERROR; + } + // Make sure subscriber's condition is detached from any waitset - this->status_condition.invalidate(); + this->status_condition->invalidate(); - if (this->loan_len > 0) { - this->loan_next = this->loan_len; - if (RMW_RET_OK != this->return_messages()) { - return RMW_RET_ERROR; + { + std::lock_guard lock(this->loan_mutex); + if (this->loan_len > 0) { + if (RMW_RET_OK != this->return_messages()) { + return RMW_RET_ERROR; + } } } + // to get the participant needs a valid dds_reader + DDS_DomainParticipant * const participant = this->dds_participant(); + if (DDS_RETCODE_OK != DDS_Subscriber_delete_datareader( this->dds_subscriber(), this->dds_reader)) @@ -1333,45 +1394,46 @@ RMW_Connext_Subscriber::finalize() RMW_CONNEXT_LOG_ERROR_SET("failed to delete DDS DataReader") return RMW_RET_ERROR; } + this->dds_reader = nullptr; - DDS_DomainParticipant * const participant = this->dds_participant(); + if (!reset_cft) { + if (nullptr != this->dds_topic_cft) { + rmw_ret_t cft_rc = rmw_connextdds_delete_contentfilteredtopic( + this->ctx, participant, this->dds_topic_cft); - if (nullptr != this->dds_topic_cft) { - rmw_ret_t cft_rc = rmw_connextdds_delete_contentfilteredtopic( - ctx, participant, this->dds_topic_cft); - - if (RMW_RET_OK != cft_rc) { - return cft_rc; + if (RMW_RET_OK != cft_rc) { + return cft_rc; + } + this->dds_topic_cft = nullptr; } - } - if (this->created_topic) { - DDS_Topic * const topic = this->dds_topic; + if (this->created_topic) { + DDS_Topic * const topic = this->dds_topic; - RMW_CONNEXT_LOG_DEBUG_A( - "deleting topic: name=%s", - DDS_TopicDescription_get_name( - DDS_Topic_as_topicdescription(topic))) + RMW_CONNEXT_LOG_DEBUG_A( + "deleting topic: name=%s", + DDS_TopicDescription_get_name( + DDS_Topic_as_topicdescription(topic))) - DDS_ReturnCode_t rc = - DDS_DomainParticipant_delete_topic(participant, topic); + DDS_ReturnCode_t rc = + DDS_DomainParticipant_delete_topic(participant, topic); - if (DDS_RETCODE_OK != rc) { - RMW_CONNEXT_LOG_ERROR_SET("failed to delete DDS Topic") - return RMW_RET_ERROR; + if (DDS_RETCODE_OK != rc) { + RMW_CONNEXT_LOG_ERROR_SET("failed to delete DDS Topic") + return RMW_RET_ERROR; + } } - } - rmw_ret_t rc = RMW_Connext_MessageTypeSupport::unregister_type_support( - this->ctx, participant, this->type_support->type_name()); + rmw_ret_t rc = RMW_Connext_MessageTypeSupport::unregister_type_support( + this->ctx, participant, this->type_support->type_name()); - if (RMW_RET_OK != rc) { - return rc; - } - - delete this->type_support; - this->type_support = nullptr; + if (RMW_RET_OK != rc) { + return rc; + } + delete this->type_support; + this->type_support = nullptr; + } return RMW_RET_OK; } @@ -1488,6 +1550,120 @@ RMW_Connext_Subscriber::take_serialized( return rc; } +rmw_ret_t +RMW_Connext_Subscriber::set_cft_expression_parameters( + const char * const filter_expression, + const rcutils_string_array_t * const expression_parameters) +{ + RMW_CONNEXT_ASSERT(nullptr != filter_expression) + RMW_CONNEXT_ASSERT(!this->internal) + RMW_CONNEXT_ASSERT(nullptr != this->node) + + rmw_ret_t ret; + std::lock_guard lock(this->cft_mutex); + const bool filter_expression_empty = (*filter_expression == '\0'); + if (nullptr == this->dds_topic_cft && filter_expression_empty) { + // allow to call set filter_expresson even if cft not exist + RMW_CONNEXT_LOG_DEBUG("current subscriber has no content filter topic") + return RMW_RET_OK; + } else if (nullptr != this->dds_topic_cft && !filter_expression_empty) { + // set filter expre if cft exist + return rmw_connextdds_set_cft_filter_expression( + this->dds_topic_cft, filter_expression, expression_parameters); + } + + DDS_DomainParticipant * const dp = this->dds_participant(); + DDS_Subscriber * const sub = this->dds_subscriber(); + DDS_TopicDescription * sub_topic = DDS_Topic_as_topicdescription(this->dds_topic); + std::string fqtopic_name = DDS_TopicDescription_get_name(sub_topic); + + // finalization to remove the old data reader + ret = this->finalize(true /* reset_cft */); + if (RMW_RET_OK != ret) { + RMW_CONNEXT_LOG_ERROR_SET("failed to finalize subscriber with resetting cft flag") + return ret; + } + + if (nullptr == this->dds_topic_cft) { + // create a new cft topic, and then use cft topic to create a new reader + std::string cft_topic_name = + fqtopic_name + ROS_CFT_TOPIC_NAME_INFIX + RMW_Connext_Subscriber::get_atomic_id(); + + ret = rmw_connextdds_create_contentfilteredtopic( + this->ctx, dp, this->dds_topic, cft_topic_name.c_str(), + filter_expression, + expression_parameters, &sub_topic); + + if (RMW_RET_OK != ret) { + RMW_CONNEXT_LOG_ERROR("failed to create content filter topic") + return ret; + } + + this->dds_topic_cft = sub_topic; + } else { + // delete cft, and then use the existing parent topic to create a new reader + ret = rmw_connextdds_delete_contentfilteredtopic( + this->ctx, dp, this->dds_topic_cft); + + if (RMW_RET_OK != ret) { + return ret; + } + this->dds_topic_cft = nullptr; + } + + DDS_DataReader * dds_reader = RMW_Connext_Subscriber::initialize_datareader( + this->ctx, + dp, + sub, + fqtopic_name, + &this->qos_policies, + this->type_support, + &this->subscriber_options, + this->internal, + sub_topic); + + if (nullptr == dds_reader) { + RMW_CONNEXT_LOG_ERROR_SET("failed to create DDS reader") + return RMW_RET_ERROR; + } + + this->dds_reader = dds_reader; + + this->status_condition.reset( + new RMW_Connext_SubscriberStatusCondition( + this->dds_reader, this->ignore_local, this->internal)); + rmw_connextdds_get_entity_gid(this->dds_reader, this->ros_gid); + + ret = this->enable(); + if (RMW_RET_OK != ret) { + RMW_CONNEXT_LOG_ERROR("failed to enable subscription") + return ret; + } + + ret = rmw_connextdds_graph_on_subscriber_created(this->ctx, this->node, this); + if (RMW_RET_OK != ret) { + RMW_CONNEXT_LOG_ERROR("failed to update graph for subscriber") + return ret; + } + + return RMW_RET_OK; +} + +rmw_ret_t +RMW_Connext_Subscriber::get_cft_expression_parameters( + char ** const filter_expression, + rcutils_string_array_t * const expression_parameters) +{ + std::lock_guard lock(this->cft_mutex); + if (nullptr == this->dds_topic_cft) { + RMW_CONNEXT_LOG_ERROR_SET("this subscriber has not created a contentfilteredtopic") + return RMW_RET_ERROR; + } + + return rmw_connextdds_get_cft_filter_expression( + this->dds_topic_cft, filter_expression, expression_parameters); +} + rmw_ret_t RMW_Connext_Subscriber::loan_messages() { @@ -1507,7 +1683,7 @@ RMW_Connext_Subscriber::loan_messages() this->type_support->type_name(), this->loan_len) if (this->loan_len > 0) { - return this->status_condition.set_data_available(true); + return this->status_condition->set_data_available(true); } return RMW_RET_OK; @@ -1523,16 +1699,16 @@ RMW_Connext_Subscriber::return_messages() "[%s] return loaned messages: %lu", this->type_support->type_name(), this->loan_len) - this->loan_len = 0; - this->loan_next = 0; - rmw_ret_t rc_result = RMW_RET_OK; rmw_ret_t rc = rmw_connextdds_return_samples(this); if (RMW_RET_OK != rc) { rc_result = rc; } - rc = this->status_condition.set_data_available(false); + this->loan_len = 0; + this->loan_next = 0; + + rc = this->status_condition->set_data_available(false); if (RMW_RET_OK != rc) { rc_result = rc; } @@ -1723,14 +1899,14 @@ rmw_connextdds_create_subscriber( topic_name, qos_policies, subscriber_options, - internal); + internal, + node); if (nullptr == rmw_sub_impl) { RMW_CONNEXT_LOG_ERROR( "failed to allocate RMW_Connext_Subscriber") return nullptr; } - auto scope_exit_rmw_reader_impl_delete = rcpputils::make_scope_exit( [rmw_sub_impl]() @@ -1774,6 +1950,7 @@ rmw_connextdds_create_subscriber( topic_name_len + 1); rmw_subscriber->options = *subscriber_options; rmw_subscriber->can_loan_messages = false; + rmw_subscriber->is_cft_supported = rmw_sub_impl->is_cft_supported(); if (!internal) { if (RMW_RET_OK != rmw_sub_impl->enable()) { @@ -2394,6 +2571,7 @@ RMW_Connext_Client::create( qos_policies, &sub_options, false /* internal */, + nullptr /* node */, RMW_CONNEXT_MESSAGE_REPLY, svc_members_res, svc_members_res_cpp, @@ -2686,6 +2864,7 @@ RMW_Connext_Service::create( qos_policies, &sub_options, false /* internal */, + nullptr /* node */, RMW_CONNEXT_MESSAGE_REQUEST, svc_members_req, svc_members_req_cpp, diff --git a/rmw_connextdds_common/src/common/rmw_subscription.cpp b/rmw_connextdds_common/src/common/rmw_subscription.cpp index 378322e9..2e088496 100644 --- a/rmw_connextdds_common/src/common/rmw_subscription.cpp +++ b/rmw_connextdds_common/src/common/rmw_subscription.cpp @@ -160,6 +160,54 @@ rmw_api_connextdds_subscription_get_actual_qos( } +rmw_ret_t +rmw_api_connextdds_subscription_set_cft_expression_parameters( + rmw_subscription_t * subscription, + const char * filter_expression, + const rcutils_string_array_t * expression_parameters) +{ + RMW_CHECK_ARGUMENT_FOR_NULL(subscription, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_TYPE_IDENTIFIERS_MATCH( + subscription, + subscription->implementation_identifier, + RMW_CONNEXTDDS_ID, + return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); + RMW_CHECK_ARGUMENT_FOR_NULL(filter_expression, RMW_RET_INVALID_ARGUMENT); + + RMW_Connext_Subscriber * const sub_impl = + reinterpret_cast(subscription->data); + + rmw_ret_t rc = sub_impl->set_cft_expression_parameters(filter_expression, expression_parameters); + subscription->is_cft_supported = sub_impl->is_cft_supported(); + + return rc; +} + + +rmw_ret_t +rmw_api_connextdds_subscription_get_cft_expression_parameters( + const rmw_subscription_t * subscription, + char ** filter_expression, + rcutils_string_array_t * expression_parameters) +{ + RMW_CHECK_ARGUMENT_FOR_NULL(subscription, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_TYPE_IDENTIFIERS_MATCH( + subscription, + subscription->implementation_identifier, + RMW_CONNEXTDDS_ID, + return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); + RMW_CHECK_ARGUMENT_FOR_NULL(filter_expression, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_ARGUMENT_FOR_NULL(expression_parameters, RMW_RET_INVALID_ARGUMENT); + + RMW_Connext_Subscriber * const sub_impl = + reinterpret_cast(subscription->data); + + rmw_ret_t rc = sub_impl->get_cft_expression_parameters(filter_expression, expression_parameters); + + return rc; +} + + rmw_ret_t rmw_api_connextdds_destroy_subscription( rmw_node_t * node, diff --git a/rmw_connextdds_common/src/ndds/dds_api_ndds.cpp b/rmw_connextdds_common/src/ndds/dds_api_ndds.cpp index 0e4dfc12..ba7ede4b 100644 --- a/rmw_connextdds_common/src/ndds/dds_api_ndds.cpp +++ b/rmw_connextdds_common/src/ndds/dds_api_ndds.cpp @@ -210,6 +210,29 @@ rmw_connextdds_initialize_participant_qos_impl( return RMW_RET_OK; } +static +rmw_ret_t +rmw_connextdds_initialize_cft_parameters( + struct DDS_StringSeq * const cft_parameters, + const rcutils_string_array_t * const cft_expression_parameters) +{ + if (!DDS_StringSeq_ensure_length( + cft_parameters, cft_expression_parameters->size, cft_expression_parameters->size)) + { + RMW_CONNEXT_LOG_ERROR_SET("failed to ensure length for cft parameters sequence") + return RMW_RET_ERROR; + } + if (!DDS_StringSeq_from_array( + cft_parameters, + const_cast(cft_expression_parameters->data), + cft_expression_parameters->size)) + { + RMW_CONNEXT_LOG_ERROR_SET("failed to copy data for cft parameters sequence") + return RMW_RET_ERROR; + } + return RMW_RET_OK; +} + rmw_ret_t rmw_connextdds_create_contentfilteredtopic( rmw_context_impl_t * const ctx, @@ -217,6 +240,7 @@ rmw_connextdds_create_contentfilteredtopic( DDS_Topic * const base_topic, const char * const cft_name, const char * const cft_filter, + const rcutils_string_array_t * const cft_expression_parameters, DDS_TopicDescription ** const cft_out) { UNUSED_ARG(ctx); @@ -224,7 +248,20 @@ rmw_connextdds_create_contentfilteredtopic( RMW_CONNEXT_ASSERT(nullptr != cft_filter) struct DDS_StringSeq cft_parameters = DDS_SEQUENCE_INITIALIZER; - DDS_StringSeq_ensure_length(&cft_parameters, 0, 0); + auto scope_exit_cft_params = rcpputils::make_scope_exit( + [&cft_parameters]() { + if (!DDS_StringSeq_finalize(&cft_parameters)) { + RMW_CONNEXT_LOG_ERROR_SET("failed to finalize cft parameters sequence") + } + }); + if (cft_expression_parameters) { + if (RMW_RET_OK != + rmw_connextdds_initialize_cft_parameters(&cft_parameters, cft_expression_parameters)) + { + RMW_CONNEXT_LOG_ERROR_SET("failed to rmw_connextdds_initialize_cft_parameters") + return RMW_RET_ERROR; + } + } *cft_out = nullptr; @@ -1224,3 +1261,120 @@ rmw_connextdds_enable_security( return RMW_RET_OK; } + +rmw_ret_t +rmw_connextdds_set_cft_filter_expression( + DDS_TopicDescription * const topic_desc, + const char * const filter_expression, + const rcutils_string_array_t * const expression_parameters) +{ + DDS_ContentFilteredTopic * const cft_topic = + DDS_ContentFilteredTopic_narrow(topic_desc); + + struct DDS_StringSeq cft_parameters = DDS_SEQUENCE_INITIALIZER; + auto scope_exit_cft_parameters = rcpputils::make_scope_exit( + [&cft_parameters]() { + if (!DDS_StringSeq_finalize(&cft_parameters)) { + RMW_CONNEXT_LOG_ERROR_SET("failed to finalize cft parameters sequence") + } + }); + if (expression_parameters) { + if (RMW_RET_OK != + rmw_connextdds_initialize_cft_parameters(&cft_parameters, expression_parameters)) + { + RMW_CONNEXT_LOG_ERROR_SET("failed to rmw_connextdds_initialize_cft_parameters") + return RMW_RET_ERROR; + } + } + + DDS_ReturnCode_t ret = + DDS_ContentFilteredTopic_set_expression(cft_topic, filter_expression, &cft_parameters); + if (DDS_RETCODE_OK != ret) { + RMW_CONNEXT_LOG_ERROR_SET("failed to set content-filtered topic") + return RMW_RET_ERROR; + } + return RMW_RET_OK; +} + +rmw_ret_t +rmw_connextdds_get_cft_filter_expression( + DDS_TopicDescription * const topic_desc, + char ** const expr_out, + rcutils_string_array_t * cft_params_out) +{ + DDS_ContentFilteredTopic * const cft_topic = + DDS_ContentFilteredTopic_narrow(topic_desc); + + rcutils_allocator_t allocator = rcutils_get_default_allocator(); + + // get filter_expression + const char * expression = DDS_ContentFilteredTopic_get_filter_expression(cft_topic); + if (!expression) { + RMW_CONNEXT_LOG_ERROR_SET("failed to get filter expression") + return RMW_RET_ERROR; + } + + *expr_out = rcutils_strdup(expression, allocator); + if (NULL == *expr_out) { + RMW_CONNEXT_LOG_ERROR_SET("failed to duplicate string") + return RMW_RET_BAD_ALLOC; + } + auto scope_exit_filter_expression_delete = + rcpputils::make_scope_exit( + [expr_out, allocator]() + { + if (*expr_out) { + allocator.deallocate(*expr_out, allocator.state); + *expr_out = nullptr; + } + }); + + // get parameters + struct DDS_StringSeq parameters = DDS_SEQUENCE_INITIALIZER; + DDS_ReturnCode_t status = + DDS_ContentFilteredTopic_get_expression_parameters(cft_topic, ¶meters); + if (DDS_RETCODE_OK != status) { + RMW_CONNEXT_LOG_ERROR_SET("failed to get expression parameters") + return RMW_RET_ERROR; + } + auto scope_exit_parameters_delete = + rcpputils::make_scope_exit( + [¶meters]() + { + DDS_StringSeq_finalize(¶meters); + }); + + const DDS_Long parameters_len = DDS_StringSeq_get_length(¶meters); + rcutils_ret_t rcutils_ret = + rcutils_string_array_init(cft_params_out, static_cast(parameters_len), &allocator); + if (rcutils_ret != RCUTILS_RET_OK) { + RMW_CONNEXT_LOG_ERROR_SET("failed to init string array for expression parameters") + return RMW_RET_ERROR; + } + auto scope_exit_expression_parameters_delete = + rcpputils::make_scope_exit( + [cft_params_out]() + { + if (RCUTILS_RET_OK != rcutils_string_array_fini(cft_params_out)) { + RCUTILS_LOG_ERROR("Error while finalizing expression parameter due to another error"); + } + }); + for (DDS_Long i = 0; i < parameters_len; ++i) { + const char * parameter_ref = *DDS_StringSeq_get_reference(¶meters, i); + if (!parameter_ref) { + RMW_CONNEXT_LOG_ERROR_A_SET( + "failed to get a reference for parameter with index %d", i) + return RMW_RET_ERROR; + } + char * parameter = rcutils_strdup(parameter_ref, allocator); + if (!parameter) { + RMW_CONNEXT_LOG_ERROR_SET("failed to allocate memory for parameter") + return RMW_RET_BAD_ALLOC; + } + cft_params_out->data[i] = parameter; + } + + scope_exit_filter_expression_delete.cancel(); + scope_exit_expression_parameters_delete.cancel(); + return RMW_RET_OK; +} diff --git a/rmw_connextdds_common/src/rtime/dds_api_rtime.cpp b/rmw_connextdds_common/src/rtime/dds_api_rtime.cpp index d4397ab2..7b4d602c 100644 --- a/rmw_connextdds_common/src/rtime/dds_api_rtime.cpp +++ b/rmw_connextdds_common/src/rtime/dds_api_rtime.cpp @@ -867,6 +867,7 @@ rmw_connextdds_create_contentfilteredtopic( DDS_Topic * const base_topic, const char * const cft_name, const char * const cft_filter, + const rcutils_string_array_t * const cft_expression_parameters, DDS_TopicDescription ** const cft_out) { UNUSED_ARG(ctx); @@ -874,6 +875,7 @@ rmw_connextdds_create_contentfilteredtopic( UNUSED_ARG(base_topic); UNUSED_ARG(cft_name); UNUSED_ARG(cft_filter); + UNUSED_ARG(cft_expression_parameters); UNUSED_ARG(cft_out); return RMW_RET_UNSUPPORTED; } @@ -1952,3 +1954,27 @@ rmw_connextdds_enable_security( return RMW_RET_ERROR; #endif /* RMW_CONNEXT_ENABLE_SECURITY */ } + +rmw_ret_t +rmw_connextdds_set_cft_filter_expression( + DDS_TopicDescription * const topic_desc, + const char * const filter_expression, + const rcutils_string_array_t * const expression_parameters) +{ + UNUSED_ARG(topic_desc); + UNUSED_ARG(filter_expression); + UNUSED_ARG(expression_parameters); + return RMW_RET_UNSUPPORTED; +} + +rmw_ret_t +rmw_connextdds_get_cft_filter_expression( + DDS_TopicDescription * const topic_desc, + char ** const expr_out, + rcutils_string_array_t * const cft_params_out); +{ + UNUSED_ARG(topic_desc); + UNUSED_ARG(expr_out); + UNUSED_ARG(cft_params_out); + return RMW_RET_UNSUPPORTED; +} diff --git a/rmw_connextddsmicro/src/rmw_api_impl_rtime.cpp b/rmw_connextddsmicro/src/rmw_api_impl_rtime.cpp index 9760208b..3b280046 100644 --- a/rmw_connextddsmicro/src/rmw_api_impl_rtime.cpp +++ b/rmw_connextddsmicro/src/rmw_api_impl_rtime.cpp @@ -637,6 +637,26 @@ rmw_subscription_get_actual_qos( } +rmw_ret_t +rmw_subscription_set_cft_expression_parameters( + rmw_subscription_t * subscription, + const char * filter_expression, + const rcutils_string_array_t * expression_parameters) +{ + return RMW_RET_UNSUPPORTED; +} + + +rmw_ret_t +rmw_subscription_get_cft_expression_parameters( + const rmw_subscription_t * subscription, + char ** filter_expression, + rcutils_string_array_t * expression_parameters) +{ + return RMW_RET_UNSUPPORTED; +} + + rmw_ret_t rmw_destroy_subscription( rmw_node_t * node,