Skip to content

Commit

Permalink
Add wait_for_all_acked support
Browse files Browse the repository at this point in the history
Signed-off-by: Barry Xu <[email protected]>
  • Loading branch information
Barry-Xu-2018 committed May 10, 2021
1 parent 2e4c97a commit f6af16a
Show file tree
Hide file tree
Showing 2 changed files with 108 additions and 1 deletion.
64 changes: 64 additions & 0 deletions rclcpp/include/rclcpp/publisher_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include <rmw/error_handling.h>
#include <rmw/rmw.h>

#include <chrono>
#include <functional>
#include <iostream>
#include <memory>
Expand Down Expand Up @@ -203,6 +204,69 @@ class PublisherBase : public std::enable_shared_from_this<PublisherBase>
std::vector<rclcpp::NetworkFlowEndpoint>
get_network_flow_endpoints() const;

/// Wait until all published message data is acknowledged or until the specified timeout elapses.
/**
* This function waits until all published message data were acknowledged by all subscribers or
* timeout.
*
* timeout must be less then std::chrono::nanoseconds::max().
* If the timeout is negative then this function will block indefinitely until all published
* message data were acknowledged.
* If the timeout is 0 then this function will be non-blocking; checking all published message
* data were acknowledged (If acknowledged, return true. Otherwise, return false),
* but not waiting.
* If the timeout is greater than 0 then this function will return after that period of time has
* elapsed (return false) or all published message data were acknowledged (return true).
*
* \param[in] timeout the duration to wait for all published message data were acknowledged.
* \return `true` if all published message data were acknowledged before timeout, otherwise
* `false`.
* \throws rclcpp::exceptions::RCLError if middleware doesn't support or internal error occurs
*/
template<typename DurationRepT = int64_t, typename DurationT = std::milli>
bool
wait_for_all_acked(
std::chrono::duration<DurationRepT, DurationT> timeout =
std::chrono::duration<DurationRepT, DurationT>(-1)) const
{
// Casting to a double representation might lose precision and allow the check below to succeed
// but the actual cast to nanoseconds fail. Using 1 DurationT worth of nanoseconds less than max
constexpr auto maximum_safe_cast_ns =
std::chrono::nanoseconds::max() - std::chrono::duration<DurationRepT, DurationT>(1);

// If period is greater than nanoseconds::max(), the duration_cast to nanoseconds will overflow
// a signed integer, which is undefined behavior. Checking whether any std::chrono::duration is
// greater than nanoseconds::max() is a difficult general problem. This is a more conservative
// version of Howard Hinnant's (the <chrono> guy>) response here:
// https://stackoverflow.com/a/44637334/2089061
// However, this doesn't solve the issue for all possible duration types of period.
// Follow-up issue: https://github.com/ros2/rclcpp/issues/1177
constexpr auto ns_max_as_double =
std::chrono::duration_cast<std::chrono::duration<double, std::chrono::nanoseconds::period>>(
maximum_safe_cast_ns);
if (timeout > ns_max_as_double) {
throw std::invalid_argument{
"timeout must be less than std::chrono::nanoseconds::max()"};
}

rcl_duration_value_t rcl_timeout;

if (timeout < std::chrono::duration<DurationRepT, DurationT>::zero()) {
rcl_timeout = -1;
} else {
rcl_timeout = (std::chrono::duration_cast<std::chrono::nanoseconds>(timeout)).count();
}

rcl_ret_t ret = rcl_publisher_wait_for_all_acked(publisher_handle_.get(), rcl_timeout);
if (ret == RCL_RET_OK) {
return true;
} else if (ret == RCL_RET_TIMEOUT) {
return false;
} else {
rclcpp::exceptions::throw_from_rcl_error(ret);
}
}

protected:
template<typename EventCallbackT>
void
Expand Down
45 changes: 44 additions & 1 deletion rclcpp/test/rclcpp/test_publisher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,9 @@

#include <gtest/gtest.h>

#include <string>
#include <chrono>
#include <memory>
#include <string>
#include <utility>
#include <vector>

Expand Down Expand Up @@ -536,3 +537,45 @@ TEST_F(TestPublisher, get_network_flow_endpoints_errors) {
EXPECT_NO_THROW(publisher->get_network_flow_endpoints());
}
}

TEST_F(TestPublisher, check_wait_for_all_acked_return) {
initialize();
const rclcpp::QoS publisher_qos(1);
auto publisher = node->create_publisher<test_msgs::msg::Empty>("topic", publisher_qos);

{
// Using 'self' instead of 'lib:rclcpp' because `rcl_publisher_wait_for_all_acked` is entirely
// defined in a header
auto mock = mocking_utils::patch_and_return(
"self", rcl_publisher_wait_for_all_acked, RCL_RET_OK);
EXPECT_TRUE(publisher->wait_for_all_acked(std::chrono::milliseconds(-1)));
}

{
// Using 'self' instead of 'lib:rclcpp' because `rcl_publisher_wait_for_all_acked` is entirely
// defined in a header
auto mock = mocking_utils::patch_and_return(
"self", rcl_publisher_wait_for_all_acked, RCL_RET_TIMEOUT);
EXPECT_FALSE(publisher->wait_for_all_acked(std::chrono::milliseconds(-1)));
}

{
// Using 'self' instead of 'lib:rclcpp' because `rcl_publisher_wait_for_all_acked` is entirely
// defined in a header
auto mock = mocking_utils::patch_and_return(
"self", rcl_publisher_wait_for_all_acked, RCL_RET_UNSUPPORTED);
EXPECT_THROW(
publisher->wait_for_all_acked(std::chrono::milliseconds(-1)),
rclcpp::exceptions::RCLError);
}

{
// Using 'self' instead of 'lib:rclcpp' because `rcl_publisher_wait_for_all_acked` is entirely
// defined in a header
auto mock = mocking_utils::patch_and_return(
"self", rcl_publisher_wait_for_all_acked, RCL_RET_ERROR);
EXPECT_THROW(
publisher->wait_for_all_acked(std::chrono::milliseconds(-1)),
rclcpp::exceptions::RCLError);
}
}

0 comments on commit f6af16a

Please sign in to comment.