Skip to content

Commit

Permalink
Advertize and subscribe with custom qos (#288)
Browse files Browse the repository at this point in the history
Signed-off-by: Alejandro Hernández Cordero <[email protected]>
  • Loading branch information
ahcorde authored Oct 9, 2023
1 parent 2033e92 commit f9ac45d
Show file tree
Hide file tree
Showing 4 changed files with 159 additions and 0 deletions.
66 changes: 66 additions & 0 deletions image_transport/include/image_transport/image_transport.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,14 @@ class ImageTransport
IMAGE_TRANSPORT_PUBLIC
Publisher advertise(const std::string & base_topic, uint32_t queue_size, bool latch = false);

/*!
* \brief Advertise an image topic, simple version.
*/
IMAGE_TRANSPORT_PUBLIC
Publisher advertise(
const std::string & base_topic, rmw_qos_profile_t custom_qos,
bool latch = false);

/*!
* \brief Advertise an image topic with subcriber status callbacks.
*/
Expand Down Expand Up @@ -189,6 +197,64 @@ class ImageTransport
obj, transport_hints, options);
}

/**
* \brief Subscribe to an image topic, version for arbitrary std::function object and QoS.
*/
IMAGE_TRANSPORT_PUBLIC
Subscriber subscribe(
const std::string & base_topic, rmw_qos_profile_t custom_qos,
const Subscriber::Callback & callback,
const VoidPtr & tracked_object,
const TransportHints * transport_hints,
const rclcpp::SubscriptionOptions options);

/**
* \brief Subscribe to an image topic, version for bare function.
*/
IMAGE_TRANSPORT_PUBLIC
Subscriber subscribe(
const std::string & base_topic, rmw_qos_profile_t custom_qos,
void (* fp)(const ImageConstPtr &),
const TransportHints * transport_hints = nullptr,
const rclcpp::SubscriptionOptions options = rclcpp::SubscriptionOptions())
{
return subscribe(
base_topic, custom_qos,
std::function<void(const ImageConstPtr &)>(fp),
VoidPtr(), transport_hints, options);
}

/**
* \brief Subscribe to an image topic, version for class member function with bare pointer.
*/
template<class T>
Subscriber subscribe(
const std::string & base_topic, rmw_qos_profile_t custom_qos,
void (T::* fp)(const ImageConstPtr &), T * obj,
const TransportHints * transport_hints = nullptr,
const rclcpp::SubscriptionOptions options = rclcpp::SubscriptionOptions())
{
return subscribe(
base_topic, custom_qos, std::bind(fp, obj, std::placeholders::_1),
VoidPtr(), transport_hints, options);
}

/**
* \brief Subscribe to an image topic, version for class member function with shared_ptr.
*/
template<class T>
Subscriber subscribe(
const std::string & base_topic, rmw_qos_profile_t custom_qos,
void (T::* fp)(const ImageConstPtr &),
const std::shared_ptr<T> & obj,
const TransportHints * transport_hints = nullptr,
const rclcpp::SubscriptionOptions options = rclcpp::SubscriptionOptions())
{
return subscribe(
base_topic, custom_qos, std::bind(fp, obj.get(), std::placeholders::_1),
obj, transport_hints, options);
}

/*!
* \brief Advertise a synchronized camera raw image + info topic pair, simple version.
*/
Expand Down
23 changes: 23 additions & 0 deletions image_transport/src/image_transport.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,29 @@ Publisher ImageTransport::advertise(const std::string & base_topic, uint32_t que
return create_publisher(impl_->node_.get(), base_topic, custom_qos);
}

Publisher ImageTransport::advertise(
const std::string & base_topic, rmw_qos_profile_t custom_qos,
bool latch)
{
// TODO(ros2) implement when resolved: https://github.com/ros2/ros2/issues/464
(void) latch;
return create_publisher(impl_->node_.get(), base_topic, custom_qos);
}

Subscriber ImageTransport::subscribe(
const std::string & base_topic, rmw_qos_profile_t custom_qos,
const Subscriber::Callback & callback,
const VoidPtr & tracked_object,
const TransportHints * transport_hints,
const rclcpp::SubscriptionOptions options)
{
(void) tracked_object;
return create_subscription(
impl_->node_.get(), base_topic, callback,
getTransportOrDefault(transport_hints), custom_qos,
options);
}

Subscriber ImageTransport::subscribe(
const std::string & base_topic, uint32_t queue_size,
const Subscriber::Callback & callback,
Expand Down
5 changes: 5 additions & 0 deletions image_transport/test/test_publisher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,11 @@ TEST_F(TestPublisher, image_transport_camera_publisher) {
auto pub = it.advertiseCamera("camera/image", 1);
}

TEST_F(TestPublisher, image_transport_camera_publisher_qos) {
image_transport::ImageTransport it(node_);
auto pub = it.advertise("camera/image", rmw_qos_profile_sensor_data);
}

int main(int argc, char ** argv)
{
rclcpp::init(argc, argv);
Expand Down
65 changes: 65 additions & 0 deletions image_transport/test/test_subscriber.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,71 @@ TEST_F(TestSubscriber, callback_groups) {
EXPECT_LT(timeout_elapsed, timeout);
}

TEST_F(TestSubscriber, callback_groups_custom_qos) {
using namespace std::chrono_literals;

// Create a publisher node.
auto node_publisher = rclcpp::Node::make_shared("image_publisher", rclcpp::NodeOptions());
image_transport::ImageTransport it_publisher(node_publisher);
image_transport::Publisher pub = it_publisher.advertise(
"camera/image",
rmw_qos_profile_sensor_data);

auto msg = sensor_msgs::msg::Image();
auto timer = node_publisher->create_wall_timer(100ms, [&]() {pub.publish(msg);});

// Create a subscriber to read the images.
std::atomic<bool> flag_1 = false;
std::atomic<bool> flag_2 = false;
std::function<void(const sensor_msgs::msg::Image::ConstSharedPtr & msg)> fcn1 =
[&](const auto & msg) {
(void)msg;
flag_1 = true;
std::this_thread::sleep_for(1s);
};
std::function<void(const sensor_msgs::msg::Image::ConstSharedPtr & msg)> fcn2 =
[&](const auto & msg) {
(void)msg;
flag_2 = true;
std::this_thread::sleep_for(1s);
};

auto cb_group = node_->create_callback_group(rclcpp::CallbackGroupType::Reentrant);
rclcpp::SubscriptionOptions sub_options;
sub_options.callback_group = cb_group;

image_transport::ImageTransport it(node_);

auto subscriber_1 = it.subscribe(
"camera/image", rmw_qos_profile_sensor_data, fcn1, nullptr,
nullptr, sub_options);
auto subscriber_2 = it.subscribe(
"camera/image", rmw_qos_profile_sensor_data, fcn2, nullptr,
nullptr, sub_options);

rclcpp::executors::MultiThreadedExecutor executor;
executor.add_node(node_);
executor.add_node(node_publisher);
// Both callbacks should be executed and the flags should be set.
std::thread executor_thread([&]() {executor.spin();});

// The callbacks sleep for 5 seconds and mutually exclusive callbacks should be blocked.
// However, because of the the multithreaded executor and renentrant callback group,
// the flags should be set, as the callbacks should be in different threads.
auto timeout_elapsed = 0.0s;
auto sleep_duration = 0.1s;
auto timeout = 0.5s;

while (!(flag_1 && flag_2)) {
std::this_thread::sleep_for(sleep_duration);
timeout_elapsed += sleep_duration;
}
executor.cancel();
executor_thread.join();

EXPECT_LT(timeout_elapsed, timeout);
}

int main(int argc, char ** argv)
{
rclcpp::init(argc, argv);
Expand Down

0 comments on commit f9ac45d

Please sign in to comment.