Skip to content

Commit

Permalink
create NodeTopics interface, refactor pub/sub
Browse files Browse the repository at this point in the history
  • Loading branch information
wjwwood committed Dec 1, 2016
1 parent 3f12ab5 commit 42bc080
Show file tree
Hide file tree
Showing 11 changed files with 725 additions and 203 deletions.
10 changes: 9 additions & 1 deletion rclcpp/include/rclcpp/callback_group.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,16 @@ namespace rclcpp
// Forward declarations for friend statement in class CallbackGroup
namespace node
{
// TODO(wjwwood): remove this and the friend decl when refactoring of Node is done.
class Node;
} // namespace node

// Forward declarations for friend statement in class CallbackGroup
namespace node_interfaces
{
class NodeTopics;
} // namespace node_interfaces

namespace callback_group
{

Expand All @@ -47,6 +54,7 @@ enum class CallbackGroupType
class CallbackGroup
{
friend class rclcpp::node::Node;
friend class rclcpp::node_interfaces::NodeTopics;

public:
RCLCPP_SMART_PTR_DEFINITIONS(CallbackGroup);
Expand Down Expand Up @@ -78,7 +86,7 @@ class CallbackGroup
const CallbackGroupType &
type() const;

private:
protected:
RCLCPP_DISABLE_COPY(CallbackGroup);

RCLCPP_PUBLIC
Expand Down
6 changes: 6 additions & 0 deletions rclcpp/include/rclcpp/node.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
#include "rclcpp/macros.hpp"
#include "rclcpp/message_memory_strategy.hpp"
#include "rclcpp/node_interfaces/node_base.hpp"
#include "rclcpp/node_interfaces/node_topics.hpp"
#include "rclcpp/parameter.hpp"
#include "rclcpp/publisher.hpp"
#include "rclcpp/service.hpp"
Expand Down Expand Up @@ -382,6 +383,10 @@ class Node : public std::enable_shared_from_this<Node>
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr
get_node_base_interface();

RCLCPP_PUBLIC
rclcpp::node_interfaces::NodeTopicsInterface::SharedPtr
get_node_topics_interface();

private:
RCLCPP_DISABLE_COPY(Node);

Expand All @@ -390,6 +395,7 @@ class Node : public std::enable_shared_from_this<Node>
group_in_node(callback_group::CallbackGroup::SharedPtr group);

rclcpp::node_interfaces::NodeBase::SharedPtr node_base_;
rclcpp::node_interfaces::NodeTopics::SharedPtr node_topics_;

size_t number_of_timers_;
size_t number_of_services_;
Expand Down
147 changes: 19 additions & 128 deletions rclcpp/include/rclcpp/node_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
#include "rclcpp/contexts/default_context.hpp"
#include "rclcpp/intra_process_manager.hpp"
#include "rclcpp/parameter.hpp"
#include "rclcpp/publisher_factory.hpp"
#include "rclcpp/subscription_factory.hpp"
#include "rclcpp/type_support_decl.hpp"
#include "rclcpp/visibility_control.hpp"

Expand Down Expand Up @@ -77,60 +79,16 @@ Node::create_publisher(
auto publisher_options = rcl_publisher_get_default_options();
publisher_options.qos = qos_profile;

auto message_alloc =
std::make_shared<typename PublisherT::MessageAlloc>(
*allocator.get());
publisher_options.allocator = allocator::get_rcl_allocator<MessageT>(
*message_alloc.get());

auto publisher = std::make_shared<PublisherT>(
node_base_->get_shared_rcl_node_handle(), topic_name, publisher_options, message_alloc);

if (use_intra_process_comms_) {
auto context = node_base_->get_context();
auto intra_process_manager =
context->get_sub_context<rclcpp::intra_process_manager::IntraProcessManager>();
uint64_t intra_process_publisher_id =
intra_process_manager->add_publisher<MessageT, Alloc>(publisher);
rclcpp::intra_process_manager::IntraProcessManager::WeakPtr weak_ipm = intra_process_manager;
// *INDENT-OFF*
auto shared_publish_callback =
[weak_ipm](uint64_t publisher_id, void * msg, const std::type_info & type_info) -> uint64_t
{
auto ipm = weak_ipm.lock();
if (!ipm) {
// TODO(wjwwood): should this just return silently? Or maybe return with a logged warning?
throw std::runtime_error(
"intra process publish called after destruction of intra process manager");
}
if (!msg) {
throw std::runtime_error("cannot publisher msg which is a null pointer");
}
auto & message_type_info = typeid(MessageT);
if (message_type_info != type_info) {
throw std::runtime_error(
std::string("published type '") + type_info.name() +
"' is incompatible from the publisher type '" + message_type_info.name() + "'");
}
MessageT * typed_message_ptr = static_cast<MessageT *>(msg);
using MessageDeleter = typename publisher::Publisher<MessageT, Alloc>::MessageDeleter;
std::unique_ptr<MessageT, MessageDeleter> unique_msg(typed_message_ptr);
uint64_t message_seq =
ipm->store_intra_process_message<MessageT, Alloc>(publisher_id, unique_msg);
return message_seq;
};
// *INDENT-ON*
publisher->setup_intra_process(
intra_process_publisher_id,
shared_publish_callback,
publisher_options);
}
if (rcl_trigger_guard_condition(node_base_->get_notify_guard_condition()) != RCL_RET_OK) {
throw std::runtime_error(
std::string(
"Failed to notify waitset on publisher creation: ") + rmw_get_error_string());
}
return publisher;
auto pub = node_topics_->create_publisher(
topic_name,
rclcpp::create_publisher_factory<MessageT, Alloc, PublisherT>(allocator),
publisher_options,
use_intra_process_comms_);
node_topics_->add_publisher(
pub,
// this is the callback group, not currently used or exposed to the user for publishers
nullptr);
return std::dynamic_pointer_cast<PublisherT>(pub);
}

template<typename MessageT, typename CallbackT, typename Alloc>
Expand All @@ -149,92 +107,25 @@ Node::create_subscription(
allocator = std::make_shared<Alloc>();
}

rclcpp::subscription::AnySubscriptionCallback<MessageT,
Alloc> any_subscription_callback(allocator);
any_subscription_callback.set(std::forward<CallbackT>(callback));

if (!msg_mem_strat) {
msg_mem_strat =
rclcpp::message_memory_strategy::MessageMemoryStrategy<MessageT, Alloc>::create_default();
}
auto message_alloc =
std::make_shared<typename subscription::Subscription<MessageT, Alloc>::MessageAlloc>();

auto subscription_options = rcl_subscription_get_default_options();
subscription_options.qos = qos_profile;
subscription_options.allocator = rclcpp::allocator::get_rcl_allocator<MessageT>(
*message_alloc.get());
subscription_options.ignore_local_publications = ignore_local_publications;

using rclcpp::subscription::Subscription;
using rclcpp::subscription::SubscriptionBase;
using SubscriptionT = rclcpp::subscription::Subscription<MessageT>;
auto factory = rclcpp::create_subscription_factory<MessageT, CallbackT, Alloc, SubscriptionT>(
callback, msg_mem_strat, allocator);

auto sub = Subscription<MessageT, Alloc>::make_shared(
node_base_->get_shared_rcl_node_handle(),
auto sub = node_topics_->create_subscription(
topic_name,
factory,
subscription_options,
any_subscription_callback,
msg_mem_strat);
auto sub_base_ptr = std::dynamic_pointer_cast<SubscriptionBase>(sub);
// Setup intra process.
if (use_intra_process_comms_) {
auto intra_process_options = rcl_subscription_get_default_options();
intra_process_options.allocator = rclcpp::allocator::get_rcl_allocator<MessageT>(
*message_alloc.get());
intra_process_options.qos = qos_profile;
intra_process_options.ignore_local_publications = false;

auto context = node_base_->get_context();
auto intra_process_manager =
context->get_sub_context<rclcpp::intra_process_manager::IntraProcessManager>();
rclcpp::intra_process_manager::IntraProcessManager::WeakPtr weak_ipm = intra_process_manager;
uint64_t intra_process_subscription_id =
intra_process_manager->add_subscription(sub_base_ptr);
// *INDENT-OFF*
sub->setup_intra_process(
intra_process_subscription_id,
[weak_ipm](
uint64_t publisher_id,
uint64_t message_sequence,
uint64_t subscription_id,
typename Subscription<MessageT, Alloc>::MessageUniquePtr & message)
{
auto ipm = weak_ipm.lock();
if (!ipm) {
// TODO(wjwwood): should this just return silently? Or maybe return with a logged warning?
throw std::runtime_error(
"intra process take called after destruction of intra process manager");
}
ipm->take_intra_process_message<MessageT, Alloc>(
publisher_id, message_sequence, subscription_id, message);
},
[weak_ipm](const rmw_gid_t * sender_gid) -> bool {
auto ipm = weak_ipm.lock();
if (!ipm) {
throw std::runtime_error(
"intra process publisher check called after destruction of intra process manager");
}
return ipm->matches_any_publishers(sender_gid);
},
intra_process_options
);
// *INDENT-ON*
}
// Assign to a group.
if (group) {
if (!group_in_node(group)) {
// TODO(jacquelinekay): use custom exception
throw std::runtime_error("Cannot create subscription, group not in node.");
}
group->add_subscription(sub_base_ptr);
} else {
node_base_->get_default_callback_group()->add_subscription(sub_base_ptr);
}
if (rcl_trigger_guard_condition(node_base_->get_notify_guard_condition()) != RCL_RET_OK) {
throw std::runtime_error(
std::string(
"Failed to notify waitset on subscription creation: ") + rmw_get_error_string());
}
use_intra_process_comms_);
node_topics_->add_subscription(sub, group);
return sub;
}

Expand Down
91 changes: 91 additions & 0 deletions rclcpp/include/rclcpp/node_interfaces/node_topics.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
// Copyright 2016 Open Source Robotics Foundation, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef RCLCPP__NODE_INTERFACES__NODE_TOPICS_HPP_
#define RCLCPP__NODE_INTERFACES__NODE_TOPICS_HPP_

#include <string>

#include "rcl/publisher.h"
#include "rcl/subscription.h"

#include "rclcpp/macros.hpp"
#include "rclcpp/node_interfaces/node_base_interface.hpp"
#include "rclcpp/node_interfaces/node_topics_interface.hpp"
#include "rclcpp/publisher.hpp"
#include "rclcpp/publisher_factory.hpp"
#include "rclcpp/visibility_control.hpp"

namespace rclcpp
{
namespace node_interfaces
{

/// Implementation of the NodeTopicsInterface, which is part of the Node API.
class NodeTopics : public NodeTopicsInterface
{
public:
RCLCPP_SMART_PTR_ALIASES_ONLY(NodeTopicsInterface)

RCLCPP_PUBLIC
explicit
NodeTopics(rclcpp::node_interfaces::NodeBaseInterface * node_base);

RCLCPP_PUBLIC
virtual
~NodeTopics();

RCLCPP_PUBLIC
virtual
rclcpp::publisher::PublisherBase::SharedPtr
create_publisher(
const std::string & topic_name,
const rclcpp::PublisherFactory & publisher_factory,
const rcl_publisher_options_t & publisher_options,
bool use_intra_process);

RCLCPP_PUBLIC
virtual
void
add_publisher(
rclcpp::publisher::PublisherBase::SharedPtr publisher,
rclcpp::callback_group::CallbackGroup::SharedPtr callback_group);

RCLCPP_PUBLIC
virtual
rclcpp::subscription::SubscriptionBase::SharedPtr
create_subscription(
const std::string & topic_name,
const rclcpp::SubscriptionFactory & subscription_factory,
const rcl_subscription_options_t & subscription_options,
bool use_intra_process);

RCLCPP_PUBLIC
virtual
void
add_subscription(
rclcpp::subscription::SubscriptionBase::SharedPtr subscription,
rclcpp::callback_group::CallbackGroup::SharedPtr callback_group);

private:
RCLCPP_DISABLE_COPY(NodeTopics);

NodeBaseInterface * node_base_;

};

} // namespace node_interfaces
} // namespace rclcpp

#endif // RCLCPP__NODE_INTERFACES__NODE_TOPICS_HPP_
Loading

0 comments on commit 42bc080

Please sign in to comment.