diff --git a/rclcpp/include/rclcpp/callback_group.hpp b/rclcpp/include/rclcpp/callback_group.hpp index 434f1f80bb..8935f0884a 100644 --- a/rclcpp/include/rclcpp/callback_group.hpp +++ b/rclcpp/include/rclcpp/callback_group.hpp @@ -32,9 +32,16 @@ namespace rclcpp // Forward declarations for friend statement in class CallbackGroup namespace node { +// TODO(wjwwood): remove this and the friend decl when refactoring of Node is done. class Node; } // namespace node +// Forward declarations for friend statement in class CallbackGroup +namespace node_interfaces +{ +class NodeTopics; +} // namespace node_interfaces + namespace callback_group { @@ -47,6 +54,7 @@ enum class CallbackGroupType class CallbackGroup { friend class rclcpp::node::Node; + friend class rclcpp::node_interfaces::NodeTopics; public: RCLCPP_SMART_PTR_DEFINITIONS(CallbackGroup); @@ -78,7 +86,7 @@ class CallbackGroup const CallbackGroupType & type() const; -private: +protected: RCLCPP_DISABLE_COPY(CallbackGroup); RCLCPP_PUBLIC diff --git a/rclcpp/include/rclcpp/node.hpp b/rclcpp/include/rclcpp/node.hpp index 4cc9dc6e5d..10f7d9718f 100644 --- a/rclcpp/include/rclcpp/node.hpp +++ b/rclcpp/include/rclcpp/node.hpp @@ -40,6 +40,7 @@ #include "rclcpp/macros.hpp" #include "rclcpp/message_memory_strategy.hpp" #include "rclcpp/node_interfaces/node_base.hpp" +#include "rclcpp/node_interfaces/node_topics.hpp" #include "rclcpp/parameter.hpp" #include "rclcpp/publisher.hpp" #include "rclcpp/service.hpp" @@ -382,6 +383,10 @@ class Node : public std::enable_shared_from_this rclcpp::node_interfaces::NodeBaseInterface::SharedPtr get_node_base_interface(); + RCLCPP_PUBLIC + rclcpp::node_interfaces::NodeTopicsInterface::SharedPtr + get_node_topics_interface(); + private: RCLCPP_DISABLE_COPY(Node); @@ -390,6 +395,7 @@ class Node : public std::enable_shared_from_this group_in_node(callback_group::CallbackGroup::SharedPtr group); rclcpp::node_interfaces::NodeBase::SharedPtr node_base_; + rclcpp::node_interfaces::NodeTopics::SharedPtr node_topics_; size_t number_of_timers_; size_t number_of_services_; diff --git a/rclcpp/include/rclcpp/node_impl.hpp b/rclcpp/include/rclcpp/node_impl.hpp index 78056594dc..bc3faab950 100644 --- a/rclcpp/include/rclcpp/node_impl.hpp +++ b/rclcpp/include/rclcpp/node_impl.hpp @@ -38,6 +38,8 @@ #include "rclcpp/contexts/default_context.hpp" #include "rclcpp/intra_process_manager.hpp" #include "rclcpp/parameter.hpp" +#include "rclcpp/publisher_factory.hpp" +#include "rclcpp/subscription_factory.hpp" #include "rclcpp/type_support_decl.hpp" #include "rclcpp/visibility_control.hpp" @@ -77,60 +79,16 @@ Node::create_publisher( auto publisher_options = rcl_publisher_get_default_options(); publisher_options.qos = qos_profile; - auto message_alloc = - std::make_shared( - *allocator.get()); - publisher_options.allocator = allocator::get_rcl_allocator( - *message_alloc.get()); - - auto publisher = std::make_shared( - node_base_->get_shared_rcl_node_handle(), topic_name, publisher_options, message_alloc); - - if (use_intra_process_comms_) { - auto context = node_base_->get_context(); - auto intra_process_manager = - context->get_sub_context(); - uint64_t intra_process_publisher_id = - intra_process_manager->add_publisher(publisher); - rclcpp::intra_process_manager::IntraProcessManager::WeakPtr weak_ipm = intra_process_manager; - // *INDENT-OFF* - auto shared_publish_callback = - [weak_ipm](uint64_t publisher_id, void * msg, const std::type_info & type_info) -> uint64_t - { - auto ipm = weak_ipm.lock(); - if (!ipm) { - // TODO(wjwwood): should this just return silently? Or maybe return with a logged warning? - throw std::runtime_error( - "intra process publish called after destruction of intra process manager"); - } - if (!msg) { - throw std::runtime_error("cannot publisher msg which is a null pointer"); - } - auto & message_type_info = typeid(MessageT); - if (message_type_info != type_info) { - throw std::runtime_error( - std::string("published type '") + type_info.name() + - "' is incompatible from the publisher type '" + message_type_info.name() + "'"); - } - MessageT * typed_message_ptr = static_cast(msg); - using MessageDeleter = typename publisher::Publisher::MessageDeleter; - std::unique_ptr unique_msg(typed_message_ptr); - uint64_t message_seq = - ipm->store_intra_process_message(publisher_id, unique_msg); - return message_seq; - }; - // *INDENT-ON* - publisher->setup_intra_process( - intra_process_publisher_id, - shared_publish_callback, - publisher_options); - } - if (rcl_trigger_guard_condition(node_base_->get_notify_guard_condition()) != RCL_RET_OK) { - throw std::runtime_error( - std::string( - "Failed to notify waitset on publisher creation: ") + rmw_get_error_string()); - } - return publisher; + auto pub = node_topics_->create_publisher( + topic_name, + rclcpp::create_publisher_factory(allocator), + publisher_options, + use_intra_process_comms_); + node_topics_->add_publisher( + pub, + // this is the callback group, not currently used or exposed to the user for publishers + nullptr); + return std::dynamic_pointer_cast(pub); } template @@ -149,92 +107,25 @@ Node::create_subscription( allocator = std::make_shared(); } - rclcpp::subscription::AnySubscriptionCallback any_subscription_callback(allocator); - any_subscription_callback.set(std::forward(callback)); - if (!msg_mem_strat) { msg_mem_strat = rclcpp::message_memory_strategy::MessageMemoryStrategy::create_default(); } - auto message_alloc = - std::make_shared::MessageAlloc>(); auto subscription_options = rcl_subscription_get_default_options(); subscription_options.qos = qos_profile; - subscription_options.allocator = rclcpp::allocator::get_rcl_allocator( - *message_alloc.get()); subscription_options.ignore_local_publications = ignore_local_publications; - using rclcpp::subscription::Subscription; - using rclcpp::subscription::SubscriptionBase; + using SubscriptionT = rclcpp::subscription::Subscription; + auto factory = rclcpp::create_subscription_factory( + callback, msg_mem_strat, allocator); - auto sub = Subscription::make_shared( - node_base_->get_shared_rcl_node_handle(), + auto sub = node_topics_->create_subscription( topic_name, + factory, subscription_options, - any_subscription_callback, - msg_mem_strat); - auto sub_base_ptr = std::dynamic_pointer_cast(sub); - // Setup intra process. - if (use_intra_process_comms_) { - auto intra_process_options = rcl_subscription_get_default_options(); - intra_process_options.allocator = rclcpp::allocator::get_rcl_allocator( - *message_alloc.get()); - intra_process_options.qos = qos_profile; - intra_process_options.ignore_local_publications = false; - - auto context = node_base_->get_context(); - auto intra_process_manager = - context->get_sub_context(); - rclcpp::intra_process_manager::IntraProcessManager::WeakPtr weak_ipm = intra_process_manager; - uint64_t intra_process_subscription_id = - intra_process_manager->add_subscription(sub_base_ptr); - // *INDENT-OFF* - sub->setup_intra_process( - intra_process_subscription_id, - [weak_ipm]( - uint64_t publisher_id, - uint64_t message_sequence, - uint64_t subscription_id, - typename Subscription::MessageUniquePtr & message) - { - auto ipm = weak_ipm.lock(); - if (!ipm) { - // TODO(wjwwood): should this just return silently? Or maybe return with a logged warning? - throw std::runtime_error( - "intra process take called after destruction of intra process manager"); - } - ipm->take_intra_process_message( - publisher_id, message_sequence, subscription_id, message); - }, - [weak_ipm](const rmw_gid_t * sender_gid) -> bool { - auto ipm = weak_ipm.lock(); - if (!ipm) { - throw std::runtime_error( - "intra process publisher check called after destruction of intra process manager"); - } - return ipm->matches_any_publishers(sender_gid); - }, - intra_process_options - ); - // *INDENT-ON* - } - // Assign to a group. - if (group) { - if (!group_in_node(group)) { - // TODO(jacquelinekay): use custom exception - throw std::runtime_error("Cannot create subscription, group not in node."); - } - group->add_subscription(sub_base_ptr); - } else { - node_base_->get_default_callback_group()->add_subscription(sub_base_ptr); - } - if (rcl_trigger_guard_condition(node_base_->get_notify_guard_condition()) != RCL_RET_OK) { - throw std::runtime_error( - std::string( - "Failed to notify waitset on subscription creation: ") + rmw_get_error_string()); - } + use_intra_process_comms_); + node_topics_->add_subscription(sub, group); return sub; } diff --git a/rclcpp/include/rclcpp/node_interfaces/node_topics.hpp b/rclcpp/include/rclcpp/node_interfaces/node_topics.hpp new file mode 100644 index 0000000000..685aa4b1a1 --- /dev/null +++ b/rclcpp/include/rclcpp/node_interfaces/node_topics.hpp @@ -0,0 +1,91 @@ +// Copyright 2016 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCLCPP__NODE_INTERFACES__NODE_TOPICS_HPP_ +#define RCLCPP__NODE_INTERFACES__NODE_TOPICS_HPP_ + +#include + +#include "rcl/publisher.h" +#include "rcl/subscription.h" + +#include "rclcpp/macros.hpp" +#include "rclcpp/node_interfaces/node_base_interface.hpp" +#include "rclcpp/node_interfaces/node_topics_interface.hpp" +#include "rclcpp/publisher.hpp" +#include "rclcpp/publisher_factory.hpp" +#include "rclcpp/visibility_control.hpp" + +namespace rclcpp +{ +namespace node_interfaces +{ + +/// Implementation of the NodeTopicsInterface, which is part of the Node API. +class NodeTopics : public NodeTopicsInterface +{ +public: + RCLCPP_SMART_PTR_ALIASES_ONLY(NodeTopicsInterface) + + RCLCPP_PUBLIC + explicit + NodeTopics(rclcpp::node_interfaces::NodeBaseInterface * node_base); + + RCLCPP_PUBLIC + virtual + ~NodeTopics(); + + RCLCPP_PUBLIC + virtual + rclcpp::publisher::PublisherBase::SharedPtr + create_publisher( + const std::string & topic_name, + const rclcpp::PublisherFactory & publisher_factory, + const rcl_publisher_options_t & publisher_options, + bool use_intra_process); + + RCLCPP_PUBLIC + virtual + void + add_publisher( + rclcpp::publisher::PublisherBase::SharedPtr publisher, + rclcpp::callback_group::CallbackGroup::SharedPtr callback_group); + + RCLCPP_PUBLIC + virtual + rclcpp::subscription::SubscriptionBase::SharedPtr + create_subscription( + const std::string & topic_name, + const rclcpp::SubscriptionFactory & subscription_factory, + const rcl_subscription_options_t & subscription_options, + bool use_intra_process); + + RCLCPP_PUBLIC + virtual + void + add_subscription( + rclcpp::subscription::SubscriptionBase::SharedPtr subscription, + rclcpp::callback_group::CallbackGroup::SharedPtr callback_group); + +private: + RCLCPP_DISABLE_COPY(NodeTopics); + + NodeBaseInterface * node_base_; + +}; + +} // namespace node_interfaces +} // namespace rclcpp + +#endif // RCLCPP__NODE_INTERFACES__NODE_TOPICS_HPP_ diff --git a/rclcpp/include/rclcpp/node_interfaces/node_topics_interface.hpp b/rclcpp/include/rclcpp/node_interfaces/node_topics_interface.hpp new file mode 100644 index 0000000000..2abe88aa62 --- /dev/null +++ b/rclcpp/include/rclcpp/node_interfaces/node_topics_interface.hpp @@ -0,0 +1,80 @@ +// Copyright 2016 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCLCPP__NODE_INTERFACES__NODE_TOPICS_INTERFACE_HPP_ +#define RCLCPP__NODE_INTERFACES__NODE_TOPICS_INTERFACE_HPP_ + +#include +#include +#include + +#include "rcl/publisher.h" +#include "rcl/subscription.h" + +#include "rclcpp/macros.hpp" +#include "rclcpp/publisher.hpp" +#include "rclcpp/publisher_factory.hpp" +#include "rclcpp/subscription.hpp" +#include "rclcpp/subscription_factory.hpp" +#include "rclcpp/visibility_control.hpp" + +namespace rclcpp +{ +namespace node_interfaces +{ + +/// Pure virtual interface class for the NodeTopics part of the Node API. +class NodeTopicsInterface +{ +public: + RCLCPP_SMART_PTR_ALIASES_ONLY(NodeTopicsInterface) + + RCLCPP_PUBLIC + virtual + rclcpp::publisher::PublisherBase::SharedPtr + create_publisher( + const std::string & topic_name, + const rclcpp::PublisherFactory & publisher_factory, + const rcl_publisher_options_t & publisher_options, + bool use_intra_process) = 0; + + RCLCPP_PUBLIC + virtual + void + add_publisher( + rclcpp::publisher::PublisherBase::SharedPtr publisher, + rclcpp::callback_group::CallbackGroup::SharedPtr callback_group) = 0; + + RCLCPP_PUBLIC + virtual + rclcpp::subscription::SubscriptionBase::SharedPtr + create_subscription( + const std::string & topic_name, + const rclcpp::SubscriptionFactory & subscription_factory, + const rcl_subscription_options_t & subscription_options, + bool use_intra_process) = 0; + + RCLCPP_PUBLIC + virtual + void + add_subscription( + rclcpp::subscription::SubscriptionBase::SharedPtr subscription, + rclcpp::callback_group::CallbackGroup::SharedPtr callback_group) = 0; + +}; + +} // namespace node_interfaces +} // namespace rclcpp + +#endif // RCLCPP__NODE_INTERFACES__NODE_TOPICS_INTERFACE_HPP_ diff --git a/rclcpp/include/rclcpp/publisher.hpp b/rclcpp/include/rclcpp/publisher.hpp index 44bb4768a3..975d1aef20 100644 --- a/rclcpp/include/rclcpp/publisher.hpp +++ b/rclcpp/include/rclcpp/publisher.hpp @@ -28,28 +28,30 @@ #include "rcl/publisher.h" #include "rcl_interfaces/msg/intra_process_message.hpp" -#include "rmw/impl/cpp/demangle.hpp" #include "rclcpp/allocator/allocator_common.hpp" #include "rclcpp/allocator/allocator_deleter.hpp" #include "rclcpp/macros.hpp" +#include "rclcpp/node_interfaces/node_base_interface.hpp" #include "rclcpp/type_support_decl.hpp" #include "rclcpp/visibility_control.hpp" namespace rclcpp { -// Forward declaration for friend statement -namespace node +// Forward declaration is used for friend statement. +namespace node_interfaces { -class Node; -} // namespace node +class NodeTopicsInterface; +} namespace publisher { class PublisherBase { + friend rclcpp::node_interfaces::NodeTopicsInterface; + public: RCLCPP_SMART_PTR_DEFINITIONS(PublisherBase); @@ -57,15 +59,17 @@ class PublisherBase /** * Typically, a publisher is not created through this method, but instead is created through a * call to `Node::create_publisher`. - * \param[in] node_handle The corresponding rcl representation of the owner node. + * \param[in] node_base A pointer to the NodeBaseInterface for the parent node. * \param[in] topic The topic that this publisher publishes on. - * \param[in] queue_size The maximum number of unpublished messages to queue. + * \param[in] type_support The type support structure for the type to be published. + * \param[in] publisher_options QoS settings for this publisher. */ RCLCPP_PUBLIC PublisherBase( - std::shared_ptr node_handle, - std::string topic, - size_t queue_size); + rclcpp::node_interfaces::NodeBaseInterface * node_base, + const std::string & topic, + const rosidl_message_type_support_t & type_support, + const rcl_publisher_options_t & publisher_options); RCLCPP_PUBLIC virtual ~PublisherBase(); @@ -73,7 +77,7 @@ class PublisherBase /// Get the topic that this publisher publishes on. // \return The topic name. RCLCPP_PUBLIC - const std::string & + const char * get_topic_name() const; /// Get the queue size for this publisher. @@ -114,9 +118,9 @@ class PublisherBase bool operator==(const rmw_gid_t * gid) const; - typedef std::function StoreMessageCallbackT; + using StoreMessageCallbackT = std::function; -protected: + /// Implementation utility function used to setup intra process publishing after creation. RCLCPP_PUBLIC void setup_intra_process( @@ -124,14 +128,11 @@ class PublisherBase StoreMessageCallbackT callback, const rcl_publisher_options_t & intra_process_options); - std::shared_ptr node_handle_; +protected: + std::shared_ptr rcl_node_handle_; rcl_publisher_t publisher_handle_ = rcl_get_zero_initialized_publisher(); rcl_publisher_t intra_process_publisher_handle_ = rcl_get_zero_initialized_publisher(); - rcl_allocator_t rcl_allocator_ = rcl_get_default_allocator(); - - std::string topic_; - size_t queue_size_; uint64_t intra_process_publisher_id_; StoreMessageCallbackT store_intra_process_message_; @@ -144,8 +145,6 @@ class PublisherBase template> class Publisher : public PublisherBase { - friend rclcpp::node::Node; - public: using MessageAllocTraits = allocator::AllocRebind; using MessageAlloc = typename MessageAllocTraits::allocator_type; @@ -155,55 +154,22 @@ class Publisher : public PublisherBase RCLCPP_SMART_PTR_DEFINITIONS(Publisher); Publisher( - std::shared_ptr node_handle, - std::string topic, + rclcpp::node_interfaces::NodeBaseInterface * node_base, + const std::string & topic, const rcl_publisher_options_t & publisher_options, - std::shared_ptr allocator) - : PublisherBase(node_handle, topic, publisher_options.qos.depth), message_allocator_(allocator) + const std::shared_ptr & allocator) + : PublisherBase( + node_base, + topic, + *rosidl_generator_cpp::get_message_type_support_handle(), + publisher_options), + message_allocator_(allocator) { - using rosidl_generator_cpp::get_message_type_support_handle; allocator::set_allocator_for_deleter(&message_deleter_, message_allocator_.get()); - - rcl_allocator_ = publisher_options.allocator; - auto type_support_handle = get_message_type_support_handle(); - if (rcl_publisher_init( - &publisher_handle_, node_handle_.get(), type_support_handle, - topic.c_str(), &publisher_options) != RCL_RET_OK) - { - throw std::runtime_error( - std::string("could not create publisher: ") + - rcl_get_error_string_safe()); - } - // Life time of this object is tied to the publisher handle. - rmw_publisher_t * publisher_rmw_handle = rcl_publisher_get_rmw_handle(&publisher_handle_); - if (!publisher_rmw_handle) { - throw std::runtime_error( - std::string("failed to get rmw handle: ") + rcl_get_error_string_safe()); - } - if (rmw_get_gid_for_publisher(publisher_rmw_handle, &rmw_gid_) != RMW_RET_OK) { - // *INDENT-OFF* (prevent uncrustify from making unnecessary indents here) - throw std::runtime_error( - std::string("failed to get publisher gid: ") + rmw_get_error_string_safe()); - // *INDENT-ON* - } } virtual ~Publisher() - { - if (rcl_publisher_fini(&intra_process_publisher_handle_, node_handle_.get()) != RCL_RET_OK) { - fprintf( - stderr, - "Error in destruction of intra process rcl publisher handle: %s\n", - rcl_get_error_string_safe()); - } - - if (rcl_publisher_fini(&publisher_handle_, node_handle_.get()) != RCL_RET_OK) { - fprintf( - stderr, - "Error in destruction of rcl publisher handle: %s\n", - rcl_get_error_string_safe()); - } - } + {} /// Send a message to the topic for this publisher. /** @@ -319,6 +285,7 @@ class Publisher : public PublisherBase }; } // namespace publisher + } // namespace rclcpp #endif // RCLCPP__PUBLISHER_HPP_ diff --git a/rclcpp/include/rclcpp/publisher_factory.hpp b/rclcpp/include/rclcpp/publisher_factory.hpp new file mode 100644 index 0000000000..2fa851922a --- /dev/null +++ b/rclcpp/include/rclcpp/publisher_factory.hpp @@ -0,0 +1,146 @@ +// Copyright 2016 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCLCPP__PUBLISHER_FACTORY_HPP_ +#define RCLCPP__PUBLISHER_FACTORY_HPP_ + +#include +#include + +#include "rcl/publisher.h" + +#include "rosidl_generator_cpp/message_type_support.hpp" + +#include "rclcpp/publisher.hpp" +#include "rclcpp/intra_process_manager.hpp" +#include "rclcpp/node_interfaces/node_base_interface.hpp" +#include "rclcpp/visibility_control.hpp" + +namespace rclcpp +{ + +/// Factory with functions used to create a MessageT specific PublisherT. +/* This factory class is used to encapsulate the template generated functions + * which are used during the creation of a Message type specific publisher + * within a non-templated class. + * + * It is created using the create_publisher_factory function, which is usually + * called from a templated "create_publisher" method on the Node class, and + * is passed to the non-templated "create_publisher" method on the NodeTopics + * class where it is used to create and setup the Publisher. + */ +struct PublisherFactory +{ + // Creates a PublisherT publisher object and returns it as a PublisherBase. + using PublisherFactoryFunction = std::function< + rclcpp::publisher::PublisherBase::SharedPtr ( + rclcpp::node_interfaces::NodeBaseInterface * node_base, + const std::string & topic_name, + rcl_publisher_options_t & publisher_options)>; + + PublisherFactoryFunction create_typed_publisher; + + // Adds the PublisherBase to the intraprocess manager with the correctly + // templated call to IntraProcessManager::store_intra_process_message. + using AddPublisherToIntraProcessManagerFunction = std::function< + uint64_t ( + rclcpp::intra_process_manager::IntraProcessManager * ipm, + rclcpp::publisher::PublisherBase::SharedPtr publisher)>; + + AddPublisherToIntraProcessManagerFunction add_publisher_to_intra_process_manager; + + // Creates the callback function which is called on each + // PublisherT::publish() and which handles the intra process transmission of + // the message being published. + using SharedPublishCallbackFactoryFunction = std::function< + rclcpp::publisher::PublisherBase::StoreMessageCallbackT ( + rclcpp::intra_process_manager::IntraProcessManager::SharedPtr ipm)>; + + SharedPublishCallbackFactoryFunction create_shared_publish_callback; +}; + +/// Return a PublisherFactory with functions setup for creating a PublisherT. +template +PublisherFactory +create_publisher_factory(std::shared_ptr allocator) +{ + PublisherFactory factory; + + // factory function that creates a MessageT specific PublisherT + factory.create_typed_publisher = + [allocator]( + rclcpp::node_interfaces::NodeBaseInterface * node_base, + const std::string & topic_name, + rcl_publisher_options_t & publisher_options) -> std::shared_ptr + { + auto message_alloc = std::make_shared(*allocator.get()); + publisher_options.allocator = allocator::get_rcl_allocator(*message_alloc.get()); + + return std::make_shared(node_base, topic_name, publisher_options, message_alloc); + }; + + // function to add a publisher to the intra process manager + factory.add_publisher_to_intra_process_manager = + []( + rclcpp::intra_process_manager::IntraProcessManager * ipm, + rclcpp::publisher::PublisherBase::SharedPtr publisher) -> uint64_t + { + return ipm->add_publisher(std::dynamic_pointer_cast(publisher)); + }; + + // function to create a shared publish callback std::function + using StoreMessageCallbackT = rclcpp::publisher::PublisherBase::StoreMessageCallbackT; + factory.create_shared_publish_callback = + [](rclcpp::intra_process_manager::IntraProcessManager::SharedPtr ipm) -> StoreMessageCallbackT + { + rclcpp::intra_process_manager::IntraProcessManager::WeakPtr weak_ipm = ipm; + + // this function is called on each call to publish() and handles storing + // of the published message in the intra process manager + auto shared_publish_callback = + [weak_ipm](uint64_t publisher_id, void * msg, const std::type_info & type_info) -> uint64_t + { + auto ipm = weak_ipm.lock(); + if (!ipm) { + // TODO(wjwwood): should this just return silently? Or maybe return with a warning? + throw std::runtime_error( + "intra process publish called after destruction of intra process manager"); + } + if (!msg) { + throw std::runtime_error("cannot publisher msg which is a null pointer"); + } + auto & message_type_info = typeid(MessageT); + if (message_type_info != type_info) { + throw std::runtime_error( + std::string("published type '") + type_info.name() + + "' is incompatible from the publisher type '" + message_type_info.name() + "'"); + } + MessageT * typed_message_ptr = static_cast(msg); + using MessageDeleter = typename publisher::Publisher::MessageDeleter; + std::unique_ptr unique_msg(typed_message_ptr); + uint64_t message_seq = + ipm->store_intra_process_message(publisher_id, unique_msg); + return message_seq; + }; + + return shared_publish_callback; + }; + + // return the factory now that it is populated + return factory; +} + +} // namespace rclcpp + +#endif // RCLCPP__PUBLISHER_FACTORY_HPP_ diff --git a/rclcpp/include/rclcpp/subscription_factory.hpp b/rclcpp/include/rclcpp/subscription_factory.hpp new file mode 100644 index 0000000000..dfa1d723e6 --- /dev/null +++ b/rclcpp/include/rclcpp/subscription_factory.hpp @@ -0,0 +1,168 @@ +// Copyright 2016 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCLCPP__SUBSCRIPTION_FACTORY_HPP_ +#define RCLCPP__SUBSCRIPTION_FACTORY_HPP_ + +#include +#include + +#include "rcl/subscription.h" + +#include "rosidl_generator_cpp/message_type_support.hpp" + +#include "rclcpp/subscription.hpp" +#include "rclcpp/intra_process_manager.hpp" +#include "rclcpp/node_interfaces/node_base_interface.hpp" +#include "rclcpp/visibility_control.hpp" + +namespace rclcpp +{ + +/// Factory with functions used to create a Subscription. +/* This factory class is used to encapsulate the template generated functions + * which are used during the creation of a Message type specific subscription + * within a non-templated class. + * + * It is created using the create_subscription_factory function, which is + * usually called from a templated "create_subscription" method of the Node + * class, and is passed to the non-templated "create_subscription" method of + * the NodeTopics class where it is used to create and setup the Subscription. + */ +struct SubscriptionFactory +{ + // Creates a Subscription object and returns it as a SubscriptionBase. + using SubscriptionFactoryFunction = std::function< + rclcpp::subscription::SubscriptionBase::SharedPtr ( + rclcpp::node_interfaces::NodeBaseInterface * node_base, + const std::string & topic_name, + const rcl_subscription_options_t & subscription_options)>; + + SubscriptionFactoryFunction create_typed_subscription; + + // Function that takes a MessageT from the intra process manager + using SetupIntraProcessFunction = std::function; + + SetupIntraProcessFunction setup_intra_process; +}; + +/// Return a SubscriptionFactory with functions for creating a SubscriptionT. +template +SubscriptionFactory +create_subscription_factory( + CallbackT && callback, + typename rclcpp::message_memory_strategy::MessageMemoryStrategy::SharedPtr + msg_mem_strat, + std::shared_ptr allocator) +{ + SubscriptionFactory factory; + + using rclcpp::subscription::AnySubscriptionCallback; + AnySubscriptionCallback any_subscription_callback(allocator); + any_subscription_callback.set(std::forward(callback)); + + auto message_alloc = + std::make_shared::MessageAlloc>(); + + // factory function that creates a MessageT specific SubscriptionT + factory.create_typed_subscription = + [allocator, msg_mem_strat, any_subscription_callback, message_alloc]( + rclcpp::node_interfaces::NodeBaseInterface * node_base, + const std::string & topic_name, + const rcl_subscription_options_t & subscription_options + ) -> rclcpp::subscription::SubscriptionBase::SharedPtr + { + subscription_options.allocator = + rclcpp::allocator::get_rcl_allocator(*message_alloc.get()); + + using rclcpp::subscription::Subscription; + using rclcpp::subscription::SubscriptionBase; + + auto sub = Subscription::make_shared( + node_base->get_shared_rcl_node_handle(), + topic_name, + subscription_options, + any_subscription_callback, + msg_mem_strat); + auto sub_base_ptr = std::dynamic_pointer_cast(sub); + return sub_base_ptr; + }; + + // function that will setup intra process communications for the subscription + factory.setup_intra_process = + [message_alloc]( + rclcpp::intra_process_manager::IntraProcessManager::SharedPtr ipm, + rclcpp::subscription::SubscriptionBase::SharedPtr subscription, + const rcl_subscription_options_t & subscription_options) + { + rclcpp::intra_process_manager::IntraProcessManager::WeakPtr weak_ipm = ipm; + uint64_t intra_process_subscription_id = ipm->add_subscription(subscription); + + auto intra_process_options = rcl_subscription_get_default_options(); + intra_process_options.allocator = rclcpp::allocator::get_rcl_allocator( + *message_alloc.get()); + intra_process_options.qos = subscription_options.qos; + intra_process_options.ignore_local_publications = false; + + // function that will be called to take a MessageT from the intra process manager + auto take_intra_process_message_func = + [weak_ipm]( + uint64_t publisher_id, + uint64_t message_sequence, + uint64_t subscription_id, + typename rclcpp::subscription::Subscription::MessageUniquePtr & message) + { + auto ipm = weak_ipm.lock(); + if (!ipm) { + // TODO(wjwwood): should this just return silently? Or maybe return with a logged warning? + throw std::runtime_error( + "intra process take called after destruction of intra process manager"); + } + ipm->take_intra_process_message( + publisher_id, message_sequence, subscription_id, message); + }; + + // function that is called to see if the publisher id matches any local publishers + auto matches_any_publisher_func = + [weak_ipm](const rmw_gid_t * sender_gid) -> bool + { + auto ipm = weak_ipm.lock(); + if (!ipm) { + throw std::runtime_error( + "intra process publisher check called after destruction of intra process manager"); + } + return ipm->matches_any_publishers(sender_gid); + }; + + auto typed_sub_ptr = + std::dynamic_pointer_cast>(subscription); + typed_sub_ptr->setup_intra_process( + intra_process_subscription_id, + take_intra_process_message_func, + matches_any_publisher_func, + intra_process_options + ); + }; + // end definition of factory function to setup intra process + + // return the factory now that it is populated + return factory; +} + +} // namespace rclcpp + +#endif // RCLCPP__SUBSCRIPTION_FACTORY_HPP_ diff --git a/rclcpp/src/rclcpp/node.cpp b/rclcpp/src/rclcpp/node.cpp index 7a070cd852..bf1e3ab15c 100644 --- a/rclcpp/src/rclcpp/node.cpp +++ b/rclcpp/src/rclcpp/node.cpp @@ -41,7 +41,7 @@ Node::Node( rclcpp::context::Context::SharedPtr context, bool use_intra_process_comms) : node_base_(new rclcpp::node_interfaces::NodeBase(node_name, context)), - number_of_subscriptions_(0), number_of_timers_(0), number_of_services_(0), number_of_clients_(0), + number_of_timers_(0), number_of_services_(0), number_of_clients_(0), use_intra_process_comms_(use_intra_process_comms), graph_listener_(context->get_sub_context()), should_add_to_graph_listener_(true), graph_users_count_(0) diff --git a/rclcpp/src/rclcpp/node_interfaces/node_topics.cpp b/rclcpp/src/rclcpp/node_interfaces/node_topics.cpp new file mode 100644 index 0000000000..0d2aaa8842 --- /dev/null +++ b/rclcpp/src/rclcpp/node_interfaces/node_topics.cpp @@ -0,0 +1,125 @@ +// Copyright 2016 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "rclcpp/intra_process_manager.hpp" +#include "rclcpp/node_interfaces/node_topics.hpp" + +#include "rclcpp/exceptions.hpp" + +using rclcpp::exceptions::throw_from_rcl_error; + +using namespace rclcpp::node_interfaces; + +NodeTopics::NodeTopics(NodeBaseInterface * node_base) +: node_base_(node_base) +{} + +NodeTopics::~NodeTopics() +{} + +rclcpp::publisher::PublisherBase::SharedPtr +NodeTopics::create_publisher( + const std::string & topic_name, + const rclcpp::PublisherFactory & publisher_factory, + const rcl_publisher_options_t & publisher_options, + bool use_intra_process) +{ + // Create the MessageT specific Publisher using the factory, but store it as PublisherBase. + auto publisher = publisher_factory.create_typed_publisher( + node_base_, topic_name, publisher_options); + + // Setup intra process publishing if requested. + if (use_intra_process) { + auto context = node_base_->get_context(); + // Get the intra process manager instance for this context. + auto ipm = context->get_sub_context(); + // Register the publisher with the intra process manager. + uint64_t intra_process_publisher_id = + publisher_factory.add_publisher_to_intra_process_manager(ipm.get(), publisher); + // Create a function to be called when publisher to do the intra process publish. + auto shared_publish_callback = publisher_factory.create_shared_publish_callback(ipm); + publisher->setup_intra_process( + intra_process_publisher_id, + shared_publish_callback, + publisher_options); + } + + // Return the completed publisher. + return publisher; +} + +void +NodeTopics::add_publisher( + rclcpp::publisher::PublisherBase::SharedPtr publisher, + rclcpp::callback_group::CallbackGroup::SharedPtr callback_group) +{ + // Notify the executor that a new publisher was created using the parent Node. + { + auto notify_guard_condition_lock = node_base_->acquire_notify_guard_condition_lock(); + if (rcl_trigger_guard_condition(node_base_->get_notify_guard_condition()) != RCL_RET_OK) { + throw std::runtime_error( + std::string("Failed to notify waitset on publisher creation: ") + rmw_get_error_string()); + } + } +} + +rclcpp::subscription::SubscriptionBase::SharedPtr +NodeTopics::create_subscription( + const std::string & topic_name, + const rclcpp::SubscriptionFactory & subscription_factory, + const rcl_subscription_options_t & subscription_options, + bool use_intra_process) +{ + auto subscription = subscription_factory.create_typed_subscription( + node_base_, topic_name, subscription_options); + + // Setup intra process publishing if requested. + if (use_intra_process) { + auto context = node_base_->get_context(); + auto intra_process_manager = + context->get_sub_context(); + subscription_factory.setup_intra_process( + intra_process_manager, subscription, subscription_options); + } + + // Return the completed subscription. + return subscription; +} + +void +NodeTopics::add_subscription( + rclcpp::subscription::SubscriptionBase::SharedPtr subscription, + rclcpp::callback_group::CallbackGroup::SharedPtr callback_group) +{ + // Assign to a group. + if (callback_group) { + if (!node_base_->callback_group_in_node(callback_group)) { + // TODO(jacquelinekay): use custom exception + throw std::runtime_error("Cannot create subscription, callback group not in node."); + } + callback_group->add_subscription(subscription); + } else { + node_base_->get_default_callback_group()->add_subscription(subscription); + } + + // Notify the executor that a new subscription was created using the parent Node. + { + auto notify_guard_condition_lock = node_base_->acquire_notify_guard_condition_lock(); + if (rcl_trigger_guard_condition(node_base_->get_notify_guard_condition()) != RCL_RET_OK) { + throw std::runtime_error( + std::string("Failed to notify waitset on subscription creation: ") + rmw_get_error_string() + ); + } + } +} diff --git a/rclcpp/src/rclcpp/publisher.cpp b/rclcpp/src/rclcpp/publisher.cpp index 53e948cbb4..6d7fbf1d01 100644 --- a/rclcpp/src/rclcpp/publisher.cpp +++ b/rclcpp/src/rclcpp/publisher.cpp @@ -29,33 +29,72 @@ #include "rclcpp/allocator/allocator_common.hpp" #include "rclcpp/allocator/allocator_deleter.hpp" #include "rclcpp/macros.hpp" +#include "rclcpp/node.hpp" using rclcpp::publisher::PublisherBase; PublisherBase::PublisherBase( - std::shared_ptr node_handle, - std::string topic, - size_t queue_size) -: node_handle_(node_handle), - topic_(topic), queue_size_(queue_size), + rclcpp::node_interfaces::NodeBaseInterface * node_base, + const std::string & topic, + const rosidl_message_type_support_t & type_support, + const rcl_publisher_options_t & publisher_options) +: rcl_node_handle_(node_base->get_shared_rcl_node_handle()), intra_process_publisher_id_(0), store_intra_process_message_(nullptr) { + if (rcl_publisher_init( + &publisher_handle_, rcl_node_handle_.get(), &type_support, + topic.c_str(), &publisher_options) != RCL_RET_OK) + { + throw std::runtime_error( + std::string("could not create publisher: ") + + rcl_get_error_string_safe()); + } + // Life time of this object is tied to the publisher handle. + rmw_publisher_t * publisher_rmw_handle = rcl_publisher_get_rmw_handle(&publisher_handle_); + if (!publisher_rmw_handle) { + throw std::runtime_error( + std::string("failed to get rmw handle: ") + rcl_get_error_string_safe()); + } + if (rmw_get_gid_for_publisher(publisher_rmw_handle, &rmw_gid_) != RMW_RET_OK) { + // *INDENT-OFF* (prevent uncrustify from making unnecessary indents here) + throw std::runtime_error( + std::string("failed to get publisher gid: ") + rmw_get_error_string_safe()); + // *INDENT-ON* + } } PublisherBase::~PublisherBase() { + if (rcl_publisher_fini(&intra_process_publisher_handle_, rcl_node_handle_.get()) != RCL_RET_OK) { + fprintf( + stderr, + "Error in destruction of intra process rcl publisher handle: %s\n", + rcl_get_error_string_safe()); + } + + if (rcl_publisher_fini(&publisher_handle_, rcl_node_handle_.get()) != RCL_RET_OK) { + fprintf( + stderr, + "Error in destruction of rcl publisher handle: %s\n", + rcl_get_error_string_safe()); + } } -const std::string & +const char * PublisherBase::get_topic_name() const { - return topic_; + return rcl_publisher_get_topic_name(&publisher_handle_); } size_t PublisherBase::get_queue_size() const { - return queue_size_; + const rcl_publisher_options_t * publisher_options = rcl_publisher_get_options(&publisher_handle_); + if (!publisher_options) { + throw std::runtime_error( + std::string("failed to get publisher options: ") + rcl_get_error_string_safe()); + } + return publisher_options->qos.depth; } const rmw_gid_t & @@ -101,10 +140,11 @@ PublisherBase::setup_intra_process( StoreMessageCallbackT callback, const rcl_publisher_options_t & intra_process_options) { + auto intra_process_topic_name = std::string(this->get_topic_name()) + "__intra"; if (rcl_publisher_init( - &intra_process_publisher_handle_, node_handle_.get(), + &intra_process_publisher_handle_, rcl_node_handle_.get(), rclcpp::type_support::get_intra_process_message_msg_type_support(), - (topic_ + "__intra").c_str(), &intra_process_options) != RCL_RET_OK) + intra_process_topic_name.c_str(), &intra_process_options) != RCL_RET_OK) { throw std::runtime_error( std::string("could not create intra process publisher: ") +