diff --git a/rmw_fastrtps_cpp/src/rmw_client.cpp b/rmw_fastrtps_cpp/src/rmw_client.cpp index 6d2d53456..46f8a5ba4 100644 --- a/rmw_fastrtps_cpp/src/rmw_client.cpp +++ b/rmw_fastrtps_cpp/src/rmw_client.cpp @@ -192,6 +192,7 @@ rmw_create_client( } info->writer_guid_ = info->request_publisher_->getGuid(); + info->reader_guid_ = info->response_subscriber_->getGuid(); rmw_client = rmw_client_allocate(); if (!rmw_client) { diff --git a/rmw_fastrtps_cpp/src/rmw_service.cpp b/rmw_fastrtps_cpp/src/rmw_service.cpp index e1dbe4eac..b24584a59 100644 --- a/rmw_fastrtps_cpp/src/rmw_service.cpp +++ b/rmw_fastrtps_cpp/src/rmw_service.cpp @@ -192,8 +192,9 @@ rmw_create_service( RMW_SET_ERROR_MSG("failed to get datawriter qos"); goto fail; } + info->pub_listener_ = new ServicePubListener(); info->response_publisher_ = - Domain::createPublisher(participant, publisherParam, nullptr); + Domain::createPublisher(participant, publisherParam, info->pub_listener_); if (!info->response_publisher_) { RMW_SET_ERROR_MSG("create_service() could not create publisher"); goto fail; @@ -255,6 +256,10 @@ rmw_create_service( Domain::removePublisher(info->response_publisher_); } + if (info->pub_listener_) { + delete info->pub_listener_; + } + if (info->request_subscriber_) { rmw_gid_t gid = rmw_fastrtps_shared_cpp::create_rmw_gid( eprosima_fastrtps_identifier, info->request_subscriber_->getGuid()); diff --git a/rmw_fastrtps_dynamic_cpp/src/rmw_client.cpp b/rmw_fastrtps_dynamic_cpp/src/rmw_client.cpp index 3a815e027..3f15c2918 100644 --- a/rmw_fastrtps_dynamic_cpp/src/rmw_client.cpp +++ b/rmw_fastrtps_dynamic_cpp/src/rmw_client.cpp @@ -217,11 +217,12 @@ rmw_create_client( info->request_publisher_ = Domain::createPublisher(participant, publisherParam, info->pub_listener_); if (!info->request_publisher_) { - RMW_SET_ERROR_MSG("create_publisher() could not create publisher"); + RMW_SET_ERROR_MSG("create_client() could not create publisher"); goto fail; } info->writer_guid_ = info->request_publisher_->getGuid(); + info->reader_guid_ = info->response_subscriber_->getGuid(); rmw_client = rmw_client_allocate(); if (!rmw_client) { diff --git a/rmw_fastrtps_dynamic_cpp/src/rmw_service.cpp b/rmw_fastrtps_dynamic_cpp/src/rmw_service.cpp index 8f8b5648e..4427efe77 100644 --- a/rmw_fastrtps_dynamic_cpp/src/rmw_service.cpp +++ b/rmw_fastrtps_dynamic_cpp/src/rmw_service.cpp @@ -222,8 +222,9 @@ rmw_create_service( RMW_SET_ERROR_MSG("failed to get datawriter qos"); goto fail; } + info->pub_listener_ = new ServicePubListener(); info->response_publisher_ = - Domain::createPublisher(participant, publisherParam, nullptr); + Domain::createPublisher(participant, publisherParam, info->pub_listener_); if (!info->response_publisher_) { RMW_SET_ERROR_MSG("create_publisher() could not create publisher"); goto fail; @@ -285,6 +286,10 @@ rmw_create_service( Domain::removePublisher(info->response_publisher_); } + if (info->pub_listener_) { + delete info->pub_listener_; + } + if (info->request_subscriber_) { rmw_gid_t gid = rmw_fastrtps_shared_cpp::create_rmw_gid( eprosima_fastrtps_identifier, info->request_subscriber_->getGuid()); diff --git a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_client_info.hpp b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_client_info.hpp index 3a48b3c06..5082b1eef 100644 --- a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_client_info.hpp +++ b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_client_info.hpp @@ -49,6 +49,7 @@ typedef struct CustomClientInfo eprosima::fastrtps::Publisher * request_publisher_; ClientListener * listener_; eprosima::fastrtps::rtps::GUID_t writer_guid_; + eprosima::fastrtps::rtps::GUID_t reader_guid_; eprosima::fastrtps::Participant * participant_; const char * typesupport_identifier_; ClientPubListener * pub_listener_; @@ -88,7 +89,9 @@ class ClientListener : public eprosima::fastrtps::SubscriberListener if (eprosima::fastrtps::rtps::ALIVE == response.sample_info_.sampleKind) { response.sample_identity_ = response.sample_info_.related_sample_identity; - if (response.sample_identity_.writer_guid() == info_->writer_guid_) { + if (response.sample_identity_.writer_guid() == info_->reader_guid_ || + response.sample_identity_.writer_guid() == info_->writer_guid_) + { std::lock_guard lock(internalMutex_); if (conditionMutex_ != nullptr) { diff --git a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_service_info.hpp b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_service_info.hpp index 098bb2093..52d29e701 100644 --- a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_service_info.hpp +++ b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_service_info.hpp @@ -19,6 +19,7 @@ #include #include #include +#include #include "fastcdr/FastBuffer.h" @@ -32,8 +33,10 @@ #include "rcpputils/thread_safety_annotations.hpp" #include "rmw_fastrtps_shared_cpp/TypeSupport.hpp" +#include "rmw_fastrtps_shared_cpp/guid_utils.hpp" class ServiceListener; +class ServicePubListener; typedef struct CustomServiceInfo { @@ -44,6 +47,7 @@ typedef struct CustomServiceInfo eprosima::fastrtps::Subscriber * request_subscriber_; eprosima::fastrtps::Publisher * response_publisher_; ServiceListener * listener_; + ServicePubListener * pub_listener_; eprosima::fastrtps::Participant * participant_; const char * typesupport_identifier_; } CustomServiceInfo; @@ -84,6 +88,12 @@ class ServiceListener : public eprosima::fastrtps::SubscriberListener if (sub->takeNextData(&data, &request.sample_info_)) { if (eprosima::fastrtps::rtps::ALIVE == request.sample_info_.sampleKind) { request.sample_identity_ = request.sample_info_.sample_identity; + // Use response subscriber guid (on related_sample_identity) when present. + const eprosima::fastrtps::rtps::GUID_t & reader_guid = + request.sample_info_.related_sample_identity.writer_guid(); + if (reader_guid != eprosima::fastrtps::rtps::GUID_t::unknown() ) { + request.sample_identity_.writer_guid() = reader_guid; + } std::lock_guard lock(internalMutex_); @@ -159,4 +169,49 @@ class ServiceListener : public eprosima::fastrtps::SubscriberListener std::condition_variable * conditionVariable_ RCPPUTILS_TSA_GUARDED_BY(internalMutex_); }; +class ServicePubListener : public eprosima::fastrtps::PublisherListener +{ +public: + ServicePubListener() = default; + + template + bool wait_for_subscription( + const eprosima::fastrtps::rtps::GUID_t & guid, + const std::chrono::duration & rel_time) + { + auto guid_is_present = [this, guid]() -> bool + { + return subscriptions_.find(guid) != subscriptions_.end(); + }; + + std::unique_lock lock(mutex_); + return cv_.wait_for(lock, rel_time, guid_is_present); + } + + void onPublicationMatched( + eprosima::fastrtps::Publisher * pub, + eprosima::fastrtps::rtps::MatchingInfo & matchingInfo) + { + (void) pub; + std::lock_guard lock(mutex_); + if (eprosima::fastrtps::rtps::MATCHED_MATCHING == matchingInfo.status) { + subscriptions_.insert(matchingInfo.remoteEndpointGuid); + } else if (eprosima::fastrtps::rtps::REMOVED_MATCHING == matchingInfo.status) { + subscriptions_.erase(matchingInfo.remoteEndpointGuid); + } else { + return; + } + cv_.notify_all(); + } + +private: + using subscriptions_set_t = + std::unordered_set; + + std::mutex mutex_; + subscriptions_set_t subscriptions_ RCPPUTILS_TSA_GUARDED_BY(mutex_); + std::condition_variable cv_; +}; + #endif // RMW_FASTRTPS_SHARED_CPP__CUSTOM_SERVICE_INFO_HPP_ diff --git a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/guid_utils.hpp b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/guid_utils.hpp index 6b8872fac..21cd07fee 100644 --- a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/guid_utils.hpp +++ b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/guid_utils.hpp @@ -16,6 +16,7 @@ #define RMW_FASTRTPS_SHARED_CPP__GUID_UTILS_HPP_ #include +#include #include #include @@ -55,6 +56,36 @@ copy_from_fastrtps_guid_to_byte_array( memcpy(&guid_byte_array[prefix_size], &guid.entityId, guid.entityId.size); } +struct hash_fastrtps_guid +{ + std::size_t operator()(const eprosima::fastrtps::rtps::GUID_t & guid) const + { + union u_convert { + uint8_t plain_value[sizeof(guid)]; + uint32_t plain_ints[sizeof(guid) / sizeof(uint32_t)]; + } u; + + static_assert( + sizeof(guid) == 16 && + sizeof(u.plain_value) == sizeof(u.plain_ints) && + offsetof(u_convert, plain_value) == offsetof(u_convert, plain_ints), + "Plain guid should be easily convertible to uint32_t[4]"); + + copy_from_fastrtps_guid_to_byte_array(guid, u.plain_value); + + constexpr std::size_t prime_1 = 7; + constexpr std::size_t prime_2 = 31; + constexpr std::size_t prime_3 = 59; + + size_t ret_val = prime_1 * u.plain_ints[0]; + ret_val = prime_2 * (u.plain_ints[1] + ret_val); + ret_val = prime_3 * (u.plain_ints[2] + ret_val); + ret_val = u.plain_ints[3] + ret_val; + + return ret_val; + } +}; + } // namespace rmw_fastrtps_shared_cpp #endif // RMW_FASTRTPS_SHARED_CPP__GUID_UTILS_HPP_ diff --git a/rmw_fastrtps_shared_cpp/src/rmw_request.cpp b/rmw_fastrtps_shared_cpp/src/rmw_request.cpp index d5f0408b3..be28994f6 100644 --- a/rmw_fastrtps_shared_cpp/src/rmw_request.cpp +++ b/rmw_fastrtps_shared_cpp/src/rmw_request.cpp @@ -57,6 +57,7 @@ __rmw_send_request( data.is_cdr_buffer = false; data.data = const_cast(ros_request); data.impl = info->request_type_support_impl_; + wparams.related_sample_identity().writer_guid() = info->reader_guid_; if (info->request_publisher_->write(&data, wparams)) { returnedValue = RMW_RET_OK; *sequence_id = ((int64_t)wparams.sample_identity().sequence_number().high) << 32 | diff --git a/rmw_fastrtps_shared_cpp/src/rmw_response.cpp b/rmw_fastrtps_shared_cpp/src/rmw_response.cpp index c960c6549..a91cf7276 100644 --- a/rmw_fastrtps_shared_cpp/src/rmw_response.cpp +++ b/rmw_fastrtps_shared_cpp/src/rmw_response.cpp @@ -104,6 +104,28 @@ __rmw_send_response( wparams.related_sample_identity().sequence_number().low = (int32_t)(request_header->sequence_number & 0xFFFFFFFF); + // TODO(MiguelCompany) The following block is a workaround for the race on the + // discovery of services. It is (ab)using a related_sample_identity on the request + // with the GUID of the response reader, so we can wait here for it to be matched to + // the server response writer. In the future, this should be done with the mechanism + // explained on OMG DDS-RPC 1.0 spec under section 7.6.2 (Enhanced Service Mapping) + + // According to the list of possible entity kinds in section 9.3.1.2 of RTPS + // readers will have this bit on, while writers will not. We use this to know + // if the related guid is the request writer or the response reader. + constexpr uint8_t entity_id_is_reader_bit = 0x04; + const eprosima::fastrtps::rtps::GUID_t & related_guid = + wparams.related_sample_identity().writer_guid(); + if ((related_guid.entityId.value[3] & entity_id_is_reader_bit) != 0) { + // Related guid is a reader, so it is the response subscription guid. + // Wait for the response writer to be matched with it. + auto listener = info->pub_listener_; + if (!listener->wait_for_subscription(related_guid, std::chrono::milliseconds(100))) { + RMW_SET_ERROR_MSG("client will not receive response"); + return RMW_RET_ERROR; + } + } + rmw_fastrtps_shared_cpp::SerializedData data; data.is_cdr_buffer = false; data.data = const_cast(ros_response); diff --git a/rmw_fastrtps_shared_cpp/src/rmw_service.cpp b/rmw_fastrtps_shared_cpp/src/rmw_service.cpp index f69a4f245..e0cf15450 100644 --- a/rmw_fastrtps_shared_cpp/src/rmw_service.cpp +++ b/rmw_fastrtps_shared_cpp/src/rmw_service.cpp @@ -97,6 +97,9 @@ __rmw_destroy_service( if (info->response_publisher_ != nullptr) { Domain::removePublisher(info->response_publisher_); } + if (info->pub_listener_ != nullptr) { + delete info->pub_listener_; + } if (info->listener_ != nullptr) { delete info->listener_; } diff --git a/rmw_fastrtps_shared_cpp/src/rmw_service_server_is_available.cpp b/rmw_fastrtps_shared_cpp/src/rmw_service_server_is_available.cpp index 47e49e830..8cecfb203 100644 --- a/rmw_fastrtps_shared_cpp/src/rmw_service_server_is_available.cpp +++ b/rmw_fastrtps_shared_cpp/src/rmw_service_server_is_available.cpp @@ -100,11 +100,22 @@ __rmw_service_server_is_available( return RMW_RET_OK; } - if (0 == client_info->request_publisher_matched_count_.load()) { + if (number_of_request_subscribers != number_of_response_publishers) { // not ready return RMW_RET_OK; } - if (0 == client_info->response_subscriber_matched_count_.load()) { + + size_t matched_request_pubs = client_info->request_publisher_matched_count_.load(); + if (0 == matched_request_pubs) { + // not ready + return RMW_RET_OK; + } + size_t matched_response_subs = client_info->response_subscriber_matched_count_.load(); + if (0 == matched_response_subs) { + // not ready + return RMW_RET_OK; + } + if (matched_request_pubs != matched_response_subs) { // not ready return RMW_RET_OK; }