Skip to content

Commit

Permalink
Avoid allocations (#211)
Browse files Browse the repository at this point in the history
* Refs #3054. Fixed alignment calculations.

* Refs #3054. Temporary FastBuffer removed on serialization.

* Refs #3055. Temporary FastBuffer removed on rmw_take and rmw_take_with_info.

* Refs #3054. Avoid allocations on rmw_serialize.
  • Loading branch information
MiguelCompany authored and dirk-thomas committed Jun 21, 2018
1 parent 3e653ec commit 1d599f2
Show file tree
Hide file tree
Showing 13 changed files with 140 additions and 73 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ MessageTypeSupport<MembersType>::MessageTypeSupport(const MembersType * members)
// Encapsulation size
this->m_typeSize = 4;
if (this->members_->member_count_ != 0) {
this->m_typeSize += static_cast<uint32_t>(this->calculateMaxSerializedSize(members, 4));
this->m_typeSize += static_cast<uint32_t>(this->calculateMaxSerializedSize(members, 0));
} else {
this->m_typeSize++;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ RequestTypeSupport<ServiceMembersType, MessageMembersType>::RequestTypeSupport(
// Encapsulation size
this->m_typeSize = 4;
if (this->members_->member_count_ != 0) {
this->m_typeSize += static_cast<uint32_t>(this->calculateMaxSerializedSize(this->members_, 4));
this->m_typeSize += static_cast<uint32_t>(this->calculateMaxSerializedSize(this->members_, 0));
} else {
this->m_typeSize++;
}
Expand All @@ -69,7 +69,7 @@ ResponseTypeSupport<ServiceMembersType, MessageMembersType>::ResponseTypeSupport
// Encapsulation size
this->m_typeSize = 4;
if (this->members_->member_count_ != 0) {
this->m_typeSize += static_cast<uint32_t>(this->calculateMaxSerializedSize(this->members_, 4));
this->m_typeSize += static_cast<uint32_t>(this->calculateMaxSerializedSize(this->members_, 0));
} else {
this->m_typeSize++;
}
Expand Down
7 changes: 7 additions & 0 deletions rmw_fastrtps_cpp/include/rmw_fastrtps_cpp/TypeSupport.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,13 @@
namespace rmw_fastrtps_cpp
{

// Publishers write method will receive a pointer to this struct
struct SerializedData
{
bool is_cdr_buffer; // Whether next field is a pointer to a Cdr or to a plain ros message
void * data;
};

// Helper class that uses template specialization to read/write string types to/from a
// eprosima::fastcdr::Cdr
template<typename MembersType>
Expand Down
74 changes: 56 additions & 18 deletions rmw_fastrtps_cpp/include/rmw_fastrtps_cpp/TypeSupport_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -420,8 +420,11 @@ size_t next_field_align(
std::vector<T> & data = *reinterpret_cast<std::vector<T> *>(field);
current_alignment += eprosima::fastcdr::Cdr::alignment(current_alignment, padding);
current_alignment += padding;
current_alignment += eprosima::fastcdr::Cdr::alignment(current_alignment, item_size);
current_alignment += item_size * data.size();
auto num_elems = data.size();
if (num_elems > 0) {
current_alignment += eprosima::fastcdr::Cdr::alignment(current_alignment, item_size);
current_alignment += item_size * data.size();
}
}
return current_alignment;
}
Expand All @@ -437,7 +440,7 @@ size_t next_field_align<std::string>(
if (!member->is_array_) {
current_alignment += eprosima::fastcdr::Cdr::alignment(current_alignment, padding);
current_alignment += padding;
auto str = *static_cast<std::string *>(field);
auto & str = *static_cast<std::string *>(field);
current_alignment += str.size() + 1;
} else if (member->array_size_ && !member->is_upper_bound_) {
auto str_arr = static_cast<std::string *>(field);
Expand Down Expand Up @@ -935,7 +938,7 @@ size_t TypeSupport<MembersType>::getEstimatedSerializedSize(
size_t ret_val = 4;

if (members_->member_count_ != 0) {
ret_val += TypeSupport::getEstimatedSerializedSize(members_, ros_message, ret_val);
ret_val += TypeSupport::getEstimatedSerializedSize(members_, ros_message, 0);
} else {
ret_val += 1;
}
Expand Down Expand Up @@ -994,13 +997,28 @@ bool TypeSupport<MembersType>::serialize(
assert(data);
assert(payload);

auto ser = static_cast<eprosima::fastcdr::Cdr *>(data);
if (payload->max_size >= ser->getSerializedDataLength()) {
payload->length = static_cast<uint32_t>(ser->getSerializedDataLength());
payload->encapsulation = ser->endianness() ==
eprosima::fastcdr::Cdr::BIG_ENDIANNESS ? CDR_BE : CDR_LE;
memcpy(payload->data, ser->getBufferPointer(), ser->getSerializedDataLength());
return true;
auto ser_data = static_cast<SerializedData *>(data);
if (ser_data->is_cdr_buffer) {
auto ser = static_cast<eprosima::fastcdr::Cdr *>(ser_data->data);
if (payload->max_size >= ser->getSerializedDataLength()) {
payload->length = static_cast<uint32_t>(ser->getSerializedDataLength());
payload->encapsulation = ser->endianness() ==
eprosima::fastcdr::Cdr::BIG_ENDIANNESS ? CDR_BE : CDR_LE;
memcpy(payload->data, ser->getBufferPointer(), ser->getSerializedDataLength());
return true;
}
} else {
eprosima::fastcdr::FastBuffer fastbuffer(
reinterpret_cast<char *>(payload->data),
payload->max_size); // Object that manages the raw buffer.
eprosima::fastcdr::Cdr ser(fastbuffer, eprosima::fastcdr::Cdr::DEFAULT_ENDIAN,
eprosima::fastcdr::Cdr::DDS_CDR); // Object that serializes the data.
if (this->serializeROSmessage(ser_data->data, ser)) {
payload->encapsulation = ser.endianness() ==
eprosima::fastcdr::Cdr::BIG_ENDIANNESS ? CDR_BE : CDR_LE;
payload->length = (uint32_t)ser.getSerializedDataLength();
return true;
}
}

return false;
Expand All @@ -1014,21 +1032,41 @@ bool TypeSupport<MembersType>::deserialize(
assert(data);
assert(payload);

auto buffer = static_cast<eprosima::fastcdr::FastBuffer *>(data);
if (!buffer->reserve(payload->length)) {
return false;
auto ser_data = static_cast<SerializedData *>(data);
if (ser_data->is_cdr_buffer) {
auto buffer = static_cast<eprosima::fastcdr::FastBuffer *>(ser_data->data);
if (!buffer->reserve(payload->length)) {
return false;
}
memcpy(buffer->getBuffer(), payload->data, payload->length);
return true;
}
memcpy(buffer->getBuffer(), payload->data, payload->length);
return true;

eprosima::fastcdr::FastBuffer fastbuffer(
reinterpret_cast<char *>(payload->data),
payload->length);
eprosima::fastcdr::Cdr deser(
fastbuffer,
eprosima::fastcdr::Cdr::DEFAULT_ENDIAN,
eprosima::fastcdr::Cdr::DDS_CDR);
return deserializeROSmessage(deser, ser_data->data);
}

template<typename MembersType>
std::function<uint32_t()> TypeSupport<MembersType>::getSerializedSizeProvider(void * data)
{
assert(data);

auto ser = static_cast<eprosima::fastcdr::Cdr *>(data);
return [ser]() -> uint32_t {return static_cast<uint32_t>(ser->getSerializedDataLength());};
auto ser_data = static_cast<SerializedData *>(data);
auto ser_size = [this, ser_data]() -> uint32_t
{
if (ser_data->is_cdr_buffer) {
auto ser = static_cast<eprosima::fastcdr::Cdr *>(ser_data->data);
return static_cast<uint32_t>(ser->getSerializedDataLength());
}
return static_cast<uint32_t>(this->getEstimatedSerializedSize(ser_data->data));
};
return ser_size;
}

} // namespace rmw_fastrtps_cpp
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include "fastrtps/subscriber/SubscriberListener.h"
#include "fastrtps/participant/Participant.h"
#include "fastrtps/publisher/Publisher.h"
#include "rmw_fastrtps_cpp/TypeSupport.hpp"

class ClientListener;

Expand Down Expand Up @@ -66,7 +67,10 @@ class ClientListener : public eprosima::fastrtps::SubscriberListener
response.buffer_.reset(new eprosima::fastcdr::FastBuffer());
eprosima::fastrtps::SampleInfo_t sinfo;

if (sub->takeNextData(response.buffer_.get(), &sinfo)) {
rmw_fastrtps_cpp::SerializedData data;
data.is_cdr_buffer = true;
data.data = response.buffer_.get();
if (sub->takeNextData(&data, &sinfo)) {
if (eprosima::fastrtps::rtps::ALIVE == sinfo.sampleKind) {
response.sample_identity_ = sinfo.related_sample_identity;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include "fastrtps/subscriber/Subscriber.h"
#include "fastrtps/subscriber/SubscriberListener.h"
#include "fastrtps/subscriber/SampleInfo.h"
#include "rmw_fastrtps_cpp/TypeSupport.hpp"

class ServiceListener;

Expand Down Expand Up @@ -69,7 +70,10 @@ class ServiceListener : public eprosima::fastrtps::SubscriberListener
request.buffer_ = new eprosima::fastcdr::FastBuffer();
eprosima::fastrtps::SampleInfo_t sinfo;

if (sub->takeNextData(request.buffer_, &sinfo)) {
rmw_fastrtps_cpp::SerializedData data;
data.is_cdr_buffer = true;
data.data = request.buffer_;
if (sub->takeNextData(&data, &sinfo)) {
if (eprosima::fastrtps::rtps::ALIVE == sinfo.sampleKind) {
request.sample_identity_ = sinfo.sample_identity;

Expand Down
21 changes: 9 additions & 12 deletions rmw_fastrtps_cpp/src/rmw_publish.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "rmw_fastrtps_cpp/custom_publisher_info.hpp"
#include "rmw_fastrtps_cpp/identifier.hpp"
#include "rmw_fastrtps_cpp/macros.hpp"
#include "rmw_fastrtps_cpp/TypeSupport.hpp"

#include "./ros_message_serialization.hpp"

Expand All @@ -45,17 +46,10 @@ rmw_publish(const rmw_publisher_t * publisher, const void * ros_message)
RCUTILS_CHECK_FOR_NULL_WITH_MSG(
info, "publisher info pointer is null", return RMW_RET_ERROR, error_allocator);

eprosima::fastcdr::FastBuffer buffer;
eprosima::fastcdr::Cdr ser(buffer, eprosima::fastcdr::Cdr::DEFAULT_ENDIAN,
eprosima::fastcdr::Cdr::DDS_CDR);

if (!_serialize_ros_message(ros_message, buffer, ser, info->type_support_,
info->typesupport_identifier_))
{
RMW_SET_ERROR_MSG("cannot serialize data");
return RMW_RET_ERROR;
}
if (!info->publisher_->write(&ser)) {
rmw_fastrtps_cpp::SerializedData data;
data.is_cdr_buffer = false;
data.data = const_cast<void *>(ros_message);
if (!info->publisher_->write(&data)) {
RMW_SET_ERROR_MSG("cannot publish data");
return RMW_RET_ERROR;
}
Expand Down Expand Up @@ -92,7 +86,10 @@ rmw_publish_serialized_message(
return RMW_RET_ERROR;
}

if (!info->publisher_->write(&ser)) {
rmw_fastrtps_cpp::SerializedData data;
data.is_cdr_buffer = true;
data.data = &ser;
if (!info->publisher_->write(&data)) {
RMW_SET_ERROR_MSG("cannot publish data");
return RMW_RET_ERROR;
}
Expand Down
27 changes: 10 additions & 17 deletions rmw_fastrtps_cpp/src/rmw_request.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include "rmw_fastrtps_cpp/custom_client_info.hpp"
#include "rmw_fastrtps_cpp/custom_service_info.hpp"
#include "rmw_fastrtps_cpp/identifier.hpp"
#include "rmw_fastrtps_cpp/TypeSupport.hpp"
#include "ros_message_serialization.hpp"

extern "C"
Expand All @@ -50,24 +51,16 @@ rmw_send_request(
auto info = static_cast<CustomClientInfo *>(client->data);
assert(info);

eprosima::fastcdr::FastBuffer buffer;
eprosima::fastcdr::Cdr ser(buffer, eprosima::fastcdr::Cdr::DEFAULT_ENDIAN,
eprosima::fastcdr::Cdr::DDS_CDR);

if (_serialize_ros_message(ros_request, buffer, ser, info->request_type_support_,
info->typesupport_identifier_))
{
eprosima::fastrtps::rtps::WriteParams wparams;

if (info->request_publisher_->write(&ser, wparams)) {
returnedValue = RMW_RET_OK;
*sequence_id = ((int64_t)wparams.sample_identity().sequence_number().high) << 32 |
wparams.sample_identity().sequence_number().low;
} else {
RMW_SET_ERROR_MSG("cannot publish data");
}
eprosima::fastrtps::rtps::WriteParams wparams;
rmw_fastrtps_cpp::SerializedData data;
data.is_cdr_buffer = false;
data.data = const_cast<void *>(ros_request);
if (info->request_publisher_->write(&data, wparams)) {
returnedValue = RMW_RET_OK;
*sequence_id = ((int64_t)wparams.sample_identity().sequence_number().high) << 32 |
wparams.sample_identity().sequence_number().low;
} else {
RMW_SET_ERROR_MSG("cannot serialize data");
RMW_SET_ERROR_MSG("cannot publish data");
}

return returnedValue;
Expand Down
13 changes: 6 additions & 7 deletions rmw_fastrtps_cpp/src/rmw_response.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include "rmw_fastrtps_cpp/custom_client_info.hpp"
#include "rmw_fastrtps_cpp/custom_service_info.hpp"
#include "rmw_fastrtps_cpp/identifier.hpp"
#include "rmw_fastrtps_cpp/TypeSupport.hpp"
#include "ros_message_serialization.hpp"

extern "C"
Expand Down Expand Up @@ -89,12 +90,6 @@ rmw_send_response(
auto info = static_cast<CustomServiceInfo *>(service->data);
assert(info);

eprosima::fastcdr::FastBuffer buffer;
eprosima::fastcdr::Cdr ser(buffer, eprosima::fastcdr::Cdr::DEFAULT_ENDIAN,
eprosima::fastcdr::Cdr::DDS_CDR);

_serialize_ros_message(ros_response, buffer, ser, info->response_type_support_,
info->typesupport_identifier_);
eprosima::fastrtps::rtps::WriteParams wparams;
memcpy(&wparams.related_sample_identity().writer_guid(), request_header->writer_guid,
sizeof(eprosima::fastrtps::rtps::GUID_t));
Expand All @@ -103,7 +98,11 @@ rmw_send_response(
wparams.related_sample_identity().sequence_number().low =
(int32_t)(request_header->sequence_number & 0xFFFFFFFF);

if (info->response_publisher_->write(&ser, wparams)) {
rmw_fastrtps_cpp::SerializedData data;
data.is_cdr_buffer = false;
data.data = const_cast<void *>(ros_response);

if (info->response_publisher_->write(&data, wparams)) {
returnedValue = RMW_RET_OK;
} else {
RMW_SET_ERROR_MSG("cannot publish data");
Expand Down
14 changes: 7 additions & 7 deletions rmw_fastrtps_cpp/src/rmw_serialize.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,19 +41,19 @@ rmw_serialize(
}

auto tss = _create_message_type_support(ts->data, ts->typesupport_identifier);
eprosima::fastcdr::FastBuffer buffer;
eprosima::fastcdr::Cdr ser(
buffer, eprosima::fastcdr::Cdr::DEFAULT_ENDIAN, eprosima::fastcdr::Cdr::DDS_CDR);

auto ret = _serialize_ros_message(ros_message, buffer, ser, tss, ts->typesupport_identifier);
auto data_length = static_cast<size_t>(ser.getSerializedDataLength());
auto data_length = _get_serialized_size(ros_message, tss, ts->typesupport_identifier);
if (serialized_message->buffer_capacity < data_length) {
if (rmw_serialized_message_resize(serialized_message, data_length) != RMW_RET_OK) {
RMW_SET_ERROR_MSG("unable to dynamically resize serialized message");
return RMW_RET_ERROR;
}
}
memcpy(serialized_message->buffer, ser.getBufferPointer(), data_length);

eprosima::fastcdr::FastBuffer buffer(serialized_message->buffer, data_length);
eprosima::fastcdr::Cdr ser(
buffer, eprosima::fastcdr::Cdr::DEFAULT_ENDIAN, eprosima::fastcdr::Cdr::DDS_CDR);

auto ret = _serialize_ros_message(ros_message, buffer, ser, tss, ts->typesupport_identifier);
serialized_message->buffer_length = data_length;
serialized_message->buffer_capacity = data_length;
_delete_typesupport(tss, ts->typesupport_identifier);
Expand Down
16 changes: 9 additions & 7 deletions rmw_fastrtps_cpp/src/rmw_take.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include "rmw_fastrtps_cpp/custom_subscriber_info.hpp"
#include "rmw_fastrtps_cpp/identifier.hpp"
#include "rmw_fastrtps_cpp/macros.hpp"
#include "rmw_fastrtps_cpp/TypeSupport.hpp"

#include "./ros_message_serialization.hpp"

Expand Down Expand Up @@ -63,17 +64,15 @@ _take(
RCUTILS_CHECK_FOR_NULL_WITH_MSG(
info, "custom subscriber info is null", return RMW_RET_ERROR, error_msg_allocator);

eprosima::fastcdr::FastBuffer buffer;
eprosima::fastrtps::SampleInfo_t sinfo;

if (info->subscriber_->takeNextData(&buffer, &sinfo)) {
rmw_fastrtps_cpp::SerializedData data;
data.is_cdr_buffer = false;
data.data = ros_message;
if (info->subscriber_->takeNextData(&data, &sinfo)) {
info->listener_->data_taken();

if (eprosima::fastrtps::rtps::ALIVE == sinfo.sampleKind) {
eprosima::fastcdr::Cdr deser(
buffer, eprosima::fastcdr::Cdr::DEFAULT_ENDIAN, eprosima::fastcdr::Cdr::DDS_CDR);
_deserialize_ros_message(deser, ros_message, info->type_support_,
info->typesupport_identifier_);
if (message_info) {
_assign_message_info(message_info, &sinfo);
}
Expand Down Expand Up @@ -140,7 +139,10 @@ _take_serialized_message(
eprosima::fastcdr::FastBuffer buffer;
eprosima::fastrtps::SampleInfo_t sinfo;

if (info->subscriber_->takeNextData(&buffer, &sinfo)) {
rmw_fastrtps_cpp::SerializedData data;
data.is_cdr_buffer = true;
data.data = &buffer;
if (info->subscriber_->takeNextData(&data, &sinfo)) {
info->listener_->data_taken();

if (eprosima::fastrtps::rtps::ALIVE == sinfo.sampleKind) {
Expand Down
Loading

0 comments on commit 1d599f2

Please sign in to comment.