diff --git a/rmw_connextdds_common/include/rmw_connextdds/context.hpp b/rmw_connextdds_common/include/rmw_connextdds/context.hpp index 3cc22bef..2cbb2cf6 100644 --- a/rmw_connextdds_common/include/rmw_connextdds/context.hpp +++ b/rmw_connextdds_common/include/rmw_connextdds/context.hpp @@ -22,6 +22,7 @@ #include #include #include +#include #include "rmw_connextdds/dds_api.hpp" #include "rmw_connextdds/log.hpp" @@ -78,8 +79,11 @@ struct rmw_context_impl_s DDS_DataReader * dr_publications; DDS_DataReader * dr_subscriptions; - /* Keep track of whether the DomainParticipant is localhost only */ - bool localhost_only; + /* Keep track of what discovery settings were used when initializing */ + rmw_discovery_options_t * discovery_options; + + /* Manage the memory of the domain tag */ + char * domain_tag; /* Global configuration for QoS profiles */ std::string qos_ctx_name; @@ -160,7 +164,8 @@ struct rmw_context_impl_s dr_participants(nullptr), dr_publications(nullptr), dr_subscriptions(nullptr), - localhost_only(base->options.localhost_only == RMW_LOCALHOST_ONLY_ENABLED) + discovery_options(nullptr), + domain_tag(nullptr) { /* destructor relies on these being initialized properly */ common.thread_is_running.store(false); @@ -180,9 +185,7 @@ struct rmw_context_impl_s // node_count is increased rmw_ret_t initialize_node( - const char * const node_name, - const char * const node_namespace, - const bool localhost_only); + const rmw_discovery_options_t * const discovery_options); // Destroys the participant, when node_count reaches 0. rmw_ret_t @@ -190,7 +193,7 @@ struct rmw_context_impl_s // Initialize the DomainParticipant associated with the context. rmw_ret_t - initialize_participant(const bool localhost_only); + initialize_participant(); // Enable the DomainParticipant associated with the context. rmw_ret_t diff --git a/rmw_connextdds_common/src/common/rmw_context.cpp b/rmw_connextdds_common/src/common/rmw_context.cpp index 4ca175f7..02aeea28 100644 --- a/rmw_connextdds_common/src/common/rmw_context.cpp +++ b/rmw_connextdds_common/src/common/rmw_context.cpp @@ -25,6 +25,8 @@ #include "rcutils/env.h" #include "rcutils/filesystem.h" +#include "rcutils/process.h" +#include "rcutils/snprintf.h" /****************************************************************************** * Global reference to the Domain Participant Factory. @@ -74,6 +76,224 @@ rmw_connextdds_initialize_participant_factory_qos( return RMW_RET_OK; } +static +rmw_ret_t rmw_connextdds_extend_initial_peer_list( + const rmw_peer_address_t * const static_peers, + const size_t static_peer_count, + struct DDS_StringSeq * const out) +{ + if (static_peer_count == 0) { + return RMW_RET_OK; + } + + if (static_peers == nullptr) { + RMW_CONNEXT_LOG_ERROR_A_SET( + "received nullptr static_peers but a static_peer_count of %lu", + static_peer_count); + return RMW_RET_ERROR; + } + + const auto initial_length = DDS_StringSeq_get_length(out); + const DDS_Long new_seq_length = static_cast(initial_length + static_peer_count); + if (!DDS_StringSeq_ensure_length(out, new_seq_length, new_seq_length)) { + RMW_CONNEXT_LOG_ERROR_SET("failed to resize string sequence") + return RMW_RET_ERROR; + } + for (size_t s = 0; s < static_peer_count; ++s) { + const DDS_Long index = static_cast(initial_length + s); + const char * peer = static_peers[s].peer_address; + char ** const element_ref = DDS_StringSeq_get_reference(out, index); + RMW_CONNEXT_ASSERT(nullptr != element_ref); + if (nullptr != *element_ref) { + /* If some strings are still hanging around this dead space in the seq, + then free their memory. */ + DDS_String_free(*element_ref); + } + *element_ref = DDS_String_dup(peer); + if (nullptr == *element_ref) { + RMW_CONNEXT_LOG_ERROR_A_SET("failed to duplicate peer string: %s", peer); + return RMW_RET_ERROR; + } + + RMW_CONNEXT_LOG_TRACE_A( + "inserted static peer: i=%d, peer='%s'", + index, peer); + } + + return RMW_RET_OK; +} + +static +rmw_ret_t +rmw_connextdds_initialize_discovery_options( + rmw_context_impl_t * const ctx, + DDS_DomainParticipantQos & dp_qos) +{ + const auto range = ctx->discovery_options->automatic_discovery_range; + switch (range) { + case RMW_AUTOMATIC_DISCOVERY_RANGE_SYSTEM_DEFAULT: + case RMW_AUTOMATIC_DISCOVERY_RANGE_SUBNET: + /* No action needed. This is the default discovery behavior for DDS */ + break; + case RMW_AUTOMATIC_DISCOVERY_RANGE_NOT_SET: + RMW_CONNEXT_LOG_ERROR_SET( + "automatic_discovery_range unexpectedly RMW_AUTOMATIC_DISCOVERY_RANGE_NOT_SET"); + return RMW_RET_ERROR; + case RMW_AUTOMATIC_DISCOVERY_RANGE_LOCALHOST: + case RMW_AUTOMATIC_DISCOVERY_RANGE_OFF: + /*Limit the UDPv4 transport to use only unicast interfaces, by setting + the list of multicast interfaces to a "localhost" value. + Note: We allow the LOCALHOST interface for the OFF range + because if we leave this property completely blank then it + has the opposite effect and allows all interfaces to be used. + Allowing only LOCALHOST at least minimizes the unnecessary + discovery traffic and prevents discovery with other host + machines, while the domain_tag protects against same-host + connections. */ + if (DDS_RETCODE_OK != DDS_PropertyQosPolicyHelper_assert_property( + &dp_qos.property, + "dds.transport.UDPv4.builtin.parent.allow_multicast_interfaces_list", + RMW_CONNEXT_LOCALHOST_ONLY_ADDRESS, + DDS_BOOLEAN_FALSE /* propagate */)) + { + RMW_CONNEXT_LOG_ERROR_A_SET( + "failed to assert property on participant: %s", + "dds.transport.UDPv4.builtin.parent.allow_multicast_interfaces_list") + return RMW_RET_ERROR; + } + break; + default: + RMW_CONNEXT_LOG_ERROR_A_SET( + "Unknown value provided for automatic discovery range: %i", + range); + return RMW_RET_ERROR; + } + + if (RMW_AUTOMATIC_DISCOVERY_RANGE_OFF == range) { + // When discovery rage is RMW_AUTOMATIC_DISCOVERY_RANGE_OFF, + // prevent the participant from discovery anyone by setting an empty + // initial peers list and disabling "accept_unknown_peers". + // Also, assign a host-wide unique domain tag to the participant to + // prevent discovery with other local participant (e.g. through shared + // memory transport). + const DDS_Long ros_peers = DDS_StringSeq_get_length(&ctx->initial_peers); + const DDS_Long qos_peers = DDS_StringSeq_get_length(&dp_qos.discovery.initial_peers); + dp_qos.discovery.accept_unknown_peers = DDS_BOOLEAN_FALSE; + if (ros_peers > 0) { + RMW_CONNEXT_LOG_WARNING_A( + "requested %d initial peers using %s, but discovery range is off", + ros_peers, + RMW_CONNEXT_ENV_INITIAL_PEERS); + if (!DDS_StringSeq_ensure_length(&ctx->initial_peers, 0, 0)) { + RMW_CONNEXT_LOG_ERROR_SET("failed to clear initial peers list") + return RMW_RET_ERROR; + } + } + if (qos_peers > 0) { + RMW_CONNEXT_LOG_WARNING_A( + "requested %d initial peers from DomainParticipantQos, but discovery range is off", + qos_peers); + if (!DDS_StringSeq_ensure_length(&ctx->initial_peers, 0, 0)) { + RMW_CONNEXT_LOG_ERROR_SET("failed to clear initial peers list") + return RMW_RET_ERROR; + } + } + /* See earlier note about why we allow LOCALHOST interface for + OFF range. */ + if (DDS_RETCODE_OK != DDS_PropertyQosPolicyHelper_assert_property( + &dp_qos.property, + "dds.transport.UDPv4.builtin.parent.allow_interfaces_list", + RMW_CONNEXT_LOCALHOST_ONLY_ADDRESS, + DDS_BOOLEAN_FALSE /* propagate */)) + { + RMW_CONNEXT_LOG_ERROR_SET( + "failed to assert property on participant: " + "dds.transport.UDPv4.builtin.parent.allow_interfaces_list"); + return RMW_RET_ERROR; + } + + /* Give this participant its own unique domain tag to prevent + unicast discovery from happening. */ + if (!ctx->domain_tag) { + const auto pid = rcutils_get_pid(); + static const char * format_string = "ros_discovery_off_%d"; + const int bytes_needed = rcutils_snprintf(nullptr, 0, format_string, pid); + ctx->domain_tag = DDS_String_alloc(bytes_needed); + if (nullptr == ctx->domain_tag) { + RMW_CONNEXT_LOG_ERROR_SET("failed to allocate domain tag string"); + return RMW_RET_BAD_ALLOC; + } + if (rcutils_snprintf(ctx->domain_tag, bytes_needed + 1, format_string, pid) < 0) { + RMW_CONNEXT_LOG_ERROR_SET("failed to format ros discovery off information into domain tag"); + return RMW_RET_ERROR; + } + } + if (DDS_RETCODE_OK != DDS_PropertyQosPolicyHelper_assert_property( + &dp_qos.property, + "dds.domain_participant.domain_tag", + ctx->domain_tag, + DDS_BOOLEAN_FALSE)) + { + RMW_CONNEXT_LOG_ERROR_SET( + "failed to assert property on participant: " + "dds.domain_participant.domain_tag"); + return RMW_RET_ERROR; + } + } else if ( // NOLINT + RMW_AUTOMATIC_DISCOVERY_RANGE_SYSTEM_DEFAULT != + ctx->discovery_options->automatic_discovery_range) + { + // For any other discovery range, copy the list of static peers to so that + // it will be later copied to DomainParticipantQos::discovery::initial_peers. + dp_qos.discovery.accept_unknown_peers = DDS_BOOLEAN_TRUE; + const auto rc = rmw_connextdds_extend_initial_peer_list( + ctx->discovery_options->static_peers, + ctx->discovery_options->static_peers_count, + &ctx->initial_peers); + if (RMW_RET_OK != rc) { + RMW_CONNEXT_LOG_ERROR( + "failed to extend initial peers with the static peers"); + return rc; + } + /* Support for ROS_AUTOMATIC_DISCOVERY_RANGE == LOCALHOST + ----------------------------------------- + */ + if (RMW_AUTOMATIC_DISCOVERY_RANGE_LOCALHOST == + ctx->discovery_options->automatic_discovery_range) + { + // Make sure that the participant is not listening on any multicast address. + if (!DDS_StringSeq_ensure_length( + &dp_qos.discovery.multicast_receive_addresses, 0, 0)) + { + RMW_CONNEXT_LOG_ERROR_SET("failed to resize multicast_receive_addresses") + return RMW_RET_ERROR; + } + // Increase maximum participant ID + // This controls the number of participants that can be discovered on a single host, + // which is roughly equivalent to the number of ROS 2 processes. + // If it's too small then we won't connect to all participants. + // If it's too large then we will send a lot of announcement traffic. + // The default number here is picked arbitrarily. + const rmw_peer_address_t localhost_peers[2] = { + "32@builtin.udpv4://127.0.0.1", + "32@builtin.shmem://", + }; + const auto rc2 = rmw_connextdds_extend_initial_peer_list( + localhost_peers, + 2, + &ctx->initial_peers); + if (RMW_RET_OK != rc2) { + RMW_CONNEXT_LOG_ERROR( + "failed to extend initial peers with the static peers"); + return rc2; + } + } + } + + return RMW_RET_OK; +} + + static rmw_ret_t rmw_connextdds_initialize_participant_qos( @@ -94,14 +314,25 @@ rmw_connextdds_initialize_participant_qos( return RMW_RET_ERROR; } - if (ctx->participant_qos_override_policy == - rmw_context_impl_t::participant_qos_override_policy_t::All && - DDS_StringSeq_get_length(&ctx->initial_peers) > 0) - { - if (!DDS_StringSeq_copy(&dp_qos.discovery.initial_peers, &ctx->initial_peers)) { - RMW_CONNEXT_LOG_ERROR_SET("failed to copy initial peers sequence") - return RMW_RET_ERROR; - } + switch (ctx->participant_qos_override_policy) { + case rmw_context_impl_t::participant_qos_override_policy_t::All: + case rmw_context_impl_t::participant_qos_override_policy_t::Basic: + if (nullptr != ctx->discovery_options) { + const auto rc = rmw_connextdds_initialize_discovery_options(ctx, dp_qos); + if (RMW_RET_OK != rc) { + RMW_CONNEXT_LOG_ERROR("failed to initialize discovery options") + return RMW_RET_ERROR; + } + } + if (DDS_StringSeq_get_length(&ctx->initial_peers) > 0 && + !DDS_StringSeq_copy(&dp_qos.discovery.initial_peers, &ctx->initial_peers)) + { + RMW_CONNEXT_LOG_ERROR_SET("failed to copy initial peers sequence") + return RMW_RET_ERROR; + } + break; + default: + break; } return RMW_RET_OK; @@ -109,61 +340,73 @@ rmw_connextdds_initialize_participant_qos( rmw_ret_t rmw_context_impl_t::initialize_node( - const char * const node_name, - const char * const node_namespace, - const bool localhost_only) + const rmw_discovery_options_t * const discovery_options_in) { - UNUSED_ARG(node_name); - UNUSED_ARG(node_namespace); - - RMW_CONNEXT_LOG_DEBUG_A( - "initializing new node: total=%lu, localhost=%d", - this->node_count, localhost_only) - - if (0u != this->node_count) { - if ((this->localhost_only && !localhost_only) || - (!this->localhost_only && localhost_only)) + if (this->node_count > 0) { + bool params_equal = false; + if (rmw_discovery_options_equal( + this->discovery_options, discovery_options_in, ¶ms_equal) != RMW_RET_OK) { - RMW_CONNEXT_LOG_ERROR_A_SET( - "incompatible node for context:" - "ctx.localhost_only=%d, node.localhost_only=%d", - this->localhost_only, localhost_only) - return RMW_RET_ERROR; + RMW_CONNEXT_LOG_ERROR_SET("invalid discovery params argument"); + return RMW_RET_INVALID_ARGUMENT; } - this->node_count += 1; - RMW_CONNEXT_LOG_DEBUG_A( - "initialized new node: total=%lu", this->node_count) - return RMW_RET_OK; - } + if (!params_equal) { + RMW_CONNEXT_LOG_ERROR_SET( + "node is being initialized with incompatible discovery parameters"); + return RMW_RET_ERROR; + } + } else { + if (nullptr != discovery_options_in) { + RMW_CONNEXT_ASSERT(nullptr == this->discovery_options) + this->discovery_options = static_cast( + this->base->options.allocator.allocate( + sizeof(rmw_discovery_options_t), + this->base->options.allocator.state)); + if (nullptr == this->discovery_options) { + RMW_CONNEXT_LOG_ERROR_SET("failed to allocate discovery options") + return RMW_RET_BAD_ALLOC; + } + *this->discovery_options = rmw_get_zero_initialized_discovery_options(); + const rmw_ret_t rc = rmw_discovery_options_copy( + discovery_options_in, + &this->base->options.allocator, + this->discovery_options); + if (rc != RMW_RET_OK) { + rcutils_error_string_t prev_error_string = rcutils_get_error_string(); + RMW_CONNEXT_LOG_ERROR_A_SET( + "failed to copy discovery parameters: %s", + prev_error_string.str); + return rc; + } + } - rmw_ret_t rc = this->initialize_participant(localhost_only); - if (RMW_RET_OK != rc) { - RMW_CONNEXT_LOG_ERROR("failed to initialize DomainParticipant") - return rc; - } + rmw_ret_t rc = this->initialize_participant(); + if (RMW_RET_OK != rc) { + RMW_CONNEXT_LOG_ERROR("failed to initialize DomainParticipant") + return rc; + } - rc = this->enable_participant(); - if (RMW_RET_OK != rc) { - RMW_CONNEXT_LOG_ERROR("failed to enable DomainParticipant") - if (RMW_RET_OK != this->finalize_participant()) { - RMW_CONNEXT_LOG_ERROR("failed to finalize participant on error") + rc = this->enable_participant(); + if (RMW_RET_OK != rc) { + RMW_CONNEXT_LOG_ERROR("failed to enable DomainParticipant") + if (RMW_RET_OK != this->finalize_participant()) { + RMW_CONNEXT_LOG_ERROR("failed to finalize participant on error") + } + return rc; } - return rc; } - this->node_count = 1; - + this->node_count += 1; + RMW_CONNEXT_LOG_DEBUG_A("initialized new node: total=%lu", this->node_count); return RMW_RET_OK; } rmw_ret_t -rmw_context_impl_t::initialize_participant(const bool localhost_only) +rmw_context_impl_t::initialize_participant() { RMW_CONNEXT_LOG_DEBUG("initializing DDS DomainParticipant") - this->localhost_only = localhost_only; - if (nullptr == RMW_Connext_gv_DomainParticipantFactory) { RMW_CONNEXT_LOG_ERROR("DDS DomainParticipantFactory not initialized") return RMW_RET_ERROR; @@ -435,6 +678,21 @@ rmw_context_impl_t::finalize() { rmw_ret_t rc_exit = RMW_RET_OK; + if (nullptr != this->discovery_options) { + const auto rc = rmw_discovery_options_fini( + this->discovery_options); + if (RMW_RET_OK != rc) { + RMW_CONNEXT_LOG_ERROR("failed to deallocate discovery options"); + rc_exit = RMW_RET_ERROR; + } + this->discovery_options = nullptr; + } + + if (nullptr != this->domain_tag) { + DDS_String_free(this->domain_tag); + this->domain_tag = nullptr; + } + RMW_CONNEXT_LOG_DEBUG_A( "finalizing RMW context: %p", reinterpret_cast(this)) @@ -596,7 +854,8 @@ rmw_api_connextdds_init_options_init( init_options->domain_id = RMW_DEFAULT_DOMAIN_ID; init_options->enclave = nullptr; init_options->security_options = rmw_get_zero_initialized_security_options(); - return RMW_RET_OK; + init_options->discovery_options = rmw_get_zero_initialized_discovery_options(); + return rmw_discovery_options_init(&(init_options->discovery_options), 0, &allocator); } @@ -1023,6 +1282,7 @@ rmw_api_connextdds_init( } if (nullptr == RMW_Connext_gv_DomainParticipantFactory) { + RMW_CONNEXT_ASSERT(1 == RMW_Connext_gv_ContextCount) RMW_CONNEXT_LOG_DEBUG("initializing DDS DomainParticipantFactory") if (RMW_RET_OK != @@ -1050,7 +1310,6 @@ rmw_api_connextdds_init( RMW_CONNEXT_LOG_DEBUG("DDS DomainParticipantFactory initialized") } RMW_CONNEXT_ASSERT(nullptr != RMW_Connext_gv_DomainParticipantFactory) - RMW_CONNEXT_ASSERT(1 == RMW_Connext_gv_ContextCount) scope_exit_context_finalize.cancel(); scope_exit_context_opts_finalize.cancel(); diff --git a/rmw_connextdds_common/src/common/rmw_impl.cpp b/rmw_connextdds_common/src/common/rmw_impl.cpp index d81ddf54..e309b45c 100644 --- a/rmw_connextdds_common/src/common/rmw_impl.cpp +++ b/rmw_connextdds_common/src/common/rmw_impl.cpp @@ -143,7 +143,7 @@ rmw_connextdds_parse_string_list( input_i += 2, next_i_start = input_i) { - // determine token's lenght by finding a delimiter (or end of input) + // determine token's length by finding a delimiter (or end of input) for (; input_i + 1 < input_len && delimiter != list[input_i + 1]; input_i += 1) @@ -181,7 +181,7 @@ rmw_connextdds_parse_string_list( DDS_String_free(*el_ref); } *el_ref = DDS_String_alloc(next_len); - if (nullptr == el_ref) { + if (nullptr == *el_ref) { RMW_CONNEXT_LOG_ERROR_SET("failed to allocate string") return RMW_RET_ERROR; } diff --git a/rmw_connextdds_common/src/common/rmw_node.cpp b/rmw_connextdds_common/src/common/rmw_node.cpp index 86f57735..1422551c 100644 --- a/rmw_connextdds_common/src/common/rmw_node.cpp +++ b/rmw_connextdds_common/src/common/rmw_node.cpp @@ -40,12 +40,9 @@ rmw_api_connextdds_create_node( "expected initialized context", return nullptr); - bool node_localhost_only = - context->options.localhost_only == RMW_LOCALHOST_ONLY_ENABLED; - RMW_CONNEXT_LOG_DEBUG_A( - "creating new node: name=%s, ns=%s, localhost_only=%d", - name, ns, node_localhost_only) + "creating new node: name=%s, ns=%s", + name, ns) rmw_context_impl_t * ctx = context->impl; std::lock_guard guard(ctx->initialization_mutex); @@ -83,7 +80,7 @@ rmw_api_connextdds_create_node( return nullptr; } - ret = ctx->initialize_node(ns, name, node_localhost_only); + ret = ctx->initialize_node(&context->options.discovery_options); if (RMW_RET_OK != ret) { RMW_CONNEXT_LOG_ERROR("failed to initialize node in context") return nullptr; diff --git a/rmw_connextdds_common/src/ndds/dds_api_ndds.cpp b/rmw_connextdds_common/src/ndds/dds_api_ndds.cpp index d8a21eee..9fd69360 100644 --- a/rmw_connextdds_common/src/ndds/dds_api_ndds.cpp +++ b/rmw_connextdds_common/src/ndds/dds_api_ndds.cpp @@ -15,6 +15,7 @@ #include #include #include +#include #include "rmw/impl/cpp/key_value.hpp" #include "rmw_connextdds/custom_sql_filter.hpp" @@ -202,23 +203,6 @@ rmw_connextdds_initialize_participant_qos_impl( case rmw_context_impl_t::participant_qos_override_policy_t::All: case rmw_context_impl_t::participant_qos_override_policy_t::Basic: { - // Parse and apply QoS parameters derived from ROS 2 configuration options. - - if (ctx->localhost_only) { - if (DDS_RETCODE_OK != - DDS_PropertyQosPolicyHelper_assert_property( - &dp_qos->property, - "dds.transport.UDPv4.builtin.parent.allow_interfaces", - RMW_CONNEXT_LOCALHOST_ONLY_ADDRESS, - DDS_BOOLEAN_FALSE /* propagate */)) - { - RMW_CONNEXT_LOG_ERROR_A_SET( - "failed to assert property on participant: %s", - "dds.transport.UDPv4.builtin.parent.allow_interfaces") - return RMW_RET_ERROR; - } - } - const size_t user_data_len_in = DDS_OctetSeq_get_length(&dp_qos->user_data.value);