Skip to content

Commit

Permalink
add participant listener
Browse files Browse the repository at this point in the history
  • Loading branch information
dirk-thomas committed Mar 23, 2018
1 parent 4ba2761 commit 252bee5
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,82 @@
#ifndef RMW_FASTRTPS_CPP__CUSTOM_PARTICIPANT_INFO_HPP_
#define RMW_FASTRTPS_CPP__CUSTOM_PARTICIPANT_INFO_HPP_

#include <map>
#include <string>
#include <vector>

#include "fastrtps/attributes/ParticipantAttributes.h"
#include "fastrtps/participant/Participant.h"
#include "fastrtps/participant/ParticipantListener.h"

#include "rmw/impl/cpp/key_value.hpp"
#include "rmw/rmw.h"

#include "rmw_fastrtps_cpp/reader_info.hpp"
#include "rmw_fastrtps_cpp/writer_info.hpp"

class ParticipantListener;

typedef struct CustomParticipantInfo
{
eprosima::fastrtps::Participant * participant;
::ParticipantListener * listener;
ReaderInfo * secondarySubListener;
WriterInfo * secondaryPubListener;
rmw_guard_condition_t * graph_guard_condition;
} CustomParticipantInfo;

class ParticipantListener : public eprosima::fastrtps::ParticipantListener
{
public:
void onParticipantDiscovery(Participant *, ParticipantDiscoveryInfo info) override
{
if (
info.rtps.m_status != DISCOVERED_RTPSPARTICIPANT &&
info.rtps.m_status != REMOVED_RTPSPARTICIPANT &&
info.rtps.m_status != DROPPED_RTPSPARTICIPANT)
{
return;
}

if (DISCOVERED_RTPSPARTICIPANT == info.rtps.m_status) {
// ignore already known GUIDs
if (discovered_names.find(info.rtps.m_guid) == discovered_names.end()) {
auto map = rmw::impl::cpp::parse_key_value(info.rtps.m_userData);
auto found = map.find("name");
std::string name;
if (found != map.end()) {
name = std::string(found->second.begin(), found->second.end());
}
if (name.empty()) {
// use participant name if no name was found in the user data
name = info.rtps.m_RTPSParticipantName;
}
// ignore discovered participants without a name
if (!name.empty()) {
discovered_names[info.rtps.m_guid] = name;
}
}
} else {
auto it = discovered_names.find(info.rtps.m_guid);
// only consider known GUIDs
if (it != discovered_names.end()) {
discovered_names.erase(it);
}
}
}

std::vector<std::string> get_discovered_names() const
{
std::vector<std::string> names(discovered_names.size());
size_t i = 0;
for (auto it : discovered_names) {
names[i++] = it.second;
}
return names;
}

std::map<GUID_t, std::string> discovered_names;
};

#endif // RMW_FASTRTPS_CPP__CUSTOM_PARTICIPANT_INFO_HPP_
19 changes: 16 additions & 3 deletions rmw_fastrtps_cpp/src/rmw_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,14 +65,23 @@ create_node(
}

// Declare everything before beginning to create things.
::ParticipantListener * listener = nullptr;
Participant * participant = nullptr;
rmw_guard_condition_t * graph_guard_condition = nullptr;
CustomParticipantInfo * node_impl = nullptr;
rmw_node_t * node_handle = nullptr;
ReaderInfo * tnat_1 = nullptr;
WriterInfo * tnat_2 = nullptr;
std::pair<StatefulReader *, StatefulReader *> edp_readers;

Participant * participant = Domain::createParticipant(participantAttrs);
try {
listener = new ::ParticipantListener();
} catch (std::bad_alloc &) {
RMW_SET_ERROR_MSG("failed to allocate participant listener");
goto fail;
}

participant = Domain::createParticipant(participantAttrs, listener);
if (!participant) {
RMW_SET_ERROR_MSG("create_node() could not create participant");
return nullptr;
Expand All @@ -98,6 +107,7 @@ create_node(
}
node_handle->implementation_identifier = eprosima_fastrtps_identifier;
node_impl->participant = participant;
node_impl->listener = listener;
node_impl->graph_guard_condition = graph_guard_condition;
node_handle->data = node_impl;

Expand Down Expand Up @@ -160,6 +170,7 @@ create_node(
"failed to destroy guard condition during error handling")
}
}
rmw_free(listener);
if (participant) {
Domain::removeParticipant(participant);
}
Expand Down Expand Up @@ -317,10 +328,12 @@ rmw_destroy_node(rmw_node_t * node)
result_ret = RMW_RET_ERROR;
}

delete impl;

Domain::removeParticipant(participant);

delete impl->listener;
impl->listener = nullptr;
delete impl;

return result_ret;
}

Expand Down
13 changes: 8 additions & 5 deletions rmw_fastrtps_cpp/src/rmw_node_names.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,18 +51,21 @@ rmw_get_node_names(
}

auto impl = static_cast<CustomParticipantInfo *>(node->data);
Participant * participant = impl->participant;
auto participant_names = impl->listener->get_discovered_names();

auto participant_names = participant->getParticipantNames();
rcutils_allocator_t allocator = rcutils_get_default_allocator();
rcutils_ret_t rcutils_ret =
rcutils_string_array_init(node_names, participant_names.size(), &allocator);
rcutils_string_array_init(node_names, participant_names.size() + 1, &allocator);
if (rcutils_ret != RCUTILS_RET_OK) {
RMW_SET_ERROR_MSG(rcutils_get_error_string_safe())
return rmw_convert_rcutils_ret_to_rmw_ret(rcutils_ret);
}
for (size_t i = 0; i < participant_names.size(); ++i) {
node_names->data[i] = rcutils_strdup(participant_names[i].c_str(), allocator);
for (size_t i = 0; i < participant_names.size() + 1; ++i) {
if (0 == i) {
node_names->data[i] = rcutils_strdup(node->name, allocator);
} else {
node_names->data[i] = rcutils_strdup(participant_names[i - 1].c_str(), allocator);
}
if (!node_names->data[i]) {
RMW_SET_ERROR_MSG("failed to allocate memory for node name")
rcutils_ret = rcutils_string_array_fini(node_names);
Expand Down

0 comments on commit 252bee5

Please sign in to comment.