Skip to content

Commit

Permalink
Stop using group data. Use the new group cache
Browse files Browse the repository at this point in the history
Signed-off-by: Ivan Santiago Paunovic <[email protected]>
  • Loading branch information
ivanpauno committed Oct 25, 2019
1 parent 4b8d914 commit 98175eb
Show file tree
Hide file tree
Showing 28 changed files with 366 additions and 218 deletions.
4 changes: 2 additions & 2 deletions rmw_fastrtps_cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,8 @@ add_library(rmw_fastrtps_cpp
src/subscription.cpp
src/type_support_common.cpp
src/gid__type_support.cpp
src/node_custom_info__type_support.cpp
src/participant_custom_info__type_support.cpp
src/node_entities_info__type_support.cpp
src/participant_entities_info__type_support.cpp
)
target_link_libraries(rmw_fastrtps_cpp
fastcdr fastrtps)
Expand Down
2 changes: 0 additions & 2 deletions rmw_fastrtps_cpp/include/rmw_fastrtps_cpp/publisher.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@ create_publisher(
const rosidl_message_type_support_t * type_supports,
const char * topic_name,
const rmw_qos_profile_t * qos_policies,
const char * node_namespace,
const char * node_name,
bool keyed,
bool create_publisher_listener);

Expand Down
2 changes: 0 additions & 2 deletions rmw_fastrtps_cpp/include/rmw_fastrtps_cpp/subscription.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@ create_subscription(
const char * topic_name,
const rmw_qos_profile_t * qos_policies,
bool ignore_local_publications,
const char * node_namespace,
const char * node_name,
bool keyed,
bool create_subscription_listener);

Expand Down
17 changes: 8 additions & 9 deletions rmw_fastrtps_cpp/src/listener_thread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,12 @@
#include "rmw/init.h"
#include "rmw/ret_types.h"
#include "rmw/rmw.h"
#include "rmw/types.h"
#include "rmw/impl/cpp/macros.hpp"

#include "rmw_dds_common/context.hpp"
#include "rmw_dds_common/gid_utils.hpp"
#include "rmw_dds_common/node_cache.hpp"
#include "rmw_dds_common/msg/participant_custom_info.hpp"
#include "rmw_dds_common/msg/participant_entities_info.hpp"

#include "rmw_fastrtps_shared_cpp/rmw_context_impl.h"

Expand Down Expand Up @@ -132,7 +132,7 @@ node_listener(rmw_context_t * context)
terminate("rmw_wait failed");
}
if (subscriptions_buffer[0]) {
rmw_dds_common::msg::ParticipantCustomInfo msg;
rmw_dds_common::msg::ParticipantEntitiesInfo msg;
bool taken;
if (RMW_RET_OK != rmw_take(
common_context->sub,
Expand All @@ -144,21 +144,20 @@ node_listener(rmw_context_t * context)
}
if (taken) {
// TODO(ivanpauno): Should the program be terminated if taken is false?
rmw_gid_t gid;
rmw_dds_common::convert_msg_to_gid(&msg.id, &gid);
if (std::strcmp(
if (std::strncmp(
reinterpret_cast<char *>(common_context->gid.data),
reinterpret_cast<char *>(gid.data)) == 0)
reinterpret_cast<char *>(&msg.gid.data),
RMW_GID_STORAGE_SIZE) == 0)
{
// ignore local messages
continue;
}
common_context->node_cache.update_node_names(gid, msg.nodes_info);
common_context->graph_cache.update_participant_entities(msg);
if (rcutils_logging_logger_is_enabled_for("rmw_dds_common",
RCUTILS_LOG_SEVERITY_DEBUG))
{
std::ostringstream ss;
ss << common_context->node_cache;
ss << common_context->graph_cache;
RCUTILS_LOG_DEBUG_NAMED("rmw_fastrtps_cpp", "%s", ss.str().c_str());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
#include "rosidl_generator_c/message_type_support_struct.h"
// rosidl_typesupport_cpp/message_type_support.hpp is installed by rosidl_generator_cpp
#include "rosidl_typesupport_cpp/message_type_support.hpp"
#include "rmw_dds_common/msg/node_custom_info__struct.hpp"
#include "rmw_dds_common/msg/node_custom_info__rosidl_typesupport_fastrtps_cpp.hpp"
#include "rmw_dds_common/msg/node_entities_info__struct.hpp"
#include "rmw_dds_common/msg/node_entities_info__rosidl_typesupport_fastrtps_cpp.hpp"
#include "rmw_fastrtps_cpp/visibility_control.h"

namespace rosidl_typesupport_cpp
Expand All @@ -27,10 +27,10 @@ namespace rosidl_typesupport_cpp
template<>
RMW_FASTRTPS_CPP_PUBLIC
const rosidl_message_type_support_t *
get_message_type_support_handle<rmw_dds_common::msg::NodeCustomInfo>()
get_message_type_support_handle<rmw_dds_common::msg::NodeEntitiesInfo>()
{
return ROSIDL_TYPESUPPORT_INTERFACE__MESSAGE_SYMBOL_NAME(
rosidl_typesupport_fastrtps_cpp, rmw_dds_common, msg, NodeCustomInfo)();
rosidl_typesupport_fastrtps_cpp, rmw_dds_common, msg, NodeEntitiesInfo)();
}

#ifdef __cplusplus
Expand All @@ -41,9 +41,9 @@ extern "C"
RMW_FASTRTPS_CPP_PUBLIC
const rosidl_message_type_support_t *
ROSIDL_TYPESUPPORT_INTERFACE__MESSAGE_SYMBOL_NAME(
rosidl_typesupport_cpp, rmw_dds_common, msg, NodeCustomInfo)()
rosidl_typesupport_cpp, rmw_dds_common, msg, NodeEntitiesInfo)()
{
return get_message_type_support_handle<rmw_dds_common::msg::NodeCustomInfo>();
return get_message_type_support_handle<rmw_dds_common::msg::NodeEntitiesInfo>();
}

#ifdef __cplusplus
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
#include "rosidl_generator_c/message_type_support_struct.h"
// rosidl_typesupport_cpp/message_type_support.hpp is installed by rosidl_generator_cpp
#include "rosidl_typesupport_cpp/message_type_support.hpp"
#include "rmw_dds_common/msg/participant_custom_info__struct.hpp"
#include "rmw_dds_common/msg/participant_custom_info__rosidl_typesupport_fastrtps_cpp.hpp"
#include "rmw_dds_common/msg/participant_entities_info__struct.hpp"
#include "rmw_dds_common/msg/participant_entities_info__rosidl_typesupport_fastrtps_cpp.hpp"
#include "rmw_fastrtps_cpp/visibility_control.h"

namespace rosidl_typesupport_cpp
Expand All @@ -27,10 +27,10 @@ namespace rosidl_typesupport_cpp
template<>
RMW_FASTRTPS_CPP_PUBLIC
const rosidl_message_type_support_t *
get_message_type_support_handle<rmw_dds_common::msg::ParticipantCustomInfo>()
get_message_type_support_handle<rmw_dds_common::msg::ParticipantEntitiesInfo>()
{
return ROSIDL_TYPESUPPORT_INTERFACE__MESSAGE_SYMBOL_NAME(
rosidl_typesupport_fastrtps_cpp, rmw_dds_common, msg, ParticipantCustomInfo)();
rosidl_typesupport_fastrtps_cpp, rmw_dds_common, msg, ParticipantEntitiesInfo)();
}

#ifdef __cplusplus
Expand All @@ -41,9 +41,9 @@ extern "C"
RMW_FASTRTPS_CPP_PUBLIC
const rosidl_message_type_support_t *
ROSIDL_TYPESUPPORT_INTERFACE__MESSAGE_SYMBOL_NAME(
rosidl_typesupport_cpp, rmw_dds_common, msg, ParticipantCustomInfo)()
rosidl_typesupport_cpp, rmw_dds_common, msg, ParticipantEntitiesInfo)()
{
return get_message_type_support_handle<rmw_dds_common::msg::ParticipantCustomInfo>();
return get_message_type_support_handle<rmw_dds_common::msg::ParticipantEntitiesInfo>();
}

#ifdef __cplusplus
Expand Down
12 changes: 0 additions & 12 deletions rmw_fastrtps_cpp/src/publisher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,6 @@ rmw_fastrtps_cpp::create_publisher(
const rosidl_message_type_support_t * type_supports,
const char * topic_name,
const rmw_qos_profile_t * qos_policies,
const char * node_namespace,
const char * node_name,
bool keyed,
bool create_publisher_listener)
{
Expand Down Expand Up @@ -107,16 +105,6 @@ rmw_fastrtps_cpp::create_publisher(
_register_type(participant_info->participant, info->type_support_);
}

if (node_name || node_namespace) {
if (RMW_RET_OK != get_group_data_qos(
node_name,
node_namespace,
publisherParam.qos.m_groupData))
{
goto fail;
}
}

if (!participant_info->leave_middleware_default_qos) {
publisherParam.qos.m_publishMode.kind = eprosima::fastrtps::ASYNCHRONOUS_PUBLISH_MODE;
publisherParam.historyMemoryPolicy =
Expand Down
40 changes: 40 additions & 0 deletions rmw_fastrtps_cpp/src/rmw_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ rmw_create_client(
return nullptr;
}

auto common_context = static_cast<rmw_dds_common::Context *>(node->context->impl->common);
auto participant_info =
static_cast<CustomParticipantInfo *>(node->context->impl->participant_info);
if (!participant_info) {
Expand Down Expand Up @@ -212,15 +213,54 @@ rmw_create_client(
}
memcpy(const_cast<char *>(rmw_client->service_name), service_name, strlen(service_name) + 1);

{
// Update graph
std::lock_guard<std::mutex> guard(common_context->node_update_mutex);
rmw_gid_t gid = rmw_fastrtps_shared_cpp::create_rmw_gid(
eprosima_fastrtps_identifier, info->request_publisher_->getGuid());
common_context->graph_cache.associate_writer(
gid,
common_context->gid,
node->name,
node->namespace_);
gid = rmw_fastrtps_shared_cpp::create_rmw_gid(
eprosima_fastrtps_identifier, info->response_subscriber_->getGuid());
rmw_dds_common::msg::ParticipantEntitiesInfo msg =
common_context->graph_cache.associate_reader(
gid, common_context->gid, node->name, node->namespace_);
rmw_ret_t rmw_ret = rmw_fastrtps_shared_cpp::__rmw_publish(
eprosima_fastrtps_identifier,
common_context->pub,
static_cast<void *>(&msg),
nullptr);
if (RMW_RET_OK != rmw_ret) {
goto fail;
}
}

return rmw_client;

fail:
if (info != nullptr) {
if (info->request_publisher_ != nullptr) {
rmw_gid_t gid = rmw_fastrtps_shared_cpp::create_rmw_gid(
eprosima_fastrtps_identifier, info->request_publisher_->getGuid());
common_context->graph_cache.deassociate_writer(
gid,
common_context->gid,
node->name,
node->namespace_);
Domain::removePublisher(info->request_publisher_);
}

if (info->response_subscriber_ != nullptr) {
rmw_gid_t gid = rmw_fastrtps_shared_cpp::create_rmw_gid(
eprosima_fastrtps_identifier, info->response_subscriber_->getGuid());
common_context->graph_cache.deassociate_reader(
gid,
common_context->gid,
node->name,
node->namespace_);
Domain::removeSubscriber(info->response_subscriber_);
}

Expand Down
16 changes: 7 additions & 9 deletions rmw_fastrtps_cpp/src/rmw_init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
#include "rmw/rmw.h"

#include "rmw_dds_common/context.hpp"
#include "rmw_dds_common/msg/participant_custom_info.hpp"
#include "rmw_dds_common/msg/participant_entities_info.hpp"

#include "rmw_fastrtps_shared_cpp/custom_participant_info.hpp"
#include "rmw_fastrtps_shared_cpp/participant.hpp"
Expand Down Expand Up @@ -87,7 +87,7 @@ rmw_init_options_fini(rmw_init_options_t * init_options)
return RMW_RET_OK;
}

using rmw_dds_common::msg::ParticipantCustomInfo;
using rmw_dds_common::msg::ParticipantEntitiesInfo;

rmw_ret_t
rmw_init(const rmw_init_options_t * options, rmw_context_t * context)
Expand Down Expand Up @@ -146,11 +146,9 @@ rmw_init(const rmw_init_options_t * options, rmw_context_t * context)

publisher = rmw_fastrtps_cpp::create_publisher(
participant_info,
rosidl_typesupport_cpp::get_message_type_support_handle<ParticipantCustomInfo>(),
rosidl_typesupport_cpp::get_message_type_support_handle<ParticipantEntitiesInfo>(),
"_participant_info",
&qos,
nullptr, // node namespace
nullptr, // node name
false, // our fastrtps typesupport doesn't support keyed topics
true); // don't create a publisher listener
if (nullptr == publisher) {
Expand All @@ -160,12 +158,10 @@ rmw_init(const rmw_init_options_t * options, rmw_context_t * context)
// using same qos for the subscription
subscription = rmw_fastrtps_cpp::create_subscription(
participant_info,
rosidl_typesupport_cpp::get_message_type_support_handle<ParticipantCustomInfo>(),
rosidl_typesupport_cpp::get_message_type_support_handle<ParticipantEntitiesInfo>(),
"_participant_info",
&qos,
true, // ignore_local_publications, currently not implemented
nullptr, // node namespace
nullptr, // node name
false, // our fastrtps typesupport doesn't support keyed topics
true); // don't create a subscriber listener
if (nullptr == subscription) {
Expand All @@ -176,7 +172,9 @@ rmw_init(const rmw_init_options_t * options, rmw_context_t * context)
eprosima_fastrtps_identifier, participant_info->participant->getGuid());
common_context->pub = publisher;
common_context->sub = subscription;
common_context->node_cache.add_gid(common_context->gid);
// This is not more needed.
// node_cache.add_gid(common_context->gid);
common_context->graph_cache.add_participant(common_context->gid);

ret = rmw_fastrtps_cpp::run_listener_thread(context);
if (RMW_RET_OK != ret) {
Expand Down
31 changes: 28 additions & 3 deletions rmw_fastrtps_cpp/src/rmw_publisher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@
#include "rmw_fastrtps_cpp/publisher.hpp"
#include "rmw_fastrtps_cpp/identifier.hpp"

#include "rmw_dds_common/context.hpp"
#include "rmw_dds_common/msg/participant_entities_info.hpp"

#include "type_support_common.hpp"

using Domain = eprosima::fastrtps::Domain;
Expand Down Expand Up @@ -77,15 +80,37 @@ rmw_create_publisher(
return nullptr;
}

return rmw_fastrtps_cpp::create_publisher(
rmw_publisher_t * publisher = rmw_fastrtps_cpp::create_publisher(
static_cast<CustomParticipantInfo *>(node->context->impl->participant_info),
type_supports,
topic_name,
qos_policies,
node->namespace_,
node->name,
false,
true);
if (!publisher) {
return nullptr;
}

auto common_context = static_cast<rmw_dds_common::Context *>(node->context->impl->common);
auto info = static_cast<const CustomPublisherInfo *>(publisher->data);
{
// Update graph
std::lock_guard<std::mutex> guard(common_context->node_update_mutex);
rmw_dds_common::msg::ParticipantEntitiesInfo msg =
common_context->graph_cache.associate_writer(
info->publisher_gid, common_context->gid, node->name, node->namespace_);
rmw_ret_t rmw_ret = rmw_fastrtps_shared_cpp::__rmw_publish(
eprosima_fastrtps_identifier,
common_context->pub,
static_cast<void *>(&msg),
nullptr);
if (RMW_RET_OK != rmw_ret) {
rmw_fastrtps_shared_cpp::__rmw_destroy_publisher(
eprosima_fastrtps_identifier, node, publisher);
return nullptr;
}
}
return publisher;
}

rmw_ret_t
Expand Down
Loading

0 comments on commit 98175eb

Please sign in to comment.