diff --git a/rclcpp/include/rclcpp/experimental/intra_process_manager.hpp b/rclcpp/include/rclcpp/experimental/intra_process_manager.hpp index e5d7fa39b1..6b008f6906 100644 --- a/rclcpp/include/rclcpp/experimental/intra_process_manager.hpp +++ b/rclcpp/include/rclcpp/experimental/intra_process_manager.hpp @@ -176,8 +176,10 @@ class IntraProcessManager */ template< typename MessageT, + typename ROSMessageType, typename Alloc = std::allocator, - typename Deleter = std::default_delete> + typename Deleter = std::default_delete + > void do_intra_process_publish( uint64_t intra_process_publisher_id, @@ -203,7 +205,7 @@ class IntraProcessManager // None of the buffers require ownership, so we promote the pointer std::shared_ptr msg = std::move(message); - this->template add_shared_msg_to_buffers( + this->template add_shared_msg_to_buffers( msg, sub_ids.take_shared_subscriptions); } else if (!sub_ids.take_ownership_subscriptions.empty() && // NOLINT sub_ids.take_shared_subscriptions.size() <= 1) @@ -214,8 +216,7 @@ class IntraProcessManager concatenated_vector.end(), sub_ids.take_ownership_subscriptions.begin(), sub_ids.take_ownership_subscriptions.end()); - - this->template add_owned_msg_to_buffers( + this->template add_owned_msg_to_buffers( std::move(message), concatenated_vector, allocator); @@ -226,17 +227,19 @@ class IntraProcessManager // for the buffers that do not require ownership auto shared_msg = std::allocate_shared(allocator, *message); - this->template add_shared_msg_to_buffers( + this->template add_shared_msg_to_buffers( shared_msg, sub_ids.take_shared_subscriptions); - this->template add_owned_msg_to_buffers( + this->template add_owned_msg_to_buffers( std::move(message), sub_ids.take_ownership_subscriptions, allocator); } } template< typename MessageT, + typename ROSMessageType, typename Alloc = std::allocator, - typename Deleter = std::default_delete> + typename Deleter = std::default_delete + > std::shared_ptr do_intra_process_publish_and_return_shared( uint64_t intra_process_publisher_id, @@ -262,7 +265,7 @@ class IntraProcessManager // If there are no owning, just convert to shared. std::shared_ptr shared_msg = std::move(message); if (!sub_ids.take_shared_subscriptions.empty()) { - this->template add_shared_msg_to_buffers( + this->template add_shared_msg_to_buffers( shared_msg, sub_ids.take_shared_subscriptions); } return shared_msg; @@ -272,12 +275,12 @@ class IntraProcessManager auto shared_msg = std::allocate_shared(allocator, *message); if (!sub_ids.take_shared_subscriptions.empty()) { - this->template add_shared_msg_to_buffers( + this->template add_shared_msg_to_buffers( shared_msg, sub_ids.take_shared_subscriptions); } if (!sub_ids.take_ownership_subscriptions.empty()) { - this->template add_owned_msg_to_buffers( + this->template add_owned_msg_to_buffers( std::move(message), sub_ids.take_ownership_subscriptions, allocator); @@ -334,14 +337,22 @@ class IntraProcessManager template< typename MessageT, typename Alloc, - typename Deleter> + typename Deleter, + typename ROSMessageType> void add_shared_msg_to_buffers( std::shared_ptr message, std::vector subscription_ids) { using PublishedType = typename rclcpp::TypeAdapter::custom_type; - using ROSMessageType = typename rclcpp::TypeAdapter::ros_message_type; + using ROSMessageTypeAllocatorTraits = allocator::AllocRebind; + using ROSMessageTypeAllocator = typename ROSMessageTypeAllocatorTraits::allocator_type; + using ROSMessageTypeDeleter = allocator::Deleter; + + using PublishedTypeAllocatorTraits = allocator::AllocRebind; + using PublishedTypeAllocator = typename PublishedTypeAllocatorTraits::allocator_type; + using PublishedTypeDeleter = allocator::Deleter; + for (auto id : subscription_ids) { auto subscription_it = subscriptions_.find(id); @@ -351,11 +362,13 @@ class IntraProcessManager auto subscription_base = subscription_it->second.lock(); if (subscription_base) { auto subscription = std::dynamic_pointer_cast< - rclcpp::experimental::SubscriptionIntraProcessBuffer + rclcpp::experimental::SubscriptionIntraProcessBuffer >(subscription_base); if (nullptr == subscription) { auto ros_message_subscription = std::dynamic_pointer_cast< - rclcpp::experimental::ROSMessageIntraProcessBuffer + rclcpp::experimental::ROSMessageIntraProcessBuffer >(subscription_base); if (nullptr == ros_message_subscription) { @@ -368,9 +381,22 @@ class IntraProcessManager if constexpr (rclcpp::TypeAdapter::is_specialized::value) { ROSMessageType ros_msg; rclcpp::TypeAdapter::convert_to_ros_message(*message, ros_msg); - ros_message_subscription->provide_intra_process_message(ros_msg); + ros_message_subscription->provide_intra_process_message( + std::make_shared(ros_msg)); } else { - ros_message_subscription->provide_intra_process_message(message); + if constexpr (std::is_same::value) { + ros_message_subscription->provide_intra_process_message(message); + } else { + if constexpr (std::is_same::ros_message_type, ROSMessageType>::value) + { + ROSMessageType ros_msg; + rclcpp::TypeAdapter::convert_to_ros_message( + *message, ros_msg); + ros_message_subscription->provide_intra_process_message( + std::make_shared(ros_msg)); + } + } } } } else { @@ -385,7 +411,8 @@ class IntraProcessManager template< typename MessageT, typename Alloc = std::allocator, - typename Deleter = std::default_delete> + typename Deleter = std::default_delete, + typename ROSMessageType = MessageT> void add_owned_msg_to_buffers( std::unique_ptr message, @@ -395,7 +422,14 @@ class IntraProcessManager using MessageAllocTraits = allocator::AllocRebind; using MessageUniquePtr = std::unique_ptr; using PublishedType = typename rclcpp::TypeAdapter::custom_type; - using ROSMessageType = typename rclcpp::TypeAdapter::ros_message_type; + + using ROSMessageTypeAllocatorTraits = allocator::AllocRebind; + using ROSMessageTypeAllocator = typename ROSMessageTypeAllocatorTraits::allocator_type; + using ROSMessageTypeDeleter = allocator::Deleter; + + using PublishedTypeAllocatorTraits = allocator::AllocRebind; + using PublishedTypeAllocator = typename PublishedTypeAllocatorTraits::allocator_type; + using PublishedTypeDeleter = allocator::Deleter; for (auto it = subscription_ids.begin(); it != subscription_ids.end(); it++) { auto subscription_it = subscriptions_.find(*it); @@ -405,41 +439,47 @@ class IntraProcessManager auto subscription_base = subscription_it->second.lock(); if (subscription_base) { auto subscription = std::dynamic_pointer_cast< - rclcpp::experimental::SubscriptionIntraProcessBuffer + rclcpp::experimental::SubscriptionIntraProcessBuffer >(subscription_base); if (nullptr == subscription) { auto ros_message_subscription = std::dynamic_pointer_cast< - rclcpp::experimental::ROSMessageIntraProcessBuffer + rclcpp::experimental::ROSMessageIntraProcessBuffer >(subscription_base); if (nullptr == ros_message_subscription) { throw std::runtime_error( - "failed to dynamic cast SubscriptionIntraProcessBase to " + "--failed to dynamic cast SubscriptionIntraProcessBase to " "SubscriptionIntraProcessBuffer, which " "can happen when the publisher and subscription use different " "allocator types, which is not supported"); } else { if constexpr (rclcpp::TypeAdapter::is_specialized::value) { - using ROSMessageTypeAllocatorTraits = allocator::AllocRebind; - auto ptr = ROSMessageTypeAllocatorTraits::allocate(allocator, 1); - ROSMessageTypeAllocatorTraits::construct(allocator, ptr); - Deleter deleter = message.get_deleter(); - auto ros_msg = std::unique_ptr(ptr, deleter); - rclcpp::TypeAdapter::convert_to_ros_message(message, ros_msg); + ROSMessageTypeAllocator ros_message_alloc(allocator); + auto ptr = ros_message_alloc.allocate(1); + ros_message_alloc.construct(ptr); + ROSMessageTypeDeleter deleter; + allocator::set_allocator_for_deleter(&deleter, &allocator); + rclcpp::TypeAdapter::convert_to_ros_message(*message, *ptr); + auto ros_msg = std::unique_ptr(ptr, deleter); ros_message_subscription->provide_intra_process_message(std::move(ros_msg)); } else { - if (std::next(it) == subscription_ids.end()) { - // If this is the last subscription, give up ownership - ros_message_subscription->provide_intra_process_message(std::move(message)); - } else { - // Copy the message since we have additional subscriptions to serve - MessageUniquePtr copy_message; - Deleter deleter = message.get_deleter(); - auto ptr = MessageAllocTraits::allocate(allocator, 1); - MessageAllocTraits::construct(allocator, ptr, *message); - copy_message = MessageUniquePtr(ptr, deleter); - - ros_message_subscription->provide_intra_process_message(std::move(copy_message)); + if constexpr (std::is_same::value) { + if (std::next(it) == subscription_ids.end()) { + // If this is the last subscription, give up ownership + ros_message_subscription->provide_intra_process_message(std::move(message)); + } else { + // Copy the message since we have additional subscriptions to serve + MessageUniquePtr copy_message; + Deleter deleter = message.get_deleter(); + allocator::set_allocator_for_deleter(&deleter, &allocator); + auto ptr = MessageAllocTraits::allocate(allocator, 1); + MessageAllocTraits::construct(allocator, ptr, *message); + copy_message = MessageUniquePtr(ptr, deleter); + + ros_message_subscription->provide_intra_process_message(std::move(copy_message)); + } } } } diff --git a/rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp b/rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp index ee0bd8f163..803d940086 100644 --- a/rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp +++ b/rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp @@ -41,20 +41,24 @@ namespace experimental template< typename MessageT, typename SubscribedType, - typename SubscribedTypeAlloc = std::allocator, - typename SubscribedTypeDeleter = std::default_delete + typename SubscribedTypeAlloc = std::allocator, + typename SubscribedTypeDeleter = std::default_delete, + typename ROSMessageType = SubscribedType, + typename Alloc = std::allocator > class SubscriptionIntraProcess : public SubscriptionIntraProcessBuffer< SubscribedType, SubscribedTypeAlloc, - SubscribedTypeDeleter + SubscribedTypeDeleter, + ROSMessageType > { using SubscriptionIntraProcessBufferT = SubscriptionIntraProcessBuffer< SubscribedType, SubscribedTypeAlloc, - SubscribedTypeDeleter + SubscribedTypeDeleter, + ROSMessageType >; public: @@ -68,15 +72,15 @@ class SubscriptionIntraProcess using BufferUniquePtr = typename SubscriptionIntraProcessBufferT::BufferUniquePtr; SubscriptionIntraProcess( - AnySubscriptionCallback callback, - std::shared_ptr allocator, + AnySubscriptionCallback callback, + std::shared_ptr allocator, rclcpp::Context::SharedPtr context, const std::string & topic_name, const rclcpp::QoS & qos_profile, rclcpp::IntraProcessBufferType buffer_type) : SubscriptionIntraProcessBuffer( - allocator, + SubscribedTypeDeleter, ROSMessageType>( + std::make_shared(*allocator), context, topic_name, qos_profile, @@ -154,7 +158,7 @@ class SubscriptionIntraProcess shared_ptr.reset(); } - AnySubscriptionCallback any_callback_; + AnySubscriptionCallback any_callback_; }; } // namespace experimental diff --git a/rclcpp/include/rclcpp/experimental/subscription_intra_process_buffer.hpp b/rclcpp/include/rclcpp/experimental/subscription_intra_process_buffer.hpp index 43a834503d..b8883cff71 100644 --- a/rclcpp/include/rclcpp/experimental/subscription_intra_process_buffer.hpp +++ b/rclcpp/include/rclcpp/experimental/subscription_intra_process_buffer.hpp @@ -38,14 +38,16 @@ namespace experimental template< typename SubscribedType, - typename Alloc = std::allocator, + typename Alloc = std::allocator, typename Deleter = std::default_delete, /// MessageT::ros_message_type if MessageT is a TypeAdapter, /// otherwise just MessageT. - typename ROSMessageType = typename rclcpp::TypeAdapter::ros_message_type + typename ROSMessageType = SubscribedType > -class SubscriptionIntraProcessBuffer : public ROSMessageIntraProcessBuffer +class SubscriptionIntraProcessBuffer : public ROSMessageIntraProcessBuffer::allocator_type, + allocator::Deleter::allocator_type, + ROSMessageType>> { public: RCLCPP_SMART_PTR_DEFINITIONS(SubscriptionIntraProcessBuffer) @@ -76,8 +78,8 @@ class SubscriptionIntraProcessBuffer : public ROSMessageIntraProcessBuffer(context, topic_name, - qos_profile), + : ROSMessageIntraProcessBuffer( + context, topic_name, qos_profile), subscribed_type_allocator_(*allocator) { allocator::set_allocator_for_deleter(&subscribed_type_deleter_, &subscribed_type_allocator_); @@ -87,7 +89,7 @@ class SubscriptionIntraProcessBuffer : public ROSMessageIntraProcessBuffer( buffer_type, qos_profile, - allocator); + std::make_shared(subscribed_type_allocator_)); } bool diff --git a/rclcpp/include/rclcpp/publisher.hpp b/rclcpp/include/rclcpp/publisher.hpp index aaeedf3768..e1a409976e 100644 --- a/rclcpp/include/rclcpp/publisher.hpp +++ b/rclcpp/include/rclcpp/publisher.hpp @@ -81,7 +81,6 @@ class Publisher : public PublisherBase /// MessageT::custom_type if MessageT is a TypeAdapter, otherwise just MessageT. using PublishedType = typename rclcpp::TypeAdapter::custom_type; - /// MessageT::ros_message_type if MessageT is a TypeAdapter, otherwise just MessageT. using ROSMessageType = typename rclcpp::TypeAdapter::ros_message_type; using PublishedTypeAllocatorTraits = allocator::AllocRebind; @@ -320,11 +319,28 @@ class Publisher : public PublisherBase > publish(std::unique_ptr msg) { - // TODO(wjwwood): later update this to give the unique_ptr to the intra - // process manager and let it decide if it needs to be converted or not. - // For now, convert it unconditionally and pass it the ROSMessageType - // publish function specialization. - this->do_intra_process_publish(std::move(msg)); + // Avoid allocating when not using intra process. + if (!intra_process_is_enabled_) { + // In this case we're not using intra process. + ROSMessageType ros_msg; + rclcpp::TypeAdapter::convert_to_ros_message(*msg, ros_msg); + return this->do_inter_process_publish(ros_msg); + } + + bool inter_process_publish_needed = + get_subscription_count() > get_intra_process_subscription_count(); + + if (inter_process_publish_needed) { + auto shared_msg = this->do_intra_process_publish_and_return_shared(std::move(msg)); + // TODO(clalancette): This is unnecessarily doing an additional conversion + // that may have already been done in do_intra_process_publish_and_return_shared(). + // We should just reuse that effort. + ROSMessageType ros_msg; + rclcpp::TypeAdapter::convert_to_ros_message(*msg, ros_msg); + this->do_inter_process_publish(ros_msg); + } else { + this->do_intra_process_publish(std::move(msg)); + } } /// Publish a message on the topic. @@ -345,12 +361,7 @@ class Publisher : public PublisherBase > publish(const T & msg) { - // TODO(wjwwood): later update this to give the unique_ptr to the intra - // process manager and let it decide if it needs to be converted or not. - // For now, convert it unconditionally and pass it the ROSMessageType - // publish function specialization. - - // Avoid allocating when not using intra process. + // Avoid double allocating when not using intra process. if (!intra_process_is_enabled_) { // Convert to the ROS message equivalent and publish it. ROSMessageType ros_msg; @@ -358,13 +369,12 @@ class Publisher : public PublisherBase // In this case we're not using intra process. return this->do_inter_process_publish(ros_msg); } - // Otherwise we have to allocate memory in a unique_ptr, convert it, - // and pass it along. + + // Otherwise we have to allocate memory in a unique_ptr and pass it along. // As the message is not const, a copy should be made. // A shared_ptr could also be constructed here. - auto unique_ros_msg = this->create_ros_message_unique_ptr(); - rclcpp::TypeAdapter::convert_to_ros_message(msg, *unique_ros_msg); - this->publish(std::move(unique_ros_msg)); + auto unique_msg = this->duplicate_type_adapt_message_as_unique_ptr(msg); + this->publish(std::move(unique_msg)); } void @@ -501,7 +511,7 @@ class Publisher : public PublisherBase throw std::runtime_error("cannot publish msg which is a null pointer"); } - ipm->template do_intra_process_publish( + ipm->template do_intra_process_publish( intra_process_publisher_id_, std::move(msg), published_type_allocator_); @@ -519,7 +529,7 @@ class Publisher : public PublisherBase throw std::runtime_error("cannot publish msg which is a null pointer"); } - ipm->template do_intra_process_publish( + ipm->template do_intra_process_publish( intra_process_publisher_id_, std::move(msg), ros_message_type_allocator_); @@ -540,7 +550,7 @@ class Publisher : public PublisherBase } return ipm->template do_intra_process_publish_and_return_shared( + ROSMessageType, AllocatorT>( intra_process_publisher_id_, std::move(msg), published_type_allocator_); @@ -559,7 +569,7 @@ class Publisher : public PublisherBase throw std::runtime_error("cannot publish msg which is a null pointer"); } - return ipm->template do_intra_process_publish_and_return_sharedtemplate do_intra_process_publish_and_return_shared( intra_process_publisher_id_, std::move(msg), @@ -585,6 +595,15 @@ class Publisher : public PublisherBase return std::unique_ptr(ptr, ros_message_type_deleter_); } + /// Duplicate a given type adapted message as a unique_ptr. + std::unique_ptr + duplicate_type_adapt_message_as_unique_ptr(const PublishedType & msg) + { + auto ptr = PublishedTypeAllocatorTraits::allocate(published_type_allocator_, 1); + PublishedTypeAllocatorTraits::construct(published_type_allocator_, ptr, msg); + return std::unique_ptr(ptr, published_type_deleter_); + } + /// Copy of original options passed during construction. /** * It is important to save a copy of this so that the rmw payload which it diff --git a/rclcpp/include/rclcpp/subscription.hpp b/rclcpp/include/rclcpp/subscription.hpp index c238c9581f..2d52388e3b 100644 --- a/rclcpp/include/rclcpp/subscription.hpp +++ b/rclcpp/include/rclcpp/subscription.hpp @@ -408,8 +408,10 @@ class Subscription : public SubscriptionBase using SubscriptionIntraProcessT = rclcpp::experimental::SubscriptionIntraProcess< MessageT, SubscribedType, - AllocatorT, - SubscribedTypeDeleter>; + SubscribedTypeAllocator, + SubscribedTypeDeleter, + ROSMessageT, + AllocatorT>; std::shared_ptr subscription_intra_process_; }; diff --git a/rclcpp/test/rclcpp/CMakeLists.txt b/rclcpp/test/rclcpp/CMakeLists.txt index e60b1636b8..c8292a530b 100644 --- a/rclcpp/test/rclcpp/CMakeLists.txt +++ b/rclcpp/test/rclcpp/CMakeLists.txt @@ -364,41 +364,41 @@ if(WIN32) set(append_library_dirs "${append_library_dirs}/$") endif() -# ament_add_gtest(test_publisher_with_type_adapter test_publisher_with_type_adapter.cpp -# APPEND_LIBRARY_DIRS "${append_library_dirs}" -# ) -# if(TARGET test_publisher_with_type_adapter) -# ament_target_dependencies(test_publisher_with_type_adapter -# "rcutils" -# "rcl_interfaces" -# "rmw" -# "rosidl_runtime_cpp" -# "rosidl_typesupport_cpp" -# "test_msgs" -# ) -# target_link_libraries(test_publisher_with_type_adapter -# ${PROJECT_NAME} -# mimick -# ${cpp_typesupport_target}) -# endif() - -# ament_add_gtest(test_subscription_with_type_adapter test_subscription_with_type_adapter.cpp -# APPEND_LIBRARY_DIRS "${append_library_dirs}" -# ) -# if(TARGET test_subscription_with_type_adapter) -# ament_target_dependencies(test_subscription_with_type_adapter -# "rcutils" -# "rcl_interfaces" -# "rmw" -# "rosidl_runtime_cpp" -# "rosidl_typesupport_cpp" -# "test_msgs" -# ) -# target_link_libraries(test_subscription_with_type_adapter -# ${PROJECT_NAME} -# mimick -# ${cpp_typesupport_target}) -# endif() +ament_add_gtest(test_publisher_with_type_adapter test_publisher_with_type_adapter.cpp + APPEND_LIBRARY_DIRS "${append_library_dirs}" +) +if(TARGET test_publisher_with_type_adapter) + ament_target_dependencies(test_publisher_with_type_adapter + "rcutils" + "rcl_interfaces" + "rmw" + "rosidl_runtime_cpp" + "rosidl_typesupport_cpp" + "test_msgs" + ) + target_link_libraries(test_publisher_with_type_adapter + ${PROJECT_NAME} + mimick + ${cpp_typesupport_target}) +endif() + +ament_add_gtest(test_subscription_with_type_adapter test_subscription_with_type_adapter.cpp + APPEND_LIBRARY_DIRS "${append_library_dirs}" +) +if(TARGET test_subscription_with_type_adapter) + ament_target_dependencies(test_subscription_with_type_adapter + "rcutils" + "rcl_interfaces" + "rmw" + "rosidl_runtime_cpp" + "rosidl_typesupport_cpp" + "test_msgs" + ) + target_link_libraries(test_subscription_with_type_adapter + ${PROJECT_NAME} + mimick + ${cpp_typesupport_target}) +endif() ament_add_gtest(test_subscription_publisher_with_same_type_adapter test_subscription_publisher_with_same_type_adapter.cpp APPEND_LIBRARY_DIRS "${append_library_dirs}" diff --git a/rclcpp/test/rclcpp/test_intra_process_manager.cpp b/rclcpp/test/rclcpp/test_intra_process_manager.cpp index 45b781995d..47fc0aa83b 100644 --- a/rclcpp/test/rclcpp/test_intra_process_manager.cpp +++ b/rclcpp/test/rclcpp/test_intra_process_manager.cpp @@ -222,7 +222,9 @@ class SubscriptionIntraProcessBase template< typename MessageT, typename Alloc = std::allocator, - typename Deleter = std::default_delete> + typename Deleter = std::default_delete, + typename ROSMessageType = MessageT +> class SubscriptionIntraProcessBuffer : public SubscriptionIntraProcessBase { public: @@ -279,7 +281,7 @@ class SubscriptionIntraProcessBuffer : public SubscriptionIntraProcessBase template< typename MessageT, - typename Alloc = std::allocator, + typename Alloc = std::allocator, typename Deleter = std::default_delete> class SubscriptionIntraProcess : public SubscriptionIntraProcessBuffer< MessageT, @@ -340,7 +342,7 @@ void Publisher::publish(MessageUniquePtr msg) throw std::runtime_error("cannot publish msg which is a null pointer"); } - ipm->template do_intra_process_publish( + ipm->template do_intra_process_publish( intra_process_publisher_id_, std::move(msg), *message_allocator_); diff --git a/rclcpp/test/rclcpp/test_publisher_with_type_adapter.cpp b/rclcpp/test/rclcpp/test_publisher_with_type_adapter.cpp index f5ab43cb7c..8b9d25aef3 100644 --- a/rclcpp/test/rclcpp/test_publisher_with_type_adapter.cpp +++ b/rclcpp/test/rclcpp/test_publisher_with_type_adapter.cpp @@ -166,17 +166,19 @@ TEST_F(TestPublisher, various_creation_signatures) { /* * Testing that conversion errors are passed up. + * TODO. This test was removed when implementing non convering intra-proccess + * publishing. It may be fixed and added again later. */ -TEST_F(TestPublisher, conversion_exception_is_passed_up) { - using BadStringTypeAdapter = rclcpp::TypeAdapter; - for (auto is_intra_process : {true, false}) { - rclcpp::NodeOptions options; - options.use_intra_process_comms(is_intra_process); - initialize(options); - auto pub = node->create_publisher("topic_name", 1); - EXPECT_THROW(pub->publish(1), std::runtime_error); - } -} +// TEST_F(TestPublisher, conversion_exception_is_passed_up) { +// using BadStringTypeAdapter = rclcpp::TypeAdapter; +// for (auto is_intra_process : {true, false}) { +// rclcpp::NodeOptions options; +// options.use_intra_process_comms(is_intra_process); +// initialize(options); +// auto pub = node->create_publisher("topic_name", 1); +// EXPECT_THROW(pub->publish(1), std::runtime_error); +// } +// } /* * Testing that publisher sends type adapted types and ROS message types with intra proccess communications. diff --git a/rclcpp/test/rclcpp/test_subscription_publisher_with_same_type_adapter.cpp b/rclcpp/test/rclcpp/test_subscription_publisher_with_same_type_adapter.cpp index 036fa11359..77c4207b43 100644 --- a/rclcpp/test/rclcpp/test_subscription_publisher_with_same_type_adapter.cpp +++ b/rclcpp/test/rclcpp/test_subscription_publisher_with_same_type_adapter.cpp @@ -16,63 +16,21 @@ #include #include -#include #include +#include #include -#include +#include #include -#include #include "rclcpp/exceptions.hpp" -#include "rclcpp/loaned_message.hpp" #include "rclcpp/rclcpp.hpp" -#include "../mocking_utils/patch.hpp" -#include "../utils/rclcpp_gtest_macros.hpp" - -#include "test_msgs/msg/empty.hpp" #include "rclcpp/msg/string.hpp" - -#ifdef RMW_IMPLEMENTATION -# define CLASSNAME_(NAME, SUFFIX) NAME ## __ ## SUFFIX -# define CLASSNAME(NAME, SUFFIX) CLASSNAME_(NAME, SUFFIX) -#else -# define CLASSNAME(NAME, SUFFIX) NAME -#endif - - -using namespace std::chrono_literals; - static const int g_max_loops = 200; static const std::chrono::milliseconds g_sleep_per_loop(10); - -class TestSubscriptionPublisher : public ::testing::Test -{ -public: - static void SetUpTestCase() - { - if (!rclcpp::ok()) { - rclcpp::init(0, nullptr); - } - } - -protected: - void initialize(const rclcpp::NodeOptions & node_options = rclcpp::NodeOptions()) - { - node = std::make_shared("my_node", "/ns", node_options); - } - - void TearDown() - { - node.reset(); - } - - rclcpp::Node::SharedPtr node; -}; - -class CLASSNAME (test_intra_process_within_one_node, RMW_IMPLEMENTATION) : public ::testing::Test +class test_intra_process_within_one_node : public ::testing::Test { public: static void SetUpTestCase() @@ -137,8 +95,9 @@ void wait_for_message_to_be_received( * Testing that subscriber receives type adapted types and ROS message types with intra proccess communications. */ TEST_F( - CLASSNAME(test_intra_process_within_one_node, RMW_IMPLEMENTATION), - check_type_adapted_messages_are_received_by_intra_process_subscription) { + test_intra_process_within_one_node, + type_adapted_messages_unique_pub_received_by_intra_process_subscription) +{ using StringTypeAdapter = rclcpp::TypeAdapter; const std::string message_data = "Message Data"; const std::string topic_name = "topic_name"; @@ -149,45 +108,409 @@ TEST_F( auto pub = node->create_publisher(topic_name, 1); - { - { // std::unique_ptr - bool is_received = false; - auto callback = - [message_data, &is_received]( - std::unique_ptr msg) -> void { - is_received = true; - ASSERT_STREQ(message_data.c_str(), (*msg).c_str()); - }; - auto sub = node->create_subscription(topic_name, 1, callback); - - std::cout << "LALA" << std::endl; - - auto pu_message = std::make_unique(message_data); - pub->publish(std::move(pu_message)); - - ASSERT_FALSE(is_received); - std::cout << "LALA TO BE RECEIVED" << std::endl; - wait_for_message_to_be_received(is_received, node); - ASSERT_TRUE(is_received); - } - { // std::unique_ptr with message info - bool is_received = false; - auto callback = - [message_data, &is_received]( - std::unique_ptr msg, - const rclcpp::MessageInfo & message_info) -> void { - is_received = true; - ASSERT_STREQ(message_data.c_str(), (*msg).c_str()); - ASSERT_TRUE(message_info.get_rmw_message_info().from_intra_process); - }; - auto sub = node->create_subscription(topic_name, 1, callback); - - auto pu_message = std::make_unique(message_data); - pub->publish(std::move(pu_message)); - - ASSERT_FALSE(is_received); - wait_for_message_to_be_received(is_received, node); - ASSERT_TRUE(is_received); - } + { // create_publisher with TypeAdapt struct, create_subscription with TypeAdapt struct, + // callback const std::string &, publish with unique_ptr + bool is_received = false; + auto callback = + [message_data, &is_received]( + const std::string & msg) -> void { + is_received = true; + ASSERT_STREQ(message_data.c_str(), msg.c_str()); + }; + auto sub = node->create_subscription(topic_name, 1, callback); + + auto pu_message = std::make_unique(message_data); + pub->publish(std::move(pu_message)); + + ASSERT_FALSE(is_received); + wait_for_message_to_be_received(is_received, node); + ASSERT_TRUE(is_received); + } + { // create_publisher with TypeAdapt struct, create_subscription with TypeAdapt struct, + // callback const std::string & with message info, publish with unique_ptr + bool is_received = false; + auto callback = + [message_data, &is_received]( + const std::string & msg, const rclcpp::MessageInfo & message_info) -> void { + is_received = true; + ASSERT_STREQ(message_data.c_str(), msg.c_str()); + ASSERT_TRUE(message_info.get_rmw_message_info().from_intra_process); + }; + auto sub = node->create_subscription(topic_name, 1, callback); + + auto pu_message = std::make_unique(message_data); + pub->publish(std::move(pu_message)); + + ASSERT_FALSE(is_received); + wait_for_message_to_be_received(is_received, node); + ASSERT_TRUE(is_received); + } + + { // create_publisher with TypeAdapt struct, create_subscription with TypeAdapt struct, + // callback std::unique_ptr, publish with unique_ptr + bool is_received = false; + auto callback = + [message_data, &is_received]( + std::unique_ptr msg) -> void { + is_received = true; + ASSERT_STREQ(message_data.c_str(), (*msg).c_str()); + }; + auto sub = node->create_subscription(topic_name, 1, callback); + + auto pu_message = std::make_unique(message_data); + pub->publish(std::move(pu_message)); + + ASSERT_FALSE(is_received); + wait_for_message_to_be_received(is_received, node); + ASSERT_TRUE(is_received); + } + { // create_publisher with TypeAdapt struct, create_subscription with TypeAdapt struct, + // callback std::unique_ptr with message info, publish with unique_ptr + bool is_received = false; + auto callback = + [message_data, &is_received]( + std::unique_ptr msg, + const rclcpp::MessageInfo & message_info) -> void { + is_received = true; + ASSERT_STREQ(message_data.c_str(), (*msg).c_str()); + ASSERT_TRUE(message_info.get_rmw_message_info().from_intra_process); + }; + auto sub = node->create_subscription(topic_name, 1, callback); + + auto pu_message = std::make_unique(message_data); + pub->publish(std::move(pu_message)); + + ASSERT_FALSE(is_received); + wait_for_message_to_be_received(is_received, node); + ASSERT_TRUE(is_received); + } + + { // create_publisher with TypeAdapt struct, create_subscription with TypeAdapt struct, + // callback std::shared_ptr, publish with unique_ptr + bool is_received = false; + auto callback = + [message_data, &is_received]( + std::shared_ptr msg) -> void { + is_received = true; + ASSERT_STREQ(message_data.c_str(), (*msg).c_str()); + }; + auto sub = node->create_subscription(topic_name, 1, callback); + + auto pu_message = std::make_unique(message_data); + pub->publish(std::move(pu_message)); + + ASSERT_FALSE(is_received); + wait_for_message_to_be_received(is_received, node); + ASSERT_TRUE(is_received); + } + { // create_publisher with TypeAdapt struct, create_subscription with TypeAdapt struct, + // callback std::shared_ptr with message info, publish with unique_ptr + bool is_received = false; + auto callback = + [message_data, &is_received]( + std::shared_ptr msg, + const rclcpp::MessageInfo & message_info) -> void { + is_received = true; + ASSERT_STREQ(message_data.c_str(), (*msg).c_str()); + ASSERT_TRUE(message_info.get_rmw_message_info().from_intra_process); + }; + auto sub = node->create_subscription(topic_name, 1, callback); + + auto pu_message = std::make_unique(message_data); + pub->publish(std::move(pu_message)); + + ASSERT_FALSE(is_received); + wait_for_message_to_be_received(is_received, node); + ASSERT_TRUE(is_received); + } + + { // create_publisher with TypeAdapt struct, create_subscription with TypeAdapt struct, + // callback std::shared_ptr, publish with unique_ptr + bool is_received = false; + auto callback = + [message_data, &is_received]( + std::shared_ptr msg) -> void { + is_received = true; + ASSERT_STREQ(message_data.c_str(), (*msg).c_str()); + }; + auto sub = node->create_subscription(topic_name, 1, callback); + + auto pu_message = std::make_unique(message_data); + pub->publish(std::move(pu_message)); + + ASSERT_FALSE(is_received); + wait_for_message_to_be_received(is_received, node); + ASSERT_TRUE(is_received); + } + { // create_publisher with TypeAdapt struct, create_subscription with TypeAdapt struct, + // callback std::shared_ptr with message info, publish with unique_ptr + bool is_received = false; + auto callback = + [message_data, &is_received]( + std::shared_ptr msg, + const rclcpp::MessageInfo & message_info) -> void { + is_received = true; + ASSERT_STREQ(message_data.c_str(), (*msg).c_str()); + ASSERT_TRUE(message_info.get_rmw_message_info().from_intra_process); + }; + auto sub = node->create_subscription(topic_name, 1, callback); + + auto pu_message = std::make_unique(message_data); + pub->publish(std::move(pu_message)); + + ASSERT_FALSE(is_received); + wait_for_message_to_be_received(is_received, node); + ASSERT_TRUE(is_received); + } + + { // create_publisher with TypeAdapt struct, create_subscription with TypeAdapt struct, + // callback const std::shared_ptr &, publish with unique_ptr + bool is_received = false; + auto callback = + [message_data, &is_received]( + const std::shared_ptr & msg) -> void { + is_received = true; + ASSERT_STREQ(message_data.c_str(), (*msg).c_str()); + }; + auto sub = node->create_subscription(topic_name, 1, callback); + + auto pu_message = std::make_unique(message_data); + pub->publish(std::move(pu_message)); + + ASSERT_FALSE(is_received); + wait_for_message_to_be_received(is_received, node); + ASSERT_TRUE(is_received); + } + { // create_publisher with TypeAdapt struct, create_subscription with TypeAdapt struct, + // callback const std::shared_ptr & with message info, + // publish with unique_ptr + bool is_received = false; + auto callback = + [message_data, &is_received]( + const std::shared_ptr & msg, + const rclcpp::MessageInfo & message_info) -> void { + is_received = true; + ASSERT_STREQ(message_data.c_str(), (*msg).c_str()); + ASSERT_TRUE(message_info.get_rmw_message_info().from_intra_process); + }; + auto sub = node->create_subscription(topic_name, 1, callback); + + auto pu_message = std::make_unique(message_data); + pub->publish(std::move(pu_message)); + + ASSERT_FALSE(is_received); + wait_for_message_to_be_received(is_received, node); + ASSERT_TRUE(is_received); + } +} + +TEST_F( + test_intra_process_within_one_node, + type_adapted_messages_const_ref_pub_received_by_intra_process_subscription) +{ + using StringTypeAdapter = rclcpp::TypeAdapter; + const std::string message_data = "Message Data"; + const std::string topic_name = "topic_name"; + + auto node = rclcpp::Node::make_shared( + "test_intra_process", + rclcpp::NodeOptions().use_intra_process_comms(true)); + + auto pub = node->create_publisher(topic_name, 1); + + { // create_publisher with TypeAdapt struct, create_subscription with TypeAdapt struct, + // callback const std::string &, publish with const std::string & + bool is_received = false; + auto callback = + [message_data, &is_received]( + const std::string & msg) -> void { + is_received = true; + ASSERT_STREQ(message_data.c_str(), msg.c_str()); + }; + auto sub = node->create_subscription(topic_name, 1, callback); + + std::string pu_message(message_data); + pub->publish(pu_message); + + ASSERT_FALSE(is_received); + wait_for_message_to_be_received(is_received, node); + ASSERT_TRUE(is_received); + } + { // create_publisher with TypeAdapt struct, create_subscription with TypeAdapt struct, + // callback const std::string & with message info, publish with const std::string & + bool is_received = false; + auto callback = + [message_data, &is_received]( + const std::string & msg, const rclcpp::MessageInfo & message_info) -> void { + is_received = true; + ASSERT_STREQ(message_data.c_str(), msg.c_str()); + ASSERT_TRUE(message_info.get_rmw_message_info().from_intra_process); + }; + auto sub = node->create_subscription(topic_name, 1, callback); + + std::string pu_message(message_data); + pub->publish(pu_message); + + ASSERT_FALSE(is_received); + wait_for_message_to_be_received(is_received, node); + ASSERT_TRUE(is_received); + } + + { // create_publisher with TypeAdapt struct, create_subscription with TypeAdapt struct, + // callback std::unique_ptr, publish with const std::string & + bool is_received = false; + auto callback = + [message_data, &is_received]( + std::unique_ptr msg) -> void { + is_received = true; + ASSERT_STREQ(message_data.c_str(), (*msg).c_str()); + }; + auto sub = node->create_subscription(topic_name, 1, callback); + + std::string pu_message(message_data); + pub->publish(pu_message); + + ASSERT_FALSE(is_received); + wait_for_message_to_be_received(is_received, node); + ASSERT_TRUE(is_received); + } + { // create_publisher with TypeAdapt struct, create_subscription with TypeAdapt struct, + // callback std::unique_ptr with message info, publish with const std::string & + bool is_received = false; + auto callback = + [message_data, &is_received]( + std::unique_ptr msg, + const rclcpp::MessageInfo & message_info) -> void { + is_received = true; + ASSERT_STREQ(message_data.c_str(), (*msg).c_str()); + ASSERT_TRUE(message_info.get_rmw_message_info().from_intra_process); + }; + auto sub = node->create_subscription(topic_name, 1, callback); + + std::string pu_message(message_data); + pub->publish(pu_message); + + ASSERT_FALSE(is_received); + wait_for_message_to_be_received(is_received, node); + ASSERT_TRUE(is_received); + } + + { // create_publisher with TypeAdapt struct, create_subscription with TypeAdapt struct, + // callback std::shared_ptr, publish with const std::string & + bool is_received = false; + auto callback = + [message_data, &is_received]( + std::shared_ptr msg) -> void { + is_received = true; + ASSERT_STREQ(message_data.c_str(), (*msg).c_str()); + }; + auto sub = node->create_subscription(topic_name, 1, callback); + + std::string pu_message(message_data); + pub->publish(pu_message); + + ASSERT_FALSE(is_received); + wait_for_message_to_be_received(is_received, node); + ASSERT_TRUE(is_received); + } + { // create_publisher with TypeAdapt struct, create_subscription with TypeAdapt struct, + // callback std::shared_ptr with message info, publish with const std::string & + bool is_received = false; + auto callback = + [message_data, &is_received]( + std::shared_ptr msg, + const rclcpp::MessageInfo & message_info) -> void { + is_received = true; + ASSERT_STREQ(message_data.c_str(), (*msg).c_str()); + ASSERT_TRUE(message_info.get_rmw_message_info().from_intra_process); + }; + auto sub = node->create_subscription(topic_name, 1, callback); + + std::string pu_message(message_data); + pub->publish(pu_message); + + ASSERT_FALSE(is_received); + wait_for_message_to_be_received(is_received, node); + ASSERT_TRUE(is_received); + } + + { // create_publisher with TypeAdapt struct, create_subscription with TypeAdapt struct, + // callback std::shared_ptr, publish with const std::string & + bool is_received = false; + auto callback = + [message_data, &is_received]( + std::shared_ptr msg) -> void { + is_received = true; + ASSERT_STREQ(message_data.c_str(), (*msg).c_str()); + }; + auto sub = node->create_subscription(topic_name, 1, callback); + + std::string pu_message(message_data); + pub->publish(pu_message); + + ASSERT_FALSE(is_received); + wait_for_message_to_be_received(is_received, node); + ASSERT_TRUE(is_received); + } + { // create_publisher with TypeAdapt struct, create_subscription with TypeAdapt struct, + // callback std::shared_ptr with message info, + // publish with const std::string & + bool is_received = false; + auto callback = + [message_data, &is_received]( + std::shared_ptr msg, + const rclcpp::MessageInfo & message_info) -> void { + is_received = true; + ASSERT_STREQ(message_data.c_str(), (*msg).c_str()); + ASSERT_TRUE(message_info.get_rmw_message_info().from_intra_process); + }; + auto sub = node->create_subscription(topic_name, 1, callback); + + std::string pu_message(message_data); + pub->publish(pu_message); + + ASSERT_FALSE(is_received); + wait_for_message_to_be_received(is_received, node); + ASSERT_TRUE(is_received); + } + + { // create_publisher with TypeAdapt struct, create_subscription with TypeAdapt struct, + // callback const std::shared_ptr &, publish with const std::string & + bool is_received = false; + auto callback = + [message_data, &is_received]( + const std::shared_ptr & msg) -> void { + is_received = true; + ASSERT_STREQ(message_data.c_str(), (*msg).c_str()); + }; + auto sub = node->create_subscription(topic_name, 1, callback); + + std::string pu_message(message_data); + pub->publish(pu_message); + + ASSERT_FALSE(is_received); + wait_for_message_to_be_received(is_received, node); + ASSERT_TRUE(is_received); + } + { // create_publisher with TypeAdapt struct, create_subscription with TypeAdapt struct, + // callback const std::shared_ptr & with message info, + // publish with const std::string & + bool is_received = false; + auto callback = + [message_data, &is_received]( + const std::shared_ptr & msg, + const rclcpp::MessageInfo & message_info) -> void { + is_received = true; + ASSERT_STREQ(message_data.c_str(), (*msg).c_str()); + ASSERT_TRUE(message_info.get_rmw_message_info().from_intra_process); + }; + auto sub = node->create_subscription(topic_name, 1, callback); + + std::string pu_message(message_data); + pub->publish(pu_message); + + ASSERT_FALSE(is_received); + wait_for_message_to_be_received(is_received, node); + ASSERT_TRUE(is_received); } }