Skip to content

Commit

Permalink
- Modified topic_cache to include qos policies
Browse files Browse the repository at this point in the history
- Modified call to topic_cache.addTopic at custom_participant_info.hpp
- Wrote tests for topic_cache

Signed-off-by: Jaison Titus <[email protected]>
  • Loading branch information
jaisontj committed Oct 18, 2019
1 parent 1342f8e commit 8cd49bd
Show file tree
Hide file tree
Showing 3 changed files with 273 additions and 66 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -169,8 +169,11 @@ class ParticipantListener : public eprosima::fastrtps::ParticipantListener
{
std::lock_guard<std::mutex> guard(topic_cache.getMutex());
if (is_alive) {
trigger = topic_cache().addTopic(proxyData.RTPSParticipantKey(),
proxyData.topicName().to_string(), proxyData.typeName().to_string());
trigger = topic_cache().addTopic(
proxyData.RTPSParticipantKey(),
proxyData.topicName().to_string(),
proxyData.typeName().to_string(),
proxyData.m_qos);
} else {
trigger = topic_cache().removeTopic(proxyData.RTPSParticipantKey(),
proxyData.topicName().to_string(), proxyData.typeName().to_string());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,12 @@
#include <unordered_map>
#include <utility>
#include <vector>
#include <tuple>

#include "fastrtps/participant/Participant.h"
#include "fastrtps/rtps/common/Guid.h"
#include "fastrtps/rtps/common/InstanceHandle.h"
#include "qos_converter.hpp"
#include "rcpputils/thread_safety_annotations.hpp"
#include "rcutils/logging_macros.h"

Expand All @@ -39,31 +41,48 @@ typedef eprosima::fastrtps::rtps::GUID_t GUID_t;
class TopicCache
{
private:
typedef std::map<GUID_t,
std::unordered_map<std::string, std::vector<std::string>>> ParticipantTopicMap;
typedef std::unordered_map<std::string, std::vector<std::string>> TopicToTypes;
typedef std::map<GUID_t, TopicToTypes> ParticipantTopicMap;
typedef std::vector<std::tuple<GUID_t, std::string, rmw_qos_profile_t>> TopicData;
typedef std::unordered_map<std::string, TopicData> TopicNameToTopicData;

/**
* Map of topic names to a vector of types that topic may use.
* Map of topic names to TopicData. Where topic data is vector of tuples containing
* participant GUID, topic type and the qos policy of the respective participant.
* Topics here are represented as one to many, DDS XTypes 1.2
* specifies application code 'generally' uses a 1-1 relationship.
* However, generic services such as logger and monitor, can discover
* multiple types on the same topic.
*
*/
TopicToTypes topic_to_types_;
TopicNameToTopicData topic_name_to_topic_data_;

/**
* Map of participant GUIDS to a set of topic-type.
*/
ParticipantTopicMap participant_to_topics_;

/**
* Helper function to initialize a topic vector.
* Helper function to initialize an empty TopicData for a topic name.
*
* @param topic_name
* @param topic_name the topic name for which the TopicNameToTopicData map should be initialised.
* @param topic_name_to_topic_data the map to initialise.
*/
void initializeTopic(const std::string & topic_name, TopicToTypes & topic_to_types)
void initializeTopicDataMap(
const std::string & topic_name,
TopicNameToTopicData & topic_name_to_topic_data)
{
if (topic_name_to_topic_data.find(topic_name) == topic_name_to_topic_data.end()) {
topic_name_to_topic_data[topic_name] = TopicData();
}
}

/**
* Helper function to initialize a topic vector.
*
* @param topic_name the topic name for which the TopicToTypes map should be initialised.
* @param topic_to_types the map to initialise.
*/
void initializeTopicTypesMap(const std::string & topic_name, TopicToTypes & topic_to_types)
{
if (topic_to_types.find(topic_name) == topic_to_types.end()) {
topic_to_types[topic_name] = std::vector<std::string>();
Expand All @@ -89,9 +108,16 @@ class TopicCache
/**
* @return a map of topic name to the vector of topic types used.
*/
const TopicToTypes & getTopicToTypes() const
const TopicToTypes getTopicToTypes() const
{
return topic_to_types_;
TopicToTypes topic_to_types;
for (const auto & it : topic_name_to_topic_data_) {
topic_to_types[it.first] = std::vector<std::string>();
for (const auto & mit : it.second) {
topic_to_types[it.first].push_back(std::get<1>(mit));
}
}
return topic_to_types;
}

/**
Expand All @@ -102,6 +128,14 @@ class TopicCache
return participant_to_topics_;
}

/**
* @return a map of topic name to a vector of GUID_t, type name and qos profile tuple.
*/
const TopicNameToTopicData & getTopicNameToTopicData() const
{
return topic_name_to_topic_data_;
}

/**
* Add a topic based on discovery.
*
Expand All @@ -110,15 +144,17 @@ class TopicCache
* @param type_name
* @return true if a change has been recorded
*/
template<class T>
bool addTopic(
const eprosima::fastrtps::rtps::InstanceHandle_t & rtpsParticipantKey,
const std::string & topic_name,
const std::string & type_name)
const std::string & type_name,
const T & dds_qos)
{
initializeTopic(topic_name, topic_to_types_);
initializeTopicDataMap(topic_name, topic_name_to_topic_data_);
auto guid = iHandle2GUID(rtpsParticipantKey);
initializeParticipantMap(participant_to_topics_, guid);
initializeTopic(topic_name, participant_to_topics_[guid]);
initializeTopicTypesMap(topic_name, participant_to_topics_[guid]);
if (rcutils_logging_logger_is_enabled_for("rmw_fastrtps_shared_cpp",
RCUTILS_LOG_SEVERITY_DEBUG))
{
Expand All @@ -129,7 +165,9 @@ class TopicCache
"Adding topic '%s' with type '%s' for node '%s'",
topic_name.c_str(), type_name.c_str(), guid_stream.str().c_str());
}
topic_to_types_[topic_name].push_back(type_name);
auto qos_profile = rmw_qos_profile_t();
dds_qos_to_rmw_qos(dds_qos, &qos_profile);
topic_name_to_topic_data_[topic_name].push_back(std::make_tuple(guid, type_name, qos_profile));
participant_to_topics_[guid][topic_name].push_back(type_name);
return true;
}
Expand All @@ -147,22 +185,26 @@ class TopicCache
const std::string & topic_name,
const std::string & type_name)
{
if (topic_to_types_.find(topic_name) == topic_to_types_.end()) {
if (topic_name_to_topic_data_.find(topic_name) == topic_name_to_topic_data_.end()) {
RCUTILS_LOG_DEBUG_NAMED(
"rmw_fastrtps_shared_cpp",
"unexpected removal on topic '%s' with type '%s'",
topic_name.c_str(), type_name.c_str());
return false;
}
auto guid = iHandle2GUID(rtpsParticipantKey);
{
auto & type_vec = topic_to_types_[topic_name];
type_vec.erase(std::find(type_vec.begin(), type_vec.end(), type_name));
auto & type_vec = topic_name_to_topic_data_[topic_name];
type_vec.erase(std::find_if(type_vec.begin(), type_vec.end(),
[type_name, guid](const auto & topic_info) {
return type_name.compare(std::get<1>(topic_info)) == 0 &&
guid == std::get<0>(topic_info);
}));
if (type_vec.empty()) {
topic_to_types_.erase(topic_name);
topic_name_to_topic_data_.erase(topic_name);
}
}

auto guid = iHandle2GUID(rtpsParticipantKey);
auto guid_topics_pair = participant_to_topics_.find(guid);
if (guid_topics_pair != participant_to_topics_.end() &&
guid_topics_pair->second.find(topic_name) != guid_topics_pair->second.end())
Expand All @@ -180,6 +222,7 @@ class TopicCache
"rmw_fastrtps_shared_cpp",
"Unable to remove topic, does not exist '%s' with type '%s'",
topic_name.c_str(), type_name.c_str());
return false;
}
return true;
}
Expand Down
Loading

0 comments on commit 8cd49bd

Please sign in to comment.