Skip to content

Commit

Permalink
Push the publish conversion down to the intra-process manager (#6)
Browse files Browse the repository at this point in the history
* Remove template defaults that we never use.

Signed-off-by: Chris Lalancette <[email protected]>

* Rename MessageT to PublishedType in the IntraProcessManager.

It makes it easier to determine which type we are talking about.

Signed-off-by: Chris Lalancette <[email protected]>

* Get all the types setup to push the conversion down to IPM.

There really is no functional change here, just preparing to
move the conversion down to the intra-process manager.

Signed-off-by: Chris Lalancette <[email protected]>

* Move conversion down to the intra-process manager.

Signed-off-by: Chris Lalancette <[email protected]>
  • Loading branch information
clalancette committed Jan 4, 2022
1 parent df5f52e commit 54166ee
Show file tree
Hide file tree
Showing 2 changed files with 125 additions and 67 deletions.
145 changes: 119 additions & 26 deletions rclcpp/include/rclcpp/experimental/intra_process_manager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -177,19 +177,20 @@ class IntraProcessManager
* \param message the message that is being stored.
*/
template<
typename MessageT,
typename Alloc = std::allocator<void>,
typename Deleter = std::default_delete<MessageT>>
typename PublishedType,
typename Alloc,
typename Deleter = std::default_delete<PublishedType>
>
void
do_intra_process_publish(
uint64_t intra_process_publisher_id,
std::unique_ptr<MessageT, Deleter> message,
typename allocator::AllocRebind<MessageT, Alloc>::allocator_type & allocator)
std::unique_ptr<PublishedType, Deleter> message,
typename allocator::AllocRebind<PublishedType, Alloc>::allocator_type & allocator)
{

std::cout << "do_intra_process_publish --- " << std::endl;

using MessageAllocTraits = allocator::AllocRebind<MessageT, Alloc>;
using MessageAllocTraits = allocator::AllocRebind<PublishedType, Alloc>;
using MessageAllocatorT = typename MessageAllocTraits::allocator_type;

std::shared_lock<std::shared_timed_mutex> lock(mutex_);
Expand All @@ -206,9 +207,9 @@ class IntraProcessManager

if (sub_ids.take_ownership_subscriptions.empty()) {
// None of the buffers require ownership, so we promote the pointer
std::shared_ptr<MessageT> msg = std::move(message);
std::shared_ptr<PublishedType> msg = std::move(message);

this->template add_shared_msg_to_buffers<MessageT, Alloc, Deleter>(
this->template add_shared_msg_to_buffers<PublishedType, Alloc, Deleter>(
msg, sub_ids.take_shared_subscriptions);
} else {
if (sub_ids.take_shared_subscriptions.size() <= 1) {
Expand All @@ -226,34 +227,50 @@ class IntraProcessManager
sub_ids.take_ownership_subscriptions.begin(),
sub_ids.take_ownership_subscriptions.end());

this->template add_owned_msg_to_buffers<MessageT, Alloc, Deleter>(
this->template add_owned_msg_to_buffers<PublishedType, Alloc, Deleter>(
std::move(message),
concatenated_vector,
allocator);
} else {
// Construct a new shared pointer from the message
// for the buffers that do not require ownership
auto shared_msg = std::allocate_shared<MessageT, MessageAllocatorT>(allocator, *message);
auto shared_msg = std::allocate_shared<PublishedType, MessageAllocatorT>(allocator, *message);

this->template add_shared_msg_to_buffers<MessageT, Alloc, Deleter>(
this->template add_shared_msg_to_buffers<PublishedType, Alloc, Deleter>(
shared_msg, sub_ids.take_shared_subscriptions);
this->template add_owned_msg_to_buffers<MessageT, Alloc, Deleter>(
this->template add_owned_msg_to_buffers<PublishedType, Alloc, Deleter>(
std::move(message), sub_ids.take_ownership_subscriptions, allocator);
}
}
}

template<
typename MessageT,
typename Alloc = std::allocator<void>,
typename Deleter = std::default_delete<MessageT>>
std::shared_ptr<const MessageT>
typename T,
typename PublishedType,
typename ROSMessageType,
typename Alloc,
typename ROSMessageTypeAllocatorTraits,
typename ROSMessageTypeAllocator,
typename ROSMessageTypeDeleter,
typename Deleter = std::default_delete<PublishedType>
>
typename
std::enable_if_t<
rosidl_generator_traits::is_message<T>::value &&
std::is_same<T, ROSMessageType>::value, std::shared_ptr<const ROSMessageType>
>
do_intra_process_publish_and_return_shared(
uint64_t intra_process_publisher_id,
std::unique_ptr<MessageT, Deleter> message,
typename allocator::AllocRebind<MessageT, Alloc>::allocator_type & allocator)
std::unique_ptr<PublishedType, Deleter> message,
typename allocator::AllocRebind<PublishedType, Alloc>::allocator_type & allocator,
ROSMessageTypeAllocator & ros_message_type_allocator,
ROSMessageTypeDeleter & ros_message_type_deleter)
{
using MessageAllocTraits = allocator::AllocRebind<MessageT, Alloc>;
(void)ros_message_type_allocator;
(void)ros_message_type_deleter;

using MessageAllocTraits = allocator::AllocRebind<PublishedType, Alloc>;
using MessageAllocatorT = typename MessageAllocTraits::allocator_type;

std::shared_lock<std::shared_timed_mutex> lock(mutex_);
Expand All @@ -270,24 +287,24 @@ class IntraProcessManager

if (sub_ids.take_ownership_subscriptions.empty()) {
// If there are no owning, just convert to shared.
std::shared_ptr<MessageT> shared_msg = std::move(message);
std::shared_ptr<PublishedType> shared_msg = std::move(message);
if (!sub_ids.take_shared_subscriptions.empty()) {
this->template add_shared_msg_to_buffers<MessageT, Alloc, Deleter>(
this->template add_shared_msg_to_buffers<PublishedType, Alloc, Deleter>(
shared_msg, sub_ids.take_shared_subscriptions);
}
return shared_msg;
} else {
// Construct a new shared pointer from the message for the buffers that
// do not require ownership and to return.
auto shared_msg = std::allocate_shared<MessageT, MessageAllocatorT>(allocator, *message);
auto shared_msg = std::allocate_shared<PublishedType, MessageAllocatorT>(allocator, *message);

if (!sub_ids.take_shared_subscriptions.empty()) {
this->template add_shared_msg_to_buffers<MessageT, Alloc, Deleter>(
this->template add_shared_msg_to_buffers<PublishedType, Alloc, Deleter>(
shared_msg,
sub_ids.take_shared_subscriptions);
}

this->template add_owned_msg_to_buffers<MessageT, Alloc, Deleter>(
this->template add_owned_msg_to_buffers<PublishedType, Alloc, Deleter>(
std::move(message),
sub_ids.take_ownership_subscriptions,
allocator);
Expand All @@ -296,6 +313,80 @@ class IntraProcessManager
}
}

template<
typename MessageT,
typename T,
typename PublishedType,
typename ROSMessageType,
typename Alloc,
typename ROSMessageTypeAllocatorTraits,
typename ROSMessageTypeAllocator,
typename ROSMessageTypeDeleter,
typename Deleter = std::default_delete<PublishedType>
>
typename
std::enable_if_t<
rclcpp::TypeAdapter<MessageT>::is_specialized::value &&
std::is_same<T, PublishedType>::value, std::shared_ptr<const ROSMessageType>
>
do_intra_process_publish_and_return_shared(
uint64_t intra_process_publisher_id,
std::unique_ptr<PublishedType, Deleter> message,
typename allocator::AllocRebind<PublishedType, Alloc>::allocator_type & allocator,
ROSMessageTypeAllocator & ros_message_type_allocator,
ROSMessageTypeDeleter & ros_message_type_deleter)
{
auto ptr = ROSMessageTypeAllocatorTraits::allocate(ros_message_type_allocator, 1);
ROSMessageTypeAllocatorTraits::construct(ros_message_type_allocator, ptr);
auto unique_ros_msg = std::unique_ptr<ROSMessageType, ROSMessageTypeDeleter>(ptr, ros_message_type_deleter);
rclcpp::TypeAdapter<MessageT>::convert_to_ros_message(*message, *unique_ros_msg);

using MessageAllocTraits = allocator::AllocRebind<PublishedType, Alloc>;
using MessageAllocatorT = typename MessageAllocTraits::allocator_type;

std::shared_lock<std::shared_timed_mutex> lock(mutex_);

auto publisher_it = pub_to_subs_.find(intra_process_publisher_id);
if (publisher_it == pub_to_subs_.end()) {
// Publisher is either invalid or no longer exists.
RCLCPP_WARN(
rclcpp::get_logger("rclcpp"),
"Calling do_intra_process_publish for invalid or no longer existing publisher id");
return nullptr;
}
const auto & sub_ids = publisher_it->second;

if (sub_ids.take_ownership_subscriptions.empty()) {
// If there are no owning, just convert to shared.
std::shared_ptr<PublishedType> shared_msg = std::move(message);
if (!sub_ids.take_shared_subscriptions.empty()) {
this->template add_shared_msg_to_buffers<PublishedType, Alloc, Deleter>(
shared_msg, sub_ids.take_shared_subscriptions);
}
} else {
// Construct a new shared pointer from the message for the buffers that
// do not require ownership and to return.
auto shared_msg = std::allocate_shared<PublishedType, MessageAllocatorT>(allocator, *message);

if (!sub_ids.take_shared_subscriptions.empty()) {
this->template add_shared_msg_to_buffers<PublishedType, Alloc, Deleter>(
shared_msg,
sub_ids.take_shared_subscriptions);
}

this->template add_owned_msg_to_buffers<PublishedType, Alloc, Deleter>(
std::move(message),
sub_ids.take_ownership_subscriptions,
allocator);
}

// TODO(clalancette): depending on how the publisher and subscriber are setup, we may end
// up doing a conversion more than once; in this function and down in
// add_{shared,owned}_msg_to_buffers(). We should probably push this down further to avoid
// that double conversion.
return unique_ros_msg;
}

/// Return true if the given rmw_gid_t matches any stored Publishers.
RCLCPP_PUBLIC
bool
Expand Down Expand Up @@ -344,7 +435,8 @@ class IntraProcessManager
template<
typename MessageT,
typename Alloc,
typename Deleter>
typename Deleter
>
void
add_shared_msg_to_buffers(
std::shared_ptr<const MessageT> message,
Expand Down Expand Up @@ -398,8 +490,9 @@ class IntraProcessManager

template<
typename MessageT,
typename Alloc = std::allocator<void>,
typename Deleter = std::default_delete<MessageT>>
typename Alloc,
typename Deleter
>
void
add_owned_msg_to_buffers(
std::unique_ptr<MessageT, Deleter> message,
Expand Down
47 changes: 6 additions & 41 deletions rclcpp/include/rclcpp/publisher.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -518,11 +518,7 @@ class Publisher : public PublisherBase


template<typename T>
typename
std::enable_if_t<
rosidl_generator_traits::is_message<T>::value &&
std::is_same<T, ROSMessageType>::value, std::shared_ptr<const ROSMessageType>
>
std::shared_ptr<const ROSMessageType>
do_intra_process_publish_and_return_shared(
std::unique_ptr<PublishedType, PublishedTypeDeleter> msg)
{
Expand All @@ -535,44 +531,13 @@ class Publisher : public PublisherBase
throw std::runtime_error("cannot publish msg which is a null pointer");
}

return ipm->template do_intra_process_publish_and_return_shared<PublishedType,
AllocatorT>(
return ipm->template do_intra_process_publish_and_return_shared<MessageT, T, PublishedType,
ROSMessageType, AllocatorT, ROSMessageTypeAllocatorTraits, ROSMessageTypeAllocator, ROSMessageTypeDeleter>(
intra_process_publisher_id_,
std::move(msg),
published_type_allocator_);
}

template<typename T>
typename
std::enable_if_t<
rclcpp::TypeAdapter<MessageT>::is_specialized::value &&
std::is_same<T, PublishedType>::value, std::shared_ptr<const ROSMessageType>
>
do_intra_process_publish_and_return_shared(
std::unique_ptr<PublishedType, PublishedTypeDeleter> msg)
{
auto ipm = weak_ipm_.lock();
if (!ipm) {
throw std::runtime_error(
"intra process publish called after destruction of intra process manager");
}
if (!msg) {
throw std::runtime_error("cannot publish msg which is a null pointer");
}

ipm->template do_intra_process_publish_and_return_shared<PublishedType,
AllocatorT>(
intra_process_publisher_id_,
std::move(msg),
published_type_allocator_);

// TODO(clalancette): We are doing the conversion at least twice; inside of
// IntraProcessManager::do_intra_process_publish_and_return_shared(), and here.
// We should just do it in the IntraProcessManager and return it here.
auto unique_ros_msg = this->create_ros_message_unique_ptr();
rclcpp::TypeAdapter<MessageT>::convert_to_ros_message(*msg, *unique_ros_msg);

return unique_ros_msg;
published_type_allocator_,
ros_message_type_allocator_,
ros_message_type_deleter_);
}

/// Return a new unique_ptr using the ROSMessageType of the publisher.
Expand Down

0 comments on commit 54166ee

Please sign in to comment.