diff --git a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_client_info.hpp b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_client_info.hpp index 6a4b4d515..d0e13fc18 100644 --- a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_client_info.hpp +++ b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_client_info.hpp @@ -44,6 +44,7 @@ #include "rmw/event_callback_type.h" +#include "rmw_fastrtps_shared_cpp/custom_event_info.hpp" #include "rmw_fastrtps_shared_cpp/TypeSupport.hpp" class ClientListener; @@ -128,13 +129,7 @@ class ClientListener : public eprosima::fastdds::dds::DataReaderListener list_has_data_.store(true); } - std::unique_lock lock_mutex(on_new_response_m_); - - if (on_new_response_cb_) { - on_new_response_cb_(user_data_, 1); - } else { - unread_count_++; - } + on_data_available_.call(); } } } @@ -198,20 +193,7 @@ class ClientListener : public eprosima::fastdds::dds::DataReaderListener const void * user_data, rmw_event_callback_t callback) { - std::unique_lock lock_mutex(on_new_response_m_); - - if (callback) { - // Push events arrived before setting the the executor callback - if (unread_count_) { - callback(user_data, unread_count_); - unread_count_ = 0; - } - user_data_ = user_data; - on_new_response_cb_ = callback; - } else { - user_data_ = nullptr; - on_new_response_cb_ = nullptr; - } + on_data_available_.set_callback(user_data, callback); } private: @@ -234,10 +216,8 @@ class ClientListener : public eprosima::fastdds::dds::DataReaderListener std::condition_variable * conditionVariable_ RCPPUTILS_TSA_GUARDED_BY(internalMutex_); std::set publishers_; - rmw_event_callback_t on_new_response_cb_{nullptr}; - const void * user_data_{nullptr}; - std::mutex on_new_response_m_; - uint64_t unread_count_ = 0; + // Callback to call when the listener detects events + EventTypeCallback on_data_available_; }; class ClientPubListener : public eprosima::fastdds::dds::DataWriterListener diff --git a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_event_info.hpp b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_event_info.hpp index d7df46611..5124b464c 100644 --- a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_event_info.hpp +++ b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_event_info.hpp @@ -15,8 +15,10 @@ #ifndef RMW_FASTRTPS_SHARED_CPP__CUSTOM_EVENT_INFO_HPP_ #define RMW_FASTRTPS_SHARED_CPP__CUSTOM_EVENT_INFO_HPP_ +#include #include #include +#include #include #include #include @@ -62,14 +64,10 @@ class EventListenerInterface // Provide handlers to perform an action when a // new event from this listener has ocurred - virtual void set_on_new_event_callback( + virtual rmw_ret_t set_on_new_event_callback( + rmw_event_type_t event_type, const void * user_data, rmw_event_callback_t callback) = 0; - - rmw_event_callback_t on_new_event_cb_{nullptr}; - const void * user_data_{nullptr}; - uint64_t unread_events_count_ = 0; - std::mutex on_new_event_m_; }; class EventListenerInterface::ConditionalScopedLock @@ -105,4 +103,51 @@ struct CustomEventInfo virtual EventListenerInterface * getListener() const = 0; }; +class EventTypeCallback +{ +public: + EventTypeCallback() = default; + + explicit EventTypeCallback(size_t depth) + { + history_depth_ = (depth > 0) ? depth : std::numeric_limits::max(); + } + + void set_callback(const void * user_data, rmw_event_callback_t callback) + { + std::lock_guard lock(mutex_); + + if (callback) { + if (unread_count_) { + size_t count = std::min(unread_count_, history_depth_); + callback(user_data, count); + unread_count_ = 0; + } + user_data_ = user_data; + callback_ = callback; + } else { + user_data_ = nullptr; + callback_ = nullptr; + } + } + + void call() + { + std::lock_guard lock(mutex_); + + if (callback_) { + callback_(user_data_, 1); + } else { + unread_count_++; + } + } + +private: + std::mutex mutex_; + rmw_event_callback_t callback_{nullptr}; + const void * user_data_{nullptr}; + size_t unread_count_{0}; + size_t history_depth_ = std::numeric_limits::max(); +}; + #endif // RMW_FASTRTPS_SHARED_CPP__CUSTOM_EVENT_INFO_HPP_ diff --git a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_publisher_info.hpp b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_publisher_info.hpp index 37db4ca2c..8ce1f0cbb 100644 --- a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_publisher_info.hpp +++ b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_publisher_info.hpp @@ -107,7 +107,9 @@ class PubListener : public EventListenerInterface, public eprosima::fastdds::dds hasEvent(rmw_event_type_t event_type) const final; RMW_FASTRTPS_SHARED_CPP_PUBLIC - void set_on_new_event_callback( + rmw_ret_t + set_on_new_event_callback( + rmw_event_type_t event_type, const void * user_data, rmw_event_callback_t callback) final; @@ -158,6 +160,11 @@ class PubListener : public EventListenerInterface, public eprosima::fastdds::dds std::mutex * conditionMutex_ RCPPUTILS_TSA_GUARDED_BY(internalMutex_); std::condition_variable * conditionVariable_ RCPPUTILS_TSA_GUARDED_BY(internalMutex_); + + // The callbacks to call when the listener detects events + EventTypeCallback on_liveliness_lost_; + EventTypeCallback on_offered_deadline_missed_; + EventTypeCallback on_offered_incompatible_qos_; }; #endif // RMW_FASTRTPS_SHARED_CPP__CUSTOM_PUBLISHER_INFO_HPP_ diff --git a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_service_info.hpp b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_service_info.hpp index 9d50c5a3d..ab5b8282e 100644 --- a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_service_info.hpp +++ b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_service_info.hpp @@ -42,6 +42,7 @@ #include "rmw/event_callback_type.h" +#include "rmw_fastrtps_shared_cpp/custom_event_info.hpp" #include "rmw_fastrtps_shared_cpp/guid_utils.hpp" #include "rmw_fastrtps_shared_cpp/TypeSupport.hpp" @@ -249,13 +250,7 @@ class ServiceListener : public eprosima::fastdds::dds::DataReaderListener list_has_data_.store(true); } - std::unique_lock lock_mutex(on_new_request_m_); - - if (on_new_request_cb_) { - on_new_request_cb_(user_data_, 1); - } else { - unread_count_++; - } + on_data_available_.call(); } } } @@ -313,20 +308,7 @@ class ServiceListener : public eprosima::fastdds::dds::DataReaderListener const void * user_data, rmw_event_callback_t callback) { - std::unique_lock lock_mutex(on_new_request_m_); - - if (callback) { - // Push events arrived before setting the the executor callback - if (unread_count_) { - callback(user_data, unread_count_); - unread_count_ = 0; - } - user_data_ = user_data; - on_new_request_cb_ = callback; - } else { - user_data_ = nullptr; - on_new_request_cb_ = nullptr; - } + on_data_available_.set_callback(user_data, callback); } private: @@ -337,10 +319,8 @@ class ServiceListener : public eprosima::fastdds::dds::DataReaderListener std::mutex * conditionMutex_ RCPPUTILS_TSA_GUARDED_BY(internalMutex_); std::condition_variable * conditionVariable_ RCPPUTILS_TSA_GUARDED_BY(internalMutex_); - rmw_event_callback_t on_new_request_cb_{nullptr}; - const void * user_data_{nullptr}; - std::mutex on_new_request_m_; - uint64_t unread_count_ = 0; + // Callback to call when the listener detects events + EventTypeCallback on_data_available_; }; #endif // RMW_FASTRTPS_SHARED_CPP__CUSTOM_SERVICE_INFO_HPP_ diff --git a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_subscriber_info.hpp b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_subscriber_info.hpp index a130f2cf6..1db3fa57f 100644 --- a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_subscriber_info.hpp +++ b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_subscriber_info.hpp @@ -16,7 +16,6 @@ #define RMW_FASTRTPS_SHARED_CPP__CUSTOM_SUBSCRIBER_INFO_HPP_ #include -#include #include #include #include @@ -91,9 +90,9 @@ class SubListener : public EventListenerInterface, public eprosima::fastdds::dds sample_lost_changes_(false), incompatible_qos_changes_(false), conditionMutex_(nullptr), - conditionVariable_(nullptr) + conditionVariable_(nullptr), + on_data_available_(qos_depth) { - qos_depth_ = (qos_depth > 0) ? qos_depth : std::numeric_limits::max(); // Field is not used right now (void)info; } @@ -115,19 +114,13 @@ class SubListener : public EventListenerInterface, public eprosima::fastdds::dds update_has_data(reader); } + RMW_FASTRTPS_SHARED_CPP_PUBLIC void - on_data_available(eprosima::fastdds::dds::DataReader * reader) final - { - update_has_data(reader); - - std::unique_lock lock_mutex(on_new_message_m_); + on_data_available(eprosima::fastdds::dds::DataReader * reader) final; - if (on_new_message_cb_) { - on_new_message_cb_(user_data_, 1); - } else { - new_data_unread_count_++; - } - } + RMW_FASTRTPS_SHARED_CPP_PUBLIC + void + set_on_data_available_callback(const void * user_data, rmw_event_callback_t callback); RMW_FASTRTPS_SHARED_CPP_PUBLIC void @@ -159,7 +152,9 @@ class SubListener : public EventListenerInterface, public eprosima::fastdds::dds hasEvent(rmw_event_type_t event_type) const final; RMW_FASTRTPS_SHARED_CPP_PUBLIC - void set_on_new_event_callback( + rmw_ret_t + set_on_new_event_callback( + rmw_event_type_t event_type, const void * user_data, rmw_event_callback_t callback) final; @@ -209,30 +204,6 @@ class SubListener : public EventListenerInterface, public eprosima::fastdds::dds return publishers_.size(); } - // Provide handlers to perform an action when a - // new event from this listener has ocurred - void - set_on_new_message_callback( - const void * user_data, - rmw_event_callback_t callback) - { - std::unique_lock lock_mutex(on_new_message_m_); - - if (callback) { - // Push events arrived before setting the executor's callback - if (new_data_unread_count_) { - auto unread_count = std::min(new_data_unread_count_, qos_depth_); - callback(user_data, unread_count); - new_data_unread_count_ = 0; - } - user_data_ = user_data; - on_new_message_cb_ = callback; - } else { - user_data_ = nullptr; - on_new_message_cb_ = nullptr; - } - } - private: mutable std::mutex internalMutex_; @@ -259,10 +230,12 @@ class SubListener : public EventListenerInterface, public eprosima::fastdds::dds std::set publishers_ RCPPUTILS_TSA_GUARDED_BY(internalMutex_); - rmw_event_callback_t on_new_message_cb_{nullptr}; - std::mutex on_new_message_m_; - size_t qos_depth_; - size_t new_data_unread_count_ = 0; + // The callbacks to call when the listener detects events + EventTypeCallback on_sample_lost_; + EventTypeCallback on_data_available_; + EventTypeCallback on_liveliness_changed_; + EventTypeCallback on_requested_deadline_missed_; + EventTypeCallback on_requested_incompatible_qos_; }; #endif // RMW_FASTRTPS_SHARED_CPP__CUSTOM_SUBSCRIBER_INFO_HPP_ diff --git a/rmw_fastrtps_shared_cpp/src/custom_publisher_info.cpp b/rmw_fastrtps_shared_cpp/src/custom_publisher_info.cpp index 9fc6e4009..2bb61e6a4 100644 --- a/rmw_fastrtps_shared_cpp/src/custom_publisher_info.cpp +++ b/rmw_fastrtps_shared_cpp/src/custom_publisher_info.cpp @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include "rmw/error_handling.h" + #include "rmw_fastrtps_shared_cpp/custom_publisher_info.hpp" #include "fastdds/dds/core/status/BaseStatus.hpp" @@ -44,13 +46,7 @@ PubListener::on_offered_deadline_missed( deadline_changes_.store(true, std::memory_order_relaxed); - std::unique_lock lock_mutex(on_new_event_m_); - - if (on_new_event_cb_) { - on_new_event_cb_(user_data_, 1); - } else { - unread_events_count_++; - } + on_offered_deadline_missed_.call(); } void PubListener::on_liveliness_lost( @@ -70,13 +66,7 @@ void PubListener::on_liveliness_lost( liveliness_changes_.store(true, std::memory_order_relaxed); - std::unique_lock lock_mutex(on_new_event_m_); - - if (on_new_event_cb_) { - on_new_event_cb_(user_data_, 1); - } else { - unread_events_count_++; - } + on_liveliness_lost_.call(); } void PubListener::on_offered_incompatible_qos( @@ -96,6 +86,8 @@ void PubListener::on_offered_incompatible_qos( incompatible_qos_status_.total_count_change += status.total_count_change; incompatible_qos_changes_.store(true, std::memory_order_relaxed); + + on_offered_incompatible_qos_.call(); } bool PubListener::hasEvent(rmw_event_type_t event_type) const @@ -114,24 +106,27 @@ bool PubListener::hasEvent(rmw_event_type_t event_type) const return false; } -void PubListener::set_on_new_event_callback( +rmw_ret_t PubListener::set_on_new_event_callback( + rmw_event_type_t event_type, const void * user_data, rmw_event_callback_t callback) { - std::unique_lock lock_mutex(on_new_event_m_); - - if (callback) { - // Push events arrived before setting the executor's callback - if (unread_events_count_) { - callback(user_data, unread_events_count_); - unread_events_count_ = 0; - } - user_data_ = user_data; - on_new_event_cb_ = callback; - } else { - user_data_ = nullptr; - on_new_event_cb_ = nullptr; + switch (event_type) { + case RMW_EVENT_LIVELINESS_LOST: + on_liveliness_lost_.set_callback(user_data, callback); + break; + case RMW_EVENT_OFFERED_DEADLINE_MISSED: + on_offered_deadline_missed_.set_callback(user_data, callback); + break; + case RMW_EVENT_OFFERED_QOS_INCOMPATIBLE: + on_offered_incompatible_qos_.set_callback(user_data, callback); + break; + default: + RMW_SET_ERROR_MSG("provided event_type is not supported"); + return RMW_RET_UNSUPPORTED; + break; } + return RMW_RET_OK; } bool PubListener::takeNextEvent(rmw_event_type_t event_type, void * event_info) diff --git a/rmw_fastrtps_shared_cpp/src/custom_subscriber_info.cpp b/rmw_fastrtps_shared_cpp/src/custom_subscriber_info.cpp index 6c24c358f..138ad2c44 100644 --- a/rmw_fastrtps_shared_cpp/src/custom_subscriber_info.cpp +++ b/rmw_fastrtps_shared_cpp/src/custom_subscriber_info.cpp @@ -44,13 +44,7 @@ SubListener::on_requested_deadline_missed( deadline_changes_.store(true, std::memory_order_relaxed); - std::unique_lock lock_mutex(on_new_event_m_); - - if (on_new_event_cb_) { - on_new_event_cb_(user_data_, 1); - } else { - unread_events_count_++; - } + on_requested_deadline_missed_.call(); } void SubListener::on_liveliness_changed( @@ -72,13 +66,7 @@ void SubListener::on_liveliness_changed( liveliness_changes_.store(true, std::memory_order_relaxed); - std::unique_lock lock_mutex(on_new_event_m_); - - if (on_new_event_cb_) { - on_new_event_cb_(user_data_, 1); - } else { - unread_events_count_++; - } + on_liveliness_changed_.call(); } void SubListener::on_sample_lost( @@ -97,6 +85,8 @@ void SubListener::on_sample_lost( sample_lost_status_.total_count_change += status.total_count_change; sample_lost_changes_.store(true, std::memory_order_relaxed); + + on_sample_lost_.call(); } void SubListener::on_requested_incompatible_qos( @@ -116,6 +106,15 @@ void SubListener::on_requested_incompatible_qos( incompatible_qos_status_.total_count_change += status.total_count_change; incompatible_qos_changes_.store(true, std::memory_order_relaxed); + + on_requested_incompatible_qos_.call(); +} + +void SubListener::on_data_available( + eprosima::fastdds::dds::DataReader * reader) +{ + update_has_data(reader); + on_data_available_.call(); } bool SubListener::hasEvent(rmw_event_type_t event_type) const @@ -136,25 +135,37 @@ bool SubListener::hasEvent(rmw_event_type_t event_type) const return false; } -void SubListener::set_on_new_event_callback( +rmw_ret_t SubListener::set_on_new_event_callback( + rmw_event_type_t event_type, const void * user_data, rmw_event_callback_t callback) { - std::unique_lock lock_mutex(on_new_event_m_); - - if (callback) { - // Push events arrived before setting the executor's callback - if (unread_events_count_) { - callback(user_data, unread_events_count_); - unread_events_count_ = 0; - } - user_data_ = user_data; - on_new_event_cb_ = callback; - } else { - user_data_ = nullptr; - on_new_event_cb_ = nullptr; - return; + switch (event_type) { + case RMW_EVENT_MESSAGE_LOST: + on_sample_lost_.set_callback(user_data, callback); + break; + case RMW_EVENT_LIVELINESS_CHANGED: + on_liveliness_changed_.set_callback(user_data, callback); + break; + case RMW_EVENT_REQUESTED_DEADLINE_MISSED: + on_requested_deadline_missed_.set_callback(user_data, callback); + break; + case RMW_EVENT_REQUESTED_QOS_INCOMPATIBLE: + on_requested_incompatible_qos_.set_callback(user_data, callback); + break; + default: + RMW_SET_ERROR_MSG("provided event_type is not supported"); + return RMW_RET_UNSUPPORTED; + break; } + return RMW_RET_OK; +} + +void SubListener::set_on_data_available_callback( + const void * user_data, + rmw_event_callback_t callback) +{ + on_data_available_.set_callback(user_data, callback); } bool SubListener::takeNextEvent(rmw_event_type_t event_type, void * event_info) diff --git a/rmw_fastrtps_shared_cpp/src/rmw_event.cpp b/rmw_fastrtps_shared_cpp/src/rmw_event.cpp index ccc53753c..30898d281 100644 --- a/rmw_fastrtps_shared_cpp/src/rmw_event.cpp +++ b/rmw_fastrtps_shared_cpp/src/rmw_event.cpp @@ -102,10 +102,11 @@ __rmw_event_set_callback( const void * user_data) { auto custom_event_info = static_cast(rmw_event->data); - custom_event_info->getListener()->set_on_new_event_callback( - user_data, - callback); - return RMW_RET_OK; + + auto listener = custom_event_info->getListener(); + + return listener->set_on_new_event_callback( + rmw_event->event_type, user_data, callback); } } // namespace rmw_fastrtps_shared_cpp diff --git a/rmw_fastrtps_shared_cpp/src/rmw_subscription.cpp b/rmw_fastrtps_shared_cpp/src/rmw_subscription.cpp index a521b1a69..1ccd44d88 100644 --- a/rmw_fastrtps_shared_cpp/src/rmw_subscription.cpp +++ b/rmw_fastrtps_shared_cpp/src/rmw_subscription.cpp @@ -267,7 +267,7 @@ __rmw_subscription_set_on_new_message_callback( const void * user_data) { auto custom_subscriber_info = static_cast(rmw_subscription->data); - custom_subscriber_info->listener_->set_on_new_message_callback( + custom_subscriber_info->listener_->set_on_data_available_callback( user_data, callback); return RMW_RET_OK;