Skip to content

Commit

Permalink
Add in more of the necessary logic on the publish side. (#2)
Browse files Browse the repository at this point in the history
That is, make sure to deal with inter-process publishing properly.
This requires us to introduce two copies of
do_intra_process_and_return_shared(), one of which deals with the
ROS Message type and the other that deals with the PublishedType.
This is kind of wasteful, and we may get rid of this later on,
but this works for now.

Signed-off-by: Chris Lalancette <[email protected]>

Make sure to add the appropriate library directory. (#3)

Signed-off-by: Chris Lalancette <[email protected]>
  • Loading branch information
clalancette committed Jan 4, 2022
1 parent 1ceabc8 commit 6dc5e4e
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 28 deletions.
106 changes: 78 additions & 28 deletions rclcpp/include/rclcpp/publisher.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ class Publisher : public PublisherBase
get_subscription_count() > get_intra_process_subscription_count();

if (inter_process_publish_needed) {
auto shared_msg = this->do_intra_process_publish_and_return_shared(std::move(msg));
auto shared_msg = this->do_intra_process_publish_and_return_shared<T>(std::move(msg));
this->do_inter_process_publish(*shared_msg);
} else {
this->do_intra_process_publish(std::move(msg));
Expand Down Expand Up @@ -319,15 +319,29 @@ class Publisher : public PublisherBase
>
publish(std::unique_ptr<T, PublishedTypeDeleter> msg)
{
// TODO(wjwwood): later update this to give the unique_ptr to the intra
// process manager and let it decide if it needs to be converted or not.
// For now, convert it unconditionally and pass it the ROSMessageType
// publish function specialization.
std::cout << "Publisher LALA" << std::endl;
// auto unique_ros_msg = this->create_ros_message_unique_ptr();
// rclcpp::TypeAdapter<MessageT>::convert_to_ros_message(*msg, *unique_ros_msg);
//this->publish(std::move(unique_ros_msg));
this->do_intra_process_publish(std::move(msg));
if (!intra_process_is_enabled_) {
// If we aren't using intraprocess at all, do the type conversion
// immediately and publish interprocess.
auto unique_ros_msg = this->create_ros_message_unique_ptr();
rclcpp::TypeAdapter<MessageT>::convert_to_ros_message(*msg, *unique_ros_msg);
this->do_inter_process_publish(*unique_ros_msg);
return;
}
// If an interprocess subscription exists, then the unique_ptr is promoted
// to a shared_ptr and published.
// This allows doing the intraprocess publish first and then doing the
// interprocess publish, resulting in lower publish-to-subscribe latency.
// It's not possible to do that with an unique_ptr,
// as do_intra_process_publish takes the ownership of the message.
bool inter_process_publish_needed =
get_subscription_count() > get_intra_process_subscription_count();

if (inter_process_publish_needed) {
auto shared_msg = this->do_intra_process_publish_and_return_shared<T>(std::move(msg));
// this->do_inter_process_publish(*shared_msg);
} else {
this->do_intra_process_publish(std::move(msg));
}
}

/// Publish a message on the topic.
Expand All @@ -348,26 +362,18 @@ class Publisher : public PublisherBase
>
publish(const T & msg)
{
// TODO(wjwwood): later update this to give the unique_ptr to the intra
// process manager and let it decide if it needs to be converted or not.
// For now, convert it unconditionally and pass it the ROSMessageType
// publish function specialization.

// Avoid allocating when not using intra process.
// Avoid double allocation when not using intra process.
if (!intra_process_is_enabled_) {
// Convert to the ROS message equivalent and publish it.
ROSMessageType ros_msg;
rclcpp::TypeAdapter<MessageT>::convert_to_ros_message(msg, ros_msg);
// In this case we're not using intra process.
return this->do_inter_process_publish(ros_msg);
this->do_inter_process_publish(ros_msg);
return;
}
// Otherwise we have to allocate memory in a unique_ptr, convert it,
// and pass it along.
// As the message is not const, a copy should be made.
// A shared_ptr<const MessageT> could also be constructed here.
auto unique_ros_msg = this->create_ros_message_unique_ptr();
rclcpp::TypeAdapter<MessageT>::convert_to_ros_message(msg, *unique_ros_msg);
this->publish(std::move(unique_ros_msg));
// Otherwise we have to allocate memory in a unique_ptr and pass it along.
auto unique_msg = this->duplicate_type_adapt_message_as_unique_ptr(msg);
this->publish(std::move(unique_msg));
}

void
Expand Down Expand Up @@ -492,8 +498,8 @@ class Publisher : public PublisherBase
}
}

void
do_intra_process_publish(std::unique_ptr<PublishedType, PublishedTypeDeleter> msg)
void
do_intra_process_publish(std::unique_ptr<PublishedType, PublishedTypeDeleter> msg)
{
auto ipm = weak_ipm_.lock();
if (!ipm) {
Expand All @@ -511,7 +517,12 @@ do_intra_process_publish(std::unique_ptr<PublishedType, PublishedTypeDeleter> ms
}


std::shared_ptr<const PublishedType>
template<typename T>
typename
std::enable_if_t<
rosidl_generator_traits::is_message<T>::value &&
std::is_same<T, ROSMessageType>::value, std::shared_ptr<const ROSMessageType>
>
do_intra_process_publish_and_return_shared(
std::unique_ptr<PublishedType, PublishedTypeDeleter> msg)
{
Expand All @@ -531,6 +542,36 @@ do_intra_process_publish(std::unique_ptr<PublishedType, PublishedTypeDeleter> ms
published_type_allocator_);
}

template<typename T>
typename
std::enable_if_t<
rclcpp::TypeAdapter<MessageT>::is_specialized::value &&
std::is_same<T, PublishedType>::value, std::shared_ptr<const ROSMessageType>
>
do_intra_process_publish_and_return_shared(
std::unique_ptr<PublishedType, PublishedTypeDeleter> msg)
{
auto ipm = weak_ipm_.lock();
if (!ipm) {
throw std::runtime_error(
"intra process publish called after destruction of intra process manager");
}
if (!msg) {
throw std::runtime_error("cannot publish msg which is a null pointer");
}

ipm->template do_intra_process_publish_and_return_shared<PublishedType,
AllocatorT>(
intra_process_publisher_id_,
std::move(msg),
published_type_allocator_);

auto unique_ros_msg = this->create_ros_message_unique_ptr();
rclcpp::TypeAdapter<MessageT>::convert_to_ros_message(*msg, *unique_ros_msg);

return unique_ros_msg;
}

/// Return a new unique_ptr using the ROSMessageType of the publisher.
std::unique_ptr<ROSMessageType, ROSMessageTypeDeleter>
create_ros_message_unique_ptr()
Expand All @@ -540,7 +581,7 @@ do_intra_process_publish(std::unique_ptr<PublishedType, PublishedTypeDeleter> ms
return std::unique_ptr<ROSMessageType, ROSMessageTypeDeleter>(ptr, ros_message_type_deleter_);
}

/// Duplicate a given ros message as a unique_ptr.
/// Duplicate a given ROS message as a unique_ptr.
std::unique_ptr<ROSMessageType, ROSMessageTypeDeleter>
duplicate_ros_message_as_unique_ptr(const ROSMessageType & msg)
{
Expand All @@ -549,6 +590,15 @@ do_intra_process_publish(std::unique_ptr<PublishedType, PublishedTypeDeleter> ms
return std::unique_ptr<ROSMessageType, ROSMessageTypeDeleter>(ptr, ros_message_type_deleter_);
}

/// Duplicate a given type adapted message as a unique_ptr.
std::unique_ptr<PublishedType, PublishedTypeDeleter>
duplicate_type_adapt_message_as_unique_ptr(const PublishedType & msg)
{
auto ptr = PublishedTypeAllocatorTraits::allocate(published_type_allocator_, 1);
PublishedTypeAllocatorTraits::construct(published_type_allocator_, ptr, msg);
return std::unique_ptr<PublishedType, PublishedTypeDeleter>(ptr, published_type_deleter_);
}

/// Copy of original options passed during construction.
/**
* It is important to save a copy of this so that the rmw payload which it
Expand Down
5 changes: 5 additions & 0 deletions rclcpp/test/rclcpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@ rosidl_generate_interfaces(${PROJECT_NAME}_test_msgs
# Need the target name to depend on generated interface libraries
rosidl_get_typesupport_target(cpp_typesupport_target "${PROJECT_NAME}_test_msgs" "rosidl_typesupport_cpp")

set(append_library_dirs "${CMAKE_CURRENT_BINARY_DIR}")
if(WIN32)
set(append_library_dirs "${append_library_dirs}/$<CONFIG>")
endif()

ament_add_gtest(test_subscription_publisher_with_same_type_adapter test_subscription_publisher_with_same_type_adapter.cpp
APPEND_LIBRARY_DIRS "${append_library_dirs}"
)
Expand Down

0 comments on commit 6dc5e4e

Please sign in to comment.