Skip to content

Commit

Permalink
Add support for user-specified content filters.
Browse files Browse the repository at this point in the history
Signed-off-by: Andrea Sorbini <[email protected]>
  • Loading branch information
asorbini committed Oct 22, 2021
1 parent 944da9d commit 1e0027e
Show file tree
Hide file tree
Showing 17 changed files with 1,297 additions and 14 deletions.
18 changes: 18 additions & 0 deletions rmw_connextdds/src/rmw_api_impl_ndds.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -647,6 +647,24 @@ rmw_subscription_get_actual_qos(
return rmw_api_connextdds_subscription_get_actual_qos(subscription, qos);
}

rmw_ret_t
rmw_subscription_set_cft_expression_parameters(
rmw_subscription_t * subscription,
const rmw_subscription_content_filtered_topic_options_t * options)
{
return rmw_api_connextdds_subscription_set_cft_expression_parameters(
subscription, options);
}

rmw_ret_t
rmw_subscription_get_cft_expression_parameters(
const rmw_subscription_t * subscription,
rcutils_allocator_t * allocator,
rmw_subscription_content_filtered_topic_options_t * options)
{
return rmw_api_connextdds_subscription_get_cft_expression_parameters(
subscription, allocator, options);
}

rmw_ret_t
rmw_destroy_subscription(
Expand Down
3 changes: 2 additions & 1 deletion rmw_connextdds_common/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,8 @@ set(RMW_CONNEXT_DEPS
rosidl_typesupport_fastrtps_cpp
rosidl_typesupport_introspection_c
rosidl_typesupport_introspection_cpp
rti_connext_dds_cmake_module)
rti_connext_dds_cmake_module
rti_connext_dds_custom_sql_filter)

foreach(pkg_dep ${RMW_CONNEXT_DEPS})
find_package(${pkg_dep} REQUIRED)
Expand Down
18 changes: 18 additions & 0 deletions rmw_connextdds_common/include/rmw_connextdds/dds_api.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,13 +65,19 @@ rmw_connextdds_initialize_participant_qos_impl(
rmw_context_impl_t * const ctx,
DDS_DomainParticipantQos * const dp_qos);

rmw_ret_t
rmw_connextdds_configure_participant(
rmw_context_impl_t * const ctx,
DDS_DomainParticipant * const participant);

rmw_ret_t
rmw_connextdds_create_contentfilteredtopic(
rmw_context_impl_t * const ctx,
DDS_DomainParticipant * const dp,
DDS_Topic * const base_topic,
const char * const cft_name,
const char * const cft_filter,
const rcutils_string_array_t * const cft_expression_parameters,
DDS_TopicDescription ** const cft_out);

rmw_ret_t
Expand Down Expand Up @@ -258,4 +264,16 @@ rmw_connextdds_enable_security(
DDS_SECURITY_PROPERTY_PREFIX ".logging.log_level"
#endif /* DDS_SECURITY_LOGGING_LEVEL_PROPERTY */

rmw_ret_t
rmw_connextdds_set_cft_filter_expression(
DDS_TopicDescription * const topic_desc,
const char * const cft_expression,
const rcutils_string_array_t * const cft_expression_parameters);

rmw_ret_t
rmw_connextdds_get_cft_filter_expression(
DDS_TopicDescription * const topic_desc,
rcutils_allocator_t * const allocator,
rmw_subscription_content_filtered_topic_options_t * const options);

#endif // RMW_CONNEXTDDS__DDS_API_HPP_
12 changes: 12 additions & 0 deletions rmw_connextdds_common/include/rmw_connextdds/rmw_api_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -435,6 +435,18 @@ rmw_api_connextdds_subscription_get_actual_qos(
const rmw_subscription_t * subscription,
rmw_qos_profile_t * qos);

RMW_CONNEXTDDS_PUBLIC
rmw_ret_t
rmw_api_connextdds_subscription_set_cft_expression_parameters(
rmw_subscription_t * subscription,
const rmw_subscription_content_filtered_topic_options_t * options);

RMW_CONNEXTDDS_PUBLIC
rmw_ret_t
rmw_api_connextdds_subscription_get_cft_expression_parameters(
const rmw_subscription_t * subscription,
rcutils_allocator_t * const allocator,
rmw_subscription_content_filtered_topic_options_t * options);

RMW_CONNEXTDDS_PUBLIC
rmw_ret_t
Expand Down
22 changes: 22 additions & 0 deletions rmw_connextdds_common/include/rmw_connextdds/rmw_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,15 @@ class RMW_Connext_Subscriber
rmw_message_info_t * const message_info,
bool * const taken);

rmw_ret_t
set_cft_expression_parameters(
const rmw_subscription_content_filtered_topic_options_t * const options);

rmw_ret_t
get_cft_expression_parameters(
rcutils_allocator_t * allocator,
rmw_subscription_content_filtered_topic_options_t * const options);

bool
has_data()
{
Expand Down Expand Up @@ -470,6 +479,17 @@ class RMW_Connext_Subscriber
return this->dds_topic;
}

static std::string get_atomic_id()
{
static std::atomic_uint64_t id;
return std::to_string(id++);
}

bool is_cft_enabled()
{
return !this->cft_expression.empty();
}

const bool internal;
const bool ignore_local;

Expand All @@ -478,6 +498,7 @@ class RMW_Connext_Subscriber
DDS_DataReader * dds_reader;
DDS_Topic * dds_topic;
DDS_TopicDescription * dds_topic_cft;
std::string cft_expression;
RMW_Connext_MessageTypeSupport * type_support;
rmw_gid_t ros_gid;
const bool created_topic;
Expand All @@ -496,6 +517,7 @@ class RMW_Connext_Subscriber
const bool ignore_local,
const bool created_topic,
DDS_TopicDescription * const dds_topic_cft,
const char * const cft_expression,
const bool internal);

friend class RMW_Connext_SubscriberStatusCondition;
Expand Down
1 change: 1 addition & 0 deletions rmw_connextdds_common/package.xml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
<buildtool_export_depend>ament_cmake</buildtool_export_depend>

<depend>rti_connext_dds_cmake_module</depend>
<depend>rti_connext_dds_custom_sql_filter</depend>
<depend>fastcdr</depend>
<depend>rcutils</depend>
<depend>rcpputils</depend>
Expand Down
6 changes: 6 additions & 0 deletions rmw_connextdds_common/src/common/rmw_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,12 @@ rmw_context_impl_t::initialize_participant(const bool localhost_only)
return RMW_RET_ERROR;
}

rmw_ret_t cfg_rc = rmw_connextdds_configure_participant(this, this->participant);
if (RMW_RET_OK != cfg_rc) {
RMW_CONNEXT_LOG_ERROR("failed to configure DDS participant")
return cfg_rc;
}

/* Create DDS publisher/subscriber objects that will be used for all DDS
writers/readers created to support RMW publishers/subscriptions. */

Expand Down
75 changes: 66 additions & 9 deletions rmw_connextdds_common/src/common/rmw_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
const char * const ROS_TOPIC_PREFIX = "rt";
const char * const ROS_SERVICE_REQUESTER_PREFIX = ROS_SERVICE_REQUESTER_PREFIX_STR;
const char * const ROS_SERVICE_RESPONSE_PREFIX = ROS_SERVICE_RESPONSE_PREFIX_STR;
const char * const ROS_CFT_TOPIC_NAME_INFIX = "_ContentFilterTopic";

std::string
rmw_connextdds_create_topic_name(
Expand Down Expand Up @@ -1129,13 +1130,15 @@ RMW_Connext_Subscriber::RMW_Connext_Subscriber(
const bool ignore_local,
const bool created_topic,
DDS_TopicDescription * const dds_topic_cft,
const char * const cft_expression,
const bool internal)
: internal(internal),
ignore_local(ignore_local),
ctx(ctx),
dds_reader(dds_reader),
dds_topic(dds_topic),
dds_topic_cft(dds_topic_cft),
cft_expression(cft_expression),
type_support(type_support),
created_topic(created_topic),
status_condition(dds_reader, ignore_local, internal)
Expand Down Expand Up @@ -1256,19 +1259,40 @@ RMW_Connext_Subscriber::create(
});

DDS_TopicDescription * sub_topic = DDS_Topic_as_topicdescription(topic);
std::string sub_cft_name;
const char * sub_cft_expr = "";
const rcutils_string_array_t * sub_cft_params = nullptr;

if (nullptr != cft_name) {
rmw_ret_t cft_rc =
rmw_connextdds_create_contentfilteredtopic(
ctx, dp, topic, cft_name, cft_filter, &cft_topic);
sub_cft_name = cft_name;
sub_cft_expr = cft_filter;
} else {
sub_cft_name =
fqtopic_name + ROS_CFT_TOPIC_NAME_INFIX + RMW_Connext_Subscriber::get_atomic_id();
if (nullptr != subscriber_options->content_filtered_topic_options) {
sub_cft_expr =
subscriber_options->content_filtered_topic_options->filter_expression;
sub_cft_params =
subscriber_options->content_filtered_topic_options->expression_parameters;
}
}

if (RMW_RET_OK != cft_rc) {
if (RMW_RET_UNSUPPORTED != cft_rc) {
return nullptr;
}
} else {
sub_topic = cft_topic;
rmw_ret_t cft_rc =
rmw_connextdds_create_contentfilteredtopic(
ctx,
dp,
topic,
sub_cft_name.c_str(),
sub_cft_expr,
sub_cft_params,
&cft_topic);

if (RMW_RET_OK != cft_rc) {
if (RMW_RET_UNSUPPORTED != cft_rc) {
return nullptr;
}
} else {
sub_topic = cft_topic;
}

// The following initialization generates warnings when built
Expand Down Expand Up @@ -1337,6 +1361,7 @@ RMW_Connext_Subscriber::create(
subscriber_options->ignore_local_publications,
topic_created,
cft_topic,
sub_cft_expr,
internal);

if (nullptr == rmw_sub_impl) {
Expand Down Expand Up @@ -1529,6 +1554,37 @@ RMW_Connext_Subscriber::take_serialized(
return rc;
}


rmw_ret_t
RMW_Connext_Subscriber::set_cft_expression_parameters(
const rmw_subscription_content_filtered_topic_options_t * const options)
{
const char * const filter_expression =
(nullptr != options && nullptr != options->filter_expression) ?
options->filter_expression : "";

const rcutils_string_array_t * filter_params =
(nullptr != options) ? options->expression_parameters : nullptr;

rmw_ret_t rc = rmw_connextdds_set_cft_filter_expression(
this->dds_topic_cft, filter_expression, filter_params);
if (RMW_RET_OK != rc) {
return rc;
}

this->cft_expression = filter_expression;

return RMW_RET_OK;
}

rmw_ret_t
RMW_Connext_Subscriber::get_cft_expression_parameters(
rcutils_allocator_t * allocator,
rmw_subscription_content_filtered_topic_options_t * const options)
{
return rmw_connextdds_get_cft_filter_expression(this->dds_topic_cft, allocator, options);
}

rmw_ret_t
RMW_Connext_Subscriber::loan_messages(const bool update_condition)
{
Expand Down Expand Up @@ -1815,6 +1871,7 @@ rmw_connextdds_create_subscriber(
topic_name_len + 1);
rmw_subscriber->options = *subscriber_options;
rmw_subscriber->can_loan_messages = false;
rmw_subscriber->is_cft_enabled = rmw_sub_impl->is_cft_enabled();

if (!internal) {
if (RMW_RET_OK != rmw_sub_impl->enable()) {
Expand Down
45 changes: 45 additions & 0 deletions rmw_connextdds_common/src/common/rmw_subscription.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,51 @@ rmw_api_connextdds_subscription_get_actual_qos(
return sub_impl->qos(qos);
}

rmw_ret_t
rmw_api_connextdds_subscription_set_cft_expression_parameters(
rmw_subscription_t * subscription,
const rmw_subscription_content_filtered_topic_options_t * options)
{
RMW_CHECK_ARGUMENT_FOR_NULL(subscription, RMW_RET_INVALID_ARGUMENT);
RMW_CHECK_TYPE_IDENTIFIERS_MATCH(
subscription,
subscription->implementation_identifier,
RMW_CONNEXTDDS_ID,
return RMW_RET_INCORRECT_RMW_IMPLEMENTATION);
// RMW_CHECK_ARGUMENT_FOR_NULL(options, RMW_RET_INVALID_ARGUMENT);

RMW_Connext_Subscriber * const sub_impl =
reinterpret_cast<RMW_Connext_Subscriber *>(subscription->data);

rmw_ret_t rc = sub_impl->set_cft_expression_parameters(options);
subscription->is_cft_enabled = sub_impl->is_cft_enabled();

return rc;
}


rmw_ret_t
rmw_api_connextdds_subscription_get_cft_expression_parameters(
const rmw_subscription_t * subscription,
rcutils_allocator_t * const allocator,
rmw_subscription_content_filtered_topic_options_t * options)
{
RMW_CHECK_ARGUMENT_FOR_NULL(subscription, RMW_RET_INVALID_ARGUMENT);
RMW_CHECK_TYPE_IDENTIFIERS_MATCH(
subscription,
subscription->implementation_identifier,
RMW_CONNEXTDDS_ID,
return RMW_RET_INCORRECT_RMW_IMPLEMENTATION);
RMW_CHECK_ARGUMENT_FOR_NULL(allocator, RMW_RET_INVALID_ARGUMENT);
RMW_CHECK_ARGUMENT_FOR_NULL(options, RMW_RET_INVALID_ARGUMENT);

RMW_Connext_Subscriber * const sub_impl =
reinterpret_cast<RMW_Connext_Subscriber *>(subscription->data);

rmw_ret_t rc = sub_impl->get_cft_expression_parameters(allocator, options);

return rc;
}

rmw_ret_t
rmw_api_connextdds_destroy_subscription(
Expand Down
Loading

0 comments on commit 1e0027e

Please sign in to comment.