Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Push the publish conversion down to the intra-process manager #6

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
145 changes: 119 additions & 26 deletions rclcpp/include/rclcpp/experimental/intra_process_manager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -173,19 +173,20 @@ class IntraProcessManager
* \param message the message that is being stored.
*/
template<
typename MessageT,
typename Alloc = std::allocator<void>,
typename Deleter = std::default_delete<MessageT>>
typename PublishedType,
typename Alloc,
typename Deleter = std::default_delete<PublishedType>
>
void
do_intra_process_publish(
uint64_t intra_process_publisher_id,
std::unique_ptr<MessageT, Deleter> message,
typename allocator::AllocRebind<MessageT, Alloc>::allocator_type & allocator)
std::unique_ptr<PublishedType, Deleter> message,
typename allocator::AllocRebind<PublishedType, Alloc>::allocator_type & allocator)
{

std::cout << "do_intra_process_publish --- " << std::endl;

using MessageAllocTraits = allocator::AllocRebind<MessageT, Alloc>;
using MessageAllocTraits = allocator::AllocRebind<PublishedType, Alloc>;
using MessageAllocatorT = typename MessageAllocTraits::allocator_type;

std::shared_lock<std::shared_timed_mutex> lock(mutex_);
Expand All @@ -202,9 +203,9 @@ class IntraProcessManager

if (sub_ids.take_ownership_subscriptions.empty()) {
// None of the buffers require ownership, so we promote the pointer
std::shared_ptr<MessageT> msg = std::move(message);
std::shared_ptr<PublishedType> msg = std::move(message);

this->template add_shared_msg_to_buffers<MessageT, Alloc, Deleter>(
this->template add_shared_msg_to_buffers<PublishedType, Alloc, Deleter>(
msg, sub_ids.take_shared_subscriptions);
} else {
if (sub_ids.take_shared_subscriptions.size() <= 1) {
Expand All @@ -222,34 +223,50 @@ class IntraProcessManager
sub_ids.take_ownership_subscriptions.begin(),
sub_ids.take_ownership_subscriptions.end());

this->template add_owned_msg_to_buffers<MessageT, Alloc, Deleter>(
this->template add_owned_msg_to_buffers<PublishedType, Alloc, Deleter>(
std::move(message),
concatenated_vector,
allocator);
} else {
// Construct a new shared pointer from the message
// for the buffers that do not require ownership
auto shared_msg = std::allocate_shared<MessageT, MessageAllocatorT>(allocator, *message);
auto shared_msg = std::allocate_shared<PublishedType, MessageAllocatorT>(allocator, *message);

this->template add_shared_msg_to_buffers<MessageT, Alloc, Deleter>(
this->template add_shared_msg_to_buffers<PublishedType, Alloc, Deleter>(
shared_msg, sub_ids.take_shared_subscriptions);
this->template add_owned_msg_to_buffers<MessageT, Alloc, Deleter>(
this->template add_owned_msg_to_buffers<PublishedType, Alloc, Deleter>(
std::move(message), sub_ids.take_ownership_subscriptions, allocator);
}
}
}

template<
typename MessageT,
typename Alloc = std::allocator<void>,
typename Deleter = std::default_delete<MessageT>>
std::shared_ptr<const MessageT>
typename T,
typename PublishedType,
typename ROSMessageType,
typename Alloc,
typename ROSMessageTypeAllocatorTraits,
typename ROSMessageTypeAllocator,
typename ROSMessageTypeDeleter,
typename Deleter = std::default_delete<PublishedType>
>
typename
std::enable_if_t<
rosidl_generator_traits::is_message<T>::value &&
std::is_same<T, ROSMessageType>::value, std::shared_ptr<const ROSMessageType>
>
do_intra_process_publish_and_return_shared(
uint64_t intra_process_publisher_id,
std::unique_ptr<MessageT, Deleter> message,
typename allocator::AllocRebind<MessageT, Alloc>::allocator_type & allocator)
std::unique_ptr<PublishedType, Deleter> message,
typename allocator::AllocRebind<PublishedType, Alloc>::allocator_type & allocator,
ROSMessageTypeAllocator & ros_message_type_allocator,
ROSMessageTypeDeleter & ros_message_type_deleter)
{
using MessageAllocTraits = allocator::AllocRebind<MessageT, Alloc>;
(void)ros_message_type_allocator;
(void)ros_message_type_deleter;

using MessageAllocTraits = allocator::AllocRebind<PublishedType, Alloc>;
using MessageAllocatorT = typename MessageAllocTraits::allocator_type;

std::shared_lock<std::shared_timed_mutex> lock(mutex_);
Expand All @@ -266,24 +283,24 @@ class IntraProcessManager

if (sub_ids.take_ownership_subscriptions.empty()) {
// If there are no owning, just convert to shared.
std::shared_ptr<MessageT> shared_msg = std::move(message);
std::shared_ptr<PublishedType> shared_msg = std::move(message);
if (!sub_ids.take_shared_subscriptions.empty()) {
this->template add_shared_msg_to_buffers<MessageT, Alloc, Deleter>(
this->template add_shared_msg_to_buffers<PublishedType, Alloc, Deleter>(
shared_msg, sub_ids.take_shared_subscriptions);
}
return shared_msg;
} else {
// Construct a new shared pointer from the message for the buffers that
// do not require ownership and to return.
auto shared_msg = std::allocate_shared<MessageT, MessageAllocatorT>(allocator, *message);
auto shared_msg = std::allocate_shared<PublishedType, MessageAllocatorT>(allocator, *message);

if (!sub_ids.take_shared_subscriptions.empty()) {
this->template add_shared_msg_to_buffers<MessageT, Alloc, Deleter>(
this->template add_shared_msg_to_buffers<PublishedType, Alloc, Deleter>(
shared_msg,
sub_ids.take_shared_subscriptions);
}

this->template add_owned_msg_to_buffers<MessageT, Alloc, Deleter>(
this->template add_owned_msg_to_buffers<PublishedType, Alloc, Deleter>(
std::move(message),
sub_ids.take_ownership_subscriptions,
allocator);
Expand All @@ -292,6 +309,80 @@ class IntraProcessManager
}
}

template<
typename MessageT,
typename T,
typename PublishedType,
typename ROSMessageType,
typename Alloc,
typename ROSMessageTypeAllocatorTraits,
typename ROSMessageTypeAllocator,
typename ROSMessageTypeDeleter,
typename Deleter = std::default_delete<PublishedType>
>
typename
std::enable_if_t<
rclcpp::TypeAdapter<MessageT>::is_specialized::value &&
std::is_same<T, PublishedType>::value, std::shared_ptr<const ROSMessageType>
>
do_intra_process_publish_and_return_shared(
uint64_t intra_process_publisher_id,
std::unique_ptr<PublishedType, Deleter> message,
typename allocator::AllocRebind<PublishedType, Alloc>::allocator_type & allocator,
ROSMessageTypeAllocator & ros_message_type_allocator,
ROSMessageTypeDeleter & ros_message_type_deleter)
{
auto ptr = ROSMessageTypeAllocatorTraits::allocate(ros_message_type_allocator, 1);
ROSMessageTypeAllocatorTraits::construct(ros_message_type_allocator, ptr);
auto unique_ros_msg = std::unique_ptr<ROSMessageType, ROSMessageTypeDeleter>(ptr, ros_message_type_deleter);
rclcpp::TypeAdapter<MessageT>::convert_to_ros_message(*message, *unique_ros_msg);

using MessageAllocTraits = allocator::AllocRebind<PublishedType, Alloc>;
using MessageAllocatorT = typename MessageAllocTraits::allocator_type;

std::shared_lock<std::shared_timed_mutex> lock(mutex_);

auto publisher_it = pub_to_subs_.find(intra_process_publisher_id);
if (publisher_it == pub_to_subs_.end()) {
// Publisher is either invalid or no longer exists.
RCLCPP_WARN(
rclcpp::get_logger("rclcpp"),
"Calling do_intra_process_publish for invalid or no longer existing publisher id");
return nullptr;
}
const auto & sub_ids = publisher_it->second;

if (sub_ids.take_ownership_subscriptions.empty()) {
// If there are no owning, just convert to shared.
std::shared_ptr<PublishedType> shared_msg = std::move(message);
if (!sub_ids.take_shared_subscriptions.empty()) {
this->template add_shared_msg_to_buffers<PublishedType, Alloc, Deleter>(
shared_msg, sub_ids.take_shared_subscriptions);
}
} else {
// Construct a new shared pointer from the message for the buffers that
// do not require ownership and to return.
auto shared_msg = std::allocate_shared<PublishedType, MessageAllocatorT>(allocator, *message);

if (!sub_ids.take_shared_subscriptions.empty()) {
this->template add_shared_msg_to_buffers<PublishedType, Alloc, Deleter>(
shared_msg,
sub_ids.take_shared_subscriptions);
}

this->template add_owned_msg_to_buffers<PublishedType, Alloc, Deleter>(
std::move(message),
sub_ids.take_ownership_subscriptions,
allocator);
}

// TODO(clalancette): depending on how the publisher and subscriber are setup, we may end
// up doing a conversion more than once; in this function and down in
// add_{shared,owned}_msg_to_buffers(). We should probably push this down further to avoid
// that double conversion.
return unique_ros_msg;
}

/// Return true if the given rmw_gid_t matches any stored Publishers.
RCLCPP_PUBLIC
bool
Expand Down Expand Up @@ -340,7 +431,8 @@ class IntraProcessManager
template<
typename MessageT,
typename Alloc,
typename Deleter>
typename Deleter
>
void
add_shared_msg_to_buffers(
std::shared_ptr<const MessageT> message,
Expand Down Expand Up @@ -394,8 +486,9 @@ class IntraProcessManager

template<
typename MessageT,
typename Alloc = std::allocator<void>,
typename Deleter = std::default_delete<MessageT>>
typename Alloc,
typename Deleter
>
void
add_owned_msg_to_buffers(
std::unique_ptr<MessageT, Deleter> message,
Expand Down
47 changes: 6 additions & 41 deletions rclcpp/include/rclcpp/publisher.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -520,11 +520,7 @@ class Publisher : public PublisherBase


template<typename T>
typename
std::enable_if_t<
rosidl_generator_traits::is_message<T>::value &&
std::is_same<T, ROSMessageType>::value, std::shared_ptr<const ROSMessageType>
>
std::shared_ptr<const ROSMessageType>
do_intra_process_publish_and_return_shared(
std::unique_ptr<PublishedType, PublishedTypeDeleter> msg)
{
Expand All @@ -537,44 +533,13 @@ class Publisher : public PublisherBase
throw std::runtime_error("cannot publish msg which is a null pointer");
}

return ipm->template do_intra_process_publish_and_return_shared<PublishedType,
AllocatorT>(
return ipm->template do_intra_process_publish_and_return_shared<MessageT, T, PublishedType,
ROSMessageType, AllocatorT, ROSMessageTypeAllocatorTraits, ROSMessageTypeAllocator, ROSMessageTypeDeleter>(
intra_process_publisher_id_,
std::move(msg),
published_type_allocator_);
}

template<typename T>
typename
std::enable_if_t<
rclcpp::TypeAdapter<MessageT>::is_specialized::value &&
std::is_same<T, PublishedType>::value, std::shared_ptr<const ROSMessageType>
>
do_intra_process_publish_and_return_shared(
std::unique_ptr<PublishedType, PublishedTypeDeleter> msg)
{
auto ipm = weak_ipm_.lock();
if (!ipm) {
throw std::runtime_error(
"intra process publish called after destruction of intra process manager");
}
if (!msg) {
throw std::runtime_error("cannot publish msg which is a null pointer");
}

ipm->template do_intra_process_publish_and_return_shared<PublishedType,
AllocatorT>(
intra_process_publisher_id_,
std::move(msg),
published_type_allocator_);

// TODO(clalancette): We are doing the conversion at least twice; inside of
// IntraProcessManager::do_intra_process_publish_and_return_shared(), and here.
// We should just do it in the IntraProcessManager and return it here.
auto unique_ros_msg = this->create_ros_message_unique_ptr();
rclcpp::TypeAdapter<MessageT>::convert_to_ros_message(*msg, *unique_ros_msg);

return unique_ros_msg;
published_type_allocator_,
ros_message_type_allocator_,
ros_message_type_deleter_);
}

/// Return a new unique_ptr using the ROSMessageType of the publisher.
Expand Down