Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add non transform capabilities for intra-process #1849

Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions rclcpp/include/rclcpp/any_subscription_callback.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -485,9 +485,10 @@ class AnySubscriptionCallback
rclcpp::TypeAdapter<MessageT>::convert_to_ros_message(msg, *ptr);
return std::unique_ptr<ROSMessageType, ROSMessageTypeDeleter>(ptr, ros_message_type_deleter_);
} else {
throw std::runtime_error(
"convert_custom_type_to_ros_message_unique_ptr "
"unexpectedly called without TypeAdapter");
static_assert(
clalancette marked this conversation as resolved.
Show resolved Hide resolved
!sizeof(MessageT *),
"convert_custom_type_to_ros_message_unique_ptr() "
"unexpectedly called without specialized TypeAdapter");
}
}

Expand Down Expand Up @@ -817,6 +818,11 @@ class AnySubscriptionCallback
// Dispatch.
std::visit(
[&message, &message_info, this](auto && callback) {
// clang complains that 'this' lambda capture is unused, which is true
// in *some* specializations of this template, but not others. Just
// quiet it down.
(void)this;

using T = std::decay_t<decltype(callback)>;
static constexpr bool is_ta = rclcpp::TypeAdapter<MessageT>::is_specialized::value;

Expand Down
224 changes: 134 additions & 90 deletions rclcpp/include/rclcpp/experimental/intra_process_manager.hpp

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,6 @@ template<
class ROSMessageIntraProcessBuffer : public SubscriptionIntraProcessBase
{
public:
RCLCPP_SMART_PTR_DEFINITIONS(ROSMessageIntraProcessBuffer)

using ROSMessageTypeAllocatorTraits = allocator::AllocRebind<RosMessageT, Alloc>;
using ROSMessageTypeAllocator = typename ROSMessageTypeAllocatorTraits::allocator_type;
using ROSMessageTypeDeleter = allocator::Deleter<ROSMessageTypeAllocator, RosMessageT>;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,20 +44,24 @@ namespace experimental
template<
typename MessageT,
typename SubscribedType,
typename SubscribedTypeAlloc = std::allocator<void>,
typename SubscribedTypeDeleter = std::default_delete<SubscribedType>
typename SubscribedTypeAlloc = std::allocator<SubscribedType>,
typename SubscribedTypeDeleter = std::default_delete<SubscribedType>,
typename ROSMessageType = SubscribedType,
typename Alloc = std::allocator<void>
>
class SubscriptionIntraProcess
: public SubscriptionIntraProcessBuffer<
SubscribedType,
SubscribedTypeAlloc,
SubscribedTypeDeleter
SubscribedTypeDeleter,
ROSMessageType
>
{
using SubscriptionIntraProcessBufferT = SubscriptionIntraProcessBuffer<
SubscribedType,
SubscribedTypeAlloc,
SubscribedTypeDeleter
SubscribedTypeDeleter,
ROSMessageType
>;

public:
Expand All @@ -71,15 +75,15 @@ class SubscriptionIntraProcess
using BufferUniquePtr = typename SubscriptionIntraProcessBufferT::BufferUniquePtr;

SubscriptionIntraProcess(
AnySubscriptionCallback<MessageT, SubscribedTypeAlloc> callback,
std::shared_ptr<SubscribedTypeAlloc> allocator,
AnySubscriptionCallback<MessageT, Alloc> callback,
std::shared_ptr<Alloc> allocator,
rclcpp::Context::SharedPtr context,
const std::string & topic_name,
const rclcpp::QoS & qos_profile,
rclcpp::IntraProcessBufferType buffer_type)
: SubscriptionIntraProcessBuffer<SubscribedType, SubscribedTypeAlloc,
SubscribedTypeDeleter>(
allocator,
SubscribedTypeDeleter, ROSMessageType>(
std::make_shared<SubscribedTypeAlloc>(*allocator),
context,
topic_name,
qos_profile,
Expand Down Expand Up @@ -157,7 +161,7 @@ class SubscriptionIntraProcess
shared_ptr.reset();
}

AnySubscriptionCallback<MessageT, SubscribedTypeAlloc> any_callback_;
AnySubscriptionCallback<MessageT, Alloc> any_callback_;
};

} // namespace experimental
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,16 @@ namespace experimental

template<
typename SubscribedType,
typename Alloc = std::allocator<void>,
typename Alloc = std::allocator<SubscribedType>,
typename Deleter = std::default_delete<SubscribedType>,
/// MessageT::ros_message_type if MessageT is a TypeAdapter,
/// otherwise just MessageT.
typename ROSMessageType = typename rclcpp::TypeAdapter<SubscribedType>::ros_message_type
typename ROSMessageType = SubscribedType
>
class SubscriptionIntraProcessBuffer : public ROSMessageIntraProcessBuffer<ROSMessageType, Alloc,
Deleter>
class SubscriptionIntraProcessBuffer : public ROSMessageIntraProcessBuffer<ROSMessageType,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure I understand the logic with the hierarchy in the Subscription IPC class.
We have that:

SubscriptionIntraProcess is a
SubscriptionIntraProcessBuffer is a
ROSMessageIntraProcessBuffer is a
SubscriptionIntraProcessBase is a
Waitable

Do we really need all these layers?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I agree this is confusing. The most confusing part to me is going from Subscription to ROSMessage back to Subscription. Maybe rename ROSMessageIntraProcessBuffer to something with subscription in it, or fold it in to another class in the inheritance hierarchy?

SubscriptionIntraProcessBuffer is a
ROSMessageIntraProcessBuffer is a
SubscriptionIntraProcessBase is a

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we really need all these layers?

I agree that this is confusing, but I think we do need all of these layers.

So at the base, the whole thing needs to be a Waitable because it needs to be waited on by the executors. We need the SubscriptionIntraProcessBase as a non-templated type so that we can store essentially untyped subscriptions in the IntraProcessManager (

using SubscriptionMap =
std::unordered_map<uint64_t, rclcpp::experimental::SubscriptionIntraProcessBase::WeakPtr>;
). The split between ROSMessageIntraProcessBuffer and SubscriptionIntraProcessBuffer is how the subscriber ultimately communicates whether it is a ROS type or not to the IntraProcessManager. So we need both of those layers to distinguish between those two cases. And finally, we need SubscriptionIntraProcess as the thing that the Subscription class interacts with.

I'll note that this hierarchy was largely here already; the only thing this PR does is to add in the ROSMessageIntraProcessBuffer.

One thing we might be able to do is collapse the SubscriptionIntraProcess and SubscriptionIntraProcessBuffer classes into one. Neither of them does too much on its own, and it isn't entirely clear why they are separated. If you'd like me to try to do that, I'll give that a whirl. I think we need the rest of the layers.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll note that this hierarchy was largely here already; the only thing this PR does is to add in the ROSMessageIntraProcessBuffer.

Do you think that it makes sense to rename ROSMessageIntraProcessBuffer to something like SubscriptionROSMessageIntraProcessBuffer, or something similar? It is just a little odd to me that it's in the hierarchy but doesn't start with Subscription, like the other classes after waitable.

If you'd like me to try to do that, I'll give that a whirl. I think we need the rest of the layers.

Maybe make an issue and possibly do it in another PR? This isn't that high priority for me.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you think that it makes sense to rename ROSMessageIntraProcessBuffer to something like SubscriptionROSMessageIntraProcessBuffer, or something similar? It is just a little odd to me that it's in the hierarchy but doesn't start with Subscription, like the other classes after waitable.

That makes sense to me. I'll go ahead and rename it (probably to SubscriptionROSMsgIntraProcessBuffer, just to shorten it a bit).

typename allocator::AllocRebind<ROSMessageType, Alloc>::allocator_type,
allocator::Deleter<typename allocator::AllocRebind<ROSMessageType, Alloc>::allocator_type,
ROSMessageType>>
{
public:
RCLCPP_SMART_PTR_DEFINITIONS(SubscriptionIntraProcessBuffer)
Expand Down Expand Up @@ -81,8 +83,8 @@ class SubscriptionIntraProcessBuffer : public ROSMessageIntraProcessBuffer<ROSMe
const std::string & topic_name,
const rclcpp::QoS & qos_profile,
rclcpp::IntraProcessBufferType buffer_type)
: ROSMessageIntraProcessBuffer<ROSMessageType, Alloc, Deleter>(context, topic_name,
qos_profile),
: ROSMessageIntraProcessBuffer<ROSMessageType, ROSMessageTypeAllocator, ROSMessageTypeDeleter>(
context, topic_name, qos_profile),
subscribed_type_allocator_(*allocator)
{
allocator::set_allocator_for_deleter(&subscribed_type_deleter_, &subscribed_type_allocator_);
Expand All @@ -92,7 +94,7 @@ class SubscriptionIntraProcessBuffer : public ROSMessageIntraProcessBuffer<ROSMe
SubscribedTypeDeleter>(
buffer_type,
qos_profile,
allocator);
std::make_shared<Alloc>(subscribed_type_allocator_));
}

bool
Expand All @@ -118,7 +120,7 @@ class SubscriptionIntraProcessBuffer : public ROSMessageIntraProcessBuffer<ROSMe
}

void
provide_intra_process_message(ConstMessageSharedPtr message)
provide_intra_process_message(ConstMessageSharedPtr message) override
{
if constexpr (std::is_same<SubscribedType, ROSMessageType>::value) {
buffer_->add_shared(std::move(message));
Expand All @@ -130,7 +132,7 @@ class SubscriptionIntraProcessBuffer : public ROSMessageIntraProcessBuffer<ROSMe
}

void
provide_intra_process_message(MessageUniquePtr message)
provide_intra_process_message(MessageUniquePtr message) override
{
if constexpr (std::is_same<SubscribedType, ROSMessageType>::value) {
buffer_->add_unique(std::move(message));
Expand Down
80 changes: 39 additions & 41 deletions rclcpp/include/rclcpp/publisher.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,6 @@ class Publisher : public PublisherBase

/// MessageT::custom_type if MessageT is a TypeAdapter, otherwise just MessageT.
using PublishedType = typename rclcpp::TypeAdapter<MessageT>::custom_type;
/// MessageT::ros_message_type if MessageT is a TypeAdapter, otherwise just MessageT.
using ROSMessageType = typename rclcpp::TypeAdapter<MessageT>::ros_message_type;

using PublishedTypeAllocatorTraits = allocator::AllocRebind<PublishedType, AllocatorT>;
Expand Down Expand Up @@ -320,11 +319,28 @@ class Publisher : public PublisherBase
>
publish(std::unique_ptr<T, PublishedTypeDeleter> msg)
{
// TODO(wjwwood): later update this to give the unique_ptr to the intra
// process manager and let it decide if it needs to be converted or not.
// For now, convert it unconditionally and pass it the ROSMessageType
// publish function specialization.
this->do_intra_process_publish(std::move(msg));
// Avoid allocating when not using intra process.
if (!intra_process_is_enabled_) {
// In this case we're not using intra process.
ROSMessageType ros_msg;
rclcpp::TypeAdapter<MessageT>::convert_to_ros_message(*msg, ros_msg);
return this->do_inter_process_publish(ros_msg);
}

bool inter_process_publish_needed =
get_subscription_count() > get_intra_process_subscription_count();

if (inter_process_publish_needed) {
ROSMessageType ros_msg;
// TODO(clalancette): This is unnecessarily doing an additional conversion
// that may have already been done in do_intra_process_publish_and_return_shared().
// We should just reuse that effort.
rclcpp::TypeAdapter<MessageT>::convert_to_ros_message(*msg, ros_msg);
this->do_intra_process_publish(std::move(msg));
this->do_inter_process_publish(ros_msg);
} else {
this->do_intra_process_publish(std::move(msg));
}
}

/// Publish a message on the topic.
Expand All @@ -345,26 +361,20 @@ class Publisher : public PublisherBase
>
publish(const T & msg)
{
// TODO(wjwwood): later update this to give the unique_ptr to the intra
// process manager and let it decide if it needs to be converted or not.
// For now, convert it unconditionally and pass it the ROSMessageType
// publish function specialization.

// Avoid allocating when not using intra process.
// Avoid double allocating when not using intra process.
if (!intra_process_is_enabled_) {
// Convert to the ROS message equivalent and publish it.
ROSMessageType ros_msg;
rclcpp::TypeAdapter<MessageT>::convert_to_ros_message(msg, ros_msg);
// In this case we're not using intra process.
return this->do_inter_process_publish(ros_msg);
}
// Otherwise we have to allocate memory in a unique_ptr, convert it,
// and pass it along.

// Otherwise we have to allocate memory in a unique_ptr and pass it along.
// As the message is not const, a copy should be made.
// A shared_ptr<const MessageT> could also be constructed here.
auto unique_ros_msg = this->create_ros_message_unique_ptr();
rclcpp::TypeAdapter<MessageT>::convert_to_ros_message(msg, *unique_ros_msg);
this->publish(std::move(unique_ros_msg));
auto unique_msg = this->duplicate_type_adapt_message_as_unique_ptr(msg);
this->publish(std::move(unique_msg));
}

void
Expand Down Expand Up @@ -501,7 +511,7 @@ class Publisher : public PublisherBase
throw std::runtime_error("cannot publish msg which is a null pointer");
}

ipm->template do_intra_process_publish<PublishedType, AllocatorT>(
ipm->template do_intra_process_publish<PublishedType, ROSMessageType, AllocatorT>(
intra_process_publisher_id_,
std::move(msg),
published_type_allocator_);
Expand All @@ -519,33 +529,12 @@ class Publisher : public PublisherBase
throw std::runtime_error("cannot publish msg which is a null pointer");
}

ipm->template do_intra_process_publish<ROSMessageType, AllocatorT>(
ipm->template do_intra_process_publish<ROSMessageType, ROSMessageType, AllocatorT>(
intra_process_publisher_id_,
std::move(msg),
ros_message_type_allocator_);
}


std::shared_ptr<const PublishedType>
do_intra_process_publish_and_return_shared(
std::unique_ptr<PublishedType, PublishedTypeDeleter> msg)
{
auto ipm = weak_ipm_.lock();
if (!ipm) {
throw std::runtime_error(
"intra process publish called after destruction of intra process manager");
}
if (!msg) {
throw std::runtime_error("cannot publish msg which is a null pointer");
}

return ipm->template do_intra_process_publish_and_return_shared<PublishedType,
AllocatorT>(
intra_process_publisher_id_,
std::move(msg),
published_type_allocator_);
}

std::shared_ptr<const ROSMessageType>
do_intra_process_ros_message_publish_and_return_shared(
std::unique_ptr<ROSMessageType, ROSMessageTypeDeleter> msg)
Expand All @@ -559,7 +548,7 @@ class Publisher : public PublisherBase
throw std::runtime_error("cannot publish msg which is a null pointer");
}

return ipm->template do_intra_process_publish_and_return_shared<ROSMessageType,
return ipm->template do_intra_process_publish_and_return_shared<ROSMessageType, ROSMessageType,
AllocatorT>(
intra_process_publisher_id_,
std::move(msg),
Expand All @@ -585,6 +574,15 @@ class Publisher : public PublisherBase
return std::unique_ptr<ROSMessageType, ROSMessageTypeDeleter>(ptr, ros_message_type_deleter_);
}

/// Duplicate a given type adapted message as a unique_ptr.
std::unique_ptr<PublishedType, PublishedTypeDeleter>
duplicate_type_adapt_message_as_unique_ptr(const PublishedType & msg)
{
auto ptr = PublishedTypeAllocatorTraits::allocate(published_type_allocator_, 1);
PublishedTypeAllocatorTraits::construct(published_type_allocator_, ptr, msg);
return std::unique_ptr<PublishedType, PublishedTypeDeleter>(ptr, published_type_deleter_);
}

/// Copy of original options passed during construction.
/**
* It is important to save a copy of this so that the rmw payload which it
Expand Down
6 changes: 4 additions & 2 deletions rclcpp/include/rclcpp/subscription.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -408,8 +408,10 @@ class Subscription : public SubscriptionBase
using SubscriptionIntraProcessT = rclcpp::experimental::SubscriptionIntraProcess<
MessageT,
SubscribedType,
AllocatorT,
SubscribedTypeDeleter>;
SubscribedTypeAllocator,
SubscribedTypeDeleter,
ROSMessageT,
AllocatorT>;
std::shared_ptr<SubscriptionIntraProcessT> subscription_intra_process_;
};

Expand Down
Loading