Skip to content

Commit

Permalink
Fix TypeAdapted publishing with large messages. (#2443)
Browse files Browse the repository at this point in the history
Mostly by ensuring we aren't attempting to store
large messages on the stack.  Also add in tests.
I verified that before these changes, the tests failed,
while after them they succeed.

Signed-off-by: Chris Lalancette <[email protected]>
  • Loading branch information
clalancette authored Mar 5, 2024
1 parent 3df73f0 commit 51a4d2e
Show file tree
Hide file tree
Showing 4 changed files with 84 additions and 15 deletions.
24 changes: 10 additions & 14 deletions rclcpp/include/rclcpp/publisher.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -290,8 +290,8 @@ class Publisher : public PublisherBase
{
// Avoid allocating when not using intra process.
if (!intra_process_is_enabled_) {
// In this case we're not using intra process.
return this->do_inter_process_publish(msg);
this->do_inter_process_publish(msg);
return;
}
// Otherwise we have to allocate memory in a unique_ptr and pass it along.
// As the message is not const, a copy should be made.
Expand All @@ -318,22 +318,19 @@ class Publisher : public PublisherBase
>
publish(std::unique_ptr<T, PublishedTypeDeleter> msg)
{
// Avoid allocating when not using intra process.
if (!intra_process_is_enabled_) {
// In this case we're not using intra process.
ROSMessageType ros_msg;
rclcpp::TypeAdapter<MessageT>::convert_to_ros_message(*msg, ros_msg);
return this->do_inter_process_publish(ros_msg);
auto ros_msg_ptr = std::make_unique<ROSMessageType>();
rclcpp::TypeAdapter<MessageT>::convert_to_ros_message(*msg, *ros_msg_ptr);
this->do_inter_process_publish(*ros_msg_ptr);
return;
}

bool inter_process_publish_needed =
get_subscription_count() > get_intra_process_subscription_count();

if (inter_process_publish_needed) {
auto ros_msg_ptr = std::make_shared<ROSMessageType>();
// TODO(clalancette): This is unnecessarily doing an additional conversion
// that may have already been done in do_intra_process_publish_and_return_shared().
// We should just reuse that effort.
rclcpp::TypeAdapter<MessageT>::convert_to_ros_message(*msg, *ros_msg_ptr);
this->do_intra_process_publish(std::move(msg));
this->do_inter_process_publish(*ros_msg_ptr);
Expand Down Expand Up @@ -368,13 +365,12 @@ class Publisher : public PublisherBase
>
publish(const T & msg)
{
// Avoid double allocating when not using intra process.
if (!intra_process_is_enabled_) {
// Convert to the ROS message equivalent and publish it.
ROSMessageType ros_msg;
rclcpp::TypeAdapter<MessageT>::convert_to_ros_message(msg, ros_msg);
// In this case we're not using intra process.
return this->do_inter_process_publish(ros_msg);
auto ros_msg_ptr = std::make_unique<ROSMessageType>();
rclcpp::TypeAdapter<MessageT>::convert_to_ros_message(msg, *ros_msg_ptr);
this->do_inter_process_publish(*ros_msg_ptr);
return;
}

// Otherwise we have to allocate memory in a unique_ptr and pass it along.
Expand Down
3 changes: 3 additions & 0 deletions rclcpp/test/msg/LargeMessage.msg
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# A message with a size larger than the default Linux stack size
uint8[10485760] data
uint64 size
1 change: 1 addition & 0 deletions rclcpp/test/rclcpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ add_definitions(-DTEST_RESOURCES_DIRECTORY="${TEST_RESOURCES_DIRECTORY}")

rosidl_generate_interfaces(${PROJECT_NAME}_test_msgs
../msg/Header.msg
../msg/LargeMessage.msg
../msg/MessageWithHeader.msg
../msg/String.msg
DEPENDENCIES builtin_interfaces
Expand Down
71 changes: 70 additions & 1 deletion rclcpp/test/rclcpp/test_publisher_with_type_adapter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.


#include <gtest/gtest.h>

#include <algorithm>
#include <chrono>
#include <memory>
#include <string>
Expand All @@ -25,6 +25,7 @@
#include "rclcpp/loaned_message.hpp"
#include "rclcpp/rclcpp.hpp"

#include "rclcpp/msg/large_message.hpp"
#include "rclcpp/msg/string.hpp"


Expand Down Expand Up @@ -105,6 +106,32 @@ struct TypeAdapter<int, rclcpp::msg::String>
}
};

template<>
struct TypeAdapter<std::string, rclcpp::msg::LargeMessage>
{
using is_specialized = std::true_type;
using custom_type = std::string;
using ros_message_type = rclcpp::msg::LargeMessage;

static void
convert_to_ros_message(
const custom_type & source,
ros_message_type & destination)
{
destination.size = source.size();
std::memcpy(destination.data.data(), source.data(), source.size());
}

static void
convert_to_custom(
const ros_message_type & source,
custom_type & destination)
{
destination.resize(source.size);
std::memcpy(destination.data(), source.data.data(), source.size);
}
};

} // namespace rclcpp

/*
Expand Down Expand Up @@ -336,3 +363,45 @@ TEST_F(TestPublisher, check_type_adapted_message_is_sent_and_received) {
assert_message_was_received();
}
}

TEST_F(TestPublisher, test_large_message_unique)
{
// There have been some bugs in the past when trying to type-adapt large messages
// (larger than the stack size). Here we just make sure that a 10MB message works,
// which is larger than the default stack size on Linux.

using StringTypeAdapter = rclcpp::TypeAdapter<std::string, rclcpp::msg::LargeMessage>;

auto node = std::make_shared<rclcpp::Node>("my_node", "/ns", rclcpp::NodeOptions());

const std::string topic_name = "topic_name";

auto pub = node->create_publisher<StringTypeAdapter>(topic_name, 1);

static constexpr size_t length = 10 * 1024 * 1024;
auto message_data = std::make_unique<std::string>();
message_data->reserve(length);
std::fill(message_data->begin(), message_data->begin() + length, '#');
pub->publish(std::move(message_data));
}

TEST_F(TestPublisher, test_large_message_constref)
{
// There have been some bugs in the past when trying to type-adapt large messages
// (larger than the stack size). Here we just make sure that a 10MB message works,
// which is larger than the default stack size on Linux.

using StringTypeAdapter = rclcpp::TypeAdapter<std::string, rclcpp::msg::LargeMessage>;

auto node = std::make_shared<rclcpp::Node>("my_node", "/ns", rclcpp::NodeOptions());

const std::string topic_name = "topic_name";

auto pub = node->create_publisher<StringTypeAdapter>(topic_name, 1);

static constexpr size_t length = 10 * 1024 * 1024;
std::string message_data;
message_data.reserve(length);
std::fill(message_data.begin(), message_data.begin() + length, '#');
pub->publish(message_data);
}

0 comments on commit 51a4d2e

Please sign in to comment.