From fd51363e2b8f8cd99e9a77970c1d49eb86c1480f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?B=C5=82a=C5=BCej=20Sowa?= Date: Thu, 16 May 2024 09:50:51 +0200 Subject: [PATCH] Support zero-copy intra-process publishing (#306) --- .../image_transport/camera_publisher.hpp | 21 +++++++ .../include/image_transport/publisher.hpp | 6 ++ .../image_transport/publisher_plugin.hpp | 21 +++++++ .../include/image_transport/raw_publisher.hpp | 19 ++++++ .../simple_publisher_plugin.hpp | 59 +++++++++++++++++-- image_transport/src/camera_publisher.cpp | 39 +++++++++++- image_transport/src/publisher.cpp | 31 ++++++++++ 7 files changed, 188 insertions(+), 8 deletions(-) diff --git a/image_transport/include/image_transport/camera_publisher.hpp b/image_transport/include/image_transport/camera_publisher.hpp index 9ed81b07..ad80328c 100644 --- a/image_transport/include/image_transport/camera_publisher.hpp +++ b/image_transport/include/image_transport/camera_publisher.hpp @@ -111,6 +111,14 @@ class CameraPublisher const sensor_msgs::msg::Image::ConstSharedPtr & image, const sensor_msgs::msg::CameraInfo::ConstSharedPtr & info) const; + /*! + * \brief Publish an (image, info) pair on the topics associated with this CameraPublisher. + */ + IMAGE_TRANSPORT_PUBLIC + void publish( + sensor_msgs::msg::Image::UniquePtr image, + sensor_msgs::msg::CameraInfo::UniquePtr info) const; + /*! * \brief Publish an (image, info) pair with given timestamp on the topics associated with * this CameraPublisher. @@ -123,6 +131,19 @@ class CameraPublisher sensor_msgs::msg::Image & image, sensor_msgs::msg::CameraInfo & info, rclcpp::Time stamp) const; + /*! + * \brief Publish an (image, info) pair with given timestamp on the topics associated with + * this CameraPublisher. + * + * Convenience version, which sets the timestamps of both image and info to stamp before + * publishing. + */ + IMAGE_TRANSPORT_PUBLIC + void publish( + sensor_msgs::msg::Image::UniquePtr image, + sensor_msgs::msg::CameraInfo::UniquePtr info, + rclcpp::Time stamp) const; + /*! * \brief Shutdown the advertisements associated with this Publisher. */ diff --git a/image_transport/include/image_transport/publisher.hpp b/image_transport/include/image_transport/publisher.hpp index dc28d017..4f529121 100644 --- a/image_transport/include/image_transport/publisher.hpp +++ b/image_transport/include/image_transport/publisher.hpp @@ -103,6 +103,12 @@ class Publisher IMAGE_TRANSPORT_PUBLIC void publish(const sensor_msgs::msg::Image::ConstSharedPtr & message) const; + /*! + * \brief Publish an image on the topics associated with this Publisher. + */ + IMAGE_TRANSPORT_PUBLIC + void publish(sensor_msgs::msg::Image::UniquePtr message) const; + /*! * \brief Shutdown the advertisements associated with this Publisher. */ diff --git a/image_transport/include/image_transport/publisher_plugin.hpp b/image_transport/include/image_transport/publisher_plugin.hpp index d899349e..c545b4ca 100644 --- a/image_transport/include/image_transport/publisher_plugin.hpp +++ b/image_transport/include/image_transport/publisher_plugin.hpp @@ -29,6 +29,7 @@ #ifndef IMAGE_TRANSPORT__PUBLISHER_PLUGIN_HPP_ #define IMAGE_TRANSPORT__PUBLISHER_PLUGIN_HPP_ +#include #include #include @@ -59,6 +60,14 @@ class PublisherPlugin */ virtual std::string getTransportName() const = 0; + /** + * \brief Check whether this plugin supports publishing using UniquePtr. + */ + virtual bool supportsUniquePtrPub() const + { + return false; + } + /** * \brief Advertise a topic, simple version. */ @@ -95,6 +104,18 @@ class PublisherPlugin publish(*message); } + /** + * \brief Publish an image using the transport associated with this PublisherPlugin. + * This version of the function can be used to optimize cases where the Plugin can + * avoid doing copies of the data when having the ownership to the image message. + * Plugins that can take advantage of message ownership should overwrite this method + * along with supportsUniquePtrPub(). + */ + virtual void publishUniquePtr(sensor_msgs::msg::Image::UniquePtr /*message*/) const + { + throw std::logic_error("publishUniquePtr() is not implemented."); + } + /** * \brief Publish an image using the transport associated with this PublisherPlugin. * This version of the function can be used to optimize cases where you don't want to diff --git a/image_transport/include/image_transport/raw_publisher.hpp b/image_transport/include/image_transport/raw_publisher.hpp index e52b26b9..65fed4a7 100644 --- a/image_transport/include/image_transport/raw_publisher.hpp +++ b/image_transport/include/image_transport/raw_publisher.hpp @@ -30,6 +30,7 @@ #define IMAGE_TRANSPORT__RAW_PUBLISHER_HPP_ #include +#include #include "sensor_msgs/msg/image.hpp" @@ -56,12 +57,30 @@ class RawPublisher : public SimplePublisherPlugin return "raw"; } + virtual bool supportsUniquePtrPub() const + { + return true; + } + protected: + [[deprecated("Use publish(const sensor_msgs::msg::Image&, const PublisherT&) instead.")]] virtual void publish(const sensor_msgs::msg::Image & message, const PublishFn & publish_fn) const { publish_fn(message); } + virtual void publish(const sensor_msgs::msg::Image & message, const PublisherT & publisher) const + { + publisher->publish(message); + } + + virtual void publish( + sensor_msgs::msg::Image::UniquePtr message, + const PublisherT & publisher) const + { + publisher->publish(std::move(message)); + } + virtual std::string getTopicToAdvertise(const std::string & base_topic) const { return base_topic; diff --git a/image_transport/include/image_transport/simple_publisher_plugin.hpp b/image_transport/include/image_transport/simple_publisher_plugin.hpp index f088c874..4ca77524 100644 --- a/image_transport/include/image_transport/simple_publisher_plugin.hpp +++ b/image_transport/include/image_transport/simple_publisher_plugin.hpp @@ -30,7 +30,9 @@ #define IMAGE_TRANSPORT__SIMPLE_PUBLISHER_PLUGIN_HPP_ #include +#include #include +#include #include "rclcpp/node.hpp" #include "rclcpp/logger.hpp" @@ -89,7 +91,20 @@ class SimplePublisherPlugin : public PublisherPlugin return; } - publish(message, bindInternalPublisher(simple_impl_->pub_.get())); + publish(message, simple_impl_->pub_); + } + + void publishUniquePtr(sensor_msgs::msg::Image::UniquePtr message) const override + { + if (!simple_impl_ || !simple_impl_->pub_) { + auto logger = simple_impl_ ? simple_impl_->logger_ : rclcpp::get_logger("image_transport"); + RCLCPP_ERROR( + logger, + "Call to publish() on an invalid image_transport::SimplePublisherPlugin"); + return; + } + + publish(std::move(message), simple_impl_->pub_); } void shutdown() override @@ -112,20 +127,54 @@ class SimplePublisherPlugin : public PublisherPlugin simple_impl_->pub_ = node->create_publisher(transport_topic, qos, options); } + typedef typename rclcpp::Publisher::SharedPtr PublisherT; + //! Generic function for publishing the internal message type. typedef std::function PublishFn; /** - * \brief Publish an image using the specified publish function. Must be implemented by - * the subclass. + * \brief Publish an image using the specified publish function. + * + * \deprecated Use publish(const sensor_msgs::msg::Image&, const PublisherT&) instead. * * The PublishFn publishes the transport-specific message type. This indirection allows * SimpleSubscriberPlugin to use this function for both normal broadcast publishing and * single subscriber publishing (in subscription callbacks). */ + virtual void publish( + const sensor_msgs::msg::Image & /*message*/, + const PublishFn & /*publish_fn*/) const + { + throw std::logic_error( + "publish(const sensor_msgs::msg::Image&, const PublishFn&) is not implemented."); + } + + /** + * \brief Publish an image using the specified publisher. + */ virtual void publish( const sensor_msgs::msg::Image & message, - const PublishFn & publish_fn) const = 0; + const PublisherT & publisher) const + { + // Fallback to old, deprecated method + publish(message, bindInternalPublisher(publisher.get())); + } + + /** + * \brief Publish an image using the specified publisher. + * + * This version of the function can be used to optimize cases where the Plugin can + * avoid doing copies of the data when having the ownership to the image message. + * Plugins that can take advantage of message ownership should overwrite this method + * along with supportsUniquePtrPub(). + */ + virtual void publish( + sensor_msgs::msg::Image::UniquePtr /*message*/, + const PublisherT & /*publisher*/) const + { + throw std::logic_error( + "publish(sensor_msgs::msg::Image::UniquePtr, const PublisherT&) is not implemented."); + } /** * \brief Return the communication topic name for a given base topic. @@ -148,7 +197,7 @@ class SimplePublisherPlugin : public PublisherPlugin rclcpp::Node * node_; rclcpp::Logger logger_; - typename rclcpp::Publisher::SharedPtr pub_; + PublisherT pub_; }; std::unique_ptr simple_impl_; diff --git a/image_transport/src/camera_publisher.cpp b/image_transport/src/camera_publisher.cpp index 7c1ae450..66c7d7f1 100644 --- a/image_transport/src/camera_publisher.cpp +++ b/image_transport/src/camera_publisher.cpp @@ -30,6 +30,7 @@ #include #include +#include #include "rclcpp/expand_topic_or_service_name.hpp" #include "rclcpp/logging.hpp" @@ -124,7 +125,6 @@ void CameraPublisher::publish( const sensor_msgs::msg::CameraInfo & info) const { if (!impl_ || !impl_->isValid()) { - // TODO(ros2) Switch to RCUTILS_ASSERT when ros2/rcutils#112 is merged auto logger = impl_ ? impl_->logger_ : rclcpp::get_logger("image_transport"); RCLCPP_FATAL( logger, @@ -141,7 +141,6 @@ void CameraPublisher::publish( const sensor_msgs::msg::CameraInfo::ConstSharedPtr & info) const { if (!impl_ || !impl_->isValid()) { - // TODO(ros2) Switch to RCUTILS_ASSERT when ros2/rcutils#112 is merged auto logger = impl_ ? impl_->logger_ : rclcpp::get_logger("image_transport"); RCLCPP_FATAL( logger, @@ -153,12 +152,27 @@ void CameraPublisher::publish( impl_->info_pub_->publish(*info); } +void CameraPublisher::publish( + sensor_msgs::msg::Image::UniquePtr image, + sensor_msgs::msg::CameraInfo::UniquePtr info) const +{ + if (!impl_ || !impl_->isValid()) { + auto logger = impl_ ? impl_->logger_ : rclcpp::get_logger("image_transport"); + RCLCPP_FATAL( + logger, + "Call to publish() on an invalid image_transport::CameraPublisher"); + return; + } + + impl_->image_pub_.publish(std::move(image)); + impl_->info_pub_->publish(std::move(info)); +} + void CameraPublisher::publish( sensor_msgs::msg::Image & image, sensor_msgs::msg::CameraInfo & info, rclcpp::Time stamp) const { if (!impl_ || !impl_->isValid()) { - // TODO(ros2) Switch to RCUTILS_ASSERT when ros2/rcutils#112 is merged auto logger = impl_ ? impl_->logger_ : rclcpp::get_logger("image_transport"); RCLCPP_FATAL( logger, @@ -172,6 +186,25 @@ void CameraPublisher::publish( impl_->info_pub_->publish(info); } +void CameraPublisher::publish( + sensor_msgs::msg::Image::UniquePtr image, + sensor_msgs::msg::CameraInfo::UniquePtr info, + rclcpp::Time stamp) const +{ + if (!impl_ || !impl_->isValid()) { + auto logger = impl_ ? impl_->logger_ : rclcpp::get_logger("image_transport"); + RCLCPP_FATAL( + logger, + "Call to publish() on an invalid image_transport::CameraPublisher"); + return; + } + + image->header.stamp = stamp; + info->header.stamp = stamp; + impl_->image_pub_.publish(std::move(image)); + impl_->info_pub_->publish(std::move(info)); +} + void CameraPublisher::shutdown() { if (impl_) { diff --git a/image_transport/src/publisher.cpp b/image_transport/src/publisher.cpp index 0bed7b09..d467e54b 100644 --- a/image_transport/src/publisher.cpp +++ b/image_transport/src/publisher.cpp @@ -29,6 +29,7 @@ #include "image_transport/publisher.hpp" #include +#include #include #include #include @@ -202,6 +203,36 @@ void Publisher::publish(const sensor_msgs::msg::Image::ConstSharedPtr & message) } } +void Publisher::publish(sensor_msgs::msg::Image::UniquePtr message) const +{ + if (!impl_ || !impl_->isValid()) { + auto logger = impl_ ? impl_->logger_ : rclcpp::get_logger("image_transport"); + RCLCPP_FATAL(logger, "Call to publish() on an invalid image_transport::Publisher"); + return; + } + + std::vector> pubs_take_reference; + std::optional> pub_takes_ownership = std::nullopt; + + for (const auto & pub : impl_->publishers_) { + if (pub->getNumSubscribers() > 0) { + if (pub->supportsUniquePtrPub() && !pub_takes_ownership.has_value()) { + pub_takes_ownership = pub; + } else { + pubs_take_reference.push_back(pub); + } + } + } + + for (const auto & pub : pubs_take_reference) { + pub->publish(*message); + } + + if (pub_takes_ownership.has_value()) { + pub_takes_ownership.value()->publishUniquePtr(std::move(message)); + } +} + void Publisher::shutdown() { if (impl_) {