From 81040198e18e79e3a94e85d9c62a6422a0695d91 Mon Sep 17 00:00:00 2001 From: Esteve Fernandez Date: Thu, 11 Dec 2014 17:07:36 -0800 Subject: [PATCH 1/9] Added support for services --- rclcpp/include/rclcpp/callback_group.hpp | 8 ++ rclcpp/include/rclcpp/client.hpp | 65 +++++++++ rclcpp/include/rclcpp/executor.hpp | 170 +++++++++++++++++++++++ rclcpp/include/rclcpp/node.hpp | 17 +++ rclcpp/include/rclcpp/node_impl.hpp | 64 ++++++++- rclcpp/include/rclcpp/service.hpp | 113 +++++++++++++++ 6 files changed, 436 insertions(+), 1 deletion(-) create mode 100644 rclcpp/include/rclcpp/client.hpp create mode 100644 rclcpp/include/rclcpp/service.hpp diff --git a/rclcpp/include/rclcpp/callback_group.hpp b/rclcpp/include/rclcpp/callback_group.hpp index bfe44319c6..82023e0994 100644 --- a/rclcpp/include/rclcpp/callback_group.hpp +++ b/rclcpp/include/rclcpp/callback_group.hpp @@ -22,6 +22,7 @@ #include #include +#include namespace rclcpp { @@ -62,9 +63,16 @@ class CallbackGroup timer_ptrs_.push_back(timer_ptr); } + void + add_service(const service::ServiceBase::SharedPtr &service_ptr) + { + service_ptrs_.push_back(service_ptr); + } + CallbackGroupType type_; std::vector subscription_ptrs_; std::vector timer_ptrs_; + std::vector service_ptrs_; std::atomic_bool can_be_taken_from_; }; diff --git a/rclcpp/include/rclcpp/client.hpp b/rclcpp/include/rclcpp/client.hpp new file mode 100644 index 0000000000..6bda23022c --- /dev/null +++ b/rclcpp/include/rclcpp/client.hpp @@ -0,0 +1,65 @@ +/* Copyright 2014 Open Source Robotics Foundation, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef RCLCPP_RCLCPP_CLIENT_HPP_ +#define RCLCPP_RCLCPP_CLIENT_HPP_ + +#include + +#include +#include + +#include + +namespace rclcpp +{ + +// Forward declaration for friend statement +namespace node {class Node;} + +namespace client +{ + +template +class Client +{ +public: + RCLCPP_MAKE_SHARED_DEFINITIONS(Client); + + Client(ros_middleware_interface::ClientHandle client_handle, + std::string& service_name) + : client_handle_(client_handle), service_name_(service_name) + {} + + std::shared_ptr + send_request(std::shared_ptr &req) + { + ::ros_middleware_interface::send_request(client_handle_, req.get()); + + std::shared_ptr res = std::make_shared(); + ::ros_middleware_interface::receive_response(client_handle_, res.get()); + return res; + } + +private: + ros_middleware_interface::ClientHandle client_handle_; + std::string service_name_; + +}; + +} /* namespace client */ +} /* namespace rclcpp */ + +#endif /* RCLCPP_RCLCPP_CLIENT_HPP_ */ diff --git a/rclcpp/include/rclcpp/executor.hpp b/rclcpp/include/rclcpp/executor.hpp index 2d14094b14..8de14cf713 100644 --- a/rclcpp/include/rclcpp/executor.hpp +++ b/rclcpp/include/rclcpp/executor.hpp @@ -106,6 +106,7 @@ class Executor // Either the subscription or the timer will be set, but not both rclcpp::subscription::SubscriptionBase::SharedPtr subscription; rclcpp::timer::TimerBase::SharedPtr timer; + rclcpp::service::ServiceBase::SharedPtr service; // These are used to keep the scope on the containing items rclcpp::callback_group::CallbackGroup::SharedPtr callback_group; rclcpp::node::Node::SharedPtr node; @@ -126,6 +127,10 @@ class Executor { execute_subscription(any_exec->subscription); } + if (any_exec->service) + { + execute_service(any_exec->service); + } // Reset the callback_group, regardless of type any_exec->callback_group->can_be_taken_from_.store(true); // Wake the wait, because it may need to be recalculated or work that @@ -161,6 +166,26 @@ class Executor timer->callback_(); } + static void + execute_service( + rclcpp::service::ServiceBase::SharedPtr &service) + { + std::shared_ptr request = service->create_request(); + bool taken = ros_middleware_interface::take_request( + service->service_handle_, + request.get()); + if (taken) + { + service->handle_request(request); + } + else + { + std::cout << "[rclcpp::error] take failed for service on service: " + << service->get_service_name() + << std::endl; + } + } + /*** Populate class storage from stored weak node pointers and wait. ***/ void @@ -170,6 +195,7 @@ class Executor bool has_invalid_weak_nodes = false; std::vector subs; std::vector timers; + std::vector services; for (auto &weak_node : weak_nodes_) { auto node = weak_node.lock(); @@ -193,6 +219,10 @@ class Executor { timers.push_back(timer); } + for (auto &service : group->service_ptrs_) + { + services.push_back(service); + } } } // Clean up any invalid nodes, if they were detected @@ -224,6 +254,28 @@ class Executor subscription->subscription_handle_.data_; subscriber_handle_index += 1; } + + // Use the number of services to allocate memory in the handles + size_t number_of_services = services.size(); + ros_middleware_interface::ServiceHandles service_handles; + service_handles.service_count_ = number_of_services; + // TODO: Avoid redundant malloc's + service_handles.services_ = static_cast( + std::malloc(sizeof(void *) * number_of_services)); + if (service_handles.services_ == NULL) + { + // TODO: Use a different error here? maybe std::bad_alloc? + throw std::runtime_error("Could not malloc for service pointers."); + } + // Then fill the ServiceHandles with ready services + size_t service_handle_index = 0; + for (auto &service : services) + { + service_handles.services_[service_handle_index] = \ + service->service_handle_.data_; + service_handle_index += 1; + } + // Use the number of guard conditions to allocate memory in the handles // Add 2 to the number for the ctrl-c guard cond and the executor's size_t start_of_timer_guard_conds = 2; @@ -254,9 +306,11 @@ class Executor timer->guard_condition_.data_; guard_cond_handle_index += 1; } + // Now wait on the waitable subscriptions and timers ros_middleware_interface::wait(subscriber_handles, guard_condition_handles, + service_handles, nonblocking); // If ctrl-c guard condition, return directly if (guard_condition_handles.guard_conditions_[0] != 0) @@ -264,6 +318,7 @@ class Executor // Make sure to free memory // TODO: Remove theses when "Avoid redundant malloc's" todo is addressed std::free(subscriber_handles.subscribers_); + std::free(service_handles.services_); std::free(guard_condition_handles.guard_conditions_); return; } @@ -286,9 +341,20 @@ class Executor guard_condition_handles_.push_back(handle); } } + // Then the services + for (size_t i = 0; i < number_of_services; ++i) + { + void *handle = service_handles.services_[i]; + if (handle) + { + service_handles_.push_back(handle); + } + } + // Make sure to free memory // TODO: Remove theses when "Avoid redundant malloc's" todo is addressed std::free(subscriber_handles.subscribers_); + std::free(service_handles.services_); std::free(guard_condition_handles.guard_conditions_); } @@ -352,6 +418,35 @@ class Executor return rclcpp::timer::TimerBase::SharedPtr(); } + rclcpp::service::ServiceBase::SharedPtr + get_service_by_handle(void * service_handle) + { + for (auto weak_node : weak_nodes_) + { + auto node = weak_node.lock(); + if (!node) + { + continue; + } + for (auto weak_group : node->callback_groups_) + { + auto group = weak_group.lock(); + if (!group) + { + continue; + } + for (auto service : group->service_ptrs_) + { + if (service->service_handle_.data_ == service_handle) + { + return service; + } + } + } + } + return rclcpp::service::ServiceBase::SharedPtr(); + } + void remove_subscriber_handle_from_subscriber_handles(void *handle) { @@ -364,6 +459,12 @@ class Executor guard_condition_handles_.remove(handle); } + void + remove_service_handle_from_service_handles(void *handle) + { + service_handles_.remove(handle); + } + rclcpp::node::Node::SharedPtr get_node_by_group(rclcpp::callback_group::CallbackGroup::SharedPtr &group) { @@ -516,6 +617,67 @@ class Executor } } + rclcpp::callback_group::CallbackGroup::SharedPtr + get_group_by_service( + rclcpp::service::ServiceBase::SharedPtr &service) + { + for (auto &weak_node : weak_nodes_) + { + auto node = weak_node.lock(); + if (!node) + { + continue; + } + for (auto &weak_group : node->callback_groups_) + { + auto group = weak_group.lock(); + for (auto &serv : group->service_ptrs_) + { + if (serv == service) + { + return group; + } + } + } + } + return rclcpp::callback_group::CallbackGroup::SharedPtr(); + } + + void + get_next_service(std::shared_ptr &any_exec) + { + for (auto handle : service_handles_) + { + auto service = get_service_by_handle(handle); + if (service) + { + // Find the group for this handle and see if it can be serviced + auto group = get_group_by_service(service); + if (!group) + { + // Group was not found, meaning the service is not valid... + // Remove it from the ready list and continue looking + remove_service_handle_from_service_handles(handle); + continue; + } + if (!group->can_be_taken_from_.load()) + { + // Group is mutually exclusive and is being used, so skip it for now + // Leave it to be checked next time, but continue searching + continue; + } + // Otherwise it is safe to set and return the any_exec + any_exec->service = service; + any_exec->callback_group = group; + any_exec->node = get_node_by_group(group); + remove_service_handle_from_service_handles(handle); + return; + } + // Else, the service is no longer valid, remove it and continue + remove_service_handle_from_service_handles(handle); + } + } + std::shared_ptr get_next_ready_executable() { @@ -532,6 +694,12 @@ class Executor { return any_exec; } + // Check the services to see if there are any that are ready + get_next_service(any_exec); + if (any_exec->service) + { + return any_exec; + } // If there is neither a ready timer nor subscription, return a null ptr any_exec.reset(); return any_exec; @@ -580,6 +748,8 @@ class Executor SubscriberHandles subscriber_handles_; typedef std::list GuardConditionHandles; GuardConditionHandles guard_condition_handles_; + typedef std::list ServiceHandles; + ServiceHandles service_handles_; }; diff --git a/rclcpp/include/rclcpp/node.hpp b/rclcpp/include/rclcpp/node.hpp index 7b696318dd..dcc91bd60a 100644 --- a/rclcpp/include/rclcpp/node.hpp +++ b/rclcpp/include/rclcpp/node.hpp @@ -21,9 +21,11 @@ #include #include +#include #include #include #include +#include #include #include @@ -89,6 +91,20 @@ class Node typedef std::weak_ptr CallbackGroupWeakPtr; typedef std::list CallbackGroupWeakPtrList; + /* Create and return a Client. */ + template + typename rclcpp::client::Client::SharedPtr + create_client(std::string service_name); + + /* Create and return a Service. */ + template + typename rclcpp::service::Service::SharedPtr + create_service( + std::string service_name, + std::function &, + std::shared_ptr&)> callback, + rclcpp::callback_group::CallbackGroup::SharedPtr group=nullptr); + private: RCLCPP_DISABLE_COPY(Node); @@ -106,6 +122,7 @@ class Node size_t number_of_subscriptions_; size_t number_of_timers_; + size_t number_of_services_; }; diff --git a/rclcpp/include/rclcpp/node_impl.hpp b/rclcpp/include/rclcpp/node_impl.hpp index 22856df085..002360f384 100644 --- a/rclcpp/include/rclcpp/node_impl.hpp +++ b/rclcpp/include/rclcpp/node_impl.hpp @@ -23,6 +23,7 @@ #include #include +#include #include @@ -41,7 +42,7 @@ Node::Node(std::string node_name) Node::Node(std::string node_name, context::Context::SharedPtr context) : name_(node_name), context_(context), - number_of_subscriptions_(0), number_of_timers_(0) + number_of_subscriptions_(0), number_of_timers_(0), number_of_services_(0) { node_handle_ = ::ros_middleware_interface::create_node(); using rclcpp::callback_group::CallbackGroupType; @@ -162,4 +163,65 @@ Node::create_wall_timer( group); } +template +typename client::Client::SharedPtr +Node::create_client(std::string service_name) +{ + namespace rmi = ::ros_middleware_interface; + + auto &service_type_support_handle = rmi::get_service_type_support_handle(); + + auto client_handle = rmi::create_client(this->node_handle_, + service_type_support_handle, + service_name.c_str()); + + using namespace rclcpp::client; + + auto cli = Client::make_shared(client_handle, + service_name); + + return cli; +} + +template +typename service::Service::SharedPtr +Node::create_service( + std::string service_name, + std::function &, + std::shared_ptr&)> callback, + rclcpp::callback_group::CallbackGroup::SharedPtr group) +{ + namespace rmi = ::ros_middleware_interface; + + auto &service_type_support_handle = rmi::get_service_type_support_handle(); + + auto service_handle = rmi::create_service(this->node_handle_, + service_type_support_handle, + service_name.c_str()); + + using namespace rclcpp::service; + + auto serv = Service::make_shared(service_handle, + service_name, + callback); + auto serv_base_ptr = std::dynamic_pointer_cast(serv); + + if (group) + { + if (!group_in_node(group)) + { + // TODO: use custom exception + throw std::runtime_error("Cannot create timer, group not in node."); + } + group->add_service(serv_base_ptr); + } + else + { + default_callback_group_->add_service(serv_base_ptr); + } + number_of_services_++; + + return serv; +} + #endif /* RCLCPP_RCLCPP_NODE_IMPL_HPP_ */ diff --git a/rclcpp/include/rclcpp/service.hpp b/rclcpp/include/rclcpp/service.hpp new file mode 100644 index 0000000000..301dba437b --- /dev/null +++ b/rclcpp/include/rclcpp/service.hpp @@ -0,0 +1,113 @@ +/* Copyright 2014 Open Source Robotics Foundation, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef RCLCPP_RCLCPP_SERVICE_HPP_ +#define RCLCPP_RCLCPP_SERVICE_HPP_ + +#include + +#include +#include + +#include + +namespace rclcpp +{ + +// Forward declaration for friend statement +namespace node {class Node;} + +namespace service +{ + +class ServiceBase +{ + friend class rclcpp::executor::Executor; +public: + RCLCPP_MAKE_SHARED_DEFINITIONS(ServiceBase); + + ServiceBase( + ros_middleware_interface::ServiceHandle service_handle, + std::string &service_name) + : service_handle_(service_handle), service_name_(service_name) + {} + + std::string get_service_name() + { + return this->service_name_; + } + + ros_middleware_interface::ServiceHandle get_service_handle() + { + return this->service_handle_; + } + + virtual std::shared_ptr create_request() = 0; + virtual void handle_request(std::shared_ptr &request) = 0; + +private: + RCLCPP_DISABLE_COPY(ServiceBase); + + ros_middleware_interface::ServiceHandle service_handle_; + std::string service_name_; + +}; + +template +class Service : public ServiceBase +{ +public: + typedef std::function< + void(const std::shared_ptr &, + std::shared_ptr&)> CallbackType; + RCLCPP_MAKE_SHARED_DEFINITIONS(Service); + + Service( + ros_middleware_interface::ServiceHandle service_handle, + std::string &service_name, + CallbackType callback) + : ServiceBase(service_handle, service_name), callback_(callback) + {} + + std::shared_ptr create_request() + { + return std::shared_ptr(new typename ServiceT::Request()); + } + + void handle_request(std::shared_ptr &request) + { + auto typed_request = std::static_pointer_cast(request); + auto response = std::shared_ptr(new typename ServiceT::Response); + callback_(typed_request, response); + send_response(typed_request, response); + } + + void send_response( + std::shared_ptr &request, + std::shared_ptr &response) + { + ::ros_middleware_interface::send_response(get_service_handle(), request.get(), response.get()); + } + +private: + RCLCPP_DISABLE_COPY(Service); + + CallbackType callback_; +}; + +} /* namespace service */ +} /* namespace rclcpp */ + +#endif /* RCLCPP_RCLCPP_SERVICE_HPP_ */ From 180c0f9016e4a1bdaf7ed9995f6c4015dc28f623 Mon Sep 17 00:00:00 2001 From: Esteve Fernandez Date: Wed, 7 Jan 2015 14:15:46 -0800 Subject: [PATCH 2/9] Move request header to a separate structure --- rclcpp/include/rclcpp/node.hpp | 2 +- rclcpp/include/rclcpp/node_impl.hpp | 2 +- rclcpp/include/rclcpp/service.hpp | 8 ++++---- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/rclcpp/include/rclcpp/node.hpp b/rclcpp/include/rclcpp/node.hpp index dcc91bd60a..63ff7e520a 100644 --- a/rclcpp/include/rclcpp/node.hpp +++ b/rclcpp/include/rclcpp/node.hpp @@ -101,7 +101,7 @@ class Node typename rclcpp::service::Service::SharedPtr create_service( std::string service_name, - std::function &, + std::function &, std::shared_ptr&)> callback, rclcpp::callback_group::CallbackGroup::SharedPtr group=nullptr); diff --git a/rclcpp/include/rclcpp/node_impl.hpp b/rclcpp/include/rclcpp/node_impl.hpp index 002360f384..7fb684116b 100644 --- a/rclcpp/include/rclcpp/node_impl.hpp +++ b/rclcpp/include/rclcpp/node_impl.hpp @@ -187,7 +187,7 @@ template typename service::Service::SharedPtr Node::create_service( std::string service_name, - std::function &, + std::function &, std::shared_ptr&)> callback, rclcpp::callback_group::CallbackGroup::SharedPtr group) { diff --git a/rclcpp/include/rclcpp/service.hpp b/rclcpp/include/rclcpp/service.hpp index 301dba437b..bd5b6ef278 100644 --- a/rclcpp/include/rclcpp/service.hpp +++ b/rclcpp/include/rclcpp/service.hpp @@ -70,7 +70,7 @@ class Service : public ServiceBase { public: typedef std::function< - void(const std::shared_ptr &, + void(const std::shared_ptr &, std::shared_ptr&)> CallbackType; RCLCPP_MAKE_SHARED_DEFINITIONS(Service); @@ -83,19 +83,19 @@ class Service : public ServiceBase std::shared_ptr create_request() { - return std::shared_ptr(new typename ServiceT::Request()); + return std::shared_ptr(new typename ServiceT::RequestWithHeader()); } void handle_request(std::shared_ptr &request) { - auto typed_request = std::static_pointer_cast(request); + auto typed_request = std::static_pointer_cast(request); auto response = std::shared_ptr(new typename ServiceT::Response); callback_(typed_request, response); send_response(typed_request, response); } void send_response( - std::shared_ptr &request, + std::shared_ptr &request, std::shared_ptr &response) { ::ros_middleware_interface::send_response(get_service_handle(), request.get(), response.get()); From 4b290d66e8708a6e0453786d334ec4b674677c91 Mon Sep 17 00:00:00 2001 From: Esteve Fernandez Date: Thu, 8 Jan 2015 09:53:24 -0800 Subject: [PATCH 3/9] Refactored to split request messages --- rclcpp/include/rclcpp/executor.hpp | 6 ++++-- rclcpp/include/rclcpp/node.hpp | 5 ++++- rclcpp/include/rclcpp/node_impl.hpp | 3 ++- rclcpp/include/rclcpp/service.hpp | 28 +++++++++++++++++++--------- 4 files changed, 29 insertions(+), 13 deletions(-) diff --git a/rclcpp/include/rclcpp/executor.hpp b/rclcpp/include/rclcpp/executor.hpp index 8de14cf713..df38ffc915 100644 --- a/rclcpp/include/rclcpp/executor.hpp +++ b/rclcpp/include/rclcpp/executor.hpp @@ -171,12 +171,14 @@ class Executor rclcpp::service::ServiceBase::SharedPtr &service) { std::shared_ptr request = service->create_request(); + std::shared_ptr request_header = service->create_request_header(); bool taken = ros_middleware_interface::take_request( service->service_handle_, - request.get()); + request.get(), + request_header.get()); if (taken) { - service->handle_request(request); + service->handle_request(request, request_header); } else { diff --git a/rclcpp/include/rclcpp/node.hpp b/rclcpp/include/rclcpp/node.hpp index 63ff7e520a..53bc2082c0 100644 --- a/rclcpp/include/rclcpp/node.hpp +++ b/rclcpp/include/rclcpp/node.hpp @@ -29,6 +29,8 @@ #include #include +#include + namespace rclcpp { @@ -101,7 +103,8 @@ class Node typename rclcpp::service::Service::SharedPtr create_service( std::string service_name, - std::function &, + std::function &, + const std::shared_ptr &, std::shared_ptr&)> callback, rclcpp::callback_group::CallbackGroup::SharedPtr group=nullptr); diff --git a/rclcpp/include/rclcpp/node_impl.hpp b/rclcpp/include/rclcpp/node_impl.hpp index 7fb684116b..5f9c3b2bb4 100644 --- a/rclcpp/include/rclcpp/node_impl.hpp +++ b/rclcpp/include/rclcpp/node_impl.hpp @@ -187,7 +187,8 @@ template typename service::Service::SharedPtr Node::create_service( std::string service_name, - std::function &, + std::function &, + const std::shared_ptr &, std::shared_ptr&)> callback, rclcpp::callback_group::CallbackGroup::SharedPtr group) { diff --git a/rclcpp/include/rclcpp/service.hpp b/rclcpp/include/rclcpp/service.hpp index bd5b6ef278..2e85fe3cfd 100644 --- a/rclcpp/include/rclcpp/service.hpp +++ b/rclcpp/include/rclcpp/service.hpp @@ -23,6 +23,8 @@ #include +#include + namespace rclcpp { @@ -55,7 +57,8 @@ class ServiceBase } virtual std::shared_ptr create_request() = 0; - virtual void handle_request(std::shared_ptr &request) = 0; + virtual std::shared_ptr create_request_header() = 0; + virtual void handle_request(std::shared_ptr &request, std::shared_ptr &req_id) = 0; private: RCLCPP_DISABLE_COPY(ServiceBase); @@ -70,7 +73,8 @@ class Service : public ServiceBase { public: typedef std::function< - void(const std::shared_ptr &, + void(const std::shared_ptr &, + const std::shared_ptr &, std::shared_ptr&)> CallbackType; RCLCPP_MAKE_SHARED_DEFINITIONS(Service); @@ -83,22 +87,28 @@ class Service : public ServiceBase std::shared_ptr create_request() { - return std::shared_ptr(new typename ServiceT::RequestWithHeader()); + return std::shared_ptr(new typename ServiceT::Request()); + } + + std::shared_ptr create_request_header() + { + return std::shared_ptr(new userland_msgs::RequestId()); } - void handle_request(std::shared_ptr &request) + void handle_request(std::shared_ptr &request, std::shared_ptr &req_id) { - auto typed_request = std::static_pointer_cast(request); + auto typed_request = std::static_pointer_cast(request); + auto typed_req_id = std::static_pointer_cast(req_id); auto response = std::shared_ptr(new typename ServiceT::Response); - callback_(typed_request, response); - send_response(typed_request, response); + callback_(typed_request, typed_req_id, response); + send_response(typed_req_id, response); } void send_response( - std::shared_ptr &request, + std::shared_ptr &req_id, std::shared_ptr &response) { - ::ros_middleware_interface::send_response(get_service_handle(), request.get(), response.get()); + ::ros_middleware_interface::send_response(get_service_handle(), req_id.get(), response.get()); } private: From 3e0621a1ffb56c811a6423b178b85b5e3a90f2d4 Mon Sep 17 00:00:00 2001 From: Esteve Fernandez Date: Thu, 8 Jan 2015 14:49:47 -0800 Subject: [PATCH 4/9] Added support for timeouts --- rclcpp/include/rclcpp/client.hpp | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/rclcpp/include/rclcpp/client.hpp b/rclcpp/include/rclcpp/client.hpp index 6bda23022c..89609b6f32 100644 --- a/rclcpp/include/rclcpp/client.hpp +++ b/rclcpp/include/rclcpp/client.hpp @@ -44,12 +44,17 @@ class Client {} std::shared_ptr - send_request(std::shared_ptr &req) + send_request(std::shared_ptr &req, long timeout=10) { ::ros_middleware_interface::send_request(client_handle_, req.get()); std::shared_ptr res = std::make_shared(); - ::ros_middleware_interface::receive_response(client_handle_, res.get()); + bool received = ::ros_middleware_interface::receive_response(client_handle_, res.get(), timeout); + if(!received) + { + // TODO: use custom exception + throw std::runtime_error("Timed out while waiting for response"); + } return res; } From e91563640ea58f9649fdca1a93bbf2df2b2544b7 Mon Sep 17 00:00:00 2001 From: Esteve Fernandez Date: Fri, 9 Jan 2015 17:14:27 -0800 Subject: [PATCH 5/9] Do not pass RequestId to the server callback. Use RequestId from rpc.h --- rclcpp/include/rclcpp/node.hpp | 3 --- rclcpp/include/rclcpp/node_impl.hpp | 1 - rclcpp/include/rclcpp/service.hpp | 11 +++++------ 3 files changed, 5 insertions(+), 10 deletions(-) diff --git a/rclcpp/include/rclcpp/node.hpp b/rclcpp/include/rclcpp/node.hpp index 53bc2082c0..dcc91bd60a 100644 --- a/rclcpp/include/rclcpp/node.hpp +++ b/rclcpp/include/rclcpp/node.hpp @@ -29,8 +29,6 @@ #include #include -#include - namespace rclcpp { @@ -104,7 +102,6 @@ class Node create_service( std::string service_name, std::function &, - const std::shared_ptr &, std::shared_ptr&)> callback, rclcpp::callback_group::CallbackGroup::SharedPtr group=nullptr); diff --git a/rclcpp/include/rclcpp/node_impl.hpp b/rclcpp/include/rclcpp/node_impl.hpp index 5f9c3b2bb4..002360f384 100644 --- a/rclcpp/include/rclcpp/node_impl.hpp +++ b/rclcpp/include/rclcpp/node_impl.hpp @@ -188,7 +188,6 @@ typename service::Service::SharedPtr Node::create_service( std::string service_name, std::function &, - const std::shared_ptr &, std::shared_ptr&)> callback, rclcpp::callback_group::CallbackGroup::SharedPtr group) { diff --git a/rclcpp/include/rclcpp/service.hpp b/rclcpp/include/rclcpp/service.hpp index 2e85fe3cfd..04d644e533 100644 --- a/rclcpp/include/rclcpp/service.hpp +++ b/rclcpp/include/rclcpp/service.hpp @@ -20,10 +20,10 @@ #include #include +#include #include -#include namespace rclcpp { @@ -74,7 +74,6 @@ class Service : public ServiceBase public: typedef std::function< void(const std::shared_ptr &, - const std::shared_ptr &, std::shared_ptr&)> CallbackType; RCLCPP_MAKE_SHARED_DEFINITIONS(Service); @@ -92,20 +91,20 @@ class Service : public ServiceBase std::shared_ptr create_request_header() { - return std::shared_ptr(new userland_msgs::RequestId()); + return std::shared_ptr(new ros_middleware_interface::RequestId()); } void handle_request(std::shared_ptr &request, std::shared_ptr &req_id) { auto typed_request = std::static_pointer_cast(request); - auto typed_req_id = std::static_pointer_cast(req_id); + auto typed_req_id = std::static_pointer_cast(req_id); auto response = std::shared_ptr(new typename ServiceT::Response); - callback_(typed_request, typed_req_id, response); + callback_(typed_request, response); send_response(typed_req_id, response); } void send_response( - std::shared_ptr &req_id, + std::shared_ptr &req_id, std::shared_ptr &response) { ::ros_middleware_interface::send_response(get_service_handle(), req_id.get(), response.get()); From af3709f040bb70b258b7261701b4c0087a6ff04b Mon Sep 17 00:00:00 2001 From: Esteve Fernandez Date: Fri, 9 Jan 2015 17:42:25 -0800 Subject: [PATCH 6/9] Removed timeouts --- rclcpp/include/rclcpp/client.hpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rclcpp/include/rclcpp/client.hpp b/rclcpp/include/rclcpp/client.hpp index 89609b6f32..f58694eab9 100644 --- a/rclcpp/include/rclcpp/client.hpp +++ b/rclcpp/include/rclcpp/client.hpp @@ -44,12 +44,12 @@ class Client {} std::shared_ptr - send_request(std::shared_ptr &req, long timeout=10) + send_request(std::shared_ptr &req) { ::ros_middleware_interface::send_request(client_handle_, req.get()); std::shared_ptr res = std::make_shared(); - bool received = ::ros_middleware_interface::receive_response(client_handle_, res.get(), timeout); + bool received = ::ros_middleware_interface::receive_response(client_handle_, res.get()); if(!received) { // TODO: use custom exception From f26bed380d30ac4c4094ec7ae4291fa9ba3e299b Mon Sep 17 00:00:00 2001 From: Esteve Fernandez Date: Mon, 12 Jan 2015 14:36:42 -0800 Subject: [PATCH 7/9] Pass response object as parameter --- rclcpp/include/rclcpp/client.hpp | 14 ++++---------- 1 file changed, 4 insertions(+), 10 deletions(-) diff --git a/rclcpp/include/rclcpp/client.hpp b/rclcpp/include/rclcpp/client.hpp index f58694eab9..f9ca72c21e 100644 --- a/rclcpp/include/rclcpp/client.hpp +++ b/rclcpp/include/rclcpp/client.hpp @@ -43,19 +43,13 @@ class Client : client_handle_(client_handle), service_name_(service_name) {} - std::shared_ptr - send_request(std::shared_ptr &req) + ::ros_middleware_interface::ROS2_RETCODE_t send_request( + std::shared_ptr &req, + std::shared_ptr &res) { ::ros_middleware_interface::send_request(client_handle_, req.get()); - std::shared_ptr res = std::make_shared(); - bool received = ::ros_middleware_interface::receive_response(client_handle_, res.get()); - if(!received) - { - // TODO: use custom exception - throw std::runtime_error("Timed out while waiting for response"); - } - return res; + return ::ros_middleware_interface::receive_response(client_handle_, res.get()); } private: From 8332480c8510a09374c49be1e196a788041460f9 Mon Sep 17 00:00:00 2001 From: Esteve Fernandez Date: Tue, 13 Jan 2015 18:32:14 -0800 Subject: [PATCH 8/9] Added support for asynchronous clients --- rclcpp/include/rclcpp/callback_group.hpp | 8 ++ rclcpp/include/rclcpp/client.hpp | 91 +++++++++++-- rclcpp/include/rclcpp/executor.hpp | 166 +++++++++++++++++++++++ rclcpp/include/rclcpp/node.hpp | 5 +- rclcpp/include/rclcpp/node_impl.hpp | 22 ++- 5 files changed, 280 insertions(+), 12 deletions(-) diff --git a/rclcpp/include/rclcpp/callback_group.hpp b/rclcpp/include/rclcpp/callback_group.hpp index 82023e0994..088439ed42 100644 --- a/rclcpp/include/rclcpp/callback_group.hpp +++ b/rclcpp/include/rclcpp/callback_group.hpp @@ -23,6 +23,7 @@ #include #include #include +#include namespace rclcpp { @@ -69,10 +70,17 @@ class CallbackGroup service_ptrs_.push_back(service_ptr); } + void + add_client(const client::ClientBase::SharedPtr &client_ptr) + { + client_ptrs_.push_back(client_ptr); + } + CallbackGroupType type_; std::vector subscription_ptrs_; std::vector timer_ptrs_; std::vector service_ptrs_; + std::vector client_ptrs_; std::atomic_bool can_be_taken_from_; }; diff --git a/rclcpp/include/rclcpp/client.hpp b/rclcpp/include/rclcpp/client.hpp index f9ca72c21e..f6635f7743 100644 --- a/rclcpp/include/rclcpp/client.hpp +++ b/rclcpp/include/rclcpp/client.hpp @@ -21,7 +21,11 @@ #include #include +#include #include +#include +#include +#include namespace rclcpp { @@ -32,30 +36,99 @@ namespace node {class Node;} namespace client { +class ClientBase +{ + friend class rclcpp::executor::Executor; +public: + RCLCPP_MAKE_SHARED_DEFINITIONS(ClientBase); + + ClientBase( + ros_middleware_interface::ClientHandle client_handle, + std::string &service_name) + : client_handle_(client_handle), service_name_(service_name) + {} + + std::string get_service_name() + { + return this->service_name_; + } + + ros_middleware_interface::ClientHandle get_client_handle() + { + return this->client_handle_; + } + + virtual std::shared_ptr create_response() = 0; + virtual std::shared_ptr create_request_header() = 0; + virtual void handle_response(std::shared_ptr &response, std::shared_ptr &req_id) = 0; + +private: + RCLCPP_DISABLE_COPY(ClientBase); + + ros_middleware_interface::ClientHandle client_handle_; + std::string service_name_; + +}; + template -class Client +class Client : public ClientBase { public: + typedef std::promise Promise; + typedef std::shared_ptr SharedPromise; + typedef std::shared_future SharedFuture; + RCLCPP_MAKE_SHARED_DEFINITIONS(Client); Client(ros_middleware_interface::ClientHandle client_handle, std::string& service_name) - : client_handle_(client_handle), service_name_(service_name) + : ClientBase(client_handle, service_name) {} - ::ros_middleware_interface::ROS2_RETCODE_t send_request( - std::shared_ptr &req, - std::shared_ptr &res) + std::shared_ptr get_response(int64_t sequence_number) + { + auto pair = this->pending_requests_[sequence_number]; + return pair.second; + } + + std::shared_ptr create_response() + { + return std::shared_ptr(new typename ServiceT::Response()); + } + + std::shared_ptr create_request_header() { - ::ros_middleware_interface::send_request(client_handle_, req.get()); + return std::shared_ptr(new ros_middleware_interface::RequestId()); + } - return ::ros_middleware_interface::receive_response(client_handle_, res.get()); + void handle_response(std::shared_ptr &response, std::shared_ptr &req_id) + { + auto typed_req_id = std::static_pointer_cast(req_id); + auto typed_response = std::static_pointer_cast(response); + int64_t sequence_number = typed_req_id->sequence_number; + auto pair = this->pending_requests_[sequence_number]; + auto call_promise = pair.first; + this->pending_requests_.erase(sequence_number); + call_promise->set_value(typed_response); + } + + SharedFuture async_send_request( + typename ServiceT::Request::Ptr &request, + typename ServiceT::Response::Ptr &response) + { + int64_t sequence_number = ::ros_middleware_interface::send_request(get_client_handle(), request.get()); + + SharedPromise call_promise = std::make_shared(); + pending_requests_[sequence_number] = std::make_pair(call_promise, response); + + SharedFuture f(call_promise->get_future()); + return f; } private: - ros_middleware_interface::ClientHandle client_handle_; - std::string service_name_; + RCLCPP_DISABLE_COPY(Client); + std::map > pending_requests_; }; } /* namespace client */ diff --git a/rclcpp/include/rclcpp/executor.hpp b/rclcpp/include/rclcpp/executor.hpp index df38ffc915..4b1725b218 100644 --- a/rclcpp/include/rclcpp/executor.hpp +++ b/rclcpp/include/rclcpp/executor.hpp @@ -107,6 +107,7 @@ class Executor rclcpp::subscription::SubscriptionBase::SharedPtr subscription; rclcpp::timer::TimerBase::SharedPtr timer; rclcpp::service::ServiceBase::SharedPtr service; + rclcpp::client::ClientBase::SharedPtr client; // These are used to keep the scope on the containing items rclcpp::callback_group::CallbackGroup::SharedPtr callback_group; rclcpp::node::Node::SharedPtr node; @@ -131,6 +132,10 @@ class Executor { execute_service(any_exec->service); } + if (any_exec->client) + { + execute_client(any_exec->client); + } // Reset the callback_group, regardless of type any_exec->callback_group->can_be_taken_from_.store(true); // Wake the wait, because it may need to be recalculated or work that @@ -188,6 +193,26 @@ class Executor } } + static void + execute_client( + rclcpp::client::ClientBase::SharedPtr &client) + { + std::shared_ptr response = client->create_response(); + std::shared_ptr request_header = client->create_request_header(); + bool taken = ros_middleware_interface::take_response( + client->client_handle_, + response.get(), + request_header.get()); + if (taken) + { + client->handle_response(response, request_header); + } + else + { + std::cout << "[rclcpp::error] take failed for service on client" << std::endl; + } + } + /*** Populate class storage from stored weak node pointers and wait. ***/ void @@ -198,6 +223,7 @@ class Executor std::vector subs; std::vector timers; std::vector services; + std::vector clients; for (auto &weak_node : weak_nodes_) { auto node = weak_node.lock(); @@ -225,6 +251,10 @@ class Executor { services.push_back(service); } + for (auto &client : group->client_ptrs_) + { + clients.push_back(client); + } } } // Clean up any invalid nodes, if they were detected @@ -278,6 +308,27 @@ class Executor service_handle_index += 1; } + // Use the number of clients to allocate memory in the handles + size_t number_of_clients = clients.size(); + ros_middleware_interface::ClientHandles client_handles; + client_handles.client_count_ = number_of_clients; + // TODO: Avoid redundant malloc's + client_handles.clients_ = static_cast( + std::malloc(sizeof(void *) * number_of_clients)); + if (client_handles.clients_ == NULL) + { + // TODO: Use a different error here? maybe std::bad_alloc? + throw std::runtime_error("Could not malloc for client pointers."); + } + // Then fill the ServiceHandles with ready clients + size_t client_handle_index = 0; + for (auto &client : clients) + { + client_handles.clients_[client_handle_index] = \ + client->client_handle_.data_; + client_handle_index += 1; + } + // Use the number of guard conditions to allocate memory in the handles // Add 2 to the number for the ctrl-c guard cond and the executor's size_t start_of_timer_guard_conds = 2; @@ -313,6 +364,7 @@ class Executor ros_middleware_interface::wait(subscriber_handles, guard_condition_handles, service_handles, + client_handles, nonblocking); // If ctrl-c guard condition, return directly if (guard_condition_handles.guard_conditions_[0] != 0) @@ -352,6 +404,15 @@ class Executor service_handles_.push_back(handle); } } + // Then the clients + for (size_t i = 0; i < number_of_clients; ++i) + { + void *handle = client_handles.clients_[i]; + if (handle) + { + client_handles_.push_back(handle); + } + } // Make sure to free memory // TODO: Remove theses when "Avoid redundant malloc's" todo is addressed @@ -449,6 +510,36 @@ class Executor return rclcpp::service::ServiceBase::SharedPtr(); } + rclcpp::client::ClientBase::SharedPtr + get_client_by_handle(void * client_handle) + { + for (auto weak_node : weak_nodes_) + { + auto node = weak_node.lock(); + if (!node) + { + continue; + } + for (auto weak_group : node->callback_groups_) + { + auto group = weak_group.lock(); + if (!group) + { + continue; + } + for (auto client : group->client_ptrs_) + { + if (client->client_handle_.data_ == client_handle) + { + return client; + } + } + } + } + return rclcpp::client::ClientBase::SharedPtr(); + } + + void remove_subscriber_handle_from_subscriber_handles(void *handle) { @@ -467,6 +558,12 @@ class Executor service_handles_.remove(handle); } + void + remove_client_handle_from_client_handles(void *handle) + { + client_handles_.remove(handle); + } + rclcpp::node::Node::SharedPtr get_node_by_group(rclcpp::callback_group::CallbackGroup::SharedPtr &group) { @@ -680,6 +777,67 @@ class Executor } } + rclcpp::callback_group::CallbackGroup::SharedPtr + get_group_by_client( + rclcpp::client::ClientBase::SharedPtr &client) + { + for (auto &weak_node : weak_nodes_) + { + auto node = weak_node.lock(); + if (!node) + { + continue; + } + for (auto &weak_group : node->callback_groups_) + { + auto group = weak_group.lock(); + for (auto &cli : group->client_ptrs_) + { + if (cli == client) + { + return group; + } + } + } + } + return rclcpp::callback_group::CallbackGroup::SharedPtr(); + } + + void + get_next_client(std::shared_ptr &any_exec) + { + for (auto handle : client_handles_) + { + auto client = get_client_by_handle(handle); + if (client) + { + // Find the group for this handle and see if it can be serviced + auto group = get_group_by_client(client); + if (!group) + { + // Group was not found, meaning the service is not valid... + // Remove it from the ready list and continue looking + remove_client_handle_from_client_handles(handle); + continue; + } + if (!group->can_be_taken_from_.load()) + { + // Group is mutually exclusive and is being used, so skip it for now + // Leave it to be checked next time, but continue searching + continue; + } + // Otherwise it is safe to set and return the any_exec + any_exec->client = client; + any_exec->callback_group = group; + any_exec->node = get_node_by_group(group); + remove_client_handle_from_client_handles(handle); + return; + } + // Else, the service is no longer valid, remove it and continue + remove_client_handle_from_client_handles(handle); + } + } + std::shared_ptr get_next_ready_executable() { @@ -702,6 +860,12 @@ class Executor { return any_exec; } + // Check the clients to see if there are any that are ready + get_next_client(any_exec); + if (any_exec->client) + { + return any_exec; + } // If there is neither a ready timer nor subscription, return a null ptr any_exec.reset(); return any_exec; @@ -752,6 +916,8 @@ class Executor GuardConditionHandles guard_condition_handles_; typedef std::list ServiceHandles; ServiceHandles service_handles_; + typedef std::list ClientHandles; + ClientHandles client_handles_; }; diff --git a/rclcpp/include/rclcpp/node.hpp b/rclcpp/include/rclcpp/node.hpp index dcc91bd60a..cbae28354b 100644 --- a/rclcpp/include/rclcpp/node.hpp +++ b/rclcpp/include/rclcpp/node.hpp @@ -94,7 +94,9 @@ class Node /* Create and return a Client. */ template typename rclcpp::client::Client::SharedPtr - create_client(std::string service_name); + create_client( + std::string service_name, + rclcpp::callback_group::CallbackGroup::SharedPtr group=nullptr); /* Create and return a Service. */ template @@ -123,6 +125,7 @@ class Node size_t number_of_subscriptions_; size_t number_of_timers_; size_t number_of_services_; + size_t number_of_clients_; }; diff --git a/rclcpp/include/rclcpp/node_impl.hpp b/rclcpp/include/rclcpp/node_impl.hpp index 002360f384..f08d0bd8c5 100644 --- a/rclcpp/include/rclcpp/node_impl.hpp +++ b/rclcpp/include/rclcpp/node_impl.hpp @@ -165,7 +165,9 @@ Node::create_wall_timer( template typename client::Client::SharedPtr -Node::create_client(std::string service_name) +Node::create_client( + std::string service_name, + rclcpp::callback_group::CallbackGroup::SharedPtr group) { namespace rmi = ::ros_middleware_interface; @@ -180,6 +182,22 @@ Node::create_client(std::string service_name) auto cli = Client::make_shared(client_handle, service_name); + auto cli_base_ptr = std::dynamic_pointer_cast(cli); + if (group) + { + if (!group_in_node(group)) + { + // TODO: use custom exception + throw std::runtime_error("Cannot create client, group not in node."); + } + group->add_client(cli_base_ptr); + } + else + { + default_callback_group_->add_client(cli_base_ptr); + } + number_of_clients_++; + return cli; } @@ -211,7 +229,7 @@ Node::create_service( if (!group_in_node(group)) { // TODO: use custom exception - throw std::runtime_error("Cannot create timer, group not in node."); + throw std::runtime_error("Cannot create service, group not in node."); } group->add_service(serv_base_ptr); } From 444b56c281eea9a4949372c848d00a2ec27d0c88 Mon Sep 17 00:00:00 2001 From: Esteve Fernandez Date: Thu, 15 Jan 2015 18:24:45 -0800 Subject: [PATCH 9/9] Extend async_request to take accept a callback as an argument --- rclcpp/include/rclcpp/client.hpp | 28 ++++++++++++++++++++-------- 1 file changed, 20 insertions(+), 8 deletions(-) diff --git a/rclcpp/include/rclcpp/client.hpp b/rclcpp/include/rclcpp/client.hpp index f6635f7743..66976d5c06 100644 --- a/rclcpp/include/rclcpp/client.hpp +++ b/rclcpp/include/rclcpp/client.hpp @@ -74,9 +74,11 @@ template class Client : public ClientBase { public: - typedef std::promise Promise; - typedef std::shared_ptr SharedPromise; - typedef std::shared_future SharedFuture; + typedef std::promise Promise; + typedef std::shared_ptr SharedPromise; + typedef std::shared_future SharedFuture; + + typedef std::function CallbackType; RCLCPP_MAKE_SHARED_DEFINITIONS(Client); @@ -106,29 +108,39 @@ class Client : public ClientBase auto typed_req_id = std::static_pointer_cast(req_id); auto typed_response = std::static_pointer_cast(response); int64_t sequence_number = typed_req_id->sequence_number; - auto pair = this->pending_requests_[sequence_number]; - auto call_promise = pair.first; + auto tuple = this->pending_requests_[sequence_number]; + auto call_promise = std::get<0>(tuple); + auto callback = std::get<1>(tuple); + auto future = std::get<2>(tuple); this->pending_requests_.erase(sequence_number); call_promise->set_value(typed_response); + callback(future); } SharedFuture async_send_request( typename ServiceT::Request::Ptr &request, typename ServiceT::Response::Ptr &response) + { + return async_send_request(request, response, [] (SharedFuture f) { }); + } + + SharedFuture async_send_request( + typename ServiceT::Request::Ptr &request, + typename ServiceT::Response::Ptr &response, + CallbackType cb) { int64_t sequence_number = ::ros_middleware_interface::send_request(get_client_handle(), request.get()); SharedPromise call_promise = std::make_shared(); - pending_requests_[sequence_number] = std::make_pair(call_promise, response); - SharedFuture f(call_promise->get_future()); + pending_requests_[sequence_number] = std::make_tuple(call_promise, cb, f); return f; } private: RCLCPP_DISABLE_COPY(Client); - std::map > pending_requests_; + std::map > pending_requests_; }; } /* namespace client */