From 1d599f257bf3afd2de221e62f8c2aa757b2b079a Mon Sep 17 00:00:00 2001 From: MiguelCompany Date: Thu, 21 Jun 2018 06:57:11 +0200 Subject: [PATCH] Avoid allocations (#211) * Refs #3054. Fixed alignment calculations. * Refs #3054. Temporary FastBuffer removed on serialization. * Refs #3055. Temporary FastBuffer removed on rmw_take and rmw_take_with_info. * Refs #3054. Avoid allocations on rmw_serialize. --- .../MessageTypeSupport_impl.hpp | 2 +- .../ServiceTypeSupport_impl.hpp | 4 +- .../include/rmw_fastrtps_cpp/TypeSupport.hpp | 7 ++ .../rmw_fastrtps_cpp/TypeSupport_impl.hpp | 74 ++++++++++++++----- .../rmw_fastrtps_cpp/custom_client_info.hpp | 6 +- .../rmw_fastrtps_cpp/custom_service_info.hpp | 6 +- rmw_fastrtps_cpp/src/rmw_publish.cpp | 21 +++--- rmw_fastrtps_cpp/src/rmw_request.cpp | 27 +++---- rmw_fastrtps_cpp/src/rmw_response.cpp | 13 ++-- rmw_fastrtps_cpp/src/rmw_serialize.cpp | 14 ++-- rmw_fastrtps_cpp/src/rmw_take.cpp | 16 ++-- .../src/ros_message_serialization.cpp | 17 +++++ .../src/ros_message_serialization.hpp | 6 ++ 13 files changed, 140 insertions(+), 73 deletions(-) diff --git a/rmw_fastrtps_cpp/include/rmw_fastrtps_cpp/MessageTypeSupport_impl.hpp b/rmw_fastrtps_cpp/include/rmw_fastrtps_cpp/MessageTypeSupport_impl.hpp index 0582a8bf8..b629ab872 100644 --- a/rmw_fastrtps_cpp/include/rmw_fastrtps_cpp/MessageTypeSupport_impl.hpp +++ b/rmw_fastrtps_cpp/include/rmw_fastrtps_cpp/MessageTypeSupport_impl.hpp @@ -43,7 +43,7 @@ MessageTypeSupport::MessageTypeSupport(const MembersType * members) // Encapsulation size this->m_typeSize = 4; if (this->members_->member_count_ != 0) { - this->m_typeSize += static_cast(this->calculateMaxSerializedSize(members, 4)); + this->m_typeSize += static_cast(this->calculateMaxSerializedSize(members, 0)); } else { this->m_typeSize++; } diff --git a/rmw_fastrtps_cpp/include/rmw_fastrtps_cpp/ServiceTypeSupport_impl.hpp b/rmw_fastrtps_cpp/include/rmw_fastrtps_cpp/ServiceTypeSupport_impl.hpp index 30062d160..bac9db48c 100644 --- a/rmw_fastrtps_cpp/include/rmw_fastrtps_cpp/ServiceTypeSupport_impl.hpp +++ b/rmw_fastrtps_cpp/include/rmw_fastrtps_cpp/ServiceTypeSupport_impl.hpp @@ -47,7 +47,7 @@ RequestTypeSupport::RequestTypeSupport( // Encapsulation size this->m_typeSize = 4; if (this->members_->member_count_ != 0) { - this->m_typeSize += static_cast(this->calculateMaxSerializedSize(this->members_, 4)); + this->m_typeSize += static_cast(this->calculateMaxSerializedSize(this->members_, 0)); } else { this->m_typeSize++; } @@ -69,7 +69,7 @@ ResponseTypeSupport::ResponseTypeSupport // Encapsulation size this->m_typeSize = 4; if (this->members_->member_count_ != 0) { - this->m_typeSize += static_cast(this->calculateMaxSerializedSize(this->members_, 4)); + this->m_typeSize += static_cast(this->calculateMaxSerializedSize(this->members_, 0)); } else { this->m_typeSize++; } diff --git a/rmw_fastrtps_cpp/include/rmw_fastrtps_cpp/TypeSupport.hpp b/rmw_fastrtps_cpp/include/rmw_fastrtps_cpp/TypeSupport.hpp index 4998cc32b..30ec97fe2 100644 --- a/rmw_fastrtps_cpp/include/rmw_fastrtps_cpp/TypeSupport.hpp +++ b/rmw_fastrtps_cpp/include/rmw_fastrtps_cpp/TypeSupport.hpp @@ -42,6 +42,13 @@ namespace rmw_fastrtps_cpp { +// Publishers write method will receive a pointer to this struct +struct SerializedData +{ + bool is_cdr_buffer; // Whether next field is a pointer to a Cdr or to a plain ros message + void * data; +}; + // Helper class that uses template specialization to read/write string types to/from a // eprosima::fastcdr::Cdr template diff --git a/rmw_fastrtps_cpp/include/rmw_fastrtps_cpp/TypeSupport_impl.hpp b/rmw_fastrtps_cpp/include/rmw_fastrtps_cpp/TypeSupport_impl.hpp index 9985f5a7d..83f746e6a 100644 --- a/rmw_fastrtps_cpp/include/rmw_fastrtps_cpp/TypeSupport_impl.hpp +++ b/rmw_fastrtps_cpp/include/rmw_fastrtps_cpp/TypeSupport_impl.hpp @@ -420,8 +420,11 @@ size_t next_field_align( std::vector & data = *reinterpret_cast *>(field); current_alignment += eprosima::fastcdr::Cdr::alignment(current_alignment, padding); current_alignment += padding; - current_alignment += eprosima::fastcdr::Cdr::alignment(current_alignment, item_size); - current_alignment += item_size * data.size(); + auto num_elems = data.size(); + if (num_elems > 0) { + current_alignment += eprosima::fastcdr::Cdr::alignment(current_alignment, item_size); + current_alignment += item_size * data.size(); + } } return current_alignment; } @@ -437,7 +440,7 @@ size_t next_field_align( if (!member->is_array_) { current_alignment += eprosima::fastcdr::Cdr::alignment(current_alignment, padding); current_alignment += padding; - auto str = *static_cast(field); + auto & str = *static_cast(field); current_alignment += str.size() + 1; } else if (member->array_size_ && !member->is_upper_bound_) { auto str_arr = static_cast(field); @@ -935,7 +938,7 @@ size_t TypeSupport::getEstimatedSerializedSize( size_t ret_val = 4; if (members_->member_count_ != 0) { - ret_val += TypeSupport::getEstimatedSerializedSize(members_, ros_message, ret_val); + ret_val += TypeSupport::getEstimatedSerializedSize(members_, ros_message, 0); } else { ret_val += 1; } @@ -994,13 +997,28 @@ bool TypeSupport::serialize( assert(data); assert(payload); - auto ser = static_cast(data); - if (payload->max_size >= ser->getSerializedDataLength()) { - payload->length = static_cast(ser->getSerializedDataLength()); - payload->encapsulation = ser->endianness() == - eprosima::fastcdr::Cdr::BIG_ENDIANNESS ? CDR_BE : CDR_LE; - memcpy(payload->data, ser->getBufferPointer(), ser->getSerializedDataLength()); - return true; + auto ser_data = static_cast(data); + if (ser_data->is_cdr_buffer) { + auto ser = static_cast(ser_data->data); + if (payload->max_size >= ser->getSerializedDataLength()) { + payload->length = static_cast(ser->getSerializedDataLength()); + payload->encapsulation = ser->endianness() == + eprosima::fastcdr::Cdr::BIG_ENDIANNESS ? CDR_BE : CDR_LE; + memcpy(payload->data, ser->getBufferPointer(), ser->getSerializedDataLength()); + return true; + } + } else { + eprosima::fastcdr::FastBuffer fastbuffer( + reinterpret_cast(payload->data), + payload->max_size); // Object that manages the raw buffer. + eprosima::fastcdr::Cdr ser(fastbuffer, eprosima::fastcdr::Cdr::DEFAULT_ENDIAN, + eprosima::fastcdr::Cdr::DDS_CDR); // Object that serializes the data. + if (this->serializeROSmessage(ser_data->data, ser)) { + payload->encapsulation = ser.endianness() == + eprosima::fastcdr::Cdr::BIG_ENDIANNESS ? CDR_BE : CDR_LE; + payload->length = (uint32_t)ser.getSerializedDataLength(); + return true; + } } return false; @@ -1014,12 +1032,24 @@ bool TypeSupport::deserialize( assert(data); assert(payload); - auto buffer = static_cast(data); - if (!buffer->reserve(payload->length)) { - return false; + auto ser_data = static_cast(data); + if (ser_data->is_cdr_buffer) { + auto buffer = static_cast(ser_data->data); + if (!buffer->reserve(payload->length)) { + return false; + } + memcpy(buffer->getBuffer(), payload->data, payload->length); + return true; } - memcpy(buffer->getBuffer(), payload->data, payload->length); - return true; + + eprosima::fastcdr::FastBuffer fastbuffer( + reinterpret_cast(payload->data), + payload->length); + eprosima::fastcdr::Cdr deser( + fastbuffer, + eprosima::fastcdr::Cdr::DEFAULT_ENDIAN, + eprosima::fastcdr::Cdr::DDS_CDR); + return deserializeROSmessage(deser, ser_data->data); } template @@ -1027,8 +1057,16 @@ std::function TypeSupport::getSerializedSizeProvider(vo { assert(data); - auto ser = static_cast(data); - return [ser]() -> uint32_t {return static_cast(ser->getSerializedDataLength());}; + auto ser_data = static_cast(data); + auto ser_size = [this, ser_data]() -> uint32_t + { + if (ser_data->is_cdr_buffer) { + auto ser = static_cast(ser_data->data); + return static_cast(ser->getSerializedDataLength()); + } + return static_cast(this->getEstimatedSerializedSize(ser_data->data)); + }; + return ser_size; } } // namespace rmw_fastrtps_cpp diff --git a/rmw_fastrtps_cpp/include/rmw_fastrtps_cpp/custom_client_info.hpp b/rmw_fastrtps_cpp/include/rmw_fastrtps_cpp/custom_client_info.hpp index dbb4f3d20..1074bd058 100644 --- a/rmw_fastrtps_cpp/include/rmw_fastrtps_cpp/custom_client_info.hpp +++ b/rmw_fastrtps_cpp/include/rmw_fastrtps_cpp/custom_client_info.hpp @@ -27,6 +27,7 @@ #include "fastrtps/subscriber/SubscriberListener.h" #include "fastrtps/participant/Participant.h" #include "fastrtps/publisher/Publisher.h" +#include "rmw_fastrtps_cpp/TypeSupport.hpp" class ClientListener; @@ -66,7 +67,10 @@ class ClientListener : public eprosima::fastrtps::SubscriberListener response.buffer_.reset(new eprosima::fastcdr::FastBuffer()); eprosima::fastrtps::SampleInfo_t sinfo; - if (sub->takeNextData(response.buffer_.get(), &sinfo)) { + rmw_fastrtps_cpp::SerializedData data; + data.is_cdr_buffer = true; + data.data = response.buffer_.get(); + if (sub->takeNextData(&data, &sinfo)) { if (eprosima::fastrtps::rtps::ALIVE == sinfo.sampleKind) { response.sample_identity_ = sinfo.related_sample_identity; diff --git a/rmw_fastrtps_cpp/include/rmw_fastrtps_cpp/custom_service_info.hpp b/rmw_fastrtps_cpp/include/rmw_fastrtps_cpp/custom_service_info.hpp index 0ca083c46..8423dfdd3 100644 --- a/rmw_fastrtps_cpp/include/rmw_fastrtps_cpp/custom_service_info.hpp +++ b/rmw_fastrtps_cpp/include/rmw_fastrtps_cpp/custom_service_info.hpp @@ -26,6 +26,7 @@ #include "fastrtps/subscriber/Subscriber.h" #include "fastrtps/subscriber/SubscriberListener.h" #include "fastrtps/subscriber/SampleInfo.h" +#include "rmw_fastrtps_cpp/TypeSupport.hpp" class ServiceListener; @@ -69,7 +70,10 @@ class ServiceListener : public eprosima::fastrtps::SubscriberListener request.buffer_ = new eprosima::fastcdr::FastBuffer(); eprosima::fastrtps::SampleInfo_t sinfo; - if (sub->takeNextData(request.buffer_, &sinfo)) { + rmw_fastrtps_cpp::SerializedData data; + data.is_cdr_buffer = true; + data.data = request.buffer_; + if (sub->takeNextData(&data, &sinfo)) { if (eprosima::fastrtps::rtps::ALIVE == sinfo.sampleKind) { request.sample_identity_ = sinfo.sample_identity; diff --git a/rmw_fastrtps_cpp/src/rmw_publish.cpp b/rmw_fastrtps_cpp/src/rmw_publish.cpp index 7b334a03a..9990a395b 100644 --- a/rmw_fastrtps_cpp/src/rmw_publish.cpp +++ b/rmw_fastrtps_cpp/src/rmw_publish.cpp @@ -22,6 +22,7 @@ #include "rmw_fastrtps_cpp/custom_publisher_info.hpp" #include "rmw_fastrtps_cpp/identifier.hpp" #include "rmw_fastrtps_cpp/macros.hpp" +#include "rmw_fastrtps_cpp/TypeSupport.hpp" #include "./ros_message_serialization.hpp" @@ -45,17 +46,10 @@ rmw_publish(const rmw_publisher_t * publisher, const void * ros_message) RCUTILS_CHECK_FOR_NULL_WITH_MSG( info, "publisher info pointer is null", return RMW_RET_ERROR, error_allocator); - eprosima::fastcdr::FastBuffer buffer; - eprosima::fastcdr::Cdr ser(buffer, eprosima::fastcdr::Cdr::DEFAULT_ENDIAN, - eprosima::fastcdr::Cdr::DDS_CDR); - - if (!_serialize_ros_message(ros_message, buffer, ser, info->type_support_, - info->typesupport_identifier_)) - { - RMW_SET_ERROR_MSG("cannot serialize data"); - return RMW_RET_ERROR; - } - if (!info->publisher_->write(&ser)) { + rmw_fastrtps_cpp::SerializedData data; + data.is_cdr_buffer = false; + data.data = const_cast(ros_message); + if (!info->publisher_->write(&data)) { RMW_SET_ERROR_MSG("cannot publish data"); return RMW_RET_ERROR; } @@ -92,7 +86,10 @@ rmw_publish_serialized_message( return RMW_RET_ERROR; } - if (!info->publisher_->write(&ser)) { + rmw_fastrtps_cpp::SerializedData data; + data.is_cdr_buffer = true; + data.data = &ser; + if (!info->publisher_->write(&data)) { RMW_SET_ERROR_MSG("cannot publish data"); return RMW_RET_ERROR; } diff --git a/rmw_fastrtps_cpp/src/rmw_request.cpp b/rmw_fastrtps_cpp/src/rmw_request.cpp index a5a202708..faef0cf62 100644 --- a/rmw_fastrtps_cpp/src/rmw_request.cpp +++ b/rmw_fastrtps_cpp/src/rmw_request.cpp @@ -26,6 +26,7 @@ #include "rmw_fastrtps_cpp/custom_client_info.hpp" #include "rmw_fastrtps_cpp/custom_service_info.hpp" #include "rmw_fastrtps_cpp/identifier.hpp" +#include "rmw_fastrtps_cpp/TypeSupport.hpp" #include "ros_message_serialization.hpp" extern "C" @@ -50,24 +51,16 @@ rmw_send_request( auto info = static_cast(client->data); assert(info); - eprosima::fastcdr::FastBuffer buffer; - eprosima::fastcdr::Cdr ser(buffer, eprosima::fastcdr::Cdr::DEFAULT_ENDIAN, - eprosima::fastcdr::Cdr::DDS_CDR); - - if (_serialize_ros_message(ros_request, buffer, ser, info->request_type_support_, - info->typesupport_identifier_)) - { - eprosima::fastrtps::rtps::WriteParams wparams; - - if (info->request_publisher_->write(&ser, wparams)) { - returnedValue = RMW_RET_OK; - *sequence_id = ((int64_t)wparams.sample_identity().sequence_number().high) << 32 | - wparams.sample_identity().sequence_number().low; - } else { - RMW_SET_ERROR_MSG("cannot publish data"); - } + eprosima::fastrtps::rtps::WriteParams wparams; + rmw_fastrtps_cpp::SerializedData data; + data.is_cdr_buffer = false; + data.data = const_cast(ros_request); + if (info->request_publisher_->write(&data, wparams)) { + returnedValue = RMW_RET_OK; + *sequence_id = ((int64_t)wparams.sample_identity().sequence_number().high) << 32 | + wparams.sample_identity().sequence_number().low; } else { - RMW_SET_ERROR_MSG("cannot serialize data"); + RMW_SET_ERROR_MSG("cannot publish data"); } return returnedValue; diff --git a/rmw_fastrtps_cpp/src/rmw_response.cpp b/rmw_fastrtps_cpp/src/rmw_response.cpp index d8142f818..c84f0c401 100644 --- a/rmw_fastrtps_cpp/src/rmw_response.cpp +++ b/rmw_fastrtps_cpp/src/rmw_response.cpp @@ -24,6 +24,7 @@ #include "rmw_fastrtps_cpp/custom_client_info.hpp" #include "rmw_fastrtps_cpp/custom_service_info.hpp" #include "rmw_fastrtps_cpp/identifier.hpp" +#include "rmw_fastrtps_cpp/TypeSupport.hpp" #include "ros_message_serialization.hpp" extern "C" @@ -89,12 +90,6 @@ rmw_send_response( auto info = static_cast(service->data); assert(info); - eprosima::fastcdr::FastBuffer buffer; - eprosima::fastcdr::Cdr ser(buffer, eprosima::fastcdr::Cdr::DEFAULT_ENDIAN, - eprosima::fastcdr::Cdr::DDS_CDR); - - _serialize_ros_message(ros_response, buffer, ser, info->response_type_support_, - info->typesupport_identifier_); eprosima::fastrtps::rtps::WriteParams wparams; memcpy(&wparams.related_sample_identity().writer_guid(), request_header->writer_guid, sizeof(eprosima::fastrtps::rtps::GUID_t)); @@ -103,7 +98,11 @@ rmw_send_response( wparams.related_sample_identity().sequence_number().low = (int32_t)(request_header->sequence_number & 0xFFFFFFFF); - if (info->response_publisher_->write(&ser, wparams)) { + rmw_fastrtps_cpp::SerializedData data; + data.is_cdr_buffer = false; + data.data = const_cast(ros_response); + + if (info->response_publisher_->write(&data, wparams)) { returnedValue = RMW_RET_OK; } else { RMW_SET_ERROR_MSG("cannot publish data"); diff --git a/rmw_fastrtps_cpp/src/rmw_serialize.cpp b/rmw_fastrtps_cpp/src/rmw_serialize.cpp index f567ad7c1..6e9896cda 100644 --- a/rmw_fastrtps_cpp/src/rmw_serialize.cpp +++ b/rmw_fastrtps_cpp/src/rmw_serialize.cpp @@ -41,19 +41,19 @@ rmw_serialize( } auto tss = _create_message_type_support(ts->data, ts->typesupport_identifier); - eprosima::fastcdr::FastBuffer buffer; - eprosima::fastcdr::Cdr ser( - buffer, eprosima::fastcdr::Cdr::DEFAULT_ENDIAN, eprosima::fastcdr::Cdr::DDS_CDR); - - auto ret = _serialize_ros_message(ros_message, buffer, ser, tss, ts->typesupport_identifier); - auto data_length = static_cast(ser.getSerializedDataLength()); + auto data_length = _get_serialized_size(ros_message, tss, ts->typesupport_identifier); if (serialized_message->buffer_capacity < data_length) { if (rmw_serialized_message_resize(serialized_message, data_length) != RMW_RET_OK) { RMW_SET_ERROR_MSG("unable to dynamically resize serialized message"); return RMW_RET_ERROR; } } - memcpy(serialized_message->buffer, ser.getBufferPointer(), data_length); + + eprosima::fastcdr::FastBuffer buffer(serialized_message->buffer, data_length); + eprosima::fastcdr::Cdr ser( + buffer, eprosima::fastcdr::Cdr::DEFAULT_ENDIAN, eprosima::fastcdr::Cdr::DDS_CDR); + + auto ret = _serialize_ros_message(ros_message, buffer, ser, tss, ts->typesupport_identifier); serialized_message->buffer_length = data_length; serialized_message->buffer_capacity = data_length; _delete_typesupport(tss, ts->typesupport_identifier); diff --git a/rmw_fastrtps_cpp/src/rmw_take.cpp b/rmw_fastrtps_cpp/src/rmw_take.cpp index 782cb050f..5eb49b901 100644 --- a/rmw_fastrtps_cpp/src/rmw_take.cpp +++ b/rmw_fastrtps_cpp/src/rmw_take.cpp @@ -27,6 +27,7 @@ #include "rmw_fastrtps_cpp/custom_subscriber_info.hpp" #include "rmw_fastrtps_cpp/identifier.hpp" #include "rmw_fastrtps_cpp/macros.hpp" +#include "rmw_fastrtps_cpp/TypeSupport.hpp" #include "./ros_message_serialization.hpp" @@ -63,17 +64,15 @@ _take( RCUTILS_CHECK_FOR_NULL_WITH_MSG( info, "custom subscriber info is null", return RMW_RET_ERROR, error_msg_allocator); - eprosima::fastcdr::FastBuffer buffer; eprosima::fastrtps::SampleInfo_t sinfo; - if (info->subscriber_->takeNextData(&buffer, &sinfo)) { + rmw_fastrtps_cpp::SerializedData data; + data.is_cdr_buffer = false; + data.data = ros_message; + if (info->subscriber_->takeNextData(&data, &sinfo)) { info->listener_->data_taken(); if (eprosima::fastrtps::rtps::ALIVE == sinfo.sampleKind) { - eprosima::fastcdr::Cdr deser( - buffer, eprosima::fastcdr::Cdr::DEFAULT_ENDIAN, eprosima::fastcdr::Cdr::DDS_CDR); - _deserialize_ros_message(deser, ros_message, info->type_support_, - info->typesupport_identifier_); if (message_info) { _assign_message_info(message_info, &sinfo); } @@ -140,7 +139,10 @@ _take_serialized_message( eprosima::fastcdr::FastBuffer buffer; eprosima::fastrtps::SampleInfo_t sinfo; - if (info->subscriber_->takeNextData(&buffer, &sinfo)) { + rmw_fastrtps_cpp::SerializedData data; + data.is_cdr_buffer = true; + data.data = &buffer; + if (info->subscriber_->takeNextData(&data, &sinfo)) { info->listener_->data_taken(); if (eprosima::fastrtps::rtps::ALIVE == sinfo.sampleKind) { diff --git a/rmw_fastrtps_cpp/src/ros_message_serialization.cpp b/rmw_fastrtps_cpp/src/ros_message_serialization.cpp index f12bc0a05..74c5d023d 100644 --- a/rmw_fastrtps_cpp/src/ros_message_serialization.cpp +++ b/rmw_fastrtps_cpp/src/ros_message_serialization.cpp @@ -17,6 +17,23 @@ #include "./type_support_common.hpp" +size_t +_get_serialized_size( + const void * ros_message, + void * untyped_typesupport, + const char * typesupport_identifier) +{ + if (using_introspection_c_typesupport(typesupport_identifier)) { + auto typed_typesupport = static_cast(untyped_typesupport); + return typed_typesupport->getEstimatedSerializedSize(ros_message); + } else if (using_introspection_cpp_typesupport(typesupport_identifier)) { + auto typed_typesupport = static_cast(untyped_typesupport); + return typed_typesupport->getEstimatedSerializedSize(ros_message); + } + RMW_SET_ERROR_MSG("Unknown typesupport identifier"); + return 0; +} + bool _serialize_ros_message( const void * ros_message, diff --git a/rmw_fastrtps_cpp/src/ros_message_serialization.hpp b/rmw_fastrtps_cpp/src/ros_message_serialization.hpp index fb1a75976..f39692863 100644 --- a/rmw_fastrtps_cpp/src/ros_message_serialization.hpp +++ b/rmw_fastrtps_cpp/src/ros_message_serialization.hpp @@ -24,6 +24,12 @@ class FastBuffer; } // namespace fastcdr } // namespace eprosima +size_t +_get_serialized_size( + const void * ros_message, + void * untyped_typesupport, + const char * typesupport_identifier); + bool _serialize_ros_message( const void * ros_message,