Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[rmw_connextdds] New RMW discovery options #108

Merged
merged 27 commits into from
Apr 8, 2023
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
e59a43d
Use the new discovery params
mxgrey Mar 8, 2023
43002e6
Update to latest rmw API
mxgrey Mar 9, 2023
e2ff364
Fix typo
mxgrey Mar 9, 2023
4738aba
Fix memory management
mxgrey Mar 10, 2023
d550c7d
Merge branch 'rolling' into discovery-peers-specification
sloretz Mar 24, 2023
dedf94b
Make sure NDDS_DISCOVERY_PEERS is not empty when ROS_AUTOMATIC_DISCOV…
asorbini Mar 30, 2023
5ae99e9
fix up finalize logic
wjwwood Mar 30, 2023
77768a8
typo
wjwwood Mar 30, 2023
2d52d8f
check the return code of ensure_length()
wjwwood Mar 30, 2023
9aef6d1
fixup use of memcpy when copying a string
wjwwood Mar 30, 2023
a492c17
improve formatting of domain_tag string
wjwwood Mar 30, 2023
43e6382
use c++17 [[fallthrough]] attribute
wjwwood Mar 30, 2023
b4c9e9c
change style of switch statement
wjwwood Mar 30, 2023
2d32141
undo change to request c++17 vs 14 since we're not using fallthrough
wjwwood Mar 30, 2023
d5c59dc
Handle all discovery options in rmw_context.cpp
asorbini Mar 31, 2023
cf370ca
Reset multicast_receive_addresses instead of setting NDDS_DISCOVERY_P…
asorbini Mar 31, 2023
84238ad
error when range is NOT_SET
wjwwood Mar 31, 2023
84db854
fixup review comments
wjwwood Mar 31, 2023
4bafd73
fix a zero initialization issue
wjwwood Mar 31, 2023
e3a5c8f
remove redundant break statement
wjwwood Mar 31, 2023
8a4fbfd
Init discovery_options when initializing node options
sloretz Apr 1, 2023
5c56565
Set maximum participant ID to 32 on localhost
sloretz Apr 4, 2023
c501a76
Skipping adding peers when automatic_discovery_range is SYSTEM_DEFAULT
sloretz Apr 4, 2023
b74dda7
Merge branch 'rolling' into discovery-peers-specification
sloretz Apr 5, 2023
a054497
Try solving los of precision warning on Windows
sloretz Apr 6, 2023
ef89f78
Fix windows warning
sloretz Apr 7, 2023
429b63d
Merge branch 'rolling' into discovery-peers-specification
sloretz Apr 7, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
199 changes: 121 additions & 78 deletions rmw_connextdds_common/src/common/rmw_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,14 +94,23 @@ rmw_connextdds_initialize_participant_qos(
return RMW_RET_ERROR;
}

if (ctx->participant_qos_override_policy ==
rmw_context_impl_t::participant_qos_override_policy_t::All &&
DDS_StringSeq_get_length(&ctx->initial_peers) > 0)
{
if (!DDS_StringSeq_copy(&dp_qos.discovery.initial_peers, &ctx->initial_peers)) {
RMW_CONNEXT_LOG_ERROR_SET("failed to copy initial peers sequence")
return RMW_RET_ERROR;
}
switch (ctx->participant_qos_override_policy) {
case rmw_context_impl_t::participant_qos_override_policy_t::All:
case rmw_context_impl_t::participant_qos_override_policy_t::Basic:
{
if (DDS_StringSeq_get_length(&ctx->initial_peers) > 0)
{
if (!DDS_StringSeq_copy(&dp_qos.discovery.initial_peers, &ctx->initial_peers)) {
RMW_CONNEXT_LOG_ERROR_SET("failed to copy initial peers sequence")
return RMW_RET_ERROR;
}
}
break;
}
default:
{
break;
}
}

return RMW_RET_OK;
Expand All @@ -111,7 +120,7 @@ rmw_ret_t
rmw_context_impl_t::initialize_node(
const rmw_discovery_options_t * const discovery_options)
{
if (0u != this->node_count) {
if (this->node_count > 0) {
bool params_equal = false;
if (rmw_discovery_options_equal(this->discovery_options, discovery_options, &params_equal) != RMW_RET_OK) {
RMW_CONNEXT_LOG_ERROR_SET("invalid discovery params argument");
Expand All @@ -123,46 +132,110 @@ rmw_context_impl_t::initialize_node(
"node is being initialized with incompatible discovery parameters");
return RMW_RET_ERROR;
}
} else {
if (nullptr != discovery_options) {
RMW_CONNEXT_ASSERT(nullptr == this->discovery_options)
this->discovery_options = (rmw_discovery_options_t*)
this->base->options.allocator.allocate(
sizeof(rmw_discovery_options_t),
this->base->options.allocator.state);
if (nullptr == this->discovery_options) {
RMW_CONNEXT_LOG_ERROR_SET("failed to allocate discovery options")
return RMW_RET_ERROR;
}
const auto rc = rmw_discovery_options_copy(
discovery_options,
&this->base->options.allocator,
this->discovery_options);
if (RMW_RET_OK != rc) {
RMW_CONNEXT_LOG_ERROR("failed to copy discovery parameters");
return rc;
}
/* Support for ROS_AUTOMATIC_DISCOVERY_RANGE == LOCALHOST
-----------------------------------------
Connext looks at variable NDDS_DISCOVERY_PEERS to determine
whether it should add a multicast locator to the set of locators
used for discovery. If this variable is empty, or if it
contains at least one multicast address, a multicast locator is
used for discovery (the default 239.255.0.1 group when the
variable is empty, the first multicast locator found in the list
when one or more multicast addresses are present).
Because of this, we must make sure that NDDS_DISCOVERY_PEERS
contains some value to prevent this default behavior, otherwise
the participant will be announcing the default multicast group,
which in turn will cause ROS_STATIC_PEERS not to work correctly. */
if (RMW_AUTOMATIC_DISCOVERY_RANGE_LOCALHOST ==
this->discovery_options->automatic_discovery_range)
{
static const char * const NDDS_DISCOVERY_PEERS = "NDDS_DISCOVERY_PEERS";
const char * ndds_disc_peers = nullptr;
const char * lookup_rc = rcutils_get_env(NDDS_DISCOVERY_PEERS, &ndds_disc_peers);
if (nullptr != lookup_rc || nullptr == ndds_disc_peers) {
RMW_CONNEXT_LOG_ERROR_A_SET(
"failed to lookup from environment: "
"var=%s, "
"rc=%s ",
NDDS_DISCOVERY_PEERS,
lookup_rc)
return RMW_RET_ERROR;
}
if (ndds_disc_peers[0] == '\0'
&& !rcutils_set_env(NDDS_DISCOVERY_PEERS, RMW_CONNEXT_LOCALHOST_ONLY_ADDRESS))
{
RMW_CONNEXT_LOG_ERROR_A_SET(
"failed to set environment: var=%s",
NDDS_DISCOVERY_PEERS)
return RMW_RET_ERROR;
}
}
}

this->node_count += 1;
RMW_CONNEXT_LOG_DEBUG_A(
"initialized new node: total=%lu", this->node_count);
return RMW_RET_OK;
}
if (nullptr == RMW_Connext_gv_DomainParticipantFactory) {
RMW_CONNEXT_LOG_DEBUG("initializing DDS DomainParticipantFactory")

if (RMW_RET_OK !=
rmw_connextdds_initialize_participant_factory_context(this))
{
RMW_CONNEXT_LOG_ERROR(
"failed to initialize DDS DomainParticipantFactory context")
return RMW_RET_ERROR;
}

if (!this->discovery_options && discovery_options) {
this->discovery_options = (rmw_discovery_options_t*)
this->base->options.allocator.allocate(
sizeof(rmw_discovery_options_t),
this->base->options.allocator.state);
RMW_Connext_gv_DomainParticipantFactory =
DDS_DomainParticipantFactory_get_instance();
if (nullptr == RMW_Connext_gv_DomainParticipantFactory) {
RMW_CONNEXT_LOG_ERROR_SET("failed to get DDS participant factory")
return RMW_RET_ERROR;
}

const auto rc = rmw_discovery_options_copy(
discovery_options,
this->base->options.allocator,
this->discovery_options);
if (RMW_RET_OK !=
rmw_connextdds_initialize_participant_factory_qos(this))
{
RMW_CONNEXT_LOG_ERROR_SET("failed to set DDS participant factory QoS")
return RMW_RET_ERROR;
}

RMW_CONNEXT_LOG_DEBUG("DDS DomainParticipantFactory initialized")
}
RMW_CONNEXT_ASSERT(nullptr != RMW_Connext_gv_DomainParticipantFactory)

rmw_ret_t rc = this->initialize_participant();
if (RMW_RET_OK != rc) {
RMW_CONNEXT_LOG_ERROR("failed to copy discovery parameters");
RMW_CONNEXT_LOG_ERROR("failed to initialize DomainParticipant")
return rc;
}
}

rmw_ret_t rc = this->initialize_participant();
if (RMW_RET_OK != rc) {
RMW_CONNEXT_LOG_ERROR("failed to initialize DomainParticipant")
return rc;
}

rc = this->enable_participant();
if (RMW_RET_OK != rc) {
RMW_CONNEXT_LOG_ERROR("failed to enable DomainParticipant")
if (RMW_RET_OK != this->finalize_participant()) {
RMW_CONNEXT_LOG_ERROR("failed to finalize participant on error")
rc = this->enable_participant();
if (RMW_RET_OK != rc) {
RMW_CONNEXT_LOG_ERROR("failed to enable DomainParticipant")
if (RMW_RET_OK != this->finalize_participant()) {
RMW_CONNEXT_LOG_ERROR("failed to finalize participant on error")
}
return rc;
}
return rc;
}

this->node_count = 1;

this->node_count += 1;
RMW_CONNEXT_LOG_DEBUG_A("initialized new node: total=%lu", this->node_count);
return RMW_RET_OK;
}

Expand Down Expand Up @@ -442,19 +515,19 @@ rmw_context_impl_t::finalize()
{
rmw_ret_t rc_exit = RMW_RET_OK;

if (this->discovery_options && this->base) {
if (nullptr != this->discovery_options) {
const auto rc = rmw_discovery_options_fini(
this->discovery_options,
this->base->options.allocator);
this->discovery_options);
if (RMW_RET_OK != rc) {
RMW_CONNEXT_LOG_ERROR_A(
"failed to deallocate discovery options: %i",
rc);
RMW_CONNEXT_LOG_ERROR("failed to deallocate discovery options");
rc_exit = RMW_RET_ERROR;
}
wjwwood marked this conversation as resolved.
Show resolved Hide resolved
this->discovery_options = nullptr;
}

if (this->domain_tag) {
if (nullptr != this->domain_tag) {
DDS_String_free(this->domain_tag);
wjwwood marked this conversation as resolved.
Show resolved Hide resolved
this->domain_tag = nullptr;
}

RMW_CONNEXT_LOG_DEBUG_A(
Expand Down Expand Up @@ -1013,7 +1086,7 @@ rmw_api_connextdds_init(
ctx->optimize_large_data = '\0' == disable_optimize_large_data_env[0];
#endif /* RMW_CONNEXT_DEFAULT_LARGE_DATA_OPTIMIZATIONS */

/* Lookup and configure initial peer from environment */
/* Lookup and configure initial peer from environment */
const char * initial_peers = nullptr;
lookup_rc =
rcutils_get_env(RMW_CONNEXT_ENV_INITIAL_PEERS, &initial_peers);
Expand Down Expand Up @@ -1044,36 +1117,6 @@ rmw_api_connextdds_init(
RMW_CONNEXT_LOG_DEBUG_A("initial DDS peers: %s", initial_peers)
}

if (nullptr == RMW_Connext_gv_DomainParticipantFactory) {
RMW_CONNEXT_LOG_DEBUG("initializing DDS DomainParticipantFactory")

if (RMW_RET_OK !=
rmw_connextdds_initialize_participant_factory_context(ctx))
{
RMW_CONNEXT_LOG_ERROR(
"failed to initialize DDS DomainParticipantFactory context")
return RMW_RET_ERROR;
}

RMW_Connext_gv_DomainParticipantFactory =
DDS_DomainParticipantFactory_get_instance();
if (nullptr == RMW_Connext_gv_DomainParticipantFactory) {
RMW_CONNEXT_LOG_ERROR_SET("failed to get DDS participant factory")
return RMW_RET_ERROR;
}

if (RMW_RET_OK !=
rmw_connextdds_initialize_participant_factory_qos(ctx))
{
RMW_CONNEXT_LOG_ERROR_SET("failed to set DDS participant factory QoS")
return RMW_RET_ERROR;
}

RMW_CONNEXT_LOG_DEBUG("DDS DomainParticipantFactory initialized")
}
RMW_CONNEXT_ASSERT(nullptr != RMW_Connext_gv_DomainParticipantFactory)
RMW_CONNEXT_ASSERT(1 == RMW_Connext_gv_ContextCount)

scope_exit_context_finalize.cancel();
scope_exit_context_opts_finalize.cancel();
scope_exit_context_reset.cancel();
Expand Down
11 changes: 7 additions & 4 deletions rmw_connextdds_common/src/common/rmw_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,10 @@ rmw_ret_t rmw_connextdds_extend_initial_peer_list(

const auto initial_length = DDS_StringSeq_get_length(out);
const auto new_seq_length = initial_length + static_peer_count;
DDS_StringSeq_ensure_length(out, new_seq_length, new_seq_length);
if (!DDS_StringSeq_ensure_length(out, new_seq_length, new_seq_length)) {
RMW_CONNEXT_LOG_ERROR_SET("failed to resize string sequence")
return RMW_RET_ERROR;
}
for (size_t s=0; s < static_peer_count; ++s) {
const auto index = initial_length + s;
const char * peer = static_peers[s].peer_address;
Expand All @@ -234,14 +237,14 @@ rmw_ret_t rmw_connextdds_extend_initial_peer_list(
then free their memory. */
DDS_String_free(*element_ref);
}
const auto peer_char_len = strlen(peer);
*element_ref = DDS_String_alloc(peer_char_len);
const auto peer_char_len = strnlen(peer, RMW_DISCOVERY_OPTIONS_STATIC_PEERS_MAX_LENGTH);
*element_ref = DDS_String_alloc(peer_char_len + 1); // +1 for nul character
wjwwood marked this conversation as resolved.
Show resolved Hide resolved
if (nullptr == *element_ref) {
RMW_CONNEXT_LOG_ERROR_SET("failed to allocate string");
return RMW_RET_ERROR;
}

memcpy(*element_ref, peer, peer_char_len);
std::memcpy(*element_ref, peer, peer_char_len + 1);
RMW_CONNEXT_LOG_TRACE_A(
"inserted static peer: i=%d, peer='%s'",
index, peer);
Expand Down
64 changes: 36 additions & 28 deletions rmw_connextdds_common/src/ndds/dds_api_ndds.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include <cmath>

#include "rcutils/process.h"
#include "rcutils/snprintf.h"

#include "rmw/impl/cpp/key_value.hpp"
#include "rmw_connextdds/custom_sql_filter.hpp"
Expand Down Expand Up @@ -202,8 +203,17 @@ rmw_connextdds_initialize_participant_qos_impl(
#endif /* RMW_CONNEXT_SHARE_DDS_ENTITIES_WITH_CPP */
if (!ctx->domain_tag) {
const auto pid = rcutils_get_pid();
ctx->domain_tag = DDS_String_alloc(18 + log10(pid));
sprintf(ctx->domain_tag, "ros_discovery_off_%d", pid);
static const char * format_string = "ros_discovery_off_%d";
int bytes_needed = rcutils_snprintf(nullptr, 0, format_string, pid);
ctx->domain_tag = DDS_String_alloc(bytes_needed + 1);
wjwwood marked this conversation as resolved.
Show resolved Hide resolved
if (nullptr == ctx->domain_tag) {
RMW_CONNEXT_LOG_ERROR_SET("failed to allocate domain tag string");
return RMW_RET_BAD_ALLOC;
}
if (rcutils_snprintf(ctx->domain_tag, bytes_needed + 1, format_string, pid) < 0) {
RMW_CONNEXT_LOG_ERROR_SET("failed to format ros discovery off information into domain tag");
return RMW_RET_ERROR;
}
}

switch (ctx->participant_qos_override_policy) {
Expand All @@ -215,44 +225,42 @@ rmw_connextdds_initialize_participant_qos_impl(
// https://community.rti.com/static/documentation/connext-dds/6.1.1/doc/manuals/connext_dds_professional/properties_reference/index.html
if (ctx->discovery_options) {
const auto range = ctx->discovery_options->automatic_discovery_range;
bool set_multicast_peers = false;
switch (range) {
case RMW_AUTOMATIC_DISCOVERY_RANGE_SUBNET:
/* No action needed. This is the default discovery behavior for DDS */
break;
case RMW_AUTOMATIC_DISCOVERY_RANGE_DEFAULT:
case RMW_AUTOMATIC_DISCOVERY_RANGE_LOCALHOST:
case RMW_AUTOMATIC_DISCOVERY_RANGE_OFF:
set_multicast_peers = true;
break;
default:
wjwwood marked this conversation as resolved.
Show resolved Hide resolved
RMW_CONNEXT_LOG_WARNING_A(
"Unknown value provided for automatic discovery range: %i",
ctx->discovery_options->automatic_discovery_range);
/* Fall back to the default behavior */
[[clang::fallthrough]];
/* [[fallthrough]]; // Uncomment this when migrating to C++17 */
case RMW_AUTOMATIC_DISCOVERY_RANGE_DEFAULT:
/* Same behavior as LOCALHOST */
[[clang::fallthrough]];
/* [[fallthrough]]; // Uncomment this when migrating to C++17 */
case RMW_AUTOMATIC_DISCOVERY_RANGE_LOCALHOST:
/* Same interface settings as OFF */
[[clang::fallthrough]];
/* [[fallthrough]]; // Uncomment this when migrating to C++17 */
case RMW_AUTOMATIC_DISCOVERY_RANGE_OFF:
/* Note: We allow the LOCALHOST interface for the OFF range
because if we leave this property completely blank then it
has the opposite effect and allows all interfaces to be used.
Allowing only LOCALHOST at least minimizes the unnecessary
discovery traffic and prevents discovery with other host
machines, while the domain_tag protects against same-host
connections. */
if (DDS_RETCODE_OK != DDS_PropertyQosPolicyHelper_assert_property(
set_multicast_peers = true;
break;
}
if (set_multicast_peers) {
/* Note: We allow the LOCALHOST interface for the OFF range
because if we leave this property completely blank then it
has the opposite effect and allows all interfaces to be used.
Allowing only LOCALHOST at least minimizes the unnecessary
discovery traffic and prevents discovery with other host
machines, while the domain_tag protects against same-host
connections. */
if (DDS_RETCODE_OK != DDS_PropertyQosPolicyHelper_assert_property(
&dp_qos->property,
"dds.transport.UDPv4.builtin.parent.allow_multicast_interfaces_list",
RMW_CONNEXT_LOCALHOST_ONLY_ADDRESS,
DDS_BOOLEAN_FALSE /* propagate */))
{
RMW_CONNEXT_LOG_ERROR_A_SET(
"failed to assert property on participant: %s",
"dds.transport.UDPv4.builtin.parent.allow_multicast_interfaces_list")
return RMW_RET_ERROR;
}
{
RMW_CONNEXT_LOG_ERROR_A_SET(
"failed to assert property on participant: %s",
"dds.transport.UDPv4.builtin.parent.allow_multicast_interfaces_list")
return RMW_RET_ERROR;
}
}

if (RMW_AUTOMATIC_DISCOVERY_RANGE_OFF == range) {
Expand Down