Skip to content

Commit

Permalink
Add non transform capabilities for intra-process (#1849)
Browse files Browse the repository at this point in the history
That is, make it so that if both a publisher and a subscriber using intra-process
and using TypeAdaptation are the same type, we don't need to convert to
a ROS Message type on the publisher and from a ROS Message type before
delivering to the subscriber.  Instead, we store the original message type
in the subscription buffer, and deliver it straight through to the subscriber.

This has two main benefits:
1. It is better for performance; we are eliding unnecessary work.
2. We can use TypeAdaptation to pass custom handles (like hardware accelerator
    handles) between a publisher and subscriber.  That means we can avoid
    unnecessary overhead when using a hardware accelerator, like synchronizing
    between the hardware accelerator and the main CPU.

The main way this is accomplished is by causing the SubscriptionIntraProcessBuffer
to store messages of the Subscribed Type (which in the case of a ROS message type
subscriber is a ROS message type, but in the case of a TypeAdapter type is the
custom type).  When a publish with a Type Adapted type happens, it delays conversion
of the message, and passes the message type from the user down into the IntraProcessManager.

The IntraProcessManager checks to see if it can cast the SubscriptionBuffer
to the custom type being published first.  If it can, then it knows it can deliver the message directly
into the SubscriptionBuffer with no conversion necessary.  If it can't, it then sees if
the SubscriptionBuffer is of a ROS message type.  If it is, then it performs the conversion
as necessary, and then stores it.  In all cases, the Subscription is then triggered underneath
so that the message will eventually be delivered by an executor.

We also add tests to ensure that in the case where the publisher and subscriber
and using the same type adapter, no conversion happens when delivering the
message.

Signed-off-by: Chris Lalancette <[email protected]>
Signed-off-by: Gonzalo de Pedro <[email protected]>
  • Loading branch information
gonzodepedro authored Jan 7, 2022
1 parent 8093648 commit 6b321ed
Show file tree
Hide file tree
Showing 13 changed files with 1,603 additions and 217 deletions.
140 changes: 104 additions & 36 deletions rclcpp/include/rclcpp/any_subscription_callback.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -476,6 +476,22 @@ class AnySubscriptionCallback
}
}

std::unique_ptr<ROSMessageType, ROSMessageTypeDeleter>
convert_custom_type_to_ros_message_unique_ptr(const SubscribedType & msg)
{
if constexpr (rclcpp::TypeAdapter<MessageT>::is_specialized::value) {
auto ptr = ROSMessageTypeAllocatorTraits::allocate(ros_message_type_allocator_, 1);
ROSMessageTypeAllocatorTraits::construct(ros_message_type_allocator_, ptr);
rclcpp::TypeAdapter<MessageT>::convert_to_ros_message(msg, *ptr);
return std::unique_ptr<ROSMessageType, ROSMessageTypeDeleter>(ptr, ros_message_type_deleter_);
} else {
static_assert(
!sizeof(MessageT *),
"convert_custom_type_to_ros_message_unique_ptr() "
"unexpectedly called without specialized TypeAdapter");
}
}

// Dispatch when input is a ros message and the output could be anything.
void
dispatch(
Expand Down Expand Up @@ -658,7 +674,7 @@ class AnySubscriptionCallback

void
dispatch_intra_process(
std::shared_ptr<const ROSMessageType> message,
std::shared_ptr<const SubscribedType> message,
const rclcpp::MessageInfo & message_info)
{
TRACEPOINT(callback_start, static_cast<const void *>(this), true);
Expand All @@ -677,65 +693,89 @@ class AnySubscriptionCallback

// conditions for custom type
if constexpr (is_ta && std::is_same_v<T, ConstRefCallback>) {
auto local_message = convert_ros_message_to_custom_type_unique_ptr(*message);
callback(*local_message);
callback(*message);
} else if constexpr (is_ta && std::is_same_v<T, ConstRefWithInfoCallback>) { // NOLINT
auto local_message = convert_ros_message_to_custom_type_unique_ptr(*message);
callback(*local_message, message_info);
callback(*message, message_info);
} else if constexpr ( // NOLINT[readability/braces]
is_ta && (
std::is_same_v<T, UniquePtrCallback>||
std::is_same_v<T, SharedPtrCallback>
))
{
callback(convert_ros_message_to_custom_type_unique_ptr(*message));
callback(create_custom_unique_ptr_from_custom_shared_ptr_message(message));
} else if constexpr ( // NOLINT[readability/braces]
is_ta && (
std::is_same_v<T, UniquePtrWithInfoCallback>||
std::is_same_v<T, SharedPtrWithInfoCallback>
))
{
callback(convert_ros_message_to_custom_type_unique_ptr(*message), message_info);
callback(create_custom_unique_ptr_from_custom_shared_ptr_message(message), message_info);
} else if constexpr ( // NOLINT[readability/braces]
is_ta && (
std::is_same_v<T, SharedConstPtrCallback>||
std::is_same_v<T, ConstRefSharedConstPtrCallback>
))
{
callback(convert_ros_message_to_custom_type_unique_ptr(*message));
callback(message);
} else if constexpr ( // NOLINT[readability/braces]
is_ta && (
std::is_same_v<T, SharedConstPtrWithInfoCallback>||
std::is_same_v<T, ConstRefSharedConstPtrWithInfoCallback>
))
{
callback(convert_ros_message_to_custom_type_unique_ptr(*message), message_info);
callback(message, message_info);
}
// conditions for ros message type
else if constexpr (std::is_same_v<T, ConstRefROSMessageCallback>) { // NOLINT
callback(*message);
} else if constexpr (std::is_same_v<T, ConstRefWithInfoROSMessageCallback>) {
callback(*message, message_info);
else if constexpr (std::is_same_v<T, ConstRefROSMessageCallback>) { // NOLINT[readability/braces]
if constexpr (is_ta) {
auto local = convert_custom_type_to_ros_message_unique_ptr(*message);
callback(*local);
} else {
callback(*message);
}
} else if constexpr (std::is_same_v<T, ConstRefWithInfoROSMessageCallback>) { // NOLINT[readability/braces]
if constexpr (is_ta) {
auto local = convert_custom_type_to_ros_message_unique_ptr(*message);
callback(*local, message_info);
} else {
callback(*message, message_info);
}
} else if constexpr ( // NOLINT[readability/braces]
std::is_same_v<T, UniquePtrROSMessageCallback>||
std::is_same_v<T, SharedPtrROSMessageCallback>)
{
callback(create_ros_unique_ptr_from_ros_shared_ptr_message(message));
if constexpr (is_ta) {
callback(convert_custom_type_to_ros_message_unique_ptr(*message));
} else {
callback(create_ros_unique_ptr_from_ros_shared_ptr_message(message));
}
} else if constexpr ( // NOLINT[readability/braces]
std::is_same_v<T, UniquePtrWithInfoROSMessageCallback>||
std::is_same_v<T, SharedPtrWithInfoROSMessageCallback>)
{
callback(create_ros_unique_ptr_from_ros_shared_ptr_message(message), message_info);
if constexpr (is_ta) {
callback(convert_custom_type_to_ros_message_unique_ptr(*message), message_info);
} else {
callback(create_ros_unique_ptr_from_ros_shared_ptr_message(message), message_info);
}
} else if constexpr ( // NOLINT[readability/braces]
std::is_same_v<T, SharedConstPtrROSMessageCallback>||
std::is_same_v<T, ConstRefSharedConstPtrROSMessageCallback>)
{
callback(message);
if constexpr (is_ta) {
callback(convert_custom_type_to_ros_message_unique_ptr(*message));
} else {
callback(message);
}
} else if constexpr ( // NOLINT[readability/braces]
std::is_same_v<T, SharedConstPtrWithInfoROSMessageCallback>||
std::is_same_v<T, ConstRefSharedConstPtrWithInfoROSMessageCallback>)
{
callback(message, message_info);
if constexpr (is_ta) {
callback(convert_custom_type_to_ros_message_unique_ptr(*message), message_info);
} else {
callback(message, message_info);
}
}
// condition to catch SerializedMessage types
else if constexpr ( // NOLINT[readability/braces]
Expand Down Expand Up @@ -764,7 +804,7 @@ class AnySubscriptionCallback

void
dispatch_intra_process(
std::unique_ptr<ROSMessageType, ROSMessageTypeDeleter> message,
std::unique_ptr<SubscribedType, SubscribedTypeDeleter> message,
const rclcpp::MessageInfo & message_info)
{
TRACEPOINT(callback_start, static_cast<const void *>(this), true);
Expand All @@ -778,70 +818,98 @@ class AnySubscriptionCallback
// Dispatch.
std::visit(
[&message, &message_info, this](auto && callback) {
// clang complains that 'this' lambda capture is unused, which is true
// in *some* specializations of this template, but not others. Just
// quiet it down.
(void)this;

using T = std::decay_t<decltype(callback)>;
static constexpr bool is_ta = rclcpp::TypeAdapter<MessageT>::is_specialized::value;

// conditions for custom type
if constexpr (is_ta && std::is_same_v<T, ConstRefCallback>) {
auto local_message = convert_ros_message_to_custom_type_unique_ptr(*message);
callback(*local_message);
callback(*message);
} else if constexpr (is_ta && std::is_same_v<T, ConstRefWithInfoCallback>) { // NOLINT
auto local_message = convert_ros_message_to_custom_type_unique_ptr(*message);
callback(*local_message, message_info);
callback(*message, message_info);
} else if constexpr ( // NOLINT[readability/braces]
is_ta && (
std::is_same_v<T, UniquePtrCallback>||
std::is_same_v<T, SharedPtrCallback>
))
std::is_same_v<T, SharedPtrCallback>))
{
callback(convert_ros_message_to_custom_type_unique_ptr(*message));
callback(std::move(message));
} else if constexpr ( // NOLINT[readability/braces]
is_ta && (
std::is_same_v<T, UniquePtrWithInfoCallback>||
std::is_same_v<T, SharedPtrWithInfoCallback>
))
{
callback(convert_ros_message_to_custom_type_unique_ptr(*message), message_info);
callback(std::move(message), message_info);
} else if constexpr ( // NOLINT[readability/braces]
is_ta && (
std::is_same_v<T, SharedConstPtrCallback>||
std::is_same_v<T, ConstRefSharedConstPtrCallback>
))
{
callback(convert_ros_message_to_custom_type_unique_ptr(*message));
callback(std::move(message));
} else if constexpr ( // NOLINT[readability/braces]
is_ta && (
std::is_same_v<T, SharedConstPtrWithInfoCallback>||
std::is_same_v<T, ConstRefSharedConstPtrWithInfoCallback>
))
{
callback(convert_ros_message_to_custom_type_unique_ptr(*message), message_info);
callback(std::move(message), message_info);
}
// conditions for ros message type
else if constexpr (std::is_same_v<T, ConstRefROSMessageCallback>) { // NOLINT
callback(*message);
} else if constexpr (std::is_same_v<T, ConstRefWithInfoROSMessageCallback>) {
callback(*message, message_info);
else if constexpr (std::is_same_v<T, ConstRefROSMessageCallback>) { // NOLINT[readability/braces]
if constexpr (is_ta) {
auto local = convert_custom_type_to_ros_message_unique_ptr(*message);
callback(*local);
} else {
callback(*message);
}
} else if constexpr (std::is_same_v<T, ConstRefWithInfoROSMessageCallback>) { // NOLINT[readability/braces]
if constexpr (is_ta) {
auto local = convert_custom_type_to_ros_message_unique_ptr(*message);
callback(*local, message_info);
} else {
callback(*message, message_info);
}
} else if constexpr ( // NOLINT[readability/braces]
std::is_same_v<T, UniquePtrROSMessageCallback>||
std::is_same_v<T, SharedPtrROSMessageCallback>)
{
callback(std::move(message));
if constexpr (is_ta) {
callback(convert_custom_type_to_ros_message_unique_ptr(*message));
} else {
callback(std::move(message));
}
} else if constexpr ( // NOLINT[readability/braces]
std::is_same_v<T, UniquePtrWithInfoROSMessageCallback>||
std::is_same_v<T, SharedPtrWithInfoROSMessageCallback>)
{
callback(std::move(message), message_info);
if constexpr (is_ta) {
callback(convert_custom_type_to_ros_message_unique_ptr(*message), message_info);
} else {
callback(std::move(message), message_info);
}
} else if constexpr ( // NOLINT[readability/braces]
std::is_same_v<T, SharedConstPtrROSMessageCallback>||
std::is_same_v<T, ConstRefSharedConstPtrROSMessageCallback>)
{
callback(std::move(message));
if constexpr (is_ta) {
callback(convert_custom_type_to_ros_message_unique_ptr(*message));
} else {
callback(std::move(message));
}
} else if constexpr ( // NOLINT[readability/braces]
std::is_same_v<T, SharedConstPtrWithInfoROSMessageCallback>||
std::is_same_v<T, ConstRefSharedConstPtrWithInfoROSMessageCallback>)
{
callback(std::move(message), message_info);
if constexpr (is_ta) {
callback(convert_custom_type_to_ros_message_unique_ptr(*message), message_info);
} else {
callback(std::move(message), message_info);
}
}
// condition to catch SerializedMessage types
else if constexpr ( // NOLINT[readability/braces]
Expand Down
Loading

0 comments on commit 6b321ed

Please sign in to comment.