diff --git a/rmw_connextdds_common/CMakeLists.txt b/rmw_connextdds_common/CMakeLists.txt index 9a60ef62..e2a94c01 100644 --- a/rmw_connextdds_common/CMakeLists.txt +++ b/rmw_connextdds_common/CMakeLists.txt @@ -81,15 +81,6 @@ function(rtirmw_add_library) CACHE INTERNAL "") endif() - if(NOT "${RMW_CONNEXT_WAITSET_MODE}" STREQUAL "") - string(TOUPPER "${RMW_CONNEXT_WAITSET_MODE}" rmw_connext_waitset_mode) - if(rmw_connext_waitset_mode STREQUAL "STD") - list(APPEND private_defines "RMW_CONNEXT_CPP_STD_WAITSETS=1") - endif() - set(RMW_CONNEXT_WAITSET_MODE "${RMW_CONNEXT_WAITSET_MODE}" - CACHE INTERNAL "") - endif() - target_compile_definitions(${_rti_build_NAME} PRIVATE ${private_defines}) # Causes the visibility macros to use dllexport rather than dllimport, @@ -136,7 +127,6 @@ set(RMW_CONNEXT_COMMON_SOURCE_CPP src/common/rmw_graph.cpp src/common/rmw_event.cpp src/common/rmw_impl.cpp - src/common/rmw_impl_waitset_dds.cpp src/common/rmw_impl_waitset_std.cpp src/common/rmw_info.cpp src/common/rmw_node.cpp @@ -159,7 +149,6 @@ set(RMW_CONNEXT_COMMON_SOURCE_HPP include/rmw_connextdds/resource_limits.hpp include/rmw_connextdds/rmw_impl.hpp include/rmw_connextdds/rmw_api_impl.hpp - include/rmw_connextdds/rmw_waitset_dds.hpp include/rmw_connextdds/rmw_waitset_std.hpp include/rmw_connextdds/scope_exit.hpp include/rmw_connextdds/static_config.hpp diff --git a/rmw_connextdds_common/include/rmw_connextdds/dds_api_ndds.hpp b/rmw_connextdds_common/include/rmw_connextdds/dds_api_ndds.hpp index 36f28855..a40b5b30 100644 --- a/rmw_connextdds_common/include/rmw_connextdds/dds_api_ndds.hpp +++ b/rmw_connextdds_common/include/rmw_connextdds/dds_api_ndds.hpp @@ -41,4 +41,8 @@ typedef RMW_Connext_Uint8ArrayPtrSeq RMW_Connext_UntypedSampleSeq; // the guid with 0's. #define DDS_SampleIdentity_UNKNOWN DDS_SAMPLEIDENTITY_DEFAULT +// Convenience function to compare the first 12 bytes of the handle +#define DDS_InstanceHandle_compare_prefix(ih_a_, ih_b_) \ + memcmp((ih_a_)->keyHash.value, (ih_b_)->keyHash.value, 12) + #endif // RMW_CONNEXTDDS__DDS_API_NDDS_HPP_ diff --git a/rmw_connextdds_common/include/rmw_connextdds/dds_api_rtime.hpp b/rmw_connextdds_common/include/rmw_connextdds/dds_api_rtime.hpp index 05eb487f..a658aaa0 100644 --- a/rmw_connextdds_common/include/rmw_connextdds/dds_api_rtime.hpp +++ b/rmw_connextdds_common/include/rmw_connextdds/dds_api_rtime.hpp @@ -66,4 +66,8 @@ struct DDS_LifespanQosPolicy; // the guid with 0's. #define DDS_SampleIdentity_UNKNOWN DDS_SAMPLE_IDENTITY_UNKNOWN +// Convenience function to compare the first 12 bytes of the handle +#define DDS_InstanceHandle_compare_prefix(ih_a_, ih_b_) \ + memcmp((ih_a_)->octet, (ih_b_)->octet, 12) + #endif // RMW_CONNEXTDDS__DDS_API_RTIME_HPP_ diff --git a/rmw_connextdds_common/include/rmw_connextdds/rmw_impl.hpp b/rmw_connextdds_common/include/rmw_connextdds/rmw_impl.hpp index beeabb5f..a62301e4 100644 --- a/rmw_connextdds_common/include/rmw_connextdds/rmw_impl.hpp +++ b/rmw_connextdds_common/include/rmw_connextdds/rmw_impl.hpp @@ -117,7 +117,6 @@ class RMW_Connext_Subscriber; class RMW_Connext_Client; class RMW_Connext_Service; -#include "rmw_connextdds/rmw_waitset_dds.hpp" #include "rmw_connextdds/rmw_waitset_std.hpp" /****************************************************************************** @@ -411,13 +410,13 @@ class RMW_Connext_Subscriber qos(rmw_qos_profile_t * const qos); rmw_ret_t - loan_messages(); + loan_messages(const bool update_condition = true); rmw_ret_t return_messages(); rmw_ret_t - loan_messages_if_needed() + loan_messages_if_needed(const bool update_condition = true) { rmw_ret_t rc = RMW_RET_OK; @@ -432,16 +431,12 @@ class RMW_Connext_Subscriber } } /* loan messages from reader */ - rc = this->loan_messages(); + rc = this->loan_messages(update_condition); if (RMW_RET_OK != rc) { return rc; } } - if (this->internal) { - return this->status_condition.trigger_loan_guard_condition(this->loan_len > 0); - } - return RMW_RET_OK; } @@ -477,7 +472,9 @@ class RMW_Connext_Subscriber has_data() { std::lock_guard lock(this->loan_mutex); - if (RMW_RET_OK != this->loan_messages_if_needed()) { + if (RMW_RET_OK != + this->loan_messages_if_needed(false /* update_condition */)) + { RMW_CONNEXT_LOG_ERROR("failed to check loaned messages") return false; } diff --git a/rmw_connextdds_common/include/rmw_connextdds/rmw_waitset_std.hpp b/rmw_connextdds_common/include/rmw_connextdds/rmw_waitset_std.hpp index 7aa0d811..a73a2715 100644 --- a/rmw_connextdds_common/include/rmw_connextdds/rmw_waitset_std.hpp +++ b/rmw_connextdds_common/include/rmw_connextdds/rmw_waitset_std.hpp @@ -17,16 +17,13 @@ #include "rmw_connextdds/context.hpp" -#if RMW_CONNEXT_CPP_STD_WAITSETS /****************************************************************************** * Alternative implementation of WaitSets and Conditions using C++ std ******************************************************************************/ class RMW_Connext_WaitSet { public: - RMW_Connext_WaitSet() - : waiting(false) - {} + RMW_Connext_WaitSet() {} rmw_ret_t wait( @@ -64,10 +61,9 @@ class RMW_Connext_WaitSet rmw_clients_t * const cls, rmw_events_t * const evs); + bool waiting{false}; std::mutex mutex_internal; - bool waiting; std::condition_variable condition; - std::mutex condition_mutex; }; class RMW_Connext_Condition @@ -79,26 +75,52 @@ class RMW_Connext_Condition waitset_condition(nullptr) {} + template void attach( std::mutex * const waitset_mutex, - std::condition_variable * const waitset_condition) + std::condition_variable * const waitset_condition, + bool & already_active, + FunctorT && check_trigger) { std::lock_guard lock(this->mutex_internal); - this->waitset_mutex = waitset_mutex; - this->waitset_condition = waitset_condition; + already_active = check_trigger(); + if (!already_active) { + this->waitset_mutex = waitset_mutex; + this->waitset_condition = waitset_condition; + } } + template void - detach() + detach(FunctorT && on_detached) { std::lock_guard lock(this->mutex_internal); this->waitset_mutex = nullptr; this->waitset_condition = nullptr; + on_detached(); } virtual bool owns(DDS_Condition * const cond) = 0; + template + void + update_state(FunctorT && update_condition, const bool notify) + { + std::lock_guard internal_lock(this->mutex_internal); + + if (nullptr != this->waitset_mutex) { + std::lock_guard lock(*this->waitset_mutex); + update_condition(); + } else { + update_condition(); + } + + if (notify && nullptr != this->waitset_condition) { + this->waitset_condition->notify_one(); + } + } + protected: std::mutex mutex_internal; std::mutex * waitset_mutex; @@ -133,6 +155,9 @@ class RMW_Connext_Condition } return RMW_RET_OK; } + + friend class RMW_Connext_WaitSet; + friend class RMW_Connext_Event; }; class RMW_Connext_GuardCondition : public RMW_Connext_Condition @@ -164,18 +189,6 @@ class RMW_Connext_GuardCondition : public RMW_Connext_Condition return this->gcond; } - bool - trigger_check() - { - return this->trigger_value.exchange(false); - } - - bool - has_triggered() - { - return this->trigger_value; - } - rmw_ret_t trigger() { @@ -184,23 +197,17 @@ class RMW_Connext_GuardCondition : public RMW_Connext_Condition DDS_GuardCondition_set_trigger_value( this->gcond, DDS_BOOLEAN_TRUE)) { - RMW_CONNEXT_LOG_ERROR_SET("failed to trigger guard condition") + RMW_CONNEXT_LOG_ERROR_SET("failed to trigger internal guard condition") return RMW_RET_ERROR; } return RMW_RET_OK; } - std::lock_guard lock(this->mutex_internal); - - if (this->waitset_mutex != nullptr) { - std::unique_lock clock(*this->waitset_mutex); - this->trigger_value = true; - clock.unlock(); - this->waitset_condition->notify_one(); - } else { - this->trigger_value = true; - } + update_state( + [this]() { + this->trigger_value = true; + }, true /* notify */); return RMW_RET_OK; } @@ -223,9 +230,11 @@ class RMW_Connext_GuardCondition : public RMW_Connext_Condition } protected: - std::atomic_bool trigger_value; + bool trigger_value; bool internal; DDS_GuardCondition * gcond; + + friend class RMW_Connext_WaitSet; }; class RMW_Connext_StatusCondition : public RMW_Connext_Condition @@ -321,6 +330,9 @@ class RMW_Connext_StatusCondition : public RMW_Connext_Condition return RMW_RET_OK; } + virtual bool + has_status(const rmw_event_type_t event_type) = 0; + protected: DDS_StatusCondition * scond; }; @@ -348,7 +360,7 @@ class RMW_Connext_PublisherStatusCondition : public RMW_Connext_StatusCondition public: explicit RMW_Connext_PublisherStatusCondition(DDS_DataWriter * const writer); - bool + virtual bool has_status(const rmw_event_type_t event_type); virtual rmw_ret_t @@ -394,11 +406,16 @@ class RMW_Connext_PublisherStatusCondition : public RMW_Connext_StatusCondition inline rmw_ret_t get_liveliness_lost_status(rmw_liveliness_lost_status_t * const status) { - status->total_count = this->status_liveliness.total_count; - status->total_count_change = this->status_liveliness.total_count_change; + update_state( + [this, status]() { + this->triggered_liveliness = false; + + status->total_count = this->status_liveliness.total_count; + status->total_count_change = this->status_liveliness.total_count_change; - this->status_liveliness.total_count_change = 0; - this->triggered_liveliness = false; + this->status_liveliness.total_count_change = 0; + this->status_liveliness_last = this->status_liveliness; + }, false /* notify */); return RMW_RET_OK; } @@ -407,26 +424,16 @@ class RMW_Connext_PublisherStatusCondition : public RMW_Connext_StatusCondition get_offered_deadline_missed_status( rmw_offered_deadline_missed_status_t * const status) { - status->total_count = this->status_deadline.total_count; - status->total_count_change = this->status_deadline.total_count_change; + update_state( + [this, status]() { + this->triggered_deadline = false; - this->status_deadline.total_count_change = 0; - this->triggered_deadline = false; - - return RMW_RET_OK; - } - - inline rmw_ret_t - get_offered_qos_incompatible_status( - rmw_offered_qos_incompatible_event_status_t * const status) - { - status->total_count = this->status_qos.total_count; - status->total_count_change = this->status_qos.total_count_change; - status->last_policy_kind = - dds_qos_policy_to_rmw_qos_policy(this->status_qos.last_policy_id); + status->total_count = this->status_deadline.total_count; + status->total_count_change = this->status_deadline.total_count_change; - this->status_qos.total_count_change = 0; - this->triggered_qos = false; + this->status_deadline.total_count_change = 0; + this->status_deadline_last = this->status_deadline; + }, false /* notify */); return RMW_RET_OK; } @@ -441,14 +448,18 @@ class RMW_Connext_PublisherStatusCondition : public RMW_Connext_StatusCondition void update_status_qos( const DDS_OfferedIncompatibleQosStatus * const status); - bool triggered_deadline; - bool triggered_liveliness; - bool triggered_qos; + bool triggered_deadline{false}; + bool triggered_liveliness{false}; + bool triggered_qos{false}; DDS_OfferedDeadlineMissedStatus status_deadline; DDS_OfferedIncompatibleQosStatus status_qos; DDS_LivelinessLostStatus status_liveliness; + DDS_OfferedDeadlineMissedStatus status_deadline_last; + DDS_OfferedIncompatibleQosStatus status_qos_last; + DDS_LivelinessLostStatus status_liveliness_last; + RMW_Connext_Publisher * pub; }; @@ -491,7 +502,7 @@ class RMW_Connext_SubscriberStatusCondition : public RMW_Connext_StatusCondition virtual ~RMW_Connext_SubscriberStatusCondition(); - bool + virtual bool has_status(const rmw_event_type_t event_type); virtual rmw_ret_t @@ -534,15 +545,6 @@ class RMW_Connext_SubscriberStatusCondition : public RMW_Connext_StatusCondition rmw_ret_t install(RMW_Connext_Subscriber * const sub); - bool - on_data_triggered() - { - return this->triggered_data.exchange(false); - } - - void - on_data(); - void on_requested_deadline_missed( const DDS_RequestedDeadlineMissedStatus * const status); @@ -564,31 +566,28 @@ class RMW_Connext_SubscriberStatusCondition : public RMW_Connext_StatusCondition rmw_ret_t set_data_available(const bool available) { - UNUSED_ARG(available); - return RMW_RET_OK; - } + update_state( + [this, available]() { + this->triggered_data = available; + }, true /* notify */); - // Methods for "internal" subscribers only - inline rmw_ret_t - trigger_loan_guard_condition(const bool trigger_value) - { - if (nullptr == this->_loan_guard_condition) { - return RMW_RET_ERROR; - } - if (DDS_RETCODE_OK != DDS_GuardCondition_set_trigger_value( - this->_loan_guard_condition, trigger_value)) - { - RMW_CONNEXT_LOG_ERROR_SET("failed to set internal reader condition's trigger") - return RMW_RET_ERROR; + if (nullptr != this->loan_guard_condition) { + if (DDS_RETCODE_OK != + DDS_GuardCondition_set_trigger_value(this->loan_guard_condition, available)) + { + RMW_CONNEXT_LOG_ERROR_SET("failed to set internal reader condition's trigger") + return RMW_RET_ERROR; + } } return RMW_RET_OK; } - DDS_Condition * - loan_guard_condition() const + virtual bool + owns(DDS_Condition * const cond) { - return (nullptr != this->_loan_guard_condition) ? - DDS_GuardCondition_as_condition(this->_loan_guard_condition) : nullptr; + return RMW_Connext_StatusCondition::owns(cond) || + (nullptr != this->loan_guard_condition && + cond == DDS_GuardCondition_as_condition(this->loan_guard_condition)); } virtual rmw_ret_t _attach(DDS_WaitSet * const waitset) @@ -597,9 +596,9 @@ class RMW_Connext_SubscriberStatusCondition : public RMW_Connext_StatusCondition if (RMW_RET_OK != rc) { return rc; } - if (nullptr != this->_loan_guard_condition) { + if (nullptr != this->loan_guard_condition) { return RMW_Connext_Condition::_attach( - waitset, DDS_GuardCondition_as_condition(this->_loan_guard_condition)); + waitset, DDS_GuardCondition_as_condition(this->loan_guard_condition)); } return RMW_RET_OK; } @@ -609,9 +608,9 @@ class RMW_Connext_SubscriberStatusCondition : public RMW_Connext_StatusCondition if (RMW_RET_OK != rc) { return rc; } - if (nullptr != this->_loan_guard_condition) { + if (nullptr != this->loan_guard_condition) { return RMW_Connext_Condition::_detach( - waitset, DDS_GuardCondition_as_condition(this->_loan_guard_condition)); + waitset, DDS_GuardCondition_as_condition(this->loan_guard_condition)); } return RMW_RET_OK; } @@ -620,14 +619,19 @@ class RMW_Connext_SubscriberStatusCondition : public RMW_Connext_StatusCondition inline rmw_ret_t get_liveliness_changed_status(rmw_liveliness_changed_status_t * const status) { - status->alive_count = this->status_liveliness.alive_count; - status->alive_count_change = this->status_liveliness.alive_count_change; - status->not_alive_count = this->status_liveliness.not_alive_count; - status->not_alive_count_change = this->status_liveliness.not_alive_count_change; + update_state( + [this, status]() { + this->triggered_liveliness = false; - this->status_liveliness.alive_count_change = 0; - this->status_liveliness.not_alive_count_change = 0; - this->triggered_liveliness = false; + status->alive_count = this->status_liveliness.alive_count; + status->alive_count_change = this->status_liveliness.alive_count_change; + status->not_alive_count = this->status_liveliness.not_alive_count; + status->not_alive_count_change = this->status_liveliness.not_alive_count_change; + + this->status_liveliness.alive_count_change = 0; + this->status_liveliness.not_alive_count_change = 0; + this->status_liveliness_last = this->status_liveliness; + }, false /* notify */); return RMW_RET_OK; } @@ -636,31 +640,21 @@ class RMW_Connext_SubscriberStatusCondition : public RMW_Connext_StatusCondition get_requested_deadline_missed_status( rmw_requested_deadline_missed_status_t * const status) { - status->total_count = this->status_deadline.total_count; - status->total_count_change = this->status_deadline.total_count_change; + update_state( + [this, status]() { + this->triggered_deadline = false; - this->status_deadline.total_count_change = 0; - this->triggered_deadline = false; + status->total_count = this->status_deadline.total_count; + status->total_count_change = this->status_deadline.total_count_change; - return RMW_RET_OK; - } - - - inline rmw_ret_t - get_requested_qos_incompatible_status( - rmw_requested_qos_incompatible_event_status_t * const status) - { - status->total_count = this->status_qos.total_count; - status->total_count_change = this->status_qos.total_count_change; - status->last_policy_kind = - dds_qos_policy_to_rmw_qos_policy(this->status_qos.last_policy_id); - - this->status_qos.total_count_change = 0; - this->triggered_qos = false; + this->status_deadline.total_count_change = 0; + this->status_deadline_last = this->status_deadline; + }, false /* notify */); return RMW_RET_OK; } + protected: void update_status_deadline( const DDS_RequestedDeadlineMissedStatus * const status); @@ -674,20 +668,27 @@ class RMW_Connext_SubscriberStatusCondition : public RMW_Connext_StatusCondition void update_status_sample_lost( const DDS_SampleLostStatus * const status); - std::atomic_bool triggered_deadline; - std::atomic_bool triggered_liveliness; - std::atomic_bool triggered_qos; - std::atomic_bool triggered_sample_lost; - std::atomic_bool triggered_data; + DDS_GuardCondition * const loan_guard_condition; - DDS_GuardCondition * _loan_guard_condition; + bool triggered_deadline{false}; + bool triggered_liveliness{false}; + bool triggered_qos{false}; + bool triggered_sample_lost{false}; + bool triggered_data{false}; DDS_RequestedDeadlineMissedStatus status_deadline; DDS_RequestedIncompatibleQosStatus status_qos; DDS_LivelinessChangedStatus status_liveliness; DDS_SampleLostStatus status_sample_lost; + DDS_RequestedDeadlineMissedStatus status_deadline_last; + DDS_RequestedIncompatibleQosStatus status_qos_last; + DDS_LivelinessChangedStatus status_liveliness_last; + DDS_SampleLostStatus status_sample_lost_last; + RMW_Connext_Subscriber * sub; + + friend class RMW_Connext_WaitSet; }; /****************************************************************************** @@ -704,10 +705,6 @@ class RMW_Connext_Event rmw_ret_t disable(rmw_event_t * const event); - static - bool - active(rmw_event_t * const event); - static DDS_Condition * condition(const rmw_event_t * const event); @@ -740,5 +737,4 @@ class RMW_Connext_Event return reinterpret_cast(event->data); } }; -#endif /* RMW_CONNEXT_CPP_STD_WAITSETS */ #endif // RMW_CONNEXTDDS__RMW_WAITSET_STD_HPP_ diff --git a/rmw_connextdds_common/include/rmw_connextdds/static_config.hpp b/rmw_connextdds_common/include/rmw_connextdds/static_config.hpp index f4732d28..2ad1c9fc 100644 --- a/rmw_connextdds_common/include/rmw_connextdds/static_config.hpp +++ b/rmw_connextdds_common/include/rmw_connextdds/static_config.hpp @@ -263,14 +263,14 @@ * In order to reduce the time to cleanup a participant (Node), we use the * advice from * https://community.rti.com/static/documentation/connext-dds/5.3.1/doc/api/connext_dds/api_cpp/structDDS__DomainParticipantQos.html - * and reduce the shutdown_cleanup_period to 50 milliseconds. + * and reduce the shutdown_cleanup_period to 10 milliseconds. ******************************************************************************/ #ifndef RMW_CONNEXT_SHUTDOWN_CLEANUP_PERIOD_SEC #define RMW_CONNEXT_SHUTDOWN_CLEANUP_PERIOD_SEC 0 #endif /* RMW_CONNEXT_SHUTDOWN_CLEANUP_PERIOD_SEC */ #ifndef RMW_CONNEXT_SHUTDOWN_CLEANUP_PERIOD_NSEC -#define RMW_CONNEXT_SHUTDOWN_CLEANUP_PERIOD_NSEC 50000000 +#define RMW_CONNEXT_SHUTDOWN_CLEANUP_PERIOD_NSEC 10000000 #endif /* RMW_CONNEXT_SHUTDOWN_CLEANUP_PERIOD_NSEC */ /****************************************************************************** @@ -299,13 +299,6 @@ #define RMW_CONNEXT_LEGACY_RMW_COMPATIBILITY_MODE 1 #endif /* RMW_CONNEXT_LEGACY_RMW_COMPATIBILITY_MODE */ -/****************************************************************************** - * Use an alternative implementation of WaitSets based on C++ std library - ******************************************************************************/ -#ifndef RMW_CONNEXT_CPP_STD_WAITSETS -#define RMW_CONNEXT_CPP_STD_WAITSETS 0 -#endif /* RMW_CONNEXT_CPP_STD_WAITSETS */ - #include "resource_limits.hpp" #endif // RMW_CONNEXTDDS__STATIC_CONFIG_HPP_ diff --git a/rmw_connextdds_common/src/common/rmw_graph.cpp b/rmw_connextdds_common/src/common/rmw_graph.cpp index 3ae6c9ad..3e0e8d4a 100644 --- a/rmw_connextdds_common/src/common/rmw_graph.cpp +++ b/rmw_connextdds_common/src/common/rmw_graph.cpp @@ -27,6 +27,7 @@ rmw_connextdds_graph_add_entityEA( const DDS_GUID_t * const dp_guid, const char * const topic_name, const char * const type_name, + const DDS_HistoryQosPolicy * const history, const DDS_ReliabilityQosPolicy * const reliability, const DDS_DurabilityQosPolicy * const durability, const DDS_DeadlineQosPolicy * const deadline, @@ -716,6 +717,7 @@ rmw_connextdds_graph_add_entityEA( const DDS_GUID_t * const dp_guid, const char * const topic_name, const char * const type_name, + const DDS_HistoryQosPolicy * const history, const DDS_ReliabilityQosPolicy * const reliability, const DDS_DurabilityQosPolicy * const durability, const DDS_DeadlineQosPolicy * const deadline, @@ -733,7 +735,7 @@ rmw_connextdds_graph_add_entityEA( if (RMW_RET_OK != rmw_connextdds_readerwriter_qos_to_ros( - nullptr /* history */, + history, reliability, durability, deadline, @@ -861,6 +863,7 @@ rmw_connextdds_graph_add_local_publisherEA( &dp_guid, topic_name, pub->message_type_support()->type_name(), + &dw_qos.history, &dw_qos.reliability, &dw_qos.durability, &dw_qos.deadline, @@ -926,6 +929,7 @@ rmw_connextdds_graph_add_local_subscriberEA( &dp_guid, topic_name, sub->message_type_support()->type_name(), + &dr_qos.history, &dr_qos.reliability, &dr_qos.durability, &dr_qos.deadline, @@ -965,6 +969,7 @@ rmw_connextdds_graph_add_remote_entity( dp_guid, topic_name, type_name, + nullptr /* history (not propagated via discovery) */, reliability, durability, deadline, diff --git a/rmw_connextdds_common/src/common/rmw_impl.cpp b/rmw_connextdds_common/src/common/rmw_impl.cpp index cbe4d57c..844e7b90 100644 --- a/rmw_connextdds_common/src/common/rmw_impl.cpp +++ b/rmw_connextdds_common/src/common/rmw_impl.cpp @@ -569,12 +569,10 @@ RMW_Connext_Publisher::RMW_Connext_Publisher( status_condition(dds_writer) { rmw_connextdds_get_entity_gid(this->dds_writer, this->ros_gid); -#if RMW_CONNEXT_CPP_STD_WAITSETS if (RMW_RET_OK != this->status_condition.install(this)) { RMW_CONNEXT_LOG_ERROR("failed to install condition on writer") throw std::runtime_error("failed to install condition on writer"); } -#endif /* RMW_CONNEXT_CPP_STD_WAITSETS */ } RMW_Connext_Publisher * @@ -1052,12 +1050,10 @@ RMW_Connext_Subscriber::RMW_Connext_Subscriber( this->loan_info = def_info_seq; this->loan_len = 0; this->loan_next = 0; -#if RMW_CONNEXT_CPP_STD_WAITSETS if (RMW_RET_OK != this->status_condition.install(this)) { RMW_CONNEXT_LOG_ERROR("failed to install condition on reader") throw std::runtime_error("failed to install condition on reader"); } -#endif /* RMW_CONNEXT_CPP_STD_WAITSETS */ } RMW_Connext_Subscriber * @@ -1413,7 +1409,7 @@ RMW_Connext_Subscriber::take_serialized( } rmw_ret_t -RMW_Connext_Subscriber::loan_messages() +RMW_Connext_Subscriber::loan_messages(const bool update_condition) { /* this function should only be called once all previously loaned messages have been returned */ @@ -1430,11 +1426,11 @@ RMW_Connext_Subscriber::loan_messages() "[%s] loaned messages: %lu", this->type_support->type_name(), this->loan_len) - if (this->loan_len > 0) { - return this->status_condition.set_data_available(true); + if (update_condition) { + return this->status_condition.set_data_available(this->loan_len > 0); + } else { + return RMW_RET_OK; } - - return RMW_RET_OK; } rmw_ret_t @@ -2337,10 +2333,49 @@ RMW_Connext_Client::enable() rmw_ret_t RMW_Connext_Client::is_service_available(bool & available) { - /* TODO(asorbini): check that we actually have at least one service matched by both - request writer and response reader */ - available = (this->request_pub->subscriptions_count() > 0 && - this->reply_sub->publications_count() > 0); + // mark service as available if we have at least one writer and one reader + // matched from the same remote DomainParticipant. + struct DDS_InstanceHandleSeq matched_req_subs = DDS_SEQUENCE_INITIALIZER, + matched_rep_pubs = DDS_SEQUENCE_INITIALIZER; + auto scope_exit_seqs = rcpputils::make_scope_exit( + [&matched_req_subs, &matched_rep_pubs]() + { + if (!DDS_InstanceHandleSeq_finalize(&matched_req_subs)) { + RMW_CONNEXT_LOG_ERROR("failed to finalize req instance handle sequence") + } + if (!DDS_InstanceHandleSeq_finalize(&matched_rep_pubs)) { + RMW_CONNEXT_LOG_ERROR("failed to finalize rep instance handle sequence") + } + }); + + DDS_ReturnCode_t dds_rc = + DDS_DataWriter_get_matched_subscriptions(this->request_pub->writer(), &matched_req_subs); + if (DDS_RETCODE_OK != dds_rc) { + RMW_CONNEXT_LOG_ERROR_A_SET("failed to list matched subscriptions: dds_rc=%d", dds_rc) + return RMW_RET_ERROR; + } + + dds_rc = + DDS_DataReader_get_matched_publications(this->reply_sub->reader(), &matched_rep_pubs); + if (DDS_RETCODE_OK != dds_rc) { + RMW_CONNEXT_LOG_ERROR_A_SET("failed to list matched publications: dds_rc=%d", dds_rc) + return RMW_RET_ERROR; + } + + const DDS_Long subs_len = DDS_InstanceHandleSeq_get_length(&matched_req_subs), + pubs_len = DDS_InstanceHandleSeq_get_length(&matched_rep_pubs); + + for (DDS_Long i = 0; i < subs_len && !available; i++) { + DDS_InstanceHandle_t * const sub_ih = + DDS_InstanceHandleSeq_get_reference(&matched_req_subs, i); + + for (DDS_Long j = 0; j < pubs_len && !available; j++) { + DDS_InstanceHandle_t * const pub_ih = + DDS_InstanceHandleSeq_get_reference(&matched_rep_pubs, j); + available = DDS_InstanceHandle_compare_prefix(sub_ih, pub_ih) == 0; + } + } + return RMW_RET_OK; } diff --git a/rmw_connextdds_common/src/common/rmw_impl_waitset_std.cpp b/rmw_connextdds_common/src/common/rmw_impl_waitset_std.cpp index cdfba613..9a652a22 100644 --- a/rmw_connextdds_common/src/common/rmw_impl_waitset_std.cpp +++ b/rmw_connextdds_common/src/common/rmw_impl_waitset_std.cpp @@ -14,7 +14,6 @@ #include "rmw_connextdds/rmw_impl.hpp" -#if RMW_CONNEXT_CPP_STD_WAITSETS /****************************************************************************** * Event ******************************************************************************/ @@ -52,16 +51,6 @@ RMW_Connext_Event::disable(rmw_event_t * const event) } } -bool -RMW_Connext_Event::active(rmw_event_t * const event) -{ - if (RMW_Connext_Event::reader_event(event)) { - return RMW_Connext_Event::subscriber(event)->condition()->has_status(event->event_type); - } else { - return RMW_Connext_Event::publisher(event)->condition()->has_status(event->event_type); - } -} - /****************************************************************************** * StdWaitSet ******************************************************************************/ @@ -131,10 +120,8 @@ RMW_Connext_DataReaderListener_on_data_available( reinterpret_cast(listener_data); UNUSED_ARG(reader); - RMW_CONNEXT_LOG_DEBUG_A( - "data available: condition=%p, reader=%p", - (void *)self, (void *)reader); - self->on_data(); + + self->set_data_available(true); } void @@ -192,7 +179,7 @@ RMW_Connext_WaitSet::on_condition_active( for (size_t i = 0; i < subs->subscriber_count; ++i) { RMW_Connext_Subscriber * const sub = reinterpret_cast(subs->subscribers[i]); - if (sub->has_data()) { + if (sub->condition()->triggered_data) { return true; } } @@ -202,7 +189,7 @@ RMW_Connext_WaitSet::on_condition_active( for (size_t i = 0; i < cls->client_count; ++i) { RMW_Connext_Client * const client = reinterpret_cast(cls->clients[i]); - if (client->subscriber()->has_data()) { + if (client->subscriber()->condition()->triggered_data) { return true; } } @@ -212,7 +199,7 @@ RMW_Connext_WaitSet::on_condition_active( for (size_t i = 0; i < srvs->service_count; ++i) { RMW_Connext_Service * const svc = reinterpret_cast(srvs->services[i]); - if (svc->subscriber()->has_data()) { + if (svc->subscriber()->condition()->triggered_data) { return true; } } @@ -240,7 +227,7 @@ RMW_Connext_WaitSet::on_condition_active( for (size_t i = 0; i < gcs->guard_condition_count; ++i) { RMW_Connext_GuardCondition * const gcond = reinterpret_cast(gcs->guard_conditions[i]); - if (gcond->has_triggered()) { + if (gcond->trigger_value) { return true; } } @@ -263,11 +250,15 @@ RMW_Connext_WaitSet::attach( for (size_t i = 0; i < subs->subscriber_count; ++i) { RMW_Connext_Subscriber * const sub = reinterpret_cast(subs->subscribers[i]); - if (sub->has_data()) { - wait_active = true; + RMW_Connext_SubscriberStatusCondition * const cond = sub->condition(); + cond->attach( + &this->mutex_internal, &this->condition, wait_active, + [cond]() { + return cond->triggered_data; + }); + if (wait_active) { return; } - sub->condition()->attach(&this->condition_mutex, &this->condition); } } @@ -275,11 +266,15 @@ RMW_Connext_WaitSet::attach( for (size_t i = 0; i < cls->client_count; ++i) { RMW_Connext_Client * const client = reinterpret_cast(cls->clients[i]); - if (client->subscriber()->has_data()) { - wait_active = true; + RMW_Connext_SubscriberStatusCondition * const cond = client->subscriber()->condition(); + cond->attach( + &this->mutex_internal, &this->condition, wait_active, + [cond]() { + return cond->triggered_data; + }); + if (wait_active) { return; } - client->subscriber()->condition()->attach(&this->condition_mutex, &this->condition); } } @@ -287,11 +282,15 @@ RMW_Connext_WaitSet::attach( for (size_t i = 0; i < srvs->service_count; ++i) { RMW_Connext_Service * const svc = reinterpret_cast(srvs->services[i]); - if (svc->subscriber()->has_data()) { - wait_active = true; + RMW_Connext_SubscriberStatusCondition * const cond = svc->subscriber()->condition(); + cond->attach( + &this->mutex_internal, &this->condition, wait_active, + [cond]() { + return cond->triggered_data; + }); + if (wait_active) { return; } - svc->subscriber()->condition()->attach(&this->condition_mutex, &this->condition); } } @@ -301,18 +300,26 @@ RMW_Connext_WaitSet::attach( reinterpret_cast(evs->events[i]); if (RMW_Connext_Event::reader_event(event)) { auto sub = RMW_Connext_Event::subscriber(event); - if (sub->condition()->has_status(event->event_type)) { - wait_active = true; + RMW_Connext_SubscriberStatusCondition * const cond = sub->condition(); + cond->attach( + &this->mutex_internal, &this->condition, wait_active, + [cond, event]() { + return cond->has_status(event->event_type); + }); + if (wait_active) { return; } - sub->condition()->attach(&this->condition_mutex, &this->condition); } else { auto pub = RMW_Connext_Event::publisher(event); - if (pub->condition()->has_status(event->event_type)) { - wait_active = true; + RMW_Connext_PublisherStatusCondition * const cond = pub->condition(); + cond->attach( + &this->mutex_internal, &this->condition, wait_active, + [cond, event]() { + return cond->has_status(event->event_type); + }); + if (wait_active) { return; } - pub->condition()->attach(&this->condition_mutex, &this->condition); } } } @@ -321,11 +328,14 @@ RMW_Connext_WaitSet::attach( for (size_t i = 0; i < gcs->guard_condition_count; ++i) { RMW_Connext_GuardCondition * const gcond = reinterpret_cast(gcs->guard_conditions[i]); - if (gcond->has_triggered()) { - wait_active = true; + gcond->attach( + &this->mutex_internal, &this->condition, wait_active, + [gcond]() { + return gcond->trigger_value; + }); + if (wait_active) { return; } - gcond->attach(&this->condition_mutex, &this->condition); } } } @@ -343,15 +353,21 @@ RMW_Connext_WaitSet::detach( for (size_t i = 0; i < subs->subscriber_count; ++i) { RMW_Connext_Subscriber * const sub = reinterpret_cast(subs->subscribers[i]); - sub->condition()->detach(); - if (!sub->has_data()) { - subs->subscribers[i] = nullptr; - } else { - RMW_CONNEXT_LOG_DEBUG_A( - "[wait] active subscriber: sub=%p\n", - reinterpret_cast(sub)) - active_conditions += 1; - } + RMW_Connext_SubscriberStatusCondition * const cond = sub->condition(); + const bool data_available = sub->has_data(); + cond->detach( + [cond, subs, sub, &active_conditions, i, data_available]() { + UNUSED_ARG(sub); + cond->triggered_data = data_available; + if (!cond->triggered_data) { + subs->subscribers[i] = nullptr; + } else { + RMW_CONNEXT_LOG_DEBUG_A( + "[wait] active subscriber: sub=%p", + reinterpret_cast(sub)) + active_conditions += 1; + } + }); } } @@ -359,15 +375,21 @@ RMW_Connext_WaitSet::detach( for (size_t i = 0; i < cls->client_count; ++i) { RMW_Connext_Client * const client = reinterpret_cast(cls->clients[i]); - client->subscriber()->condition()->detach(); - if (!client->subscriber()->has_data()) { - cls->clients[i] = nullptr; - } else { - RMW_CONNEXT_LOG_DEBUG_A( - "[wait] active client: " - "client=%p", (void *)client) - active_conditions += 1; - } + RMW_Connext_SubscriberStatusCondition * const cond = client->subscriber()->condition(); + const bool data_available = client->subscriber()->has_data(); + cond->detach( + [cond, cls, client, &active_conditions, i, data_available]() { + UNUSED_ARG(client); + cond->triggered_data = data_available; + if (!cond->triggered_data) { + cls->clients[i] = nullptr; + } else { + RMW_CONNEXT_LOG_DEBUG_A( + "[wait] active client: " + "client=%p", (void *)client) + active_conditions += 1; + } + }); } } @@ -375,15 +397,21 @@ RMW_Connext_WaitSet::detach( for (size_t i = 0; i < srvs->service_count; ++i) { RMW_Connext_Service * const svc = reinterpret_cast(srvs->services[i]); - svc->subscriber()->condition()->detach(); - if (!svc->subscriber()->has_data()) { - srvs->services[i] = nullptr; - } else { - RMW_CONNEXT_LOG_DEBUG_A( - "[wait] active service: " - "svc=%p", (void *)svc) - active_conditions += 1; - } + RMW_Connext_SubscriberStatusCondition * const cond = svc->subscriber()->condition(); + const bool data_available = svc->subscriber()->has_data(); + cond->detach( + [cond, srvs, svc, &active_conditions, i, data_available]() { + UNUSED_ARG(svc); + cond->triggered_data = data_available; + if (!cond->triggered_data) { + srvs->services[i] = nullptr; + } else { + RMW_CONNEXT_LOG_DEBUG_A( + "[wait] active service: " + "svc=%p", (void *)svc) + active_conditions += 1; + } + }); } } @@ -393,26 +421,32 @@ RMW_Connext_WaitSet::detach( reinterpret_cast(evs->events[i]); if (RMW_Connext_Event::reader_event(event)) { auto sub = RMW_Connext_Event::subscriber(event); - sub->condition()->detach(); - if (!sub->condition()->has_status(event->event_type)) { - evs->events[i] = nullptr; - } else { - RMW_CONNEXT_LOG_DEBUG_A( - "[wait] active subscriber event: " - "event=%p", (void *)event) - active_conditions += 1; - } + RMW_Connext_SubscriberStatusCondition * const cond = sub->condition(); + cond->detach( + [cond, evs, event, &active_conditions, i]() { + if (!cond->has_status(event->event_type)) { + evs->events[i] = nullptr; + } else { + RMW_CONNEXT_LOG_DEBUG_A( + "[wait] active subscriber event: " + "event=%p", (void *)event) + active_conditions += 1; + } + }); } else { auto pub = RMW_Connext_Event::publisher(event); - pub->condition()->detach(); - if (!pub->condition()->has_status(event->event_type)) { - evs->events[i] = nullptr; - } else { - RMW_CONNEXT_LOG_DEBUG_A( - "[wait] active publisher event: " - "event=%p", (void *)event) - active_conditions += 1; - } + RMW_Connext_PublisherStatusCondition * const cond = pub->condition(); + cond->detach( + [cond, evs, event, &active_conditions, i]() { + if (!cond->has_status(event->event_type)) { + evs->events[i] = nullptr; + } else { + RMW_CONNEXT_LOG_DEBUG_A( + "[wait] active pulisher event: " + "event=%p", (void *)event) + active_conditions += 1; + } + }); } } } @@ -421,15 +455,19 @@ RMW_Connext_WaitSet::detach( for (size_t i = 0; i < gcs->guard_condition_count; ++i) { RMW_Connext_GuardCondition * const gcond = reinterpret_cast(gcs->guard_conditions[i]); - gcond->detach(); - if (!gcond->trigger_check()) { - gcs->guard_conditions[i] = nullptr; - } else { - RMW_CONNEXT_LOG_DEBUG_A( - "[wait] active guard condition: " - "condition=%p", (void *)gcond) - active_conditions += 1; - } + gcond->detach( + [gcond, gcs, &active_conditions, i]() { + bool triggered = gcond->trigger_value; + gcond->trigger_value = false; + if (!triggered) { + gcs->guard_conditions[i] = nullptr; + } else { + RMW_CONNEXT_LOG_DEBUG_A( + "[wait] active guard condition: " + "condition=%p", (void *)gcond) + active_conditions += 1; + } + }); } } } @@ -443,7 +481,6 @@ RMW_Connext_WaitSet::wait( rmw_events_t * const evs, const rmw_time_t * const wait_timeout) { -#if 1 { std::lock_guard lock(this->mutex_internal); if (this->waiting) { @@ -454,22 +491,20 @@ RMW_Connext_WaitSet::wait( this->waiting = true; } - RMW_Connext_WaitSet * const ws = this; - auto scope_exit = rcpputils::make_scope_exit( - [ws]() + auto scope_exit_ws = rcpputils::make_scope_exit( + [this]() { - std::lock_guard lock(ws->mutex_internal); - ws->waiting = false; + std::lock_guard lock(this->mutex_internal); + this->waiting = false; }); -#endif bool already_active = false; - this->attach(subs, gcs, srvs, cls, evs, already_active); bool timedout = false; if (!already_active) { + std::unique_lock lock(this->mutex_internal); RMW_CONNEXT_LOG_DEBUG_A( "[wait] waiting on: " "waitset=%p, " @@ -485,15 +520,13 @@ RMW_Connext_WaitSet::wait( (nullptr != cls) ? cls->client_count : 0, (nullptr != evs) ? evs->event_count : 0); - std::unique_lock lock(this->condition_mutex); - auto on_condition_active = [self = this, subs, gcs, srvs, cls, evs]() { return self->on_condition_active(subs, gcs, srvs, cls, evs); }; - if (nullptr == wait_timeout) { + if (nullptr == wait_timeout || rmw_time_equal(*wait_timeout, RMW_DURATION_INFINITE)) { this->condition.wait(lock, on_condition_active); } else if (wait_timeout->sec > 0 || wait_timeout->nsec > 0) { auto n = std::chrono::duration_cast( @@ -508,11 +541,9 @@ RMW_Connext_WaitSet::wait( size_t active_conditions = 0; this->detach(subs, gcs, srvs, cls, evs, active_conditions); - // assert(active_conditions > 0 || timedout); + RMW_CONNEXT_ASSERT(active_conditions > 0 || timedout); if (timedout) { - rmw_reset_error(); - RMW_SET_ERROR_MSG("wait timed out"); return RMW_RET_TIMEOUT; } @@ -571,49 +602,32 @@ RMW_Connext_SubscriberStatusCondition::RMW_Connext_SubscriberStatusCondition( DDS_DomainParticipant_as_entity( DDS_Subscriber_get_participant( DDS_DataReader_get_subscriber(reader))))), - triggered_deadline(false), - triggered_liveliness(false), - triggered_qos(false), - triggered_sample_lost(false), - triggered_data(false), - _loan_guard_condition(nullptr), + loan_guard_condition(internal ? DDS_GuardCondition_new() : nullptr), + status_deadline(DDS_RequestedDeadlineMissedStatus_INITIALIZER), + status_qos(DDS_RequestedIncompatibleQosStatus_INITIALIZER), + status_liveliness(DDS_LivelinessChangedStatus_INITIALIZER), + status_sample_lost(DDS_SampleLostStatus_INITIALIZER), + status_deadline_last(DDS_RequestedDeadlineMissedStatus_INITIALIZER), + status_qos_last(DDS_RequestedIncompatibleQosStatus_INITIALIZER), + status_liveliness_last(DDS_LivelinessChangedStatus_INITIALIZER), + status_sample_lost_last(DDS_SampleLostStatus_INITIALIZER), sub(nullptr) { - if (internal) { - this->_loan_guard_condition = DDS_GuardCondition_new(); - if (nullptr == this->_loan_guard_condition) { - RMW_CONNEXT_LOG_ERROR_SET("failed to allocate internal reader condition") - throw new std::runtime_error("failed to allocate internal reader condition"); - } + if (internal && nullptr == this->loan_guard_condition) { + RMW_CONNEXT_LOG_ERROR_SET("failed to allocate internal reader condition") + throw new std::runtime_error("failed to allocate internal reader condition"); } } RMW_Connext_SubscriberStatusCondition::~RMW_Connext_SubscriberStatusCondition() { - if (nullptr != this->_loan_guard_condition) { - if (DDS_RETCODE_OK != DDS_GuardCondition_delete(this->_loan_guard_condition)) { + if (nullptr != this->loan_guard_condition) { + if (DDS_RETCODE_OK != DDS_GuardCondition_delete(this->loan_guard_condition)) { RMW_CONNEXT_LOG_ERROR_SET("failed to delete internal reader condition") } } } -void -RMW_Connext_SubscriberStatusCondition::on_data() -{ - std::lock_guard lock(this->mutex_internal); - - this->triggered_data = true; - - if (nullptr != this->waitset_condition) { - this->waitset_condition->notify_one(); - } - - // Update loan guard condition's trigger value for internal endpoints - if (nullptr != this->_loan_guard_condition) { - (void)this->trigger_loan_guard_condition(true); - } -} - bool RMW_Connext_SubscriberStatusCondition::has_status( const rmw_event_type_t event_type) @@ -639,57 +653,44 @@ RMW_Connext_SubscriberStatusCondition::has_status( } } - void RMW_Connext_SubscriberStatusCondition::on_requested_deadline_missed( const DDS_RequestedDeadlineMissedStatus * const status) { - std::lock_guard lock(this->mutex_internal); - - this->update_status_deadline(status); - - if (nullptr != this->waitset_condition) { - this->waitset_condition->notify_one(); - } + update_state( + [this, status]() { + this->update_status_deadline(status); + }, true /* notify */); } void RMW_Connext_SubscriberStatusCondition::on_requested_incompatible_qos( const DDS_RequestedIncompatibleQosStatus * const status) { - std::lock_guard lock(this->mutex_internal); - - this->update_status_qos(status); - - if (nullptr != this->waitset_condition) { - this->waitset_condition->notify_one(); - } + update_state( + [this, status]() { + this->update_status_qos(status); + }, true /* notify */); } void RMW_Connext_SubscriberStatusCondition::on_liveliness_changed( const DDS_LivelinessChangedStatus * const status) { - std::lock_guard lock(this->mutex_internal); - - this->update_status_liveliness(status); - - if (nullptr != this->waitset_condition) { - this->waitset_condition->notify_one(); - } + update_state( + [this, status]() { + this->update_status_liveliness(status); + }, true /* notify */); } void RMW_Connext_SubscriberStatusCondition::on_sample_lost( const DDS_SampleLostStatus * const status) { - std::lock_guard lock(this->mutex_internal); - - this->update_status_sample_lost(status); - - if (nullptr != this->waitset_condition) { - this->waitset_condition->notify_one(); - } + update_state( + [this, status]() { + this->update_status_sample_lost(status); + }, true /* notify */); } void @@ -698,6 +699,9 @@ RMW_Connext_SubscriberStatusCondition::update_status_deadline( { this->status_deadline = *status; this->triggered_deadline = true; + + this->status_deadline.total_count_change = this->status_deadline.total_count; + this->status_deadline.total_count_change -= this->status_deadline_last.total_count; } void @@ -706,6 +710,12 @@ RMW_Connext_SubscriberStatusCondition::update_status_liveliness( { this->status_liveliness = *status; this->triggered_liveliness = true; + + this->status_liveliness.alive_count_change = this->status_liveliness.alive_count; + this->status_liveliness.not_alive_count_change = this->status_liveliness.not_alive_count; + this->status_liveliness.alive_count_change -= this->status_liveliness_last.alive_count; + this->status_liveliness.not_alive_count_change -= + this->status_liveliness_last.not_alive_count; } void @@ -714,6 +724,9 @@ RMW_Connext_SubscriberStatusCondition::update_status_qos( { this->status_qos = *status; this->triggered_qos = true; + + this->status_qos.total_count_change = this->status_qos.total_count; + this->status_qos.total_count_change -= this->status_qos_last.total_count; } void @@ -722,6 +735,10 @@ RMW_Connext_SubscriberStatusCondition::update_status_sample_lost( { this->status_sample_lost = *status; this->triggered_sample_lost = true; + + this->status_sample_lost.total_count_change = this->status_sample_lost.total_count; + this->status_sample_lost.total_count_change -= + this->status_sample_lost_last.total_count; } rmw_ret_t @@ -760,21 +777,13 @@ RMW_Connext_PublisherStatusCondition::install( RMW_Connext_PublisherStatusCondition::RMW_Connext_PublisherStatusCondition( DDS_DataWriter * const writer) : RMW_Connext_StatusCondition(DDS_DataWriter_as_entity(writer)), - triggered_deadline(false), - triggered_liveliness(false), - triggered_qos(false) -{ - const DDS_OfferedDeadlineMissedStatus def_status_deadline = - DDS_OfferedDeadlineMissedStatus_INITIALIZER; - const DDS_OfferedIncompatibleQosStatus def_status_qos = - DDS_OfferedIncompatibleQosStatus_INITIALIZER; - const DDS_LivelinessLostStatus def_status_liveliness = - DDS_LivelinessLostStatus_INITIALIZER; - - this->status_deadline = def_status_deadline; - this->status_liveliness = def_status_liveliness; - this->status_qos = def_status_qos; -} + status_deadline(DDS_OfferedDeadlineMissedStatus_INITIALIZER), + status_qos(DDS_OfferedIncompatibleQosStatus_INITIALIZER), + status_liveliness(DDS_LivelinessLostStatus_INITIALIZER), + status_deadline_last(DDS_OfferedDeadlineMissedStatus_INITIALIZER), + status_qos_last(DDS_OfferedIncompatibleQosStatus_INITIALIZER), + status_liveliness_last(DDS_LivelinessLostStatus_INITIALIZER) +{} bool RMW_Connext_PublisherStatusCondition::has_status( @@ -804,39 +813,30 @@ void RMW_Connext_PublisherStatusCondition::on_offered_deadline_missed( const DDS_OfferedDeadlineMissedStatus * const status) { - std::lock_guard lock(this->mutex_internal); - - this->update_status_deadline(status); - - if (nullptr != this->waitset_condition) { - this->waitset_condition->notify_one(); - } + update_state( + [this, status]() { + this->update_status_deadline(status); + }, true /* notify */); } void RMW_Connext_PublisherStatusCondition::on_offered_incompatible_qos( const DDS_OfferedIncompatibleQosStatus * const status) { - std::lock_guard lock(this->mutex_internal); - - this->update_status_qos(status); - - if (nullptr != this->waitset_condition) { - this->waitset_condition->notify_one(); - } + update_state( + [this, status]() { + this->update_status_qos(status); + }, true /* notify */); } void RMW_Connext_PublisherStatusCondition::on_liveliness_lost( const DDS_LivelinessLostStatus * const status) { - std::lock_guard lock(this->mutex_internal); - - this->update_status_liveliness(status); - - if (nullptr != this->waitset_condition) { - this->waitset_condition->notify_one(); - } + update_state( + [this, status]() { + this->update_status_liveliness(status); + }, true /* notify */); } void @@ -845,6 +845,9 @@ RMW_Connext_PublisherStatusCondition::update_status_deadline( { this->status_deadline = *status; this->triggered_deadline = true; + + this->status_deadline.total_count_change = this->status_deadline.total_count; + this->status_deadline.total_count_change -= this->status_deadline_last.total_count; } void @@ -853,6 +856,9 @@ RMW_Connext_PublisherStatusCondition::update_status_liveliness( { this->status_liveliness = *status; this->triggered_liveliness = true; + + this->status_liveliness.total_count_change = this->status_liveliness.total_count; + this->status_liveliness.total_count_change -= this->status_liveliness_last.total_count; } void @@ -861,5 +867,7 @@ RMW_Connext_PublisherStatusCondition::update_status_qos( { this->status_qos = *status; this->triggered_qos = true; + + this->status_qos.total_count_change = this->status_qos.total_count; + this->status_qos.total_count_change -= this->status_qos_last.total_count; } -#endif /* !RMW_CONNEXT_CPP_STD_WAITSETS */ diff --git a/rmw_connextdds_common/src/common/rmw_info.cpp b/rmw_connextdds_common/src/common/rmw_info.cpp index 908b32c6..de6829d0 100644 --- a/rmw_connextdds_common/src/common/rmw_info.cpp +++ b/rmw_connextdds_common/src/common/rmw_info.cpp @@ -538,19 +538,6 @@ rmw_api_connextdds_get_publishers_info_by_topic( RMW_CHECK_ARGUMENT_FOR_NULL(topic_name, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_ARGUMENT_FOR_NULL(publishers_info, RMW_RET_INVALID_ARGUMENT); - int validation_result = RMW_TOPIC_VALID; - rmw_ret_t ret = - rmw_validate_full_topic_name(topic_name, &validation_result, nullptr); - if (RMW_RET_OK != ret) { - return ret; - } - if (RMW_TOPIC_VALID != validation_result) { - const char * reason = - rmw_full_topic_name_validation_result_string(validation_result); - RMW_CONNEXT_LOG_ERROR_A_SET("invalid topic name: %s", reason) - return RMW_RET_INVALID_ARGUMENT; - } - RCUTILS_CHECK_ALLOCATOR_WITH_MSG( allocator, "allocator argument is invalid", return RMW_RET_INVALID_ARGUMENT); @@ -593,19 +580,6 @@ rmw_api_connextdds_get_subscriptions_info_by_topic( RMW_CHECK_ARGUMENT_FOR_NULL(topic_name, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_ARGUMENT_FOR_NULL(subscriptions_info, RMW_RET_INVALID_ARGUMENT); - int validation_result = RMW_TOPIC_VALID; - rmw_ret_t ret = - rmw_validate_full_topic_name(topic_name, &validation_result, nullptr); - if (RMW_RET_OK != ret) { - return ret; - } - if (RMW_TOPIC_VALID != validation_result) { - const char * reason = - rmw_full_topic_name_validation_result_string(validation_result); - RMW_CONNEXT_LOG_ERROR_A_SET("invalid topic name: %s", reason) - return RMW_RET_INVALID_ARGUMENT; - } - RCUTILS_CHECK_ALLOCATOR_WITH_MSG( allocator, "allocator argument is invalid", return RMW_RET_INVALID_ARGUMENT); diff --git a/rmw_connextdds_common/src/common/rmw_type_support.cpp b/rmw_connextdds_common/src/common/rmw_type_support.cpp index e3c14059..2f3b3a34 100644 --- a/rmw_connextdds_common/src/common/rmw_type_support.cpp +++ b/rmw_connextdds_common/src/common/rmw_type_support.cpp @@ -538,8 +538,25 @@ RMW_Connext_MessageTypeSupport::get_type_support_fastrtps( get_message_typesupport_handle( type_supports, RMW_FASTRTPS_CPP_TYPESUPPORT_C); if (nullptr == type_support) { + // Reset error string since this is not (yet) an error + // (see https://github.com/ros2/rosidl_typesupport/pull/102) + rcutils_error_string_t prev_error_string = rcutils_get_error_string(); + rcutils_reset_error(); + type_support = get_message_typesupport_handle( type_supports, RMW_FASTRTPS_CPP_TYPESUPPORT_CPP); + if (nullptr == type_support) { + rcutils_error_string_t error_string = rcutils_get_error_string(); + rcutils_reset_error(); + + RMW_CONNEXT_LOG_ERROR_A_SET( + "failed to load required fastrtps message type support. \n" + "Received these errors:\n" + "C: '%s'\n" + "CPP: '%s'", + prev_error_string.str, + error_string.str); + } } return type_support; } @@ -553,12 +570,28 @@ RMW_Connext_MessageTypeSupport::get_type_support_intro( get_message_typesupport_handle( type_supports, rosidl_typesupport_introspection_c__identifier); if (nullptr == type_support) { + // Reset error string since this is not (yet) an error + // (see https://github.com/ros2/rosidl_typesupport/pull/102) + rcutils_error_string_t prev_error_string = rcutils_get_error_string(); + rcutils_reset_error(); + type_support = get_message_typesupport_handle( type_supports, rosidl_typesupport_introspection_cpp::typesupport_identifier); if (nullptr != type_support) { cpp_version = true; + } else { + rcutils_error_string_t error_string = rcutils_get_error_string(); + rcutils_reset_error(); + + RMW_CONNEXT_LOG_ERROR_A_SET( + "failed to load required introspection message type support. \n" + "Received these errors:\n" + "C: '%s'\n" + "CPP: '%s'", + prev_error_string.str, + error_string.str); } } else { cpp_version = false; @@ -676,12 +709,28 @@ RMW_Connext_ServiceTypeSupportWrapper::get_type_support_intro( type_supports, rosidl_typesupport_introspection_c__identifier); if (nullptr == type_support) { + // Reset error string since this is not (yet) an error + // (see https://github.com/ros2/rosidl_typesupport/pull/102) + rcutils_error_string_t prev_error_string = rcutils_get_error_string(); + rcutils_reset_error(); + type_support = get_service_typesupport_handle( type_supports, rosidl_typesupport_introspection_cpp::typesupport_identifier); if (nullptr != type_support) { cpp_version = true; + } else { + rcutils_error_string_t error_string = rcutils_get_error_string(); + rcutils_reset_error(); + + RMW_CONNEXT_LOG_ERROR_A_SET( + "failed to load required introspection service type support. \n" + "Received these errors:\n" + "C: '%s'\n" + "CPP: '%s'", + prev_error_string.str, + error_string.str); } } @@ -697,8 +746,26 @@ RMW_Connext_ServiceTypeSupportWrapper::get_type_support_fastrtps( type_supports, RMW_FASTRTPS_CPP_TYPESUPPORT_C); if (nullptr == type_support) { + // Reset error string since this is not (yet) an error + // (see https://github.com/ros2/rosidl_typesupport/pull/102) + rcutils_error_string_t prev_error_string = rcutils_get_error_string(); + rcutils_reset_error(); + type_support = get_service_typesupport_handle( type_supports, RMW_FASTRTPS_CPP_TYPESUPPORT_CPP); + + if (nullptr == type_support) { + rcutils_error_string_t error_string = rcutils_get_error_string(); + rcutils_reset_error(); + + RMW_CONNEXT_LOG_ERROR_A_SET( + "failed to load required fastrtps service type support. \n" + "Received these errors:\n" + "C: '%s'\n" + "CPP: '%s'", + prev_error_string.str, + error_string.str); + } } return type_support; diff --git a/rmw_connextdds_common/src/ndds/rmw_typecode.cpp b/rmw_connextdds_common/src/ndds/rmw_typecode.cpp index 5eba4143..a9f48afa 100644 --- a/rmw_connextdds_common/src/ndds/rmw_typecode.cpp +++ b/rmw_connextdds_common/src/ndds/rmw_typecode.cpp @@ -948,13 +948,18 @@ rmw_connextdds_convert_type_members( DDS_TypeCode * tc_header = nullptr; if (type_support->type_requestreply()) { + // TODO(asorbini) cache condition to a variable to avoid an "unformattable" + // long line on the `else()` (see https://github.com/ament/ament_lint/issues/158) + const bool basic_mapping = + RMW_Connext_RequestReplyMapping::Basic == type_support->ctx()->request_reply_mapping; + if (type_support->ctx()->cyclone_compatible) { tc_header = CycloneRequestHeader_get_typecode(tc_factory, tc_cache); if (nullptr == tc_header) { RMW_CONNEXT_LOG_ERROR("failed to get CycloneRequestHeader typecode") return RMW_RET_ERROR; } - } else { + } else if (basic_mapping) { switch (type_support->message_type()) { case RMW_CONNEXT_MESSAGE_REQUEST: {