diff --git a/rmw_fastrtps_shared_cpp/CMakeLists.txt b/rmw_fastrtps_shared_cpp/CMakeLists.txt index ac20a63d0..eb92fd63c 100644 --- a/rmw_fastrtps_shared_cpp/CMakeLists.txt +++ b/rmw_fastrtps_shared_cpp/CMakeLists.txt @@ -47,6 +47,8 @@ find_package(rmw REQUIRED) include_directories(include) add_library(rmw_fastrtps_shared_cpp + src/custom_publisher_info.cpp + src/custom_subscriber_info.cpp src/demangle.cpp src/namespace_prefix.cpp src/qos.cpp diff --git a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_event_info.hpp b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_event_info.hpp new file mode 100644 index 000000000..d3139afe2 --- /dev/null +++ b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_event_info.hpp @@ -0,0 +1,103 @@ +// Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RMW_FASTRTPS_SHARED_CPP__CUSTOM_EVENT_INFO_HPP_ +#define RMW_FASTRTPS_SHARED_CPP__CUSTOM_EVENT_INFO_HPP_ + +#include +#include +#include +#include +#include +#include + +#include "fastcdr/FastBuffer.h" + +#include "fastrtps/subscriber/SampleInfo.h" +#include "fastrtps/subscriber/Subscriber.h" +#include "fastrtps/subscriber/SubscriberListener.h" +#include "fastrtps/participant/Participant.h" +#include "fastrtps/publisher/Publisher.h" +#include "fastrtps/publisher/PublisherListener.h" + +#include "rmw/event.h" + +#include "rmw_fastrtps_shared_cpp/TypeSupport.hpp" + + +class EventListenerInterface +{ +protected: + class ConditionalScopedLock; + +public: + /// Connect a condition variable so a waiter can be notified of new data. + virtual void attachCondition( + std::mutex * conditionMutex, + std::condition_variable * conditionVariable) = 0; + + /// Unset the information from attachCondition. + virtual void detachCondition() = 0; + + /// Check if there is new data available for a specific event type. + /** + * \param event_type The event type to check on. + * \return `true` if new data is available. + */ + virtual bool hasEvent(rmw_event_type_t event_type) const = 0; + + /// Take ready data for an event type. + /** + * \param event_type The event type to get data for. + * \param event_info A preallocated event information (from rmw/types.h) to fill with data + * \return `true` if data was successfully taken. + * \return `false` if data was not available, in this case nothing was written to event_info. + */ + virtual bool takeNextEvent(rmw_event_type_t event_type, void * event_info) = 0; +}; + +class EventListenerInterface::ConditionalScopedLock +{ +public: + ConditionalScopedLock( + std::mutex * mutex, + std::condition_variable * condition_variable = nullptr) + : mutex_(mutex), cv_(condition_variable) + { + if (nullptr != mutex_) { + mutex_->lock(); + } + } + + ~ConditionalScopedLock() + { + if (nullptr != mutex_) { + mutex_->unlock(); + if (nullptr != cv_) { + cv_->notify_all(); + } + } + } + +private: + std::mutex * mutex_; + std::condition_variable * cv_; +}; + +struct CustomEventInfo +{ + virtual EventListenerInterface * getListener() const = 0; +}; + +#endif // RMW_FASTRTPS_SHARED_CPP__CUSTOM_EVENT_INFO_HPP_ diff --git a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_publisher_info.hpp b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_publisher_info.hpp index ceb902fc2..1a5ed0e2b 100644 --- a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_publisher_info.hpp +++ b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_publisher_info.hpp @@ -15,7 +15,9 @@ #ifndef RMW_FASTRTPS_SHARED_CPP__CUSTOM_PUBLISHER_INFO_HPP_ #define RMW_FASTRTPS_SHARED_CPP__CUSTOM_PUBLISHER_INFO_HPP_ +#include #include +#include #include #include "fastrtps/publisher/Publisher.h" @@ -25,29 +27,43 @@ #include "rmw/rmw.h" #include "rmw_fastrtps_shared_cpp/TypeSupport.hpp" +#include "rmw_fastrtps_shared_cpp/custom_event_info.hpp" + class PubListener; -typedef struct CustomPublisherInfo +typedef struct CustomPublisherInfo : public CustomEventInfo { + virtual ~CustomPublisherInfo() = default; + eprosima::fastrtps::Publisher * publisher_; PubListener * listener_; rmw_fastrtps_shared_cpp::TypeSupport * type_support_; rmw_gid_t publisher_gid; const char * typesupport_identifier_; + + RMW_FASTRTPS_SHARED_CPP_PUBLIC + EventListenerInterface * + getListener() const final; } CustomPublisherInfo; -class PubListener : public eprosima::fastrtps::PublisherListener +class PubListener : public EventListenerInterface, public eprosima::fastrtps::PublisherListener { public: explicit PubListener(CustomPublisherInfo * info) + : deadline_changes_(false), + liveliness_changes_(false), + conditionMutex_(nullptr), + conditionVariable_(nullptr) { (void) info; } + // PublisherListener implementation + RMW_FASTRTPS_SHARED_CPP_PUBLIC void onPublicationMatched( - eprosima::fastrtps::Publisher * pub, eprosima::fastrtps::rtps::MatchingInfo & info) + eprosima::fastrtps::Publisher * pub, eprosima::fastrtps::rtps::MatchingInfo & info) final { (void) pub; std::lock_guard lock(internalMutex_); @@ -58,16 +74,67 @@ class PubListener : public eprosima::fastrtps::PublisherListener } } + RMW_FASTRTPS_SHARED_CPP_PUBLIC + void + on_offered_deadline_missed( + eprosima::fastrtps::Publisher * publisher, + const eprosima::fastrtps::OfferedDeadlineMissedStatus & status) final; + + RMW_FASTRTPS_SHARED_CPP_PUBLIC + void + on_liveliness_lost( + eprosima::fastrtps::Publisher * publisher, + const eprosima::fastrtps::LivelinessLostStatus & status) final; + + + // EventListenerInterface implementation + RMW_FASTRTPS_SHARED_CPP_PUBLIC + bool + hasEvent(rmw_event_type_t event_type) const final; + + RMW_FASTRTPS_SHARED_CPP_PUBLIC + bool + takeNextEvent(rmw_event_type_t event_type, void * event_info) final; + + // PubListener API size_t subscriptionCount() { std::lock_guard lock(internalMutex_); return subscriptions_.size(); } + void + attachCondition(std::mutex * conditionMutex, std::condition_variable * conditionVariable) + { + std::lock_guard lock(internalMutex_); + conditionMutex_ = conditionMutex; + conditionVariable_ = conditionVariable; + } + + void + detachCondition() + { + std::lock_guard lock(internalMutex_); + conditionMutex_ = nullptr; + conditionVariable_ = nullptr; + } + private: - std::mutex internalMutex_; - std::set - subscriptions_ RCPPUTILS_TSA_GUARDED_BY(internalMutex_); + mutable std::mutex internalMutex_; + + std::set subscriptions_ + RCPPUTILS_TSA_GUARDED_BY(internalMutex_); + + std::atomic_bool deadline_changes_; + eprosima::fastrtps::OfferedDeadlineMissedStatus offered_deadline_missed_status_ + RCPPUTILS_TSA_GUARDED_BY(internalMutex_); + + std::atomic_bool liveliness_changes_; + eprosima::fastrtps::LivelinessLostStatus liveliness_lost_status_ + RCPPUTILS_TSA_GUARDED_BY(internalMutex_); + + std::mutex * conditionMutex_ RCPPUTILS_TSA_GUARDED_BY(internalMutex_); + std::condition_variable * conditionVariable_ RCPPUTILS_TSA_GUARDED_BY(internalMutex_); }; #endif // RMW_FASTRTPS_SHARED_CPP__CUSTOM_PUBLISHER_INFO_HPP_ diff --git a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_subscriber_info.hpp b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_subscriber_info.hpp index 2aba576a7..929fc625d 100644 --- a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_subscriber_info.hpp +++ b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_subscriber_info.hpp @@ -26,35 +26,47 @@ #include "rcpputils/thread_safety_annotations.hpp" +#include "rmw/impl/cpp/macros.hpp" + #include "rmw_fastrtps_shared_cpp/TypeSupport.hpp" +#include "rmw_fastrtps_shared_cpp/custom_event_info.hpp" + class SubListener; -typedef struct CustomSubscriberInfo +typedef struct CustomSubscriberInfo : public CustomEventInfo { + virtual ~CustomSubscriberInfo() = default; + eprosima::fastrtps::Subscriber * subscriber_; SubListener * listener_; rmw_fastrtps_shared_cpp::TypeSupport * type_support_; const char * typesupport_identifier_; + + RMW_FASTRTPS_SHARED_CPP_PUBLIC + EventListenerInterface * + getListener() const final; } CustomSubscriberInfo; -class SubListener : public eprosima::fastrtps::SubscriberListener +class SubListener : public EventListenerInterface, public eprosima::fastrtps::SubscriberListener { public: explicit SubListener(CustomSubscriberInfo * info) : data_(0), - conditionMutex_(nullptr), conditionVariable_(nullptr) + deadline_changes_(false), + liveliness_changes_(false), + conditionMutex_(nullptr), + conditionVariable_(nullptr) { // Field is not used right now (void)info; } + // SubscriberListener implementation void onSubscriptionMatched( - eprosima::fastrtps::Subscriber * sub, eprosima::fastrtps::rtps::MatchingInfo & info) + eprosima::fastrtps::Subscriber * /*sub*/, eprosima::fastrtps::rtps::MatchingInfo & info) final { - (void)sub; - std::lock_guard lock(internalMutex_); if (eprosima::fastrtps::rtps::MATCHED_MATCHING == info.status) { publishers_.insert(info.remoteEndpointGuid); @@ -64,23 +76,39 @@ class SubListener : public eprosima::fastrtps::SubscriberListener } void - onNewDataMessage(eprosima::fastrtps::Subscriber * sub) + onNewDataMessage(eprosima::fastrtps::Subscriber * sub) final { - (void)sub; std::lock_guard lock(internalMutex_); - if (conditionMutex_ != nullptr) { - std::unique_lock clock(*conditionMutex_); - // the change to data_ needs to be mutually exclusive with rmw_wait() - // which checks hasData() and decides if wait() needs to be called - data_ = sub->getUnreadCount(); - clock.unlock(); - conditionVariable_->notify_one(); - } else { - data_ = sub->getUnreadCount(); - } + // the change to liveliness_lost_count_ needs to be mutually exclusive with + // rmw_wait() which checks hasEvent() and decides if wait() needs to be called + ConditionalScopedLock clock(conditionMutex_, conditionVariable_); + + data_.store(sub->getUnreadCount(), std::memory_order_relaxed); } + RMW_FASTRTPS_SHARED_CPP_PUBLIC + void + on_requested_deadline_missed( + eprosima::fastrtps::Subscriber *, + const eprosima::fastrtps::RequestedDeadlineMissedStatus &) final; + + RMW_FASTRTPS_SHARED_CPP_PUBLIC + void + on_liveliness_changed( + eprosima::fastrtps::Subscriber *, + const eprosima::fastrtps::LivelinessChangedStatus &) final; + + // EventListenerInterface implementation + RMW_FASTRTPS_SHARED_CPP_PUBLIC + bool + hasEvent(rmw_event_type_t event_type) const final; + + RMW_FASTRTPS_SHARED_CPP_PUBLIC + bool + takeNextEvent(rmw_event_type_t event_type, void * event_info) final; + + // SubListener API void attachCondition(std::mutex * conditionMutex, std::condition_variable * conditionVariable) { @@ -98,22 +126,17 @@ class SubListener : public eprosima::fastrtps::SubscriberListener } bool - hasData() + hasData() const { - return data_ > 0; + return data_.load(std::memory_order_relaxed) > 0; } void data_taken(eprosima::fastrtps::Subscriber * sub) { std::lock_guard lock(internalMutex_); - - if (conditionMutex_ != nullptr) { - std::unique_lock clock(*conditionMutex_); - data_ = sub->getUnreadCount(); - } else { - data_ = sub->getUnreadCount(); - } + ConditionalScopedLock clock(conditionMutex_); + data_.store(sub->getUnreadCount(), std::memory_order_relaxed); } size_t publisherCount() @@ -123,8 +146,18 @@ class SubListener : public eprosima::fastrtps::SubscriberListener } private: - std::mutex internalMutex_; + mutable std::mutex internalMutex_; + std::atomic_size_t data_; + + std::atomic_bool deadline_changes_; + eprosima::fastrtps::RequestedDeadlineMissedStatus requested_deadline_missed_status_ + RCPPUTILS_TSA_GUARDED_BY(internalMutex_); + + std::atomic_bool liveliness_changes_; + eprosima::fastrtps::LivelinessChangedStatus liveliness_changed_status_ + RCPPUTILS_TSA_GUARDED_BY(internalMutex_); + std::mutex * conditionMutex_ RCPPUTILS_TSA_GUARDED_BY(internalMutex_); std::condition_variable * conditionVariable_ RCPPUTILS_TSA_GUARDED_BY(internalMutex_); diff --git a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/qos.hpp b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/qos.hpp index 90f77f24a..fce19d99f 100644 --- a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/qos.hpp +++ b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/qos.hpp @@ -42,12 +42,6 @@ get_datawriter_qos( RMW_FASTRTPS_SHARED_CPP_PUBLIC bool -is_time_default( - const rmw_time_t & time); - -RMW_FASTRTPS_SHARED_CPP_PUBLIC -bool -is_valid_qos( - const rmw_qos_profile_t & qos_policies); +is_valid_qos(const rmw_qos_profile_t & qos_policies); #endif // RMW_FASTRTPS_SHARED_CPP__QOS_HPP_ diff --git a/rmw_fastrtps_shared_cpp/src/custom_publisher_info.cpp b/rmw_fastrtps_shared_cpp/src/custom_publisher_info.cpp new file mode 100644 index 000000000..20305fcb2 --- /dev/null +++ b/rmw_fastrtps_shared_cpp/src/custom_publisher_info.cpp @@ -0,0 +1,101 @@ +// Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "rmw_fastrtps_shared_cpp/custom_publisher_info.hpp" + +EventListenerInterface * +CustomPublisherInfo::getListener() const +{ + return listener_; +} + +void +PubListener::on_offered_deadline_missed( + eprosima::fastrtps::Publisher * /* publisher */, + const eprosima::fastrtps::OfferedDeadlineMissedStatus & status) +{ + std::lock_guard lock(internalMutex_); + + // the change to liveliness_lost_count_ needs to be mutually exclusive with + // rmw_wait() which checks hasEvent() and decides if wait() needs to be called + ConditionalScopedLock clock(conditionMutex_, conditionVariable_); + + // Assign absolute values + offered_deadline_missed_status_.total_count = status.total_count; + // Accumulate deltas + offered_deadline_missed_status_.total_count_change += status.total_count_change; + + deadline_changes_.store(true, std::memory_order_relaxed); +} + +void PubListener::on_liveliness_lost( + eprosima::fastrtps::Publisher * /* publisher */, + const eprosima::fastrtps::LivelinessLostStatus & status) +{ + std::lock_guard lock(internalMutex_); + + // the change to liveliness_lost_count_ needs to be mutually exclusive with + // rmw_wait() which checks hasEvent() and decides if wait() needs to be called + ConditionalScopedLock clock(conditionMutex_, conditionVariable_); + + // Assign absolute values + liveliness_lost_status_.total_count = status.total_count; + // Accumulate deltas + liveliness_lost_status_.total_count_change += status.total_count_change; + + liveliness_changes_.store(true, std::memory_order_relaxed); +} + +bool PubListener::hasEvent(rmw_event_type_t event_type) const +{ + switch (event_type) { + case RMW_EVENT_LIVELINESS_LOST: + return liveliness_changes_.load(std::memory_order_relaxed); + case RMW_EVENT_OFFERED_DEADLINE_MISSED: + return deadline_changes_.load(std::memory_order_relaxed); + default: + break; + } + return false; +} + +bool PubListener::takeNextEvent(rmw_event_type_t event_type, void * event_info) +{ + std::lock_guard lock(internalMutex_); + switch (event_type) { + case RMW_EVENT_LIVELINESS_LOST: + { + rmw_liveliness_lost_status_t * rmw_data = + static_cast(event_info); + rmw_data->total_count = liveliness_lost_status_.total_count; + rmw_data->total_count_change = liveliness_lost_status_.total_count_change; + liveliness_lost_status_.total_count_change = 0; + liveliness_changes_.store(false, std::memory_order_relaxed); + } + break; + case RMW_EVENT_OFFERED_DEADLINE_MISSED: + { + rmw_offered_deadline_missed_status_t * rmw_data = + static_cast(event_info); + rmw_data->total_count = offered_deadline_missed_status_.total_count; + rmw_data->total_count_change = offered_deadline_missed_status_.total_count_change; + offered_deadline_missed_status_.total_count_change = 0; + deadline_changes_.store(false, std::memory_order_relaxed); + } + break; + default: + return false; + } + return true; +} diff --git a/rmw_fastrtps_shared_cpp/src/custom_subscriber_info.cpp b/rmw_fastrtps_shared_cpp/src/custom_subscriber_info.cpp new file mode 100644 index 000000000..d3f683502 --- /dev/null +++ b/rmw_fastrtps_shared_cpp/src/custom_subscriber_info.cpp @@ -0,0 +1,106 @@ +// Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "rmw_fastrtps_shared_cpp/custom_subscriber_info.hpp" + +EventListenerInterface * +CustomSubscriberInfo::getListener() const +{ + return listener_; +} + +void +SubListener::on_requested_deadline_missed( + eprosima::fastrtps::Subscriber * /* subscriber */, + const eprosima::fastrtps::RequestedDeadlineMissedStatus & status) +{ + std::lock_guard lock(internalMutex_); + + // the change to liveliness_lost_count_ needs to be mutually exclusive with + // rmw_wait() which checks hasEvent() and decides if wait() needs to be called + ConditionalScopedLock clock(conditionMutex_, conditionVariable_); + + // Assign absolute values + requested_deadline_missed_status_.total_count = status.total_count; + // Accumulate deltas + requested_deadline_missed_status_.total_count_change += status.total_count_change; + + deadline_changes_.store(true, std::memory_order_relaxed); +} + +void SubListener::on_liveliness_changed( + eprosima::fastrtps::Subscriber * /* subscriber */, + const eprosima::fastrtps::LivelinessChangedStatus & status) +{ + std::lock_guard lock(internalMutex_); + + // the change to liveliness_lost_count_ needs to be mutually exclusive with + // rmw_wait() which checks hasEvent() and decides if wait() needs to be called + ConditionalScopedLock clock(conditionMutex_, conditionVariable_); + + // Assign absolute values + liveliness_changed_status_.alive_count = status.alive_count; + liveliness_changed_status_.not_alive_count = status.not_alive_count; + // Accumulate deltas + liveliness_changed_status_.alive_count_change += status.alive_count_change; + liveliness_changed_status_.not_alive_count_change += status.not_alive_count_change; + + liveliness_changes_.store(true, std::memory_order_relaxed); +} + +bool SubListener::hasEvent(rmw_event_type_t event_type) const +{ + switch (event_type) { + case RMW_EVENT_LIVELINESS_CHANGED: + return liveliness_changes_.load(std::memory_order_relaxed); + case RMW_EVENT_REQUESTED_DEADLINE_MISSED: + return deadline_changes_.load(std::memory_order_relaxed); + default: + break; + } + return false; +} + +bool SubListener::takeNextEvent(rmw_event_type_t event_type, void * event_info) +{ + std::lock_guard lock(internalMutex_); + switch (event_type) { + case RMW_EVENT_LIVELINESS_CHANGED: + { + rmw_liveliness_changed_status_t * rmw_data = + static_cast(event_info); + rmw_data->alive_count = liveliness_changed_status_.alive_count; + rmw_data->not_alive_count = liveliness_changed_status_.not_alive_count; + rmw_data->alive_count_change = liveliness_changed_status_.alive_count_change; + rmw_data->not_alive_count_change = liveliness_changed_status_.not_alive_count_change; + liveliness_changed_status_.alive_count_change = 0; + liveliness_changed_status_.not_alive_count_change = 0; + liveliness_changes_.store(false, std::memory_order_relaxed); + } + break; + case RMW_EVENT_REQUESTED_DEADLINE_MISSED: + { + rmw_requested_deadline_missed_status_t * rmw_data = + static_cast(event_info); + rmw_data->total_count = requested_deadline_missed_status_.total_count; + rmw_data->total_count_change = requested_deadline_missed_status_.total_count_change; + requested_deadline_missed_status_.total_count_change = 0; + deadline_changes_.store(false, std::memory_order_relaxed); + } + break; + default: + return false; + } + return true; +} diff --git a/rmw_fastrtps_shared_cpp/src/qos.cpp b/rmw_fastrtps_shared_cpp/src/qos.cpp index 4228b24bb..a07eaca67 100644 --- a/rmw_fastrtps_shared_cpp/src/qos.cpp +++ b/rmw_fastrtps_shared_cpp/src/qos.cpp @@ -21,84 +21,34 @@ #include "rmw/error_handling.h" -bool -get_datareader_qos( - const rmw_qos_profile_t & qos_policies, - eprosima::fastrtps::SubscriberAttributes & sattr) +static +eprosima::fastrtps::Duration_t +rmw_time_to_fastrtps(const rmw_time_t & time) { - switch (qos_policies.history) { - case RMW_QOS_POLICY_HISTORY_KEEP_LAST: - sattr.topic.historyQos.kind = eprosima::fastrtps::KEEP_LAST_HISTORY_QOS; - break; - case RMW_QOS_POLICY_HISTORY_KEEP_ALL: - sattr.topic.historyQos.kind = eprosima::fastrtps::KEEP_ALL_HISTORY_QOS; - break; - case RMW_QOS_POLICY_HISTORY_SYSTEM_DEFAULT: - break; - default: - RMW_SET_ERROR_MSG("Unknown QoS history policy"); - return false; - } - - switch (qos_policies.reliability) { - case RMW_QOS_POLICY_RELIABILITY_BEST_EFFORT: - sattr.qos.m_reliability.kind = eprosima::fastrtps::BEST_EFFORT_RELIABILITY_QOS; - break; - case RMW_QOS_POLICY_RELIABILITY_RELIABLE: - sattr.qos.m_reliability.kind = eprosima::fastrtps::RELIABLE_RELIABILITY_QOS; - break; - case RMW_QOS_POLICY_RELIABILITY_SYSTEM_DEFAULT: - break; - default: - RMW_SET_ERROR_MSG("Unknown QoS reliability policy"); - return false; - } - - switch (qos_policies.durability) { - case RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL: - sattr.qos.m_durability.kind = eprosima::fastrtps::TRANSIENT_LOCAL_DURABILITY_QOS; - break; - case RMW_QOS_POLICY_DURABILITY_VOLATILE: - sattr.qos.m_durability.kind = eprosima::fastrtps::VOLATILE_DURABILITY_QOS; - break; - case RMW_QOS_POLICY_DURABILITY_SYSTEM_DEFAULT: - break; - default: - RMW_SET_ERROR_MSG("Unknown QoS durability policy"); - return false; - } - - if (qos_policies.depth != RMW_QOS_POLICY_DEPTH_SYSTEM_DEFAULT) { - sattr.topic.historyQos.depth = static_cast(qos_policies.depth); - } - - // ensure the history depth is at least the requested queue size - assert(sattr.topic.historyQos.depth >= 0); - if ( - eprosima::fastrtps::KEEP_LAST_HISTORY_QOS == sattr.topic.historyQos.kind && - static_cast(sattr.topic.historyQos.depth) < qos_policies.depth) - { - if (qos_policies.depth > (std::numeric_limits::max)()) { - RMW_SET_ERROR_MSG( - "failed to set history depth since the requested queue size exceeds the DDS type"); - return false; - } - sattr.topic.historyQos.depth = static_cast(qos_policies.depth); - } - - return true; + return eprosima::fastrtps::Duration_t( + static_cast(time.sec), + static_cast(time.nsec)); } +static bool -get_datawriter_qos( - const rmw_qos_profile_t & qos_policies, eprosima::fastrtps::PublisherAttributes & pattr) +is_time_default(const rmw_time_t & time) +{ + return time.sec == 0 && time.nsec == 0; +} + +template +bool fill_entity_qos_from_profile( + const rmw_qos_profile_t & qos_policies, + DDSEntityQos & entity_qos, + eprosima::fastrtps::HistoryQosPolicy & history_qos) { switch (qos_policies.history) { case RMW_QOS_POLICY_HISTORY_KEEP_LAST: - pattr.topic.historyQos.kind = eprosima::fastrtps::KEEP_LAST_HISTORY_QOS; + history_qos.kind = eprosima::fastrtps::KEEP_LAST_HISTORY_QOS; break; case RMW_QOS_POLICY_HISTORY_KEEP_ALL: - pattr.topic.historyQos.kind = eprosima::fastrtps::KEEP_ALL_HISTORY_QOS; + history_qos.kind = eprosima::fastrtps::KEEP_ALL_HISTORY_QOS; break; case RMW_QOS_POLICY_HISTORY_SYSTEM_DEFAULT: break; @@ -109,10 +59,10 @@ get_datawriter_qos( switch (qos_policies.durability) { case RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL: - pattr.qos.m_durability.kind = eprosima::fastrtps::TRANSIENT_LOCAL_DURABILITY_QOS; + entity_qos.m_durability.kind = eprosima::fastrtps::TRANSIENT_LOCAL_DURABILITY_QOS; break; case RMW_QOS_POLICY_DURABILITY_VOLATILE: - pattr.qos.m_durability.kind = eprosima::fastrtps::VOLATILE_DURABILITY_QOS; + entity_qos.m_durability.kind = eprosima::fastrtps::VOLATILE_DURABILITY_QOS; break; case RMW_QOS_POLICY_DURABILITY_SYSTEM_DEFAULT: break; @@ -123,10 +73,10 @@ get_datawriter_qos( switch (qos_policies.reliability) { case RMW_QOS_POLICY_RELIABILITY_BEST_EFFORT: - pattr.qos.m_reliability.kind = eprosima::fastrtps::BEST_EFFORT_RELIABILITY_QOS; + entity_qos.m_reliability.kind = eprosima::fastrtps::BEST_EFFORT_RELIABILITY_QOS; break; case RMW_QOS_POLICY_RELIABILITY_RELIABLE: - pattr.qos.m_reliability.kind = eprosima::fastrtps::RELIABLE_RELIABILITY_QOS; + entity_qos.m_reliability.kind = eprosima::fastrtps::RELIABLE_RELIABILITY_QOS; break; case RMW_QOS_POLICY_RELIABILITY_SYSTEM_DEFAULT: break; @@ -136,49 +86,84 @@ get_datawriter_qos( } if (qos_policies.depth != RMW_QOS_POLICY_DEPTH_SYSTEM_DEFAULT) { - pattr.topic.historyQos.depth = static_cast(qos_policies.depth); + history_qos.depth = static_cast(qos_policies.depth); } // ensure the history depth is at least the requested queue size - assert(pattr.topic.historyQos.depth >= 0); + assert(history_qos.depth >= 0); if ( - eprosima::fastrtps::KEEP_LAST_HISTORY_QOS == pattr.topic.historyQos.kind && - static_cast(pattr.topic.historyQos.depth) < qos_policies.depth) + eprosima::fastrtps::KEEP_LAST_HISTORY_QOS == history_qos.kind && + static_cast(history_qos.depth) < qos_policies.depth) { if (qos_policies.depth > (std::numeric_limits::max)()) { RMW_SET_ERROR_MSG( "failed to set history depth since the requested queue size exceeds the DDS type"); return false; } - pattr.topic.historyQos.depth = static_cast(qos_policies.depth); + history_qos.depth = static_cast(qos_policies.depth); + } + + if (!is_time_default(qos_policies.lifespan)) { + entity_qos.m_lifespan.duration = rmw_time_to_fastrtps(qos_policies.lifespan); + } + + if (!is_time_default(qos_policies.deadline)) { + entity_qos.m_deadline.period = rmw_time_to_fastrtps(qos_policies.deadline); + } + + switch (qos_policies.liveliness) { + case RMW_QOS_POLICY_LIVELINESS_AUTOMATIC: + entity_qos.m_liveliness.kind = eprosima::fastrtps::AUTOMATIC_LIVELINESS_QOS; + break; + case RMW_QOS_POLICY_LIVELINESS_MANUAL_BY_NODE: + entity_qos.m_liveliness.kind = eprosima::fastrtps::MANUAL_BY_PARTICIPANT_LIVELINESS_QOS; + break; + case RMW_QOS_POLICY_LIVELINESS_MANUAL_BY_TOPIC: + entity_qos.m_liveliness.kind = eprosima::fastrtps::MANUAL_BY_TOPIC_LIVELINESS_QOS; + break; + case RMW_QOS_POLICY_LIVELINESS_SYSTEM_DEFAULT: + break; + default: + RMW_SET_ERROR_MSG("Unknown QoS Liveliness policy"); + return false; + } + if (!is_time_default(qos_policies.liveliness_lease_duration)) { + entity_qos.m_liveliness.lease_duration = + rmw_time_to_fastrtps(qos_policies.liveliness_lease_duration); + + // Docs suggest setting no higher than 0.7 * lease_duration, choosing 2/3 to give safe buffer. + // See doc at https://github.com/eProsima/Fast-RTPS/blob/ + // a8691a40be6b8460b01edde36ad8563170a3a35a/include/fastrtps/qos/QosPolicies.h#L223-L232 + double period_in_ns = entity_qos.m_liveliness.lease_duration.to_ns() * 2.0 / 3.0; + double period_in_s = RCUTILS_NS_TO_S(period_in_ns); + entity_qos.m_liveliness.announcement_period = eprosima::fastrtps::Duration_t(period_in_s); } return true; } bool -is_time_default( - const rmw_time_t & time) +get_datareader_qos( + const rmw_qos_profile_t & qos_policies, + eprosima::fastrtps::SubscriberAttributes & sattr) { - return time.sec == 0 && time.nsec == 0; + return fill_entity_qos_from_profile(qos_policies, sattr.qos, sattr.topic.historyQos); } bool -is_valid_qos( - const rmw_qos_profile_t & qos_policies) +get_datawriter_qos( + const rmw_qos_profile_t & qos_policies, eprosima::fastrtps::PublisherAttributes & pattr) +{ + return fill_entity_qos_from_profile(qos_policies, pattr.qos, pattr.topic.historyQos); +} + +bool +is_valid_qos(const rmw_qos_profile_t & qos_policies) { - if (!is_time_default(qos_policies.deadline)) { - RMW_SET_ERROR_MSG("Deadline unsupported for fastrtps"); - return false; - } - if (!is_time_default(qos_policies.lifespan)) { - RMW_SET_ERROR_MSG("Lifespan unsupported for fastrtps"); - return false; - } if (qos_policies.liveliness == RMW_QOS_POLICY_LIVELINESS_MANUAL_BY_NODE || qos_policies.liveliness == RMW_QOS_POLICY_LIVELINESS_MANUAL_BY_TOPIC) { - RMW_SET_ERROR_MSG("Liveliness unsupported for fastrtps"); + RMW_SET_ERROR_MSG("Manual liveliness unsupported for fastrtps"); return false; } return true; diff --git a/rmw_fastrtps_shared_cpp/src/rmw_take.cpp b/rmw_fastrtps_shared_cpp/src/rmw_take.cpp index 56bb50f26..0e533f86f 100644 --- a/rmw_fastrtps_shared_cpp/src/rmw_take.cpp +++ b/rmw_fastrtps_shared_cpp/src/rmw_take.cpp @@ -89,21 +89,25 @@ __rmw_take_event( void * event_info, bool * taken) { - RCUTILS_CHECK_FOR_NULL_WITH_MSG( - event_handle, "event_handle pointer is null", return RMW_RET_ERROR); - RCUTILS_CHECK_FOR_NULL_WITH_MSG( - event_info, "event info output pointer is null", return RMW_RET_ERROR); - RCUTILS_CHECK_FOR_NULL_WITH_MSG(taken, "boolean flag for taken is null", return RMW_RET_ERROR); + RMW_CHECK_ARGUMENT_FOR_NULL(event_handle, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_ARGUMENT_FOR_NULL(event_info, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_ARGUMENT_FOR_NULL(taken, RMW_RET_INVALID_ARGUMENT); *taken = false; - if (event_handle->implementation_identifier != identifier) { - RMW_SET_ERROR_MSG("event handle not from this implementation"); - return RMW_RET_ERROR; + RMW_CHECK_TYPE_IDENTIFIERS_MATCH( + event handle, + event_handle->implementation_identifier, + identifier, + return RMW_RET_ERROR); + + auto event = static_cast(event_handle->data); + if (event->getListener()->takeNextEvent(event_handle->event_type, event_info)) { + *taken = true; + return RMW_RET_OK; } - RMW_SET_ERROR_MSG("take_event is not yet implemented by rmw_fastrtps yet"); - return RMW_RET_UNSUPPORTED; + return RMW_RET_ERROR; } rmw_ret_t diff --git a/rmw_fastrtps_shared_cpp/src/rmw_wait.cpp b/rmw_fastrtps_shared_cpp/src/rmw_wait.cpp index ea784a21a..86ce664e8 100644 --- a/rmw_fastrtps_shared_cpp/src/rmw_wait.cpp +++ b/rmw_fastrtps_shared_cpp/src/rmw_wait.cpp @@ -30,7 +30,8 @@ check_wait_set_for_data( const rmw_subscriptions_t * subscriptions, const rmw_guard_conditions_t * guard_conditions, const rmw_services_t * services, - const rmw_clients_t * clients) + const rmw_clients_t * clients, + const rmw_events_t * events) { if (subscriptions) { for (size_t i = 0; i < subscriptions->subscriber_count; ++i) { @@ -63,6 +64,16 @@ check_wait_set_for_data( } } + if (events) { + for (size_t i = 0; i < events->event_count; ++i) { + auto event = static_cast(events->events[i]); + auto custom_event_info = static_cast(event->data); + if (custom_event_info->getListener()->hasEvent(event->event_type)) { + return true; + } + } + } + if (guard_conditions) { for (size_t i = 0; i < guard_conditions->guard_condition_count; ++i) { void * data = guard_conditions->guard_conditions[i]; @@ -131,14 +142,13 @@ __rmw_wait( } } - // TODO(mm318): implement attachCondition for events when feature becomes available in fastrtps - // if (events) { - // for (size_t i = 0; i < events->event_count; ++i) { - // void * data = events->events[i]; - // auto custom_event_info = static_cast(data); - // custom_event_info->getListener()->attachCondition(conditionMutex, conditionVariable); - // } - // } + if (events) { + for (size_t i = 0; i < events->event_count; ++i) { + auto event = static_cast(events->events[i]); + auto custom_event_info = static_cast(event->data); + custom_event_info->getListener()->attachCondition(conditionMutex, conditionVariable); + } + } if (guard_conditions) { for (size_t i = 0; i < guard_conditions->guard_condition_count; ++i) { @@ -154,9 +164,10 @@ __rmw_wait( // otherwise the decision to wait might be incorrect std::unique_lock lock(*conditionMutex); - bool hasData = check_wait_set_for_data(subscriptions, guard_conditions, services, clients); - auto predicate = [subscriptions, guard_conditions, services, clients]() { - return check_wait_set_for_data(subscriptions, guard_conditions, services, clients); + bool hasData = check_wait_set_for_data( + subscriptions, guard_conditions, services, clients, events); + auto predicate = [subscriptions, guard_conditions, services, clients, events]() { + return check_wait_set_for_data(subscriptions, guard_conditions, services, clients, events); }; bool timeout = false; @@ -213,17 +224,14 @@ __rmw_wait( } } - // TODO(mm318): implement detachCondition for events when feature becomes available in fastrtps - // For now, set all to NULL because data is not ready if (events) { for (size_t i = 0; i < events->event_count; ++i) { - events->events[i] = 0; - // void * data = events->events[i]; - // auto custom_event_info = static_cast(data); - // custom_event_info->getListener()->detachCondition(); - // if (!custom_event_info->getListener()->hasEvent()) { - // events->events[i] = 0; - // } + auto event = static_cast(events->events[i]); + auto custom_event_info = static_cast(event->data); + custom_event_info->getListener()->detachCondition(); + if (!custom_event_info->getListener()->hasEvent(event->event_type)) { + events->events[i] = nullptr; + } } }