From 1ed21994a62596ef0e68be606e4b95023fa02d3c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ricardo=20Gonz=C3=A1lez=20Moreno?= Date: Wed, 20 Jul 2022 14:15:54 +0200 Subject: [PATCH] Fix wrong usage of take_next_sample with read samples --- .../custom_client_info.hpp | 1 - .../custom_service_info.hpp | 1 - rmw_fastrtps_shared_cpp/src/rmw_request.cpp | 27 +++--- rmw_fastrtps_shared_cpp/src/rmw_response.cpp | 23 +++-- rmw_fastrtps_shared_cpp/src/rmw_take.cpp | 90 +++++++++++++------ 5 files changed, 93 insertions(+), 49 deletions(-) diff --git a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_client_info.hpp b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_client_info.hpp index b270d7359..2ae8375d4 100644 --- a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_client_info.hpp +++ b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_client_info.hpp @@ -72,7 +72,6 @@ typedef struct CustomClientResponse { eprosima::fastrtps::rtps::SampleIdentity sample_identity_; std::unique_ptr buffer_; - eprosima::fastdds::dds::SampleInfo sample_info_ {}; } CustomClientResponse; class ClientListener : public eprosima::fastdds::dds::DataReaderListener diff --git a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_service_info.hpp b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_service_info.hpp index e75c3f07b..f0d32c3a4 100644 --- a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_service_info.hpp +++ b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_service_info.hpp @@ -73,7 +73,6 @@ typedef struct CustomServiceRequest { eprosima::fastrtps::rtps::SampleIdentity sample_identity_; eprosima::fastcdr::FastBuffer * buffer_; - eprosima::fastdds::dds::SampleInfo sample_info_ {}; CustomServiceRequest() : buffer_(nullptr) diff --git a/rmw_fastrtps_shared_cpp/src/rmw_request.cpp b/rmw_fastrtps_shared_cpp/src/rmw_request.cpp index cb8cb02c6..46aebe6f6 100644 --- a/rmw_fastrtps_shared_cpp/src/rmw_request.cpp +++ b/rmw_fastrtps_shared_cpp/src/rmw_request.cpp @@ -18,6 +18,7 @@ #include "fastcdr/FastBuffer.h" #include "fastdds/rtps/common/WriteParams.h" +#include "fastdds/dds/core/StackAllocatedSequence.hpp" #include "rmw/error_handling.h" #include "rmw/rmw.h" @@ -100,22 +101,24 @@ __rmw_take_request( data.is_cdr_buffer = true; data.data = request.buffer_; data.impl = nullptr; // not used when is_cdr_buffer is true - if (info->request_reader_->take_next_sample( - &data, - &request.sample_info_) == ReturnCode_t::RETCODE_OK) - { - if (request.sample_info_.valid_data) { - request.sample_identity_ = request.sample_info_.sample_identity; + + eprosima::fastdds::dds::StackAllocatedSequence data_values; + const_cast(data_values.buffer())[0] = &data; + eprosima::fastdds::dds::SampleInfoSeq info_seq{1}; + + if (ReturnCode_t::RETCODE_OK == info->request_reader_->take(data_values, info_seq, 1)) { + if (info_seq[0].valid_data) { + request.sample_identity_ = info_seq[0].sample_identity; // Use response subscriber guid (on related_sample_identity) when present. const eprosima::fastrtps::rtps::GUID_t & reader_guid = - request.sample_info_.related_sample_identity.writer_guid(); + info_seq[0].related_sample_identity.writer_guid(); if (reader_guid != eprosima::fastrtps::rtps::GUID_t::unknown()) { request.sample_identity_.writer_guid() = reader_guid; } // Save both guids in the clients_endpoints map const eprosima::fastrtps::rtps::GUID_t & writer_guid = - request.sample_info_.sample_identity.writer_guid(); + info_seq[0].sample_identity.writer_guid(); info->pub_listener_->endpoint_add_reader_and_writer(reader_guid, writer_guid); auto raw_type_support = dynamic_cast( @@ -132,11 +135,15 @@ __rmw_take_request( request_header->request_id.sequence_number = ((int64_t)request.sample_identity_.sequence_number().high) << 32 | request.sample_identity_.sequence_number().low; - request_header->source_timestamp = request.sample_info_.source_timestamp.to_ns(); - request_header->received_timestamp = request.sample_info_.source_timestamp.to_ns(); + request_header->source_timestamp = info_seq[0].source_timestamp.to_ns(); + request_header->received_timestamp = info_seq[0].source_timestamp.to_ns(); *taken = true; } } + + info->request_reader_->return_loan(data_values, info_seq); + data_values.length(0); + info_seq.length(0); } delete request.buffer_; diff --git a/rmw_fastrtps_shared_cpp/src/rmw_response.cpp b/rmw_fastrtps_shared_cpp/src/rmw_response.cpp index 371f5b640..c61b143b1 100644 --- a/rmw_fastrtps_shared_cpp/src/rmw_response.cpp +++ b/rmw_fastrtps_shared_cpp/src/rmw_response.cpp @@ -17,6 +17,7 @@ #include "fastcdr/Cdr.h" #include "fastdds/rtps/common/WriteParams.h" +#include "fastdds/dds/core/StackAllocatedSequence.hpp" #include "rmw/error_handling.h" #include "rmw/rmw.h" @@ -60,12 +61,14 @@ __rmw_take_response( data.is_cdr_buffer = true; data.data = response.buffer_.get(); data.impl = nullptr; // not used when is_cdr_buffer is true - if (info->response_reader_->take_next_sample( - &data, - &response.sample_info_) == ReturnCode_t::RETCODE_OK) - { - if (response.sample_info_.valid_data) { - response.sample_identity_ = response.sample_info_.related_sample_identity; + + eprosima::fastdds::dds::StackAllocatedSequence data_values; + const_cast(data_values.buffer())[0] = &data; + eprosima::fastdds::dds::SampleInfoSeq info_seq{1}; + + if (ReturnCode_t::RETCODE_OK == info->response_reader_->take(data_values, info_seq, 1)) { + if (info_seq[0].valid_data) { + response.sample_identity_ = info_seq[0].related_sample_identity; if (response.sample_identity_.writer_guid() == info->reader_guid_ || response.sample_identity_.writer_guid() == info->writer_guid_) @@ -79,8 +82,8 @@ __rmw_take_response( if (raw_type_support->deserializeROSmessage( deser, ros_response, info->response_type_support_impl_)) { - request_header->source_timestamp = response.sample_info_.source_timestamp.to_ns(); - request_header->received_timestamp = response.sample_info_.reception_timestamp.to_ns(); + request_header->source_timestamp = info_seq[0].source_timestamp.to_ns(); + request_header->received_timestamp = info_seq[0].reception_timestamp.to_ns(); request_header->request_id.sequence_number = ((int64_t)response.sample_identity_.sequence_number().high) << 32 | response.sample_identity_.sequence_number().low; @@ -89,6 +92,10 @@ __rmw_take_response( } } } + + info->response_reader_->return_loan(data_values, info_seq); + data_values.length(0); + info_seq.length(0); } return RMW_RET_OK; diff --git a/rmw_fastrtps_shared_cpp/src/rmw_take.cpp b/rmw_fastrtps_shared_cpp/src/rmw_take.cpp index c03d46b36..48bfc1870 100644 --- a/rmw_fastrtps_shared_cpp/src/rmw_take.cpp +++ b/rmw_fastrtps_shared_cpp/src/rmw_take.cpp @@ -21,6 +21,7 @@ #include "rmw/rmw.h" #include "fastdds/dds/subscriber/SampleInfo.hpp" +#include "fastdds/dds/core/StackAllocatedSequence.hpp" #include "fastrtps/utils/collections/ResourceLimitedVector.hpp" @@ -83,33 +84,50 @@ _take( auto info = static_cast(subscription->data); RCUTILS_CHECK_FOR_NULL_WITH_MSG(info, "custom subscriber info is null", return RMW_RET_ERROR); - eprosima::fastdds::dds::SampleInfo sinfo; - rmw_fastrtps_shared_cpp::SerializedData data; - data.is_cdr_buffer = false; data.data = ros_message; data.impl = info->type_support_impl_; - while (0 < info->data_reader_->get_unread_count()) { - if (info->data_reader_->take_next_sample(&data, &sinfo) == ReturnCode_t::RETCODE_OK) { - if (subscription->options.ignore_local_publications) { - auto sample_writer_guid = - eprosima::fastrtps::rtps::iHandle2GUID(sinfo.publication_handle); + eprosima::fastdds::dds::StackAllocatedSequence data_values; + const_cast(data_values.buffer())[0] = &data; + eprosima::fastdds::dds::SampleInfoSeq info_seq{1}; - if (sample_writer_guid.guidPrefix == info->data_reader_->guid().guidPrefix) { - // This is a local publication. Ignore it - continue; - } + while (ReturnCode_t::RETCODE_OK == info->data_reader_->take(data_values, info_seq, 1)) { + class ReturnLoan + { +public: + ReturnLoan(std::function functor) + : functor_(functor) {} + + ~ReturnLoan() {functor_();} + +private: + std::function functor_; + } return_loan( + [&]() + { + info->data_reader_->return_loan(data_values, info_seq); + data_values.length(0); + info_seq.length(0); + }); + + if (subscription->options.ignore_local_publications) { + auto sample_writer_guid = + eprosima::fastrtps::rtps::iHandle2GUID(info_seq[0].publication_handle); + + if (sample_writer_guid.guidPrefix == info->data_reader_->guid().guidPrefix) { + // This is a local publication. Ignore it + continue; } + } - if (sinfo.valid_data) { - if (message_info) { - _assign_message_info(identifier, message_info, &sinfo); - } - *taken = true; - break; + if (info_seq[0].valid_data) { + if (message_info) { + _assign_message_info(identifier, message_info, &info_seq[0]); } + *taken = true; + break; } } @@ -144,14 +162,6 @@ _take_sequence( auto info = static_cast(subscription->data); RCUTILS_CHECK_FOR_NULL_WITH_MSG(info, "custom subscriber info is null", return RMW_RET_ERROR); - // Limit the upper bound of reads to the number unread at the beginning. - // This prevents any samples that are added after the beginning of the - // _take_sequence call from being read. - auto unread_count = info->data_reader_->get_unread_count(); - if (unread_count < count) { - count = unread_count; - } - for (size_t ii = 0; ii < count; ++ii) { taken_flag = false; ret = _take( @@ -315,8 +325,30 @@ _take_serialized_message( data.data = &buffer; data.impl = nullptr; // not used when is_cdr_buffer is true - if (info->data_reader_->take_next_sample(&data, &sinfo) == ReturnCode_t::RETCODE_OK) { - if (sinfo.valid_data) { + eprosima::fastdds::dds::StackAllocatedSequence data_values; + const_cast(data_values.buffer())[0] = &data; + eprosima::fastdds::dds::SampleInfoSeq info_seq{1}; + + while (ReturnCode_t::RETCODE_OK == info->data_reader_->take(data_values, info_seq, 1)) { + class ReturnLoan + { +public: + ReturnLoan(std::function functor) + : functor_(functor) {} + + ~ReturnLoan() {functor_();} + +private: + std::function functor_; + } return_loan( + [&]() + { + info->data_reader_->return_loan(data_values, info_seq); + data_values.length(0); + info_seq.length(0); + }); + + if (info_seq[0].valid_data) { auto buffer_size = static_cast(buffer.getBufferSize()); if (serialized_message->buffer_capacity < buffer_size) { auto ret = rmw_serialized_message_resize(serialized_message, buffer_size); @@ -328,7 +360,7 @@ _take_serialized_message( memcpy(serialized_message->buffer, buffer.getBuffer(), serialized_message->buffer_length); if (message_info) { - _assign_message_info(identifier, message_info, &sinfo); + _assign_message_info(identifier, message_info, &info_seq[0]); } *taken = true; }