Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for user-specified content filters #68

Merged
merged 30 commits into from
Mar 22, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
1e0027e
Add support for user-specified content filters.
asorbini Oct 22, 2021
d4788f9
- Resolve memory leak of custom content-filter resources
asorbini Oct 25, 2021
d9a2ba0
Assume non-null options argument
asorbini Oct 25, 2021
1c313e3
- Return error when retrieving content-filter from a subscription tha…
asorbini Oct 25, 2021
3c532ce
Fix compilation error, oops.
asorbini Oct 25, 2021
669ff31
- Define RMW_CONNEXT_DEBUG when building Debug libraries.
asorbini Oct 25, 2021
13087f1
Resolve memory leak for finalization on error.
asorbini Oct 26, 2021
883e9cf
Rename content filter public API.
asorbini Oct 26, 2021
ced2a15
Add client/service QoS getters (#67)
mauropasse Nov 19, 2021
896839f
Changelogs
ivanpauno Nov 19, 2021
a3bbfb6
0.8.1
ivanpauno Nov 19, 2021
8b10deb
Fix cpplint errors (#69)
jacobperron Jan 12, 2022
099692a
0.8.2
paudrow Jan 15, 2022
70d6e2e
Update rti-connext-dds dependency to 6.0.1. (#71)
nuclearsandwich Feb 10, 2022
bbdf5af
0.8.3
nuclearsandwich Feb 10, 2022
4129331
Add rmw listener apis (#44)
Feb 24, 2022
49f3b55
Changelog. (#73)
clalancette Mar 1, 2022
4becb93
0.9.0
clalancette Mar 1, 2022
203f820
add stub for content filtered topic
Mar 18, 2022
5a36784
* Rebased branch asorbini/cft on top of 0.9.0.
asorbini Mar 19, 2022
d43b032
Move custom SQL filter to rmw_connextdds_common
asorbini Mar 19, 2022
60a347c
Try to resolve linking error on Windows.
asorbini Mar 19, 2022
d7d211c
Optionally disable writer-side CFT optimizations to support Windows.
asorbini Mar 20, 2022
ad985bc
No need to declare private CFT function on Windows.
asorbini Mar 20, 2022
f44dceb
Merge branch 'master' into asorbini/cft
fujitatomoya Mar 21, 2022
d30bb6e
remove stub implementation for ContentFilteredTopic.
fujitatomoya Mar 21, 2022
995e244
address cpplint error.
fujitatomoya Mar 21, 2022
b12c6b2
Avoid conversion warnings on Windows.
asorbini Mar 21, 2022
b1b5851
Use strtol instead of sscanf to avoid warnings on Windows.
asorbini Mar 21, 2022
3edb5ae
Avoid finalizing participants if factory is not available.
asorbini Mar 21, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 4 additions & 12 deletions rmw_connextdds/src/rmw_api_impl_ndds.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -716,33 +716,25 @@ rmw_subscription_get_actual_qos(
return rmw_api_connextdds_subscription_get_actual_qos(subscription, qos);
}


rmw_ret_t
rmw_subscription_set_content_filter(
rmw_subscription_t * subscription,
const rmw_subscription_content_filter_options_t * options)
{
UNUSED_ARG(subscription);
UNUSED_ARG(options);
RMW_CONNEXT_LOG_NOT_IMPLEMENTED
return RMW_RET_UNSUPPORTED;
return rmw_api_connextdds_subscription_set_content_filter(
subscription, options);
}


rmw_ret_t
rmw_subscription_get_content_filter(
const rmw_subscription_t * subscription,
rcutils_allocator_t * allocator,
rmw_subscription_content_filter_options_t * options)
{
UNUSED_ARG(subscription);
UNUSED_ARG(allocator);
UNUSED_ARG(options);
RMW_CONNEXT_LOG_NOT_IMPLEMENTED
return RMW_RET_UNSUPPORTED;
return rmw_api_connextdds_subscription_get_content_filter(
subscription, allocator, options);
}


rmw_ret_t
rmw_destroy_subscription(
rmw_node_t * node,
Expand Down
11 changes: 11 additions & 0 deletions rmw_connextdds_common/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -62,13 +62,19 @@ function(rtirmw_add_library)
ament_target_dependencies(${_rti_build_NAME}
${_rti_build_DEPS})

set(_extra_defines)
if("${CMAKE_BUILD_TYPE}" MATCHES "[dD]ebug")
list(APPEND _extra_defines "RMW_CONNEXT_DEBUG=1")
endif()

target_compile_definitions(${_rti_build_NAME}
PUBLIC
RMW_VERSION_MAJOR=${rmw_VERSION_MAJOR}
RMW_VERSION_MINOR=${rmw_VERSION_MINOR}
RMW_VERSION_PATCH=${rmw_VERSION_PATCH}
RMW_CONNEXT_DDS_API=RMW_CONNEXT_DDS_API_${_rti_build_API}
${_rti_build_DEFINES}
${_extra_defines}
)

set(private_defines)
Expand Down Expand Up @@ -189,6 +195,9 @@ else()
if("${CONNEXTDDS_VERSION}" VERSION_LESS "6.0.0")
list(APPEND extra_defines "RMW_CONNEXT_DDS_API_PRO_LEGACY=1")
endif()
if(CONNEXTDDS_ARCH MATCHES ".*Win.*")
list(APPEND extra_defines "RMW_CONNEXT_BUILTIN_CFT_COMPATIBILITY_MODE=1")
endif()
rtirmw_add_library(
NAME ${PROJECT_NAME}_pro
API PRO
Expand All @@ -197,8 +206,10 @@ else()
src/ndds/rmw_type_support_ndds.cpp
src/ndds/rmw_typecode.cpp
src/ndds/dds_api_ndds.cpp
src/ndds/custom_sql_filter.cpp
include/rmw_connextdds/typecode.hpp
include/rmw_connextdds/dds_api_ndds.hpp
include/rmw_connextdds/custom_sql_filter.hpp
DEPS ${RMW_CONNEXT_DEPS}
LIBRARIES RTIConnextDDS::c_api
DEFINES ${extra_defines})
Expand Down
66 changes: 66 additions & 0 deletions rmw_connextdds_common/include/rmw_connextdds/custom_sql_filter.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
// Copyright 2021 Real-Time Innovations, Inc. (RTI)
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef RMW_CONNEXTDDS__CUSTOM_SQL_FILTER_HPP_
#define RMW_CONNEXTDDS__CUSTOM_SQL_FILTER_HPP_

#include "rmw_connextdds/dds_api.hpp"

#if RMW_CONNEXT_DDS_API == RMW_CONNEXT_DDS_API_PRO

namespace rti_connext_dds_custom_sql_filter
{

struct CustomSqlFilterData
{
DDS_SqlFilterGeneratorQos base;

CustomSqlFilterData();

DDS_ReturnCode_t
set_memory_management_property(
const DDS_DomainParticipantQos & dp_qos);
};

RMW_CONNEXTDDS_PUBLIC
DDS_ReturnCode_t
register_content_filter(
DDS_DomainParticipant * const participant,
CustomSqlFilterData * const filter_data);

RMW_CONNEXTDDS_PUBLIC
extern const char * const PLUGIN_NAME;

} // namespace rti_connext_dds_custom_sql_filter

#if !RMW_CONNEXT_BUILTIN_CFT_COMPATIBILITY_MODE
extern "C" {
// This is an internal function from RTI Connext DDS which allows a filter to
// be registered as "built-in". We need this because we want this custom filter
// to be a replacement for the built-in SQL-like filter.
RMW_CONNEXTDDS_PUBLIC
DDS_ReturnCode_t
DDS_ContentFilter_register_filter(
DDS_DomainParticipant * participant,
const char * name,
const struct DDS_ContentFilter * filter,
const DDS_ContentFilterEvaluateFunction evaluateOnSerialized,
const DDS_ContentFilterWriterEvaluateFunction writerEvaluateOnSerialized,
const DDS_ContentFilterQueryFunction query,
DDS_Boolean isBuiltin);
}
#endif // RMW_CONNEXT_BUILTIN_CFT_COMPATIBILITY_MODE

#endif // RMW_CONNEXT_DDS_API == RMW_CONNEXT_DDS_API_PRO

#endif // RMW_CONNEXTDDS__CUSTOM_SQL_FILTER_HPP_
18 changes: 18 additions & 0 deletions rmw_connextdds_common/include/rmw_connextdds/dds_api.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,13 +65,19 @@ rmw_connextdds_initialize_participant_qos_impl(
rmw_context_impl_t * const ctx,
DDS_DomainParticipantQos * const dp_qos);

rmw_ret_t
rmw_connextdds_configure_participant(
rmw_context_impl_t * const ctx,
DDS_DomainParticipant * const participant);

rmw_ret_t
rmw_connextdds_create_contentfilteredtopic(
rmw_context_impl_t * const ctx,
DDS_DomainParticipant * const dp,
DDS_Topic * const base_topic,
const char * const cft_name,
const char * const cft_filter,
const rcutils_string_array_t * const cft_expression_parameters,
DDS_TopicDescription ** const cft_out);

rmw_ret_t
Expand Down Expand Up @@ -258,4 +264,16 @@ rmw_connextdds_enable_security(
DDS_SECURITY_PROPERTY_PREFIX ".logging.log_level"
#endif /* DDS_SECURITY_LOGGING_LEVEL_PROPERTY */

rmw_ret_t
rmw_connextdds_set_cft_filter_expression(
DDS_TopicDescription * const topic_desc,
const char * const cft_expression,
const rcutils_string_array_t * const cft_expression_parameters);

rmw_ret_t
rmw_connextdds_get_cft_filter_expression(
DDS_TopicDescription * const topic_desc,
rcutils_allocator_t * const allocator,
rmw_subscription_content_filter_options_t * const options);

#endif // RMW_CONNEXTDDS__DDS_API_HPP_
12 changes: 0 additions & 12 deletions rmw_connextdds_common/include/rmw_connextdds/dds_api_ndds.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,6 @@

#include "rcutils/types.h"

DDS_SEQUENCE(RMW_Connext_Uint8ArrayPtrSeq, rcutils_uint8_array_t *);

typedef RMW_Connext_Uint8ArrayPtrSeq RMW_Connext_UntypedSampleSeq;

#define RMW_Connext_UntypedSampleSeq_INITIALIZER DDS_SEQUENCE_INITIALIZER

#define DDS_UntypedSampleSeq_get_reference(seq_, i_) \
*RMW_Connext_Uint8ArrayPtrSeq_get_reference(seq_, i_)

#define DDS_UntypedSampleSeq_get_length(seq_) \
RMW_Connext_Uint8ArrayPtrSeq_get_length(seq_)

#if RMW_CONNEXT_DDS_API_PRO_LEGACY
#ifndef RTIXCdrLong_MAX
#define RTIXCdrLong_MAX 2147483647
Expand Down
12 changes: 12 additions & 0 deletions rmw_connextdds_common/include/rmw_connextdds/rmw_api_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -483,6 +483,18 @@ rmw_api_connextdds_subscription_get_actual_qos(
const rmw_subscription_t * subscription,
rmw_qos_profile_t * qos);

RMW_CONNEXTDDS_PUBLIC
rmw_ret_t
rmw_api_connextdds_subscription_set_content_filter(
rmw_subscription_t * subscription,
const rmw_subscription_content_filter_options_t * options);

RMW_CONNEXTDDS_PUBLIC
rmw_ret_t
rmw_api_connextdds_subscription_get_content_filter(
const rmw_subscription_t * subscription,
rcutils_allocator_t * const allocator,
rmw_subscription_content_filter_options_t * options);

RMW_CONNEXTDDS_PUBLIC
rmw_ret_t
Expand Down
22 changes: 22 additions & 0 deletions rmw_connextdds_common/include/rmw_connextdds/rmw_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,15 @@ class RMW_Connext_Subscriber
rmw_message_info_t * const message_info,
bool * const taken);

rmw_ret_t
set_content_filter(
const rmw_subscription_content_filter_options_t * const options);

rmw_ret_t
get_content_filter(
rcutils_allocator_t * allocator,
rmw_subscription_content_filter_options_t * const options);

bool
has_data()
{
Expand Down Expand Up @@ -470,6 +479,17 @@ class RMW_Connext_Subscriber
return this->dds_topic;
}

static std::string get_atomic_id()
{
static std::atomic_uint64_t id;
return std::to_string(id++);
}

bool is_cft_enabled()
{
return !this->cft_expression.empty();
}

const bool internal;
const bool ignore_local;

Expand All @@ -478,6 +498,7 @@ class RMW_Connext_Subscriber
DDS_DataReader * dds_reader;
DDS_Topic * dds_topic;
DDS_TopicDescription * dds_topic_cft;
std::string cft_expression;
RMW_Connext_MessageTypeSupport * type_support;
rmw_gid_t ros_gid;
const bool created_topic;
Expand All @@ -496,6 +517,7 @@ class RMW_Connext_Subscriber
const bool ignore_local,
const bool created_topic,
DDS_TopicDescription * const dds_topic_cft,
const char * const cft_expression,
const bool internal);

friend class RMW_Connext_SubscriberStatusCondition;
Expand Down
15 changes: 15 additions & 0 deletions rmw_connextdds_common/include/rmw_connextdds/static_config.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,13 @@
#ifndef RMW_CONNEXTDDS__STATIC_CONFIG_HPP_
#define RMW_CONNEXTDDS__STATIC_CONFIG_HPP_

/******************************************************************************
* Debug flags
******************************************************************************/
#ifndef RMW_CONNEXT_DEBUG
#define RMW_CONNEXT_DEBUG 0
#endif // RMW_CONNEXT_DEBUG

/******************************************************************************
* Default User Configuration
******************************************************************************/
Expand Down Expand Up @@ -311,6 +318,14 @@
#define RMW_CONNEXT_LEGACY_RMW_COMPATIBILITY_MODE 1
#endif /* RMW_CONNEXT_LEGACY_RMW_COMPATIBILITY_MODE */

/******************************************************************************
* On windows, the custom SQL filter cannot be registered as "built-in", so we
* must enable some additional code to register it as a user plugin.
******************************************************************************/
#ifndef RMW_CONNEXT_BUILTIN_CFT_COMPATIBILITY_MODE
#define RMW_CONNEXT_BUILTIN_CFT_COMPATIBILITY_MODE 0
#endif /* RMW_CONNEXT_LEGACY_RMW_COMPATIBILITY_MODE */

#include "resource_limits.hpp"

#endif // RMW_CONNEXTDDS__STATIC_CONFIG_HPP_
30 changes: 27 additions & 3 deletions rmw_connextdds_common/include/rmw_connextdds/type_support.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -167,11 +167,21 @@ class RMW_Connext_MessageTypeSupport

struct RMW_Connext_Message
{
const void * user_data;
bool serialized;
RMW_Connext_MessageTypeSupport * type_support;
const void * user_data{nullptr};
bool serialized{false};
RMW_Connext_MessageTypeSupport * type_support{nullptr};
rcutils_uint8_array_t data_buffer;
};

rmw_ret_t
RMW_Connext_Message_initialize(
RMW_Connext_Message * const self,
RMW_Connext_MessageTypeSupport * const type_support,
const size_t data_buffer_size);

void
RMW_Connext_Message_finalize(RMW_Connext_Message * const self);

class RMW_Connext_ServiceTypeSupportWrapper
{
public:
Expand Down Expand Up @@ -276,5 +286,19 @@ class RMW_Connext_ServiceTypeSupportWrapper
const rosidl_service_type_support_t * const type_supports);
};

#if RMW_CONNEXT_DDS_API == RMW_CONNEXT_DDS_API_PRO
DDS_SEQUENCE(RMW_Connext_MessagePtrSeq, RMW_Connext_Message *);

typedef RMW_Connext_MessagePtrSeq RMW_Connext_UntypedSampleSeq;

#define RMW_Connext_UntypedSampleSeq_INITIALIZER DDS_SEQUENCE_INITIALIZER

#define DDS_UntypedSampleSeq_get_reference(seq_, i_) \
*RMW_Connext_MessagePtrSeq_get_reference(seq_, i_)

#define DDS_UntypedSampleSeq_get_length(seq_) \
RMW_Connext_MessagePtrSeq_get_length(seq_)

#endif // RMW_CONNEXT_DDS_API == RMW_CONNEXT_DDS_API_PRO

#endif // RMW_CONNEXTDDS__TYPE_SUPPORT_HPP_
25 changes: 23 additions & 2 deletions rmw_connextdds_common/src/common/rmw_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,12 @@ rmw_context_impl_t::initialize_participant(const bool localhost_only)
return RMW_RET_ERROR;
}

rmw_ret_t cfg_rc = rmw_connextdds_configure_participant(this, this->participant);
if (RMW_RET_OK != cfg_rc) {
RMW_CONNEXT_LOG_ERROR("failed to configure DDS participant")
return cfg_rc;
}

/* Create DDS publisher/subscriber objects that will be used for all DDS
writers/readers created to support RMW publishers/subscriptions. */

Expand Down Expand Up @@ -332,7 +338,21 @@ rmw_ret_t
rmw_context_impl_t::finalize_participant()
{
RMW_CONNEXT_LOG_DEBUG("finalizing DDS DomainParticipant")

#if RMW_CONNEXT_DEBUG && RMW_CONNEXT_DDS_API == RMW_CONNEXT_DDS_API_PRO
// If we are building in Debug mode, an issue in Connext may prevent the
// participant from being able to delete any content-filtered topic if
// the participant has not been enabled.
// For this reason, make sure to enable the participant before trying to
// finalize it.
// TODO(asorbini) reconsider the need for this code in Connext > 6.1.0
if (DDS_RETCODE_OK !=
DDS_Entity_enable(DDS_DomainParticipant_as_entity(participant)))
{
RMW_CONNEXT_LOG_ERROR_SET(
"failed to enable DomainParticipant before deletion")
return RMW_RET_ERROR;
}
#endif // RMW_CONNEXT_DEBUG && RMW_CONNEXT_DDS_API == RMW_CONNEXT_DDS_API_PRO
if (RMW_RET_OK != rmw_connextdds_graph_finalize(this)) {
RMW_CONNEXT_LOG_ERROR("failed to finalize graph cache")
return RMW_RET_ERROR;
Expand Down Expand Up @@ -386,7 +406,7 @@ rmw_context_impl_t::finalize_participant()
if (nullptr != this->participant) {
// If we are cleaning up after some RMW failure, it is possible for some
// DataWriter to not have been deleted.
// Call DDS_Publisher_delete_contained_entities() to make sure we can
// Call DDS_DomainParticipant_delete_contained_entities() to make sure we can
// dispose the publisher.
if (DDS_RETCODE_OK !=
DDS_DomainParticipant_delete_contained_entities(this->participant))
Expand All @@ -402,6 +422,7 @@ rmw_context_impl_t::finalize_participant()
RMW_CONNEXT_LOG_ERROR_SET("failed to delete DDS participant")
return RMW_RET_ERROR;
}

this->participant = nullptr;
}

Expand Down
Loading