diff --git a/README.md b/README.md index 78b25af3..34638fcb 100644 --- a/README.md +++ b/README.md @@ -24,10 +24,11 @@ For any questions or feedback, feel free to reach out to robotics@rti.com. ## Quick Start -1. Load ROS into the shell environment, e.g. if you are using Foxy: +1. Load ROS into the shell environment (Rolling if using the `master` branch, + see [Support for different ROS 2 Releases](#support-for-different-ros-2-releases)) ```sh - source /opt/ros/foxy/setup.bash + source /opt/ros/rolling/setup.bash ``` 2. Configure RTI Connext DDS Professional and/or RTI Connext DDS Micro on your diff --git a/rmw_connextdds_common/CMakeLists.txt b/rmw_connextdds_common/CMakeLists.txt index 574712b8..c95cb899 100644 --- a/rmw_connextdds_common/CMakeLists.txt +++ b/rmw_connextdds_common/CMakeLists.txt @@ -81,15 +81,6 @@ function(rtirmw_add_library) CACHE INTERNAL "") endif() - if(NOT "${RMW_CONNEXT_WAITSET_MODE}" STREQUAL "") - string(TOUPPER "${RMW_CONNEXT_WAITSET_MODE}" rmw_connext_waitset_mode) - if(rmw_connext_waitset_mode STREQUAL "STD") - list(APPEND private_defines "RMW_CONNEXT_CPP_STD_WAITSETS=1") - endif() - set(RMW_CONNEXT_WAITSET_MODE "${RMW_CONNEXT_WAITSET_MODE}" - CACHE INTERNAL "") - endif() - target_compile_definitions(${_rti_build_NAME} PRIVATE ${private_defines}) # Causes the visibility macros to use dllexport rather than dllimport, @@ -138,7 +129,6 @@ set(RMW_CONNEXT_COMMON_SOURCE_CPP src/common/rmw_graph.cpp src/common/rmw_event.cpp src/common/rmw_impl.cpp - src/common/rmw_impl_waitset_dds.cpp src/common/rmw_impl_waitset_std.cpp src/common/rmw_info.cpp src/common/rmw_node.cpp @@ -163,7 +153,6 @@ set(RMW_CONNEXT_COMMON_SOURCE_HPP include/rmw_connextdds/resource_limits.hpp include/rmw_connextdds/rmw_impl.hpp include/rmw_connextdds/rmw_api_impl.hpp - include/rmw_connextdds/rmw_waitset_dds.hpp include/rmw_connextdds/rmw_waitset_std.hpp include/rmw_connextdds/scope_exit.hpp include/rmw_connextdds/static_config.hpp diff --git a/rmw_connextdds_common/include/rmw_connextdds/dds_api_ndds.hpp b/rmw_connextdds_common/include/rmw_connextdds/dds_api_ndds.hpp index 36f28855..a40b5b30 100644 --- a/rmw_connextdds_common/include/rmw_connextdds/dds_api_ndds.hpp +++ b/rmw_connextdds_common/include/rmw_connextdds/dds_api_ndds.hpp @@ -41,4 +41,8 @@ typedef RMW_Connext_Uint8ArrayPtrSeq RMW_Connext_UntypedSampleSeq; // the guid with 0's. #define DDS_SampleIdentity_UNKNOWN DDS_SAMPLEIDENTITY_DEFAULT +// Convenience function to compare the first 12 bytes of the handle +#define DDS_InstanceHandle_compare_prefix(ih_a_, ih_b_) \ + memcmp((ih_a_)->keyHash.value, (ih_b_)->keyHash.value, 12) + #endif // RMW_CONNEXTDDS__DDS_API_NDDS_HPP_ diff --git a/rmw_connextdds_common/include/rmw_connextdds/dds_api_rtime.hpp b/rmw_connextdds_common/include/rmw_connextdds/dds_api_rtime.hpp index 05eb487f..a658aaa0 100644 --- a/rmw_connextdds_common/include/rmw_connextdds/dds_api_rtime.hpp +++ b/rmw_connextdds_common/include/rmw_connextdds/dds_api_rtime.hpp @@ -66,4 +66,8 @@ struct DDS_LifespanQosPolicy; // the guid with 0's. #define DDS_SampleIdentity_UNKNOWN DDS_SAMPLE_IDENTITY_UNKNOWN +// Convenience function to compare the first 12 bytes of the handle +#define DDS_InstanceHandle_compare_prefix(ih_a_, ih_b_) \ + memcmp((ih_a_)->octet, (ih_b_)->octet, 12) + #endif // RMW_CONNEXTDDS__DDS_API_RTIME_HPP_ diff --git a/rmw_connextdds_common/include/rmw_connextdds/rmw_impl.hpp b/rmw_connextdds_common/include/rmw_connextdds/rmw_impl.hpp index 19d55c41..713ac1f9 100644 --- a/rmw_connextdds_common/include/rmw_connextdds/rmw_impl.hpp +++ b/rmw_connextdds_common/include/rmw_connextdds/rmw_impl.hpp @@ -72,7 +72,6 @@ class RMW_Connext_Subscriber; class RMW_Connext_Client; class RMW_Connext_Service; -#include "rmw_connextdds/rmw_waitset_dds.hpp" #include "rmw_connextdds/rmw_waitset_std.hpp" /****************************************************************************** @@ -368,13 +367,13 @@ class RMW_Connext_Subscriber qos(rmw_qos_profile_t * const qos); rmw_ret_t - loan_messages(); + loan_messages(const bool update_condition = true); rmw_ret_t return_messages(); rmw_ret_t - loan_messages_if_needed() + loan_messages_if_needed(const bool update_condition = true) { rmw_ret_t rc = RMW_RET_OK; @@ -389,16 +388,12 @@ class RMW_Connext_Subscriber } } /* loan messages from reader */ - rc = this->loan_messages(); + rc = this->loan_messages(update_condition); if (RMW_RET_OK != rc) { return rc; } } - if (this->internal) { - return this->status_condition.trigger_loan_guard_condition(this->loan_len > 0); - } - return RMW_RET_OK; } @@ -441,7 +436,9 @@ class RMW_Connext_Subscriber has_data() { std::lock_guard lock(this->loan_mutex); - if (RMW_RET_OK != this->loan_messages_if_needed()) { + if (RMW_RET_OK != + this->loan_messages_if_needed(false /* update_condition */)) + { RMW_CONNEXT_LOG_ERROR("failed to check loaned messages") return false; } diff --git a/rmw_connextdds_common/include/rmw_connextdds/rmw_waitset_dds.hpp b/rmw_connextdds_common/include/rmw_connextdds/rmw_waitset_dds.hpp deleted file mode 100644 index cf9bce3b..00000000 --- a/rmw_connextdds_common/include/rmw_connextdds/rmw_waitset_dds.hpp +++ /dev/null @@ -1,766 +0,0 @@ -// Copyright 2020 Real-Time Innovations, Inc. (RTI) -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#ifndef RMW_CONNEXTDDS__RMW_WAITSET_DDS_HPP_ -#define RMW_CONNEXTDDS__RMW_WAITSET_DDS_HPP_ - -#include -#include - -#include "rmw_connextdds/context.hpp" - -#if !RMW_CONNEXT_CPP_STD_WAITSETS -/****************************************************************************** - * WaitSet: wrapper implementation on top of DDS_WaitSet - ******************************************************************************/ -class RMW_Connext_Condition; -class RMW_Connext_GuardCondition; -class RMW_Connext_StatusCondition; -class RMW_Connext_SubscriberStatusCondition; -class RMW_Connext_PublisherStatusCondition; - -enum RMW_Connext_WaitSetState -{ - RMW_CONNEXT_WAITSET_FREE, - RMW_CONNEXT_WAITSET_ACQUIRING, - RMW_CONNEXT_WAITSET_BLOCKED, - RMW_CONNEXT_WAITSET_RELEASING, - RMW_CONNEXT_WAITSET_INVALIDATING -}; - -class RMW_Connext_WaitSet -{ -public: - RMW_Connext_WaitSet() - : state(RMW_CONNEXT_WAITSET_FREE), - waitset(nullptr) - { - if (!DDS_ConditionSeq_initialize(&this->active_conditions)) { - RMW_CONNEXT_LOG_ERROR_SET("failed to initialize condition sequence") - throw std::runtime_error("failed to initialize condition sequence"); - } - - RMW_Connext_WaitSet * const ws = this; - auto scope_exit = rcpputils::make_scope_exit( - [ws]() { - if (DDS_RETCODE_OK != DDS_ConditionSeq_finalize(&ws->active_conditions)) { - RMW_CONNEXT_LOG_ERROR("failed to finalize condition sequence") - } - if (nullptr != ws->waitset && - DDS_RETCODE_OK != DDS_WaitSet_delete(ws->waitset)) - { - RMW_CONNEXT_LOG_ERROR("failed to finalize waitset") - } - }); - - this->waitset = DDS_WaitSet_new(); - if (nullptr == this->waitset) { - RMW_CONNEXT_LOG_ERROR_SET("failed to create DDS waitset") - throw std::runtime_error("failed to create DDS waitset"); - } - - scope_exit.cancel(); - } - - ~RMW_Connext_WaitSet() - { - { - std::lock_guard lock(this->mutex_internal); - // the waiset should be accessed while being deleted, otherwise - // bad things(tm) will happen. - if (RMW_CONNEXT_WAITSET_FREE != this->state) { - RMW_CONNEXT_LOG_ERROR_SET("deleting a waitset while waiting on it") - } - if (RMW_RET_OK != this->detach()) { - RMW_CONNEXT_LOG_ERROR_SET("failed to detach conditions from deleted waitset") - } - } - if (!DDS_ConditionSeq_finalize(&this->active_conditions)) { - RMW_CONNEXT_LOG_ERROR_SET("failed to finalize condition sequence") - } - if (nullptr != this->waitset) { - if (DDS_RETCODE_OK != DDS_WaitSet_delete(this->waitset)) { - RMW_CONNEXT_LOG_ERROR_SET("failed to finalize DDS waitset") - } - } - } - - rmw_ret_t - wait( - rmw_subscriptions_t * const subs, - rmw_guard_conditions_t * const gcs, - rmw_services_t * const srvs, - rmw_clients_t * const cls, - rmw_events_t * const evs, - const rmw_time_t * const wait_timeout); - - rmw_ret_t - invalidate(RMW_Connext_Condition * const condition); - -protected: - bool - is_attached(RMW_Connext_Condition * const condition); - - rmw_ret_t - attach( - rmw_subscriptions_t * const subs, - rmw_guard_conditions_t * const gcs, - rmw_services_t * const srvs, - rmw_clients_t * const cls, - rmw_events_t * const evs); - - rmw_ret_t - detach(); - - rmw_ret_t - process_wait( - rmw_subscriptions_t * const subs, - rmw_guard_conditions_t * const gcs, - rmw_services_t * const srvs, - rmw_clients_t * const cls, - rmw_events_t * const evs, - size_t & active_conditions); - - template - bool - require_attach( - const std::vector & attached_els, - const size_t new_els_count, - void ** const new_els); - - bool - active_condition(RMW_Connext_Condition * const cond); - - std::mutex mutex_internal; - std::condition_variable state_cond; - RMW_Connext_WaitSetState state; - DDS_WaitSet * waitset; - DDS_ConditionSeq active_conditions; - - std::vector attached_subscribers; - std::vector attached_conditions; - std::vector attached_clients; - std::vector attached_services; - std::vector attached_events; - std::map attached_events_cache; - - friend class RMW_Connext_Condition; - friend class RMW_Connext_GuardCondition; - friend class RMW_Connext_StatusCondition; - friend class RMW_Connext_SubscriberStatusCondition; - friend class RMW_Connext_PublisherStatusCondition; -}; - -class RMW_Connext_Condition -{ -public: - RMW_Connext_Condition() - : attached_waitset(nullptr), - deleted(false) - { - } - - void - invalidate() - { - RMW_Connext_WaitSet * attached_waitset = nullptr; - { - std::lock_guard lock(this->mutex_internal); - // This function might have already been called by a derived class - if (this->deleted) { - return; - } - this->deleted = true; - attached_waitset = this->attached_waitset; - } - if (nullptr != this->attached_waitset) { - attached_waitset->invalidate(this); - } - } - -protected: - static rmw_ret_t - attach( - DDS_WaitSet * const waitset, - DDS_Condition * const dds_condition) - { - if (DDS_RETCODE_OK != DDS_WaitSet_attach_condition(waitset, dds_condition)) { - RMW_CONNEXT_LOG_ERROR_SET("failed to attach condition to waitset") - return RMW_RET_ERROR; - } - return RMW_RET_OK; - } - - static rmw_ret_t - detach( - DDS_WaitSet * const waitset, - DDS_Condition * const dds_condition) - { - // detach_condition() returns BAD_PARAMETER if the condition is not attached - DDS_ReturnCode_t rc = DDS_WaitSet_detach_condition(waitset, dds_condition); - if (DDS_RETCODE_OK != rc && - DDS_RETCODE_BAD_PARAMETER != rc && - DDS_RETCODE_PRECONDITION_NOT_MET != rc) - { - RMW_CONNEXT_LOG_ERROR_A_SET( - "failed to detach condition from waitset: %d", rc) - return RMW_RET_ERROR; - } - return RMW_RET_OK; - } - - virtual bool owns(DDS_Condition * const cond) = 0; - - rmw_ret_t - attach(RMW_Connext_WaitSet * const waitset) - { - // refuse to attach - if (this->deleted) { - return RMW_RET_ERROR; - } - - rmw_ret_t rc = RMW_RET_ERROR; - - if (nullptr != this->attached_waitset) { - if (waitset != this->attached_waitset) { - rc = this->detach(); - if (RMW_RET_OK != rc) { - return rc; - } - } else { - // condition is already attached to this waitset, nothing to do - return RMW_RET_OK; - } - } - rc = this->_attach(waitset->waitset); - if (RMW_RET_OK != rc) { - return rc; - } - this->attached_waitset = waitset; - return RMW_RET_OK; - } - - rmw_ret_t - detach() - { - rmw_ret_t rc = RMW_RET_ERROR; - - if (nullptr == this->attached_waitset) { - // condition is already detached, nothing to do - return RMW_RET_OK; - } - rc = this->_detach(this->attached_waitset->waitset); - if (RMW_RET_OK != rc) { - return rc; - } - this->attached_waitset = nullptr; - return RMW_RET_OK; - } - - virtual rmw_ret_t _attach(DDS_WaitSet * const waitset) = 0; - virtual rmw_ret_t _detach(DDS_WaitSet * const waitset) = 0; - - RMW_Connext_WaitSet * attached_waitset; - bool deleted; - std::mutex mutex_internal; - - friend class RMW_Connext_WaitSet; -}; - -class RMW_Connext_GuardCondition : public RMW_Connext_Condition -{ -public: - explicit RMW_Connext_GuardCondition(const bool internal) - : gcond(DDS_GuardCondition_new()) - { - UNUSED_ARG(internal); - if (nullptr == this->gcond) { - RMW_CONNEXT_LOG_ERROR_SET("failed to allocate dds guard condition") - throw std::runtime_error("failed to allocate dds guard condition"); - } - } - - virtual ~RMW_Connext_GuardCondition() - { - if (nullptr == RMW_Connext_gv_DomainParticipantFactory) { - // DomainParticipantFactory has already been finalized. - // Don't try to delete the guard condition, or we might - // end up with a segfault. - // For ROS2 releases > Foxy, this is unexpected behavior - // so print an error message. - RMW_CONNEXT_LOG_ERROR( - "DomainParticipantFactory already finalized, " - "leaked DDS guard condition") - return; - } - this->invalidate(); - if (nullptr != this->gcond) { - if (DDS_RETCODE_OK != DDS_GuardCondition_delete(this->gcond)) { - RMW_CONNEXT_LOG_ERROR_SET("failed to finalize DDS guard condition") - } - } - } - - virtual bool - owns(DDS_Condition * const cond) - { - return cond == DDS_GuardCondition_as_condition(this->gcond); - } - - rmw_ret_t - trigger() - { - if (DDS_RETCODE_OK != - DDS_GuardCondition_set_trigger_value(this->gcond, DDS_BOOLEAN_TRUE)) - { - RMW_CONNEXT_LOG_ERROR_SET("failed to trigger guard condition") - return RMW_RET_ERROR; - } - return RMW_RET_OK; - } - - rmw_ret_t - reset_trigger() - { - if (DDS_RETCODE_OK != - DDS_GuardCondition_set_trigger_value(this->gcond, DDS_BOOLEAN_FALSE)) - { - RMW_CONNEXT_LOG_ERROR_SET("failed to reset guard condition") - return RMW_RET_ERROR; - } - return RMW_RET_OK; - } - - virtual rmw_ret_t _attach(DDS_WaitSet * const waitset) - { - return RMW_Connext_Condition::attach( - waitset, DDS_GuardCondition_as_condition(this->gcond)); - } - virtual rmw_ret_t _detach(DDS_WaitSet * const waitset) - { - return RMW_Connext_Condition::detach( - waitset, DDS_GuardCondition_as_condition(this->gcond)); - } - -protected: - DDS_GuardCondition * gcond; -}; - -class RMW_Connext_StatusCondition : public RMW_Connext_Condition -{ -public: - explicit RMW_Connext_StatusCondition( - DDS_Entity * const entity) - : entity(entity), - scond(nullptr) - { - if (nullptr == this->entity) { - RMW_CONNEXT_LOG_ERROR_SET("invalid DDS entity") - throw new std::runtime_error("invalid DDS entity"); - } - - this->scond = DDS_Entity_get_statuscondition(this->entity); - if (nullptr == this->scond) { - RMW_CONNEXT_LOG_ERROR_SET("failed to get DDS entity's condition") - throw new std::runtime_error("failed to get DDS entity's condition"); - } - } - - ~RMW_Connext_StatusCondition() - { - this->invalidate(); - } - - rmw_ret_t - reset_statuses() - { - if (DDS_RETCODE_OK != - DDS_StatusCondition_set_enabled_statuses( - this->scond, DDS_STATUS_MASK_NONE)) - { - RMW_CONNEXT_LOG_ERROR_SET("failed to reset status condition's statuses") - return RMW_RET_ERROR; - } - return RMW_RET_OK; - } - - rmw_ret_t - enable_statuses(const DDS_StatusMask statuses) - { - DDS_StatusMask current_statuses = - DDS_StatusCondition_get_enabled_statuses(this->scond); - current_statuses |= statuses; - if (DDS_RETCODE_OK != - DDS_StatusCondition_set_enabled_statuses(this->scond, current_statuses)) - { - RMW_CONNEXT_LOG_ERROR_SET("failed to set status condition's statuses") - return RMW_RET_ERROR; - } - return RMW_RET_OK; - } - - virtual bool - owns(DDS_Condition * const cond) - { - return cond == DDS_StatusCondition_as_condition(this->scond); - } - - bool - has_status(const rmw_event_type_t event_type) - { - const DDS_StatusMask status_mask = ros_event_to_dds(event_type, nullptr); - return DDS_Entity_get_status_changes(this->entity) & status_mask; - } - - virtual rmw_ret_t - get_status(const rmw_event_type_t event_type, void * const event_info) = 0; - - virtual rmw_ret_t _attach(DDS_WaitSet * const waitset) - { - return RMW_Connext_Condition::attach( - waitset, DDS_StatusCondition_as_condition(this->scond)); - } - virtual rmw_ret_t _detach(DDS_WaitSet * const waitset) - { - return RMW_Connext_Condition::detach( - waitset, DDS_StatusCondition_as_condition(this->scond)); - } - -protected: - DDS_Entity * entity; - DDS_StatusCondition * scond; -}; - -class RMW_Connext_PublisherStatusCondition : public RMW_Connext_StatusCondition -{ -public: - explicit RMW_Connext_PublisherStatusCondition(DDS_DataWriter * const writer) - : RMW_Connext_StatusCondition(DDS_DataWriter_as_entity(writer)), - writer(writer) - { - } - - ~RMW_Connext_PublisherStatusCondition() - { - this->invalidate(); - } - - virtual rmw_ret_t - get_status(const rmw_event_type_t event_type, void * const event_info); - - // Helper functions to retrieve status information - inline rmw_ret_t - get_liveliness_lost_status(rmw_liveliness_lost_status_t * const status) - { - DDS_LivelinessLostStatus dds_status = DDS_LivelinessLostStatus_INITIALIZER; - - if (DDS_RETCODE_OK != - DDS_DataWriter_get_liveliness_lost_status(this->writer, &dds_status)) - { - RMW_CONNEXT_LOG_ERROR_SET("failed to get liveliness lost status") - return RMW_RET_ERROR; - } - - status->total_count = dds_status.total_count; - status->total_count_change = dds_status.total_count_change; - - return RMW_RET_OK; - } - - inline rmw_ret_t - get_offered_deadline_missed_status( - rmw_offered_deadline_missed_status_t * const status) - { - DDS_OfferedDeadlineMissedStatus dds_status = DDS_OfferedDeadlineMissedStatus_INITIALIZER; - - if (DDS_RETCODE_OK != - DDS_DataWriter_get_offered_deadline_missed_status(this->writer, &dds_status)) - { - RMW_CONNEXT_LOG_ERROR_SET("failed to get offered deadline missed status") - return RMW_RET_ERROR; - } - - status->total_count = dds_status.total_count; - status->total_count_change = dds_status.total_count_change; - - return RMW_RET_OK; - } - - inline rmw_ret_t - get_offered_qos_incompatible_status( - rmw_offered_qos_incompatible_event_status_t * const status) - { - DDS_OfferedIncompatibleQosStatus dds_status = - DDS_OfferedIncompatibleQosStatus_INITIALIZER; - - if (DDS_RETCODE_OK != - DDS_DataWriter_get_offered_incompatible_qos_status(this->writer, &dds_status)) - { - RMW_CONNEXT_LOG_ERROR_SET("failed to get offered incompatible qos status") - return RMW_RET_ERROR; - } - - status->total_count = dds_status.total_count; - status->total_count_change = dds_status.total_count_change; - status->last_policy_kind = - dds_qos_policy_to_rmw_qos_policy(dds_status.last_policy_id); - - return RMW_RET_OK; - } - -protected: - DDS_DataWriter * const writer; -}; - -class RMW_Connext_SubscriberStatusCondition : public RMW_Connext_StatusCondition -{ -public: - RMW_Connext_SubscriberStatusCondition( - DDS_DataReader * const reader, - const bool ignore_local, - const bool internal) - : RMW_Connext_StatusCondition(DDS_DataReader_as_entity(reader)), - ignore_local(ignore_local), - participant_handle( - DDS_Entity_get_instance_handle( - DDS_DomainParticipant_as_entity( - DDS_Subscriber_get_participant( - DDS_DataReader_get_subscriber(reader))))), - reader(reader), - dcond(nullptr), - attached_waitset_dcond(nullptr) - { - UNUSED_ARG(internal); - this->dcond = DDS_GuardCondition_new(); - if (nullptr == this->dcond) { - RMW_CONNEXT_LOG_ERROR_SET("failed to create reader's data condition") - throw std::runtime_error("failed to create reader's data condition"); - } - - if (RMW_RET_OK != this->install()) { - RMW_CONNEXT_LOG_ERROR("failed to install condition on reader") - throw std::runtime_error("failed to install condition on reader"); - } - } - - virtual ~RMW_Connext_SubscriberStatusCondition() - { - this->invalidate(); - if (nullptr != this->dcond) { - if (DDS_RETCODE_OK != DDS_GuardCondition_delete(this->dcond)) { - RMW_CONNEXT_LOG_ERROR_SET("failed to delete reader's data condition") - } - } - } - - virtual rmw_ret_t - get_status(const rmw_event_type_t event_type, void * const event_info); - - virtual bool - owns(DDS_Condition * const cond) - { - return RMW_Connext_StatusCondition::owns(cond) || - cond == DDS_GuardCondition_as_condition(this->dcond); - } - - rmw_ret_t - attach_data() - { - if (nullptr == this->attached_waitset) { - RMW_CONNEXT_LOG_ERROR("condition must be already attached") - return RMW_RET_ERROR; - } - rmw_ret_t rc = RMW_Connext_Condition::attach( - this->attached_waitset->waitset, - DDS_GuardCondition_as_condition(this->dcond)); - if (RMW_RET_OK != rc) { - return rc; - } - this->attached_waitset_dcond = this->attached_waitset; - return RMW_RET_OK; - } - - virtual rmw_ret_t - detach() - { - rmw_ret_t rc = RMW_Connext_Condition::detach(); - if (RMW_RET_OK != rc) { - return rc; - } - if (nullptr != this->attached_waitset_dcond) { - RMW_Connext_WaitSet * const ws = this->attached_waitset_dcond; - this->attached_waitset_dcond = nullptr; - return RMW_Connext_Condition::detach( - ws->waitset, DDS_GuardCondition_as_condition(this->dcond)); - } - return RMW_RET_OK; - } - - rmw_ret_t - set_data_available(const bool available) - { - if (DDS_RETCODE_OK != - DDS_GuardCondition_set_trigger_value(this->dcond, available)) - { - RMW_CONNEXT_LOG_ERROR_SET("failed to set reader's data condition trigger") - return RMW_RET_ERROR; - } - return RMW_RET_OK; - } - - const bool ignore_local; - const DDS_InstanceHandle_t participant_handle; - - // Methods for "internal" subscribers only - inline rmw_ret_t - trigger_loan_guard_condition(const bool trigger_value) - { - UNUSED_ARG(trigger_value); - return RMW_RET_OK; - } - - // Helper functions to retrieve status information - inline rmw_ret_t - get_liveliness_changed_status(rmw_liveliness_changed_status_t * const status) - { - DDS_LivelinessChangedStatus dds_status = DDS_LivelinessChangedStatus_INITIALIZER; - - if (DDS_RETCODE_OK != - DDS_DataReader_get_liveliness_changed_status(this->reader, &dds_status)) - { - RMW_CONNEXT_LOG_ERROR_SET("failed to get liveliness changed status") - return RMW_RET_ERROR; - } - - status->alive_count = dds_status.alive_count; - status->alive_count_change = dds_status.alive_count_change; - status->not_alive_count = dds_status.not_alive_count; - status->not_alive_count_change = dds_status.not_alive_count_change; - - return RMW_RET_OK; - } - - inline rmw_ret_t - get_requested_deadline_missed_status( - rmw_requested_deadline_missed_status_t * const status) - { - DDS_RequestedDeadlineMissedStatus dds_status = - DDS_RequestedDeadlineMissedStatus_INITIALIZER; - - if (DDS_RETCODE_OK != - DDS_DataReader_get_requested_deadline_missed_status(this->reader, &dds_status)) - { - RMW_CONNEXT_LOG_ERROR_SET("failed to get requested deadline missed status") - return RMW_RET_ERROR; - } - - status->total_count = dds_status.total_count; - status->total_count_change = dds_status.total_count_change; - - return RMW_RET_OK; - } - - inline rmw_ret_t - get_requested_qos_incompatible_status( - rmw_requested_qos_incompatible_event_status_t * const status) - { - DDS_RequestedIncompatibleQosStatus dds_status = - DDS_RequestedIncompatibleQosStatus_INITIALIZER; - - if (DDS_RETCODE_OK != - DDS_DataReader_get_requested_incompatible_qos_status(this->reader, &dds_status)) - { - RMW_CONNEXT_LOG_ERROR_SET("failed to get requested incompatible qos status") - return RMW_RET_ERROR; - } - - status->total_count = dds_status.total_count; - status->total_count_change = dds_status.total_count_change; - status->last_policy_kind = - dds_qos_policy_to_rmw_qos_policy(dds_status.last_policy_id); - - return RMW_RET_OK; - } - - inline rmw_ret_t - get_message_lost_status(rmw_message_lost_status_t * const status) - { - DDS_SampleLostStatus dds_status = DDS_SampleLostStatus_INITIALIZER; - - if (DDS_RETCODE_OK != - DDS_DataReader_get_sample_lost_status(this->reader, &dds_status)) - { - RMW_CONNEXT_LOG_ERROR_SET("failed to get sample lost status") - return RMW_RET_ERROR; - } - - status->total_count = dds_status.total_count; - status->total_count_change = dds_status.total_count_change; - - return RMW_RET_OK; - } - -protected: - rmw_ret_t - install(); - - DDS_DataReader * const reader; - DDS_GuardCondition * dcond; - RMW_Connext_WaitSet * attached_waitset_dcond; -}; - -/****************************************************************************** - * Event support - ******************************************************************************/ -class RMW_Connext_Event -{ -public: - static inline - RMW_Connext_StatusCondition * - condition(const rmw_event_t * const event); - - static inline - bool - active(rmw_event_t * const event); - - static - bool - writer_event(const rmw_event_t * const event) - { - return !ros_event_for_reader(event->event_type); - } - - static - bool - reader_event(const rmw_event_t * const event) - { - return ros_event_for_reader(event->event_type); - } - - static - RMW_Connext_Publisher * - publisher(const rmw_event_t * const event) - { - return reinterpret_cast(event->data); - } - - static - RMW_Connext_Subscriber * - subscriber(const rmw_event_t * const event) - { - return reinterpret_cast(event->data); - } -}; -#endif /* RMW_CONNEXT_CPP_STD_WAITSETS */ -#endif // RMW_CONNEXTDDS__RMW_WAITSET_DDS_HPP_ diff --git a/rmw_connextdds_common/include/rmw_connextdds/rmw_waitset_std.hpp b/rmw_connextdds_common/include/rmw_connextdds/rmw_waitset_std.hpp index f4d4f9a1..09c911e4 100644 --- a/rmw_connextdds_common/include/rmw_connextdds/rmw_waitset_std.hpp +++ b/rmw_connextdds_common/include/rmw_connextdds/rmw_waitset_std.hpp @@ -17,16 +17,13 @@ #include "rmw_connextdds/context.hpp" -#if RMW_CONNEXT_CPP_STD_WAITSETS /****************************************************************************** * Alternative implementation of WaitSets and Conditions using C++ std ******************************************************************************/ class RMW_Connext_WaitSet { public: - RMW_Connext_WaitSet() - : waiting(false) - {} + RMW_Connext_WaitSet() {} rmw_ret_t wait( @@ -64,10 +61,9 @@ class RMW_Connext_WaitSet rmw_clients_t * const cls, rmw_events_t * const evs); + bool waiting{false}; std::mutex mutex_internal; - bool waiting; std::condition_variable condition; - std::mutex condition_mutex; }; class RMW_Connext_Condition @@ -79,26 +75,52 @@ class RMW_Connext_Condition waitset_condition(nullptr) {} + template void attach( std::mutex * const waitset_mutex, - std::condition_variable * const waitset_condition) + std::condition_variable * const waitset_condition, + bool & already_active, + FunctorT && check_trigger) { std::lock_guard lock(this->mutex_internal); - this->waitset_mutex = waitset_mutex; - this->waitset_condition = waitset_condition; + already_active = check_trigger(); + if (!already_active) { + this->waitset_mutex = waitset_mutex; + this->waitset_condition = waitset_condition; + } } + template void - detach() + detach(FunctorT && on_detached) { std::lock_guard lock(this->mutex_internal); this->waitset_mutex = nullptr; this->waitset_condition = nullptr; + on_detached(); } virtual bool owns(DDS_Condition * const cond) = 0; + template + void + update_state(FunctorT && update_condition, const bool notify) + { + std::lock_guard internal_lock(this->mutex_internal); + + if (nullptr != this->waitset_mutex) { + std::lock_guard lock(*this->waitset_mutex); + update_condition(); + } else { + update_condition(); + } + + if (notify && nullptr != this->waitset_condition) { + this->waitset_condition->notify_one(); + } + } + protected: std::mutex mutex_internal; std::mutex * waitset_mutex; @@ -133,6 +155,9 @@ class RMW_Connext_Condition } return RMW_RET_OK; } + + friend class RMW_Connext_WaitSet; + friend class RMW_Connext_Event; }; class RMW_Connext_GuardCondition : public RMW_Connext_Condition @@ -164,18 +189,6 @@ class RMW_Connext_GuardCondition : public RMW_Connext_Condition return this->gcond; } - bool - trigger_check() - { - return this->trigger_value.exchange(false); - } - - bool - has_triggered() - { - return this->trigger_value; - } - rmw_ret_t trigger() { @@ -184,23 +197,17 @@ class RMW_Connext_GuardCondition : public RMW_Connext_Condition DDS_GuardCondition_set_trigger_value( this->gcond, DDS_BOOLEAN_TRUE)) { - RMW_CONNEXT_LOG_ERROR_SET("failed to trigger guard condition") + RMW_CONNEXT_LOG_ERROR_SET("failed to trigger internal guard condition") return RMW_RET_ERROR; } return RMW_RET_OK; } - std::lock_guard lock(this->mutex_internal); - - if (this->waitset_mutex != nullptr) { - std::unique_lock clock(*this->waitset_mutex); - this->trigger_value = true; - clock.unlock(); - this->waitset_condition->notify_one(); - } else { - this->trigger_value = true; - } + update_state( + [this]() { + this->trigger_value = true; + }, true /* notify */); return RMW_RET_OK; } @@ -223,9 +230,11 @@ class RMW_Connext_GuardCondition : public RMW_Connext_Condition } protected: - std::atomic_bool trigger_value; + bool trigger_value; bool internal; DDS_GuardCondition * gcond; + + friend class RMW_Connext_WaitSet; }; class RMW_Connext_StatusCondition : public RMW_Connext_Condition @@ -321,6 +330,9 @@ class RMW_Connext_StatusCondition : public RMW_Connext_Condition return RMW_RET_OK; } + virtual bool + has_status(const rmw_event_type_t event_type) = 0; + protected: DDS_StatusCondition * scond; }; @@ -348,7 +360,7 @@ class RMW_Connext_PublisherStatusCondition : public RMW_Connext_StatusCondition public: explicit RMW_Connext_PublisherStatusCondition(DDS_DataWriter * const writer); - bool + virtual bool has_status(const rmw_event_type_t event_type); virtual rmw_ret_t @@ -394,11 +406,16 @@ class RMW_Connext_PublisherStatusCondition : public RMW_Connext_StatusCondition inline rmw_ret_t get_liveliness_lost_status(rmw_liveliness_lost_status_t * const status) { - status->total_count = this->status_liveliness.total_count; - status->total_count_change = this->status_liveliness.total_count_change; + update_state( + [this, status]() { + this->triggered_liveliness = false; + + status->total_count = this->status_liveliness.total_count; + status->total_count_change = this->status_liveliness.total_count_change; - this->status_liveliness.total_count_change = 0; - this->triggered_liveliness = false; + this->status_liveliness.total_count_change = 0; + this->status_liveliness_last = this->status_liveliness; + }, false /* notify */); return RMW_RET_OK; } @@ -407,11 +424,16 @@ class RMW_Connext_PublisherStatusCondition : public RMW_Connext_StatusCondition get_offered_deadline_missed_status( rmw_offered_deadline_missed_status_t * const status) { - status->total_count = this->status_deadline.total_count; - status->total_count_change = this->status_deadline.total_count_change; + update_state( + [this, status]() { + this->triggered_deadline = false; - this->status_deadline.total_count_change = 0; - this->triggered_deadline = false; + status->total_count = this->status_deadline.total_count; + status->total_count_change = this->status_deadline.total_count_change; + + this->status_deadline.total_count_change = 0; + this->status_deadline_last = this->status_deadline; + }, false /* notify */); return RMW_RET_OK; } @@ -420,13 +442,18 @@ class RMW_Connext_PublisherStatusCondition : public RMW_Connext_StatusCondition get_offered_qos_incompatible_status( rmw_offered_qos_incompatible_event_status_t * const status) { - status->total_count = this->status_qos.total_count; - status->total_count_change = this->status_qos.total_count_change; - status->last_policy_kind = - dds_qos_policy_to_rmw_qos_policy(this->status_qos.last_policy_id); + update_state( + [this, status]() { + this->triggered_qos = false; + + status->total_count = this->status_qos.total_count; + status->total_count_change = this->status_qos.total_count_change; + status->last_policy_kind = + dds_qos_policy_to_rmw_qos_policy(this->status_qos.last_policy_id); - this->status_qos.total_count_change = 0; - this->triggered_qos = false; + this->status_qos.total_count_change = 0; + this->status_qos_last = this->status_qos; + }, false /* notify */); return RMW_RET_OK; } @@ -441,14 +468,18 @@ class RMW_Connext_PublisherStatusCondition : public RMW_Connext_StatusCondition void update_status_qos( const DDS_OfferedIncompatibleQosStatus * const status); - bool triggered_deadline; - bool triggered_liveliness; - bool triggered_qos; + bool triggered_deadline{false}; + bool triggered_liveliness{false}; + bool triggered_qos{false}; DDS_OfferedDeadlineMissedStatus status_deadline; DDS_OfferedIncompatibleQosStatus status_qos; DDS_LivelinessLostStatus status_liveliness; + DDS_OfferedDeadlineMissedStatus status_deadline_last; + DDS_OfferedIncompatibleQosStatus status_qos_last; + DDS_LivelinessLostStatus status_liveliness_last; + RMW_Connext_Publisher * pub; }; @@ -491,7 +522,7 @@ class RMW_Connext_SubscriberStatusCondition : public RMW_Connext_StatusCondition virtual ~RMW_Connext_SubscriberStatusCondition(); - bool + virtual bool has_status(const rmw_event_type_t event_type); virtual rmw_ret_t @@ -534,15 +565,6 @@ class RMW_Connext_SubscriberStatusCondition : public RMW_Connext_StatusCondition rmw_ret_t install(RMW_Connext_Subscriber * const sub); - bool - on_data_triggered() - { - return this->triggered_data.exchange(false); - } - - void - on_data(); - void on_requested_deadline_missed( const DDS_RequestedDeadlineMissedStatus * const status); @@ -564,31 +586,28 @@ class RMW_Connext_SubscriberStatusCondition : public RMW_Connext_StatusCondition rmw_ret_t set_data_available(const bool available) { - UNUSED_ARG(available); - return RMW_RET_OK; - } + update_state( + [this, available]() { + this->triggered_data = available; + }, true /* notify */); - // Methods for "internal" subscribers only - inline rmw_ret_t - trigger_loan_guard_condition(const bool trigger_value) - { - if (nullptr == this->_loan_guard_condition) { - return RMW_RET_ERROR; - } - if (DDS_RETCODE_OK != DDS_GuardCondition_set_trigger_value( - this->_loan_guard_condition, trigger_value)) - { - RMW_CONNEXT_LOG_ERROR_SET("failed to set internal reader condition's trigger") - return RMW_RET_ERROR; + if (nullptr != this->loan_guard_condition) { + if (DDS_RETCODE_OK != + DDS_GuardCondition_set_trigger_value(this->loan_guard_condition, available)) + { + RMW_CONNEXT_LOG_ERROR_SET("failed to set internal reader condition's trigger") + return RMW_RET_ERROR; + } } return RMW_RET_OK; } - DDS_Condition * - loan_guard_condition() const + virtual bool + owns(DDS_Condition * const cond) { - return (nullptr != this->_loan_guard_condition) ? - DDS_GuardCondition_as_condition(this->_loan_guard_condition) : nullptr; + return RMW_Connext_StatusCondition::owns(cond) || + (nullptr != this->loan_guard_condition && + cond == DDS_GuardCondition_as_condition(this->loan_guard_condition)); } virtual rmw_ret_t _attach(DDS_WaitSet * const waitset) @@ -597,9 +616,9 @@ class RMW_Connext_SubscriberStatusCondition : public RMW_Connext_StatusCondition if (RMW_RET_OK != rc) { return rc; } - if (nullptr != this->_loan_guard_condition) { + if (nullptr != this->loan_guard_condition) { return RMW_Connext_Condition::_attach( - waitset, DDS_GuardCondition_as_condition(this->_loan_guard_condition)); + waitset, DDS_GuardCondition_as_condition(this->loan_guard_condition)); } return RMW_RET_OK; } @@ -609,9 +628,9 @@ class RMW_Connext_SubscriberStatusCondition : public RMW_Connext_StatusCondition if (RMW_RET_OK != rc) { return rc; } - if (nullptr != this->_loan_guard_condition) { + if (nullptr != this->loan_guard_condition) { return RMW_Connext_Condition::_detach( - waitset, DDS_GuardCondition_as_condition(this->_loan_guard_condition)); + waitset, DDS_GuardCondition_as_condition(this->loan_guard_condition)); } return RMW_RET_OK; } @@ -620,14 +639,19 @@ class RMW_Connext_SubscriberStatusCondition : public RMW_Connext_StatusCondition inline rmw_ret_t get_liveliness_changed_status(rmw_liveliness_changed_status_t * const status) { - status->alive_count = this->status_liveliness.alive_count; - status->alive_count_change = this->status_liveliness.alive_count_change; - status->not_alive_count = this->status_liveliness.not_alive_count; - status->not_alive_count_change = this->status_liveliness.not_alive_count_change; + update_state( + [this, status]() { + this->triggered_liveliness = false; + + status->alive_count = this->status_liveliness.alive_count; + status->alive_count_change = this->status_liveliness.alive_count_change; + status->not_alive_count = this->status_liveliness.not_alive_count; + status->not_alive_count_change = this->status_liveliness.not_alive_count_change; - this->status_liveliness.alive_count_change = 0; - this->status_liveliness.not_alive_count_change = 0; - this->triggered_liveliness = false; + this->status_liveliness.alive_count_change = 0; + this->status_liveliness.not_alive_count_change = 0; + this->status_liveliness_last = this->status_liveliness; + }, false /* notify */); return RMW_RET_OK; } @@ -636,11 +660,16 @@ class RMW_Connext_SubscriberStatusCondition : public RMW_Connext_StatusCondition get_requested_deadline_missed_status( rmw_requested_deadline_missed_status_t * const status) { - status->total_count = this->status_deadline.total_count; - status->total_count_change = this->status_deadline.total_count_change; + update_state( + [this, status]() { + this->triggered_deadline = false; - this->status_deadline.total_count_change = 0; - this->triggered_deadline = false; + status->total_count = this->status_deadline.total_count; + status->total_count_change = this->status_deadline.total_count_change; + + this->status_deadline.total_count_change = 0; + this->status_deadline_last = this->status_deadline; + }, false /* notify */); return RMW_RET_OK; } @@ -650,13 +679,18 @@ class RMW_Connext_SubscriberStatusCondition : public RMW_Connext_StatusCondition get_requested_qos_incompatible_status( rmw_requested_qos_incompatible_event_status_t * const status) { - status->total_count = this->status_qos.total_count; - status->total_count_change = this->status_qos.total_count_change; - status->last_policy_kind = - dds_qos_policy_to_rmw_qos_policy(this->status_qos.last_policy_id); + update_state( + [this, status]() { + this->triggered_qos = false; + + status->total_count = this->status_qos.total_count; + status->total_count_change = this->status_qos.total_count_change; + status->last_policy_kind = + dds_qos_policy_to_rmw_qos_policy(this->status_qos.last_policy_id); - this->status_qos.total_count_change = 0; - this->triggered_qos = false; + this->status_qos.total_count_change = 0; + this->status_qos_last = this->status_qos; + }, false /* notify */); return RMW_RET_OK; } @@ -664,11 +698,16 @@ class RMW_Connext_SubscriberStatusCondition : public RMW_Connext_StatusCondition inline rmw_ret_t get_message_lost_status(rmw_message_lost_status_t * const status) { - status->total_count = this->status_sample_lost.total_count; - status->total_count_change = this->status_sample_lost.total_count_change; + update_state( + [this, status]() { + this->triggered_sample_lost = false; - this->status_sample_lost.total_count_change = 0; - this->triggered_sample_lost = false; + status->total_count = this->status_sample_lost.total_count; + status->total_count_change = this->status_sample_lost.total_count_change; + + this->status_sample_lost.total_count_change = 0; + this->status_sample_lost_last = this->status_sample_lost; + }, false /* notify */); return RMW_RET_OK; } @@ -686,20 +725,27 @@ class RMW_Connext_SubscriberStatusCondition : public RMW_Connext_StatusCondition void update_status_sample_lost( const DDS_SampleLostStatus * const status); - std::atomic_bool triggered_deadline; - std::atomic_bool triggered_liveliness; - std::atomic_bool triggered_qos; - std::atomic_bool triggered_sample_lost; - std::atomic_bool triggered_data; + DDS_GuardCondition * const loan_guard_condition; - DDS_GuardCondition * _loan_guard_condition; + bool triggered_deadline{false}; + bool triggered_liveliness{false}; + bool triggered_qos{false}; + bool triggered_sample_lost{false}; + bool triggered_data{false}; DDS_RequestedDeadlineMissedStatus status_deadline; DDS_RequestedIncompatibleQosStatus status_qos; DDS_LivelinessChangedStatus status_liveliness; DDS_SampleLostStatus status_sample_lost; + DDS_RequestedDeadlineMissedStatus status_deadline_last; + DDS_RequestedIncompatibleQosStatus status_qos_last; + DDS_LivelinessChangedStatus status_liveliness_last; + DDS_SampleLostStatus status_sample_lost_last; + RMW_Connext_Subscriber * sub; + + friend class RMW_Connext_WaitSet; }; /****************************************************************************** @@ -716,10 +762,6 @@ class RMW_Connext_Event rmw_ret_t disable(rmw_event_t * const event); - static - bool - active(rmw_event_t * const event); - static DDS_Condition * condition(const rmw_event_t * const event); @@ -752,5 +794,4 @@ class RMW_Connext_Event return reinterpret_cast(event->data); } }; -#endif /* RMW_CONNEXT_CPP_STD_WAITSETS */ #endif // RMW_CONNEXTDDS__RMW_WAITSET_STD_HPP_ diff --git a/rmw_connextdds_common/include/rmw_connextdds/static_config.hpp b/rmw_connextdds_common/include/rmw_connextdds/static_config.hpp index f4732d28..2ad1c9fc 100644 --- a/rmw_connextdds_common/include/rmw_connextdds/static_config.hpp +++ b/rmw_connextdds_common/include/rmw_connextdds/static_config.hpp @@ -263,14 +263,14 @@ * In order to reduce the time to cleanup a participant (Node), we use the * advice from * https://community.rti.com/static/documentation/connext-dds/5.3.1/doc/api/connext_dds/api_cpp/structDDS__DomainParticipantQos.html - * and reduce the shutdown_cleanup_period to 50 milliseconds. + * and reduce the shutdown_cleanup_period to 10 milliseconds. ******************************************************************************/ #ifndef RMW_CONNEXT_SHUTDOWN_CLEANUP_PERIOD_SEC #define RMW_CONNEXT_SHUTDOWN_CLEANUP_PERIOD_SEC 0 #endif /* RMW_CONNEXT_SHUTDOWN_CLEANUP_PERIOD_SEC */ #ifndef RMW_CONNEXT_SHUTDOWN_CLEANUP_PERIOD_NSEC -#define RMW_CONNEXT_SHUTDOWN_CLEANUP_PERIOD_NSEC 50000000 +#define RMW_CONNEXT_SHUTDOWN_CLEANUP_PERIOD_NSEC 10000000 #endif /* RMW_CONNEXT_SHUTDOWN_CLEANUP_PERIOD_NSEC */ /****************************************************************************** @@ -299,13 +299,6 @@ #define RMW_CONNEXT_LEGACY_RMW_COMPATIBILITY_MODE 1 #endif /* RMW_CONNEXT_LEGACY_RMW_COMPATIBILITY_MODE */ -/****************************************************************************** - * Use an alternative implementation of WaitSets based on C++ std library - ******************************************************************************/ -#ifndef RMW_CONNEXT_CPP_STD_WAITSETS -#define RMW_CONNEXT_CPP_STD_WAITSETS 0 -#endif /* RMW_CONNEXT_CPP_STD_WAITSETS */ - #include "resource_limits.hpp" #endif // RMW_CONNEXTDDS__STATIC_CONFIG_HPP_ diff --git a/rmw_connextdds_common/src/common/rmw_graph.cpp b/rmw_connextdds_common/src/common/rmw_graph.cpp index acdbb3c1..f40659e8 100644 --- a/rmw_connextdds_common/src/common/rmw_graph.cpp +++ b/rmw_connextdds_common/src/common/rmw_graph.cpp @@ -27,6 +27,7 @@ rmw_connextdds_graph_add_entityEA( const DDS_GUID_t * const dp_guid, const char * const topic_name, const char * const type_name, + const DDS_HistoryQosPolicy * const history, const DDS_ReliabilityQosPolicy * const reliability, const DDS_DurabilityQosPolicy * const durability, const DDS_DeadlineQosPolicy * const deadline, @@ -726,6 +727,7 @@ rmw_connextdds_graph_add_entityEA( const DDS_GUID_t * const dp_guid, const char * const topic_name, const char * const type_name, + const DDS_HistoryQosPolicy * const history, const DDS_ReliabilityQosPolicy * const reliability, const DDS_DurabilityQosPolicy * const durability, const DDS_DeadlineQosPolicy * const deadline, @@ -744,7 +746,7 @@ rmw_connextdds_graph_add_entityEA( if (RMW_RET_OK != rmw_connextdds_readerwriter_qos_to_ros( - nullptr /* history */, + history, reliability, durability, deadline, @@ -873,6 +875,7 @@ rmw_connextdds_graph_add_local_publisherEA( &dp_guid, topic_name, pub->message_type_support()->type_name(), + &dw_qos.history, &dw_qos.reliability, &dw_qos.durability, &dw_qos.deadline, @@ -943,6 +946,7 @@ rmw_connextdds_graph_add_local_subscriberEA( &dp_guid, topic_name, sub->message_type_support()->type_name(), + &dr_qos.history, &dr_qos.reliability, &dr_qos.durability, &dr_qos.deadline, @@ -984,6 +988,7 @@ rmw_connextdds_graph_add_remote_entity( dp_guid, topic_name, type_name, + nullptr /* history (not propagated via discovery) */, reliability, durability, deadline, diff --git a/rmw_connextdds_common/src/common/rmw_impl.cpp b/rmw_connextdds_common/src/common/rmw_impl.cpp index 29b671d7..90ac36f4 100644 --- a/rmw_connextdds_common/src/common/rmw_impl.cpp +++ b/rmw_connextdds_common/src/common/rmw_impl.cpp @@ -617,12 +617,10 @@ RMW_Connext_Publisher::RMW_Connext_Publisher( status_condition(dds_writer) { rmw_connextdds_get_entity_gid(this->dds_writer, this->ros_gid); -#if RMW_CONNEXT_CPP_STD_WAITSETS if (RMW_RET_OK != this->status_condition.install(this)) { RMW_CONNEXT_LOG_ERROR("failed to install condition on writer") throw std::runtime_error("failed to install condition on writer"); } -#endif /* RMW_CONNEXT_CPP_STD_WAITSETS */ } RMW_Connext_Publisher * @@ -1106,12 +1104,10 @@ RMW_Connext_Subscriber::RMW_Connext_Subscriber( this->loan_info = def_info_seq; this->loan_len = 0; this->loan_next = 0; -#if RMW_CONNEXT_CPP_STD_WAITSETS if (RMW_RET_OK != this->status_condition.install(this)) { RMW_CONNEXT_LOG_ERROR("failed to install condition on reader") throw std::runtime_error("failed to install condition on reader"); } -#endif /* RMW_CONNEXT_CPP_STD_WAITSETS */ } RMW_Connext_Subscriber * @@ -1489,7 +1485,7 @@ RMW_Connext_Subscriber::take_serialized( } rmw_ret_t -RMW_Connext_Subscriber::loan_messages() +RMW_Connext_Subscriber::loan_messages(const bool update_condition) { /* this function should only be called once all previously loaned messages have been returned */ @@ -1506,11 +1502,11 @@ RMW_Connext_Subscriber::loan_messages() "[%s] loaned messages: %lu", this->type_support->type_name(), this->loan_len) - if (this->loan_len > 0) { - return this->status_condition.set_data_available(true); + if (update_condition) { + return this->status_condition.set_data_available(this->loan_len > 0); + } else { + return RMW_RET_OK; } - - return RMW_RET_OK; } rmw_ret_t @@ -2429,10 +2425,49 @@ RMW_Connext_Client::enable() rmw_ret_t RMW_Connext_Client::is_service_available(bool & available) { - /* TODO(asorbini): check that we actually have at least one service matched by both - request writer and response reader */ - available = (this->request_pub->subscriptions_count() > 0 && - this->reply_sub->publications_count() > 0); + // mark service as available if we have at least one writer and one reader + // matched from the same remote DomainParticipant. + struct DDS_InstanceHandleSeq matched_req_subs = DDS_SEQUENCE_INITIALIZER, + matched_rep_pubs = DDS_SEQUENCE_INITIALIZER; + auto scope_exit_seqs = rcpputils::make_scope_exit( + [&matched_req_subs, &matched_rep_pubs]() + { + if (!DDS_InstanceHandleSeq_finalize(&matched_req_subs)) { + RMW_CONNEXT_LOG_ERROR("failed to finalize req instance handle sequence") + } + if (!DDS_InstanceHandleSeq_finalize(&matched_rep_pubs)) { + RMW_CONNEXT_LOG_ERROR("failed to finalize rep instance handle sequence") + } + }); + + DDS_ReturnCode_t dds_rc = + DDS_DataWriter_get_matched_subscriptions(this->request_pub->writer(), &matched_req_subs); + if (DDS_RETCODE_OK != dds_rc) { + RMW_CONNEXT_LOG_ERROR_A_SET("failed to list matched subscriptions: dds_rc=%d", dds_rc) + return RMW_RET_ERROR; + } + + dds_rc = + DDS_DataReader_get_matched_publications(this->reply_sub->reader(), &matched_rep_pubs); + if (DDS_RETCODE_OK != dds_rc) { + RMW_CONNEXT_LOG_ERROR_A_SET("failed to list matched publications: dds_rc=%d", dds_rc) + return RMW_RET_ERROR; + } + + const DDS_Long subs_len = DDS_InstanceHandleSeq_get_length(&matched_req_subs), + pubs_len = DDS_InstanceHandleSeq_get_length(&matched_rep_pubs); + + for (DDS_Long i = 0; i < subs_len && !available; i++) { + DDS_InstanceHandle_t * const sub_ih = + DDS_InstanceHandleSeq_get_reference(&matched_req_subs, i); + + for (DDS_Long j = 0; j < pubs_len && !available; j++) { + DDS_InstanceHandle_t * const pub_ih = + DDS_InstanceHandleSeq_get_reference(&matched_rep_pubs, j); + available = DDS_InstanceHandle_compare_prefix(sub_ih, pub_ih) == 0; + } + } + return RMW_RET_OK; } diff --git a/rmw_connextdds_common/src/common/rmw_impl_waitset_dds.cpp b/rmw_connextdds_common/src/common/rmw_impl_waitset_dds.cpp deleted file mode 100644 index 4feb1e19..00000000 --- a/rmw_connextdds_common/src/common/rmw_impl_waitset_dds.cpp +++ /dev/null @@ -1,819 +0,0 @@ -// Copyright 2020 Real-Time Innovations, Inc. (RTI) -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include - -#include "rmw_connextdds/rmw_impl.hpp" - -#include "rmw_dds_common/time_utils.hpp" - -#if !RMW_CONNEXT_CPP_STD_WAITSETS -/****************************************************************************** - * WaitSet - ******************************************************************************/ - -template -bool -RMW_Connext_WaitSet::require_attach( - const std::vector & attached_els, - const size_t new_els_count, - void ** const new_els) -{ - if (nullptr == new_els || 0 == new_els_count) { - return attached_els.size() > 0; - } else if (new_els_count != attached_els.size()) { - return true; - } else { - const void * const attached_data = - static_cast(attached_els.data()); - void * const new_els_data = static_cast(new_els); - - const size_t cmp_size = new_els_count * sizeof(void *); - - return memcmp(attached_data, new_els_data, cmp_size) != 0; - } -} - -rmw_ret_t -RMW_Connext_WaitSet::detach() -{ - bool failed = false; - - for (auto && sub : this->attached_subscribers) { - RMW_Connext_StatusCondition * const cond = sub->condition(); - { - std::lock_guard lock(cond->mutex_internal); - rmw_ret_t rc = cond->detach(); - if (RMW_RET_OK != rc) { - RMW_CONNEXT_LOG_ERROR("failed to detach subscriber's condition") - failed = true; - } - } - } - this->attached_subscribers.clear(); - - for (auto && gc : this->attached_conditions) { - { - std::lock_guard lock(gc->mutex_internal); - rmw_ret_t rc = gc->detach(); - if (RMW_RET_OK != rc) { - RMW_CONNEXT_LOG_ERROR("failed to detach guard condition") - failed = true; - } - } - } - this->attached_conditions.clear(); - - for (auto && client : this->attached_clients) { - RMW_Connext_StatusCondition * const cond = client->subscriber()->condition(); - { - std::lock_guard lock(cond->mutex_internal); - rmw_ret_t rc = cond->detach(); - if (RMW_RET_OK != rc) { - RMW_CONNEXT_LOG_ERROR("failed to detach client's condition") - failed = true; - } - } - } - this->attached_clients.clear(); - - for (auto && service : this->attached_services) { - RMW_Connext_StatusCondition * const cond = service->subscriber()->condition(); - { - std::lock_guard lock(cond->mutex_internal); - rmw_ret_t rc = cond->detach(); - if (RMW_RET_OK != rc) { - RMW_CONNEXT_LOG_ERROR("failed to detach service's condition") - failed = true; - } - } - } - this->attached_services.clear(); - - for (auto && e : this->attached_events) { - auto e_cached = this->attached_events_cache[e]; - RMW_Connext_StatusCondition * const cond = RMW_Connext_Event::condition(&e_cached); - { - std::lock_guard lock(cond->mutex_internal); - rmw_ret_t rc = cond->detach(); - if (RMW_RET_OK != rc) { - RMW_CONNEXT_LOG_ERROR("failed to detach event's condition") - failed = true; - } - } - } - this->attached_events.clear(); - this->attached_events_cache.clear(); - - return failed ? RMW_RET_ERROR : RMW_RET_OK; -} - - -rmw_ret_t -RMW_Connext_WaitSet::attach( - rmw_subscriptions_t * const subs, - rmw_guard_conditions_t * const gcs, - rmw_services_t * const srvs, - rmw_clients_t * const cls, - rmw_events_t * const evs) -{ - const bool refresh_attach_subs = - this->require_attach( - this->attached_subscribers, - (nullptr != subs) ? subs->subscriber_count : 0, - (nullptr != subs) ? subs->subscribers : nullptr), - refresh_attach_gcs = - this->require_attach( - this->attached_conditions, - (nullptr != gcs) ? gcs->guard_condition_count : 0, - (nullptr != gcs) ? gcs->guard_conditions : nullptr), - refresh_attach_srvs = - this->require_attach( - this->attached_services, - (nullptr != srvs) ? srvs->service_count : 0, - (nullptr != srvs) ? srvs->services : nullptr), - refresh_attach_cls = - this->require_attach( - this->attached_clients, - (nullptr != cls) ? cls->client_count : 0, - (nullptr != cls) ? cls->clients : nullptr), - refresh_attach_evs = - this->require_attach( - this->attached_events, - (nullptr != evs) ? evs->event_count : 0, - (nullptr != evs) ? evs->events : nullptr); - const bool refresh_attach = - refresh_attach_subs || - refresh_attach_gcs || - refresh_attach_evs || - refresh_attach_srvs || - refresh_attach_cls; - - if (!refresh_attach) { - // nothing to do since lists of attached elements didn't change - return RMW_RET_OK; - } - - rmw_ret_t rc = this->detach(); - if (RMW_RET_OK != rc) { - RMW_CONNEXT_LOG_ERROR("failed to detach conditions from waitset") - return rc; - } - - // First iterate over events, and reset the "enabled statuses" of the - // target entity's status condition. We could skip any subscriber that is - // also passed in for data (since the "enabled statuses" will be reset to - // DATA_AVAILABLE only for these subscribers), but we reset them anyway to - // avoid having to search the subscriber's list for each one. - if (nullptr != evs) { - for (size_t i = 0; i < evs->event_count; ++i) { - rmw_event_t * const event = - reinterpret_cast(evs->events[i]); - RMW_Connext_StatusCondition * const cond = - RMW_Connext_Event::condition(event); - RMW_Connext_WaitSet * const otherws = cond->attached_waitset; - bool detached = false; - if (nullptr != otherws && this != otherws) { - otherws->invalidate(cond); - detached = true; - } - { - std::lock_guard lock(cond->mutex_internal); - if (cond->deleted) { - return RMW_RET_ERROR; - } - if (detached) { - cond->attached_waitset = nullptr; - } - rmw_ret_t rc = cond->reset_statuses(); - if (RMW_RET_OK != rc) { - RMW_CONNEXT_LOG_ERROR("failed to reset event's condition") - return rc; - } - } - } - } - - if (nullptr != subs) { - for (size_t i = 0; i < subs->subscriber_count; ++i) { - RMW_Connext_Subscriber * const sub = - reinterpret_cast(subs->subscribers[i]); - RMW_Connext_SubscriberStatusCondition * const cond = sub->condition(); - RMW_Connext_WaitSet * const otherws = cond->attached_waitset; - bool detached = false; - if (nullptr != otherws && this != otherws) { - otherws->invalidate(cond); - detached = true; - } - { - std::lock_guard lock(cond->mutex_internal); - if (cond->deleted) { - return RMW_RET_ERROR; - } - if (detached) { - cond->attached_waitset = nullptr; - } - rmw_ret_t rc = cond->reset_statuses(); - if (RMW_RET_OK != rc) { - RMW_CONNEXT_LOG_ERROR("failed to reset subscriber's condition") - return rc; - } - rc = cond->enable_statuses(DDS_DATA_AVAILABLE_STATUS); - if (RMW_RET_OK != rc) { - RMW_CONNEXT_LOG_ERROR("failed to enable subscriber's condition") - return rc; - } - rc = cond->attach(this); - if (RMW_RET_OK != rc) { - RMW_CONNEXT_LOG_ERROR("failed to attach subscriber's condition") - return rc; - } - rc = cond->attach_data(); - if (RMW_RET_OK != rc) { - RMW_CONNEXT_LOG_ERROR("failed to attach subscriber's data condition") - return rc; - } - } - this->attached_subscribers.push_back(sub); - } - } - - if (nullptr != cls) { - for (size_t i = 0; i < cls->client_count; ++i) { - RMW_Connext_Client * const client = - reinterpret_cast(cls->clients[i]); - RMW_Connext_SubscriberStatusCondition * const cond = client->subscriber()->condition(); - RMW_Connext_WaitSet * const otherws = cond->attached_waitset; - bool detached = false; - if (nullptr != otherws && this != otherws) { - otherws->invalidate(cond); - detached = true; - } - { - std::lock_guard lock(cond->mutex_internal); - if (cond->deleted) { - return RMW_RET_ERROR; - } - if (detached) { - cond->attached_waitset = nullptr; - } - rmw_ret_t rc = cond->reset_statuses(); - if (RMW_RET_OK != rc) { - RMW_CONNEXT_LOG_ERROR("failed to reset subscriber's condition") - return rc; - } - rc = cond->enable_statuses(DDS_DATA_AVAILABLE_STATUS); - if (RMW_RET_OK != rc) { - RMW_CONNEXT_LOG_ERROR("failed to enable client's condition") - return rc; - } - rc = cond->attach(this); - if (RMW_RET_OK != rc) { - RMW_CONNEXT_LOG_ERROR("failed to attach client's condition") - return rc; - } - rc = cond->attach_data(); - if (RMW_RET_OK != rc) { - RMW_CONNEXT_LOG_ERROR("failed to attach client's data condition") - return rc; - } - } - this->attached_clients.push_back(client); - } - } - - if (nullptr != srvs) { - for (size_t i = 0; i < srvs->service_count; ++i) { - RMW_Connext_Service * const svc = - reinterpret_cast(srvs->services[i]); - RMW_Connext_SubscriberStatusCondition * const cond = svc->subscriber()->condition(); - RMW_Connext_WaitSet * const otherws = cond->attached_waitset; - bool detached = false; - if (nullptr != otherws && this != otherws) { - otherws->invalidate(cond); - detached = true; - } - { - std::lock_guard lock(cond->mutex_internal); - if (cond->deleted) { - return RMW_RET_ERROR; - } - if (detached) { - cond->attached_waitset = nullptr; - } - rmw_ret_t rc = cond->reset_statuses(); - if (RMW_RET_OK != rc) { - RMW_CONNEXT_LOG_ERROR("failed to reset subscriber's condition") - return rc; - } - rc = cond->enable_statuses(DDS_DATA_AVAILABLE_STATUS); - if (RMW_RET_OK != rc) { - RMW_CONNEXT_LOG_ERROR("failed to enable service's condition") - return rc; - } - rc = cond->attach(this); - if (RMW_RET_OK != rc) { - RMW_CONNEXT_LOG_ERROR("failed to attach service's condition") - return rc; - } - rc = cond->attach_data(); - if (RMW_RET_OK != rc) { - RMW_CONNEXT_LOG_ERROR("failed to attach service's data condition") - return rc; - } - } - this->attached_services.push_back(svc); - } - } - - if (nullptr != evs) { - for (size_t i = 0; i < evs->event_count; ++i) { - rmw_event_t * const event = - reinterpret_cast(evs->events[i]); - RMW_Connext_StatusCondition * const cond = - RMW_Connext_Event::condition(event); - { - std::lock_guard lock(cond->mutex_internal); - if (cond->deleted) { - return RMW_RET_ERROR; - } - const DDS_StatusKind evt = ros_event_to_dds(event->event_type, nullptr); - rmw_ret_t rc = cond->enable_statuses(evt); - if (RMW_RET_OK != rc) { - RMW_CONNEXT_LOG_ERROR("failed to enable event's condition") - return rc; - } - rc = cond->attach(this); - if (RMW_RET_OK != rc) { - RMW_CONNEXT_LOG_ERROR("failed to attach event's condition") - return rc; - } - } - this->attached_events.push_back(event); - // Cache a shallow copy of the rmw_event_t structure so that we may - // access it safely during detach(), even if the original event has been - // deleted already by that time. - this->attached_events_cache.emplace(event, *event); - } - } - - if (nullptr != gcs) { - for (size_t i = 0; i < gcs->guard_condition_count; ++i) { - RMW_Connext_GuardCondition * const gcond = - reinterpret_cast(gcs->guard_conditions[i]); - RMW_Connext_WaitSet * const otherws = gcond->attached_waitset; - bool detached = false; - if (nullptr != otherws && this != otherws) { - otherws->invalidate(gcond); - detached = true; - } - { - std::lock_guard lock(gcond->mutex_internal); - if (gcond->deleted) { - return RMW_RET_ERROR; - } - if (detached) { - gcond->attached_waitset = nullptr; - } - rmw_ret_t rc = gcond->attach(this); - if (RMW_RET_OK != rc) { - RMW_CONNEXT_LOG_ERROR("failed to attach guard condition") - return rc; - } - } - this->attached_conditions.push_back(gcond); - } - } - return RMW_RET_OK; -} - -bool -RMW_Connext_WaitSet::active_condition(RMW_Connext_Condition * const cond) -{ - DDS_Long active_len = DDS_ConditionSeq_get_length(&this->active_conditions); - bool active = false; - - for (DDS_Long i = 0; i < active_len && !active; i++) { - DDS_Condition * const acond = - *DDS_ConditionSeq_get_reference(&this->active_conditions, i); - active = cond->owns(acond); - } - - return active; -} - -bool -RMW_Connext_WaitSet::is_attached(RMW_Connext_Condition * const cond) -{ - for (auto && sub : this->attached_subscribers) { - if (sub->condition() == cond) { - return true; - } - } - - for (auto && gc : this->attached_conditions) { - if (gc == cond) { - return true; - } - } - - for (auto && client : this->attached_clients) { - if (client->subscriber()->condition() == cond) { - return true; - } - } - - for (auto && service : this->attached_services) { - if (service->subscriber()->condition() == cond) { - return true; - } - } - - for (auto && e : this->attached_events) { - auto e_cached = this->attached_events_cache[e]; - if (RMW_Connext_Event::condition(&e_cached) == cond) { - return true; - } - } - - return false; -} - -rmw_ret_t -RMW_Connext_WaitSet::process_wait( - rmw_subscriptions_t * const subs, - rmw_guard_conditions_t * const gcs, - rmw_services_t * const srvs, - rmw_clients_t * const cls, - rmw_events_t * const evs, - size_t & active_conditions) -{ - bool failed = false; - size_t i = 0; - - // If any of the attached conditions has become "invalid" while we were - // waiting, we will finish processing the results, and detach all existing - // conditions at the end, to make sure that no stale references is stored by - // the waitset after returning from the wait() call. - bool valid = true; - - i = 0; - for (auto && sub : this->attached_subscribers) { - // Check if the subscriber has some data already cached from the DataReader, - // or check the DataReader's cache and loan samples if needed. If empty, - // remove subscriber from returned list. - if (!sub->has_data()) { - subs->subscribers[i] = nullptr; - } else { - active_conditions += 1; - } - i += 1; - valid = valid && !sub->condition()->deleted; - } - - i = 0; - for (auto && gc : this->attached_conditions) { - // Scan active conditions returned by wait() looking for this guard condition - if (!this->active_condition(gc)) { - gcs->guard_conditions[i] = nullptr; - } else { - // reset conditions's trigger value. There is a risk of "race condition" - // here since resetting the trigger value might overwrite a positive - // trigger set by the upstream. In general, the DDS API expects guard - // conditions to be fully managed by the application, exactly to avoid - // this type of (pretty much unsolvable) issue (hence why - // DDS_WaitSet_wait() will not automatically reset the trigger value of - // an attached guard conditions). - if (RMW_RET_OK != gc->reset_trigger()) { - failed = true; - } - active_conditions += 1; - } - i += 1; - valid = valid && !gc->deleted; - } - - i = 0; - for (auto && client : this->attached_clients) { - if (!client->subscriber()->has_data()) { - cls->clients[i] = nullptr; - } else { - active_conditions += 1; - } - i += 1; - valid = valid && !client->subscriber()->condition()->deleted; - } - - i = 0; - for (auto && service : this->attached_services) { - if (!service->subscriber()->has_data()) { - srvs->services[i] = nullptr; - } else { - active_conditions += 1; - } - i += 1; - } - - i = 0; - for (auto && e : this->attached_events) { - auto e_cached = this->attached_events_cache[e]; - // Check if associated DDS status is active on the associated entity - if (!RMW_Connext_Event::active(&e_cached)) { - evs->events[i] = nullptr; - } else { - active_conditions += 1; - } - i += 1; - valid = valid && !RMW_Connext_Event::condition(&e_cached)->deleted; - } - - if (!failed && valid) { - return RMW_RET_OK; - } - - return RMW_RET_ERROR; -} - -static -rmw_ret_t -rmw_connextdds_duration_from_ros_time( - DDS_Duration_t * const duration, - const rmw_time_t * const ros_time) -{ - rmw_time_t in_time = rmw_dds_common::clamp_rmw_time_to_dds_time(*ros_time); - - duration->sec = static_cast(in_time.sec); - duration->nanosec = static_cast(in_time.nsec); - return RMW_RET_OK; -} - -rmw_ret_t -RMW_Connext_WaitSet::wait( - rmw_subscriptions_t * const subs, - rmw_guard_conditions_t * const gcs, - rmw_services_t * const srvs, - rmw_clients_t * const cls, - rmw_events_t * const evs, - const rmw_time_t * const wait_timeout) -{ - { - std::unique_lock lock(this->mutex_internal); - bool already_taken = false; - switch (this->state) { - case RMW_CONNEXT_WAITSET_FREE: - { - // waitset is available - break; - } - case RMW_CONNEXT_WAITSET_INVALIDATING: - { - // waitset is currently being invalidated, wait for other thread to - // complete. - this->state_cond.wait(lock); - already_taken = (RMW_CONNEXT_WAITSET_FREE != this->state); - break; - } - default: - { - already_taken = true; - break; - } - } - if (already_taken) { - // waitset is owned by another thread - RMW_CONNEXT_LOG_ERROR_SET( - "multiple concurrent wait()s not supported"); - return RMW_RET_ERROR; - } - this->state = RMW_CONNEXT_WAITSET_ACQUIRING; - } - // Notify condition variable of state transition - this->state_cond.notify_all(); - - RMW_Connext_WaitSet * const ws = this; - // If we return with an error, then try to detach all conditions to leave - // the waitset in a "clean" state. - auto scope_exit_detach = rcpputils::make_scope_exit( - [ws]() - { - if (RMW_RET_OK != ws->detach()) { - RMW_CONNEXT_LOG_ERROR("failed to detach conditions from waitset") - } - }); - - // After handling a possible error condition (i.e. clearing the waitset), - // transition back to "FREE" state. - auto scope_exit = rcpputils::make_scope_exit( - [ws]() - { - // transition waitset back to FREE state on exit - { - std::lock_guard lock(ws->mutex_internal); - ws->state = RMW_CONNEXT_WAITSET_FREE; - } - // Notify condition variable of state transition - ws->state_cond.notify_all(); - }); - - rmw_ret_t rc = this->attach(subs, gcs, srvs, cls, evs); - if (RMW_RET_OK != rc) { - return rc; - } - - const size_t attached_count = - this->attached_subscribers.size() + - this->attached_conditions.size() + - this->attached_clients.size() + - this->attached_services.size() + - this->attached_events.size(); - - if (attached_count > INT32_MAX) { - RMW_CONNEXT_LOG_ERROR("too many conditions attached to waitset") - return RMW_RET_ERROR; - } - - if (!DDS_ConditionSeq_ensure_length( - &this->active_conditions, - static_cast(attached_count), - static_cast(attached_count))) - { - RMW_CONNEXT_LOG_ERROR("failed to resize conditions sequence") - return RMW_RET_ERROR; - } - - DDS_Duration_t wait_duration = DDS_DURATION_INFINITE; - if (nullptr != wait_timeout) { - rc = rmw_connextdds_duration_from_ros_time(&wait_duration, wait_timeout); - if (RMW_RET_OK != rc) { - return rc; - } - } - - // transition to state BLOCKED - { - std::lock_guard lock(this->mutex_internal); - this->state = RMW_CONNEXT_WAITSET_BLOCKED; - } - // Notify condition variable of state transition - this->state_cond.notify_all(); - - const DDS_ReturnCode_t wait_rc = - DDS_WaitSet_wait(this->waitset, &this->active_conditions, &wait_duration); - - if (DDS_RETCODE_OK != wait_rc && DDS_RETCODE_TIMEOUT != wait_rc) { - RMW_CONNEXT_LOG_ERROR_A_SET("DDS wait failed: %d", wait_rc) - return RMW_RET_ERROR; - } - - // transition to state RELEASING - { - std::lock_guard lock(this->mutex_internal); - this->state = RMW_CONNEXT_WAITSET_RELEASING; - } - // Notify condition variable of state transition - this->state_cond.notify_all(); - - size_t active_conditions = 0; - rc = this->process_wait(subs, gcs, srvs, cls, evs, active_conditions); - if (RMW_RET_OK != rc) { - RMW_CONNEXT_LOG_ERROR("failed to process wait result") - return rc; - } - - scope_exit_detach.cancel(); - - RMW_CONNEXT_ASSERT(active_conditions > 0 || DDS_RETCODE_TIMEOUT == wait_rc) - - if (DDS_RETCODE_TIMEOUT == wait_rc) { - // TODO(asorbini) This error was added to have a unit test that - // generates random errors, and expects the RMW to call RMW_SET_ERROR_MSG() - // when it returns a retcode other than "OK". Adding this log is quite - // intrusive, since `rcl` will print out an error on every wait time-out, - // which is really not an error, but a common occurrance in any ROS/DDS - // application. For this reason, the code is disabled, but kept in - // to keep track of the potential issue with that unit test (which should - // be resolved by making it accept "timeout" as a valid retcode without - // an error message). -#if 0 - rmw_reset_error(); - RMW_SET_ERROR_MSG("DDS wait timed out"); -#endif - return RMW_RET_TIMEOUT; - } - - return RMW_RET_OK; -} - - -rmw_ret_t -RMW_Connext_WaitSet::invalidate(RMW_Connext_Condition * const condition) -{ - std::unique_lock lock(this->mutex_internal); - - // Scan attached elements to see if condition is still attached. - // If the invalidated condition is not attached, then there's nothing to do, - // since the waitset is already free from potential stale references. - if (!this->is_attached(condition)) { - return RMW_RET_OK; - } - - // If the waitset is "FREE" then we can just mark it as "INVALIDATING", - // do the clean up, and release it. A wait()'ing thread will detect the - // "INVALIDATING" state and block until notified. - if (this->state == RMW_CONNEXT_WAITSET_FREE) { - this->state = RMW_CONNEXT_WAITSET_INVALIDATING; - lock.unlock(); - - rmw_ret_t rc = this->detach(); - if (RMW_RET_OK != rc) { - RMW_CONNEXT_LOG_ERROR("failed to detach conditions on invalidate") - } - - lock.lock(); - this->state = RMW_CONNEXT_WAITSET_FREE; - lock.unlock(); - this->state_cond.notify_all(); - return rc; - } - - // Waitset is currently inside a wait() call. If the state is not "ACQUIRING" - // then it means the user is trying to delete a condition while simultaneously - // waiting on it. This is an error. - if (this->state != RMW_CONNEXT_WAITSET_ACQUIRING) { - RMW_CONNEXT_LOG_ERROR_SET("cannot delete and wait on the same object") - return RMW_RET_ERROR; - } - - // Block on state_cond and wait for the next state transition, at which - // point the condition must have been detached, or we can return an error. - this->state_cond.wait(lock); - - if (this->is_attached(condition)) { - RMW_CONNEXT_LOG_ERROR_SET("deleted condition not detached") - return RMW_RET_ERROR; - } - - return RMW_RET_OK; -} - -rmw_ret_t -RMW_Connext_SubscriberStatusCondition::install() -{ - DDS_DataReaderListener listener = DDS_DataReaderListener_INITIALIZER; - DDS_StatusMask listener_mask = DDS_STATUS_MASK_NONE; - - listener.as_listener.listener_data = this; - - rmw_connextdds_configure_subscriber_condition_listener( - this, &listener, &listener_mask); - - // TODO(asorbini) only call set_listener() if actually setting something? - if (DDS_STATUS_MASK_NONE != listener_mask) { - if (DDS_RETCODE_OK != - DDS_DataReader_set_listener(this->reader, &listener, listener_mask)) - { - RMW_CONNEXT_LOG_ERROR_SET("failed to configure reader listener") - return RMW_RET_ERROR; - } - } - - return RMW_RET_OK; -} - -/****************************************************************************** - * Event wrapper - ******************************************************************************/ -RMW_Connext_StatusCondition * -RMW_Connext_Event::condition(const rmw_event_t * const event) -{ - if (RMW_Connext_Event::reader_event(event)) { - return RMW_Connext_Event::subscriber(event)->condition(); - } else { - return RMW_Connext_Event::publisher(event)->condition(); - } -} - -bool -RMW_Connext_Event::active(rmw_event_t * const event) -{ - RMW_Connext_StatusCondition * condition = nullptr; - if (RMW_Connext_Event::reader_event(event)) { - condition = RMW_Connext_Event::subscriber(event)->condition(); - } else { - condition = RMW_Connext_Event::publisher(event)->condition(); - } - return condition->has_status(event->event_type); -} - -#endif /* !RMW_CONNEXT_CPP_STD_WAITSETS */ diff --git a/rmw_connextdds_common/src/common/rmw_impl_waitset_std.cpp b/rmw_connextdds_common/src/common/rmw_impl_waitset_std.cpp index 6c73dd77..c8e10121 100644 --- a/rmw_connextdds_common/src/common/rmw_impl_waitset_std.cpp +++ b/rmw_connextdds_common/src/common/rmw_impl_waitset_std.cpp @@ -14,7 +14,6 @@ #include "rmw_connextdds/rmw_impl.hpp" -#if RMW_CONNEXT_CPP_STD_WAITSETS /****************************************************************************** * Event ******************************************************************************/ @@ -52,16 +51,6 @@ RMW_Connext_Event::disable(rmw_event_t * const event) } } -bool -RMW_Connext_Event::active(rmw_event_t * const event) -{ - if (RMW_Connext_Event::reader_event(event)) { - return RMW_Connext_Event::subscriber(event)->condition()->has_status(event->event_type); - } else { - return RMW_Connext_Event::publisher(event)->condition()->has_status(event->event_type); - } -} - /****************************************************************************** * StdWaitSet ******************************************************************************/ @@ -131,10 +120,8 @@ RMW_Connext_DataReaderListener_on_data_available( reinterpret_cast(listener_data); UNUSED_ARG(reader); - RMW_CONNEXT_LOG_DEBUG_A( - "data available: condition=%p, reader=%p", - (void *)self, (void *)reader); - self->on_data(); + + self->set_data_available(true); } void @@ -192,7 +179,7 @@ RMW_Connext_WaitSet::on_condition_active( for (size_t i = 0; i < subs->subscriber_count; ++i) { RMW_Connext_Subscriber * const sub = reinterpret_cast(subs->subscribers[i]); - if (sub->has_data()) { + if (sub->condition()->triggered_data) { return true; } } @@ -202,7 +189,7 @@ RMW_Connext_WaitSet::on_condition_active( for (size_t i = 0; i < cls->client_count; ++i) { RMW_Connext_Client * const client = reinterpret_cast(cls->clients[i]); - if (client->subscriber()->has_data()) { + if (client->subscriber()->condition()->triggered_data) { return true; } } @@ -212,7 +199,7 @@ RMW_Connext_WaitSet::on_condition_active( for (size_t i = 0; i < srvs->service_count; ++i) { RMW_Connext_Service * const svc = reinterpret_cast(srvs->services[i]); - if (svc->subscriber()->has_data()) { + if (svc->subscriber()->condition()->triggered_data) { return true; } } @@ -240,7 +227,7 @@ RMW_Connext_WaitSet::on_condition_active( for (size_t i = 0; i < gcs->guard_condition_count; ++i) { RMW_Connext_GuardCondition * const gcond = reinterpret_cast(gcs->guard_conditions[i]); - if (gcond->has_triggered()) { + if (gcond->trigger_value) { return true; } } @@ -263,11 +250,15 @@ RMW_Connext_WaitSet::attach( for (size_t i = 0; i < subs->subscriber_count; ++i) { RMW_Connext_Subscriber * const sub = reinterpret_cast(subs->subscribers[i]); - if (sub->has_data()) { - wait_active = true; + RMW_Connext_SubscriberStatusCondition * const cond = sub->condition(); + cond->attach( + &this->mutex_internal, &this->condition, wait_active, + [cond]() { + return cond->triggered_data; + }); + if (wait_active) { return; } - sub->condition()->attach(&this->condition_mutex, &this->condition); } } @@ -275,11 +266,15 @@ RMW_Connext_WaitSet::attach( for (size_t i = 0; i < cls->client_count; ++i) { RMW_Connext_Client * const client = reinterpret_cast(cls->clients[i]); - if (client->subscriber()->has_data()) { - wait_active = true; + RMW_Connext_SubscriberStatusCondition * const cond = client->subscriber()->condition(); + cond->attach( + &this->mutex_internal, &this->condition, wait_active, + [cond]() { + return cond->triggered_data; + }); + if (wait_active) { return; } - client->subscriber()->condition()->attach(&this->condition_mutex, &this->condition); } } @@ -287,11 +282,15 @@ RMW_Connext_WaitSet::attach( for (size_t i = 0; i < srvs->service_count; ++i) { RMW_Connext_Service * const svc = reinterpret_cast(srvs->services[i]); - if (svc->subscriber()->has_data()) { - wait_active = true; + RMW_Connext_SubscriberStatusCondition * const cond = svc->subscriber()->condition(); + cond->attach( + &this->mutex_internal, &this->condition, wait_active, + [cond]() { + return cond->triggered_data; + }); + if (wait_active) { return; } - svc->subscriber()->condition()->attach(&this->condition_mutex, &this->condition); } } @@ -301,18 +300,26 @@ RMW_Connext_WaitSet::attach( reinterpret_cast(evs->events[i]); if (RMW_Connext_Event::reader_event(event)) { auto sub = RMW_Connext_Event::subscriber(event); - if (sub->condition()->has_status(event->event_type)) { - wait_active = true; + RMW_Connext_SubscriberStatusCondition * const cond = sub->condition(); + cond->attach( + &this->mutex_internal, &this->condition, wait_active, + [cond, event]() { + return cond->has_status(event->event_type); + }); + if (wait_active) { return; } - sub->condition()->attach(&this->condition_mutex, &this->condition); } else { auto pub = RMW_Connext_Event::publisher(event); - if (pub->condition()->has_status(event->event_type)) { - wait_active = true; + RMW_Connext_PublisherStatusCondition * const cond = pub->condition(); + cond->attach( + &this->mutex_internal, &this->condition, wait_active, + [cond, event]() { + return cond->has_status(event->event_type); + }); + if (wait_active) { return; } - pub->condition()->attach(&this->condition_mutex, &this->condition); } } } @@ -321,11 +328,14 @@ RMW_Connext_WaitSet::attach( for (size_t i = 0; i < gcs->guard_condition_count; ++i) { RMW_Connext_GuardCondition * const gcond = reinterpret_cast(gcs->guard_conditions[i]); - if (gcond->has_triggered()) { - wait_active = true; + gcond->attach( + &this->mutex_internal, &this->condition, wait_active, + [gcond]() { + return gcond->trigger_value; + }); + if (wait_active) { return; } - gcond->attach(&this->condition_mutex, &this->condition); } } } @@ -343,15 +353,21 @@ RMW_Connext_WaitSet::detach( for (size_t i = 0; i < subs->subscriber_count; ++i) { RMW_Connext_Subscriber * const sub = reinterpret_cast(subs->subscribers[i]); - sub->condition()->detach(); - if (!sub->has_data()) { - subs->subscribers[i] = nullptr; - } else { - RMW_CONNEXT_LOG_DEBUG_A( - "[wait] active subscriber: sub=%p\n", - reinterpret_cast(sub)) - active_conditions += 1; - } + RMW_Connext_SubscriberStatusCondition * const cond = sub->condition(); + const bool data_available = sub->has_data(); + cond->detach( + [cond, subs, sub, &active_conditions, i, data_available]() { + UNUSED_ARG(sub); + cond->triggered_data = data_available; + if (!cond->triggered_data) { + subs->subscribers[i] = nullptr; + } else { + RMW_CONNEXT_LOG_DEBUG_A( + "[wait] active subscriber: sub=%p", + reinterpret_cast(sub)) + active_conditions += 1; + } + }); } } @@ -359,15 +375,21 @@ RMW_Connext_WaitSet::detach( for (size_t i = 0; i < cls->client_count; ++i) { RMW_Connext_Client * const client = reinterpret_cast(cls->clients[i]); - client->subscriber()->condition()->detach(); - if (!client->subscriber()->has_data()) { - cls->clients[i] = nullptr; - } else { - RMW_CONNEXT_LOG_DEBUG_A( - "[wait] active client: " - "client=%p", (void *)client) - active_conditions += 1; - } + RMW_Connext_SubscriberStatusCondition * const cond = client->subscriber()->condition(); + const bool data_available = client->subscriber()->has_data(); + cond->detach( + [cond, cls, client, &active_conditions, i, data_available]() { + UNUSED_ARG(client); + cond->triggered_data = data_available; + if (!cond->triggered_data) { + cls->clients[i] = nullptr; + } else { + RMW_CONNEXT_LOG_DEBUG_A( + "[wait] active client: " + "client=%p", (void *)client) + active_conditions += 1; + } + }); } } @@ -375,15 +397,21 @@ RMW_Connext_WaitSet::detach( for (size_t i = 0; i < srvs->service_count; ++i) { RMW_Connext_Service * const svc = reinterpret_cast(srvs->services[i]); - svc->subscriber()->condition()->detach(); - if (!svc->subscriber()->has_data()) { - srvs->services[i] = nullptr; - } else { - RMW_CONNEXT_LOG_DEBUG_A( - "[wait] active service: " - "svc=%p", (void *)svc) - active_conditions += 1; - } + RMW_Connext_SubscriberStatusCondition * const cond = svc->subscriber()->condition(); + const bool data_available = svc->subscriber()->has_data(); + cond->detach( + [cond, srvs, svc, &active_conditions, i, data_available]() { + UNUSED_ARG(svc); + cond->triggered_data = data_available; + if (!cond->triggered_data) { + srvs->services[i] = nullptr; + } else { + RMW_CONNEXT_LOG_DEBUG_A( + "[wait] active service: " + "svc=%p", (void *)svc) + active_conditions += 1; + } + }); } } @@ -393,26 +421,32 @@ RMW_Connext_WaitSet::detach( reinterpret_cast(evs->events[i]); if (RMW_Connext_Event::reader_event(event)) { auto sub = RMW_Connext_Event::subscriber(event); - sub->condition()->detach(); - if (!sub->condition()->has_status(event->event_type)) { - evs->events[i] = nullptr; - } else { - RMW_CONNEXT_LOG_DEBUG_A( - "[wait] active subscriber event: " - "event=%p", (void *)event) - active_conditions += 1; - } + RMW_Connext_SubscriberStatusCondition * const cond = sub->condition(); + cond->detach( + [cond, evs, event, &active_conditions, i]() { + if (!cond->has_status(event->event_type)) { + evs->events[i] = nullptr; + } else { + RMW_CONNEXT_LOG_DEBUG_A( + "[wait] active subscriber event: " + "event=%p", (void *)event) + active_conditions += 1; + } + }); } else { auto pub = RMW_Connext_Event::publisher(event); - pub->condition()->detach(); - if (!pub->condition()->has_status(event->event_type)) { - evs->events[i] = nullptr; - } else { - RMW_CONNEXT_LOG_DEBUG_A( - "[wait] active publisher event: " - "event=%p", (void *)event) - active_conditions += 1; - } + RMW_Connext_PublisherStatusCondition * const cond = pub->condition(); + cond->detach( + [cond, evs, event, &active_conditions, i]() { + if (!cond->has_status(event->event_type)) { + evs->events[i] = nullptr; + } else { + RMW_CONNEXT_LOG_DEBUG_A( + "[wait] active pulisher event: " + "event=%p", (void *)event) + active_conditions += 1; + } + }); } } } @@ -421,15 +455,19 @@ RMW_Connext_WaitSet::detach( for (size_t i = 0; i < gcs->guard_condition_count; ++i) { RMW_Connext_GuardCondition * const gcond = reinterpret_cast(gcs->guard_conditions[i]); - gcond->detach(); - if (!gcond->trigger_check()) { - gcs->guard_conditions[i] = nullptr; - } else { - RMW_CONNEXT_LOG_DEBUG_A( - "[wait] active guard condition: " - "condition=%p", (void *)gcond) - active_conditions += 1; - } + gcond->detach( + [gcond, gcs, &active_conditions, i]() { + bool triggered = gcond->trigger_value; + gcond->trigger_value = false; + if (!triggered) { + gcs->guard_conditions[i] = nullptr; + } else { + RMW_CONNEXT_LOG_DEBUG_A( + "[wait] active guard condition: " + "condition=%p", (void *)gcond) + active_conditions += 1; + } + }); } } } @@ -443,7 +481,6 @@ RMW_Connext_WaitSet::wait( rmw_events_t * const evs, const rmw_time_t * const wait_timeout) { -#if 1 { std::lock_guard lock(this->mutex_internal); if (this->waiting) { @@ -454,22 +491,20 @@ RMW_Connext_WaitSet::wait( this->waiting = true; } - RMW_Connext_WaitSet * const ws = this; - auto scope_exit = rcpputils::make_scope_exit( - [ws]() + auto scope_exit_ws = rcpputils::make_scope_exit( + [this]() { - std::lock_guard lock(ws->mutex_internal); - ws->waiting = false; + std::lock_guard lock(this->mutex_internal); + this->waiting = false; }); -#endif bool already_active = false; - this->attach(subs, gcs, srvs, cls, evs, already_active); bool timedout = false; if (!already_active) { + std::unique_lock lock(this->mutex_internal); RMW_CONNEXT_LOG_DEBUG_A( "[wait] waiting on: " "waitset=%p, " @@ -485,15 +520,13 @@ RMW_Connext_WaitSet::wait( (nullptr != cls) ? cls->client_count : 0, (nullptr != evs) ? evs->event_count : 0); - std::unique_lock lock(this->condition_mutex); - auto on_condition_active = [self = this, subs, gcs, srvs, cls, evs]() { return self->on_condition_active(subs, gcs, srvs, cls, evs); }; - if (nullptr == wait_timeout) { + if (nullptr == wait_timeout || rmw_time_equal(*wait_timeout, RMW_DURATION_INFINITE)) { this->condition.wait(lock, on_condition_active); } else if (wait_timeout->sec > 0 || wait_timeout->nsec > 0) { auto n = std::chrono::duration_cast( @@ -508,11 +541,9 @@ RMW_Connext_WaitSet::wait( size_t active_conditions = 0; this->detach(subs, gcs, srvs, cls, evs, active_conditions); - // assert(active_conditions > 0 || timedout); + RMW_CONNEXT_ASSERT(active_conditions > 0 || timedout); if (timedout) { - rmw_reset_error(); - RMW_SET_ERROR_MSG("wait timed out"); return RMW_RET_TIMEOUT; } @@ -571,49 +602,32 @@ RMW_Connext_SubscriberStatusCondition::RMW_Connext_SubscriberStatusCondition( DDS_DomainParticipant_as_entity( DDS_Subscriber_get_participant( DDS_DataReader_get_subscriber(reader))))), - triggered_deadline(false), - triggered_liveliness(false), - triggered_qos(false), - triggered_sample_lost(false), - triggered_data(false), - _loan_guard_condition(nullptr), + loan_guard_condition(internal ? DDS_GuardCondition_new() : nullptr), + status_deadline(DDS_RequestedDeadlineMissedStatus_INITIALIZER), + status_qos(DDS_RequestedIncompatibleQosStatus_INITIALIZER), + status_liveliness(DDS_LivelinessChangedStatus_INITIALIZER), + status_sample_lost(DDS_SampleLostStatus_INITIALIZER), + status_deadline_last(DDS_RequestedDeadlineMissedStatus_INITIALIZER), + status_qos_last(DDS_RequestedIncompatibleQosStatus_INITIALIZER), + status_liveliness_last(DDS_LivelinessChangedStatus_INITIALIZER), + status_sample_lost_last(DDS_SampleLostStatus_INITIALIZER), sub(nullptr) { - if (internal) { - this->_loan_guard_condition = DDS_GuardCondition_new(); - if (nullptr == this->_loan_guard_condition) { - RMW_CONNEXT_LOG_ERROR_SET("failed to allocate internal reader condition") - throw new std::runtime_error("failed to allocate internal reader condition"); - } + if (internal && nullptr == this->loan_guard_condition) { + RMW_CONNEXT_LOG_ERROR_SET("failed to allocate internal reader condition") + throw new std::runtime_error("failed to allocate internal reader condition"); } } RMW_Connext_SubscriberStatusCondition::~RMW_Connext_SubscriberStatusCondition() { - if (nullptr != this->_loan_guard_condition) { - if (DDS_RETCODE_OK != DDS_GuardCondition_delete(this->_loan_guard_condition)) { + if (nullptr != this->loan_guard_condition) { + if (DDS_RETCODE_OK != DDS_GuardCondition_delete(this->loan_guard_condition)) { RMW_CONNEXT_LOG_ERROR_SET("failed to delete internal reader condition") } } } -void -RMW_Connext_SubscriberStatusCondition::on_data() -{ - std::lock_guard lock(this->mutex_internal); - - this->triggered_data = true; - - if (nullptr != this->waitset_condition) { - this->waitset_condition->notify_one(); - } - - // Update loan guard condition's trigger value for internal endpoints - if (nullptr != this->_loan_guard_condition) { - (void)this->trigger_loan_guard_condition(true); - } -} - bool RMW_Connext_SubscriberStatusCondition::has_status( const rmw_event_type_t event_type) @@ -643,57 +657,44 @@ RMW_Connext_SubscriberStatusCondition::has_status( } } - void RMW_Connext_SubscriberStatusCondition::on_requested_deadline_missed( const DDS_RequestedDeadlineMissedStatus * const status) { - std::lock_guard lock(this->mutex_internal); - - this->update_status_deadline(status); - - if (nullptr != this->waitset_condition) { - this->waitset_condition->notify_one(); - } + update_state( + [this, status]() { + this->update_status_deadline(status); + }, true /* notify */); } void RMW_Connext_SubscriberStatusCondition::on_requested_incompatible_qos( const DDS_RequestedIncompatibleQosStatus * const status) { - std::lock_guard lock(this->mutex_internal); - - this->update_status_qos(status); - - if (nullptr != this->waitset_condition) { - this->waitset_condition->notify_one(); - } + update_state( + [this, status]() { + this->update_status_qos(status); + }, true /* notify */); } void RMW_Connext_SubscriberStatusCondition::on_liveliness_changed( const DDS_LivelinessChangedStatus * const status) { - std::lock_guard lock(this->mutex_internal); - - this->update_status_liveliness(status); - - if (nullptr != this->waitset_condition) { - this->waitset_condition->notify_one(); - } + update_state( + [this, status]() { + this->update_status_liveliness(status); + }, true /* notify */); } void RMW_Connext_SubscriberStatusCondition::on_sample_lost( const DDS_SampleLostStatus * const status) { - std::lock_guard lock(this->mutex_internal); - - this->update_status_sample_lost(status); - - if (nullptr != this->waitset_condition) { - this->waitset_condition->notify_one(); - } + update_state( + [this, status]() { + this->update_status_sample_lost(status); + }, true /* notify */); } void @@ -702,6 +703,9 @@ RMW_Connext_SubscriberStatusCondition::update_status_deadline( { this->status_deadline = *status; this->triggered_deadline = true; + + this->status_deadline.total_count_change = this->status_deadline.total_count; + this->status_deadline.total_count_change -= this->status_deadline_last.total_count; } void @@ -710,6 +714,12 @@ RMW_Connext_SubscriberStatusCondition::update_status_liveliness( { this->status_liveliness = *status; this->triggered_liveliness = true; + + this->status_liveliness.alive_count_change = this->status_liveliness.alive_count; + this->status_liveliness.not_alive_count_change = this->status_liveliness.not_alive_count; + this->status_liveliness.alive_count_change -= this->status_liveliness_last.alive_count; + this->status_liveliness.not_alive_count_change -= + this->status_liveliness_last.not_alive_count; } void @@ -718,6 +728,9 @@ RMW_Connext_SubscriberStatusCondition::update_status_qos( { this->status_qos = *status; this->triggered_qos = true; + + this->status_qos.total_count_change = this->status_qos.total_count; + this->status_qos.total_count_change -= this->status_qos_last.total_count; } void @@ -726,6 +739,10 @@ RMW_Connext_SubscriberStatusCondition::update_status_sample_lost( { this->status_sample_lost = *status; this->triggered_sample_lost = true; + + this->status_sample_lost.total_count_change = this->status_sample_lost.total_count; + this->status_sample_lost.total_count_change -= + this->status_sample_lost_last.total_count; } rmw_ret_t @@ -764,21 +781,13 @@ RMW_Connext_PublisherStatusCondition::install( RMW_Connext_PublisherStatusCondition::RMW_Connext_PublisherStatusCondition( DDS_DataWriter * const writer) : RMW_Connext_StatusCondition(DDS_DataWriter_as_entity(writer)), - triggered_deadline(false), - triggered_liveliness(false), - triggered_qos(false) -{ - const DDS_OfferedDeadlineMissedStatus def_status_deadline = - DDS_OfferedDeadlineMissedStatus_INITIALIZER; - const DDS_OfferedIncompatibleQosStatus def_status_qos = - DDS_OfferedIncompatibleQosStatus_INITIALIZER; - const DDS_LivelinessLostStatus def_status_liveliness = - DDS_LivelinessLostStatus_INITIALIZER; - - this->status_deadline = def_status_deadline; - this->status_liveliness = def_status_liveliness; - this->status_qos = def_status_qos; -} + status_deadline(DDS_OfferedDeadlineMissedStatus_INITIALIZER), + status_qos(DDS_OfferedIncompatibleQosStatus_INITIALIZER), + status_liveliness(DDS_LivelinessLostStatus_INITIALIZER), + status_deadline_last(DDS_OfferedDeadlineMissedStatus_INITIALIZER), + status_qos_last(DDS_OfferedIncompatibleQosStatus_INITIALIZER), + status_liveliness_last(DDS_LivelinessLostStatus_INITIALIZER) +{} bool RMW_Connext_PublisherStatusCondition::has_status( @@ -808,39 +817,30 @@ void RMW_Connext_PublisherStatusCondition::on_offered_deadline_missed( const DDS_OfferedDeadlineMissedStatus * const status) { - std::lock_guard lock(this->mutex_internal); - - this->update_status_deadline(status); - - if (nullptr != this->waitset_condition) { - this->waitset_condition->notify_one(); - } + update_state( + [this, status]() { + this->update_status_deadline(status); + }, true /* notify */); } void RMW_Connext_PublisherStatusCondition::on_offered_incompatible_qos( const DDS_OfferedIncompatibleQosStatus * const status) { - std::lock_guard lock(this->mutex_internal); - - this->update_status_qos(status); - - if (nullptr != this->waitset_condition) { - this->waitset_condition->notify_one(); - } + update_state( + [this, status]() { + this->update_status_qos(status); + }, true /* notify */); } void RMW_Connext_PublisherStatusCondition::on_liveliness_lost( const DDS_LivelinessLostStatus * const status) { - std::lock_guard lock(this->mutex_internal); - - this->update_status_liveliness(status); - - if (nullptr != this->waitset_condition) { - this->waitset_condition->notify_one(); - } + update_state( + [this, status]() { + this->update_status_liveliness(status); + }, true /* notify */); } void @@ -849,6 +849,9 @@ RMW_Connext_PublisherStatusCondition::update_status_deadline( { this->status_deadline = *status; this->triggered_deadline = true; + + this->status_deadline.total_count_change = this->status_deadline.total_count; + this->status_deadline.total_count_change -= this->status_deadline_last.total_count; } void @@ -857,6 +860,9 @@ RMW_Connext_PublisherStatusCondition::update_status_liveliness( { this->status_liveliness = *status; this->triggered_liveliness = true; + + this->status_liveliness.total_count_change = this->status_liveliness.total_count; + this->status_liveliness.total_count_change -= this->status_liveliness_last.total_count; } void @@ -865,5 +871,7 @@ RMW_Connext_PublisherStatusCondition::update_status_qos( { this->status_qos = *status; this->triggered_qos = true; + + this->status_qos.total_count_change = this->status_qos.total_count; + this->status_qos.total_count_change -= this->status_qos_last.total_count; } -#endif /* !RMW_CONNEXT_CPP_STD_WAITSETS */ diff --git a/rmw_connextdds_common/src/common/rmw_info.cpp b/rmw_connextdds_common/src/common/rmw_info.cpp index 908b32c6..de6829d0 100644 --- a/rmw_connextdds_common/src/common/rmw_info.cpp +++ b/rmw_connextdds_common/src/common/rmw_info.cpp @@ -538,19 +538,6 @@ rmw_api_connextdds_get_publishers_info_by_topic( RMW_CHECK_ARGUMENT_FOR_NULL(topic_name, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_ARGUMENT_FOR_NULL(publishers_info, RMW_RET_INVALID_ARGUMENT); - int validation_result = RMW_TOPIC_VALID; - rmw_ret_t ret = - rmw_validate_full_topic_name(topic_name, &validation_result, nullptr); - if (RMW_RET_OK != ret) { - return ret; - } - if (RMW_TOPIC_VALID != validation_result) { - const char * reason = - rmw_full_topic_name_validation_result_string(validation_result); - RMW_CONNEXT_LOG_ERROR_A_SET("invalid topic name: %s", reason) - return RMW_RET_INVALID_ARGUMENT; - } - RCUTILS_CHECK_ALLOCATOR_WITH_MSG( allocator, "allocator argument is invalid", return RMW_RET_INVALID_ARGUMENT); @@ -593,19 +580,6 @@ rmw_api_connextdds_get_subscriptions_info_by_topic( RMW_CHECK_ARGUMENT_FOR_NULL(topic_name, RMW_RET_INVALID_ARGUMENT); RMW_CHECK_ARGUMENT_FOR_NULL(subscriptions_info, RMW_RET_INVALID_ARGUMENT); - int validation_result = RMW_TOPIC_VALID; - rmw_ret_t ret = - rmw_validate_full_topic_name(topic_name, &validation_result, nullptr); - if (RMW_RET_OK != ret) { - return ret; - } - if (RMW_TOPIC_VALID != validation_result) { - const char * reason = - rmw_full_topic_name_validation_result_string(validation_result); - RMW_CONNEXT_LOG_ERROR_A_SET("invalid topic name: %s", reason) - return RMW_RET_INVALID_ARGUMENT; - } - RCUTILS_CHECK_ALLOCATOR_WITH_MSG( allocator, "allocator argument is invalid", return RMW_RET_INVALID_ARGUMENT); diff --git a/rmw_connextdds_common/src/common/rmw_type_support.cpp b/rmw_connextdds_common/src/common/rmw_type_support.cpp index e3c14059..2f3b3a34 100644 --- a/rmw_connextdds_common/src/common/rmw_type_support.cpp +++ b/rmw_connextdds_common/src/common/rmw_type_support.cpp @@ -538,8 +538,25 @@ RMW_Connext_MessageTypeSupport::get_type_support_fastrtps( get_message_typesupport_handle( type_supports, RMW_FASTRTPS_CPP_TYPESUPPORT_C); if (nullptr == type_support) { + // Reset error string since this is not (yet) an error + // (see https://github.com/ros2/rosidl_typesupport/pull/102) + rcutils_error_string_t prev_error_string = rcutils_get_error_string(); + rcutils_reset_error(); + type_support = get_message_typesupport_handle( type_supports, RMW_FASTRTPS_CPP_TYPESUPPORT_CPP); + if (nullptr == type_support) { + rcutils_error_string_t error_string = rcutils_get_error_string(); + rcutils_reset_error(); + + RMW_CONNEXT_LOG_ERROR_A_SET( + "failed to load required fastrtps message type support. \n" + "Received these errors:\n" + "C: '%s'\n" + "CPP: '%s'", + prev_error_string.str, + error_string.str); + } } return type_support; } @@ -553,12 +570,28 @@ RMW_Connext_MessageTypeSupport::get_type_support_intro( get_message_typesupport_handle( type_supports, rosidl_typesupport_introspection_c__identifier); if (nullptr == type_support) { + // Reset error string since this is not (yet) an error + // (see https://github.com/ros2/rosidl_typesupport/pull/102) + rcutils_error_string_t prev_error_string = rcutils_get_error_string(); + rcutils_reset_error(); + type_support = get_message_typesupport_handle( type_supports, rosidl_typesupport_introspection_cpp::typesupport_identifier); if (nullptr != type_support) { cpp_version = true; + } else { + rcutils_error_string_t error_string = rcutils_get_error_string(); + rcutils_reset_error(); + + RMW_CONNEXT_LOG_ERROR_A_SET( + "failed to load required introspection message type support. \n" + "Received these errors:\n" + "C: '%s'\n" + "CPP: '%s'", + prev_error_string.str, + error_string.str); } } else { cpp_version = false; @@ -676,12 +709,28 @@ RMW_Connext_ServiceTypeSupportWrapper::get_type_support_intro( type_supports, rosidl_typesupport_introspection_c__identifier); if (nullptr == type_support) { + // Reset error string since this is not (yet) an error + // (see https://github.com/ros2/rosidl_typesupport/pull/102) + rcutils_error_string_t prev_error_string = rcutils_get_error_string(); + rcutils_reset_error(); + type_support = get_service_typesupport_handle( type_supports, rosidl_typesupport_introspection_cpp::typesupport_identifier); if (nullptr != type_support) { cpp_version = true; + } else { + rcutils_error_string_t error_string = rcutils_get_error_string(); + rcutils_reset_error(); + + RMW_CONNEXT_LOG_ERROR_A_SET( + "failed to load required introspection service type support. \n" + "Received these errors:\n" + "C: '%s'\n" + "CPP: '%s'", + prev_error_string.str, + error_string.str); } } @@ -697,8 +746,26 @@ RMW_Connext_ServiceTypeSupportWrapper::get_type_support_fastrtps( type_supports, RMW_FASTRTPS_CPP_TYPESUPPORT_C); if (nullptr == type_support) { + // Reset error string since this is not (yet) an error + // (see https://github.com/ros2/rosidl_typesupport/pull/102) + rcutils_error_string_t prev_error_string = rcutils_get_error_string(); + rcutils_reset_error(); + type_support = get_service_typesupport_handle( type_supports, RMW_FASTRTPS_CPP_TYPESUPPORT_CPP); + + if (nullptr == type_support) { + rcutils_error_string_t error_string = rcutils_get_error_string(); + rcutils_reset_error(); + + RMW_CONNEXT_LOG_ERROR_A_SET( + "failed to load required fastrtps service type support. \n" + "Received these errors:\n" + "C: '%s'\n" + "CPP: '%s'", + prev_error_string.str, + error_string.str); + } } return type_support; diff --git a/rmw_connextdds_common/src/ndds/rmw_typecode.cpp b/rmw_connextdds_common/src/ndds/rmw_typecode.cpp index 5eba4143..a9f48afa 100644 --- a/rmw_connextdds_common/src/ndds/rmw_typecode.cpp +++ b/rmw_connextdds_common/src/ndds/rmw_typecode.cpp @@ -948,13 +948,18 @@ rmw_connextdds_convert_type_members( DDS_TypeCode * tc_header = nullptr; if (type_support->type_requestreply()) { + // TODO(asorbini) cache condition to a variable to avoid an "unformattable" + // long line on the `else()` (see https://github.com/ament/ament_lint/issues/158) + const bool basic_mapping = + RMW_Connext_RequestReplyMapping::Basic == type_support->ctx()->request_reply_mapping; + if (type_support->ctx()->cyclone_compatible) { tc_header = CycloneRequestHeader_get_typecode(tc_factory, tc_cache); if (nullptr == tc_header) { RMW_CONNEXT_LOG_ERROR("failed to get CycloneRequestHeader typecode") return RMW_RET_ERROR; } - } else { + } else if (basic_mapping) { switch (type_support->message_type()) { case RMW_CONNEXT_MESSAGE_REQUEST: {