diff --git a/rmw_fastrtps_cpp/CMakeLists.txt b/rmw_fastrtps_cpp/CMakeLists.txt index 108e67071..45314ad1a 100644 --- a/rmw_fastrtps_cpp/CMakeLists.txt +++ b/rmw_fastrtps_cpp/CMakeLists.txt @@ -60,6 +60,7 @@ add_library(rmw_fastrtps_cpp src/rmw_get_gid_for_publisher.cpp src/rmw_get_implementation_identifier.cpp src/rmw_get_serialization_format.cpp + src/rmw_get_topic_endpoint_info.cpp src/rmw_guard_condition.cpp src/rmw_init.cpp src/rmw_node.cpp diff --git a/rmw_fastrtps_cpp/src/rmw_get_topic_endpoint_info.cpp b/rmw_fastrtps_cpp/src/rmw_get_topic_endpoint_info.cpp new file mode 100644 index 000000000..9982d73a7 --- /dev/null +++ b/rmw_fastrtps_cpp/src/rmw_get_topic_endpoint_info.cpp @@ -0,0 +1,46 @@ +// Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "rmw/get_topic_endpoint_info.h" +#include "rmw/topic_endpoint_info_array.h" +#include "rmw/types.h" +#include "rmw_fastrtps_cpp/identifier.hpp" +#include "rmw_fastrtps_shared_cpp/rmw_common.hpp" + +extern "C" +{ +rmw_ret_t +rmw_get_publishers_info_by_topic( + const rmw_node_t * node, + rcutils_allocator_t * allocator, + const char * topic_name, + bool no_mangle, + rmw_topic_endpoint_info_array_t * publishers_info) +{ + return rmw_fastrtps_shared_cpp::__rmw_get_publishers_info_by_topic( + eprosima_fastrtps_identifier, node, allocator, topic_name, no_mangle, publishers_info); +} + +rmw_ret_t +rmw_get_subscriptions_info_by_topic( + const rmw_node_t * node, + rcutils_allocator_t * allocator, + const char * topic_name, + bool no_mangle, + rmw_topic_endpoint_info_array_t * subscriptions_info) +{ + return rmw_fastrtps_shared_cpp::__rmw_get_subscriptions_info_by_topic( + eprosima_fastrtps_identifier, node, allocator, topic_name, no_mangle, subscriptions_info); +} +} // extern "C" diff --git a/rmw_fastrtps_dynamic_cpp/CMakeLists.txt b/rmw_fastrtps_dynamic_cpp/CMakeLists.txt index f2ac936a8..c64f4215b 100644 --- a/rmw_fastrtps_dynamic_cpp/CMakeLists.txt +++ b/rmw_fastrtps_dynamic_cpp/CMakeLists.txt @@ -64,6 +64,7 @@ add_library(rmw_fastrtps_dynamic_cpp src/rmw_get_gid_for_publisher.cpp src/rmw_get_implementation_identifier.cpp src/rmw_get_serialization_format.cpp + src/rmw_get_topic_endpoint_info.cpp src/rmw_guard_condition.cpp src/rmw_init.cpp src/rmw_node.cpp diff --git a/rmw_fastrtps_dynamic_cpp/src/rmw_get_topic_endpoint_info.cpp b/rmw_fastrtps_dynamic_cpp/src/rmw_get_topic_endpoint_info.cpp new file mode 100644 index 000000000..d19492e45 --- /dev/null +++ b/rmw_fastrtps_dynamic_cpp/src/rmw_get_topic_endpoint_info.cpp @@ -0,0 +1,46 @@ +// Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "rmw/get_topic_endpoint_info.h" +#include "rmw/topic_endpoint_info_array.h" +#include "rmw/types.h" +#include "rmw_fastrtps_dynamic_cpp/identifier.hpp" +#include "rmw_fastrtps_shared_cpp/rmw_common.hpp" + +extern "C" +{ +rmw_ret_t +rmw_get_publishers_info_by_topic( + const rmw_node_t * node, + rcutils_allocator_t * allocator, + const char * topic_name, + bool no_mangle, + rmw_topic_endpoint_info_array_t * publishers_info) +{ + return rmw_fastrtps_shared_cpp::__rmw_get_publishers_info_by_topic( + eprosima_fastrtps_identifier, node, allocator, topic_name, no_mangle, publishers_info); +} + +rmw_ret_t +rmw_get_subscriptions_info_by_topic( + const rmw_node_t * node, + rcutils_allocator_t * allocator, + const char * topic_name, + bool no_mangle, + rmw_topic_endpoint_info_array_t * subscriptions_info) +{ + return rmw_fastrtps_shared_cpp::__rmw_get_subscriptions_info_by_topic( + eprosima_fastrtps_identifier, node, allocator, topic_name, no_mangle, subscriptions_info); +} +} // extern "C" diff --git a/rmw_fastrtps_shared_cpp/CMakeLists.txt b/rmw_fastrtps_shared_cpp/CMakeLists.txt index efa321401..a8ba7c2f5 100644 --- a/rmw_fastrtps_shared_cpp/CMakeLists.txt +++ b/rmw_fastrtps_shared_cpp/CMakeLists.txt @@ -56,6 +56,7 @@ add_library(rmw_fastrtps_shared_cpp src/rmw_compare_gids_equal.cpp src/rmw_count.cpp src/rmw_get_gid_for_publisher.cpp + src/rmw_get_topic_endpoint_info.cpp src/rmw_guard_condition.cpp src/rmw_logging.cpp src/rmw_node.cpp diff --git a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_participant_info.hpp b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_participant_info.hpp index eb6fbc4fe..3ce19dbd3 100644 --- a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_participant_info.hpp +++ b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_participant_info.hpp @@ -164,16 +164,22 @@ class ParticipantListener : public eprosima::fastrtps::ParticipantListener { auto & topic_cache = is_reader ? reader_topic_cache : writer_topic_cache; - bool trigger; { std::lock_guard guard(topic_cache.getMutex()); if (is_alive) { - trigger = topic_cache().addTopic(proxyData.RTPSParticipantKey(), - proxyData.topicName().to_string(), proxyData.typeName().to_string()); + trigger = topic_cache().addTopic( + proxyData.RTPSParticipantKey(), + proxyData.guid(), + proxyData.topicName().to_string(), + proxyData.typeName().to_string(), + proxyData.m_qos); } else { - trigger = topic_cache().removeTopic(proxyData.RTPSParticipantKey(), - proxyData.topicName().to_string(), proxyData.typeName().to_string()); + trigger = topic_cache().removeTopic( + proxyData.RTPSParticipantKey(), + proxyData.guid(), + proxyData.topicName().to_string(), + proxyData.typeName().to_string()); } } if (trigger) { diff --git a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/qos.hpp b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/qos.hpp index 1139413cb..fed276204 100644 --- a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/qos.hpp +++ b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/qos.hpp @@ -18,6 +18,8 @@ #include "rmw/rmw.h" #include "./visibility_control.h" +#include "fastrtps/qos/WriterQos.h" +#include "fastrtps/qos/ReaderQos.h" namespace eprosima { @@ -44,6 +46,68 @@ get_datawriter_qos( const rmw_qos_profile_t & qos_policies, eprosima::fastrtps::PublisherAttributes & pattr); +/* + * Converts the low-level QOS Policy; of type WriterQos or ReaderQos into rmw_qos_profile_t. + * Since WriterQos or ReaderQos does not have information about history and depth, these values are not set + * by this function. + * + * \param[in] dds_qos of type WriterQos or ReaderQos + * \param[out] qos the equivalent of the data in WriterQos or ReaderQos in rmw_qos_profile_t + */ +template +void +dds_qos_to_rmw_qos( + const DDSQoSPolicyT & dds_qos, + rmw_qos_profile_t * qos) +{ + switch (dds_qos.m_reliability.kind) { + case eprosima::fastrtps::BEST_EFFORT_RELIABILITY_QOS: + qos->reliability = RMW_QOS_POLICY_RELIABILITY_BEST_EFFORT; + break; + case eprosima::fastrtps::RELIABLE_RELIABILITY_QOS: + qos->reliability = RMW_QOS_POLICY_RELIABILITY_RELIABLE; + break; + default: + qos->reliability = RMW_QOS_POLICY_RELIABILITY_UNKNOWN; + break; + } + + switch (dds_qos.m_durability.kind) { + case eprosima::fastrtps::TRANSIENT_LOCAL_DURABILITY_QOS: + qos->durability = RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL; + break; + case eprosima::fastrtps::VOLATILE_DURABILITY_QOS: + qos->durability = RMW_QOS_POLICY_DURABILITY_VOLATILE; + break; + default: + qos->durability = RMW_QOS_POLICY_DURABILITY_UNKNOWN; + break; + } + + qos->deadline.sec = dds_qos.m_deadline.period.seconds; + qos->deadline.nsec = dds_qos.m_deadline.period.nanosec; + + qos->lifespan.sec = dds_qos.m_lifespan.duration.seconds; + qos->lifespan.nsec = dds_qos.m_lifespan.duration.nanosec; + + switch (dds_qos.m_liveliness.kind) { + case eprosima::fastrtps::AUTOMATIC_LIVELINESS_QOS: + qos->liveliness = RMW_QOS_POLICY_LIVELINESS_AUTOMATIC; + break; + case eprosima::fastrtps::MANUAL_BY_PARTICIPANT_LIVELINESS_QOS: + qos->liveliness = RMW_QOS_POLICY_LIVELINESS_MANUAL_BY_NODE; + break; + case eprosima::fastrtps::MANUAL_BY_TOPIC_LIVELINESS_QOS: + qos->liveliness = RMW_QOS_POLICY_LIVELINESS_MANUAL_BY_TOPIC; + break; + default: + qos->liveliness = RMW_QOS_POLICY_LIVELINESS_UNKNOWN; + break; + } + qos->liveliness_lease_duration.sec = dds_qos.m_liveliness.lease_duration.seconds; + qos->liveliness_lease_duration.nsec = dds_qos.m_liveliness.lease_duration.nanosec; +} + template void dds_attributes_to_rmw_qos( diff --git a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/rmw_common.hpp b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/rmw_common.hpp index 7c01a6d08..a73ce8fbf 100644 --- a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/rmw_common.hpp +++ b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/rmw_common.hpp @@ -18,9 +18,10 @@ #include "./visibility_control.h" #include "rmw/error_handling.h" +#include "rmw/event.h" #include "rmw/rmw.h" +#include "rmw/topic_endpoint_info_array.h" #include "rmw/types.h" -#include "rmw/event.h" #include "rmw/names_and_types.h" namespace rmw_fastrtps_shared_cpp @@ -349,6 +350,26 @@ RMW_FASTRTPS_SHARED_CPP_PUBLIC rmw_ret_t __rmw_destroy_wait_set(const char * identifier, rmw_wait_set_t * wait_set); +RMW_FASTRTPS_SHARED_CPP_PUBLIC +rmw_ret_t +__rmw_get_publishers_info_by_topic( + const char * identifier, + const rmw_node_t * node, + rcutils_allocator_t * allocator, + const char * topic_name, + bool no_mangle, + rmw_topic_endpoint_info_array_t * publishers_info); + +RMW_FASTRTPS_SHARED_CPP_PUBLIC +rmw_ret_t +__rmw_get_subscriptions_info_by_topic( + const char * identifier, + const rmw_node_t * node, + rcutils_allocator_t * allocator, + const char * topic_name, + bool no_mangle, + rmw_topic_endpoint_info_array_t * subscriptions_info); + } // namespace rmw_fastrtps_shared_cpp #endif // RMW_FASTRTPS_SHARED_CPP__RMW_COMMON_HPP_ diff --git a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/topic_cache.hpp b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/topic_cache.hpp index c5604ee93..a6201a3fb 100644 --- a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/topic_cache.hpp +++ b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/topic_cache.hpp @@ -31,27 +31,41 @@ #include "rcpputils/thread_safety_annotations.hpp" #include "rcutils/logging_macros.h" +#include "qos.hpp" + typedef eprosima::fastrtps::rtps::GUID_t GUID_t; +/** + * A data structure that encapsulates all the data associated with a publisher + * or subscription by the topic it publishes or subscribes to + */ +struct TopicData +{ + GUID_t participant_guid; + GUID_t entity_guid; + std::string topic_type; + rmw_qos_profile_t qos_profile; +}; + /** * Topic cache data structure. Manages relationships between participants and topics. */ class TopicCache { private: - typedef std::map>> ParticipantTopicMap; - typedef std::unordered_map> TopicToTypes; + using TopicToTypes = std::unordered_map>; + using ParticipantTopicMap = std::map; + using TopicNameToTopicData = std::unordered_map>; /** - * Map of topic names to a vector of types that topic may use. + * Map of topic names to TopicData. Where topic data is vector of tuples containing + * participant GUID, topic type and the qos policy of the respective participant. * Topics here are represented as one to many, DDS XTypes 1.2 - * specifies application code 'generally' uses a 1-1 relationship. + * specifies application code that 'generally' uses a 1-1 relationship. * However, generic services such as logger and monitor, can discover * multiple types on the same topic. - * */ - TopicToTypes topic_to_types_; + TopicNameToTopicData topic_name_to_topic_data_; /** * Map of participant GUIDS to a set of topic-type. @@ -59,11 +73,27 @@ class TopicCache ParticipantTopicMap participant_to_topics_; /** - * Helper function to initialize a topic vector. + * Helper function to initialize an empty TopicData for a topic name. * - * @param topic_name + * \param topic_name the topic name for which the TopicNameToTopicData map should be initialized. + * \param topic_name_to_topic_data the map to initialize. */ - void initializeTopic(const std::string & topic_name, TopicToTypes & topic_to_types) + void initializeTopicDataMap( + const std::string & topic_name, + TopicNameToTopicData & topic_name_to_topic_data) + { + if (topic_name_to_topic_data.find(topic_name) == topic_name_to_topic_data.end()) { + topic_name_to_topic_data[topic_name] = std::vector(); + } + } + + /** + * Helper function to initialize a topic vector. + * + * \param topic_name the topic name for which the TopicToTypes map should be initialized. + * \param topic_to_types the map to initialize. + */ + void initializeTopicTypesMap(const std::string & topic_name, TopicToTypes & topic_to_types) { if (topic_to_types.find(topic_name) == topic_to_types.end()) { topic_to_types[topic_name] = std::vector(); @@ -73,8 +103,8 @@ class TopicCache /** * Helper function to initialize the set inside a participant map. * - * @param map - * @param guid + * \param map + * \param guid */ void initializeParticipantMap( ParticipantTopicMap & map, @@ -87,67 +117,97 @@ class TopicCache public: /** - * @return a map of topic name to the vector of topic types used. + * \return a map of topic name to the vector of topic types used. */ - const TopicToTypes & getTopicToTypes() const + const TopicToTypes getTopicToTypes() const { - return topic_to_types_; + TopicToTypes topic_to_types; + for (const auto & it : topic_name_to_topic_data_) { + topic_to_types[it.first] = std::vector(); + for (const auto & topic_data : it.second) { + topic_to_types[it.first].push_back(topic_data.topic_type); + } + } + return topic_to_types; } /** - * @return a map of participant guid to the vector of topic names used. + * \return a map of participant guid to the vector of topic names used. */ const ParticipantTopicMap & getParticipantToTopics() const { return participant_to_topics_; } + /** + * \return a map of topic name to a vector of GUID_t, type name and qos profile tuple. + */ + const TopicNameToTopicData & getTopicNameToTopicData() const + { + return topic_name_to_topic_data_; + } + /** * Add a topic based on discovery. * - * @param rtpsParticipantKey - * @param topic_name - * @param type_name - * @return true if a change has been recorded + * \param rtpsParticipantKey the participant key of the discovered publisher or subscription + * \param entity_guid the guid of the publisher or subscription + * \param topic_name the topic name associated with the discovered publisher or subscription + * \param type_name the topic type associated with the discovered publisher or subscription + * \param dds_qos the dds qos policy of the discovered publisher or subscription + * \return true if a change has been recorded */ + template bool addTopic( const eprosima::fastrtps::rtps::InstanceHandle_t & rtpsParticipantKey, + const GUID_t & entity_guid, const std::string & topic_name, - const std::string & type_name) + const std::string & type_name, + const T & dds_qos) { - initializeTopic(topic_name, topic_to_types_); - auto guid = iHandle2GUID(rtpsParticipantKey); - initializeParticipantMap(participant_to_topics_, guid); - initializeTopic(topic_name, participant_to_topics_[guid]); + initializeTopicDataMap(topic_name, topic_name_to_topic_data_); + auto participant_guid = iHandle2GUID(rtpsParticipantKey); + initializeParticipantMap(participant_to_topics_, participant_guid); + initializeTopicTypesMap(topic_name, participant_to_topics_[entity_guid]); if (rcutils_logging_logger_is_enabled_for("rmw_fastrtps_shared_cpp", RCUTILS_LOG_SEVERITY_DEBUG)) { std::stringstream guid_stream; - guid_stream << guid; + guid_stream << entity_guid; RCUTILS_LOG_DEBUG_NAMED( "rmw_fastrtps_shared_cpp", "Adding topic '%s' with type '%s' for node '%s'", topic_name.c_str(), type_name.c_str(), guid_stream.str().c_str()); } - topic_to_types_[topic_name].push_back(type_name); - participant_to_topics_[guid][topic_name].push_back(type_name); + rmw_qos_profile_t qos_profile = rmw_qos_profile_unknown; + dds_qos_to_rmw_qos(dds_qos, &qos_profile); + TopicData topic_data = { + participant_guid, + entity_guid, + type_name, + qos_profile + }; + topic_name_to_topic_data_[topic_name].push_back(topic_data); + participant_to_topics_[participant_guid][topic_name].push_back(type_name); return true; } /** * Remove a topic based on discovery. * - * @param rtpsParticipantKey - * @param topic_name - * @param type_name - * @return true if a change has been recorded + * \param rtpsParticipantKey the participant key of the publisher or subscription + * \param entity_guid the guid of the publisher or subscription + * \param topic_name the topic name associated with the publisher or subscription + * \param type_name the topic type associated with the publisher or subscription + * \return true if a change has been recorded */ bool removeTopic( const eprosima::fastrtps::rtps::InstanceHandle_t & rtpsParticipantKey, + const eprosima::fastrtps::rtps::GUID_t & entity_guid, const std::string & topic_name, const std::string & type_name) { - if (topic_to_types_.find(topic_name) == topic_to_types_.end()) { + if (topic_name_to_topic_data_.find(topic_name) == topic_name_to_topic_data_.end()) { RCUTILS_LOG_DEBUG_NAMED( "rmw_fastrtps_shared_cpp", "unexpected removal on topic '%s' with type '%s'", @@ -155,28 +215,31 @@ class TopicCache return false; } { - auto & type_vec = topic_to_types_[topic_name]; - type_vec.erase(std::find(type_vec.begin(), type_vec.end(), type_name)); + auto & type_vec = topic_name_to_topic_data_[topic_name]; + type_vec.erase(std::find_if(type_vec.begin(), type_vec.end(), + [type_name, entity_guid](const auto & topic_data) { + return type_name.compare(topic_data.topic_type) == 0 && + entity_guid == topic_data.entity_guid; + })); if (type_vec.empty()) { - topic_to_types_.erase(topic_name); + topic_name_to_topic_data_.erase(topic_name); } } - - auto guid = iHandle2GUID(rtpsParticipantKey); - auto guid_topics_pair = participant_to_topics_.find(guid); + auto participant_guid = iHandle2GUID(rtpsParticipantKey); + auto guid_topics_pair = participant_to_topics_.find(participant_guid); if (guid_topics_pair != participant_to_topics_.end() && guid_topics_pair->second.find(topic_name) != guid_topics_pair->second.end()) { auto & type_vec = guid_topics_pair->second[topic_name]; type_vec.erase(std::find(type_vec.begin(), type_vec.end(), type_name)); if (type_vec.empty()) { - participant_to_topics_[guid].erase(topic_name); + participant_to_topics_[participant_guid].erase(topic_name); } - if (participant_to_topics_[guid].empty()) { - participant_to_topics_.erase(guid); + if (participant_to_topics_[participant_guid].empty()) { + participant_to_topics_.erase(participant_guid); } } else { - RCUTILS_LOG_DEBUG_NAMED( + RCUTILS_LOG_ERROR_NAMED( "rmw_fastrtps_shared_cpp", "Unable to remove topic, does not exist '%s' with type '%s'", topic_name.c_str(), type_name.c_str()); @@ -223,7 +286,7 @@ class LockedObject public: /** - * @return a reference to this object to lock. + * \return a reference to this object to lock. */ std::mutex & getMutex() const RCPPUTILS_TSA_RETURN_CAPABILITY(mutex_) { diff --git a/rmw_fastrtps_shared_cpp/src/qos.cpp b/rmw_fastrtps_shared_cpp/src/qos.cpp index beaae41fb..e6ca5a20c 100644 --- a/rmw_fastrtps_shared_cpp/src/qos.cpp +++ b/rmw_fastrtps_shared_cpp/src/qos.cpp @@ -14,7 +14,6 @@ #include -#include "qos_converter.hpp" #include "rmw_fastrtps_shared_cpp/qos.hpp" #include "fastrtps/attributes/PublisherAttributes.h" diff --git a/rmw_fastrtps_shared_cpp/src/qos_converter.hpp b/rmw_fastrtps_shared_cpp/src/qos_converter.hpp deleted file mode 100644 index 6f7460cd1..000000000 --- a/rmw_fastrtps_shared_cpp/src/qos_converter.hpp +++ /dev/null @@ -1,86 +0,0 @@ -// Copyright 2016-2018 Proyectos y Sistemas de Mantenimiento SL (eProsima). -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - - -#ifndef QOS_CONVERTER_HPP_ -#define QOS_CONVERTER_HPP_ - -#include "rmw/types.h" - -#include "fastrtps/qos/WriterQos.h" -#include "fastrtps/qos/ReaderQos.h" - -/* - * Converts the low-level QOS Policy; of type WriterQos or ReaderQos into rmw_qos_profile_t. - * Since WriterQos or ReaderQos does not have information about history and depth, these values are not set - * by this function. - * - * \param[in] dds_qos of type WriterQos or ReaderQos - * \param[out] qos the equivalent of the data in WriterQos or ReaderQos in rmw_qos_profile_t - */ -template -void -dds_qos_to_rmw_qos( - const DDSQoSPolicyT & dds_qos, - rmw_qos_profile_t * qos) -{ - switch (dds_qos.m_reliability.kind) { - case eprosima::fastrtps::BEST_EFFORT_RELIABILITY_QOS: - qos->reliability = RMW_QOS_POLICY_RELIABILITY_BEST_EFFORT; - break; - case eprosima::fastrtps::RELIABLE_RELIABILITY_QOS: - qos->reliability = RMW_QOS_POLICY_RELIABILITY_RELIABLE; - break; - default: - qos->reliability = RMW_QOS_POLICY_RELIABILITY_UNKNOWN; - break; - } - - switch (dds_qos.m_durability.kind) { - case eprosima::fastrtps::TRANSIENT_LOCAL_DURABILITY_QOS: - qos->durability = RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL; - break; - case eprosima::fastrtps::VOLATILE_DURABILITY_QOS: - qos->durability = RMW_QOS_POLICY_DURABILITY_VOLATILE; - break; - default: - qos->durability = RMW_QOS_POLICY_DURABILITY_UNKNOWN; - break; - } - - qos->deadline.sec = dds_qos.m_deadline.period.seconds; - qos->deadline.nsec = dds_qos.m_deadline.period.nanosec; - - qos->lifespan.sec = dds_qos.m_lifespan.duration.seconds; - qos->lifespan.nsec = dds_qos.m_lifespan.duration.nanosec; - - switch (dds_qos.m_liveliness.kind) { - case eprosima::fastrtps::AUTOMATIC_LIVELINESS_QOS: - qos->liveliness = RMW_QOS_POLICY_LIVELINESS_AUTOMATIC; - break; - case eprosima::fastrtps::MANUAL_BY_PARTICIPANT_LIVELINESS_QOS: - qos->liveliness = RMW_QOS_POLICY_LIVELINESS_MANUAL_BY_NODE; - break; - case eprosima::fastrtps::MANUAL_BY_TOPIC_LIVELINESS_QOS: - qos->liveliness = RMW_QOS_POLICY_LIVELINESS_MANUAL_BY_TOPIC; - break; - default: - qos->liveliness = RMW_QOS_POLICY_LIVELINESS_UNKNOWN; - break; - } - qos->liveliness_lease_duration.sec = dds_qos.m_liveliness.lease_duration.seconds; - qos->liveliness_lease_duration.nsec = dds_qos.m_liveliness.lease_duration.nanosec; -} - -#endif // QOS_CONVERTER_HPP_ diff --git a/rmw_fastrtps_shared_cpp/src/rmw_get_topic_endpoint_info.cpp b/rmw_fastrtps_shared_cpp/src/rmw_get_topic_endpoint_info.cpp new file mode 100644 index 000000000..336e0359c --- /dev/null +++ b/rmw_fastrtps_shared_cpp/src/rmw_get_topic_endpoint_info.cpp @@ -0,0 +1,333 @@ +// Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include +#include +#include + +#include "rmw/rmw.h" +#include "rmw/types.h" +#include "rmw/topic_endpoint_info_array.h" +#include "rmw/topic_endpoint_info.h" + +#include "demangle.hpp" +#include "rmw_fastrtps_shared_cpp/custom_participant_info.hpp" +#include "rmw_fastrtps_shared_cpp/namespace_prefix.hpp" +#include "rmw_fastrtps_shared_cpp/rmw_common.hpp" + +namespace rmw_fastrtps_shared_cpp +{ + +rmw_ret_t +_validate_params( + const char * identifier, + const rmw_node_t * node, + rcutils_allocator_t * allocator, + const char * topic_name, + rmw_topic_endpoint_info_array_t * participants_info) +{ + if (!identifier) { + RMW_SET_ERROR_MSG("null implementation identifier provided"); + return RMW_RET_ERROR; + } + + if (!topic_name) { + RMW_SET_ERROR_MSG("null topic_name provided"); + return RMW_RET_ERROR; + } + + if (!allocator) { + RMW_SET_ERROR_MSG("null allocator provided"); + return RMW_RET_ERROR; + } + + if (!node) { + RMW_SET_ERROR_MSG("null node handle"); + return RMW_RET_ERROR; + } + + // Get participant pointer from node + if (node->implementation_identifier != identifier) { + RMW_SET_ERROR_MSG("node handle not from this implementation"); + return RMW_RET_ERROR; + } + + if (!participants_info) { + RMW_SET_ERROR_MSG("null participants_info provided"); + return RMW_RET_ERROR; + } + + return RMW_RET_OK; +} + +std::vector +_get_topic_fqdns(const char * topic_name, bool no_mangle) +{ + std::vector topic_fqdns; + topic_fqdns.push_back(topic_name); + if (!no_mangle) { + auto ros_prefixes = _get_all_ros_prefixes(); + // Build the list of all possible topic FQDN + std::for_each(ros_prefixes.begin(), ros_prefixes.end(), + [&topic_fqdns, &topic_name](const std::string & prefix) { + topic_fqdns.push_back(prefix + topic_name); + }); + } + return topic_fqdns; +} + +void +_handle_topic_endpoint_info_fini( + rmw_topic_endpoint_info_t * topic_endpoint_info, + rcutils_allocator_t * allocator) +{ + bool had_error = rmw_error_is_set(); + std::string error_string; + if (had_error) { + error_string = rmw_get_error_string().str; + } + rmw_reset_error(); + + rmw_ret_t ret = rmw_topic_endpoint_info_fini(topic_endpoint_info, allocator); + if (ret != RMW_RET_OK) { + RCUTILS_LOG_ERROR_NAMED( + "rmw_fastrtps_cpp", + "rmw_topic_endpoint_info_fini failed: %s", + rmw_get_error_string().str); + rmw_reset_error(); + } + + if (had_error) { + RMW_SET_ERROR_MSG(error_string.c_str()); + } +} + +rmw_ret_t +_set_rmw_topic_endpoint_info( + rmw_topic_endpoint_info_t * topic_endpoint_info, + const GUID_t & participant_guid, + const char * node_name, + const char * node_namespace, + TopicData topic_data, + bool no_mangle, + bool is_publisher, + ::ParticipantListener * slave_target, + rcutils_allocator_t * allocator) +{ + static_assert( + sizeof(eprosima::fastrtps::rtps::GUID_t) <= RMW_GID_STORAGE_SIZE, + "RMW_GID_STORAGE_SIZE insufficient to store the rmw_fastrtps_cpp GID implementation." + ); + rmw_ret_t ret; + // set endpoint type + if (is_publisher) { + ret = rmw_topic_endpoint_info_set_endpoint_type(topic_endpoint_info, RMW_ENDPOINT_PUBLISHER); + } else { + ret = rmw_topic_endpoint_info_set_endpoint_type(topic_endpoint_info, RMW_ENDPOINT_SUBSCRIPTION); + } + if (ret != RMW_RET_OK) { + return ret; + } + // set endpoint gid + uint8_t rmw_gid[RMW_GID_STORAGE_SIZE]; + memset(&rmw_gid, 0, RMW_GID_STORAGE_SIZE); + memcpy(&rmw_gid, &topic_data.entity_guid, sizeof(eprosima::fastrtps::rtps::GUID_t)); + ret = rmw_topic_endpoint_info_set_gid( + topic_endpoint_info, + rmw_gid, + sizeof(eprosima::fastrtps::rtps::GUID_t)); + if (ret != RMW_RET_OK) { + return ret; + } + // set qos profile + ret = rmw_topic_endpoint_info_set_qos_profile(topic_endpoint_info, &topic_data.qos_profile); + if (ret != RMW_RET_OK) { + return ret; + } + // set topic type + std::string type_name = + no_mangle ? topic_data.topic_type : _demangle_if_ros_type(topic_data.topic_type); + ret = rmw_topic_endpoint_info_set_topic_type(topic_endpoint_info, type_name.c_str(), allocator); + if (ret != RMW_RET_OK) { + return ret; + } + // Check if this participant is the same as the node that is passed + if (topic_data.participant_guid == participant_guid) { + ret = rmw_topic_endpoint_info_set_node_name(topic_endpoint_info, node_name, allocator); + if (ret != RMW_RET_OK) { + return ret; + } + ret = + rmw_topic_endpoint_info_set_node_namespace(topic_endpoint_info, node_namespace, allocator); + if (ret != RMW_RET_OK) { + return ret; + } + return ret; + } + // This means that this discovered participant is not associated with the passed node + // and hence we must find its name and namespace from the discovered_names and + // discovered_namespace maps + // set node name + const auto & d_name_it = slave_target->discovered_names.find(topic_data.participant_guid); + if (d_name_it != slave_target->discovered_names.end()) { + ret = rmw_topic_endpoint_info_set_node_name( + topic_endpoint_info, d_name_it->second.c_str(), allocator); + } else { + ret = rmw_topic_endpoint_info_set_node_name( + topic_endpoint_info, "_NODE_NAME_UNKNOWN_", allocator); + } + if (ret != RMW_RET_OK) { + return ret; + } + // set node namespace + const auto & d_namespace_it = + slave_target->discovered_namespaces.find(topic_data.participant_guid); + if (d_namespace_it != slave_target->discovered_namespaces.end()) { + ret = rmw_topic_endpoint_info_set_node_namespace( + topic_endpoint_info, d_namespace_it->second.c_str(), allocator); + } else { + ret = rmw_topic_endpoint_info_set_node_namespace( + topic_endpoint_info, "_NODE_NAMESPACE_UNKNOWN_", allocator); + } + return ret; +} + + +rmw_ret_t +_get_info_by_topic( + const char * identifier, + const rmw_node_t * node, + rcutils_allocator_t * allocator, + const char * topic_name, + bool no_mangle, + bool is_publisher, + rmw_topic_endpoint_info_array_t * participants_info) +{ + rmw_ret_t ret = _validate_params( + identifier, + node, + allocator, + topic_name, + participants_info); + if (ret != RMW_RET_OK) { + return ret; + } + + const auto & topic_fqdns = _get_topic_fqdns(topic_name, no_mangle); + auto impl = static_cast(node->data); + // The GUID of the participant associated with this node + const auto & participant_guid = impl->participant->getGuid(); + const auto & node_name = node->name; + const auto & node_namespace = node->namespace_; + ::ParticipantListener * slave_target = impl->listener; + auto & topic_cache = + is_publisher ? slave_target->writer_topic_cache : slave_target->reader_topic_cache; + { + std::lock_guard guard(topic_cache.getMutex()); + const auto & topic_name_to_data = topic_cache().getTopicNameToTopicData(); + std::vector topic_endpoint_info_vector; + for (const auto & topic_name : topic_fqdns) { + const auto it = topic_name_to_data.find(topic_name); + if (it != topic_name_to_data.end()) { + for (const auto & data : it->second) { + rmw_topic_endpoint_info_t topic_endpoint_info = + rmw_get_zero_initialized_topic_endpoint_info(); + rmw_ret_t ret = _set_rmw_topic_endpoint_info( + &topic_endpoint_info, + participant_guid, + node_name, + node_namespace, + data, + no_mangle, + is_publisher, + slave_target, + allocator); + if (ret != RMW_RET_OK) { + // Free topic_endpoint_info + _handle_topic_endpoint_info_fini(&topic_endpoint_info, allocator); + // Free all the space allocated to the previous topic_endpoint_infos + for (auto & tinfo : topic_endpoint_info_vector) { + _handle_topic_endpoint_info_fini(&tinfo, allocator); + } + return ret; + } + // add rmw_topic_endpoint_info_t to a vector + topic_endpoint_info_vector.push_back(topic_endpoint_info); + } + } + } + + // add all the elements from the vector to rmw_topic_endpoint_info_array_t + auto count = topic_endpoint_info_vector.size(); + ret = rmw_topic_endpoint_info_array_init_with_size(participants_info, count, allocator); + if (ret != RMW_RET_OK) { + rmw_error_string_t error_message = rmw_get_error_string(); + rmw_reset_error(); + RMW_SET_ERROR_MSG_WITH_FORMAT_STRING( + "rmw_topic_endpoint_info_array_init_with_size failed to allocate memory: %s", + error_message.str); + // Free all the space allocated to the previous topic_endpoint_infos + for (auto & tinfo : topic_endpoint_info_vector) { + _handle_topic_endpoint_info_fini(&tinfo, allocator); + } + return ret; + } + for (auto i = 0u; i < count; i++) { + participants_info->info_array[i] = topic_endpoint_info_vector.at(i); + } + participants_info->count = count; + } + return RMW_RET_OK; +} + +rmw_ret_t +__rmw_get_publishers_info_by_topic( + const char * identifier, + const rmw_node_t * node, + rcutils_allocator_t * allocator, + const char * topic_name, + bool no_mangle, + rmw_topic_endpoint_info_array_t * publishers_info) +{ + return _get_info_by_topic( + identifier, + node, + allocator, + topic_name, + no_mangle, + true, /*is_publisher*/ + publishers_info); +} + +rmw_ret_t +__rmw_get_subscriptions_info_by_topic( + const char * identifier, + const rmw_node_t * node, + rcutils_allocator_t * allocator, + const char * topic_name, + bool no_mangle, + rmw_topic_endpoint_info_array_t * subscriptions_info) +{ + return _get_info_by_topic( + identifier, + node, + allocator, + topic_name, + no_mangle, + false, /*is_publisher*/ + subscriptions_info); +} +} // namespace rmw_fastrtps_shared_cpp diff --git a/rmw_fastrtps_shared_cpp/src/rmw_node_info_and_types.cpp b/rmw_fastrtps_shared_cpp/src/rmw_node_info_and_types.cpp index 65b01607a..37e70e9d9 100644 --- a/rmw_fastrtps_shared_cpp/src/rmw_node_info_and_types.cpp +++ b/rmw_fastrtps_shared_cpp/src/rmw_node_info_and_types.cpp @@ -47,12 +47,12 @@ constexpr char kLoggerTag[] = "rmw_fastrtps_shared_cpp"; /** * Get the guid that corresponds to the node and namespace. * - * @param node to discover other participants with - * @param node_name of the desired node - * @param node_namespace of the desired node - * @param guid [out] result - * @return RMW_RET_ERROR if unable to find guid - * @return RMW_RET_OK if guid is available + * \param node to discover other participants with + * \param node_name of the desired node + * \param node_namespace of the desired node + * \param guid [out] result + * \return RMW_RET_ERROR if unable to find guid + * \return RMW_RET_OK if guid is available */ rmw_ret_t __get_guid_by_name( const rmw_node_t * node, const char * node_name, @@ -96,9 +96,9 @@ rmw_ret_t __get_guid_by_name( /** * Validate the input data of node_info_and_types functions. * - * @return RMW_RET_INVALID_ARGUMENT for null input args - * @return RMW_RET_ERROR if identifier is not the same as the input node - * @return RMW_RET_OK if all input is valid + * \return RMW_RET_INVALID_ARGUMENT for null input args + * \return RMW_RET_ERROR if identifier is not the same as the input node + * \return RMW_RET_OK if all input is valid */ rmw_ret_t __validate_input( const char * identifier, @@ -145,10 +145,10 @@ rmw_ret_t __validate_input( * Get info from publisher and subscriber * Combined results from the two lists * - * @param topic_cache cache with topic information - * @param topics [out] resulting topics - * @param node_guid_ to find information for - * @param no_demangle true if demangling will not occur + * \param topic_cache cache with topic information + * \param topics [out] resulting topics + * \param node_guid_ to find information for + * \param no_demangle true if demangling will not occur */ void __accumulate_topics( @@ -183,11 +183,11 @@ __accumulate_topics( /** * Copy topic data to results * - * @param topics to copy over - * @param allocator to use - * @param no_demangle true if demangling will not occur - * @param topic_names_and_types [out] final rmw result - * @return RMW_RET_OK if successful + * \param topics to copy over + * \param allocator to use + * \param no_demangle true if demangling will not occur + * \param topic_names_and_types [out] final rmw result + * \return RMW_RET_OK if successful */ rmw_ret_t __copy_data_to_results( @@ -313,15 +313,15 @@ typedef std::function&(CustomParticipantInfo & pa /** * Get topic names and types for the specific node_name and node_namespace requested. * - * @param identifier corresponding to the input node - * @param node to use for discovery - * @param allocator for returned value - * @param node_name to search - * @param node_namespace to search - * @param no_demangle true if the topics should not be demangled - * @param retrieve_cache_func getter for topic cache - * @param topic_names_and_types result - * @return RMW_RET_OK if successful + * \param identifier corresponding to the input node + * \param node to use for discovery + * \param allocator for returned value + * \param node_name to search + * \param node_namespace to search + * \param no_demangle true if the topics should not be demangled + * \param retrieve_cache_func getter for topic cache + * \param topic_names_and_types result + * \return RMW_RET_OK if successful */ rmw_ret_t __rmw_get_topic_names_and_types_by_node( diff --git a/rmw_fastrtps_shared_cpp/test/CMakeLists.txt b/rmw_fastrtps_shared_cpp/test/CMakeLists.txt index 8abf78e36..868a5bcbf 100644 --- a/rmw_fastrtps_shared_cpp/test/CMakeLists.txt +++ b/rmw_fastrtps_shared_cpp/test/CMakeLists.txt @@ -1,6 +1,13 @@ find_package(ament_cmake_gtest REQUIRED) + ament_add_gtest(test_dds_attributes_to_rmw_qos test_dds_attributes_to_rmw_qos.cpp) if(TARGET test_dds_attributes_to_rmw_qos) ament_target_dependencies(test_dds_attributes_to_rmw_qos) target_link_libraries(test_dds_attributes_to_rmw_qos ${PROJECT_NAME}) endif() + +ament_add_gtest(test_topic_cache test_topic_cache.cpp) +if(TARGET test_topic_cache) + ament_target_dependencies(test_topic_cache) + target_link_libraries(test_topic_cache ${PROJECT_NAME}) +endif() diff --git a/rmw_fastrtps_shared_cpp/test/test_topic_cache.cpp b/rmw_fastrtps_shared_cpp/test/test_topic_cache.cpp new file mode 100644 index 000000000..c5b6bf031 --- /dev/null +++ b/rmw_fastrtps_shared_cpp/test/test_topic_cache.cpp @@ -0,0 +1,265 @@ +// Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include +#include + +#include "gtest/gtest.h" + +#include "rmw_fastrtps_shared_cpp/topic_cache.hpp" + +#include "fastrtps/rtps/common/InstanceHandle.h" +#include "fastrtps/qos/ReaderQos.h" +#include "fastrtps/qos/WriterQos.h" +#include "rmw/types.h" + +using eprosima::fastrtps::WriterQos; +using eprosima::fastrtps::ReaderQos; +using eprosima::fastrtps::rtps::GUID_t; +using eprosima::fastrtps::rtps::GuidPrefix_t; +using eprosima::fastrtps::rtps::InstanceHandle_t; + +class TopicCacheTestFixture : public ::testing::Test +{ +public: + TopicCache topic_cache; + WriterQos qos[2]; + rmw_qos_profile_t rmw_qos[2]; + InstanceHandle_t participant_instance_handler[2]; + GUID_t participant_guid[2]; + GUID_t guid[2]; + + void SetUp() + { + // Create instance handlers + for (int i = 0; i < 2; i++) { + guid[i] = GUID_t(GuidPrefix_t(), i + 100); + participant_guid[i] = GUID_t(GuidPrefix_t(), i + 1); + participant_instance_handler[i] = participant_guid[i]; + } + + // Populating WriterQos -> which is from the DDS layer and + // rmw_qos_profile_t which is from rmw/types.h. + // This is done to test if topic_cache.getTopicNameToTopicData() returns + // the correct value in rmw_qos_profile_t for a given WriterQos + + // DDS qos + qos[0].m_durability.kind = eprosima::fastrtps::TRANSIENT_DURABILITY_QOS; + qos[0].m_reliability.kind = eprosima::fastrtps::RELIABLE_RELIABILITY_QOS; + qos[0].m_deadline.period = {123, 5678}; + qos[0].m_lifespan.duration = {190, 1234}; + qos[0].m_liveliness.kind = eprosima::fastrtps::MANUAL_BY_TOPIC_LIVELINESS_QOS; + qos[0].m_liveliness.lease_duration = {80, 5555555}; + + // equivalent rmq qos + rmw_qos[0].durability = RMW_QOS_POLICY_DURABILITY_UNKNOWN; + rmw_qos[0].reliability = RMW_QOS_POLICY_RELIABILITY_RELIABLE; + rmw_qos[0].liveliness = RMW_QOS_POLICY_LIVELINESS_MANUAL_BY_TOPIC; + rmw_qos[0].liveliness_lease_duration.sec = 80u; + rmw_qos[0].liveliness_lease_duration.nsec = 5555555u; + rmw_qos[0].deadline.sec = 123u; + rmw_qos[0].deadline.nsec = 5678u; + rmw_qos[0].lifespan.sec = 190u; + rmw_qos[0].lifespan.nsec = 1234u; + + // DDS qos + qos[1].m_durability.kind = eprosima::fastrtps::TRANSIENT_DURABILITY_QOS; + qos[1].m_reliability.kind = eprosima::fastrtps::RELIABLE_RELIABILITY_QOS; + qos[1].m_deadline.period = {12, 1234}; + qos[1].m_lifespan.duration = {19, 5432}; + qos[1].m_liveliness.kind = eprosima::fastrtps::MANUAL_BY_TOPIC_LIVELINESS_QOS; + qos[1].m_liveliness.lease_duration = {8, 78901234}; + + // equivalent rmq qos + rmw_qos[1].durability = RMW_QOS_POLICY_DURABILITY_UNKNOWN; + rmw_qos[1].reliability = RMW_QOS_POLICY_RELIABILITY_RELIABLE; + rmw_qos[1].liveliness = RMW_QOS_POLICY_LIVELINESS_MANUAL_BY_TOPIC; + rmw_qos[1].liveliness_lease_duration.sec = 8u; + rmw_qos[1].liveliness_lease_duration.nsec = 78901234u; + rmw_qos[1].deadline.sec = 12u; + rmw_qos[1].deadline.nsec = 1234u; + rmw_qos[1].lifespan.sec = 19u; + rmw_qos[1].lifespan.nsec = 5432u; + + // Add data to topic_cache + topic_cache.addTopic(participant_instance_handler[0], guid[0], "topic1", "type1", qos[0]); + topic_cache.addTopic(participant_instance_handler[0], guid[0], "topic2", "type2", qos[0]); + topic_cache.addTopic(participant_instance_handler[1], guid[1], "topic1", "type1", qos[1]); + topic_cache.addTopic(participant_instance_handler[1], guid[1], "topic2", "type1", qos[1]); + } +}; + +TEST_F(TopicCacheTestFixture, test_topic_cache_get_topic_types) +{ + const auto topic_type_map = this->topic_cache.getTopicToTypes(); + + // topic1 + const auto & it = topic_type_map.find("topic1"); + ASSERT_TRUE(it != topic_type_map.end()); + // Verify that the object returned from the map is indeed the one expected + auto topic_types = it->second; + // Verify that there are two entries for type1 + EXPECT_EQ(std::count(topic_types.begin(), topic_types.end(), "type1"), 2); + EXPECT_EQ(topic_types.at(0), "type1"); + EXPECT_EQ(topic_types.at(1), "type1"); + + // topic2 + const auto & it2 = topic_type_map.find("topic2"); + ASSERT_TRUE(it2 != topic_type_map.end()); + // Verify that the object returned from the map is indeed the one expected + topic_types = it2->second; + // Verify that there are entries for type1 and type2 + EXPECT_TRUE(std::find(topic_types.begin(), topic_types.end(), "type1") != topic_types.end()); + EXPECT_TRUE(std::find(topic_types.begin(), topic_types.end(), "type2") != topic_types.end()); +} + +TEST_F(TopicCacheTestFixture, test_topic_cache_get_participant_map) +{ + const auto & participant_topic_map = this->topic_cache.getParticipantToTopics(); + + // participant 1 + const auto & it = participant_topic_map.find(this->participant_guid[0]); + ASSERT_TRUE(it != participant_topic_map.end()); + // Verify that the topic and respective types are present + const auto & topic_type_map = it->second; + + const auto & topic1it = topic_type_map.find("topic1"); + ASSERT_TRUE(topic1it != topic_type_map.end()); + auto topic_types = topic1it->second; + EXPECT_TRUE(std::find(topic_types.begin(), topic_types.end(), "type1") != topic_types.end()); + + const auto & topic2it = topic_type_map.find("topic2"); + ASSERT_TRUE(topic2it != topic_type_map.end()); + topic_types = topic2it->second; + EXPECT_TRUE(std::find(topic_types.begin(), topic_types.end(), "type2") != topic_types.end()); + + // participant 2 + const auto & it2 = participant_topic_map.find(this->participant_guid[1]); + ASSERT_TRUE(it2 != participant_topic_map.end()); + // Verify that the topic and respective types are present + const auto & topic_type_map2 = it2->second; + + const auto & topic1it2 = topic_type_map2.find("topic1"); + ASSERT_TRUE(topic1it2 != topic_type_map2.end()); + topic_types = topic1it2->second; + EXPECT_TRUE(std::find(topic_types.begin(), topic_types.end(), "type1") != topic_types.end()); + + const auto & topic2it2 = topic_type_map2.find("topic2"); + ASSERT_TRUE(topic2it2 != topic_type_map2.end()); + topic_types = topic2it2->second; + EXPECT_TRUE(std::find(topic_types.begin(), topic_types.end(), "type1") != topic_types.end()); +} + +TEST_F(TopicCacheTestFixture, test_topic_cache_get_topic_name_topic_data_map) +{ + const auto & topic_data_map = this->topic_cache.getTopicNameToTopicData(); + auto expected_results = std::map>(); + expected_results["topic1"].push_back({participant_guid[0], guid[0], "type1", rmw_qos[0]}); + expected_results["topic1"].push_back({participant_guid[1], guid[1], "type1", rmw_qos[1]}); + expected_results["topic2"].push_back({participant_guid[0], guid[0], "type2", rmw_qos[0]}); + expected_results["topic2"].push_back({participant_guid[1], guid[1], "type1", rmw_qos[1]}); + for (const auto & result_it : expected_results) { + const auto & topic_name = result_it.first; + const auto & expected_topic_data = result_it.second; + + const auto & it = topic_data_map.find(topic_name); + ASSERT_TRUE(it != topic_data_map.end()); + // Verify that the topic has all the associated data + const auto & topic_data = it->second; + for (auto i = 0u; i < expected_topic_data.size(); i++) { + // PARTICIPANT GUID + EXPECT_EQ(topic_data.at(i).participant_guid, expected_topic_data.at(i).participant_guid); + // GUID + EXPECT_EQ(topic_data.at(i).entity_guid, expected_topic_data.at(i).entity_guid); + // TYPE + EXPECT_EQ(topic_data.at(i).topic_type, expected_topic_data.at(i).topic_type); + // QOS + const auto & qos = topic_data.at(i).qos_profile; + const auto & expected_qos = expected_topic_data.at(i).qos_profile; + EXPECT_EQ(qos.durability, expected_qos.durability); + EXPECT_EQ(qos.reliability, expected_qos.reliability); + EXPECT_EQ(qos.liveliness, expected_qos.liveliness); + EXPECT_EQ(qos.liveliness_lease_duration.sec, expected_qos.liveliness_lease_duration.sec); + EXPECT_EQ(qos.liveliness_lease_duration.nsec, expected_qos.liveliness_lease_duration.nsec); + EXPECT_EQ(qos.deadline.sec, expected_qos.deadline.sec); + EXPECT_EQ(qos.deadline.nsec, expected_qos.deadline.nsec); + EXPECT_EQ(qos.lifespan.sec, expected_qos.lifespan.sec); + EXPECT_EQ(qos.lifespan.nsec, expected_qos.lifespan.nsec); + } + } +} + +TEST_F(TopicCacheTestFixture, test_topic_cache_add_topic) +{ + // Add Topic + const bool did_add = + this->topic_cache.addTopic(this->participant_instance_handler[1], this->guid[1], "TestTopic", + "TestType", this->qos[1]); + // Verify that the returned value was true + EXPECT_TRUE(did_add); +} + +TEST_F(TopicCacheTestFixture, test_topic_cache_remove_topic_element_exists) +{ + auto did_remove = + this->topic_cache.removeTopic(this->participant_instance_handler[0], this->guid[0], "topic1", + "type1"); + // Assert that the return was true + ASSERT_TRUE(did_remove); + // Verify it is removed from TopicToTypes + const auto & topic_type_map = this->topic_cache.getTopicToTypes(); + const auto & topic_type_it = topic_type_map.find("topic1"); + ASSERT_TRUE(topic_type_it != topic_type_map.end()); + EXPECT_EQ(std::count(topic_type_it->second.begin(), topic_type_it->second.end(), "type1"), 1); + // Verify it is removed from ParticipantTopicMap + const auto & participant_topic_map = this->topic_cache.getParticipantToTopics(); + const auto & participant_topic_it = participant_topic_map.find(this->participant_guid[0]); + ASSERT_TRUE(participant_topic_it != participant_topic_map.end()); + const auto & p_topic_type_it = participant_topic_it->second.find("topic1"); + ASSERT_TRUE(p_topic_type_it == participant_topic_it->second.end()); + // Verify it is removed from TopicNameToTopicTypeMap + const auto & topic_data_map = this->topic_cache.getTopicNameToTopicData(); + const auto & topic_data_it = topic_data_map.find("topic1"); + ASSERT_TRUE(topic_data_it != topic_data_map.end()); + EXPECT_EQ(topic_data_it->second.size(), 1u); + + did_remove = this->topic_cache.removeTopic(this->participant_instance_handler[1], this->guid[1], + "topic1", "type1"); + ASSERT_TRUE(did_remove); + const auto & topic_type_map2 = this->topic_cache.getTopicToTypes(); + const auto & topic_type_it2 = topic_type_map2.find("topic1"); + ASSERT_TRUE(topic_type_it2 == topic_type_map2.end()); + const auto & participant_topic_map2 = this->topic_cache.getParticipantToTopics(); + const auto & participant_topic_it2 = participant_topic_map2.find(this->participant_guid[1]); + ASSERT_TRUE(participant_topic_it2 != participant_topic_map2.end()); + const auto & p_topic_type_it2 = participant_topic_it2->second.find("topic1"); + ASSERT_TRUE(p_topic_type_it2 == participant_topic_it2->second.end()); + const auto & topic_data_map2 = this->topic_cache.getTopicNameToTopicData(); + const auto & topic_data_it2 = topic_data_map2.find("topic1"); + ASSERT_TRUE(topic_data_it2 == topic_data_map2.end()); +} + +TEST_F(TopicCacheTestFixture, test_topic_cache_remove_policy_element_does_not_exist) +{ + // add topic + this->topic_cache.addTopic(this->participant_instance_handler[1], this->guid[1], "TestTopic", + "TestType", this->qos[1]); + // Assert that the return was false + const auto did_remove = this->topic_cache.removeTopic(this->participant_instance_handler[1], + this->guid[1], "NewTestTopic", + "TestType"); + ASSERT_FALSE(did_remove); +}