From 6dc5e4eb32a34320da71ef8098b7572084baaf65 Mon Sep 17 00:00:00 2001 From: Chris Lalancette Date: Mon, 20 Dec 2021 15:25:41 -0500 Subject: [PATCH] Add in more of the necessary logic on the publish side. (#2) That is, make sure to deal with inter-process publishing properly. This requires us to introduce two copies of do_intra_process_and_return_shared(), one of which deals with the ROS Message type and the other that deals with the PublishedType. This is kind of wasteful, and we may get rid of this later on, but this works for now. Signed-off-by: Chris Lalancette Make sure to add the appropriate library directory. (#3) Signed-off-by: Chris Lalancette --- rclcpp/include/rclcpp/publisher.hpp | 106 ++++++++++++++++++++-------- rclcpp/test/rclcpp/CMakeLists.txt | 5 ++ 2 files changed, 83 insertions(+), 28 deletions(-) diff --git a/rclcpp/include/rclcpp/publisher.hpp b/rclcpp/include/rclcpp/publisher.hpp index 73d76e46dc..0f9a77e5bd 100644 --- a/rclcpp/include/rclcpp/publisher.hpp +++ b/rclcpp/include/rclcpp/publisher.hpp @@ -263,7 +263,7 @@ class Publisher : public PublisherBase get_subscription_count() > get_intra_process_subscription_count(); if (inter_process_publish_needed) { - auto shared_msg = this->do_intra_process_publish_and_return_shared(std::move(msg)); + auto shared_msg = this->do_intra_process_publish_and_return_shared(std::move(msg)); this->do_inter_process_publish(*shared_msg); } else { this->do_intra_process_publish(std::move(msg)); @@ -319,15 +319,29 @@ class Publisher : public PublisherBase > publish(std::unique_ptr msg) { - // TODO(wjwwood): later update this to give the unique_ptr to the intra - // process manager and let it decide if it needs to be converted or not. - // For now, convert it unconditionally and pass it the ROSMessageType - // publish function specialization. - std::cout << "Publisher LALA" << std::endl; - // auto unique_ros_msg = this->create_ros_message_unique_ptr(); - // rclcpp::TypeAdapter::convert_to_ros_message(*msg, *unique_ros_msg); - //this->publish(std::move(unique_ros_msg)); - this->do_intra_process_publish(std::move(msg)); + if (!intra_process_is_enabled_) { + // If we aren't using intraprocess at all, do the type conversion + // immediately and publish interprocess. + auto unique_ros_msg = this->create_ros_message_unique_ptr(); + rclcpp::TypeAdapter::convert_to_ros_message(*msg, *unique_ros_msg); + this->do_inter_process_publish(*unique_ros_msg); + return; + } + // If an interprocess subscription exists, then the unique_ptr is promoted + // to a shared_ptr and published. + // This allows doing the intraprocess publish first and then doing the + // interprocess publish, resulting in lower publish-to-subscribe latency. + // It's not possible to do that with an unique_ptr, + // as do_intra_process_publish takes the ownership of the message. + bool inter_process_publish_needed = + get_subscription_count() > get_intra_process_subscription_count(); + + if (inter_process_publish_needed) { + auto shared_msg = this->do_intra_process_publish_and_return_shared(std::move(msg)); + // this->do_inter_process_publish(*shared_msg); + } else { + this->do_intra_process_publish(std::move(msg)); + } } /// Publish a message on the topic. @@ -348,26 +362,18 @@ class Publisher : public PublisherBase > publish(const T & msg) { - // TODO(wjwwood): later update this to give the unique_ptr to the intra - // process manager and let it decide if it needs to be converted or not. - // For now, convert it unconditionally and pass it the ROSMessageType - // publish function specialization. - - // Avoid allocating when not using intra process. + // Avoid double allocation when not using intra process. if (!intra_process_is_enabled_) { // Convert to the ROS message equivalent and publish it. ROSMessageType ros_msg; rclcpp::TypeAdapter::convert_to_ros_message(msg, ros_msg); // In this case we're not using intra process. - return this->do_inter_process_publish(ros_msg); + this->do_inter_process_publish(ros_msg); + return; } - // Otherwise we have to allocate memory in a unique_ptr, convert it, - // and pass it along. - // As the message is not const, a copy should be made. - // A shared_ptr could also be constructed here. - auto unique_ros_msg = this->create_ros_message_unique_ptr(); - rclcpp::TypeAdapter::convert_to_ros_message(msg, *unique_ros_msg); - this->publish(std::move(unique_ros_msg)); + // Otherwise we have to allocate memory in a unique_ptr and pass it along. + auto unique_msg = this->duplicate_type_adapt_message_as_unique_ptr(msg); + this->publish(std::move(unique_msg)); } void @@ -492,8 +498,8 @@ class Publisher : public PublisherBase } } -void -do_intra_process_publish(std::unique_ptr msg) + void + do_intra_process_publish(std::unique_ptr msg) { auto ipm = weak_ipm_.lock(); if (!ipm) { @@ -511,7 +517,12 @@ do_intra_process_publish(std::unique_ptr ms } - std::shared_ptr + template + typename + std::enable_if_t< + rosidl_generator_traits::is_message::value && + std::is_same::value, std::shared_ptr + > do_intra_process_publish_and_return_shared( std::unique_ptr msg) { @@ -531,6 +542,36 @@ do_intra_process_publish(std::unique_ptr ms published_type_allocator_); } + template + typename + std::enable_if_t< + rclcpp::TypeAdapter::is_specialized::value && + std::is_same::value, std::shared_ptr + > + do_intra_process_publish_and_return_shared( + std::unique_ptr msg) + { + auto ipm = weak_ipm_.lock(); + if (!ipm) { + throw std::runtime_error( + "intra process publish called after destruction of intra process manager"); + } + if (!msg) { + throw std::runtime_error("cannot publish msg which is a null pointer"); + } + + ipm->template do_intra_process_publish_and_return_shared( + intra_process_publisher_id_, + std::move(msg), + published_type_allocator_); + + auto unique_ros_msg = this->create_ros_message_unique_ptr(); + rclcpp::TypeAdapter::convert_to_ros_message(*msg, *unique_ros_msg); + + return unique_ros_msg; + } + /// Return a new unique_ptr using the ROSMessageType of the publisher. std::unique_ptr create_ros_message_unique_ptr() @@ -540,7 +581,7 @@ do_intra_process_publish(std::unique_ptr ms return std::unique_ptr(ptr, ros_message_type_deleter_); } - /// Duplicate a given ros message as a unique_ptr. + /// Duplicate a given ROS message as a unique_ptr. std::unique_ptr duplicate_ros_message_as_unique_ptr(const ROSMessageType & msg) { @@ -549,6 +590,15 @@ do_intra_process_publish(std::unique_ptr ms return std::unique_ptr(ptr, ros_message_type_deleter_); } + /// Duplicate a given type adapted message as a unique_ptr. + std::unique_ptr + duplicate_type_adapt_message_as_unique_ptr(const PublishedType & msg) + { + auto ptr = PublishedTypeAllocatorTraits::allocate(published_type_allocator_, 1); + PublishedTypeAllocatorTraits::construct(published_type_allocator_, ptr, msg); + return std::unique_ptr(ptr, published_type_deleter_); + } + /// Copy of original options passed during construction. /** * It is important to save a copy of this so that the rmw payload which it diff --git a/rclcpp/test/rclcpp/CMakeLists.txt b/rclcpp/test/rclcpp/CMakeLists.txt index 4762bc5d27..4ec155e136 100644 --- a/rclcpp/test/rclcpp/CMakeLists.txt +++ b/rclcpp/test/rclcpp/CMakeLists.txt @@ -15,6 +15,11 @@ rosidl_generate_interfaces(${PROJECT_NAME}_test_msgs # Need the target name to depend on generated interface libraries rosidl_get_typesupport_target(cpp_typesupport_target "${PROJECT_NAME}_test_msgs" "rosidl_typesupport_cpp") +set(append_library_dirs "${CMAKE_CURRENT_BINARY_DIR}") +if(WIN32) + set(append_library_dirs "${append_library_dirs}/$") +endif() + ament_add_gtest(test_subscription_publisher_with_same_type_adapter test_subscription_publisher_with_same_type_adapter.cpp APPEND_LIBRARY_DIRS "${append_library_dirs}" )