From 3d3c9bd281a15035717c24bc292c3b801153ca7d Mon Sep 17 00:00:00 2001 From: Chris Lalancette Date: Wed, 8 Dec 2021 13:20:10 -0500 Subject: [PATCH] Push the publish conversion down to the intra-process manager (#6) * Remove template defaults that we never use. Signed-off-by: Chris Lalancette * Rename MessageT to PublishedType in the IntraProcessManager. It makes it easier to determine which type we are talking about. Signed-off-by: Chris Lalancette * Get all the types setup to push the conversion down to IPM. There really is no functional change here, just preparing to move the conversion down to the intra-process manager. Signed-off-by: Chris Lalancette * Move conversion down to the intra-process manager. Signed-off-by: Chris Lalancette --- .../experimental/intra_process_manager.hpp | 145 ++++++++++++++---- rclcpp/include/rclcpp/publisher.hpp | 47 +----- 2 files changed, 125 insertions(+), 67 deletions(-) diff --git a/rclcpp/include/rclcpp/experimental/intra_process_manager.hpp b/rclcpp/include/rclcpp/experimental/intra_process_manager.hpp index 1475a19183..b4b5a30b6b 100644 --- a/rclcpp/include/rclcpp/experimental/intra_process_manager.hpp +++ b/rclcpp/include/rclcpp/experimental/intra_process_manager.hpp @@ -173,19 +173,20 @@ class IntraProcessManager * \param message the message that is being stored. */ template< - typename MessageT, - typename Alloc = std::allocator, - typename Deleter = std::default_delete> + typename PublishedType, + typename Alloc, + typename Deleter = std::default_delete + > void do_intra_process_publish( uint64_t intra_process_publisher_id, - std::unique_ptr message, - typename allocator::AllocRebind::allocator_type & allocator) + std::unique_ptr message, + typename allocator::AllocRebind::allocator_type & allocator) { std::cout << "do_intra_process_publish --- " << std::endl; - using MessageAllocTraits = allocator::AllocRebind; + using MessageAllocTraits = allocator::AllocRebind; using MessageAllocatorT = typename MessageAllocTraits::allocator_type; std::shared_lock lock(mutex_); @@ -202,9 +203,9 @@ class IntraProcessManager if (sub_ids.take_ownership_subscriptions.empty()) { // None of the buffers require ownership, so we promote the pointer - std::shared_ptr msg = std::move(message); + std::shared_ptr msg = std::move(message); - this->template add_shared_msg_to_buffers( + this->template add_shared_msg_to_buffers( msg, sub_ids.take_shared_subscriptions); } else { if (sub_ids.take_shared_subscriptions.size() <= 1) { @@ -222,18 +223,18 @@ class IntraProcessManager sub_ids.take_ownership_subscriptions.begin(), sub_ids.take_ownership_subscriptions.end()); - this->template add_owned_msg_to_buffers( + this->template add_owned_msg_to_buffers( std::move(message), concatenated_vector, allocator); } else { // Construct a new shared pointer from the message // for the buffers that do not require ownership - auto shared_msg = std::allocate_shared(allocator, *message); + auto shared_msg = std::allocate_shared(allocator, *message); - this->template add_shared_msg_to_buffers( + this->template add_shared_msg_to_buffers( shared_msg, sub_ids.take_shared_subscriptions); - this->template add_owned_msg_to_buffers( + this->template add_owned_msg_to_buffers( std::move(message), sub_ids.take_ownership_subscriptions, allocator); } } @@ -241,15 +242,31 @@ class IntraProcessManager template< typename MessageT, - typename Alloc = std::allocator, - typename Deleter = std::default_delete> - std::shared_ptr + typename T, + typename PublishedType, + typename ROSMessageType, + typename Alloc, + typename ROSMessageTypeAllocatorTraits, + typename ROSMessageTypeAllocator, + typename ROSMessageTypeDeleter, + typename Deleter = std::default_delete + > + typename + std::enable_if_t< + rosidl_generator_traits::is_message::value && + std::is_same::value, std::shared_ptr + > do_intra_process_publish_and_return_shared( uint64_t intra_process_publisher_id, - std::unique_ptr message, - typename allocator::AllocRebind::allocator_type & allocator) + std::unique_ptr message, + typename allocator::AllocRebind::allocator_type & allocator, + ROSMessageTypeAllocator & ros_message_type_allocator, + ROSMessageTypeDeleter & ros_message_type_deleter) { - using MessageAllocTraits = allocator::AllocRebind; + (void)ros_message_type_allocator; + (void)ros_message_type_deleter; + + using MessageAllocTraits = allocator::AllocRebind; using MessageAllocatorT = typename MessageAllocTraits::allocator_type; std::shared_lock lock(mutex_); @@ -266,24 +283,24 @@ class IntraProcessManager if (sub_ids.take_ownership_subscriptions.empty()) { // If there are no owning, just convert to shared. - std::shared_ptr shared_msg = std::move(message); + std::shared_ptr shared_msg = std::move(message); if (!sub_ids.take_shared_subscriptions.empty()) { - this->template add_shared_msg_to_buffers( + this->template add_shared_msg_to_buffers( shared_msg, sub_ids.take_shared_subscriptions); } return shared_msg; } else { // Construct a new shared pointer from the message for the buffers that // do not require ownership and to return. - auto shared_msg = std::allocate_shared(allocator, *message); + auto shared_msg = std::allocate_shared(allocator, *message); if (!sub_ids.take_shared_subscriptions.empty()) { - this->template add_shared_msg_to_buffers( + this->template add_shared_msg_to_buffers( shared_msg, sub_ids.take_shared_subscriptions); } - this->template add_owned_msg_to_buffers( + this->template add_owned_msg_to_buffers( std::move(message), sub_ids.take_ownership_subscriptions, allocator); @@ -292,6 +309,80 @@ class IntraProcessManager } } + template< + typename MessageT, + typename T, + typename PublishedType, + typename ROSMessageType, + typename Alloc, + typename ROSMessageTypeAllocatorTraits, + typename ROSMessageTypeAllocator, + typename ROSMessageTypeDeleter, + typename Deleter = std::default_delete + > + typename + std::enable_if_t< + rclcpp::TypeAdapter::is_specialized::value && + std::is_same::value, std::shared_ptr + > + do_intra_process_publish_and_return_shared( + uint64_t intra_process_publisher_id, + std::unique_ptr message, + typename allocator::AllocRebind::allocator_type & allocator, + ROSMessageTypeAllocator & ros_message_type_allocator, + ROSMessageTypeDeleter & ros_message_type_deleter) + { + auto ptr = ROSMessageTypeAllocatorTraits::allocate(ros_message_type_allocator, 1); + ROSMessageTypeAllocatorTraits::construct(ros_message_type_allocator, ptr); + auto unique_ros_msg = std::unique_ptr(ptr, ros_message_type_deleter); + rclcpp::TypeAdapter::convert_to_ros_message(*message, *unique_ros_msg); + + using MessageAllocTraits = allocator::AllocRebind; + using MessageAllocatorT = typename MessageAllocTraits::allocator_type; + + std::shared_lock lock(mutex_); + + auto publisher_it = pub_to_subs_.find(intra_process_publisher_id); + if (publisher_it == pub_to_subs_.end()) { + // Publisher is either invalid or no longer exists. + RCLCPP_WARN( + rclcpp::get_logger("rclcpp"), + "Calling do_intra_process_publish for invalid or no longer existing publisher id"); + return nullptr; + } + const auto & sub_ids = publisher_it->second; + + if (sub_ids.take_ownership_subscriptions.empty()) { + // If there are no owning, just convert to shared. + std::shared_ptr shared_msg = std::move(message); + if (!sub_ids.take_shared_subscriptions.empty()) { + this->template add_shared_msg_to_buffers( + shared_msg, sub_ids.take_shared_subscriptions); + } + } else { + // Construct a new shared pointer from the message for the buffers that + // do not require ownership and to return. + auto shared_msg = std::allocate_shared(allocator, *message); + + if (!sub_ids.take_shared_subscriptions.empty()) { + this->template add_shared_msg_to_buffers( + shared_msg, + sub_ids.take_shared_subscriptions); + } + + this->template add_owned_msg_to_buffers( + std::move(message), + sub_ids.take_ownership_subscriptions, + allocator); + } + + // TODO(clalancette): depending on how the publisher and subscriber are setup, we may end + // up doing a conversion more than once; in this function and down in + // add_{shared,owned}_msg_to_buffers(). We should probably push this down further to avoid + // that double conversion. + return unique_ros_msg; + } + /// Return true if the given rmw_gid_t matches any stored Publishers. RCLCPP_PUBLIC bool @@ -340,7 +431,8 @@ class IntraProcessManager template< typename MessageT, typename Alloc, - typename Deleter> + typename Deleter + > void add_shared_msg_to_buffers( std::shared_ptr message, @@ -394,8 +486,9 @@ class IntraProcessManager template< typename MessageT, - typename Alloc = std::allocator, - typename Deleter = std::default_delete> + typename Alloc, + typename Deleter + > void add_owned_msg_to_buffers( std::unique_ptr message, diff --git a/rclcpp/include/rclcpp/publisher.hpp b/rclcpp/include/rclcpp/publisher.hpp index 63c26fa12d..1ee3f32241 100644 --- a/rclcpp/include/rclcpp/publisher.hpp +++ b/rclcpp/include/rclcpp/publisher.hpp @@ -520,11 +520,7 @@ class Publisher : public PublisherBase template - typename - std::enable_if_t< - rosidl_generator_traits::is_message::value && - std::is_same::value, std::shared_ptr - > + std::shared_ptr do_intra_process_publish_and_return_shared( std::unique_ptr msg) { @@ -537,44 +533,13 @@ class Publisher : public PublisherBase throw std::runtime_error("cannot publish msg which is a null pointer"); } - return ipm->template do_intra_process_publish_and_return_shared( + return ipm->template do_intra_process_publish_and_return_shared( intra_process_publisher_id_, std::move(msg), - published_type_allocator_); - } - - template - typename - std::enable_if_t< - rclcpp::TypeAdapter::is_specialized::value && - std::is_same::value, std::shared_ptr - > - do_intra_process_publish_and_return_shared( - std::unique_ptr msg) - { - auto ipm = weak_ipm_.lock(); - if (!ipm) { - throw std::runtime_error( - "intra process publish called after destruction of intra process manager"); - } - if (!msg) { - throw std::runtime_error("cannot publish msg which is a null pointer"); - } - - ipm->template do_intra_process_publish_and_return_shared( - intra_process_publisher_id_, - std::move(msg), - published_type_allocator_); - - // TODO(clalancette): We are doing the conversion at least twice; inside of - // IntraProcessManager::do_intra_process_publish_and_return_shared(), and here. - // We should just do it in the IntraProcessManager and return it here. - auto unique_ros_msg = this->create_ros_message_unique_ptr(); - rclcpp::TypeAdapter::convert_to_ros_message(*msg, *unique_ros_msg); - - return unique_ros_msg; + published_type_allocator_, + ros_message_type_allocator_, + ros_message_type_deleter_); } /// Return a new unique_ptr using the ROSMessageType of the publisher.