From 87cee28df431ff3b5e8488adfb459f540182b0a9 Mon Sep 17 00:00:00 2001 From: Andrea Sorbini Date: Wed, 16 Mar 2022 16:31:36 -0700 Subject: [PATCH 1/5] Add support for listener callbacks. Signed-off-by: Andrea Sorbini --- .../include/rmw_connextdds/dds_api.hpp | 5 + .../include/rmw_connextdds/rmw_api_impl.hpp | 18 ++-- .../rmw_connextdds/rmw_waitset_std.hpp | 98 +++++++++++++++++++ .../src/common/rmw_impl_waitset_std.cpp | 9 ++ .../src/common/rmw_listener.cpp | 92 +++++++++++------ .../src/ndds/dds_api_ndds.cpp | 56 +++++++++++ .../src/rtime/dds_api_rtime.cpp | 34 +++++++ 7 files changed, 271 insertions(+), 41 deletions(-) diff --git a/rmw_connextdds_common/include/rmw_connextdds/dds_api.hpp b/rmw_connextdds_common/include/rmw_connextdds/dds_api.hpp index f342e4b3..50542a6b 100644 --- a/rmw_connextdds_common/include/rmw_connextdds/dds_api.hpp +++ b/rmw_connextdds_common/include/rmw_connextdds/dds_api.hpp @@ -132,6 +132,11 @@ rmw_ret_t rmw_connextdds_take_samples( RMW_Connext_Subscriber * const sub); +rmw_ret_t +rmw_connextdds_count_unread_samples( + RMW_Connext_Subscriber * const sub, + size_t & unread_count); + rmw_ret_t rmw_connextdds_return_samples( RMW_Connext_Subscriber * const sub); diff --git a/rmw_connextdds_common/include/rmw_connextdds/rmw_api_impl.hpp b/rmw_connextdds_common/include/rmw_connextdds/rmw_api_impl.hpp index 8c78e78a..6f531a61 100644 --- a/rmw_connextdds_common/include/rmw_connextdds/rmw_api_impl.hpp +++ b/rmw_connextdds_common/include/rmw_connextdds/rmw_api_impl.hpp @@ -90,8 +90,8 @@ RMW_CONNEXTDDS_PUBLIC rmw_ret_t rmw_api_connextdds_event_set_callback( rmw_event_t * event, - rmw_event_callback_t callback, - const void * user_data); + const rmw_event_callback_t callback, + const void * const user_data); /***************************************************************************** * Info API @@ -435,15 +435,15 @@ RMW_CONNEXTDDS_PUBLIC rmw_ret_t rmw_api_connextdds_service_set_on_new_request_callback( rmw_service_t * rmw_service, - rmw_event_callback_t callback, - const void * user_data); + const rmw_event_callback_t callback, + const void * const user_data); RMW_CONNEXTDDS_PUBLIC rmw_ret_t rmw_api_connextdds_client_set_on_new_response_callback( rmw_client_t * rmw_client, - rmw_event_callback_t callback, - const void * user_data); + const rmw_event_callback_t callback, + const void * const user_data); /***************************************************************************** * Subscription API @@ -560,9 +560,9 @@ rmw_api_connextdds_return_loaned_message_from_subscription( RMW_CONNEXTDDS_PUBLIC rmw_ret_t rmw_api_connextdds_subscription_set_on_new_message_callback( - rmw_subscription_t * rmw_subscription, - rmw_event_callback_t callback, - const void * user_data); + rmw_subscription_t * const rmw_subscription, + const rmw_event_callback_t callback, + const void * const user_data); /***************************************************************************** * WaitSet API diff --git a/rmw_connextdds_common/include/rmw_connextdds/rmw_waitset_std.hpp b/rmw_connextdds_common/include/rmw_connextdds/rmw_waitset_std.hpp index 09c911e4..99f89309 100644 --- a/rmw_connextdds_common/include/rmw_connextdds/rmw_waitset_std.hpp +++ b/rmw_connextdds_common/include/rmw_connextdds/rmw_waitset_std.hpp @@ -121,6 +121,22 @@ class RMW_Connext_Condition } } + template + void + perform_action_and_update_state(FunctorT && update_condition, FunctorA && action) + { + std::lock_guard internal_lock(this->mutex_internal); + + action(); + + if (nullptr != this->waitset_mutex) { + std::lock_guard lock(*this->waitset_mutex); + update_condition(); + } else { + update_condition(); + } + } + protected: std::mutex mutex_internal; std::mutex * waitset_mutex; @@ -333,8 +349,44 @@ class RMW_Connext_StatusCondition : public RMW_Connext_Condition virtual bool has_status(const rmw_event_type_t event_type) = 0; + void + notify_new_event() + { + std::unique_lock lock_mutex(new_event_mutex_); + if (new_event_cb_) { + new_event_cb_(user_data_, 1); + } else { + unread_events_count_++; + } + } + + void + set_new_event_callback( + rmw_event_callback_t callback, + const void * user_data) + { + std::unique_lock lock_mutex(new_event_mutex_); + + if (callback) { + // Push events arrived before setting the executor's callback + if (unread_events_count_ > 0) { + callback(user_data, unread_events_count_); + unread_events_count_ = 0; + } + user_data_ = user_data; + new_event_cb_ = callback; + } else { + user_data_ = nullptr; + new_event_cb_ = nullptr; + } + } + protected: DDS_StatusCondition * scond; + std::mutex new_event_mutex_; + rmw_event_callback_t new_event_cb_{nullptr}; + const void * user_data_{nullptr}; + uint64_t unread_events_count_ = 0; }; void @@ -712,6 +764,47 @@ class RMW_Connext_SubscriberStatusCondition : public RMW_Connext_StatusCondition return RMW_RET_OK; } + void set_on_new_data_callback( + const rmw_event_callback_t callback, + const void * const user_data) + { + std::unique_lock lock(new_data_event_mutex_); + if (callback) { + if (unread_data_events_count_ > 0) { + callback(user_data, unread_data_events_count_); + unread_data_events_count_ = 0; + } + new_data_event_cb_ = callback; + data_event_user_data_ = user_data; + } else { + new_data_event_cb_ = nullptr; + data_event_user_data_ = nullptr; + } + } + + void notify_new_data() + { + size_t unread_samples = 0; + std::unique_lock lock_mutex(new_data_event_mutex_); + perform_action_and_update_state( + [this, &unread_samples]() { + if (unread_samples == 0) { + return; + } + if (new_data_event_cb_) { + new_data_event_cb_(data_event_user_data_, unread_samples); + } else { + unread_data_events_count_ += unread_samples; + } + }, + [this, &unread_samples]() { + const rmw_ret_t rc = rmw_connextdds_count_unread_samples(this->sub, unread_samples); + if (RMW_RET_OK != rc) { + RMW_CONNEXT_LOG_ERROR("failed to count unread samples on DDS Reader") + } + }); + } + protected: void update_status_deadline( const DDS_RequestedDeadlineMissedStatus * const status); @@ -745,6 +838,11 @@ class RMW_Connext_SubscriberStatusCondition : public RMW_Connext_StatusCondition RMW_Connext_Subscriber * sub; + std::mutex new_data_event_mutex_; + rmw_event_callback_t new_data_event_cb_{nullptr}; + const void * data_event_user_data_{nullptr}; + uint64_t unread_data_events_count_ = 0; + friend class RMW_Connext_WaitSet; }; diff --git a/rmw_connextdds_common/src/common/rmw_impl_waitset_std.cpp b/rmw_connextdds_common/src/common/rmw_impl_waitset_std.cpp index c8e10121..0c58cb2d 100644 --- a/rmw_connextdds_common/src/common/rmw_impl_waitset_std.cpp +++ b/rmw_connextdds_common/src/common/rmw_impl_waitset_std.cpp @@ -121,6 +121,7 @@ RMW_Connext_DataReaderListener_on_data_available( UNUSED_ARG(reader); + self->notify_new_data(); self->set_data_available(true); } @@ -706,6 +707,8 @@ RMW_Connext_SubscriberStatusCondition::update_status_deadline( this->status_deadline.total_count_change = this->status_deadline.total_count; this->status_deadline.total_count_change -= this->status_deadline_last.total_count; + + this->notify_new_event(); } void @@ -720,6 +723,8 @@ RMW_Connext_SubscriberStatusCondition::update_status_liveliness( this->status_liveliness.alive_count_change -= this->status_liveliness_last.alive_count; this->status_liveliness.not_alive_count_change -= this->status_liveliness_last.not_alive_count; + + this->notify_new_event(); } void @@ -852,6 +857,8 @@ RMW_Connext_PublisherStatusCondition::update_status_deadline( this->status_deadline.total_count_change = this->status_deadline.total_count; this->status_deadline.total_count_change -= this->status_deadline_last.total_count; + + this->notify_new_event(); } void @@ -863,6 +870,8 @@ RMW_Connext_PublisherStatusCondition::update_status_liveliness( this->status_liveliness.total_count_change = this->status_liveliness.total_count; this->status_liveliness.total_count_change -= this->status_liveliness_last.total_count; + + this->notify_new_event(); } void diff --git a/rmw_connextdds_common/src/common/rmw_listener.cpp b/rmw_connextdds_common/src/common/rmw_listener.cpp index 730eaa92..d0a7958a 100644 --- a/rmw_connextdds_common/src/common/rmw_listener.cpp +++ b/rmw_connextdds_common/src/common/rmw_listener.cpp @@ -19,15 +19,25 @@ ******************************************************************************/ rmw_ret_t rmw_api_connextdds_event_set_callback( - rmw_event_t * event, - rmw_event_callback_t callback, - const void * user_data) + rmw_event_t * const event, + const rmw_event_callback_t callback, + const void * const user_data) { - UNUSED_ARG(event); - UNUSED_ARG(callback); - UNUSED_ARG(user_data); - RMW_CONNEXT_LOG_ERROR_SET("rmw_event_set_callback not implemented") - return RMW_RET_UNSUPPORTED; + RMW_CHECK_ARGUMENT_FOR_NULL(event, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_TYPE_IDENTIFIERS_MATCH( + event, + event->implementation_identifier, + RMW_CONNEXTDDS_ID, + return RMW_RET_INVALID_ARGUMENT); + + RMW_Connext_StatusCondition * condition = nullptr; + if (RMW_Connext_Event::reader_event(event)) { + condition = RMW_Connext_Event::subscriber(event)->condition(); + } else { + condition = RMW_Connext_Event::publisher(event)->condition(); + } + condition->set_new_event_callback(callback, user_data); + return RMW_RET_OK; } /****************************************************************************** @@ -35,28 +45,40 @@ rmw_api_connextdds_event_set_callback( ******************************************************************************/ rmw_ret_t rmw_api_connextdds_service_set_on_new_request_callback( - rmw_service_t * rmw_service, - rmw_event_callback_t callback, - const void * user_data) + rmw_service_t * const service, + const rmw_event_callback_t callback, + const void * const user_data) { - UNUSED_ARG(rmw_service); - UNUSED_ARG(callback); - UNUSED_ARG(user_data); - RMW_CONNEXT_LOG_ERROR_SET("rmw_service_set_on_new_request_callback not implemented") - return RMW_RET_UNSUPPORTED; + RMW_CHECK_ARGUMENT_FOR_NULL(service, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_TYPE_IDENTIFIERS_MATCH( + service, + service->implementation_identifier, + RMW_CONNEXTDDS_ID, + return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); + + RMW_Connext_Service * const svc_impl = + reinterpret_cast(service->data); + svc_impl->subscriber()->condition()->set_on_new_data_callback(callback, user_data); + return RMW_RET_OK; } rmw_ret_t rmw_api_connextdds_client_set_on_new_response_callback( - rmw_client_t * rmw_client, - rmw_event_callback_t callback, - const void * user_data) + rmw_client_t * const client, + const rmw_event_callback_t callback, + const void * const user_data) { - UNUSED_ARG(rmw_client); - UNUSED_ARG(callback); - UNUSED_ARG(user_data); - RMW_CONNEXT_LOG_ERROR_SET("rmw_client_set_on_new_response_callback not implemented") - return RMW_RET_UNSUPPORTED; + RMW_CHECK_ARGUMENT_FOR_NULL(client, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_TYPE_IDENTIFIERS_MATCH( + client, + client->implementation_identifier, + RMW_CONNEXTDDS_ID, + return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); + + RMW_Connext_Client * const client_impl = + reinterpret_cast(client->data); + client_impl->subscriber()->condition()->set_on_new_data_callback(callback, user_data); + return RMW_RET_OK; } /****************************************************************************** @@ -64,13 +86,19 @@ rmw_api_connextdds_client_set_on_new_response_callback( ******************************************************************************/ rmw_ret_t rmw_api_connextdds_subscription_set_on_new_message_callback( - rmw_subscription_t * rmw_subscription, - rmw_event_callback_t callback, - const void * user_data) + rmw_subscription_t * const subscription, + const rmw_event_callback_t callback, + const void * const user_data) { - UNUSED_ARG(rmw_subscription); - UNUSED_ARG(callback); - UNUSED_ARG(user_data); - RMW_CONNEXT_LOG_ERROR_SET("rmw_subscription_set_on_new_message_callback not implemented") - return RMW_RET_UNSUPPORTED; + RMW_CHECK_ARGUMENT_FOR_NULL(subscription, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_TYPE_IDENTIFIERS_MATCH( + subscription, + subscription->implementation_identifier, + RMW_CONNEXTDDS_ID, + return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); + + RMW_Connext_Subscriber * const sub_impl = + reinterpret_cast(subscription->data); + sub_impl->condition()->set_on_new_data_callback(callback, user_data); + return RMW_RET_OK; } diff --git a/rmw_connextdds_common/src/ndds/dds_api_ndds.cpp b/rmw_connextdds_common/src/ndds/dds_api_ndds.cpp index 546daea1..857c7791 100644 --- a/rmw_connextdds_common/src/ndds/dds_api_ndds.cpp +++ b/rmw_connextdds_common/src/ndds/dds_api_ndds.cpp @@ -675,6 +675,62 @@ rmw_connextdds_return_samples( return RMW_RET_OK; } +rmw_ret_t +rmw_connextdds_count_unread_samples( + RMW_Connext_Subscriber * const sub, + size_t & unread_count) +{ + DDS_Boolean is_loan = DDS_BOOLEAN_TRUE; + DDS_Long data_len = 0; + void ** data_buffer = nullptr; + DDS_SampleInfoSeq info_seq = DDS_SEQUENCE_INITIALIZER; + + unread_count = 0; + DDS_ReturnCode_t rc = DDS_RETCODE_ERROR; + do { + rc = DDS_DataReader_read_or_take_instance_untypedI( + sub->reader(), + &is_loan, + &data_buffer, + &data_len, + &info_seq, + 0 /* data_seq_len */, + 0 /* data_seq_max_len */, + DDS_BOOLEAN_TRUE /* data_seq_has_ownership */, + NULL /* data_seq_contiguous_buffer_for_copy */, + 1 /* data_size -- ignored because loaning*/, + DDS_LENGTH_UNLIMITED /* max_samples */, + &DDS_HANDLE_NIL /* a_handle */, + #if !RMW_CONNEXT_DDS_API_PRO_LEGACY + NULL /* topic_query_guid */, + #endif /* RMW_CONNEXT_DDS_API_PRO_LEGACY */ + DDS_NOT_READ_SAMPLE_STATE, + DDS_ANY_VIEW_STATE, + DDS_ANY_INSTANCE_STATE, + DDS_BOOLEAN_FALSE /* take */); + if (DDS_RETCODE_NO_DATA == rc) { + continue; + } + if (DDS_RETCODE_OK != rc && DDS_RETCODE_NO_DATA != rc) { + RMW_CONNEXT_LOG_ERROR_SET("failed to read data from DDS reader") + return RMW_RET_ERROR; + } + if (DDS_RETCODE_OK == rc) { + unread_count += data_len; + rc = DDS_DataReader_return_loan_untypedI( + sub->reader(), + data_buffer, + data_len, + &info_seq); + if (DDS_RETCODE_OK != rc) { + RMW_CONNEXT_LOG_ERROR_SET("failed to return loan to DDS reader") + return RMW_RET_ERROR; + } + } + } while (rc == DDS_RETCODE_OK); + + return RMW_RET_OK; +} rmw_ret_t rmw_connextdds_filter_sample( diff --git a/rmw_connextdds_common/src/rtime/dds_api_rtime.cpp b/rmw_connextdds_common/src/rtime/dds_api_rtime.cpp index 729acd42..0f5672ec 100644 --- a/rmw_connextdds_common/src/rtime/dds_api_rtime.cpp +++ b/rmw_connextdds_common/src/rtime/dds_api_rtime.cpp @@ -1188,6 +1188,40 @@ rmw_connextdds_return_samples( return RMW_RET_OK; } +rmw_ret_t +rmw_connextdds_count_unread_samples( + RMW_Connext_Subscriber * const sub, + size_t & unread_count) +{ + unread_count = 0; + DDS_SampleInfoSeq info_seq = DDS_SEQUENCE_INITIALIZER; + DDS_UntypedSampleSeq data_seq = DDS_SEQUENCE_INITIALIZER; + DDS_ReturnCode_t rc = DDS_RETCODE_ERROR; + do { + rc = DDS_DataReader_read( + sub->reader(), + &data_seq, + &info_seq, + DDS_LENGTH_UNLIMITED, + DDS_ANY_VIEW_STATE, + DDS_NOT_READ_SAMPLE_STATE, + DDS_ANY_INSTANCE_STATE); + if (DDS_RETCODE_OK != rc && DDS_RETCODE_NO_DATA != rc) { + RMW_CONNEXT_LOG_ERROR_SET("failed to read data from DDS reader") + return RMW_RET_ERROR; + } + if (DDS_RETCODE_OK == rc) { + unread_count += DDS_UntypedSampleSeq_get_length(&data_seq); + rc = DDS_DataReader_return_loan(sub->reader(), &data_seq, &info_seq); + if (DDS_RETCODE_OK != rc) { + RMW_CONNEXT_LOG_ERROR_SET("failed to return loan to DDS reader") + return RMW_RET_ERROR; + } + } + } while (rc == DDS_RETCODE_OK); + return RMW_RET_OK; +} + rmw_ret_t rmw_connextdds_filter_sample( RMW_Connext_Subscriber * const sub, From 679173bd84c149491675b2ec59137cb8ef95850b Mon Sep 17 00:00:00 2001 From: Andrea Sorbini Date: Thu, 17 Mar 2022 10:49:12 -0700 Subject: [PATCH 2/5] Fix wrong debug assertion when converting DDS_Duration values Signed-off-by: Andrea Sorbini --- rmw_connextdds_common/src/common/rmw_impl.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rmw_connextdds_common/src/common/rmw_impl.cpp b/rmw_connextdds_common/src/common/rmw_impl.cpp index a74a9010..24d81333 100644 --- a/rmw_connextdds_common/src/common/rmw_impl.cpp +++ b/rmw_connextdds_common/src/common/rmw_impl.cpp @@ -285,7 +285,7 @@ dds_duration_to_rmw_time(const DDS_Duration_t & duration) if (DDS_Duration_is_infinite(&duration)) { return RMW_DURATION_INFINITE; } - assert(duration.sec > 0); + assert(duration.sec >= 0); rmw_time_t result = {static_cast(duration.sec), duration.nanosec}; return result; } From 2dd4e92578d16c06d2b073355c92e56858ea2f3e Mon Sep 17 00:00:00 2001 From: Andrea Sorbini Date: Thu, 17 Mar 2022 16:44:17 -0700 Subject: [PATCH 3/5] Clarify interactions between count_unread_samples() and take_next() Signed-off-by: Andrea Sorbini --- .../include/rmw_connextdds/rmw_impl.hpp | 24 ++++++++++++++++++ .../rmw_connextdds/rmw_waitset_std.hpp | 25 ++----------------- .../src/common/rmw_impl_waitset_std.cpp | 24 ++++++++++++++++++ 3 files changed, 50 insertions(+), 23 deletions(-) diff --git a/rmw_connextdds_common/include/rmw_connextdds/rmw_impl.hpp b/rmw_connextdds_common/include/rmw_connextdds/rmw_impl.hpp index bd79a4a1..cff0592e 100644 --- a/rmw_connextdds_common/include/rmw_connextdds/rmw_impl.hpp +++ b/rmw_connextdds_common/include/rmw_connextdds/rmw_impl.hpp @@ -453,6 +453,30 @@ class RMW_Connext_Subscriber return has_data; } + rmw_ret_t + count_unread_samples(size_t & unread_count) + { + // The action of counting unread samples is not currently mutually exclusive + // with the action of taking samples out of the reader cache. Unfortunately + // we cannot use a mutex to synchronize calls between + // count_unread_samples() and and take_next() because we would run the risk + // of a deadlock. This is because count_unread_samples() is supposed to be + // called from within the reader's "exclusive area" (since it is + // executed within a listener callback), while take_next() is usually called + // from an executor/application thread and it must acquire the reader's + // "exclusive area" before taking samples. + // This might mean that an application which relies on data callbacks and the + // count provided by this function, and which at the same time polls data + // on the subscription, might end up missing some samples in the total count + // notified to it via callback because the samples were taken out of the + // cache before they could be notified to the listener. + // Fortunately, in Connext the listener callback is notified right after a + // sample is added to the reader cache and before any mutex is released, + // which should allow this function to always report a correct number, as + // long as it is only called within a (DDS) listener callback. + return rmw_connextdds_count_unread_samples(this, unread_count); + } + DDS_Subscriber * dds_subscriber() const { return DDS_DataReader_get_subscriber(this->dds_reader); diff --git a/rmw_connextdds_common/include/rmw_connextdds/rmw_waitset_std.hpp b/rmw_connextdds_common/include/rmw_connextdds/rmw_waitset_std.hpp index 99f89309..959eddc0 100644 --- a/rmw_connextdds_common/include/rmw_connextdds/rmw_waitset_std.hpp +++ b/rmw_connextdds_common/include/rmw_connextdds/rmw_waitset_std.hpp @@ -123,7 +123,7 @@ class RMW_Connext_Condition template void - perform_action_and_update_state(FunctorT && update_condition, FunctorA && action) + perform_action_and_update_state(FunctorA && action, FunctorT && update_condition) { std::lock_guard internal_lock(this->mutex_internal); @@ -782,28 +782,7 @@ class RMW_Connext_SubscriberStatusCondition : public RMW_Connext_StatusCondition } } - void notify_new_data() - { - size_t unread_samples = 0; - std::unique_lock lock_mutex(new_data_event_mutex_); - perform_action_and_update_state( - [this, &unread_samples]() { - if (unread_samples == 0) { - return; - } - if (new_data_event_cb_) { - new_data_event_cb_(data_event_user_data_, unread_samples); - } else { - unread_data_events_count_ += unread_samples; - } - }, - [this, &unread_samples]() { - const rmw_ret_t rc = rmw_connextdds_count_unread_samples(this->sub, unread_samples); - if (RMW_RET_OK != rc) { - RMW_CONNEXT_LOG_ERROR("failed to count unread samples on DDS Reader") - } - }); - } + void notify_new_data(); protected: void update_status_deadline( diff --git a/rmw_connextdds_common/src/common/rmw_impl_waitset_std.cpp b/rmw_connextdds_common/src/common/rmw_impl_waitset_std.cpp index 0c58cb2d..3bb58623 100644 --- a/rmw_connextdds_common/src/common/rmw_impl_waitset_std.cpp +++ b/rmw_connextdds_common/src/common/rmw_impl_waitset_std.cpp @@ -750,6 +750,30 @@ RMW_Connext_SubscriberStatusCondition::update_status_sample_lost( this->status_sample_lost_last.total_count; } +void +RMW_Connext_SubscriberStatusCondition::notify_new_data() +{ + size_t unread_samples = 0; + std::unique_lock lock_mutex(new_data_event_mutex_); + perform_action_and_update_state( + [this, &unread_samples]() { + const rmw_ret_t rc = this->sub->count_unread_samples(unread_samples); + if (RMW_RET_OK != rc) { + RMW_CONNEXT_LOG_ERROR("failed to count unread samples on DDS Reader") + } + }, + [this, &unread_samples]() { + if (unread_samples == 0) { + return; + } + if (new_data_event_cb_) { + new_data_event_cb_(data_event_user_data_, unread_samples); + } else { + unread_data_events_count_ += unread_samples; + } + }); +} + rmw_ret_t RMW_Connext_PublisherStatusCondition::install( RMW_Connext_Publisher * const pub) From 3a88dad32047f361fe579f5916d5a81b1954bb32 Mon Sep 17 00:00:00 2001 From: Christopher Wecht Date: Thu, 30 Mar 2023 08:41:21 +0200 Subject: [PATCH 4/5] Event listeners distinct (#109) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Add sequence numbers to message info structure (#74) * Fill reception_sequence_number/publication_sequence_number in all rmw_take_*_with_info() functions Signed-off-by: Ivan Santiago Paunovic * Add rmw_feature_supported() Signed-off-by: Ivan Santiago Paunovic * add stub for content filtered topic (#77) * add stub for content filtered topic Signed-off-by: Chen Lihui * Add support for user-specified content filters (#68) * Add support for user-specified content filters. Signed-off-by: Andrea Sorbini * - Resolve memory leak of custom content-filter resources - Add missing package dependencies for rti_connext_dds_custom_sql_filter - Clean up all participants upon factory finalization - Reset context state upon finalization (rmw_connextddsmicro) Signed-off-by: Andrea Sorbini * Assume non-null options argument Signed-off-by: Andrea Sorbini * - Return error when retrieving content-filter from a subscription that doesn't have one. - Rename internal functions related to content-filters Signed-off-by: Andrea Sorbini * Fix compilation error, oops. Signed-off-by: Andrea Sorbini * - Define RMW_CONNEXT_DEBUG when building Debug libraries. - Make sure participant is enabled before deleting contained entities when using Connext debug libraries. Signed-off-by: Andrea Sorbini * Resolve memory leak for finalization on error. Signed-off-by: Andrea Sorbini * Rename content filter public API. Signed-off-by: Andrea Sorbini * Add client/service QoS getters (#67) Signed-off-by: Mauro Passerino * Changelogs Signed-off-by: Ivan Santiago Paunovic * 0.8.1 * Fix cpplint errors (#69) * Use static_cast instead of C-style cast Fixes cpplint error. Signed-off-by: Jacob Perron * Update NOLINT category Relates to https://github.com/ament/ament_lint/pull/324 Signed-off-by: Jacob Perron * 0.8.2 Signed-off-by: Audrow Nash * Update rti-connext-dds dependency to 6.0.1. (#71) Now that this package is available in the ROS bootstrap repository for Ubuntu Focal and Jammy we can bump the expected dependency version. * 0.8.3 * Add rmw listener apis (#44) * Add stubs for setting listener callbacks Signed-off-by: Mauro Passerino * Address PR suggestions Signed-off-by: Mauro Passerino * Fix linter issues Signed-off-by: Mauro Passerino Co-authored-by: Mauro Passerino Co-authored-by: Alberto Soragna * Changelog. (#73) Signed-off-by: Chris Lalancette * 0.9.0 * add stub for content filtered topic Signed-off-by: Chen Lihui * * Rebased branch asorbini/cft on top of 0.9.0. * Resolved CFT finalization issues on error. * Verified and cleaned up build for rmw_connextddsmicro. Signed-off-by: Andrea Sorbini * Move custom SQL filter to rmw_connextdds_common Signed-off-by: Andrea Sorbini * Try to resolve linking error on Windows. Signed-off-by: Andrea Sorbini * Optionally disable writer-side CFT optimizations to support Windows. Signed-off-by: Andrea Sorbini * No need to declare private CFT function on Windows. Signed-off-by: Andrea Sorbini * remove stub implementation for ContentFilteredTopic. Signed-off-by: Tomoya Fujita * address cpplint error. Signed-off-by: Tomoya Fujita * Avoid conversion warnings on Windows. Signed-off-by: Andrea Sorbini * Use strtol instead of sscanf to avoid warnings on Windows. Signed-off-by: Andrea Sorbini * Avoid finalizing participants if factory is not available. Signed-off-by: Andrea Sorbini Co-authored-by: mauropasse Co-authored-by: Ivan Santiago Paunovic Co-authored-by: Jacob Perron Co-authored-by: Audrow Nash Co-authored-by: Steven! Ragnarök Co-authored-by: Steven! Ragnarök Co-authored-by: iRobot ROS <49500531+irobot-ros@users.noreply.github.com> Co-authored-by: Mauro Passerino Co-authored-by: Alberto Soragna Co-authored-by: Chris Lalancette Co-authored-by: Chen Lihui Co-authored-by: Tomoya Fujita * 0.10.0 Signed-off-by: Audrow Nash * Update launch_testing_ros output filter prefixes for Connext6 (#80) Signed-off-by: Ivan Santiago Paunovic * Properly initialize CDR stream before using it for filtering (#81) Signed-off-by: Andrea Sorbini * Exclude missing sample info fields when building rmw_connextddsmicro (#79) * Exclude missing sample info fields when building micro. * Report features individually for each RMW implementation. * Return special value for unsupported sequence numbers. Signed-off-by: Andrea Sorbini Co-authored-by: Chris Lalancette * 0.11.0 Signed-off-by: Audrow Nash * Resolve build error with RTI Connext DDS 5.3.1 (#82) Signed-off-by: Andrea Sorbini * Changelog. Signed-off-by: Chris Lalancette * 0.11.1 * Use destinct callbacks for each event type --------- Signed-off-by: Ivan Santiago Paunovic Signed-off-by: Chen Lihui Signed-off-by: Audrow Nash Signed-off-by: Andrea Sorbini Signed-off-by: Chris Lalancette Co-authored-by: Ivan Santiago Paunovic Co-authored-by: Chen Lihui Co-authored-by: Andrea Sorbini Co-authored-by: mauropasse Co-authored-by: Jacob Perron Co-authored-by: Audrow Nash Co-authored-by: Steven! Ragnarök Co-authored-by: Steven! Ragnarök Co-authored-by: iRobot ROS <49500531+irobot-ros@users.noreply.github.com> Co-authored-by: Mauro Passerino Co-authored-by: Alberto Soragna Co-authored-by: Chris Lalancette Co-authored-by: Tomoya Fujita --- rmw_connextdds/CHANGELOG.rst | 15 + rmw_connextdds/CMakeLists.txt | 2 +- rmw_connextdds/package.xml | 2 +- rmw_connextdds/src/rmw_api_impl_ndds.cpp | 37 + rmw_connextdds_common/CHANGELOG.rst | 18 + rmw_connextdds_common/CMakeLists.txt | 11 + .../rmw_connextdds/custom_sql_filter.hpp | 66 ++ .../include/rmw_connextdds/dds_api.hpp | 18 + .../include/rmw_connextdds/dds_api_ndds.hpp | 12 - .../include/rmw_connextdds/rmw_api_impl.hpp | 21 + .../include/rmw_connextdds/rmw_impl.hpp | 22 + .../rmw_connextdds/rmw_waitset_std.hpp | 29 +- .../include/rmw_connextdds/static_config.hpp | 15 + .../include/rmw_connextdds/type_support.hpp | 30 +- rmw_connextdds_common/package.xml | 2 +- .../src/common/rmw_context.cpp | 25 +- rmw_connextdds_common/src/common/rmw_impl.cpp | 130 ++- .../src/common/rmw_impl_waitset_std.cpp | 12 +- .../src/common/rmw_listener.cpp | 2 +- .../src/common/rmw_subscription.cpp | 48 ++ .../src/common/rmw_type_support.cpp | 33 + .../src/ndds/custom_sql_filter.cpp | 811 ++++++++++++++++++ .../src/ndds/dds_api_ndds.cpp | 264 +++++- .../src/ndds/rmw_type_support_ndds.cpp | 105 +-- .../src/rtime/dds_api_rtime.cpp | 39 + .../src/rtime/rmw_type_support_rtime.cpp | 68 +- rmw_connextddsmicro/CHANGELOG.rst | 15 + rmw_connextddsmicro/package.xml | 2 +- .../src/rmw_api_impl_rtime.cpp | 42 + rti_connext_dds_cmake_module/CHANGELOG.rst | 9 + rti_connext_dds_cmake_module/package.xml | 2 +- 31 files changed, 1752 insertions(+), 155 deletions(-) create mode 100644 rmw_connextdds_common/include/rmw_connextdds/custom_sql_filter.hpp create mode 100644 rmw_connextdds_common/src/ndds/custom_sql_filter.cpp diff --git a/rmw_connextdds/CHANGELOG.rst b/rmw_connextdds/CHANGELOG.rst index f199c3a9..5ee6744e 100644 --- a/rmw_connextdds/CHANGELOG.rst +++ b/rmw_connextdds/CHANGELOG.rst @@ -2,6 +2,21 @@ Changelog for package rmw_connextdds ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +0.11.1 (2022-04-26) +------------------- + +0.11.0 (2022-04-08) +------------------- +* Exclude missing sample info fields when building rmw_connextddsmicro (`#79 `_) +* Update launch_testing_ros output filter prefixes for Connext6 (`#80 `_) +* Contributors: Andrea Sorbini, Ivan Santiago Paunovic + +0.10.0 (2022-03-28) +------------------- +* Add support for user-specified content filters (`#68 `_) +* add stub for content filtered topic (`#77 `_) +* Contributors: Andrea Sorbini, Chen Lihui, Ivan Santiago Paunovic + 0.9.0 (2022-03-01) ------------------ * Add rmw listener apis (`#44 `_) diff --git a/rmw_connextdds/CMakeLists.txt b/rmw_connextdds/CMakeLists.txt index c5a4174d..37c10026 100644 --- a/rmw_connextdds/CMakeLists.txt +++ b/rmw_connextdds/CMakeLists.txt @@ -73,4 +73,4 @@ ament_package( ) ament_index_register_resource("rmw_output_prefixes" - CONTENT "RTI Data Distribution Service\nExpires on") \ No newline at end of file + CONTENT "RTI Data Distribution Service\nRTI Connext DDS\nExpires on") \ No newline at end of file diff --git a/rmw_connextdds/package.xml b/rmw_connextdds/package.xml index 8c96eeb2..de5f3304 100644 --- a/rmw_connextdds/package.xml +++ b/rmw_connextdds/package.xml @@ -2,7 +2,7 @@ rmw_connextdds - 0.9.0 + 0.11.1 A ROS2 RMW implementation built with RTI Connext DDS Professional. Andrea Sorbini Apache License 2.0 diff --git a/rmw_connextdds/src/rmw_api_impl_ndds.cpp b/rmw_connextdds/src/rmw_api_impl_ndds.cpp index 49f8c6f1..ae9e9356 100644 --- a/rmw_connextdds/src/rmw_api_impl_ndds.cpp +++ b/rmw_connextdds/src/rmw_api_impl_ndds.cpp @@ -716,6 +716,24 @@ rmw_subscription_get_actual_qos( return rmw_api_connextdds_subscription_get_actual_qos(subscription, qos); } +rmw_ret_t +rmw_subscription_set_content_filter( + rmw_subscription_t * subscription, + const rmw_subscription_content_filter_options_t * options) +{ + return rmw_api_connextdds_subscription_set_content_filter( + subscription, options); +} + +rmw_ret_t +rmw_subscription_get_content_filter( + const rmw_subscription_t * subscription, + rcutils_allocator_t * allocator, + rmw_subscription_content_filter_options_t * options) +{ + return rmw_api_connextdds_subscription_get_content_filter( + subscription, allocator, options); +} rmw_ret_t rmw_destroy_subscription( @@ -937,3 +955,22 @@ rmw_subscription_get_network_flow_endpoints( allocator, network_flow_endpoint_array); } + +/****************************************************************************** + * Feature support functions + ******************************************************************************/ +bool +rmw_feature_supported(rmw_feature_t feature) +{ + switch (feature) { + case RMW_FEATURE_MESSAGE_INFO_RECEPTION_SEQUENCE_NUMBER: + case RMW_FEATURE_MESSAGE_INFO_PUBLICATION_SEQUENCE_NUMBER: + { + return true; + } + default: + { + return false; + } + } +} diff --git a/rmw_connextdds_common/CHANGELOG.rst b/rmw_connextdds_common/CHANGELOG.rst index b5fedb66..9c351153 100644 --- a/rmw_connextdds_common/CHANGELOG.rst +++ b/rmw_connextdds_common/CHANGELOG.rst @@ -2,6 +2,24 @@ Changelog for package rmw_connextdds_common ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +0.11.1 (2022-04-26) +------------------- +* Resolve build error with RTI Connext DDS 5.3.1 (`#82 `_) +* Contributors: Andrea Sorbini + +0.11.0 (2022-04-08) +------------------- +* Exclude missing sample info fields when building rmw_connextddsmicro (`#79 `_) +* Properly initialize CDR stream before using it for filtering (`#81 `_) +* Contributors: Andrea Sorbini + +0.10.0 (2022-03-28) +------------------- +* Add support for user-specified content filters (`#68 `_) +* add stub for content filtered topic (`#77 `_) +* Add sequence numbers to message info structure (`#74 `_) +* Contributors: Andrea Sorbini, Chen Lihui, Ivan Santiago Paunovic + 0.9.0 (2022-03-01) ------------------ * Add rmw listener apis (`#44 `_) diff --git a/rmw_connextdds_common/CMakeLists.txt b/rmw_connextdds_common/CMakeLists.txt index f26c3a14..aea621d6 100644 --- a/rmw_connextdds_common/CMakeLists.txt +++ b/rmw_connextdds_common/CMakeLists.txt @@ -62,6 +62,11 @@ function(rtirmw_add_library) ament_target_dependencies(${_rti_build_NAME} ${_rti_build_DEPS}) + set(_extra_defines) + if("${CMAKE_BUILD_TYPE}" MATCHES "[dD]ebug") + list(APPEND _extra_defines "RMW_CONNEXT_DEBUG=1") + endif() + target_compile_definitions(${_rti_build_NAME} PUBLIC RMW_VERSION_MAJOR=${rmw_VERSION_MAJOR} @@ -69,6 +74,7 @@ function(rtirmw_add_library) RMW_VERSION_PATCH=${rmw_VERSION_PATCH} RMW_CONNEXT_DDS_API=RMW_CONNEXT_DDS_API_${_rti_build_API} ${_rti_build_DEFINES} + ${_extra_defines} ) set(private_defines) @@ -188,6 +194,9 @@ else() if("${CONNEXTDDS_VERSION}" VERSION_LESS "6.0.0") list(APPEND extra_defines "RMW_CONNEXT_DDS_API_PRO_LEGACY=1") endif() + if(CONNEXTDDS_ARCH MATCHES ".*Win.*") + list(APPEND extra_defines "RMW_CONNEXT_BUILTIN_CFT_COMPATIBILITY_MODE=1") + endif() rtirmw_add_library( NAME ${PROJECT_NAME}_pro API PRO @@ -196,8 +205,10 @@ else() src/ndds/rmw_type_support_ndds.cpp src/ndds/rmw_typecode.cpp src/ndds/dds_api_ndds.cpp + src/ndds/custom_sql_filter.cpp include/rmw_connextdds/typecode.hpp include/rmw_connextdds/dds_api_ndds.hpp + include/rmw_connextdds/custom_sql_filter.hpp DEPS ${RMW_CONNEXT_DEPS} LIBRARIES RTIConnextDDS::c_api DEFINES ${extra_defines}) diff --git a/rmw_connextdds_common/include/rmw_connextdds/custom_sql_filter.hpp b/rmw_connextdds_common/include/rmw_connextdds/custom_sql_filter.hpp new file mode 100644 index 00000000..7c134205 --- /dev/null +++ b/rmw_connextdds_common/include/rmw_connextdds/custom_sql_filter.hpp @@ -0,0 +1,66 @@ +// Copyright 2021 Real-Time Innovations, Inc. (RTI) +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +#ifndef RMW_CONNEXTDDS__CUSTOM_SQL_FILTER_HPP_ +#define RMW_CONNEXTDDS__CUSTOM_SQL_FILTER_HPP_ + +#include "rmw_connextdds/dds_api.hpp" + +#if RMW_CONNEXT_DDS_API == RMW_CONNEXT_DDS_API_PRO + +namespace rti_connext_dds_custom_sql_filter +{ + +struct CustomSqlFilterData +{ + DDS_SqlFilterGeneratorQos base; + + CustomSqlFilterData(); + + DDS_ReturnCode_t + set_memory_management_property( + const DDS_DomainParticipantQos & dp_qos); +}; + +RMW_CONNEXTDDS_PUBLIC +DDS_ReturnCode_t +register_content_filter( + DDS_DomainParticipant * const participant, + CustomSqlFilterData * const filter_data); + +RMW_CONNEXTDDS_PUBLIC +extern const char * const PLUGIN_NAME; + +} // namespace rti_connext_dds_custom_sql_filter + +#if !RMW_CONNEXT_BUILTIN_CFT_COMPATIBILITY_MODE +extern "C" { +// This is an internal function from RTI Connext DDS which allows a filter to +// be registered as "built-in". We need this because we want this custom filter +// to be a replacement for the built-in SQL-like filter. +RMW_CONNEXTDDS_PUBLIC +DDS_ReturnCode_t +DDS_ContentFilter_register_filter( + DDS_DomainParticipant * participant, + const char * name, + const struct DDS_ContentFilter * filter, + const DDS_ContentFilterEvaluateFunction evaluateOnSerialized, + const DDS_ContentFilterWriterEvaluateFunction writerEvaluateOnSerialized, + const DDS_ContentFilterQueryFunction query, + DDS_Boolean isBuiltin); +} +#endif // RMW_CONNEXT_BUILTIN_CFT_COMPATIBILITY_MODE + +#endif // RMW_CONNEXT_DDS_API == RMW_CONNEXT_DDS_API_PRO + +#endif // RMW_CONNEXTDDS__CUSTOM_SQL_FILTER_HPP_ diff --git a/rmw_connextdds_common/include/rmw_connextdds/dds_api.hpp b/rmw_connextdds_common/include/rmw_connextdds/dds_api.hpp index 50542a6b..11b052f5 100644 --- a/rmw_connextdds_common/include/rmw_connextdds/dds_api.hpp +++ b/rmw_connextdds_common/include/rmw_connextdds/dds_api.hpp @@ -65,6 +65,11 @@ rmw_connextdds_initialize_participant_qos_impl( rmw_context_impl_t * const ctx, DDS_DomainParticipantQos * const dp_qos); +rmw_ret_t +rmw_connextdds_configure_participant( + rmw_context_impl_t * const ctx, + DDS_DomainParticipant * const participant); + rmw_ret_t rmw_connextdds_create_contentfilteredtopic( rmw_context_impl_t * const ctx, @@ -72,6 +77,7 @@ rmw_connextdds_create_contentfilteredtopic( DDS_Topic * const base_topic, const char * const cft_name, const char * const cft_filter, + const rcutils_string_array_t * const cft_expression_parameters, DDS_TopicDescription ** const cft_out); rmw_ret_t @@ -263,4 +269,16 @@ rmw_connextdds_enable_security( DDS_SECURITY_PROPERTY_PREFIX ".logging.log_level" #endif /* DDS_SECURITY_LOGGING_LEVEL_PROPERTY */ +rmw_ret_t +rmw_connextdds_set_cft_filter_expression( + DDS_TopicDescription * const topic_desc, + const char * const cft_expression, + const rcutils_string_array_t * const cft_expression_parameters); + +rmw_ret_t +rmw_connextdds_get_cft_filter_expression( + DDS_TopicDescription * const topic_desc, + rcutils_allocator_t * const allocator, + rmw_subscription_content_filter_options_t * const options); + #endif // RMW_CONNEXTDDS__DDS_API_HPP_ diff --git a/rmw_connextdds_common/include/rmw_connextdds/dds_api_ndds.hpp b/rmw_connextdds_common/include/rmw_connextdds/dds_api_ndds.hpp index a40b5b30..1b0305c1 100644 --- a/rmw_connextdds_common/include/rmw_connextdds/dds_api_ndds.hpp +++ b/rmw_connextdds_common/include/rmw_connextdds/dds_api_ndds.hpp @@ -19,18 +19,6 @@ #include "rcutils/types.h" -DDS_SEQUENCE(RMW_Connext_Uint8ArrayPtrSeq, rcutils_uint8_array_t *); - -typedef RMW_Connext_Uint8ArrayPtrSeq RMW_Connext_UntypedSampleSeq; - -#define RMW_Connext_UntypedSampleSeq_INITIALIZER DDS_SEQUENCE_INITIALIZER - -#define DDS_UntypedSampleSeq_get_reference(seq_, i_) \ - *RMW_Connext_Uint8ArrayPtrSeq_get_reference(seq_, i_) - -#define DDS_UntypedSampleSeq_get_length(seq_) \ - RMW_Connext_Uint8ArrayPtrSeq_get_length(seq_) - #if RMW_CONNEXT_DDS_API_PRO_LEGACY #ifndef RTIXCdrLong_MAX #define RTIXCdrLong_MAX 2147483647 diff --git a/rmw_connextdds_common/include/rmw_connextdds/rmw_api_impl.hpp b/rmw_connextdds_common/include/rmw_connextdds/rmw_api_impl.hpp index 6f531a61..df8e8efd 100644 --- a/rmw_connextdds_common/include/rmw_connextdds/rmw_api_impl.hpp +++ b/rmw_connextdds_common/include/rmw_connextdds/rmw_api_impl.hpp @@ -17,6 +17,8 @@ #include "rmw_connextdds/context.hpp" +#include "rmw/features.h" + /***************************************************************************** * Context API *****************************************************************************/ @@ -481,6 +483,18 @@ rmw_api_connextdds_subscription_get_actual_qos( const rmw_subscription_t * subscription, rmw_qos_profile_t * qos); +RMW_CONNEXTDDS_PUBLIC +rmw_ret_t +rmw_api_connextdds_subscription_set_content_filter( + rmw_subscription_t * subscription, + const rmw_subscription_content_filter_options_t * options); + +RMW_CONNEXTDDS_PUBLIC +rmw_ret_t +rmw_api_connextdds_subscription_get_content_filter( + const rmw_subscription_t * subscription, + rcutils_allocator_t * const allocator, + rmw_subscription_content_filter_options_t * options); RMW_CONNEXTDDS_PUBLIC rmw_ret_t @@ -637,4 +651,11 @@ rmw_api_connextdds_subscription_get_network_flow_endpoints( rcutils_allocator_t * allocator, rmw_network_flow_endpoint_array_t * network_flow_endpoint_array); +/****************************************************************************** + * Feature support functions + ******************************************************************************/ +RMW_CONNEXTDDS_PUBLIC +bool +rmw_api_connextdds_feature_supported(rmw_feature_t feature); + #endif // RMW_CONNEXTDDS__RMW_API_IMPL_HPP_ diff --git a/rmw_connextdds_common/include/rmw_connextdds/rmw_impl.hpp b/rmw_connextdds_common/include/rmw_connextdds/rmw_impl.hpp index cff0592e..09a0b369 100644 --- a/rmw_connextdds_common/include/rmw_connextdds/rmw_impl.hpp +++ b/rmw_connextdds_common/include/rmw_connextdds/rmw_impl.hpp @@ -438,6 +438,15 @@ class RMW_Connext_Subscriber rmw_message_info_t * const message_info, bool * const taken); + rmw_ret_t + set_content_filter( + const rmw_subscription_content_filter_options_t * const options); + + rmw_ret_t + get_content_filter( + rcutils_allocator_t * allocator, + rmw_subscription_content_filter_options_t * const options); + bool has_data() { @@ -494,6 +503,17 @@ class RMW_Connext_Subscriber return this->dds_topic; } + static std::string get_atomic_id() + { + static std::atomic_uint64_t id; + return std::to_string(id++); + } + + bool is_cft_enabled() + { + return !this->cft_expression.empty(); + } + const bool internal; const bool ignore_local; @@ -502,6 +522,7 @@ class RMW_Connext_Subscriber DDS_DataReader * dds_reader; DDS_Topic * dds_topic; DDS_TopicDescription * dds_topic_cft; + std::string cft_expression; RMW_Connext_MessageTypeSupport * type_support; rmw_gid_t ros_gid; const bool created_topic; @@ -520,6 +541,7 @@ class RMW_Connext_Subscriber const bool ignore_local, const bool created_topic, DDS_TopicDescription * const dds_topic_cft, + const char * const cft_expression, const bool internal); friend class RMW_Connext_SubscriberStatusCondition; diff --git a/rmw_connextdds_common/include/rmw_connextdds/rmw_waitset_std.hpp b/rmw_connextdds_common/include/rmw_connextdds/rmw_waitset_std.hpp index 959eddc0..c0b925ec 100644 --- a/rmw_connextdds_common/include/rmw_connextdds/rmw_waitset_std.hpp +++ b/rmw_connextdds_common/include/rmw_connextdds/rmw_waitset_std.hpp @@ -350,18 +350,19 @@ class RMW_Connext_StatusCondition : public RMW_Connext_Condition has_status(const rmw_event_type_t event_type) = 0; void - notify_new_event() + notify_new_event(rmw_event_type_t event_type) { std::unique_lock lock_mutex(new_event_mutex_); - if (new_event_cb_) { - new_event_cb_(user_data_, 1); + if (new_event_cb_[event_type]) { + new_event_cb_[event_type](user_data_[event_type], 1); } else { - unread_events_count_++; + unread_events_count_[event_type]++; } } void set_new_event_callback( + rmw_event_type_t event_type, rmw_event_callback_t callback, const void * user_data) { @@ -369,24 +370,24 @@ class RMW_Connext_StatusCondition : public RMW_Connext_Condition if (callback) { // Push events arrived before setting the executor's callback - if (unread_events_count_ > 0) { - callback(user_data, unread_events_count_); - unread_events_count_ = 0; + if (unread_events_count_[event_type] > 0) { + callback(user_data, unread_events_count_[event_type]); + unread_events_count_[event_type] = 0; } - user_data_ = user_data; - new_event_cb_ = callback; + user_data_[event_type] = user_data; + new_event_cb_[event_type] = callback; } else { - user_data_ = nullptr; - new_event_cb_ = nullptr; + user_data_[event_type] = nullptr; + new_event_cb_[event_type] = nullptr; } } protected: DDS_StatusCondition * scond; std::mutex new_event_mutex_; - rmw_event_callback_t new_event_cb_{nullptr}; - const void * user_data_{nullptr}; - uint64_t unread_events_count_ = 0; + rmw_event_callback_t new_event_cb_[RMW_EVENT_INVALID] = {}; + const void * user_data_[RMW_EVENT_INVALID] = {}; + uint64_t unread_events_count_[RMW_EVENT_INVALID] = {0}; }; void diff --git a/rmw_connextdds_common/include/rmw_connextdds/static_config.hpp b/rmw_connextdds_common/include/rmw_connextdds/static_config.hpp index b5f1022f..120e5790 100644 --- a/rmw_connextdds_common/include/rmw_connextdds/static_config.hpp +++ b/rmw_connextdds_common/include/rmw_connextdds/static_config.hpp @@ -15,6 +15,13 @@ #ifndef RMW_CONNEXTDDS__STATIC_CONFIG_HPP_ #define RMW_CONNEXTDDS__STATIC_CONFIG_HPP_ +/****************************************************************************** + * Debug flags + ******************************************************************************/ +#ifndef RMW_CONNEXT_DEBUG +#define RMW_CONNEXT_DEBUG 0 +#endif // RMW_CONNEXT_DEBUG + /****************************************************************************** * Default User Configuration ******************************************************************************/ @@ -311,6 +318,14 @@ #define RMW_CONNEXT_LEGACY_RMW_COMPATIBILITY_MODE 1 #endif /* RMW_CONNEXT_LEGACY_RMW_COMPATIBILITY_MODE */ +/****************************************************************************** + * On windows, the custom SQL filter cannot be registered as "built-in", so we + * must enable some additional code to register it as a user plugin. + ******************************************************************************/ +#ifndef RMW_CONNEXT_BUILTIN_CFT_COMPATIBILITY_MODE +#define RMW_CONNEXT_BUILTIN_CFT_COMPATIBILITY_MODE 0 +#endif /* RMW_CONNEXT_LEGACY_RMW_COMPATIBILITY_MODE */ + #include "resource_limits.hpp" #endif // RMW_CONNEXTDDS__STATIC_CONFIG_HPP_ diff --git a/rmw_connextdds_common/include/rmw_connextdds/type_support.hpp b/rmw_connextdds_common/include/rmw_connextdds/type_support.hpp index 41d3bd8e..284148de 100644 --- a/rmw_connextdds_common/include/rmw_connextdds/type_support.hpp +++ b/rmw_connextdds_common/include/rmw_connextdds/type_support.hpp @@ -167,11 +167,21 @@ class RMW_Connext_MessageTypeSupport struct RMW_Connext_Message { - const void * user_data; - bool serialized; - RMW_Connext_MessageTypeSupport * type_support; + const void * user_data{nullptr}; + bool serialized{false}; + RMW_Connext_MessageTypeSupport * type_support{nullptr}; + rcutils_uint8_array_t data_buffer; }; +rmw_ret_t +RMW_Connext_Message_initialize( + RMW_Connext_Message * const self, + RMW_Connext_MessageTypeSupport * const type_support, + const size_t data_buffer_size); + +void +RMW_Connext_Message_finalize(RMW_Connext_Message * const self); + class RMW_Connext_ServiceTypeSupportWrapper { public: @@ -276,5 +286,19 @@ class RMW_Connext_ServiceTypeSupportWrapper const rosidl_service_type_support_t * const type_supports); }; +#if RMW_CONNEXT_DDS_API == RMW_CONNEXT_DDS_API_PRO +DDS_SEQUENCE(RMW_Connext_MessagePtrSeq, RMW_Connext_Message *); + +typedef RMW_Connext_MessagePtrSeq RMW_Connext_UntypedSampleSeq; + +#define RMW_Connext_UntypedSampleSeq_INITIALIZER DDS_SEQUENCE_INITIALIZER + +#define DDS_UntypedSampleSeq_get_reference(seq_, i_) \ + *RMW_Connext_MessagePtrSeq_get_reference(seq_, i_) + +#define DDS_UntypedSampleSeq_get_length(seq_) \ + RMW_Connext_MessagePtrSeq_get_length(seq_) + +#endif // RMW_CONNEXT_DDS_API == RMW_CONNEXT_DDS_API_PRO #endif // RMW_CONNEXTDDS__TYPE_SUPPORT_HPP_ diff --git a/rmw_connextdds_common/package.xml b/rmw_connextdds_common/package.xml index a42b52d0..43bf1e7e 100644 --- a/rmw_connextdds_common/package.xml +++ b/rmw_connextdds_common/package.xml @@ -2,7 +2,7 @@ rmw_connextdds_common - 0.9.0 + 0.11.1 Common source for RMW implementations built with RTI Connext DDS Professional and RTI Connext DDS Micro. Andrea Sorbini Apache License 2.0 diff --git a/rmw_connextdds_common/src/common/rmw_context.cpp b/rmw_connextdds_common/src/common/rmw_context.cpp index 54102045..4ca175f7 100644 --- a/rmw_connextdds_common/src/common/rmw_context.cpp +++ b/rmw_connextdds_common/src/common/rmw_context.cpp @@ -218,6 +218,12 @@ rmw_context_impl_t::initialize_participant(const bool localhost_only) return RMW_RET_ERROR; } + rmw_ret_t cfg_rc = rmw_connextdds_configure_participant(this, this->participant); + if (RMW_RET_OK != cfg_rc) { + RMW_CONNEXT_LOG_ERROR("failed to configure DDS participant") + return cfg_rc; + } + /* Create DDS publisher/subscriber objects that will be used for all DDS writers/readers created to support RMW publishers/subscriptions. */ @@ -332,7 +338,21 @@ rmw_ret_t rmw_context_impl_t::finalize_participant() { RMW_CONNEXT_LOG_DEBUG("finalizing DDS DomainParticipant") - +#if RMW_CONNEXT_DEBUG && RMW_CONNEXT_DDS_API == RMW_CONNEXT_DDS_API_PRO + // If we are building in Debug mode, an issue in Connext may prevent the + // participant from being able to delete any content-filtered topic if + // the participant has not been enabled. + // For this reason, make sure to enable the participant before trying to + // finalize it. + // TODO(asorbini) reconsider the need for this code in Connext > 6.1.0 + if (DDS_RETCODE_OK != + DDS_Entity_enable(DDS_DomainParticipant_as_entity(participant))) + { + RMW_CONNEXT_LOG_ERROR_SET( + "failed to enable DomainParticipant before deletion") + return RMW_RET_ERROR; + } +#endif // RMW_CONNEXT_DEBUG && RMW_CONNEXT_DDS_API == RMW_CONNEXT_DDS_API_PRO if (RMW_RET_OK != rmw_connextdds_graph_finalize(this)) { RMW_CONNEXT_LOG_ERROR("failed to finalize graph cache") return RMW_RET_ERROR; @@ -386,7 +406,7 @@ rmw_context_impl_t::finalize_participant() if (nullptr != this->participant) { // If we are cleaning up after some RMW failure, it is possible for some // DataWriter to not have been deleted. - // Call DDS_Publisher_delete_contained_entities() to make sure we can + // Call DDS_DomainParticipant_delete_contained_entities() to make sure we can // dispose the publisher. if (DDS_RETCODE_OK != DDS_DomainParticipant_delete_contained_entities(this->participant)) @@ -402,6 +422,7 @@ rmw_context_impl_t::finalize_participant() RMW_CONNEXT_LOG_ERROR_SET("failed to delete DDS participant") return RMW_RET_ERROR; } + this->participant = nullptr; } diff --git a/rmw_connextdds_common/src/common/rmw_impl.cpp b/rmw_connextdds_common/src/common/rmw_impl.cpp index 24d81333..c453bf71 100644 --- a/rmw_connextdds_common/src/common/rmw_impl.cpp +++ b/rmw_connextdds_common/src/common/rmw_impl.cpp @@ -29,6 +29,7 @@ const char * const ROS_TOPIC_PREFIX = "rt"; const char * const ROS_SERVICE_REQUESTER_PREFIX = ROS_SERVICE_REQUESTER_PREFIX_STR; const char * const ROS_SERVICE_RESPONSE_PREFIX = ROS_SERVICE_RESPONSE_PREFIX_STR; +const char * const ROS_CFT_TOPIC_NAME_INFIX = "_ContentFilterTopic"; std::string rmw_connextdds_create_topic_name( @@ -903,9 +904,11 @@ RMW_Connext_Publisher::write( int64_t * const sn_out) { RMW_Connext_Message user_msg; + if (RMW_RET_OK != RMW_Connext_Message_initialize(&user_msg, this->type_support, 0)) { + return RMW_RET_ERROR; + } user_msg.user_data = ros_message; user_msg.serialized = serialized; - user_msg.type_support = this->type_support; return rmw_connextdds_write_message(this, &user_msg, sn_out); } @@ -947,6 +950,11 @@ rmw_ret_t RMW_Connext_Publisher::wait_for_all_acked(rmw_time_t wait_timeout) { DDS_Duration_t timeout = rmw_connextdds_duration_from_ros_time(&wait_timeout); +#if RMW_CONNEXT_DDS_API == RMW_CONNEXT_DDS_API_MICRO + // Avoid warnings for unused variable in micro, since wait_for_ack() is + // mapped to an empty call. + UNUSED_ARG(timeout); +#endif // RMW_CONNEXT_DDS_API == RMW_CONNEXT_DDS_API_MICRO const DDS_ReturnCode_t dds_rc = DDS_DataWriter_wait_for_acknowledgments(this->dds_writer, &timeout); @@ -1129,6 +1137,7 @@ RMW_Connext_Subscriber::RMW_Connext_Subscriber( const bool ignore_local, const bool created_topic, DDS_TopicDescription * const dds_topic_cft, + const char * const cft_expression, const bool internal) : internal(internal), ignore_local(ignore_local), @@ -1136,6 +1145,7 @@ RMW_Connext_Subscriber::RMW_Connext_Subscriber( dds_reader(dds_reader), dds_topic(dds_topic), dds_topic_cft(dds_topic_cft), + cft_expression(cft_expression), type_support(type_support), created_topic(created_topic), status_condition(dds_reader, ignore_local, internal) @@ -1236,7 +1246,7 @@ RMW_Connext_Subscriber::create( } auto scope_exit_topic_delete = rcpputils::make_scope_exit( - [ctx, topic_created, dp, topic, cft_topic]() + [ctx, &topic_created, dp, &topic, &cft_topic]() { if (nullptr != cft_topic) { if (RMW_RET_OK != @@ -1256,19 +1266,40 @@ RMW_Connext_Subscriber::create( }); DDS_TopicDescription * sub_topic = DDS_Topic_as_topicdescription(topic); + std::string sub_cft_name; + const char * sub_cft_expr = ""; + const rcutils_string_array_t * sub_cft_params = nullptr; if (nullptr != cft_name) { - rmw_ret_t cft_rc = - rmw_connextdds_create_contentfilteredtopic( - ctx, dp, topic, cft_name, cft_filter, &cft_topic); + sub_cft_name = cft_name; + sub_cft_expr = cft_filter; + } else { + sub_cft_name = + fqtopic_name + ROS_CFT_TOPIC_NAME_INFIX + RMW_Connext_Subscriber::get_atomic_id(); + if (nullptr != subscriber_options->content_filter_options) { + sub_cft_expr = + subscriber_options->content_filter_options->filter_expression; + sub_cft_params = + &subscriber_options->content_filter_options->expression_parameters; + } + } - if (RMW_RET_OK != cft_rc) { - if (RMW_RET_UNSUPPORTED != cft_rc) { - return nullptr; - } - } else { - sub_topic = cft_topic; + rmw_ret_t cft_rc = + rmw_connextdds_create_contentfilteredtopic( + ctx, + dp, + topic, + sub_cft_name.c_str(), + sub_cft_expr, + sub_cft_params, + &cft_topic); + + if (RMW_RET_OK != cft_rc) { + if (RMW_RET_UNSUPPORTED != cft_rc) { + return nullptr; } + } else { + sub_topic = cft_topic; } // The following initialization generates warnings when built @@ -1337,15 +1368,16 @@ RMW_Connext_Subscriber::create( subscriber_options->ignore_local_publications, topic_created, cft_topic, + sub_cft_expr, internal); if (nullptr == rmw_sub_impl) { RMW_CONNEXT_LOG_ERROR_SET("failed to allocate RMW subscriber") return nullptr; } - scope_exit_type_unregister.cancel(); - scope_exit_topic_delete.cancel(); scope_exit_dds_reader_delete.cancel(); + scope_exit_topic_delete.cancel(); + scope_exit_type_unregister.cancel(); return rmw_sub_impl; } @@ -1529,6 +1561,30 @@ RMW_Connext_Subscriber::take_serialized( return rc; } + +rmw_ret_t +RMW_Connext_Subscriber::set_content_filter( + const rmw_subscription_content_filter_options_t * const options) +{ + if (RMW_RET_OK != + rmw_connextdds_set_cft_filter_expression( + this->dds_topic_cft, options->filter_expression, &options->expression_parameters)) + { + return RMW_RET_ERROR; + } + + this->cft_expression = options->filter_expression; + return RMW_RET_OK; +} + +rmw_ret_t +RMW_Connext_Subscriber::get_content_filter( + rcutils_allocator_t * allocator, + rmw_subscription_content_filter_options_t * const options) +{ + return rmw_connextdds_get_cft_filter_expression(this->dds_topic_cft, allocator, options); +} + rmw_ret_t RMW_Connext_Subscriber::loan_messages(const bool update_condition) { @@ -1634,8 +1690,8 @@ RMW_Connext_Subscriber::take_next( RMW_RET_OK == rc_exit; this->loan_next++) { - rcutils_uint8_array_t * data_buffer = - reinterpret_cast( + RMW_Connext_Message * msg = + reinterpret_cast( DDS_UntypedSampleSeq_get_reference( &this->loan_data, static_cast(this->loan_next))); DDS_SampleInfo * info = @@ -1656,7 +1712,7 @@ RMW_Connext_Subscriber::take_next( if (RMW_RET_OK != this->type_support->deserialize( - ros_message, data_buffer, deserialized_size, true /* header_only */)) + ros_message, &msg->data_buffer, deserialized_size, true /* header_only */)) { RMW_CONNEXT_LOG_ERROR_SET("failed to deserialize taken sample") rc_exit = RMW_RET_ERROR; @@ -1695,7 +1751,7 @@ RMW_Connext_Subscriber::take_next( if (RCUTILS_RET_OK != rcutils_uint8_array_copy( reinterpret_cast(ros_message), - data_buffer)) + &msg->data_buffer)) { RMW_CONNEXT_LOG_ERROR_SET("failed to copy uint8 array") rc_exit = RMW_RET_ERROR; @@ -1706,7 +1762,7 @@ RMW_Connext_Subscriber::take_next( if (RMW_RET_OK != this->type_support->deserialize( - ros_message, data_buffer, deserialized_size)) + ros_message, &msg->data_buffer, deserialized_size)) { RMW_CONNEXT_LOG_ERROR_SET( "failed to deserialize taken sample") @@ -1771,7 +1827,6 @@ rmw_connextdds_create_subscriber( "failed to allocate RMW_Connext_Subscriber") return nullptr; } - auto scope_exit_rmw_reader_impl_delete = rcpputils::make_scope_exit( [rmw_sub_impl]() @@ -1782,6 +1837,25 @@ rmw_connextdds_create_subscriber( } delete rmw_sub_impl; }); +#if RMW_CONNEXT_DEBUG && RMW_CONNEXT_DDS_API == RMW_CONNEXT_DDS_API_PRO + auto scope_exit_enable_participant_on_error = + rcpputils::make_scope_exit( + [ctx]() + { + // If we are building in Debug mode, an issue in Connext may prevent the + // participant from being able to delete any content-filtered topic if + // the participant has not been enabled. + // For this reason, make sure to enable the participant before trying to + // finalize it. + // TODO(asorbini) reconsider the need for this code in Connext > 6.1.0 + if (DDS_RETCODE_OK != + DDS_Entity_enable(DDS_DomainParticipant_as_entity(ctx->participant))) + { + RMW_CONNEXT_LOG_ERROR_SET( + "failed to enable DomainParticipant on subscriber creation error") + } + }); +#endif // RMW_CONNEXT_DEBUG && RMW_CONNEXT_DDS_API == RMW_CONNEXT_DDS_API_PRO rmw_subscription_t * rmw_subscriber = rmw_subscription_allocate(); if (nullptr == rmw_subscriber) { @@ -1815,6 +1889,7 @@ rmw_connextdds_create_subscriber( topic_name_len + 1); rmw_subscriber->options = *subscriber_options; rmw_subscriber->can_loan_messages = false; + rmw_subscriber->is_cft_enabled = rmw_sub_impl->is_cft_enabled(); if (!internal) { if (RMW_RET_OK != rmw_sub_impl->enable()) { @@ -1831,6 +1906,9 @@ rmw_connextdds_create_subscriber( } } +#if RMW_CONNEXT_DEBUG && RMW_CONNEXT_DDS_API == RMW_CONNEXT_DDS_API_PRO + scope_exit_enable_participant_on_error.cancel(); +#endif // RMW_CONNEXT_DEBUG && RMW_CONNEXT_DDS_API == RMW_CONNEXT_DDS_API_PRO scope_exit_rmw_reader_impl_delete.cancel(); scope_exit_rmw_reader_delete.cancel(); return rmw_subscriber; @@ -1885,6 +1963,20 @@ rmw_connextdds_message_info_from_dds( to->source_timestamp = 0; to->received_timestamp = 0; #endif /* !RTI_WIN32 */ + // Currently we cannot use `rmw_connextdds_sn_dds_to_ros`, as that was used to convert to + // `rmw_request_id_t.sequence_number`, which is an int64_t and not an uint64_t. + // When rmw is updated and all sequence numbers are a `uint64_t`, + // rmw_connextdds_sn_dds_to_ros() should be updated and used everywhere. + to->publication_sequence_number = + static_cast((from->publication_sequence_number).high) << 32 | + static_cast((from->publication_sequence_number).low); +#if RMW_CONNEXT_DDS_API == RMW_CONNEXT_DDS_API_PRO + to->reception_sequence_number = + static_cast((from->reception_sequence_number).high) << 32 | + static_cast((from->reception_sequence_number).low); +#else + to->reception_sequence_number = RMW_MESSAGE_INFO_SEQUENCE_NUMBER_UNSUPPORTED; +#endif // RMW_CONNEXT_DDS_API == RMW_CONNEXT_DDS_API_PRO } /****************************************************************************** diff --git a/rmw_connextdds_common/src/common/rmw_impl_waitset_std.cpp b/rmw_connextdds_common/src/common/rmw_impl_waitset_std.cpp index 3bb58623..6f3ae332 100644 --- a/rmw_connextdds_common/src/common/rmw_impl_waitset_std.cpp +++ b/rmw_connextdds_common/src/common/rmw_impl_waitset_std.cpp @@ -708,7 +708,7 @@ RMW_Connext_SubscriberStatusCondition::update_status_deadline( this->status_deadline.total_count_change = this->status_deadline.total_count; this->status_deadline.total_count_change -= this->status_deadline_last.total_count; - this->notify_new_event(); + this->notify_new_event(RMW_EVENT_REQUESTED_DEADLINE_MISSED); } void @@ -724,7 +724,7 @@ RMW_Connext_SubscriberStatusCondition::update_status_liveliness( this->status_liveliness.not_alive_count_change -= this->status_liveliness_last.not_alive_count; - this->notify_new_event(); + this->notify_new_event(RMW_EVENT_LIVELINESS_CHANGED); } void @@ -736,6 +736,7 @@ RMW_Connext_SubscriberStatusCondition::update_status_qos( this->status_qos.total_count_change = this->status_qos.total_count; this->status_qos.total_count_change -= this->status_qos_last.total_count; + this->notify_new_event(RMW_EVENT_REQUESTED_QOS_INCOMPATIBLE); } void @@ -748,6 +749,7 @@ RMW_Connext_SubscriberStatusCondition::update_status_sample_lost( this->status_sample_lost.total_count_change = this->status_sample_lost.total_count; this->status_sample_lost.total_count_change -= this->status_sample_lost_last.total_count; + this->notify_new_event(RMW_EVENT_MESSAGE_LOST); } void @@ -882,7 +884,7 @@ RMW_Connext_PublisherStatusCondition::update_status_deadline( this->status_deadline.total_count_change = this->status_deadline.total_count; this->status_deadline.total_count_change -= this->status_deadline_last.total_count; - this->notify_new_event(); + this->notify_new_event(RMW_EVENT_OFFERED_DEADLINE_MISSED); } void @@ -895,7 +897,7 @@ RMW_Connext_PublisherStatusCondition::update_status_liveliness( this->status_liveliness.total_count_change = this->status_liveliness.total_count; this->status_liveliness.total_count_change -= this->status_liveliness_last.total_count; - this->notify_new_event(); + this->notify_new_event(RMW_EVENT_LIVELINESS_CHANGED); } void @@ -907,4 +909,6 @@ RMW_Connext_PublisherStatusCondition::update_status_qos( this->status_qos.total_count_change = this->status_qos.total_count; this->status_qos.total_count_change -= this->status_qos_last.total_count; + + this->notify_new_event(RMW_EVENT_OFFERED_QOS_INCOMPATIBLE); } diff --git a/rmw_connextdds_common/src/common/rmw_listener.cpp b/rmw_connextdds_common/src/common/rmw_listener.cpp index d0a7958a..1b907b8a 100644 --- a/rmw_connextdds_common/src/common/rmw_listener.cpp +++ b/rmw_connextdds_common/src/common/rmw_listener.cpp @@ -36,7 +36,7 @@ rmw_api_connextdds_event_set_callback( } else { condition = RMW_Connext_Event::publisher(event)->condition(); } - condition->set_new_event_callback(callback, user_data); + condition->set_new_event_callback(event->event_type, callback, user_data); return RMW_RET_OK; } diff --git a/rmw_connextdds_common/src/common/rmw_subscription.cpp b/rmw_connextdds_common/src/common/rmw_subscription.cpp index 97f6a779..698ccb3e 100644 --- a/rmw_connextdds_common/src/common/rmw_subscription.cpp +++ b/rmw_connextdds_common/src/common/rmw_subscription.cpp @@ -166,6 +166,54 @@ rmw_api_connextdds_subscription_get_actual_qos( return sub_impl->qos(qos); } +rmw_ret_t +rmw_api_connextdds_subscription_set_content_filter( + rmw_subscription_t * subscription, + const rmw_subscription_content_filter_options_t * options) +{ + RMW_CHECK_ARGUMENT_FOR_NULL(subscription, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_TYPE_IDENTIFIERS_MATCH( + subscription, + subscription->implementation_identifier, + RMW_CONNEXTDDS_ID, + return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); + RMW_CHECK_ARGUMENT_FOR_NULL(options, RMW_RET_INVALID_ARGUMENT); + + RMW_Connext_Subscriber * const sub_impl = + reinterpret_cast(subscription->data); + + rmw_ret_t rc = sub_impl->set_content_filter(options); + subscription->is_cft_enabled = sub_impl->is_cft_enabled(); + + return rc; +} + + +rmw_ret_t +rmw_api_connextdds_subscription_get_content_filter( + const rmw_subscription_t * subscription, + rcutils_allocator_t * const allocator, + rmw_subscription_content_filter_options_t * options) +{ + RMW_CHECK_ARGUMENT_FOR_NULL(subscription, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_TYPE_IDENTIFIERS_MATCH( + subscription, + subscription->implementation_identifier, + RMW_CONNEXTDDS_ID, + return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); + RMW_CHECK_ARGUMENT_FOR_NULL(allocator, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_ARGUMENT_FOR_NULL(options, RMW_RET_INVALID_ARGUMENT); + + RMW_Connext_Subscriber * const sub_impl = + reinterpret_cast(subscription->data); + + if (!sub_impl->is_cft_enabled()) { + RMW_CONNEXT_LOG_ERROR_SET("no content-filter associated with subscription") + return RMW_RET_ERROR; + } + + return sub_impl->get_content_filter(allocator, options); +} rmw_ret_t rmw_api_connextdds_destroy_subscription( diff --git a/rmw_connextdds_common/src/common/rmw_type_support.cpp b/rmw_connextdds_common/src/common/rmw_type_support.cpp index 9209dc68..8055398b 100644 --- a/rmw_connextdds_common/src/common/rmw_type_support.cpp +++ b/rmw_connextdds_common/src/common/rmw_type_support.cpp @@ -813,3 +813,36 @@ RMW_Connext_ServiceTypeSupportWrapper::get_response_type_name( return rmw_connextdds_create_type_name_response(svc_callbacks); } + + +rmw_ret_t +RMW_Connext_Message_initialize( + RMW_Connext_Message * const self, + RMW_Connext_MessageTypeSupport * const type_support, + const size_t data_buffer_size) +{ + const rcutils_allocator_t allocator = rcutils_get_default_allocator(); + + self->user_data = nullptr; + self->serialized = false; + self->type_support = type_support; + + if (RCUTILS_RET_OK != + rcutils_uint8_array_init(&self->data_buffer, data_buffer_size, &allocator)) + { + RMW_CONNEXT_LOG_ERROR_A_SET( + "failed to initialize message buffer: size=%lu", + data_buffer_size) + return RMW_RET_ERROR; + } + + return RMW_RET_OK; +} + +void +RMW_Connext_Message_finalize(RMW_Connext_Message * const self) +{ + if (RCUTILS_RET_OK != rcutils_uint8_array_fini(&self->data_buffer)) { + RMW_CONNEXT_LOG_ERROR_SET("failed to finalize uint8 array") + } +} diff --git a/rmw_connextdds_common/src/ndds/custom_sql_filter.cpp b/rmw_connextdds_common/src/ndds/custom_sql_filter.cpp new file mode 100644 index 00000000..e42312e7 --- /dev/null +++ b/rmw_connextdds_common/src/ndds/custom_sql_filter.cpp @@ -0,0 +1,811 @@ +// Copyright 2021 Real-Time Innovations, Inc. (RTI) +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +#include +#include +#include +#include + +#include "rcpputils/scope_exit.hpp" + +#include "rmw_connextdds/custom_sql_filter.hpp" +#include "rmw_connextdds/type_support.hpp" + +#if RMW_CONNEXT_DDS_API == RMW_CONNEXT_DDS_API_PRO + +struct RTI_CustomSqlFilterProgram +{ + void * base{nullptr}; + std::string expression; + std::string type_name; + DDS_StringSeq expression_params DDS_SEQUENCE_INITIALIZER; + ~RTI_CustomSqlFilterProgram() + { + DDS_StringSeq_finalize(&this->expression_params); + } +}; + +struct RTI_CustomSqlFilterWriterData +{ + void * base{nullptr}; + size_t filtered_readers_count{0}; + size_t unfiltered_readers_count{0}; + std::vector unfiltered_readers_buffer; + std::vector matched_readers_buffer; + DDS_CookieSeq matched_readers DDS_SEQUENCE_INITIALIZER; + REDASkiplistDescription readers_desc; + REDASkiplist readers; + std::vector serialize_buffer; +}; + +struct RTI_CustomSqlFilterReaderData +{ + DDS_Cookie_t cookie; + REDAWeakReference wr; + std::string expression; + + explicit RTI_CustomSqlFilterReaderData(REDAWeakReference * const wr) + : cookie(DDS_COOKIE_DEFAULT), + wr(*wr) + { + DDS_Cookie_t_initialize(&this->cookie); + DDS_OctetSeq_loan_contiguous( + &this->cookie.value, + reinterpret_cast(&this->wr), + sizeof(struct REDAWeakReference), + sizeof(struct REDAWeakReference)); + } +}; + +using RTI_CustomSqlFilterData = rti_connext_dds_custom_sql_filter::CustomSqlFilterData; + +RTI_CustomSqlFilterData::CustomSqlFilterData() + : base(DDS_SQLFILTER_QOS_DEFAULT) +{ +} + +DDS_ReturnCode_t +RTI_CustomSqlFilterData::set_memory_management_property( + const DDS_DomainParticipantQos & dp_qos) +{ + static const DDS_SqlFilterMemoryManagementQos DEFAULT = + DDS_SqlFilterMemoryManagementQos_INITIALIZER; + this->base.memory_management = DEFAULT; + + auto properties = const_cast(&dp_qos.property); + + const DDS_Property_t * property = DDS_PropertyQosPolicyHelper_lookup_property( + properties, + DDS_CONTENT_FILTER_SQL_DESERIALIZED_SAMPLE_MIN_BUFFER_SIZE_PROPERTY_NAME); + + if (nullptr != property) { + this->base.memory_management.buffer_min_size = strtol(property->value, nullptr, 0); + } + + property = DDS_PropertyQosPolicyHelper_lookup_property( + properties, + DDS_CONTENT_FILTER_SQL_DESERIALIZED_SAMPLE_TRIM_TO_SIZE_PROPERTY_NAME); + + if (nullptr != property) { + if (!REDAString_iCompare(property->value, "1") || + !REDAString_iCompare(property->value, "true") || + !REDAString_iCompare(property->value, "yes")) + { + this->base.memory_management.trim_buffer = DDS_BOOLEAN_TRUE; + } + } + + return DDS_RETCODE_OK; +} + +static +DDS_ReturnCode_t +RTI_CustomSqlFilter_compile( + void * filter_data, + void ** newHandle, + const char * expression, + const struct DDS_StringSeq * param_seq, + const struct DDS_TypeCode * typeCode, + const char * typeClassName, + void * oldhandle) +{ + *newHandle = nullptr; + + RTI_CustomSqlFilterData * const cft_data = + static_cast(filter_data); + + RTI_CustomSqlFilterProgram * program = + new (std::nothrow) RTI_CustomSqlFilterProgram(); + if (nullptr == program) { + // TODO(asorbini) log error + return DDS_RETCODE_ERROR; + } + + auto scope_exit_program = rcpputils::make_scope_exit( + [program]() + { + delete program; + }); + + RTI_CustomSqlFilterProgram * old_program = + static_cast(oldhandle); + void * const old_program_base = + (nullptr != old_program) ? old_program->base : nullptr; + + program->type_name = typeClassName; + program->expression = expression; + if (!DDS_StringSeq_copy(&program->expression_params, param_seq)) { + // TODO(asorbini) log error + return DDS_RETCODE_ERROR; + } + + if (!program->expression.empty()) { + const DDS_ReturnCode_t retcode = + DDS_SqlFilter_compile( + &cft_data->base, + &program->base, + expression, + param_seq, + typeCode, + typeClassName, + old_program_base); + if (DDS_RETCODE_OK != retcode) { + // TODO(asorbini) log error + return retcode; + } + } + + // Delete old_program if we didn't actually compile anything. + // In the other case, Connext doesn't delete the old program to + // avoid a possible race condition, so we do the same here and + // leave it alone. TODO(asorbini) This is likely a memory leak. + if (nullptr != old_program && nullptr == old_program->base) { + delete old_program; + } + + *newHandle = program; + + scope_exit_program.cancel(); + return DDS_RETCODE_OK; +} + +#if !RMW_CONNEXT_BUILTIN_CFT_COMPATIBILITY_MODE +static +int +RTI_CustomSqlFilter_compare_reader_data( + const void * left, + const void * right) +{ + RTI_CustomSqlFilterReaderData * const l = + const_cast( + static_cast(left)); + RTI_CustomSqlFilterReaderData * const r = + const_cast( + static_cast(right)); + + return RTIOsapiMemory_compare( + DDS_OctetSeq_get_contiguous_buffer(&l->cookie.value), + DDS_OctetSeq_get_contiguous_buffer(&r->cookie.value), + sizeof(struct REDAWeakReference)); +} + +static +DDS_ReturnCode_t +RTI_CustomSqlFilter_writer_attach( + void * filter_data, + void ** writer_filter_data, + void * reserved) +{ + RTI_CustomSqlFilterData * const cft_data = + static_cast(filter_data); + RTI_CustomSqlFilterWriterData * writer_data = + new (std::nothrow) RTI_CustomSqlFilterWriterData(); + if (nullptr == writer_data) { + // TODO(asorbini) log error + return DDS_RETCODE_ERROR; + } + auto scope_exit_data = rcpputils::make_scope_exit( + [writer_data]() + { + delete writer_data; + }); + + if (!REDASkiplist_newDefaultAllocator( + &writer_data->readers_desc, + REDASkiplist_getOptimumMaximumLevel(REDA_FAST_BUFFER_POOL_UNLIMITED), + 1)) + { + // TODO(asorbini) log error + return DDS_RETCODE_ERROR; + } + + REDASkiplist_init( + &writer_data->readers, + &writer_data->readers_desc, + RTI_CustomSqlFilter_compare_reader_data, + nullptr, + 0, + 0); + + DDS_ReturnCode_t rc = + DDS_SqlFilter_writerAttach(&cft_data->base, &writer_data->base, reserved); + if (DDS_RETCODE_OK != rc) { + // TODO(asorbini) log error + return rc; + } + *writer_filter_data = writer_data; + scope_exit_data.cancel(); + return DDS_RETCODE_OK; +} + +void +RTI_CustomSqlFilter_writer_detach( + void * filter_data, + void * writer_filter_data) +{ + RTI_CustomSqlFilterData * const cft_data = + static_cast(filter_data); + RTI_CustomSqlFilterWriterData * const writer_data = + static_cast(writer_filter_data); + + DDS_SqlFilter_writerDetach(&cft_data->base, writer_data->base); + + REDASkiplistNode * node = nullptr; + REDASkiplist_gotoTopNode(&writer_data->readers, &node); + while (REDASkiplist_gotoNextNode(&writer_data->readers, &node)) { + RTI_CustomSqlFilterReaderData * const rdata = + static_cast(REDASkiplistNode_getUserData(node)); + delete rdata; + } + + REDASkiplist_deleteDefaultAllocator(&writer_data->readers_desc); + delete writer_data; +} + +static +void +RTI_CustomSqlFilter_update_unfiltered_list( + RTI_CustomSqlFilterWriterData * const writer_data) +{ + writer_data->unfiltered_readers_buffer.clear(); + writer_data->unfiltered_readers_buffer.reserve(writer_data->unfiltered_readers_count); + + REDASkiplistNode * node = nullptr; + REDASkiplist_gotoTopNode(&writer_data->readers, &node); + while (REDASkiplist_gotoNextNode(&writer_data->readers, &node)) { + RTI_CustomSqlFilterReaderData * const rdata = + static_cast(REDASkiplistNode_getUserData(node)); + if (rdata->expression.empty()) { + writer_data->unfiltered_readers_buffer.push_back(&rdata->cookie); + } + } +} + +static +DDS_ReturnCode_t +RTI_CustomSqlFilter_writer_compile( + void * filter_data, + void * writer_filter_data, + struct DDS_ExpressionProperty * prop, + const char * expression, + const struct DDS_StringSeq * parameters, + const struct DDS_TypeCode * type_code, + const char * type_class_name, + const struct DDS_Cookie_t * cookie) +{ + const bool empty_expr = expression[0] == '\0'; + + RTI_CustomSqlFilterData * const cft_data = + static_cast(filter_data); + RTI_CustomSqlFilterWriterData * const writer_data = + static_cast(writer_filter_data); + + bool regen_unfiltered = false; + bool new_reader = false; + + RTI_CustomSqlFilterReaderData * reader_data = nullptr; + bool reader_data_added = false; + bool reader_data_compiled = false; + auto scope_exit_reader_data = rcpputils::make_scope_exit( + [cft_data, writer_data, cookie, + &reader_data_added, &reader_data_compiled, &reader_data]() + { + if (reader_data_compiled) { + DDS_SqlFilter_writerFinalize( + &cft_data->base, writer_data->base, cookie); + } + if (reader_data_added) { + REDASkiplist_removeNodeEA(&writer_data->readers, reader_data); + } + if (nullptr != reader_data) { + delete reader_data; + } + }); + REDAWeakReference cookie_wr; + RTIOsapiMemory_copy( + &cookie_wr, + DDS_OctetSeq_get_contiguous_buffer(&cookie->value), + sizeof(struct REDAWeakReference)); + RTI_CustomSqlFilterReaderData lookup_data(&cookie_wr); + + REDASkiplistNode * node = nullptr; + RTIBool precise_match = RTI_FALSE; + REDASkiplist_findNode( + &writer_data->readers, + const_cast(&node), + &precise_match, + static_cast(&lookup_data)); + if (!precise_match) { + node = nullptr; + } + if (nullptr != node) { + reader_data = + static_cast(REDASkiplistNode_getUserData(node)); + reader_data_added = true; + + if (!reader_data->expression.empty() && empty_expr) { + DDS_SqlFilter_writerFinalize( + &cft_data->base, writer_data->base, cookie); + writer_data->unfiltered_readers_count += 1; + writer_data->filtered_readers_count -= 1; + } else if (reader_data->expression.empty() && !empty_expr) { + // Regenerate the list of unfiltered readers + writer_data->unfiltered_readers_count -= 1; + writer_data->filtered_readers_count += 1; + regen_unfiltered = true; + } + + } else { + new_reader = true; + reader_data = new (std::nothrow) RTI_CustomSqlFilterReaderData(&cookie_wr); + if (nullptr == reader_data) { + // TODO(asorbini) log error + return DDS_RETCODE_ERROR; + } + + if (REDASkiplist_assertNodeEA( + &writer_data->readers, + nullptr, + reader_data, + 0, + 0) == nullptr) + { + // TODO(asorbini) log error + return DDS_RETCODE_ERROR; + } + reader_data_added = true; + + if (empty_expr) { + writer_data->unfiltered_readers_count += 1; + } else { + writer_data->filtered_readers_count += 1; + } + } + + reader_data->expression = expression; + + if (empty_expr) { + prop->key_only_filter = DDS_BOOLEAN_FALSE; + prop->writer_side_filter_optimization = DDS_BOOLEAN_FALSE; + } else { + DDS_ReturnCode_t rc = DDS_SqlFilter_writerCompile( + &cft_data->base, + writer_data->base, + prop, + expression, + parameters, + type_code, + type_class_name, + cookie); + if (DDS_RETCODE_OK != rc) { + // TODO(asorbini) log error + return rc; + } + reader_data_compiled = true; + } + + const size_t readers_size = writer_data->matched_readers_buffer.size(); + if (new_reader) { + writer_data->matched_readers_buffer.reserve(readers_size + 1); + } + + if (regen_unfiltered) { + RTI_CustomSqlFilter_update_unfiltered_list(writer_data); + } else if (new_reader && empty_expr) { + writer_data->unfiltered_readers_buffer.push_back(&reader_data->cookie); + } + + scope_exit_reader_data.cancel(); + return DDS_RETCODE_OK; +} + +DDS_CookieSeq * +RTI_CustomSqlFilter_writer_evaluated_result( + RTI_CustomSqlFilterWriterData * const writer_data, + DDS_CookieSeq * const sql_matched) +{ + if (DDS_CookieSeq_get_maximum(&writer_data->matched_readers) > 0) { + DDS_CookieSeq_unloan(&writer_data->matched_readers); + } + + const size_t sql_matched_len = + (nullptr != sql_matched) ? DDS_CookieSeq_get_length(sql_matched) : 0; + const size_t unfiltered_size = writer_data->unfiltered_readers_buffer.size(); + size_t readers_size = sql_matched_len + unfiltered_size; + size_t matched_readers_start = 0; + + writer_data->matched_readers_buffer.resize(readers_size); + + if (sql_matched_len > 0) { + DDS_Cookie_t ** const sql_matched_buffer = + DDS_CookieSeq_get_discontiguous_buffer(sql_matched); + memcpy( + &(writer_data->matched_readers_buffer[matched_readers_start]), + sql_matched_buffer, + sizeof(DDS_Cookie_t *) * sql_matched_len); + matched_readers_start += sql_matched_len; + } + + if (unfiltered_size > 0) { + memcpy( + &(writer_data->matched_readers_buffer[matched_readers_start]), + &(writer_data->unfiltered_readers_buffer[0]), + sizeof(DDS_Cookie_t *) * unfiltered_size); + } + + readers_size = writer_data->matched_readers_buffer.size(); + if (readers_size > 0) { + if (!DDS_CookieSeq_loan_discontiguous( + &writer_data->matched_readers, + &(writer_data->matched_readers_buffer[0]), + readers_size, + readers_size)) + { + // TODO(asorbini) log error + return nullptr; + } + } + + return &writer_data->matched_readers; +} + +static +struct DDS_CookieSeq * +RTI_CustomSqlFilter_writer_evaluate_on_serialized( + void * filter_data, + void * writer_filter_data, + const void * sample, + const struct DDS_FilterSampleInfo * meta_data); + +DDS_CookieSeq * +RTI_CustomSqlFilter_writer_evaluate( + void * filter_data, + void * writer_filter_data, + const void * sample, + const DDS_FilterSampleInfo * meta_data) +{ + // RTI_CustomSqlFilterData * const cft_data = + // static_cast(filter_data); + RTI_CustomSqlFilterWriterData * const writer_data = + static_cast(writer_filter_data); + + if (writer_data->filtered_readers_count > 0) { + // `sample` is a ROS 2 message so we can't pass it to the SQL filter's + // evaluate() function because the filter expects a Connext memory layout. + // For this reason, we must first serialize the sample to a buffer, then + // we can then call evaluate_on_serialized(). + size_t serialized_size = 0; + const RMW_Connext_Message * const msg = + reinterpret_cast(sample); + const uint8_t * serialized_sample = nullptr; + + if (msg->serialized) { + const rcutils_uint8_array_t * const serialized_msg = + reinterpret_cast(msg->user_data); + serialized_size += static_cast(serialized_msg->buffer_length); + serialized_sample = serialized_msg->buffer; + } else { + RMW_CONNEXT_ASSERT(nullptr != msg->type_support) + serialized_size += + msg->type_support->serialized_size_max( + msg->user_data, true /* include_encapsulation */); + writer_data->serialize_buffer.resize(serialized_size); + + rcutils_uint8_array_t data_buffer; + data_buffer.allocator = rcutils_get_default_allocator(); + data_buffer.buffer = &writer_data->serialize_buffer[0]; + data_buffer.buffer_length = serialized_size; + data_buffer.buffer_capacity = serialized_size; + rmw_ret_t rc = msg->type_support->serialize(msg->user_data, &data_buffer); + if (RMW_RET_OK != rc) { + return nullptr; + } + serialized_sample = &writer_data->serialize_buffer[0]; + } + + RTICdrStream cdr_stream; + RTICdrStream_init(&cdr_stream); + RTICdrStream_set( + &cdr_stream, + // "Cast the const away", but the buffer will not be written to. + reinterpret_cast( + const_cast(serialized_sample)), + serialized_size); + if (!RTICdrStream_deserializeCdrEncapsulationAndSetDefault((&cdr_stream))) { + RMW_CONNEXT_LOG_ERROR("failed to deserialize and set CDR encapsulation") + return nullptr; + } + RTICdrStream_resetAlignment(&cdr_stream); + RTICdrStream_setCurrentPositionOffset( + &cdr_stream, + RMW_Connext_MessageTypeSupport::ENCAPSULATION_HEADER_SIZE); + + return RTI_CustomSqlFilter_writer_evaluate_on_serialized( + filter_data, + writer_filter_data, + &cdr_stream, + meta_data); + } else { + return RTI_CustomSqlFilter_writer_evaluated_result(writer_data, nullptr); + } +} + +void +RTI_CustomSqlFilter_writer_finalize( + void * filter_data, + void * writer_filter_data, + const struct DDS_Cookie_t * cookie) +{ + RTI_CustomSqlFilterData * const cft_data = + static_cast(filter_data); + RTI_CustomSqlFilterWriterData * const writer_data = + static_cast(writer_filter_data); + + REDAWeakReference cookie_wr; + RTIOsapiMemory_copy( + &cookie_wr, + DDS_OctetSeq_get_contiguous_buffer(&cookie->value), + sizeof(struct REDAWeakReference)); + RTI_CustomSqlFilterReaderData lookup_data(&cookie_wr); + + REDASkiplistNode * node = + const_cast( + REDASkiplist_removeNodeEA(&writer_data->readers, &lookup_data)); + if (nullptr != node) { + RTI_CustomSqlFilterReaderData * reader_data = + static_cast(REDASkiplistNode_getUserData(node)); + if (reader_data->expression.empty()) { + writer_data->unfiltered_readers_count -= 1; + RTI_CustomSqlFilter_update_unfiltered_list(writer_data); + } else { + writer_data->filtered_readers_count -= 1; + DDS_SqlFilter_writerFinalize(&cft_data->base, writer_data->base, cookie); + } + delete reader_data; + REDASkiplist_deleteNode(&writer_data->readers, node); + } +} + +void +RTI_CustomSqlFilter_writer_return_loan( + void * filter_data, + void * writer_filter_data, + struct DDS_CookieSeq * cookies) +{ + RTI_CustomSqlFilterData * const cft_data = + static_cast(filter_data); + UNUSED_ARG(cft_data); + RTI_CustomSqlFilterWriterData * const writer_data = + static_cast(writer_filter_data); + UNUSED_ARG(writer_data); + DDS_CookieSeq_set_length(cookies, 0); +} + + +static +struct DDS_CookieSeq * +RTI_CustomSqlFilter_writer_evaluate_on_serialized( + void * filter_data, + void * writer_filter_data, + const void * sample, + const struct DDS_FilterSampleInfo * meta_data) +{ + RTI_CustomSqlFilterData * const cft_data = + static_cast(filter_data); + RTI_CustomSqlFilterWriterData * const writer_data = + static_cast(writer_filter_data); + + DDS_CookieSeq * sql_matched = nullptr; + + if (writer_data->filtered_readers_count > 0) { + sql_matched = DDS_SqlFilter_writerEvaluateOnSerialized( + &cft_data->base, writer_data->base, sample, meta_data); + if (nullptr == sql_matched) { + // TODO(asorbini) log error + return nullptr; + } + } + return RTI_CustomSqlFilter_writer_evaluated_result(writer_data, sql_matched); +} + +DDS_Boolean +RTI_CustomSqlFilter_evaluate_on_serialized( + void * filter_data, + void * handle, + const void * sample, + const struct DDS_FilterSampleInfo * meta_data) +{ + RTI_CustomSqlFilterData * const cft_data = + static_cast(filter_data); + RTI_CustomSqlFilterProgram * const program = + static_cast(handle); + DDS_Boolean accepted = DDS_BOOLEAN_TRUE; + if (nullptr != program->base) { + accepted = DDS_SqlFilter_evaluateOnSerialized( + &cft_data->base, program->base, sample, meta_data); + } + return accepted; +} + +DDS_Long +RTI_CustomSqlFilter_query(void * filter_data, void * handle) +{ + RTI_CustomSqlFilterData * const cft_data = + static_cast(filter_data); + (void)cft_data; + + RTI_CustomSqlFilterProgram * const program = + static_cast(handle); + + if (nullptr == program->base) { + return 0; + } + + return DDS_SqlFilter_query(filter_data, program->base); +} + +#endif // !RMW_CONNEXT_BUILTIN_CFT_COMPATIBILITY_MODE + +// This function is only called when RMW_CONNEXT_BUILTIN_CFT_COMPATIBILITY_MODE +// is enabled. In that flag is set, the filter will be registered as a user +// filter instead of built-in one, so it won't be able to do filtering on +// serialized samples. When this function is used, we don't perform +// writer-side optimizations and we rely only on reader-side filtering to +// avoid having to serialize the sample unnecessarily. +DDS_Boolean +RTI_CustomSqlFilter_evaluate( + void * filter_data, + void * handle, + const void * sample, + const struct DDS_FilterSampleInfo * meta_data) +{ + RTI_CustomSqlFilterData * const cft_data = + static_cast(filter_data); + RTI_CustomSqlFilterProgram * const program = + static_cast(handle); + + if (nullptr == program->base) { + return DDS_BOOLEAN_TRUE; + } + + const RMW_Connext_Message * const msg = + static_cast(sample); + + if (nullptr != msg->user_data) { + // On the writer side we don't actually perform any filtering, because that + // could require to perform a (possibly useless) additional serialization + // into a buffer, so we always return true. + return DDS_BOOLEAN_TRUE; + } + + RTICdrStream cdr_stream; + RTICdrStream_init(&cdr_stream); + RTICdrStream_set( + &cdr_stream, + reinterpret_cast(msg->data_buffer.buffer), + msg->data_buffer.buffer_length); + if (!RTICdrStream_deserializeCdrEncapsulationAndSetDefault((&cdr_stream))) { + RMW_CONNEXT_LOG_ERROR("failed to deserialize and set CDR encapsulation") + return DDS_BOOLEAN_FALSE; + } + RTICdrStream_resetAlignment(&cdr_stream); + RTICdrStream_setCurrentPositionOffset( + &cdr_stream, + RMW_Connext_MessageTypeSupport::ENCAPSULATION_HEADER_SIZE); + + return DDS_SqlFilter_evaluateOnSerialized( + &cft_data->base, program->base, &cdr_stream, meta_data); +} + +void +RTI_CustomSqlFilter_finalize(void * filter_data, void * handle) +{ + RTI_CustomSqlFilterData * const cft_data = + static_cast(filter_data); + RTI_CustomSqlFilterProgram * const program = + static_cast(handle); + + if (nullptr != program->base) { + DDS_SqlFilter_finalize(&cft_data->base, program->base); + } + + delete program; +} + + +DDS_ReturnCode_t +rti_connext_dds_custom_sql_filter::register_content_filter( + DDS_DomainParticipant * const participant, + rti_connext_dds_custom_sql_filter::CustomSqlFilterData * const filter_data) +{ + DDS_DomainParticipantQos dp_qos = DDS_DomainParticipantQos_INITIALIZER; + auto scope_exit_qos = rcpputils::make_scope_exit( + [&dp_qos]() + { + if (DDS_RETCODE_OK != DDS_DomainParticipantQos_finalize(&dp_qos)) { + // TODO(asorbini) log error + } + }); + + + if (DDS_RETCODE_OK != DDS_DomainParticipant_get_qos(participant, &dp_qos)) { + // TODO(asorbini) log error + return DDS_RETCODE_ERROR; + } + + DDS_ReturnCode_t rc = filter_data->set_memory_management_property(dp_qos); + if (DDS_RETCODE_OK != rc) { + // TODO(asorbini) log error + return rc; + } + + DDS_ContentFilter filter = DDS_ContentFilter_INITIALIZER; + filter.compile = RTI_CustomSqlFilter_compile; + filter.evaluate = RTI_CustomSqlFilter_evaluate; + filter.finalize = RTI_CustomSqlFilter_finalize; + filter.filter_data = filter_data; + +#if RMW_CONNEXT_BUILTIN_CFT_COMPATIBILITY_MODE + rc = DDS_DomainParticipant_register_contentfilter( + participant, + PLUGIN_NAME, + &filter); +#else + filter.writer_attach = RTI_CustomSqlFilter_writer_attach; + filter.writer_compile = RTI_CustomSqlFilter_writer_compile; + filter.writer_detach = RTI_CustomSqlFilter_writer_detach; + filter.writer_evaluate = RTI_CustomSqlFilter_writer_evaluate; + filter.writer_finalize = RTI_CustomSqlFilter_writer_finalize; + filter.writer_return_loan = RTI_CustomSqlFilter_writer_return_loan; + + rc = DDS_ContentFilter_register_filter( + participant, + PLUGIN_NAME, + &filter, + RTI_CustomSqlFilter_evaluate_on_serialized, + RTI_CustomSqlFilter_writer_evaluate_on_serialized, + RTI_CustomSqlFilter_query, + DDS_BOOLEAN_TRUE); +#endif // RMW_CONNEXT_BUILTIN_CFT_COMPATIBILITY_MODE + if (DDS_RETCODE_OK != rc) { + // TODO(asorbini) log error + return DDS_RETCODE_ERROR; + } + + return DDS_RETCODE_OK; +} + +const char * const +rti_connext_dds_custom_sql_filter::PLUGIN_NAME = "RTI_CONNEXTDDS_CUSTOM_SQL_FILTER"; + +#endif // RMW_CONNEXT_DDS_API == RMW_CONNEXT_DDS_API_PRO diff --git a/rmw_connextdds_common/src/ndds/dds_api_ndds.cpp b/rmw_connextdds_common/src/ndds/dds_api_ndds.cpp index 857c7791..4b9cc9a4 100644 --- a/rmw_connextdds_common/src/ndds/dds_api_ndds.cpp +++ b/rmw_connextdds_common/src/ndds/dds_api_ndds.cpp @@ -17,6 +17,7 @@ #include #include "rmw/impl/cpp/key_value.hpp" +#include "rmw_connextdds/custom_sql_filter.hpp" #include "rmw_connextdds/type_support.hpp" #include "rmw_connextdds/rmw_impl.hpp" @@ -25,6 +26,13 @@ const char * const RMW_CONNEXTDDS_ID = "rmw_connextdds"; const char * const RMW_CONNEXTDDS_SERIALIZATION_FORMAT = "cdr"; +struct rmw_connextdds_api_pro +{ + rti_connext_dds_custom_sql_filter::CustomSqlFilterData custom_filter_data; +}; + +rmw_connextdds_api_pro * RMW_Connext_fv_FactoryContext = nullptr; + rmw_ret_t rmw_connextdds_set_log_verbosity(rmw_log_severity_t severity) { @@ -69,9 +77,28 @@ rmw_ret_t rmw_connextdds_initialize_participant_factory_context( rmw_context_impl_t * const ctx) { + RMW_CONNEXT_ASSERT(RMW_Connext_fv_FactoryContext == nullptr) + // RMW_Connext_gv_DomainParticipantFactory is initialized by + // rmw_api_connextdds_init(). RMW_CONNEXT_ASSERT(RMW_Connext_gv_DomainParticipantFactory == nullptr) UNUSED_ARG(ctx); - // Nothing to do + + rmw_connextdds_api_pro * ctx_api = nullptr; + auto scope_exit_api_delete = rcpputils::make_scope_exit( + [ctx_api]() + { + if (nullptr != ctx_api) { + delete ctx_api; + } + }); + + ctx_api = new (std::nothrow) rmw_connextdds_api_pro(); + if (nullptr == ctx_api) { + return RMW_RET_ERROR; + } + + scope_exit_api_delete.cancel(); + RMW_Connext_fv_FactoryContext = ctx_api; return RMW_RET_OK; } @@ -79,6 +106,71 @@ rmw_ret_t rmw_connextdds_finalize_participant_factory_context( rmw_context_impl_t * const ctx) { + RMW_CONNEXT_ASSERT(nullptr != RMW_Connext_fv_FactoryContext) + rmw_connextdds_api_pro * const ctx_api = RMW_Connext_fv_FactoryContext; + RMW_Connext_fv_FactoryContext = nullptr; + + delete ctx_api; + + if (nullptr == RMW_Connext_gv_DomainParticipantFactory) { + // Nothing else to do if we didn't even initialize the factory; + return RMW_RET_OK; + } + + // There might be some DomainParticipants left-over from a ("failed context + // initialization" + "failed participant finalization"), so let's try to + // clean them up. + DDS_DomainParticipantSeq participants = DDS_SEQUENCE_INITIALIZER; + auto scope_exit_seq = rcpputils::make_scope_exit( + [&participants]() + { + DDS_DomainParticipantSeq_finalize(&participants); + }); + + if (DDS_RETCODE_OK != + DDS_DomainParticipantFactory_get_participants( + RMW_Connext_gv_DomainParticipantFactory, &participants)) + { + RMW_CONNEXT_LOG_ERROR_SET("failed to list existing participants") + return RMW_RET_ERROR; + } + + const DDS_Long pending = DDS_DomainParticipantSeq_get_length(&participants); + for (DDS_Long i = 0; i < pending; i++) { + DDS_DomainParticipant * const participant = + *DDS_DomainParticipantSeq_get_reference(&participants, i); +#if RMW_CONNEXT_DEBUG + // If we are building in Debug mode, an issue in Connext may prevent the + // participant from being able to delete any content-filtered topic if + // the participant has not been enabled. + // For this reason, make sure to enable the participant before trying to + // finalize it. + // TODO(asorbini) reconsider the need for this code in Connext > 6.1.0 + if (DDS_RETCODE_OK != + DDS_Entity_enable(DDS_DomainParticipant_as_entity(participant))) + { + RMW_CONNEXT_LOG_ERROR_SET( + "failed to enable pending DomainParticipant before deletion") + return RMW_RET_ERROR; + } +#endif // RMW_CONNEXT_DEBUG + if (DDS_RETCODE_OK != + DDS_DomainParticipant_delete_contained_entities(participant)) + { + RMW_CONNEXT_LOG_ERROR_SET( + "failed to delete pending DomainParticipant's entities") + return RMW_RET_ERROR; + } + + if (DDS_RETCODE_OK != + DDS_DomainParticipantFactory_delete_participant( + RMW_Connext_gv_DomainParticipantFactory, participant)) + { + RMW_CONNEXT_LOG_ERROR_SET("failed to delete pending DomainParticipant") + return RMW_RET_ERROR; + } + } + UNUSED_ARG(ctx); return RMW_RET_OK; } @@ -229,6 +321,48 @@ rmw_connextdds_initialize_participant_qos_impl( return RMW_RET_OK; } +rmw_ret_t +rmw_connextdds_configure_participant( + rmw_context_impl_t * const ctx, + DDS_DomainParticipant * const participant) +{ + UNUSED_ARG(ctx); + + if (DDS_RETCODE_OK != + rti_connext_dds_custom_sql_filter::register_content_filter( + participant, &RMW_Connext_fv_FactoryContext->custom_filter_data)) + { + RMW_CONNEXT_LOG_ERROR_SET("failed to register custom SQL filter") + return RMW_RET_ERROR; + } + + return RMW_RET_OK; +} + +static +rmw_ret_t +rmw_connextdds_initialize_cft_parameters( + struct DDS_StringSeq * const cft_parameters, + const rcutils_string_array_t * const cft_expression_parameters) +{ + // Cache value locally to avoid conversion warnings on Windows. + const DDS_Long params_len = static_cast(cft_expression_parameters->size); + + if (!DDS_StringSeq_ensure_length(cft_parameters, params_len, params_len)) { + RMW_CONNEXT_LOG_ERROR_SET("failed to ensure length for cft parameters sequence") + return RMW_RET_ERROR; + } + if (!DDS_StringSeq_from_array( + cft_parameters, + const_cast(cft_expression_parameters->data), + params_len)) + { + RMW_CONNEXT_LOG_ERROR_SET("failed to copy data for cft parameters sequence") + return RMW_RET_ERROR; + } + return RMW_RET_OK; +} + rmw_ret_t rmw_connextdds_create_contentfilteredtopic( rmw_context_impl_t * const ctx, @@ -236,6 +370,7 @@ rmw_connextdds_create_contentfilteredtopic( DDS_Topic * const base_topic, const char * const cft_name, const char * const cft_filter, + const rcutils_string_array_t * const cft_expression_parameters, DDS_TopicDescription ** const cft_out) { UNUSED_ARG(ctx); @@ -243,13 +378,28 @@ rmw_connextdds_create_contentfilteredtopic( RMW_CONNEXT_ASSERT(nullptr != cft_filter) struct DDS_StringSeq cft_parameters = DDS_SEQUENCE_INITIALIZER; - DDS_StringSeq_ensure_length(&cft_parameters, 0, 0); + auto scope_exit_cft_params = rcpputils::make_scope_exit( + [&cft_parameters]() { + if (!DDS_StringSeq_finalize(&cft_parameters)) { + RMW_CONNEXT_LOG_ERROR_SET("failed to finalize cft parameters sequence") + } + }); + if (cft_expression_parameters) { + if (RMW_RET_OK != + rmw_connextdds_initialize_cft_parameters(&cft_parameters, cft_expression_parameters)) + { + RMW_CONNEXT_LOG_ERROR_SET("failed to rmw_connextdds_initialize_cft_parameters") + return RMW_RET_ERROR; + } + } *cft_out = nullptr; DDS_ContentFilteredTopic * cft_topic = - DDS_DomainParticipant_create_contentfilteredtopic( - dp, cft_name, base_topic, cft_filter, &cft_parameters); + DDS_DomainParticipant_create_contentfilteredtopic_with_filter( + dp, cft_name, base_topic, cft_filter, &cft_parameters, + rti_connext_dds_custom_sql_filter::PLUGIN_NAME); + if (nullptr == cft_topic) { RMW_CONNEXT_LOG_ERROR_A_SET( "failed to create content-filtered topic: " @@ -570,7 +720,6 @@ rmw_connextdds_write_message( // enable WriteParams::replace_auto to retrieve SN of published message write_params.replace_auto = DDS_BOOLEAN_TRUE; } - if (DDS_RETCODE_OK != DDS_DataWriter_write_w_params_untypedI( pub->writer(), message, &write_params)) @@ -640,9 +789,9 @@ rmw_connextdds_take_samples( RMW_CONNEXT_LOG_ERROR_SET("failed to take data from DDS reader") return RMW_RET_ERROR; } - RMW_CONNEXT_ASSERT(data_len > 0)(void) RMW_Connext_Uint8ArrayPtrSeq_loan_contiguous( + RMW_CONNEXT_ASSERT(data_len > 0)(void) RMW_Connext_MessagePtrSeq_loan_contiguous( sub->data_seq(), - reinterpret_cast(data_buffer), + reinterpret_cast(data_buffer), data_len, data_len); @@ -654,11 +803,11 @@ rmw_connextdds_return_samples( RMW_Connext_Subscriber * const sub) { void ** data_buffer = reinterpret_cast( - RMW_Connext_Uint8ArrayPtrSeq_get_contiguous_buffer(sub->data_seq())); + RMW_Connext_MessagePtrSeq_get_contiguous_buffer(sub->data_seq())); const DDS_Long data_len = - RMW_Connext_Uint8ArrayPtrSeq_get_length(sub->data_seq()); + RMW_Connext_MessagePtrSeq_get_length(sub->data_seq()); - if (!RMW_Connext_Uint8ArrayPtrSeq_unloan(sub->data_seq())) { + if (!RMW_Connext_MessagePtrSeq_unloan(sub->data_seq())) { RMW_CONNEXT_LOG_ERROR_SET("failed to unloan sample sequence") return RMW_RET_ERROR; } @@ -1299,3 +1448,98 @@ rmw_connextdds_enable_security( return RMW_RET_OK; } + + +rmw_ret_t +rmw_connextdds_set_cft_filter_expression( + DDS_TopicDescription * const topic_desc, + const char * const cft_expression, + const rcutils_string_array_t * const cft_expression_parameters) +{ + DDS_ContentFilteredTopic * const cft_topic = + DDS_ContentFilteredTopic_narrow(topic_desc); + + struct DDS_StringSeq cft_parameters = DDS_SEQUENCE_INITIALIZER; + auto scope_exit_cft_parameters = rcpputils::make_scope_exit( + [&cft_parameters]() { + if (!DDS_StringSeq_finalize(&cft_parameters)) { + RMW_CONNEXT_LOG_ERROR_SET("failed to finalize cft parameters sequence") + } + }); + if (nullptr != cft_expression_parameters) { + if (RMW_RET_OK != + rmw_connextdds_initialize_cft_parameters(&cft_parameters, cft_expression_parameters)) + { + RMW_CONNEXT_LOG_ERROR_SET("failed to rmw_connextdds_initialize_cft_parameters") + return RMW_RET_ERROR; + } + } + + DDS_ReturnCode_t ret = + DDS_ContentFilteredTopic_set_expression(cft_topic, cft_expression, &cft_parameters); + if (DDS_RETCODE_OK != ret) { + RMW_CONNEXT_LOG_ERROR_SET("failed to set content-filtered topic") + return RMW_RET_ERROR; + } + return RMW_RET_OK; +} + +rmw_ret_t +rmw_connextdds_get_cft_filter_expression( + DDS_TopicDescription * const topic_desc, + rcutils_allocator_t * const allocator, + rmw_subscription_content_filter_options_t * const options) +{ + DDS_ContentFilteredTopic * const cft_topic = + DDS_ContentFilteredTopic_narrow(topic_desc); + + // get filter_expression + const char * filter_expression = DDS_ContentFilteredTopic_get_filter_expression(cft_topic); + if (!filter_expression) { + RMW_CONNEXT_LOG_ERROR_SET("failed to get filter expression") + return RMW_RET_ERROR; + } + + // get parameters + struct DDS_StringSeq parameters = DDS_SEQUENCE_INITIALIZER; + DDS_ReturnCode_t status = + DDS_ContentFilteredTopic_get_expression_parameters(cft_topic, ¶meters); + if (DDS_RETCODE_OK != status) { + RMW_CONNEXT_LOG_ERROR_SET("failed to get expression parameters") + return RMW_RET_ERROR; + } + auto scope_exit_parameters_delete = + rcpputils::make_scope_exit( + [¶meters]() + { + DDS_StringSeq_finalize(¶meters); + }); + + const DDS_Long parameters_len = DDS_StringSeq_get_length(¶meters); + std::vector expression_parameters; + expression_parameters.reserve(parameters_len); + + for (DDS_Long i = 0; i < parameters_len; ++i) { + const char * ref = *DDS_StringSeq_get_reference(¶meters, i); + if (!ref) { + RMW_CONNEXT_LOG_ERROR_A_SET( + "failed to get a reference for parameters with index %d", i) + return RMW_RET_ERROR; + } + + expression_parameters.push_back(ref); + } + + if (RMW_RET_OK != rmw_subscription_content_filter_options_init( + filter_expression, + expression_parameters.size(), + expression_parameters.data(), + allocator, + options)) + { + RMW_CONNEXT_LOG_ERROR_SET("failed to rmw_subscription_content_filter_options_init") + return RMW_RET_ERROR; + } + + return RMW_RET_OK; +} diff --git a/rmw_connextdds_common/src/ndds/rmw_type_support_ndds.cpp b/rmw_connextdds_common/src/ndds/rmw_type_support_ndds.cpp index dc4e33f6..ffbb6862 100644 --- a/rmw_connextdds_common/src/ndds/rmw_type_support_ndds.cpp +++ b/rmw_connextdds_common/src/ndds/rmw_type_support_ndds.cpp @@ -88,15 +88,15 @@ struct RMW_Connext_NddsTypePluginI REDAFastBufferPool_delete(this->pool_samples); } - rcutils_uint8_array_t * + RMW_Connext_Message * allocate_sample() { - return reinterpret_cast( + return reinterpret_cast( REDAFastBufferPool_getBuffer(this->pool_samples)); } void - return_sample(rcutils_uint8_array_t * const sample) + return_sample(RMW_Connext_Message * const sample) { REDAFastBufferPool_returnBuffer(this->pool_samples, sample); } @@ -126,8 +126,8 @@ struct RMW_Connext_NddsParticipantData static RTIBool -RMW_Connext_Uint8ArrayPtr_initialize_w_params( - rcutils_uint8_array_t ** self, +RMW_Connext_MessagePtr_initialize_w_params( + RMW_Connext_Message ** self, const struct DDS_TypeAllocationParams_t * allocParams) { UNUSED_ARG(allocParams); @@ -136,8 +136,8 @@ RMW_Connext_Uint8ArrayPtr_initialize_w_params( } static RTIBool -RMW_Connext_Uint8ArrayPtr_finalize_w_params( - rcutils_uint8_array_t ** self, +RMW_Connext_MessagePtr_finalize_w_params( + RMW_Connext_Message ** self, const struct DDS_TypeDeallocationParams_t * deallocParams) { UNUSED_ARG(deallocParams); @@ -146,19 +146,19 @@ RMW_Connext_Uint8ArrayPtr_finalize_w_params( } static RTIBool -RMW_Connext_Uint8ArrayPtr_copy( - rcutils_uint8_array_t ** dst, - const rcutils_uint8_array_t ** src) +RMW_Connext_MessagePtr_copy( + RMW_Connext_Message ** dst, + const RMW_Connext_Message ** src) { - *dst = const_cast(*src); + *dst = const_cast(*src); return RTI_TRUE; } -#define T rcutils_uint8_array_t * -#define TSeq RMW_Connext_Uint8ArrayPtrSeq -#define T_initialize_w_params RMW_Connext_Uint8ArrayPtr_initialize_w_params -#define T_finalize_w_params RMW_Connext_Uint8ArrayPtr_finalize_w_params -#define T_copy RMW_Connext_Uint8ArrayPtr_copy +#define T RMW_Connext_Message * +#define TSeq RMW_Connext_MessagePtrSeq +#define T_initialize_w_params RMW_Connext_MessagePtr_initialize_w_params +#define T_finalize_w_params RMW_Connext_MessagePtr_finalize_w_params +#define T_copy RMW_Connext_MessagePtr_copy #include "dds_c/generic/dds_c_sequence_TSeq.gen" static @@ -232,29 +232,22 @@ RMW_Connext_TypePlugin_create_data(void ** sample, void * user_data) RMW_Connext_MessageTypeSupport * const type_support = reinterpret_cast(user_data); - rcutils_uint8_array_t * data_buffer = - new (std::nothrow) rcutils_uint8_array_t(); - if (nullptr == data_buffer) { - return RTI_FALSE; - } - - const rcutils_allocator_t allocator = rcutils_get_default_allocator(); size_t buffer_size = 0; - if (type_support->unbounded()) { buffer_size = 0; } else { buffer_size = type_support->type_serialized_size_max(); } - if (RCUTILS_RET_OK != - rcutils_uint8_array_init(data_buffer, buffer_size, &allocator)) - { - delete data_buffer; + RMW_Connext_Message * msg = new (std::nothrow) RMW_Connext_Message(); + if (nullptr == msg) { return RTI_FALSE; } - - *sample = data_buffer; + if (RMW_RET_OK != RMW_Connext_Message_initialize(msg, type_support, buffer_size)) { + delete msg; + return RTI_FALSE; + } + *sample = msg; return RTI_TRUE; } @@ -268,14 +261,9 @@ RMW_Connext_TypePlugin_destroy_data(void ** sample, void * user_data) UNUSED_ARG(type_support); - rcutils_uint8_array_t * data_buffer = - reinterpret_cast(*sample); - - if (RCUTILS_RET_OK != rcutils_uint8_array_fini(data_buffer)) { - RMW_CONNEXT_LOG_ERROR_SET("failed to finalize array") - } - - delete data_buffer; + RMW_Connext_Message * msg = reinterpret_cast(*sample); + RMW_Connext_Message_finalize(msg); + delete msg; } /* ---------------------------------------------------------------------------- @@ -418,11 +406,14 @@ RMW_Connext_TypePlugin_copy_sample( { UNUSED_ARG(endpoint_data); - const rcutils_uint8_array_t * src_buffer = - reinterpret_cast(src); - rcutils_uint8_array_t * dst_buffer = reinterpret_cast(dst); + const RMW_Connext_Message * src_msg = + reinterpret_cast(src); + RMW_Connext_Message * dst_msg = + reinterpret_cast(dst); - if (RCUTILS_RET_OK != rcutils_uint8_array_copy(dst_buffer, src_buffer)) { + if (RCUTILS_RET_OK != + rcutils_uint8_array_copy(&dst_msg->data_buffer, &src_msg->data_buffer)) + { return RTI_FALSE; } @@ -461,6 +452,14 @@ RMW_Connext_TypePlugin_serialize( const RMW_Connext_Message * const msg = reinterpret_cast(sample); + if (nullptr == msg->user_data) { + // Samples written by the application layer should always have a non-null + // user_data pointer. The only samples which do not use that pointers are + // the ones created internally in the DataReader queue, and it would be + // unexpected for them to be passed to this function. + return RTI_FALSE; + } + rcutils_uint8_array_t data_buffer; data_buffer.allocator = rcutils_get_default_allocator(); data_buffer.buffer = @@ -517,22 +516,22 @@ RMW_Connext_TypePlugin_deserialize( return RTI_FALSE; } - rcutils_uint8_array_t * const data_buffer = - reinterpret_cast(*sample); + RMW_Connext_Message * const msg = + reinterpret_cast(*sample); + const size_t deserialize_size = RTICdrStream_getRemainder(stream); void * const src_ptr = RTICdrStream_getCurrentPosition(stream); - if (data_buffer->buffer_capacity < deserialize_size) { + if (msg->data_buffer.buffer_capacity < deserialize_size) { if (RCUTILS_RET_OK != - rcutils_uint8_array_resize(data_buffer, deserialize_size)) + rcutils_uint8_array_resize(&msg->data_buffer, deserialize_size)) { return RTI_FALSE; } } - memcpy(data_buffer->buffer, src_ptr, deserialize_size); - - data_buffer->buffer_length = deserialize_size; + memcpy(msg->data_buffer.buffer, src_ptr, deserialize_size); + msg->data_buffer.buffer_length = deserialize_size; RTICdrStream_setCurrentPosition(stream, reinterpret_cast(src_ptr) + deserialize_size); @@ -606,6 +605,12 @@ RMW_Connext_TypePlugin_get_serialized_sample_size( const RMW_Connext_Message * const msg = reinterpret_cast(sample); + // Samples written by the application layer should always have a non-null + // user_data pointer. The only samples which do not use that pointers are + // the ones created internally in the DataReader queue, and it would be + // unexpected for them to be passed to this function. + RMW_CONNEXT_ASSERT(nullptr != msg->user_data) + if (msg->serialized) { const rcutils_uint8_array_t * const serialized_msg = reinterpret_cast(msg->user_data); @@ -796,7 +801,7 @@ rmw_connextdds_register_type_support( struct REDAFastBufferPool * const pool_samples = REDAFastBufferPool_new( - sizeof(rcutils_uint8_array_t), + sizeof(RMW_Connext_Message), RTIOsapiAlignment_getDefaultAlignment(), &pool_prop); diff --git a/rmw_connextdds_common/src/rtime/dds_api_rtime.cpp b/rmw_connextdds_common/src/rtime/dds_api_rtime.cpp index 0f5672ec..36913e9e 100644 --- a/rmw_connextdds_common/src/rtime/dds_api_rtime.cpp +++ b/rmw_connextdds_common/src/rtime/dds_api_rtime.cpp @@ -649,6 +649,8 @@ rmw_connextdds_finalize_participant_factory_context( RMW_CONNEXT_ASSERT(nullptr != RMW_Connext_fv_FactoryContext) rmw_connextdds_api_micro * const ctx_api = RMW_Connext_fv_FactoryContext; + RMW_Connext_fv_FactoryContext = nullptr; + RT_Registry_T * registry = DDS_DomainParticipantFactory_get_registry( RMW_Connext_gv_DomainParticipantFactory); @@ -860,6 +862,17 @@ rmw_connextdds_initialize_participant_qos_impl( return RMW_RET_OK; } +rmw_ret_t +rmw_connextdds_configure_participant( + rmw_context_impl_t * const ctx, + DDS_DomainParticipant * const participant) +{ + UNUSED_ARG(participant); + UNUSED_ARG(ctx); + + return RMW_RET_OK; +} + rmw_ret_t rmw_connextdds_create_contentfilteredtopic( rmw_context_impl_t * const ctx, @@ -867,6 +880,7 @@ rmw_connextdds_create_contentfilteredtopic( DDS_Topic * const base_topic, const char * const cft_name, const char * const cft_filter, + const rcutils_string_array_t * const cft_expression_parameters, DDS_TopicDescription ** const cft_out) { UNUSED_ARG(ctx); @@ -874,6 +888,7 @@ rmw_connextdds_create_contentfilteredtopic( UNUSED_ARG(base_topic); UNUSED_ARG(cft_name); UNUSED_ARG(cft_filter); + UNUSED_ARG(cft_expression_parameters); UNUSED_ARG(cft_out); return RMW_RET_UNSUPPORTED; } @@ -1986,3 +2001,27 @@ rmw_connextdds_enable_security( return RMW_RET_ERROR; #endif /* RMW_CONNEXT_ENABLE_SECURITY */ } + +rmw_ret_t +rmw_connextdds_set_cft_filter_expression( + DDS_TopicDescription * const topic_desc, + const char * const cft_expression, + const rcutils_string_array_t * const cft_expression_parameters) +{ + UNUSED_ARG(topic_desc); + UNUSED_ARG(cft_expression); + UNUSED_ARG(cft_expression_parameters); + return RMW_RET_UNSUPPORTED; +} + +rmw_ret_t +rmw_connextdds_get_cft_filter_expression( + DDS_TopicDescription * const topic_desc, + rcutils_allocator_t * const allocator, + rmw_subscription_content_filter_options_t * const options) +{ + UNUSED_ARG(topic_desc); + UNUSED_ARG(allocator); + UNUSED_ARG(options); + return RMW_RET_UNSUPPORTED; +} diff --git a/rmw_connextdds_common/src/rtime/rmw_type_support_rtime.cpp b/rmw_connextdds_common/src/rtime/rmw_type_support_rtime.cpp index a0af9461..57086bb6 100644 --- a/rmw_connextdds_common/src/rtime/rmw_type_support_rtime.cpp +++ b/rmw_connextdds_common/src/rtime/rmw_type_support_rtime.cpp @@ -62,29 +62,23 @@ RMW_Connext_MemoryPlugin_create_sample( { auto type_support = RMW_Connext_RtimeTypePluginI::type_support(tp); - rcutils_uint8_array_t * data_buffer = - new (std::nothrow) rcutils_uint8_array_t(); - if (nullptr == data_buffer) { - return RTI_FALSE; - } - - const rcutils_allocator_t allocator = rcutils_get_default_allocator(); size_t buffer_size = 0; - if (type_support->unbounded()) { buffer_size = 0; } else { buffer_size = type_support->type_serialized_size_max(); } - if (RCUTILS_RET_OK != - rcutils_uint8_array_init(data_buffer, buffer_size, &allocator)) - { - delete data_buffer; + RMW_Connext_Message * const msg = new (std::nothrow) RMW_Connext_Message(); + if (nullptr == msg) { + return RTI_FALSE; + } + if (RMW_RET_OK != RMW_Connext_Message_initialize(msg, type_support, buffer_size)) { + delete msg; return RTI_FALSE; } - *sample = data_buffer; + *sample = msg; return RTI_TRUE; } @@ -97,15 +91,10 @@ RMW_Connext_MemoryPlugin_delete_sample( { UNUSED_ARG(plugin); - rcutils_uint8_array_t * data_buffer = - reinterpret_cast(sample); - - if (RCUTILS_RET_OK != rcutils_uint8_array_fini(data_buffer)) { - delete data_buffer; - return RTI_FALSE; - } - - delete data_buffer; + RMW_Connext_Message * const msg = + reinterpret_cast(sample); + RMW_Connext_Message_finalize(msg); + delete msg; return RTI_TRUE; } @@ -117,14 +106,16 @@ RMW_Connext_MemoryPlugin_copy_sample( void * dst, const void * src) { - const rcutils_uint8_array_t * src_buffer = - reinterpret_cast(src); - rcutils_uint8_array_t * dst_buffer = - reinterpret_cast(dst); + const RMW_Connext_Message * src_msg = + reinterpret_cast(src); + RMW_Connext_Message * dst_msg = + reinterpret_cast(dst); UNUSED_ARG(plugin); - if (RCUTILS_RET_OK != rcutils_uint8_array_copy(dst_buffer, src_buffer)) { + if (RCUTILS_RET_OK != + rcutils_uint8_array_copy(&dst_msg->data_buffer, &src_msg->data_buffer)) + { return RTI_FALSE; } @@ -145,6 +136,14 @@ RMW_Connext_EncapsulationPlugin_serialize( UNUSED_ARG(plugin); UNUSED_ARG(destination); + if (nullptr == msg->user_data) { + // Samples written by the application layer should always have a non-null + // user_data pointer. The only samples which do not use that pointers are + // the ones created internally in the DataReader queue, and it would be + // unexpected for them to be passed to this function. + return RTI_FALSE; + } + DDS_TypePluginBuffer * const tbuf = reinterpret_cast(stream->real_buff); @@ -182,8 +181,7 @@ RMW_Connext_EncapsulationPlugin_serialize( if (type_support->unbounded()) { if (msg_buffer_unbound->buffer_capacity < serialized_size) { if (RCUTILS_RET_OK != - rcutils_uint8_array_resize( - msg_buffer_unbound, serialized_size)) + rcutils_uint8_array_resize(msg_buffer_unbound, serialized_size)) { return RTI_FALSE; } @@ -243,15 +241,15 @@ RMW_Connext_EncapsulationPlugin_deserialize( UNUSED_ARG(source); UNUSED_ARG(plugin); - rcutils_uint8_array_t * const data_buffer = - reinterpret_cast(void_sample); + RMW_Connext_Message * const msg = + reinterpret_cast(void_sample); const size_t deserialize_size = stream->length - CDR_Stream_get_current_position_offset(stream) + RMW_Connext_MessageTypeSupport::ENCAPSULATION_HEADER_SIZE; - if (data_buffer->buffer_capacity < deserialize_size) { + if (msg->data_buffer.buffer_capacity < deserialize_size) { if (RCUTILS_RET_OK != - rcutils_uint8_array_resize(data_buffer, deserialize_size)) + rcutils_uint8_array_resize(&msg->data_buffer, deserialize_size)) { return RTI_FALSE; } @@ -262,11 +260,11 @@ RMW_Connext_EncapsulationPlugin_deserialize( RMW_Connext_MessageTypeSupport::ENCAPSULATION_HEADER_SIZE; memcpy( - data_buffer->buffer, + msg->data_buffer.buffer, src_ptr, deserialize_size); - data_buffer->buffer_length = deserialize_size; + msg->data_buffer.buffer_length = deserialize_size; return RTI_TRUE; } diff --git a/rmw_connextddsmicro/CHANGELOG.rst b/rmw_connextddsmicro/CHANGELOG.rst index 0d25e166..d605c769 100644 --- a/rmw_connextddsmicro/CHANGELOG.rst +++ b/rmw_connextddsmicro/CHANGELOG.rst @@ -2,6 +2,21 @@ Changelog for package rmw_connextddsmicro ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +0.11.1 (2022-04-26) +------------------- + +0.11.0 (2022-04-08) +------------------- +* Exclude missing sample info fields when building rmw_connextddsmicro (`#79 `_) +* Contributors: Andrea Sorbini + +0.10.0 (2022-03-28) +------------------- +* Add support for user-specified content filters (`#68 `_) +* add stub for content filtered topic (`#77 `_) +* Add sequence numbers to message info structure (`#74 `_) +* Contributors: Andrea Sorbini, Chen Lihui, Ivan Santiago Paunovic + 0.9.0 (2022-03-01) ------------------ * Add rmw listener apis (`#44 `_) diff --git a/rmw_connextddsmicro/package.xml b/rmw_connextddsmicro/package.xml index 9ea0db43..9c3833a9 100644 --- a/rmw_connextddsmicro/package.xml +++ b/rmw_connextddsmicro/package.xml @@ -2,7 +2,7 @@ rmw_connextddsmicro - 0.9.0 + 0.11.1 A ROS2 RMW implementation built with RTI Connext DDS Micro. Andrea Sorbini Apache License 2.0 diff --git a/rmw_connextddsmicro/src/rmw_api_impl_rtime.cpp b/rmw_connextddsmicro/src/rmw_api_impl_rtime.cpp index 9f6b6434..c70b1b69 100644 --- a/rmw_connextddsmicro/src/rmw_api_impl_rtime.cpp +++ b/rmw_connextddsmicro/src/rmw_api_impl_rtime.cpp @@ -716,6 +716,30 @@ rmw_subscription_get_actual_qos( return rmw_api_connextdds_subscription_get_actual_qos(subscription, qos); } +rmw_ret_t +rmw_subscription_set_content_filter( + rmw_subscription_t * subscription, + const rmw_subscription_content_filter_options_t * options) +{ + // Content filters are not supported by Micro and this call will + // eventually fail with RMW_RET_UNSUPPORTED. + return rmw_api_connextdds_subscription_set_content_filter( + subscription, options); +} + + +rmw_ret_t +rmw_subscription_get_content_filter( + const rmw_subscription_t * subscription, + rcutils_allocator_t * allocator, + rmw_subscription_content_filter_options_t * options) +{ + // Content filters are not supported by Micro and this call will + // eventually fail with RMW_RET_UNSUPPORTED. + return rmw_api_connextdds_subscription_get_content_filter( + subscription, allocator, options); +} + rmw_ret_t rmw_destroy_subscription( @@ -936,3 +960,21 @@ rmw_subscription_get_network_flow_endpoints( allocator, network_flow_endpoint_array); } + +/****************************************************************************** + * Feature support functions + ******************************************************************************/ +bool +rmw_feature_supported(rmw_feature_t feature) +{ + switch (feature) { + case RMW_FEATURE_MESSAGE_INFO_PUBLICATION_SEQUENCE_NUMBER: + { + return true; + } + default: + { + return false; + } + } +} diff --git a/rti_connext_dds_cmake_module/CHANGELOG.rst b/rti_connext_dds_cmake_module/CHANGELOG.rst index 69736d78..c24ef599 100644 --- a/rti_connext_dds_cmake_module/CHANGELOG.rst +++ b/rti_connext_dds_cmake_module/CHANGELOG.rst @@ -2,6 +2,15 @@ Changelog for package rti_connext_dds_cmake_module ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +0.11.1 (2022-04-26) +------------------- + +0.11.0 (2022-04-08) +------------------- + +0.10.0 (2022-03-28) +------------------- + 0.9.0 (2022-03-01) ------------------ diff --git a/rti_connext_dds_cmake_module/package.xml b/rti_connext_dds_cmake_module/package.xml index 7eb14726..d0e9f403 100644 --- a/rti_connext_dds_cmake_module/package.xml +++ b/rti_connext_dds_cmake_module/package.xml @@ -2,7 +2,7 @@ rti_connext_dds_cmake_module - 0.9.0 + 0.11.1 Helper module to provide access to RTI products like Connext DDS Professional Andrea Sorbini Apache License 2.0 From ecda67cb51da272b960084cc26463981f491d2de Mon Sep 17 00:00:00 2001 From: Christopher Wecht Date: Wed, 3 May 2023 08:43:48 +0200 Subject: [PATCH 5/5] Notify on changed matched status Signed-off-by: Christopher Wecht --- .../include/rmw_connextdds/rmw_waitset_std.hpp | 1 + rmw_connextdds_common/src/common/rmw_impl_waitset_std.cpp | 4 ++++ 2 files changed, 5 insertions(+) diff --git a/rmw_connextdds_common/include/rmw_connextdds/rmw_waitset_std.hpp b/rmw_connextdds_common/include/rmw_connextdds/rmw_waitset_std.hpp index a4e52710..45bcedf6 100644 --- a/rmw_connextdds_common/include/rmw_connextdds/rmw_waitset_std.hpp +++ b/rmw_connextdds_common/include/rmw_connextdds/rmw_waitset_std.hpp @@ -851,6 +851,7 @@ class RMW_Connext_SubscriberStatusCondition : public RMW_Connext_StatusCondition } void notify_new_data(); + inline rmw_ret_t get_matched_status(rmw_matched_status_t * const status) { diff --git a/rmw_connextdds_common/src/common/rmw_impl_waitset_std.cpp b/rmw_connextdds_common/src/common/rmw_impl_waitset_std.cpp index d8056155..42751984 100644 --- a/rmw_connextdds_common/src/common/rmw_impl_waitset_std.cpp +++ b/rmw_connextdds_common/src/common/rmw_impl_waitset_std.cpp @@ -880,6 +880,8 @@ RMW_Connext_SubscriberStatusCondition::update_status_matched( this->status_matched.total_count - this->status_matched_last.total_count; this->status_matched.current_count_change = this->status_matched.current_count - this->status_matched_last.current_count; + + this->notify_new_event(RMW_EVENT_SUBSCRIPTION_MATCHED); } rmw_ret_t @@ -1070,4 +1072,6 @@ RMW_Connext_PublisherStatusCondition::update_status_matched( this->status_matched.total_count - this->status_matched_last.total_count; this->status_matched.current_count_change = this->status_matched.current_count - this->status_matched_last.current_count; + + this->notify_new_event(RMW_EVENT_PUBLICATION_MATCHED); }