Skip to content

Commit

Permalink
Support zero-copy intra-process publishing (#306)
Browse files Browse the repository at this point in the history
  • Loading branch information
bjsowa authored May 16, 2024
1 parent 74d9e8e commit fd51363
Show file tree
Hide file tree
Showing 7 changed files with 188 additions and 8 deletions.
21 changes: 21 additions & 0 deletions image_transport/include/image_transport/camera_publisher.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,14 @@ class CameraPublisher
const sensor_msgs::msg::Image::ConstSharedPtr & image,
const sensor_msgs::msg::CameraInfo::ConstSharedPtr & info) const;

/*!
* \brief Publish an (image, info) pair on the topics associated with this CameraPublisher.
*/
IMAGE_TRANSPORT_PUBLIC
void publish(
sensor_msgs::msg::Image::UniquePtr image,
sensor_msgs::msg::CameraInfo::UniquePtr info) const;

/*!
* \brief Publish an (image, info) pair with given timestamp on the topics associated with
* this CameraPublisher.
Expand All @@ -123,6 +131,19 @@ class CameraPublisher
sensor_msgs::msg::Image & image, sensor_msgs::msg::CameraInfo & info,
rclcpp::Time stamp) const;

/*!
* \brief Publish an (image, info) pair with given timestamp on the topics associated with
* this CameraPublisher.
*
* Convenience version, which sets the timestamps of both image and info to stamp before
* publishing.
*/
IMAGE_TRANSPORT_PUBLIC
void publish(
sensor_msgs::msg::Image::UniquePtr image,
sensor_msgs::msg::CameraInfo::UniquePtr info,
rclcpp::Time stamp) const;

/*!
* \brief Shutdown the advertisements associated with this Publisher.
*/
Expand Down
6 changes: 6 additions & 0 deletions image_transport/include/image_transport/publisher.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,12 @@ class Publisher
IMAGE_TRANSPORT_PUBLIC
void publish(const sensor_msgs::msg::Image::ConstSharedPtr & message) const;

/*!
* \brief Publish an image on the topics associated with this Publisher.
*/
IMAGE_TRANSPORT_PUBLIC
void publish(sensor_msgs::msg::Image::UniquePtr message) const;

/*!
* \brief Shutdown the advertisements associated with this Publisher.
*/
Expand Down
21 changes: 21 additions & 0 deletions image_transport/include/image_transport/publisher_plugin.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#ifndef IMAGE_TRANSPORT__PUBLISHER_PLUGIN_HPP_
#define IMAGE_TRANSPORT__PUBLISHER_PLUGIN_HPP_

#include <stdexcept>
#include <string>
#include <vector>

Expand Down Expand Up @@ -59,6 +60,14 @@ class PublisherPlugin
*/
virtual std::string getTransportName() const = 0;

/**
* \brief Check whether this plugin supports publishing using UniquePtr.
*/
virtual bool supportsUniquePtrPub() const
{
return false;
}

/**
* \brief Advertise a topic, simple version.
*/
Expand Down Expand Up @@ -95,6 +104,18 @@ class PublisherPlugin
publish(*message);
}

/**
* \brief Publish an image using the transport associated with this PublisherPlugin.
* This version of the function can be used to optimize cases where the Plugin can
* avoid doing copies of the data when having the ownership to the image message.
* Plugins that can take advantage of message ownership should overwrite this method
* along with supportsUniquePtrPub().
*/
virtual void publishUniquePtr(sensor_msgs::msg::Image::UniquePtr /*message*/) const
{
throw std::logic_error("publishUniquePtr() is not implemented.");
}

/**
* \brief Publish an image using the transport associated with this PublisherPlugin.
* This version of the function can be used to optimize cases where you don't want to
Expand Down
19 changes: 19 additions & 0 deletions image_transport/include/image_transport/raw_publisher.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#define IMAGE_TRANSPORT__RAW_PUBLISHER_HPP_

#include <string>
#include <utility>

#include "sensor_msgs/msg/image.hpp"

Expand All @@ -56,12 +57,30 @@ class RawPublisher : public SimplePublisherPlugin<sensor_msgs::msg::Image>
return "raw";
}

virtual bool supportsUniquePtrPub() const
{
return true;
}

protected:
[[deprecated("Use publish(const sensor_msgs::msg::Image&, const PublisherT&) instead.")]]
virtual void publish(const sensor_msgs::msg::Image & message, const PublishFn & publish_fn) const
{
publish_fn(message);
}

virtual void publish(const sensor_msgs::msg::Image & message, const PublisherT & publisher) const
{
publisher->publish(message);
}

virtual void publish(
sensor_msgs::msg::Image::UniquePtr message,
const PublisherT & publisher) const
{
publisher->publish(std::move(message));
}

virtual std::string getTopicToAdvertise(const std::string & base_topic) const
{
return base_topic;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@
#define IMAGE_TRANSPORT__SIMPLE_PUBLISHER_PLUGIN_HPP_

#include <memory>
#include <stdexcept>
#include <string>
#include <utility>

#include "rclcpp/node.hpp"
#include "rclcpp/logger.hpp"
Expand Down Expand Up @@ -89,7 +91,20 @@ class SimplePublisherPlugin : public PublisherPlugin
return;
}

publish(message, bindInternalPublisher(simple_impl_->pub_.get()));
publish(message, simple_impl_->pub_);
}

void publishUniquePtr(sensor_msgs::msg::Image::UniquePtr message) const override
{
if (!simple_impl_ || !simple_impl_->pub_) {
auto logger = simple_impl_ ? simple_impl_->logger_ : rclcpp::get_logger("image_transport");
RCLCPP_ERROR(
logger,
"Call to publish() on an invalid image_transport::SimplePublisherPlugin");
return;
}

publish(std::move(message), simple_impl_->pub_);
}

void shutdown() override
Expand All @@ -112,20 +127,54 @@ class SimplePublisherPlugin : public PublisherPlugin
simple_impl_->pub_ = node->create_publisher<M>(transport_topic, qos, options);
}

typedef typename rclcpp::Publisher<M>::SharedPtr PublisherT;

//! Generic function for publishing the internal message type.
typedef std::function<void (const M &)> PublishFn;

/**
* \brief Publish an image using the specified publish function. Must be implemented by
* the subclass.
* \brief Publish an image using the specified publish function.
*
* \deprecated Use publish(const sensor_msgs::msg::Image&, const PublisherT&) instead.
*
* The PublishFn publishes the transport-specific message type. This indirection allows
* SimpleSubscriberPlugin to use this function for both normal broadcast publishing and
* single subscriber publishing (in subscription callbacks).
*/
virtual void publish(
const sensor_msgs::msg::Image & /*message*/,
const PublishFn & /*publish_fn*/) const
{
throw std::logic_error(
"publish(const sensor_msgs::msg::Image&, const PublishFn&) is not implemented.");
}

/**
* \brief Publish an image using the specified publisher.
*/
virtual void publish(
const sensor_msgs::msg::Image & message,
const PublishFn & publish_fn) const = 0;
const PublisherT & publisher) const
{
// Fallback to old, deprecated method
publish(message, bindInternalPublisher(publisher.get()));
}

/**
* \brief Publish an image using the specified publisher.
*
* This version of the function can be used to optimize cases where the Plugin can
* avoid doing copies of the data when having the ownership to the image message.
* Plugins that can take advantage of message ownership should overwrite this method
* along with supportsUniquePtrPub().
*/
virtual void publish(
sensor_msgs::msg::Image::UniquePtr /*message*/,
const PublisherT & /*publisher*/) const
{
throw std::logic_error(
"publish(sensor_msgs::msg::Image::UniquePtr, const PublisherT&) is not implemented.");
}

/**
* \brief Return the communication topic name for a given base topic.
Expand All @@ -148,7 +197,7 @@ class SimplePublisherPlugin : public PublisherPlugin

rclcpp::Node * node_;
rclcpp::Logger logger_;
typename rclcpp::Publisher<M>::SharedPtr pub_;
PublisherT pub_;
};

std::unique_ptr<SimplePublisherPluginImpl> simple_impl_;
Expand Down
39 changes: 36 additions & 3 deletions image_transport/src/camera_publisher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

#include <memory>
#include <string>
#include <utility>

#include "rclcpp/expand_topic_or_service_name.hpp"
#include "rclcpp/logging.hpp"
Expand Down Expand Up @@ -124,7 +125,6 @@ void CameraPublisher::publish(
const sensor_msgs::msg::CameraInfo & info) const
{
if (!impl_ || !impl_->isValid()) {
// TODO(ros2) Switch to RCUTILS_ASSERT when ros2/rcutils#112 is merged
auto logger = impl_ ? impl_->logger_ : rclcpp::get_logger("image_transport");
RCLCPP_FATAL(
logger,
Expand All @@ -141,7 +141,6 @@ void CameraPublisher::publish(
const sensor_msgs::msg::CameraInfo::ConstSharedPtr & info) const
{
if (!impl_ || !impl_->isValid()) {
// TODO(ros2) Switch to RCUTILS_ASSERT when ros2/rcutils#112 is merged
auto logger = impl_ ? impl_->logger_ : rclcpp::get_logger("image_transport");
RCLCPP_FATAL(
logger,
Expand All @@ -153,12 +152,27 @@ void CameraPublisher::publish(
impl_->info_pub_->publish(*info);
}

void CameraPublisher::publish(
sensor_msgs::msg::Image::UniquePtr image,
sensor_msgs::msg::CameraInfo::UniquePtr info) const
{
if (!impl_ || !impl_->isValid()) {
auto logger = impl_ ? impl_->logger_ : rclcpp::get_logger("image_transport");
RCLCPP_FATAL(
logger,
"Call to publish() on an invalid image_transport::CameraPublisher");
return;
}

impl_->image_pub_.publish(std::move(image));
impl_->info_pub_->publish(std::move(info));
}

void CameraPublisher::publish(
sensor_msgs::msg::Image & image, sensor_msgs::msg::CameraInfo & info,
rclcpp::Time stamp) const
{
if (!impl_ || !impl_->isValid()) {
// TODO(ros2) Switch to RCUTILS_ASSERT when ros2/rcutils#112 is merged
auto logger = impl_ ? impl_->logger_ : rclcpp::get_logger("image_transport");
RCLCPP_FATAL(
logger,
Expand All @@ -172,6 +186,25 @@ void CameraPublisher::publish(
impl_->info_pub_->publish(info);
}

void CameraPublisher::publish(
sensor_msgs::msg::Image::UniquePtr image,
sensor_msgs::msg::CameraInfo::UniquePtr info,
rclcpp::Time stamp) const
{
if (!impl_ || !impl_->isValid()) {
auto logger = impl_ ? impl_->logger_ : rclcpp::get_logger("image_transport");
RCLCPP_FATAL(
logger,
"Call to publish() on an invalid image_transport::CameraPublisher");
return;
}

image->header.stamp = stamp;
info->header.stamp = stamp;
impl_->image_pub_.publish(std::move(image));
impl_->info_pub_->publish(std::move(info));
}

void CameraPublisher::shutdown()
{
if (impl_) {
Expand Down
31 changes: 31 additions & 0 deletions image_transport/src/publisher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include "image_transport/publisher.hpp"

#include <memory>
#include <optional>
#include <set>
#include <string>
#include <utility>
Expand Down Expand Up @@ -202,6 +203,36 @@ void Publisher::publish(const sensor_msgs::msg::Image::ConstSharedPtr & message)
}
}

void Publisher::publish(sensor_msgs::msg::Image::UniquePtr message) const
{
if (!impl_ || !impl_->isValid()) {
auto logger = impl_ ? impl_->logger_ : rclcpp::get_logger("image_transport");
RCLCPP_FATAL(logger, "Call to publish() on an invalid image_transport::Publisher");
return;
}

std::vector<std::shared_ptr<PublisherPlugin>> pubs_take_reference;
std::optional<std::shared_ptr<PublisherPlugin>> pub_takes_ownership = std::nullopt;

for (const auto & pub : impl_->publishers_) {
if (pub->getNumSubscribers() > 0) {
if (pub->supportsUniquePtrPub() && !pub_takes_ownership.has_value()) {
pub_takes_ownership = pub;
} else {
pubs_take_reference.push_back(pub);
}
}
}

for (const auto & pub : pubs_take_reference) {
pub->publish(*message);
}

if (pub_takes_ownership.has_value()) {
pub_takes_ownership.value()->publishUniquePtr(std::move(message));
}
}

void Publisher::shutdown()
{
if (impl_) {
Expand Down

0 comments on commit fd51363

Please sign in to comment.