Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*_raw function #388

Merged
merged 17 commits into from
Jun 16, 2018
21 changes: 21 additions & 0 deletions rclcpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,17 @@ if(BUILD_TESTING)
${PROJECT_NAME}
)
endif()
ament_add_gtest(test_serialized_message_allocator test/test_serialized_message_allocator.cpp
ENV RCL_ASSERT_RMW_ID_MATCHES=${rmw_implementation})
if(TARGET test_serialized_message_allocator)
target_include_directories(test_serialized_message_allocator PUBLIC
${test_msgs_INCLUDE_DIRS}
)
target_link_libraries(test_serialized_message_allocator
${PROJECT_NAME}
${test_msgs_LIBRARIES}
)
endif()
ament_add_gtest(test_service test/test_service.cpp)
if(TARGET test_service)
target_include_directories(test_service PUBLIC
Expand All @@ -280,6 +291,16 @@ if(BUILD_TESTING)
)
target_link_libraries(test_subscription ${PROJECT_NAME})
endif()
find_package(test_msgs REQUIRED)
ament_add_gtest(test_subscription_traits test/test_subscription_traits.cpp)
if(TARGET test_subscription_traits)
target_include_directories(test_subscription_traits PUBLIC
${rcl_INCLUDE_DIRS}
)
ament_target_dependencies(test_subscription_traits
"test_msgs"
)
endif()
ament_add_gtest(test_find_weak_nodes test/test_find_weak_nodes.cpp)
if(TARGET test_find_weak_nodes)
target_include_directories(test_find_weak_nodes PUBLIC
Expand Down
16 changes: 11 additions & 5 deletions rclcpp/include/rclcpp/create_subscription.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,13 @@
namespace rclcpp
{

template<typename MessageT, typename CallbackT, typename AllocatorT, typename SubscriptionT>
typename rclcpp::Subscription<MessageT, AllocatorT>::SharedPtr
template<
typename MessageT,
typename CallbackT,
typename AllocatorT,
typename CallbackMessageT,
typename SubscriptionT = rclcpp::Subscription<CallbackMessageT, AllocatorT>>
typename std::shared_ptr<SubscriptionT>
create_subscription(
rclcpp::node_interfaces::NodeTopicsInterface * node_topics,
const std::string & topic_name,
Expand All @@ -36,16 +41,17 @@ create_subscription(
rclcpp::callback_group::CallbackGroup::SharedPtr group,
bool ignore_local_publications,
bool use_intra_process_comms,
typename rclcpp::message_memory_strategy::MessageMemoryStrategy<MessageT, AllocatorT>::SharedPtr
typename rclcpp::message_memory_strategy::MessageMemoryStrategy<
CallbackMessageT, AllocatorT>::SharedPtr
msg_mem_strat,
typename std::shared_ptr<AllocatorT> allocator)
{
auto subscription_options = rcl_subscription_get_default_options();
subscription_options.qos = qos_profile;
subscription_options.ignore_local_publications = ignore_local_publications;

auto factory =
rclcpp::create_subscription_factory<MessageT, CallbackT, AllocatorT, SubscriptionT>(
auto factory = rclcpp::create_subscription_factory
<MessageT, CallbackT, AllocatorT, CallbackMessageT, SubscriptionT>(
std::forward<CallbackT>(callback), msg_mem_strat, allocator);

auto sub = node_topics->create_subscription(
Expand Down
65 changes: 65 additions & 0 deletions rclcpp/include/rclcpp/message_memory_strategy.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,15 @@
#include <memory>
#include <stdexcept>

#include "rcl/types.h"

#include "rclcpp/allocator/allocator_common.hpp"
#include "rclcpp/exceptions.hpp"
#include "rclcpp/macros.hpp"
#include "rclcpp/visibility_control.hpp"

#include "rmw/serialized_message.h"

namespace rclcpp
{
namespace message_memory_strategy
Expand All @@ -39,14 +44,29 @@ class MessageMemoryStrategy
using MessageAlloc = typename MessageAllocTraits::allocator_type;
using MessageDeleter = allocator::Deleter<MessageAlloc, MessageT>;

using SerializedMessageAllocTraits = allocator::AllocRebind<rcl_serialized_message_t, Alloc>;
using SerializedMessageAlloc = typename SerializedMessageAllocTraits::allocator_type;
using SerializedMessageDeleter =
allocator::Deleter<SerializedMessageAlloc, rcl_serialized_message_t>;

using BufferAllocTraits = allocator::AllocRebind<char, Alloc>;
using BufferAlloc = typename BufferAllocTraits::allocator_type;
using BufferDeleter = allocator::Deleter<BufferAlloc, char>;

MessageMemoryStrategy()
{
message_allocator_ = std::make_shared<MessageAlloc>();
serialized_message_allocator_ = std::make_shared<SerializedMessageAlloc>();
buffer_allocator_ = std::make_shared<BufferAlloc>();
rcutils_allocator_ = allocator::get_rcl_allocator<char, BufferAlloc>(*buffer_allocator_.get());
}

explicit MessageMemoryStrategy(std::shared_ptr<Alloc> allocator)
{
message_allocator_ = std::make_shared<MessageAlloc>(*allocator.get());
serialized_message_allocator_ = std::make_shared<SerializedMessageAlloc>(*allocator.get());
buffer_allocator_ = std::make_shared<BufferAlloc>(*allocator.get());
rcutils_allocator_ = allocator::get_rcl_allocator<char, BufferAlloc>(*buffer_allocator_.get());
}

/// Default factory method
Expand All @@ -62,15 +82,60 @@ class MessageMemoryStrategy
return std::allocate_shared<MessageT, MessageAlloc>(*message_allocator_.get());
}

virtual std::shared_ptr<rcl_serialized_message_t> borrow_serialized_message(size_t capacity)
{
auto msg = new rcl_serialized_message_t;
*msg = rmw_get_zero_initialized_serialized_message();
auto ret = rmw_serialized_message_init(msg, capacity, &rcutils_allocator_);
if (ret != RCL_RET_OK) {
rclcpp::exceptions::throw_from_rcl_error(ret);
}

auto serialized_msg = std::shared_ptr<rcl_serialized_message_t>(msg,
[](rmw_serialized_message_t * msg) {
auto ret = rmw_serialized_message_fini(msg);
delete msg;
if (ret != RCL_RET_OK) {
rclcpp::exceptions::throw_from_rcl_error(ret, "leaking memory");
}
});

return serialized_msg;
}

virtual std::shared_ptr<rcl_serialized_message_t> borrow_serialized_message()
{
return borrow_serialized_message(default_buffer_capacity_);
}

virtual void set_default_buffer_capacity(size_t capacity)
{
default_buffer_capacity_ = capacity;
}

/// Release ownership of the message, which will deallocate it if it has no more owners.
/** \param[in] msg Shared pointer to the message we are returning. */
virtual void return_message(std::shared_ptr<MessageT> & msg)
{
msg.reset();
}

virtual void return_serialized_message(std::shared_ptr<rcl_serialized_message_t> & serialized_msg)
{
serialized_msg.reset();
}

std::shared_ptr<MessageAlloc> message_allocator_;
MessageDeleter message_deleter_;

std::shared_ptr<SerializedMessageAlloc> serialized_message_allocator_;
SerializedMessageDeleter serialized_message_deleter_;

std::shared_ptr<BufferAlloc> buffer_allocator_;
BufferDeleter buffer_deleter_;
size_t default_buffer_capacity_ = 0;

rcutils_allocator_t rcutils_allocator_;
};

} // namespace message_memory_strategy
Expand Down
15 changes: 10 additions & 5 deletions rclcpp/include/rclcpp/node.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
#include "rclcpp/publisher.hpp"
#include "rclcpp/service.hpp"
#include "rclcpp/subscription.hpp"
#include "rclcpp/subscription_traits.hpp"
#include "rclcpp/time.hpp"
#include "rclcpp/timer.hpp"
#include "rclcpp/visibility_control.hpp"
Expand Down Expand Up @@ -183,15 +184,17 @@ class Node : public std::enable_shared_from_this<Node>
typename MessageT,
typename CallbackT,
typename Alloc = std::allocator<void>,
typename SubscriptionT = rclcpp::Subscription<MessageT, Alloc>>
typename SubscriptionT = rclcpp::Subscription<
typename rclcpp::subscription_traits::has_message_type<CallbackT>::type, Alloc>>
std::shared_ptr<SubscriptionT>
create_subscription(
const std::string & topic_name,
CallbackT && callback,
const rmw_qos_profile_t & qos_profile = rmw_qos_profile_default,
rclcpp::callback_group::CallbackGroup::SharedPtr group = nullptr,
bool ignore_local_publications = false,
typename rclcpp::message_memory_strategy::MessageMemoryStrategy<MessageT, Alloc>::SharedPtr
typename rclcpp::message_memory_strategy::MessageMemoryStrategy<
typename rclcpp::subscription_traits::has_message_type<CallbackT>::type, Alloc>::SharedPtr
msg_mem_strat = nullptr,
std::shared_ptr<Alloc> allocator = nullptr);

Expand All @@ -214,15 +217,17 @@ class Node : public std::enable_shared_from_this<Node>
typename MessageT,
typename CallbackT,
typename Alloc = std::allocator<void>,
typename SubscriptionT = rclcpp::Subscription<MessageT, Alloc>>
typename SubscriptionT = rclcpp::Subscription<
typename rclcpp::subscription_traits::has_message_type<CallbackT>::type, Alloc>>
std::shared_ptr<SubscriptionT>
create_subscription(
const std::string & topic_name,
size_t qos_history_depth,
CallbackT && callback,
size_t qos_history_depth,
rclcpp::callback_group::CallbackGroup::SharedPtr group = nullptr,
bool ignore_local_publications = false,
typename rclcpp::message_memory_strategy::MessageMemoryStrategy<MessageT, Alloc>::SharedPtr
typename rclcpp::message_memory_strategy::MessageMemoryStrategy<
typename rclcpp::subscription_traits::has_message_type<CallbackT>::type, Alloc>::SharedPtr
msg_mem_strat = nullptr,
std::shared_ptr<Alloc> allocator = nullptr);

Expand Down
28 changes: 20 additions & 8 deletions rclcpp/include/rclcpp/node_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,28 +82,35 @@ Node::create_publisher(
allocator);
}

template<typename MessageT, typename CallbackT, typename Alloc, typename SubscriptionT>
template<
typename MessageT,
typename CallbackT,
typename Alloc,
typename SubscriptionT>
std::shared_ptr<SubscriptionT>
Node::create_subscription(
const std::string & topic_name,
CallbackT && callback,
const rmw_qos_profile_t & qos_profile,
rclcpp::callback_group::CallbackGroup::SharedPtr group,
bool ignore_local_publications,
typename rclcpp::message_memory_strategy::MessageMemoryStrategy<MessageT, Alloc>::SharedPtr
typename rclcpp::message_memory_strategy::MessageMemoryStrategy<
typename rclcpp::subscription_traits::has_message_type<CallbackT>::type, Alloc>::SharedPtr
msg_mem_strat,
std::shared_ptr<Alloc> allocator)
{
using CallbackMessageT = typename rclcpp::subscription_traits::has_message_type<CallbackT>::type;

if (!allocator) {
allocator = std::make_shared<Alloc>();
}

if (!msg_mem_strat) {
using rclcpp::message_memory_strategy::MessageMemoryStrategy;
msg_mem_strat = MessageMemoryStrategy<MessageT, Alloc>::create_default();
msg_mem_strat = MessageMemoryStrategy<CallbackMessageT, Alloc>::create_default();
}

return rclcpp::create_subscription<MessageT, CallbackT, Alloc, SubscriptionT>(
return rclcpp::create_subscription<MessageT, CallbackT, Alloc, CallbackMessageT, SubscriptionT>(
this->node_topics_.get(),
topic_name,
std::forward<CallbackT>(callback),
Expand All @@ -115,21 +122,26 @@ Node::create_subscription(
allocator);
}

template<typename MessageT, typename CallbackT, typename Alloc, typename SubscriptionT>
template<
typename MessageT,
typename CallbackT,
typename Alloc,
typename SubscriptionT>
std::shared_ptr<SubscriptionT>
Node::create_subscription(
const std::string & topic_name,
size_t qos_history_depth,
CallbackT && callback,
size_t qos_history_depth,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you speak a little about why you had to change the order of the arguments to this function?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i can't really explain why, but with the modifications done in subscription_traits, I get template deduction errors without this change. Somehow it can't really differentiate between the two function overloads. With the change done here, it works though.
I am happy to discuss/debug that further and try to resolve this. I just don't know how :)

rclcpp::callback_group::CallbackGroup::SharedPtr group,
bool ignore_local_publications,
typename rclcpp::message_memory_strategy::MessageMemoryStrategy<MessageT, Alloc>::SharedPtr
typename rclcpp::message_memory_strategy::MessageMemoryStrategy<
typename rclcpp::subscription_traits::has_message_type<CallbackT>::type, Alloc>::SharedPtr
msg_mem_strat,
std::shared_ptr<Alloc> allocator)
{
rmw_qos_profile_t qos = rmw_qos_profile_default;
qos.depth = qos_history_depth;
return this->create_subscription<MessageT, CallbackT, Alloc, SubscriptionT>(
return this->create_subscription<MessageT>(
topic_name,
std::forward<CallbackT>(callback),
qos,
Expand Down
3 changes: 2 additions & 1 deletion rclcpp/include/rclcpp/parameter_client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -120,8 +120,9 @@ class AsyncParametersClient
auto msg_mem_strat =
MessageMemoryStrategy<rcl_interfaces::msg::ParameterEvent, Alloc>::create_default();

using rcl_interfaces::msg::ParameterEvent;
return rclcpp::create_subscription<
rcl_interfaces::msg::ParameterEvent, CallbackT, Alloc, SubscriptionT>(
ParameterEvent, CallbackT, Alloc, ParameterEvent, SubscriptionT>(
this->node_topics_interface_.get(),
"parameter_events",
std::forward<CallbackT>(callback),
Expand Down
27 changes: 17 additions & 10 deletions rclcpp/include/rclcpp/publisher.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -205,11 +205,8 @@ class Publisher : public PublisherBase
ipm.publisher_id = intra_process_publisher_id_;
ipm.message_sequence = message_seq;
auto status = rcl_publish(&intra_process_publisher_handle_, &ipm);
if (status != RCL_RET_OK) {
// *INDENT-OFF* (prevent uncrustify from making unnecessary indents here)
throw std::runtime_error(
std::string("failed to publish intra process message: ") + rcl_get_error_string_safe());
// *INDENT-ON*
if (RCL_RET_OK != status) {
rclcpp::exceptions::throw_from_rcl_error(status, "failed to publish intra process message");
}
} else {
// Always destroy the message, even if we don't consume it, for consistency.
Expand Down Expand Up @@ -279,6 +276,19 @@ class Publisher : public PublisherBase
return this->publish(*msg);
}

void
publish(const rcl_serialized_message_t * serialized_msg)
{
if (store_intra_process_message_) {
// TODO(Karsten1987): support serialized message passed by intraprocess
throw std::runtime_error("storing serialized messages in intra process is not supported yet");
}
auto status = rcl_publish_serialized_message(&publisher_handle_, serialized_msg);
if (RCL_RET_OK != status) {
rclcpp::exceptions::throw_from_rcl_error(status, "failed to publish serialized message");
}
}

std::shared_ptr<MessageAlloc> get_allocator() const
{
return message_allocator_;
Expand All @@ -289,11 +299,8 @@ class Publisher : public PublisherBase
do_inter_process_publish(const MessageT * msg)
{
auto status = rcl_publish(&publisher_handle_, msg);
if (status != RCL_RET_OK) {
// *INDENT-OFF* (prevent uncrustify from making unnecessary indents here)
throw std::runtime_error(
std::string("failed to publish message: ") + rcl_get_error_string_safe());
// *INDENT-ON*
if (RCL_RET_OK != status) {
rclcpp::exceptions::throw_from_rcl_error(status, "failed to publish message");
}
}

Expand Down
Loading