From 831e8b9888cd516764c9c3dd250259a17cc13bf8 Mon Sep 17 00:00:00 2001 From: Taxo Rubio RTI <116876175+trubio-rti@users.noreply.github.com> Date: Tue, 20 Aug 2024 09:38:05 +0200 Subject: [PATCH] Backport rmw callbacks implementation to Humble [ros2-73] (#157) * backport: Add support for listener callbacks (#76) (d4330cc) * Add support for listener callbacks. * Fix wrong debug assertion when converting DDS_Duration values * Clarify interactions between count_unread_samples() and take_next() * Notify on changed matched status Signed-off-by: Christopher Wecht Signed-off-by: Andrea Sorbini Signed-off-by: Taxo Rubio * backport: Conditional internal API access to support Connext 7+ (#121) (afa5055d) Signed-off-by: Andrea Sorbini Signed-off-by: Taxo Rubio * style: Fixed header inclusion order Signed-off-by: Taxo Rubio --------- Signed-off-by: Christopher Wecht Signed-off-by: Andrea Sorbini Signed-off-by: Taxo Rubio Co-authored-by: Andrea Sorbini --- .../include/rmw_connextdds/dds_api.hpp | 5 + .../include/rmw_connextdds/rmw_api_impl.hpp | 18 ++-- .../include/rmw_connextdds/rmw_impl.hpp | 24 +++++ .../rmw_connextdds/rmw_waitset_std.hpp | 89 ++++++++++++++++++ .../src/common/rmw_impl_waitset_std.cpp | 37 ++++++++ .../src/common/rmw_listener.cpp | 92 ++++++++++++------- .../src/ndds/dds_api_ndds.cpp | 79 +++++++++++++++- .../src/rtime/dds_api_rtime.cpp | 34 +++++++ 8 files changed, 335 insertions(+), 43 deletions(-) diff --git a/rmw_connextdds_common/include/rmw_connextdds/dds_api.hpp b/rmw_connextdds_common/include/rmw_connextdds/dds_api.hpp index 3a8d2b3d..11b052f5 100644 --- a/rmw_connextdds_common/include/rmw_connextdds/dds_api.hpp +++ b/rmw_connextdds_common/include/rmw_connextdds/dds_api.hpp @@ -138,6 +138,11 @@ rmw_ret_t rmw_connextdds_take_samples( RMW_Connext_Subscriber * const sub); +rmw_ret_t +rmw_connextdds_count_unread_samples( + RMW_Connext_Subscriber * const sub, + size_t & unread_count); + rmw_ret_t rmw_connextdds_return_samples( RMW_Connext_Subscriber * const sub); diff --git a/rmw_connextdds_common/include/rmw_connextdds/rmw_api_impl.hpp b/rmw_connextdds_common/include/rmw_connextdds/rmw_api_impl.hpp index a03ce76e..df8e8efd 100644 --- a/rmw_connextdds_common/include/rmw_connextdds/rmw_api_impl.hpp +++ b/rmw_connextdds_common/include/rmw_connextdds/rmw_api_impl.hpp @@ -92,8 +92,8 @@ RMW_CONNEXTDDS_PUBLIC rmw_ret_t rmw_api_connextdds_event_set_callback( rmw_event_t * event, - rmw_event_callback_t callback, - const void * user_data); + const rmw_event_callback_t callback, + const void * const user_data); /***************************************************************************** * Info API @@ -437,15 +437,15 @@ RMW_CONNEXTDDS_PUBLIC rmw_ret_t rmw_api_connextdds_service_set_on_new_request_callback( rmw_service_t * rmw_service, - rmw_event_callback_t callback, - const void * user_data); + const rmw_event_callback_t callback, + const void * const user_data); RMW_CONNEXTDDS_PUBLIC rmw_ret_t rmw_api_connextdds_client_set_on_new_response_callback( rmw_client_t * rmw_client, - rmw_event_callback_t callback, - const void * user_data); + const rmw_event_callback_t callback, + const void * const user_data); /***************************************************************************** * Subscription API @@ -574,9 +574,9 @@ rmw_api_connextdds_return_loaned_message_from_subscription( RMW_CONNEXTDDS_PUBLIC rmw_ret_t rmw_api_connextdds_subscription_set_on_new_message_callback( - rmw_subscription_t * rmw_subscription, - rmw_event_callback_t callback, - const void * user_data); + rmw_subscription_t * const rmw_subscription, + const rmw_event_callback_t callback, + const void * const user_data); /***************************************************************************** * WaitSet API diff --git a/rmw_connextdds_common/include/rmw_connextdds/rmw_impl.hpp b/rmw_connextdds_common/include/rmw_connextdds/rmw_impl.hpp index 5bf93f0c..09a0b369 100644 --- a/rmw_connextdds_common/include/rmw_connextdds/rmw_impl.hpp +++ b/rmw_connextdds_common/include/rmw_connextdds/rmw_impl.hpp @@ -462,6 +462,30 @@ class RMW_Connext_Subscriber return has_data; } + rmw_ret_t + count_unread_samples(size_t & unread_count) + { + // The action of counting unread samples is not currently mutually exclusive + // with the action of taking samples out of the reader cache. Unfortunately + // we cannot use a mutex to synchronize calls between + // count_unread_samples() and and take_next() because we would run the risk + // of a deadlock. This is because count_unread_samples() is supposed to be + // called from within the reader's "exclusive area" (since it is + // executed within a listener callback), while take_next() is usually called + // from an executor/application thread and it must acquire the reader's + // "exclusive area" before taking samples. + // This might mean that an application which relies on data callbacks and the + // count provided by this function, and which at the same time polls data + // on the subscription, might end up missing some samples in the total count + // notified to it via callback because the samples were taken out of the + // cache before they could be notified to the listener. + // Fortunately, in Connext the listener callback is notified right after a + // sample is added to the reader cache and before any mutex is released, + // which should allow this function to always report a correct number, as + // long as it is only called within a (DDS) listener callback. + return rmw_connextdds_count_unread_samples(this, unread_count); + } + DDS_Subscriber * dds_subscriber() const { return DDS_DataReader_get_subscriber(this->dds_reader); diff --git a/rmw_connextdds_common/include/rmw_connextdds/rmw_waitset_std.hpp b/rmw_connextdds_common/include/rmw_connextdds/rmw_waitset_std.hpp index 09c911e4..f57cca88 100644 --- a/rmw_connextdds_common/include/rmw_connextdds/rmw_waitset_std.hpp +++ b/rmw_connextdds_common/include/rmw_connextdds/rmw_waitset_std.hpp @@ -15,6 +15,7 @@ #ifndef RMW_CONNEXTDDS__RMW_WAITSET_STD_HPP_ #define RMW_CONNEXTDDS__RMW_WAITSET_STD_HPP_ +#include #include "rmw_connextdds/context.hpp" /****************************************************************************** @@ -121,6 +122,22 @@ class RMW_Connext_Condition } } + template + void + perform_action_and_update_state(FunctorA && action, FunctorT && update_condition) + { + std::lock_guard internal_lock(this->mutex_internal); + + action(); + + if (nullptr != this->waitset_mutex) { + std::lock_guard lock(*this->waitset_mutex); + update_condition(); + } else { + update_condition(); + } + } + protected: std::mutex mutex_internal; std::mutex * waitset_mutex; @@ -333,8 +350,55 @@ class RMW_Connext_StatusCondition : public RMW_Connext_Condition virtual bool has_status(const rmw_event_type_t event_type) = 0; + void + notify_new_event(rmw_event_type_t event_type) + { + std::unique_lock lock_mutex(new_event_mutex_); + if (new_event_cb_[event_type]) { + new_event_cb_[event_type](user_data_[event_type], 1); + } else { + unread_events_count_[event_type]++; + } + } + + void + set_new_event_callback( + rmw_event_type_t event_type, + rmw_event_callback_t callback, + const void * user_data) + { + std::unique_lock lock_mutex(new_event_mutex_); + + if (callback) { + // Push events arrived before setting the executor's callback + if (unread_events_count_[event_type] > 0) { + callback(user_data, unread_events_count_[event_type]); + unread_events_count_[event_type] = 0; + } + user_data_[event_type] = user_data; + new_event_cb_[event_type] = callback; + } else { + user_data_[event_type] = nullptr; + new_event_cb_[event_type] = nullptr; + } + } + + void + on_inconsistent_topic(const struct DDS_InconsistentTopicStatus * status); + + void + update_status_inconsistent_topic(const struct DDS_InconsistentTopicStatus * status); + protected: DDS_StatusCondition * scond; + std::mutex new_event_mutex_; + rmw_event_callback_t new_event_cb_[RMW_EVENT_INVALID] = {}; + const void * user_data_[RMW_EVENT_INVALID] = {}; + uint64_t unread_events_count_[RMW_EVENT_INVALID] = {0}; + + bool triggered_inconsistent_topic{false}; + + struct DDS_InconsistentTopicStatus status_inconsistent_topic; }; void @@ -712,6 +776,26 @@ class RMW_Connext_SubscriberStatusCondition : public RMW_Connext_StatusCondition return RMW_RET_OK; } + void set_on_new_data_callback( + const rmw_event_callback_t callback, + const void * const user_data) + { + std::unique_lock lock(new_data_event_mutex_); + if (callback) { + if (unread_data_events_count_ > 0) { + callback(user_data, unread_data_events_count_); + unread_data_events_count_ = 0; + } + new_data_event_cb_ = callback; + data_event_user_data_ = user_data; + } else { + new_data_event_cb_ = nullptr; + data_event_user_data_ = nullptr; + } + } + + void notify_new_data(); + protected: void update_status_deadline( const DDS_RequestedDeadlineMissedStatus * const status); @@ -745,6 +829,11 @@ class RMW_Connext_SubscriberStatusCondition : public RMW_Connext_StatusCondition RMW_Connext_Subscriber * sub; + std::mutex new_data_event_mutex_; + rmw_event_callback_t new_data_event_cb_{nullptr}; + const void * data_event_user_data_{nullptr}; + uint64_t unread_data_events_count_ = 0; + friend class RMW_Connext_WaitSet; }; diff --git a/rmw_connextdds_common/src/common/rmw_impl_waitset_std.cpp b/rmw_connextdds_common/src/common/rmw_impl_waitset_std.cpp index c8e10121..6f3ae332 100644 --- a/rmw_connextdds_common/src/common/rmw_impl_waitset_std.cpp +++ b/rmw_connextdds_common/src/common/rmw_impl_waitset_std.cpp @@ -121,6 +121,7 @@ RMW_Connext_DataReaderListener_on_data_available( UNUSED_ARG(reader); + self->notify_new_data(); self->set_data_available(true); } @@ -706,6 +707,8 @@ RMW_Connext_SubscriberStatusCondition::update_status_deadline( this->status_deadline.total_count_change = this->status_deadline.total_count; this->status_deadline.total_count_change -= this->status_deadline_last.total_count; + + this->notify_new_event(RMW_EVENT_REQUESTED_DEADLINE_MISSED); } void @@ -720,6 +723,8 @@ RMW_Connext_SubscriberStatusCondition::update_status_liveliness( this->status_liveliness.alive_count_change -= this->status_liveliness_last.alive_count; this->status_liveliness.not_alive_count_change -= this->status_liveliness_last.not_alive_count; + + this->notify_new_event(RMW_EVENT_LIVELINESS_CHANGED); } void @@ -731,6 +736,7 @@ RMW_Connext_SubscriberStatusCondition::update_status_qos( this->status_qos.total_count_change = this->status_qos.total_count; this->status_qos.total_count_change -= this->status_qos_last.total_count; + this->notify_new_event(RMW_EVENT_REQUESTED_QOS_INCOMPATIBLE); } void @@ -743,6 +749,31 @@ RMW_Connext_SubscriberStatusCondition::update_status_sample_lost( this->status_sample_lost.total_count_change = this->status_sample_lost.total_count; this->status_sample_lost.total_count_change -= this->status_sample_lost_last.total_count; + this->notify_new_event(RMW_EVENT_MESSAGE_LOST); +} + +void +RMW_Connext_SubscriberStatusCondition::notify_new_data() +{ + size_t unread_samples = 0; + std::unique_lock lock_mutex(new_data_event_mutex_); + perform_action_and_update_state( + [this, &unread_samples]() { + const rmw_ret_t rc = this->sub->count_unread_samples(unread_samples); + if (RMW_RET_OK != rc) { + RMW_CONNEXT_LOG_ERROR("failed to count unread samples on DDS Reader") + } + }, + [this, &unread_samples]() { + if (unread_samples == 0) { + return; + } + if (new_data_event_cb_) { + new_data_event_cb_(data_event_user_data_, unread_samples); + } else { + unread_data_events_count_ += unread_samples; + } + }); } rmw_ret_t @@ -852,6 +883,8 @@ RMW_Connext_PublisherStatusCondition::update_status_deadline( this->status_deadline.total_count_change = this->status_deadline.total_count; this->status_deadline.total_count_change -= this->status_deadline_last.total_count; + + this->notify_new_event(RMW_EVENT_OFFERED_DEADLINE_MISSED); } void @@ -863,6 +896,8 @@ RMW_Connext_PublisherStatusCondition::update_status_liveliness( this->status_liveliness.total_count_change = this->status_liveliness.total_count; this->status_liveliness.total_count_change -= this->status_liveliness_last.total_count; + + this->notify_new_event(RMW_EVENT_LIVELINESS_CHANGED); } void @@ -874,4 +909,6 @@ RMW_Connext_PublisherStatusCondition::update_status_qos( this->status_qos.total_count_change = this->status_qos.total_count; this->status_qos.total_count_change -= this->status_qos_last.total_count; + + this->notify_new_event(RMW_EVENT_OFFERED_QOS_INCOMPATIBLE); } diff --git a/rmw_connextdds_common/src/common/rmw_listener.cpp b/rmw_connextdds_common/src/common/rmw_listener.cpp index 730eaa92..1b907b8a 100644 --- a/rmw_connextdds_common/src/common/rmw_listener.cpp +++ b/rmw_connextdds_common/src/common/rmw_listener.cpp @@ -19,15 +19,25 @@ ******************************************************************************/ rmw_ret_t rmw_api_connextdds_event_set_callback( - rmw_event_t * event, - rmw_event_callback_t callback, - const void * user_data) + rmw_event_t * const event, + const rmw_event_callback_t callback, + const void * const user_data) { - UNUSED_ARG(event); - UNUSED_ARG(callback); - UNUSED_ARG(user_data); - RMW_CONNEXT_LOG_ERROR_SET("rmw_event_set_callback not implemented") - return RMW_RET_UNSUPPORTED; + RMW_CHECK_ARGUMENT_FOR_NULL(event, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_TYPE_IDENTIFIERS_MATCH( + event, + event->implementation_identifier, + RMW_CONNEXTDDS_ID, + return RMW_RET_INVALID_ARGUMENT); + + RMW_Connext_StatusCondition * condition = nullptr; + if (RMW_Connext_Event::reader_event(event)) { + condition = RMW_Connext_Event::subscriber(event)->condition(); + } else { + condition = RMW_Connext_Event::publisher(event)->condition(); + } + condition->set_new_event_callback(event->event_type, callback, user_data); + return RMW_RET_OK; } /****************************************************************************** @@ -35,28 +45,40 @@ rmw_api_connextdds_event_set_callback( ******************************************************************************/ rmw_ret_t rmw_api_connextdds_service_set_on_new_request_callback( - rmw_service_t * rmw_service, - rmw_event_callback_t callback, - const void * user_data) + rmw_service_t * const service, + const rmw_event_callback_t callback, + const void * const user_data) { - UNUSED_ARG(rmw_service); - UNUSED_ARG(callback); - UNUSED_ARG(user_data); - RMW_CONNEXT_LOG_ERROR_SET("rmw_service_set_on_new_request_callback not implemented") - return RMW_RET_UNSUPPORTED; + RMW_CHECK_ARGUMENT_FOR_NULL(service, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_TYPE_IDENTIFIERS_MATCH( + service, + service->implementation_identifier, + RMW_CONNEXTDDS_ID, + return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); + + RMW_Connext_Service * const svc_impl = + reinterpret_cast(service->data); + svc_impl->subscriber()->condition()->set_on_new_data_callback(callback, user_data); + return RMW_RET_OK; } rmw_ret_t rmw_api_connextdds_client_set_on_new_response_callback( - rmw_client_t * rmw_client, - rmw_event_callback_t callback, - const void * user_data) + rmw_client_t * const client, + const rmw_event_callback_t callback, + const void * const user_data) { - UNUSED_ARG(rmw_client); - UNUSED_ARG(callback); - UNUSED_ARG(user_data); - RMW_CONNEXT_LOG_ERROR_SET("rmw_client_set_on_new_response_callback not implemented") - return RMW_RET_UNSUPPORTED; + RMW_CHECK_ARGUMENT_FOR_NULL(client, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_TYPE_IDENTIFIERS_MATCH( + client, + client->implementation_identifier, + RMW_CONNEXTDDS_ID, + return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); + + RMW_Connext_Client * const client_impl = + reinterpret_cast(client->data); + client_impl->subscriber()->condition()->set_on_new_data_callback(callback, user_data); + return RMW_RET_OK; } /****************************************************************************** @@ -64,13 +86,19 @@ rmw_api_connextdds_client_set_on_new_response_callback( ******************************************************************************/ rmw_ret_t rmw_api_connextdds_subscription_set_on_new_message_callback( - rmw_subscription_t * rmw_subscription, - rmw_event_callback_t callback, - const void * user_data) + rmw_subscription_t * const subscription, + const rmw_event_callback_t callback, + const void * const user_data) { - UNUSED_ARG(rmw_subscription); - UNUSED_ARG(callback); - UNUSED_ARG(user_data); - RMW_CONNEXT_LOG_ERROR_SET("rmw_subscription_set_on_new_message_callback not implemented") - return RMW_RET_UNSUPPORTED; + RMW_CHECK_ARGUMENT_FOR_NULL(subscription, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_TYPE_IDENTIFIERS_MATCH( + subscription, + subscription->implementation_identifier, + RMW_CONNEXTDDS_ID, + return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); + + RMW_Connext_Subscriber * const sub_impl = + reinterpret_cast(subscription->data); + sub_impl->condition()->set_on_new_data_callback(callback, user_data); + return RMW_RET_OK; } diff --git a/rmw_connextdds_common/src/ndds/dds_api_ndds.cpp b/rmw_connextdds_common/src/ndds/dds_api_ndds.cpp index 8cc2c123..9ef5ae4b 100644 --- a/rmw_connextdds_common/src/ndds/dds_api_ndds.cpp +++ b/rmw_connextdds_common/src/ndds/dds_api_ndds.cpp @@ -804,19 +804,29 @@ rmw_connextdds_return_samples( { void ** data_buffer = reinterpret_cast( RMW_Connext_MessagePtrSeq_get_contiguous_buffer(sub->data_seq())); - const DDS_Long data_len = - RMW_Connext_MessagePtrSeq_get_length(sub->data_seq()); if (!RMW_Connext_MessagePtrSeq_unloan(sub->data_seq())) { RMW_CONNEXT_LOG_ERROR_SET("failed to unloan sample sequence") return RMW_RET_ERROR; } + // DDS_DataReader_return_loan_untypedI is an internal API, and + // its signature changed slightly in Connext 7.x. +#if RTI_DDS_VERSION_MAJOR < 7 + const DDS_Long data_len = + RMW_Connext_MessagePtrSeq_get_length(sub->data_seq()); if (DDS_RETCODE_OK != DDS_DataReader_return_loan_untypedI( sub->reader(), data_buffer, data_len, sub->info_seq())) +#else + if (DDS_RETCODE_OK != + DDS_DataReader_return_loan_untypedI( + sub->reader(), + data_buffer, + sub->info_seq())) +#endif // RTI_DDS_VERSION_MAJOR < 7 { RMW_CONNEXT_LOG_ERROR_SET("failed to return loan to DDS reader") return RMW_RET_ERROR; @@ -824,6 +834,71 @@ rmw_connextdds_return_samples( return RMW_RET_OK; } +rmw_ret_t +rmw_connextdds_count_unread_samples( + RMW_Connext_Subscriber * const sub, + size_t & unread_count) +{ + DDS_Boolean is_loan = DDS_BOOLEAN_TRUE; + DDS_Long data_len = 0; + void ** data_buffer = nullptr; + DDS_SampleInfoSeq info_seq = DDS_SEQUENCE_INITIALIZER; + + unread_count = 0; + DDS_ReturnCode_t rc = DDS_RETCODE_ERROR; + do { + rc = DDS_DataReader_read_or_take_instance_untypedI( + sub->reader(), + &is_loan, + &data_buffer, + &data_len, + &info_seq, + 0 /* data_seq_len */, + 0 /* data_seq_max_len */, + DDS_BOOLEAN_TRUE /* data_seq_has_ownership */, + NULL /* data_seq_contiguous_buffer_for_copy */, + 1 /* data_size -- ignored because loaning*/, + DDS_LENGTH_UNLIMITED /* max_samples */, + &DDS_HANDLE_NIL /* a_handle */, + #if !RMW_CONNEXT_DDS_API_PRO_LEGACY + NULL /* topic_query_guid */, + #endif /* RMW_CONNEXT_DDS_API_PRO_LEGACY */ + DDS_NOT_READ_SAMPLE_STATE, + DDS_ANY_VIEW_STATE, + DDS_ANY_INSTANCE_STATE, + DDS_BOOLEAN_FALSE /* take */); + if (DDS_RETCODE_NO_DATA == rc) { + continue; + } + if (DDS_RETCODE_OK != rc && DDS_RETCODE_NO_DATA != rc) { + RMW_CONNEXT_LOG_ERROR_SET("failed to read data from DDS reader") + return RMW_RET_ERROR; + } + if (DDS_RETCODE_OK == rc) { + unread_count += data_len; + // DDS_DataReader_return_loan_untypedI is an internal API, and + // its signature changed slightly in Connext 7.x. +#if RTI_DDS_VERSION_MAJOR < 7 + rc = DDS_DataReader_return_loan_untypedI( + sub->reader(), + data_buffer, + data_len, + &info_seq); +#else + rc = DDS_DataReader_return_loan_untypedI( + sub->reader(), + data_buffer, + &info_seq); +#endif // RTI_DDS_VERSION_MAJOR < 7 + if (DDS_RETCODE_OK != rc) { + RMW_CONNEXT_LOG_ERROR_SET("failed to return loan to DDS reader") + return RMW_RET_ERROR; + } + } + } while (rc == DDS_RETCODE_OK); + + return RMW_RET_OK; +} rmw_ret_t rmw_connextdds_filter_sample( diff --git a/rmw_connextdds_common/src/rtime/dds_api_rtime.cpp b/rmw_connextdds_common/src/rtime/dds_api_rtime.cpp index bdc6d4ac..36913e9e 100644 --- a/rmw_connextdds_common/src/rtime/dds_api_rtime.cpp +++ b/rmw_connextdds_common/src/rtime/dds_api_rtime.cpp @@ -1203,6 +1203,40 @@ rmw_connextdds_return_samples( return RMW_RET_OK; } +rmw_ret_t +rmw_connextdds_count_unread_samples( + RMW_Connext_Subscriber * const sub, + size_t & unread_count) +{ + unread_count = 0; + DDS_SampleInfoSeq info_seq = DDS_SEQUENCE_INITIALIZER; + DDS_UntypedSampleSeq data_seq = DDS_SEQUENCE_INITIALIZER; + DDS_ReturnCode_t rc = DDS_RETCODE_ERROR; + do { + rc = DDS_DataReader_read( + sub->reader(), + &data_seq, + &info_seq, + DDS_LENGTH_UNLIMITED, + DDS_ANY_VIEW_STATE, + DDS_NOT_READ_SAMPLE_STATE, + DDS_ANY_INSTANCE_STATE); + if (DDS_RETCODE_OK != rc && DDS_RETCODE_NO_DATA != rc) { + RMW_CONNEXT_LOG_ERROR_SET("failed to read data from DDS reader") + return RMW_RET_ERROR; + } + if (DDS_RETCODE_OK == rc) { + unread_count += DDS_UntypedSampleSeq_get_length(&data_seq); + rc = DDS_DataReader_return_loan(sub->reader(), &data_seq, &info_seq); + if (DDS_RETCODE_OK != rc) { + RMW_CONNEXT_LOG_ERROR_SET("failed to return loan to DDS reader") + return RMW_RET_ERROR; + } + } + } while (rc == DDS_RETCODE_OK); + return RMW_RET_OK; +} + rmw_ret_t rmw_connextdds_filter_sample( RMW_Connext_Subscriber * const sub,