diff --git a/rclcpp/CMakeLists.txt b/rclcpp/CMakeLists.txt index 9572f422a8..b619160f92 100644 --- a/rclcpp/CMakeLists.txt +++ b/rclcpp/CMakeLists.txt @@ -38,6 +38,7 @@ set(${PROJECT_NAME}_SRCS src/rclcpp/any_executable.cpp src/rclcpp/callback_group.cpp src/rclcpp/client.cpp + src/rclcpp/client_intra_process_base.cpp src/rclcpp/clock.cpp src/rclcpp/context.cpp src/rclcpp/contexts/default_context.cpp @@ -97,6 +98,7 @@ set(${PROJECT_NAME}_SRCS src/rclcpp/serialization.cpp src/rclcpp/serialized_message.cpp src/rclcpp/service.cpp + src/rclcpp/service_intra_process_base.cpp src/rclcpp/signal_handler.cpp src/rclcpp/subscription_base.cpp src/rclcpp/subscription_intra_process_base.cpp diff --git a/rclcpp/include/rclcpp/client.hpp b/rclcpp/include/rclcpp/client.hpp index e88fa8a949..04d8232cd1 100644 --- a/rclcpp/include/rclcpp/client.hpp +++ b/rclcpp/include/rclcpp/client.hpp @@ -34,9 +34,13 @@ #include "rcl/wait.h" #include "rclcpp/detail/cpp_callback_trampoline.hpp" +#include "rclcpp/detail/resolve_use_intra_process.hpp" #include "rclcpp/exceptions.hpp" #include "rclcpp/expand_topic_or_service_name.hpp" +#include "rclcpp/experimental/client_intra_process.hpp" +#include "rclcpp/experimental/intra_process_manager.hpp" #include "rclcpp/function_traits.hpp" +#include "rclcpp/intra_process_setting.hpp" #include "rclcpp/logging.hpp" #include "rclcpp/macros.hpp" #include "rclcpp/node_interfaces/node_graph_interface.hpp" @@ -45,6 +49,8 @@ #include "rclcpp/utilities.hpp" #include "rclcpp/visibility_control.hpp" +#include "rcutils/logging_macros.h" + #include "rmw/error_handling.h" #include "rmw/impl/cpp/demangle.hpp" #include "rmw/rmw.h" @@ -251,6 +257,15 @@ class ClientBase rclcpp::QoS get_response_subscription_actual_qos() const; + /// Return the waitable for intra-process + /** + * \return the waitable sharedpointer for intra-process, or nullptr if intra-process is not setup. + * \throws std::runtime_error if the intra process manager is destroyed + */ + RCLCPP_PUBLIC + rclcpp::Waitable::SharedPtr + get_intra_process_waitable(); + /// Set a callback to be called when each new response is received. /** * The callback receives a size_t which is the number of responses received @@ -354,6 +369,19 @@ class ClientBase void set_on_new_response_callback(rcl_event_callback_t callback, const void * user_data); + using IntraProcessManagerWeakPtr = + std::weak_ptr; + + /// Implementation detail. + RCLCPP_PUBLIC + void + setup_intra_process( + uint64_t intra_process_client_id, + IntraProcessManagerWeakPtr weak_ipm); + + std::shared_ptr client_intra_process_; + std::atomic_uint ipc_sequence_number_{1}; + rclcpp::node_interfaces::NodeGraphInterface::WeakPtr node_graph_; std::shared_ptr node_handle_; std::shared_ptr context_; @@ -363,6 +391,11 @@ class ClientBase std::atomic in_use_by_wait_set_{false}; + std::recursive_mutex ipc_mutex_; + bool use_intra_process_{false}; + IntraProcessManagerWeakPtr weak_ipm_; + uint64_t intra_process_client_id_; + std::recursive_mutex callback_mutex_; std::function on_new_response_callback_{nullptr}; }; @@ -460,12 +493,14 @@ class Client : public ClientBase * \param[in] node_graph The node graph interface of the corresponding node. * \param[in] service_name Name of the topic to publish to. * \param[in] client_options options for the subscription. + * \param[in] ipc_setting Intra-process communication setting for the client. */ Client( rclcpp::node_interfaces::NodeBaseInterface * node_base, rclcpp::node_interfaces::NodeGraphInterface::SharedPtr node_graph, const std::string & service_name, - rcl_client_options_t & client_options) + rcl_client_options_t & client_options, + rclcpp::IntraProcessSetting ipc_setting = rclcpp::IntraProcessSetting::NodeDefault) : ClientBase(node_base, node_graph) { using rosidl_typesupport_cpp::get_service_type_support_handle; @@ -490,10 +525,27 @@ class Client : public ClientBase } rclcpp::exceptions::throw_from_rcl_error(ret, "could not create client"); } + + // Setup intra process if requested. + if (rclcpp::detail::resolve_use_intra_process(ipc_setting, *node_base)) { + create_intra_process_client(); + } } virtual ~Client() { + if (!use_intra_process_) { + return; + } + auto ipm = weak_ipm_.lock(); + if (!ipm) { + // TODO(ivanpauno): should this raise an error? + RCLCPP_WARN( + rclcpp::get_logger("rclcpp"), + "Intra process manager died before than a client."); + return; + } + ipm->remove_client(intra_process_client_id_); } /// Take the next response for this client. @@ -610,7 +662,7 @@ class Client : public ClientBase Promise promise; auto future = promise.get_future(); auto req_id = async_send_request_impl( - *request, + std::move(request), std::move(promise)); return FutureAndRequestId(std::move(future), req_id); } @@ -645,7 +697,7 @@ class Client : public ClientBase Promise promise; auto shared_future = promise.get_future().share(); auto req_id = async_send_request_impl( - *request, + std::move(request), std::make_tuple( CallbackType{std::forward(cb)}, shared_future, @@ -676,7 +728,7 @@ class Client : public ClientBase PromiseWithRequest promise; auto shared_future = promise.get_future().share(); auto req_id = async_send_request_impl( - *request, + request, std::make_tuple( CallbackWithRequestType{std::forward(cb)}, request, @@ -789,10 +841,32 @@ class Client : public ClientBase CallbackWithRequestTypeValueVariant>; int64_t - async_send_request_impl(const Request & request, CallbackInfoVariant value) + async_send_request_impl(SharedRequest request, CallbackInfoVariant value) { + std::lock_guard lock(ipc_mutex_); + if (use_intra_process_) { + auto ipm = weak_ipm_.lock(); + if (!ipm) { + throw std::runtime_error( + "intra process send called after destruction of intra process manager"); + } + bool intra_process_server_available = ipm->service_is_available(intra_process_client_id_); + + // Check if there's an intra-process server available matching this client. + // If there's not, we fall back into inter-process communication, since + // the server might be available in another process or was configured to not use IPC. + if (intra_process_server_available) { + // Send intra-process request + ipm->send_intra_process_client_request( + intra_process_client_id_, + std::make_pair(std::move(request), std::move(value))); + return ipc_sequence_number_++; + } + } + + // Send inter-process request int64_t sequence_number; - rcl_ret_t ret = rcl_send_request(get_client_handle().get(), &request, &sequence_number); + rcl_ret_t ret = rcl_send_request(get_client_handle().get(), request.get(), &sequence_number); if (RCL_RET_OK != ret) { rclcpp::exceptions::throw_from_rcl_error(ret, "failed to send request"); } @@ -821,6 +895,40 @@ class Client : public ClientBase return value; } + void + create_intra_process_client() + { + // Check if the QoS is compatible with intra-process. + auto qos_profile = get_response_subscription_actual_qos(); + + if (qos_profile.history() != rclcpp::HistoryPolicy::KeepLast) { + throw std::invalid_argument( + "intraprocess communication allowed only with keep last history qos policy"); + } + if (qos_profile.depth() == 0) { + throw std::invalid_argument( + "intraprocess communication is not allowed with 0 depth qos policy"); + } + if (qos_profile.durability() != rclcpp::DurabilityPolicy::Volatile) { + throw std::invalid_argument( + "intraprocess communication allowed only with volatile durability"); + } + + // Create a ClientIntraProcess which will be given to the intra-process manager. + using ClientIntraProcessT = rclcpp::experimental::ClientIntraProcess; + + client_intra_process_ = std::make_shared( + context_, + this->get_service_name(), + qos_profile); + + // Add it to the intra process manager. + using rclcpp::experimental::IntraProcessManager; + auto ipm = context_->get_sub_context(); + uint64_t intra_process_client_id = ipm->add_intra_process_client(client_intra_process_); + this->setup_intra_process(intra_process_client_id, ipm); + } + RCLCPP_DISABLE_COPY(Client) std::unordered_map< diff --git a/rclcpp/include/rclcpp/create_client.hpp b/rclcpp/include/rclcpp/create_client.hpp index 1a960b8d8f..ffa5186936 100644 --- a/rclcpp/include/rclcpp/create_client.hpp +++ b/rclcpp/include/rclcpp/create_client.hpp @@ -35,7 +35,8 @@ create_client( std::shared_ptr node_services, const std::string & service_name, const rmw_qos_profile_t & qos_profile, - rclcpp::CallbackGroup::SharedPtr group) + rclcpp::CallbackGroup::SharedPtr group, + rclcpp::IntraProcessSetting ipc_setting = rclcpp::IntraProcessSetting::NodeDefault) { rcl_client_options_t options = rcl_client_get_default_options(); options.qos = qos_profile; @@ -44,7 +45,8 @@ create_client( node_base.get(), node_graph, service_name, - options); + options, + ipc_setting); auto cli_base_ptr = std::dynamic_pointer_cast(cli); node_services->add_client(cli_base_ptr, group); diff --git a/rclcpp/include/rclcpp/create_service.hpp b/rclcpp/include/rclcpp/create_service.hpp index 9aaa02a1ed..9aca6b8b6c 100644 --- a/rclcpp/include/rclcpp/create_service.hpp +++ b/rclcpp/include/rclcpp/create_service.hpp @@ -37,7 +37,8 @@ create_service( const std::string & service_name, CallbackT && callback, const rmw_qos_profile_t & qos_profile, - rclcpp::CallbackGroup::SharedPtr group) + rclcpp::CallbackGroup::SharedPtr group, + rclcpp::IntraProcessSetting ipc_setting = rclcpp::IntraProcessSetting::NodeDefault) { rclcpp::AnyServiceCallback any_service_callback; any_service_callback.set(std::forward(callback)); @@ -46,8 +47,8 @@ create_service( service_options.qos = qos_profile; auto serv = Service::make_shared( - node_base->get_shared_rcl_node_handle(), - service_name, any_service_callback, service_options); + node_base, + service_name, any_service_callback, service_options, ipc_setting); auto serv_base_ptr = std::dynamic_pointer_cast(serv); node_services->add_service(serv_base_ptr, group); return serv; diff --git a/rclcpp/include/rclcpp/detail/resolve_use_intra_process.hpp b/rclcpp/include/rclcpp/detail/resolve_use_intra_process.hpp index 9098bfe695..765b0132b1 100644 --- a/rclcpp/include/rclcpp/detail/resolve_use_intra_process.hpp +++ b/rclcpp/include/rclcpp/detail/resolve_use_intra_process.hpp @@ -26,12 +26,12 @@ namespace detail { /// Return whether or not intra process is enabled, resolving "NodeDefault" if needed. -template +template bool -resolve_use_intra_process(const OptionsT & options, const NodeBaseT & node_base) +resolve_use_intra_process(IntraProcessSetting ipc_setting, const NodeBaseT & node_base) { bool use_intra_process; - switch (options.use_intra_process_comm) { + switch (ipc_setting) { case IntraProcessSetting::Enable: use_intra_process = true; break; diff --git a/rclcpp/include/rclcpp/experimental/buffers/intra_process_buffer.hpp b/rclcpp/include/rclcpp/experimental/buffers/intra_process_buffer.hpp index c2e747e774..cbe52d3aee 100644 --- a/rclcpp/include/rclcpp/experimental/buffers/intra_process_buffer.hpp +++ b/rclcpp/include/rclcpp/experimental/buffers/intra_process_buffer.hpp @@ -234,6 +234,51 @@ class TypedIntraProcessBuffer : public IntraProcessBuffer +class ServiceIntraProcessBuffer : public IntraProcessBufferBase +{ +public: + RCLCPP_SMART_PTR_ALIASES_ONLY(ServiceIntraProcessBuffer) + + virtual ~ServiceIntraProcessBuffer() {} + + explicit + ServiceIntraProcessBuffer( + std::unique_ptr> buffer_impl) + { + buffer_ = std::move(buffer_impl); + } + + bool use_take_shared_method() const override + { + return false; + } + + bool has_data() const override + { + return buffer_->has_data(); + } + + void clear() override + { + buffer_->clear(); + } + + void add(BufferT && msg) + { + buffer_->enqueue(std::move(msg)); + } + + BufferT + consume() + { + return buffer_->dequeue(); + } + +private: + std::unique_ptr> buffer_; +}; + } // namespace buffers } // namespace experimental } // namespace rclcpp diff --git a/rclcpp/include/rclcpp/experimental/client_intra_process.hpp b/rclcpp/include/rclcpp/experimental/client_intra_process.hpp new file mode 100644 index 0000000000..fb30e6ee81 --- /dev/null +++ b/rclcpp/include/rclcpp/experimental/client_intra_process.hpp @@ -0,0 +1,142 @@ +// Copyright 2021 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCLCPP__EXPERIMENTAL__CLIENT_INTRA_PROCESS_HPP_ +#define RCLCPP__EXPERIMENTAL__CLIENT_INTRA_PROCESS_HPP_ + +#include +#include +#include +#include +#include +#include +#include // NOLINT, cpplint doesn't think this is a cpp std header + +#include "rcutils/logging_macros.h" +#include "rclcpp/experimental/buffers/intra_process_buffer.hpp" +#include "rclcpp/experimental/create_intra_process_buffer.hpp" +#include "rclcpp/experimental/client_intra_process_base.hpp" + +namespace rclcpp +{ +namespace experimental +{ + +template +class ClientIntraProcess : public ClientIntraProcessBase +{ +public: + RCLCPP_SMART_PTR_DEFINITIONS(ClientIntraProcess) + + using SharedRequest = typename ServiceT::Request::SharedPtr; + using SharedResponse = typename ServiceT::Response::SharedPtr; + + using Promise = std::promise; + using PromiseWithRequest = std::promise>; + + using SharedFuture = std::shared_future; + using SharedFutureWithRequest = std::shared_future>; + + using CallbackType = std::function; + using CallbackWithRequestType = std::function; + + using CallbackTypeValueVariant = std::tuple; + using CallbackWithRequestTypeValueVariant = std::tuple< + CallbackWithRequestType, SharedRequest, SharedFutureWithRequest, PromiseWithRequest>; + + using CallbackInfoVariant = std::variant< + std::promise, + CallbackTypeValueVariant, + CallbackWithRequestTypeValueVariant>; + + using ServiceResponse = std::pair; + + ClientIntraProcess( + rclcpp::Context::SharedPtr context, + const std::string & service_name, + const rclcpp::QoS & qos_profile) + : ClientIntraProcessBase(context, service_name, qos_profile) + { + // Create the intra-process buffer. + buffer_ = rclcpp::experimental::create_service_intra_process_buffer< + ServiceResponse>(qos_profile); + } + + virtual ~ClientIntraProcess() = default; + + bool + is_ready(rcl_wait_set_t * wait_set) + { + (void) wait_set; + return buffer_->has_data(); + } + + void + store_intra_process_response(ServiceResponse && response) + { + buffer_->add(std::move(response)); + gc_.trigger(); + invoke_on_new_response(); + } + + std::shared_ptr + take_data() override + { + auto data = std::make_shared(std::move(buffer_->consume())); + return std::static_pointer_cast(data); + } + + void execute(std::shared_ptr & data) + { + if (!data) { + throw std::runtime_error("'data' is empty"); + } + + auto data_ptr = std::static_pointer_cast(data); + auto & typed_response = data_ptr->first; + auto & value = data_ptr->second; + + if (std::holds_alternative(value)) { + auto & promise = std::get(value); + promise.set_value(std::move(typed_response)); + } else if (std::holds_alternative(value)) { + auto & inner = std::get(value); + const auto & callback = std::get(inner); + auto & promise = std::get(inner); + auto & future = std::get(inner); + promise.set_value(std::move(typed_response)); + callback(std::move(future)); + } else if (std::holds_alternative(value)) { + auto & inner = std::get(value); + const auto & callback = std::get(inner); + auto & promise = std::get(inner); + auto & future = std::get(inner); + auto & request = std::get(inner); + promise.set_value(std::make_pair(std::move(request), std::move(typed_response))); + callback(std::move(future)); + } + } + +protected: + using BufferUniquePtr = + typename rclcpp::experimental::buffers::ServiceIntraProcessBuffer< + ServiceResponse>::UniquePtr; + + BufferUniquePtr buffer_; +}; + +} // namespace experimental +} // namespace rclcpp + +#endif // RCLCPP__EXPERIMENTAL__CLIENT_INTRA_PROCESS_HPP_ diff --git a/rclcpp/include/rclcpp/experimental/client_intra_process_base.hpp b/rclcpp/include/rclcpp/experimental/client_intra_process_base.hpp new file mode 100644 index 0000000000..5c59d8e421 --- /dev/null +++ b/rclcpp/include/rclcpp/experimental/client_intra_process_base.hpp @@ -0,0 +1,196 @@ +// Copyright 2021 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCLCPP__EXPERIMENTAL__CLIENT_INTRA_PROCESS_BASE_HPP_ +#define RCLCPP__EXPERIMENTAL__CLIENT_INTRA_PROCESS_BASE_HPP_ + +#include +#include +#include +#include + +#include "rmw/impl/cpp/demangle.hpp" + +#include "rclcpp/context.hpp" +#include "rclcpp/guard_condition.hpp" +#include "rclcpp/logging.hpp" +#include "rclcpp/qos.hpp" +#include "rclcpp/type_support_decl.hpp" +#include "rclcpp/waitable.hpp" + +namespace rclcpp +{ +namespace experimental +{ + +class ClientIntraProcessBase : public rclcpp::Waitable +{ +public: + RCLCPP_SMART_PTR_ALIASES_ONLY(ClientIntraProcessBase) + + enum class EntityType : std::size_t + { + Client, + }; + + RCLCPP_PUBLIC + ClientIntraProcessBase( + rclcpp::Context::SharedPtr context, + const std::string & service_name, + const rclcpp::QoS & qos_profile) + : gc_(context), service_name_(service_name), qos_profile_(qos_profile) + {} + + virtual ~ClientIntraProcessBase() = default; + + RCLCPP_PUBLIC + size_t + get_number_of_ready_guard_conditions() {return 1;} + + RCLCPP_PUBLIC + void + add_to_wait_set(rcl_wait_set_t * wait_set); + + virtual bool + is_ready(rcl_wait_set_t * wait_set) = 0; + + virtual + std::shared_ptr + take_data() = 0; + + std::shared_ptr + take_data_by_entity_id(size_t id) override + { + (void)id; + return take_data(); + } + + virtual void + execute(std::shared_ptr & data) = 0; + + RCLCPP_PUBLIC + const char * + get_service_name() const; + + RCLCPP_PUBLIC + QoS + get_actual_qos() const; + + /// Set a callback to be called when each new response arrives. + /** + * The callback receives a size_t which is the number of responses received + * since the last time this callback was called. + * Normally this is 1, but can be > 1 if responses were received before any + * callback was set. + * + * The callback also receives an int identifier argument. + * This is needed because a Waitable may be composed of several distinct entities, + * such as subscriptions, services, etc. + * The application should provide a generic callback function that will be then + * forwarded by the waitable to all of its entities. + * Before forwarding, a different value for the identifier argument will be + * bound to the function. + * This implies that the provided callback can use the identifier to behave + * differently depending on which entity triggered the waitable to become ready. + * + * Calling it again will clear any previously set callback. + * + * An exception will be thrown if the callback is not callable. + * + * This function is thread-safe. + * + * If you want more information available in the callback, like the subscription + * or other information, you may use a lambda with captures or std::bind. + * + * \param[in] callback functor to be called when a new response is received. + */ + void + set_on_ready_callback(std::function callback) override + { + if (!callback) { + throw std::invalid_argument( + "The callback passed to set_on_ready_callback " + "is not callable."); + } + + // Note: we bind the int identifier argument to this waitable's entity types + auto new_callback = + [callback, this](size_t number_of_events) { + try { + callback(number_of_events, static_cast(EntityType::Client)); + } catch (const std::exception & exception) { + RCLCPP_ERROR_STREAM( + // TODO(wjwwood): get this class access to the node logger it is associated with + rclcpp::get_logger("rclcpp"), + "rclcpp::ClientIntraProcessBase@" << this << + " caught " << rmw::impl::cpp::demangle(exception) << + " exception in user-provided callback for the 'on ready' callback: " << + exception.what()); + } catch (...) { + RCLCPP_ERROR_STREAM( + rclcpp::get_logger("rclcpp"), + "rclcpp::ClientIntraProcessBase@" << this << + " caught unhandled exception in user-provided callback " << + "for the 'on ready' callback"); + } + }; + + std::lock_guard lock(reentrant_mutex_); + on_new_response_callback_ = new_callback; + + if (unread_count_ > 0) { + if (qos_profile_.history() == HistoryPolicy::KeepAll) { + on_new_response_callback_(unread_count_); + } else { + // Use qos profile depth as upper bound for unread_count_ + on_new_response_callback_(std::min(unread_count_, qos_profile_.depth())); + } + unread_count_ = 0; + } + } + + /// Unset the callback registered for new messages, if any. + void + clear_on_ready_callback() override + { + std::lock_guard lock(reentrant_mutex_); + on_new_response_callback_ = nullptr; + } + +protected: + std::recursive_mutex reentrant_mutex_; + rclcpp::GuardCondition gc_; + std::function on_new_response_callback_ {nullptr}; + size_t unread_count_{0}; + + void + invoke_on_new_response() + { + std::lock_guard lock(reentrant_mutex_); + if (on_new_response_callback_) { + on_new_response_callback_(1); + } else { + unread_count_++; + } + } + +private: + std::string service_name_; + QoS qos_profile_; +}; + +} // namespace experimental +} // namespace rclcpp + +#endif // RCLCPP__EXPERIMENTAL__CLIENT_INTRA_PROCESS_BASE_HPP_ diff --git a/rclcpp/include/rclcpp/experimental/create_intra_process_buffer.hpp b/rclcpp/include/rclcpp/experimental/create_intra_process_buffer.hpp index 4d7668b964..262aec96a3 100644 --- a/rclcpp/include/rclcpp/experimental/create_intra_process_buffer.hpp +++ b/rclcpp/include/rclcpp/experimental/create_intra_process_buffer.hpp @@ -92,6 +92,26 @@ create_intra_process_buffer( return buffer; } +template +typename rclcpp::experimental::buffers::ServiceIntraProcessBuffer::UniquePtr +create_service_intra_process_buffer(const rclcpp::QoS & qos) +{ + using rclcpp::experimental::buffers::RingBufferImplementation; + + size_t buffer_size = qos.depth(); + auto buffer_impl = std::make_unique>(buffer_size); + + using rclcpp::experimental::buffers::ServiceIntraProcessBuffer; + typename ServiceIntraProcessBuffer::UniquePtr buffer; + + // Construct the intra_process_buffer + buffer = + std::make_unique>( + std::move(buffer_impl)); + + return buffer; +} + } // namespace experimental } // namespace rclcpp diff --git a/rclcpp/include/rclcpp/experimental/intra_process_manager.hpp b/rclcpp/include/rclcpp/experimental/intra_process_manager.hpp index d8054be2d4..967c7577de 100644 --- a/rclcpp/include/rclcpp/experimental/intra_process_manager.hpp +++ b/rclcpp/include/rclcpp/experimental/intra_process_manager.hpp @@ -28,7 +28,11 @@ #include #include "rclcpp/allocator/allocator_deleter.hpp" +#include "rclcpp/experimental/client_intra_process.hpp" +#include "rclcpp/experimental/client_intra_process_base.hpp" #include "rclcpp/experimental/ros_message_intra_process_buffer.hpp" +#include "rclcpp/experimental/service_intra_process.hpp" +#include "rclcpp/experimental/service_intra_process_base.hpp" #include "rclcpp/experimental/subscription_intra_process.hpp" #include "rclcpp/experimental/subscription_intra_process_base.hpp" #include "rclcpp/experimental/subscription_intra_process_buffer.hpp" @@ -97,10 +101,10 @@ class IntraProcessManager RCLCPP_SMART_PTR_DEFINITIONS(IntraProcessManager) RCLCPP_PUBLIC - IntraProcessManager(); + IntraProcessManager() = default; RCLCPP_PUBLIC - virtual ~IntraProcessManager(); + virtual ~IntraProcessManager() = default; /// Register a subscription with the manager, returns subscriptions unique id. /** @@ -116,6 +120,24 @@ class IntraProcessManager uint64_t add_subscription(rclcpp::experimental::SubscriptionIntraProcessBase::SharedPtr subscription); + /// Register an intra-process client with the manager, returns the client unique id. + /** + * \param client the ClientIntraProcessBase to register. + * \return an unsigned 64-bit integer which is the client's unique id. + */ + RCLCPP_PUBLIC + uint64_t + add_intra_process_client(rclcpp::experimental::ClientIntraProcessBase::SharedPtr client); + + /// Register an intra-process service with the manager, returns the service unique id. + /** + * \param service the Service IntraProcessBase to register. + * \return an unsigned 64-bit integer which is the service's unique id. + */ + RCLCPP_PUBLIC + uint64_t + add_intra_process_service(rclcpp::experimental::ServiceIntraProcessBase::SharedPtr service); + /// Unregister a subscription using the subscription's unique id. /** * This method does not allocate memory. @@ -126,6 +148,22 @@ class IntraProcessManager void remove_subscription(uint64_t intra_process_subscription_id); + /// Unregister a client using the client's unique id. + /** + * \param intra_process_client_id id of the client to remove. + */ + RCLCPP_PUBLIC + void + remove_client(uint64_t intra_process_client_id); + + /// Unregister a service using the service's unique id. + /** + * \param intra_process_service_id id of the service to remove. + */ + RCLCPP_PUBLIC + void + remove_service(uint64_t intra_process_service_id); + /// Register a publisher with the manager, returns the publisher unique id. /** * This method stores the publisher intra process object, together with @@ -237,6 +275,56 @@ class IntraProcessManager } } + /// Send an intra-process client request + /** + * Using the intra-process client id, a matching intra-process service is retrieved + * which will store the request to process it asynchronously. + * + * \param intra_process_client_id the id of the client sending the request + * \param request the client's request plus callbacks if any. + */ + template + void + send_intra_process_client_request( + uint64_t intra_process_client_id, + RequestT request) + { + std::unique_lock lock(mutex_); + + auto client_it = clients_to_services_.find(intra_process_client_id); + + if (client_it == clients_to_services_.end()) { + RCLCPP_WARN( + rclcpp::get_logger("rclcpp"), + "Calling send_intra_process_client_request for invalid or no " + "longer existing client id"); + return; + } + uint64_t service_id = client_it->second; + + auto service_it = services_.find(service_id); + if (service_it == services_.end()) { + auto cli = get_client_intra_process(intra_process_client_id); + auto warning_msg = + "Intra-process service gone out of scope. " + "Do inter-process requests.\n" + "Client topic name: " + std::string(cli->get_service_name()); + RCLCPP_WARN(rclcpp::get_logger("rclcpp"), warning_msg.c_str()); + return; + } + auto service_intra_process_base = service_it->second.lock(); + if (service_intra_process_base) { + auto service = std::dynamic_pointer_cast< + rclcpp::experimental::ServiceIntraProcess>(service_intra_process_base); + if (service) { + service->store_intra_process_request( + intra_process_client_id, std::move(request)); + } + } else { + services_.erase(service_it); + } + } + template< typename MessageT, typename ROSMessageType, @@ -306,6 +394,18 @@ class IntraProcessManager rclcpp::experimental::SubscriptionIntraProcessBase::SharedPtr get_subscription_intra_process(uint64_t intra_process_subscription_id); + RCLCPP_PUBLIC + rclcpp::experimental::ClientIntraProcessBase::SharedPtr + get_client_intra_process(uint64_t intra_process_client_id); + + RCLCPP_PUBLIC + rclcpp::experimental::ServiceIntraProcessBase::SharedPtr + get_service_intra_process(uint64_t intra_process_service_id); + + RCLCPP_PUBLIC + bool + service_is_available(uint64_t intra_process_client_id); + private: struct SplittedSubscriptions { @@ -322,6 +422,15 @@ class IntraProcessManager using PublisherToSubscriptionIdsMap = std::unordered_map; + using ClientMap = + std::unordered_map; + + using ServiceMap = + std::unordered_map; + + using ClientToServiceIdsMap = + std::unordered_map; + RCLCPP_PUBLIC static uint64_t @@ -337,6 +446,12 @@ class IntraProcessManager rclcpp::PublisherBase::SharedPtr pub, rclcpp::experimental::SubscriptionIntraProcessBase::SharedPtr sub) const; + RCLCPP_PUBLIC + bool + can_communicate( + rclcpp::experimental::ClientIntraProcessBase::SharedPtr client, + rclcpp::experimental::ServiceIntraProcessBase::SharedPtr service) const; + template< typename MessageT, typename Alloc, @@ -511,6 +626,9 @@ class IntraProcessManager PublisherToSubscriptionIdsMap pub_to_subs_; SubscriptionMap subscriptions_; PublisherMap publishers_; + ClientToServiceIdsMap clients_to_services_; + ClientMap clients_; + ServiceMap services_; mutable std::shared_timed_mutex mutex_; }; diff --git a/rclcpp/include/rclcpp/experimental/service_intra_process.hpp b/rclcpp/include/rclcpp/experimental/service_intra_process.hpp new file mode 100644 index 0000000000..9a4852a6a7 --- /dev/null +++ b/rclcpp/include/rclcpp/experimental/service_intra_process.hpp @@ -0,0 +1,160 @@ +// Copyright 2021 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCLCPP__EXPERIMENTAL__SERVICE_INTRA_PROCESS_HPP_ +#define RCLCPP__EXPERIMENTAL__SERVICE_INTRA_PROCESS_HPP_ + +#include +#include +#include +#include +#include +#include + +#include "rcl/error_handling.h" +#include "rcutils/logging_macros.h" + +#include "rclcpp/any_service_callback.hpp" +#include "rclcpp/experimental/buffers/intra_process_buffer.hpp" +#include "rclcpp/experimental/client_intra_process.hpp" +#include "rclcpp/experimental/create_intra_process_buffer.hpp" +#include "rclcpp/experimental/service_intra_process_base.hpp" +#include "rclcpp/logging.hpp" +#include "rclcpp/qos.hpp" +#include "rclcpp/type_support_decl.hpp" + +namespace rclcpp +{ +namespace experimental +{ + +template +class ServiceIntraProcess : public ServiceIntraProcessBase +{ +public: + RCLCPP_SMART_PTR_DEFINITIONS(ServiceIntraProcess) + + using SharedRequest = typename ServiceT::Request::SharedPtr; + using SharedResponse = typename ServiceT::Response::SharedPtr; + + using Promise = std::promise; + using PromiseWithRequest = std::promise>; + + using SharedFuture = std::shared_future; + using SharedFutureWithRequest = std::shared_future>; + + using CallbackType = std::function; + using CallbackWithRequestType = std::function; + + using CallbackTypeValueVariant = std::tuple; + using CallbackWithRequestTypeValueVariant = std::tuple< + CallbackWithRequestType, SharedRequest, SharedFutureWithRequest, PromiseWithRequest>; + + using CallbackInfoVariant = std::variant< + std::promise, + CallbackTypeValueVariant, + CallbackWithRequestTypeValueVariant>; + + using RequestCallbackPair = std::pair; + using ClientIDtoRequest = std::pair; + + ServiceIntraProcess( + AnyServiceCallback callback, + rclcpp::Context::SharedPtr context, + const std::string & service_name, + const rclcpp::QoS & qos_profile) + : ServiceIntraProcessBase(context, service_name, qos_profile), any_callback_(callback) + { + // Create the intra-process buffer. + buffer_ = rclcpp::experimental::create_service_intra_process_buffer< + ClientIDtoRequest>(qos_profile); + } + + virtual ~ServiceIntraProcess() = default; + + bool + is_ready(rcl_wait_set_t * wait_set) + { + (void) wait_set; + return buffer_->has_data(); + } + + void + store_intra_process_request( + uint64_t intra_process_client_id, + RequestCallbackPair request) + { + buffer_->add(std::make_pair(intra_process_client_id, std::move(request))); + gc_.trigger(); + invoke_on_new_request(); + } + + std::shared_ptr + take_data() + { + auto data = std::make_shared(std::move(buffer_->consume())); + return std::static_pointer_cast(data); + } + + void execute(std::shared_ptr & data) + { + auto ptr = std::static_pointer_cast(data); + + uint64_t intra_process_client_id = ptr->first; + SharedRequest & typed_request = ptr->second.first; + CallbackInfoVariant & value = ptr->second.second; + + SharedResponse response = any_callback_.dispatch(nullptr, nullptr, std::move(typed_request)); + + if (response) { + std::unique_lock lock(reentrant_mutex_); + + auto client_it = clients_.find(intra_process_client_id); + + if (client_it == clients_.end()) { + RCLCPP_WARN( + rclcpp::get_logger("rclcpp"), + "Calling intra_process_service_send_response for invalid or no " + "longer existing client id"); + return; + } + + auto client_intra_process_base = client_it->second.lock(); + + if (client_intra_process_base) { + auto client = std::dynamic_pointer_cast< + rclcpp::experimental::ClientIntraProcess>( + client_intra_process_base); + client->store_intra_process_response( + std::make_pair(std::move(response), std::move(value))); + } else { + clients_.erase(client_it); + } + } + } + +protected: + using BufferUniquePtr = + typename rclcpp::experimental::buffers::ServiceIntraProcessBuffer< + ClientIDtoRequest>::UniquePtr; + + BufferUniquePtr buffer_; + + AnyServiceCallback any_callback_; +}; + +} // namespace experimental +} // namespace rclcpp + +#endif // RCLCPP__EXPERIMENTAL__SERVICE_INTRA_PROCESS_HPP_ diff --git a/rclcpp/include/rclcpp/experimental/service_intra_process_base.hpp b/rclcpp/include/rclcpp/experimental/service_intra_process_base.hpp new file mode 100644 index 0000000000..a75a83226f --- /dev/null +++ b/rclcpp/include/rclcpp/experimental/service_intra_process_base.hpp @@ -0,0 +1,211 @@ +// Copyright 2021 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCLCPP__EXPERIMENTAL__SERVICE_INTRA_PROCESS_BASE_HPP_ +#define RCLCPP__EXPERIMENTAL__SERVICE_INTRA_PROCESS_BASE_HPP_ + +#include +#include +#include +#include +#include +#include + +#include "rcl/error_handling.h" +#include "rmw/impl/cpp/demangle.hpp" + +#include "rclcpp/context.hpp" +#include "rclcpp/experimental/client_intra_process_base.hpp" +#include "rclcpp/guard_condition.hpp" +#include "rclcpp/logging.hpp" +#include "rclcpp/qos.hpp" +#include "rclcpp/type_support_decl.hpp" +#include "rclcpp/waitable.hpp" + +namespace rclcpp +{ +namespace experimental +{ + +class ServiceIntraProcessBase : public rclcpp::Waitable +{ +public: + RCLCPP_SMART_PTR_ALIASES_ONLY(ServiceIntraProcessBase) + + enum class EntityType : std::size_t + { + Service, + }; + + RCLCPP_PUBLIC + ServiceIntraProcessBase( + rclcpp::Context::SharedPtr context, + const std::string & service_name, + const rclcpp::QoS & qos_profile) + : gc_(context), service_name_(service_name), qos_profile_(qos_profile) + {} + + virtual ~ServiceIntraProcessBase() = default; + + RCLCPP_PUBLIC + size_t + get_number_of_ready_guard_conditions() {return 1;} + + RCLCPP_PUBLIC + void + add_to_wait_set(rcl_wait_set_t * wait_set); + + virtual bool + is_ready(rcl_wait_set_t * wait_set) = 0; + + virtual + std::shared_ptr + take_data() = 0; + + std::shared_ptr + take_data_by_entity_id(size_t id) override + { + (void)id; + return take_data(); + } + + virtual void + execute(std::shared_ptr & data) = 0; + + RCLCPP_PUBLIC + const char * + get_service_name() const; + + RCLCPP_PUBLIC + QoS + get_actual_qos() const; + + RCLCPP_PUBLIC + void + add_intra_process_client( + rclcpp::experimental::ClientIntraProcessBase::SharedPtr client, + uint64_t client_id); + + /// Set a callback to be called when each new request arrives. + /** + * The callback receives a size_t which is the number of requests received + * since the last time this callback was called. + * Normally this is 1, but can be > 1 if requests were received before any + * callback was set. + * + * The callback also receives an int identifier argument. + * This is needed because a Waitable may be composed of several distinct entities, + * such as subscriptions, services, etc. + * The application should provide a generic callback function that will be then + * forwarded by the waitable to all of its entities. + * Before forwarding, a different value for the identifier argument will be + * bound to the function. + * This implies that the provided callback can use the identifier to behave + * differently depending on which entity triggered the waitable to become ready. + * + * Calling it again will clear any previously set callback. + * + * An exception will be thrown if the callback is not callable. + * + * This function is thread-safe. + * + * If you want more information available in the callback, like the subscription + * or other information, you may use a lambda with captures or std::bind. + * + * \param[in] callback functor to be called when a new request is received. + */ + void + set_on_ready_callback(std::function callback) override + { + if (!callback) { + throw std::invalid_argument( + "The callback passed to set_on_ready_callback " + "is not callable."); + } + + // Note: we bind the int identifier argument to this waitable's entity types + auto new_callback = + [callback, this](size_t number_of_events) { + try { + callback(number_of_events, static_cast(EntityType::Service)); + } catch (const std::exception & exception) { + RCLCPP_ERROR_STREAM( + // TODO(wjwwood): get this class access to the node logger it is associated with + rclcpp::get_logger("rclcpp"), + "rclcpp::ServiceIntraProcessBase@" << this << + " caught " << rmw::impl::cpp::demangle(exception) << + " exception in user-provided callback for the 'on ready' callback: " << + exception.what()); + } catch (...) { + RCLCPP_ERROR_STREAM( + rclcpp::get_logger("rclcpp"), + "rclcpp::ServiceIntraProcessBase@" << this << + " caught unhandled exception in user-provided callback " << + "for the 'on ready' callback"); + } + }; + + std::lock_guard lock(reentrant_mutex_); + on_new_request_callback_ = new_callback; + + if (unread_count_ > 0) { + if (qos_profile_.history() == HistoryPolicy::KeepAll) { + on_new_request_callback_(unread_count_); + } else { + // Use qos profile depth as upper bound for unread_count_ + on_new_request_callback_(std::min(unread_count_, qos_profile_.depth())); + } + unread_count_ = 0; + } + } + + /// Unset the callback registered for new messages, if any. + void + clear_on_ready_callback() override + { + std::lock_guard lock(reentrant_mutex_); + on_new_request_callback_ = nullptr; + } + +protected: + std::recursive_mutex reentrant_mutex_; + rclcpp::GuardCondition gc_; + std::function on_new_request_callback_ {nullptr}; + size_t unread_count_{0}; + + void + invoke_on_new_request() + { + std::lock_guard lock(reentrant_mutex_); + if (on_new_request_callback_) { + on_new_request_callback_(1); + } else { + unread_count_++; + } + } + + using ClientMap = + std::unordered_map; + + ClientMap clients_; + +private: + std::string service_name_; + QoS qos_profile_; +}; + +} // namespace experimental +} // namespace rclcpp + +#endif // RCLCPP__EXPERIMENTAL__SERVICE_INTRA_PROCESS_BASE_HPP_ diff --git a/rclcpp/include/rclcpp/intra_process_setting.hpp b/rclcpp/include/rclcpp/intra_process_setting.hpp index 8e4b44eb64..5207622592 100644 --- a/rclcpp/include/rclcpp/intra_process_setting.hpp +++ b/rclcpp/include/rclcpp/intra_process_setting.hpp @@ -18,12 +18,12 @@ namespace rclcpp { -/// Used as argument in create_publisher and create_subscriber. +/// Used as argument when creating publishers, subscriptions, clients and services. enum class IntraProcessSetting { - /// Explicitly enable intraprocess comm at publisher/subscription level. + /// Explicitly enable intraprocess comm. Enable, - /// Explicitly disable intraprocess comm at publisher/subscription level. + /// Explicitly disable intraprocess comm. Disable, /// Take intraprocess configuration from the node. NodeDefault diff --git a/rclcpp/include/rclcpp/node.hpp b/rclcpp/include/rclcpp/node.hpp index 65b8797716..27b326dbf8 100644 --- a/rclcpp/include/rclcpp/node.hpp +++ b/rclcpp/include/rclcpp/node.hpp @@ -245,6 +245,7 @@ class Node : public std::enable_shared_from_this * \param[in] service_name The topic to service on. * \param[in] qos_profile rmw_qos_profile_t Quality of service profile for client. * \param[in] group Callback group to call the service. + * \param[in] ipc_setting Intra-process communication setting for the client. * \return Shared pointer to the created client. */ template @@ -252,7 +253,8 @@ class Node : public std::enable_shared_from_this create_client( const std::string & service_name, const rmw_qos_profile_t & qos_profile = rmw_qos_profile_services_default, - rclcpp::CallbackGroup::SharedPtr group = nullptr); + rclcpp::CallbackGroup::SharedPtr group = nullptr, + rclcpp::IntraProcessSetting ipc_setting = rclcpp::IntraProcessSetting::NodeDefault); /// Create and return a Service. /** @@ -260,6 +262,7 @@ class Node : public std::enable_shared_from_this * \param[in] callback User-defined callback function. * \param[in] qos_profile rmw_qos_profile_t Quality of service profile for client. * \param[in] group Callback group to call the service. + * \param[in] ipc_setting Intra-process communication setting for the service. * \return Shared pointer to the created service. */ template @@ -268,7 +271,8 @@ class Node : public std::enable_shared_from_this const std::string & service_name, CallbackT && callback, const rmw_qos_profile_t & qos_profile = rmw_qos_profile_services_default, - rclcpp::CallbackGroup::SharedPtr group = nullptr); + rclcpp::CallbackGroup::SharedPtr group = nullptr, + rclcpp::IntraProcessSetting ipc_setting = rclcpp::IntraProcessSetting::NodeDefault); /// Create and return a GenericPublisher. /** diff --git a/rclcpp/include/rclcpp/node_impl.hpp b/rclcpp/include/rclcpp/node_impl.hpp index 086c2bb17c..6ed14c25c3 100644 --- a/rclcpp/include/rclcpp/node_impl.hpp +++ b/rclcpp/include/rclcpp/node_impl.hpp @@ -125,7 +125,8 @@ typename Client::SharedPtr Node::create_client( const std::string & service_name, const rmw_qos_profile_t & qos_profile, - rclcpp::CallbackGroup::SharedPtr group) + rclcpp::CallbackGroup::SharedPtr group, + rclcpp::IntraProcessSetting ipc_setting) { return rclcpp::create_client( node_base_, @@ -133,7 +134,8 @@ Node::create_client( node_services_, extend_name_with_sub_namespace(service_name, this->get_sub_namespace()), qos_profile, - group); + group, + ipc_setting); } template @@ -142,7 +144,8 @@ Node::create_service( const std::string & service_name, CallbackT && callback, const rmw_qos_profile_t & qos_profile, - rclcpp::CallbackGroup::SharedPtr group) + rclcpp::CallbackGroup::SharedPtr group, + rclcpp::IntraProcessSetting ipc_setting) { return rclcpp::create_service( node_base_, @@ -150,7 +153,8 @@ Node::create_service( extend_name_with_sub_namespace(service_name, this->get_sub_namespace()), std::forward(callback), qos_profile, - group); + group, + ipc_setting); } template diff --git a/rclcpp/include/rclcpp/publisher.hpp b/rclcpp/include/rclcpp/publisher.hpp index 78b5700611..c4abacc82a 100644 --- a/rclcpp/include/rclcpp/publisher.hpp +++ b/rclcpp/include/rclcpp/publisher.hpp @@ -182,7 +182,7 @@ class Publisher : public PublisherBase (void)options; // If needed, setup intra process communication. - if (rclcpp::detail::resolve_use_intra_process(options_, *node_base)) { + if (rclcpp::detail::resolve_use_intra_process(options_.use_intra_process_comm, *node_base)) { auto context = node_base->get_context(); // Get the intra process manager instance for this context. auto ipm = context->get_sub_context(); diff --git a/rclcpp/include/rclcpp/service.hpp b/rclcpp/include/rclcpp/service.hpp index 3f500eaa09..9731331136 100644 --- a/rclcpp/include/rclcpp/service.hpp +++ b/rclcpp/include/rclcpp/service.hpp @@ -35,8 +35,12 @@ #include "rclcpp/any_service_callback.hpp" #include "rclcpp/detail/cpp_callback_trampoline.hpp" +#include "rclcpp/detail/resolve_use_intra_process.hpp" #include "rclcpp/exceptions.hpp" #include "rclcpp/expand_topic_or_service_name.hpp" +#include "rclcpp/experimental/intra_process_manager.hpp" +#include "rclcpp/experimental/service_intra_process.hpp" +#include "rclcpp/intra_process_setting.hpp" #include "rclcpp/logging.hpp" #include "rclcpp/macros.hpp" #include "rclcpp/qos.hpp" @@ -46,13 +50,19 @@ namespace rclcpp { +namespace node_interfaces +{ +class NodeBaseInterface; +} // namespace node_interfaces + class ServiceBase { public: RCLCPP_SMART_PTR_DEFINITIONS_NOT_COPYABLE(ServiceBase) RCLCPP_PUBLIC - explicit ServiceBase(std::shared_ptr node_handle); + explicit ServiceBase( + std::shared_ptr node_base); RCLCPP_PUBLIC virtual ~ServiceBase(); @@ -160,6 +170,15 @@ class ServiceBase rclcpp::QoS get_request_subscription_actual_qos() const; + /// Return the waitable for intra-process + /** + * \return the waitable sharedpointer for intra-process, or nullptr if intra-process is not setup. + * \throws std::runtime_error if the intra process manager is destroyed + */ + RCLCPP_PUBLIC + rclcpp::Waitable::SharedPtr + get_intra_process_waitable(); + /// Set a callback to be called when each new request is received. /** * The callback receives a size_t which is the number of requests received @@ -260,7 +279,20 @@ class ServiceBase void set_on_new_request_callback(rcl_event_callback_t callback, const void * user_data); + using IntraProcessManagerWeakPtr = + std::weak_ptr; + + /// Implementation detail. + RCLCPP_PUBLIC + void + setup_intra_process( + uint64_t intra_process_service_id, + IntraProcessManagerWeakPtr weak_ipm); + + std::shared_ptr service_intra_process_; + std::shared_ptr node_handle_; + std::shared_ptr context_; std::shared_ptr service_handle_; bool owns_rcl_handle_ = true; @@ -269,6 +301,11 @@ class ServiceBase std::atomic in_use_by_wait_set_{false}; + std::recursive_mutex ipc_mutex_; + bool use_intra_process_{false}; + IntraProcessManagerWeakPtr weak_ipm_; + uint64_t intra_process_service_id_; + std::recursive_mutex callback_mutex_; std::function on_new_request_callback_{nullptr}; }; @@ -301,13 +338,15 @@ class Service * \param[in] service_name Name of the topic to publish to. * \param[in] any_callback User defined callback to call when a client request is received. * \param[in] service_options options for the subscription. + * \param[in] ipc_setting Intra-process communication setting for the service. */ Service( - std::shared_ptr node_handle, + std::shared_ptr node_base, const std::string & service_name, AnyServiceCallback any_callback, - rcl_service_options_t & service_options) - : ServiceBase(node_handle), any_callback_(any_callback) + rcl_service_options_t & service_options, + rclcpp::IntraProcessSetting ipc_setting = rclcpp::IntraProcessSetting::NodeDefault) + : ServiceBase(node_base), any_callback_(any_callback) { using rosidl_typesupport_cpp::get_service_type_support_handle; auto service_type_support_handle = get_service_type_support_handle(); @@ -329,7 +368,7 @@ class Service rcl_ret_t ret = rcl_service_init( service_handle_.get(), - node_handle.get(), + node_handle_.get(), service_type_support_handle, service_name.c_str(), &service_options); @@ -354,6 +393,11 @@ class Service #ifndef TRACETOOLS_DISABLED any_callback_.register_callback_for_tracing(); #endif + + // Setup intra process if requested. + if (rclcpp::detail::resolve_use_intra_process(ipc_setting, *node_base)) { + create_intra_process_service(); + } } /// Default constructor. @@ -365,12 +409,14 @@ class Service * \param[in] node_handle NodeBaseInterface pointer that is used in part of the setup. * \param[in] service_handle service handle. * \param[in] any_callback User defined callback to call when a client request is received. + * \param[in] ipc_setting Intra-process communication setting for the service. */ Service( - std::shared_ptr node_handle, + std::shared_ptr node_base, std::shared_ptr service_handle, - AnyServiceCallback any_callback) - : ServiceBase(node_handle), + AnyServiceCallback any_callback, + rclcpp::IntraProcessSetting ipc_setting = rclcpp::IntraProcessSetting::NodeDefault) + : ServiceBase(node_base), any_callback_(any_callback) { // check if service handle was initialized @@ -389,6 +435,11 @@ class Service #ifndef TRACETOOLS_DISABLED any_callback_.register_callback_for_tracing(); #endif + + // Setup intra process if requested. + if (rclcpp::detail::resolve_use_intra_process(ipc_setting, *node_base)) { + create_intra_process_service(); + } } /// Default constructor. @@ -400,12 +451,14 @@ class Service * \param[in] node_handle NodeBaseInterface pointer that is used in part of the setup. * \param[in] service_handle service handle. * \param[in] any_callback User defined callback to call when a client request is received. + * \param[in] ipc_setting Intra-process communication setting for the service. */ Service( - std::shared_ptr node_handle, + std::shared_ptr node_base, rcl_service_t * service_handle, - AnyServiceCallback any_callback) - : ServiceBase(node_handle), + AnyServiceCallback any_callback, + rclcpp::IntraProcessSetting ipc_setting = rclcpp::IntraProcessSetting::NodeDefault) + : ServiceBase(node_base), any_callback_(any_callback) { // check if service handle was initialized @@ -426,12 +479,28 @@ class Service #ifndef TRACETOOLS_DISABLED any_callback_.register_callback_for_tracing(); #endif + // Setup intra process if requested. + if (rclcpp::detail::resolve_use_intra_process(ipc_setting, *node_base)) { + create_intra_process_service(); + } } Service() = delete; virtual ~Service() { + if (!use_intra_process_) { + return; + } + auto ipm = weak_ipm_.lock(); + if (!ipm) { + // TODO(ivanpauno): should this raise an error? + RCLCPP_WARN( + rclcpp::get_logger("rclcpp"), + "Intra process manager died before than a service."); + return; + } + ipm->remove_service(intra_process_service_id_); } /// Take the next request from the service. @@ -486,6 +555,40 @@ class Service } } + void + create_intra_process_service() + { + // Check if the QoS is compatible with intra-process. + auto qos_profile = get_request_subscription_actual_qos(); + + if (qos_profile.history() != rclcpp::HistoryPolicy::KeepLast) { + throw std::invalid_argument( + "intraprocess communication allowed only with keep last history qos policy"); + } + if (qos_profile.depth() == 0) { + throw std::invalid_argument( + "intraprocess communication is not allowed with 0 depth qos policy"); + } + if (qos_profile.durability() != rclcpp::DurabilityPolicy::Volatile) { + throw std::invalid_argument( + "intraprocess communication allowed only with volatile durability"); + } + + // Create a ServiceIntraProcess which will be given to the intra-process manager. + using ServiceIntraProcessT = rclcpp::experimental::ServiceIntraProcess; + + service_intra_process_ = std::make_shared( + any_callback_, + context_, + this->get_service_name(), + qos_profile); + + using rclcpp::experimental::IntraProcessManager; + auto ipm = context_->get_sub_context(); + uint64_t intra_process_service_id = ipm->add_intra_process_service(service_intra_process_); + this->setup_intra_process(intra_process_service_id, ipm); + } + private: RCLCPP_DISABLE_COPY(Service) diff --git a/rclcpp/include/rclcpp/subscription.hpp b/rclcpp/include/rclcpp/subscription.hpp index 11bf9c6e43..6f02e59c05 100644 --- a/rclcpp/include/rclcpp/subscription.hpp +++ b/rclcpp/include/rclcpp/subscription.hpp @@ -179,7 +179,7 @@ class Subscription : public SubscriptionBase } // Setup intra process publishing if requested. - if (rclcpp::detail::resolve_use_intra_process(options_, *node_base)) { + if (rclcpp::detail::resolve_use_intra_process(options_.use_intra_process_comm, *node_base)) { using rclcpp::detail::resolve_intra_process_buffer_type; // Check if the QoS is compatible with intra-process. diff --git a/rclcpp/include/rclcpp/subscription_base.hpp b/rclcpp/include/rclcpp/subscription_base.hpp index f21f27e864..0052f881a4 100644 --- a/rclcpp/include/rclcpp/subscription_base.hpp +++ b/rclcpp/include/rclcpp/subscription_base.hpp @@ -247,7 +247,7 @@ class SubscriptionBase : public std::enable_shared_from_this using IntraProcessManagerWeakPtr = std::weak_ptr; - /// Implemenation detail. + /// Implementation detail. RCLCPP_PUBLIC void setup_intra_process( diff --git a/rclcpp/src/rclcpp/client.cpp b/rclcpp/src/rclcpp/client.cpp index 17cc68e153..689a941c3a 100644 --- a/rclcpp/src/rclcpp/client.cpp +++ b/rclcpp/src/rclcpp/client.cpp @@ -70,6 +70,19 @@ ClientBase::~ClientBase() clear_on_new_response_callback(); // Make sure the client handle is destructed as early as possible and before the node handle client_handle_.reset(); + + std::lock_guard lock(ipc_mutex_); + if (!use_intra_process_) { + return; + } + auto ipm = weak_ipm_.lock(); + if (!ipm) { + RCLCPP_WARN( + rclcpp::get_logger("rclcpp"), + "Intra process manager died before than a client."); + return; + } + ipm->remove_client(intra_process_client_id_); } bool @@ -239,6 +252,37 @@ ClientBase::get_response_subscription_actual_qos() const return response_subscription_qos; } +void +ClientBase::setup_intra_process( + uint64_t intra_process_client_id, + IntraProcessManagerWeakPtr weak_ipm) +{ + std::lock_guard lock(ipc_mutex_); + weak_ipm_ = weak_ipm; + use_intra_process_ = true; + intra_process_client_id_ = intra_process_client_id; +} + +rclcpp::Waitable::SharedPtr +ClientBase::get_intra_process_waitable() +{ + std::lock_guard lock(ipc_mutex_); + // If not using intra process, shortcut to nullptr. + if (!use_intra_process_) { + return nullptr; + } + // Get the intra process manager. + auto ipm = weak_ipm_.lock(); + if (!ipm) { + throw std::runtime_error( + "ClientBase::get_intra_process_waitable() called " + "after destruction of intra process manager"); + } + + // Use the id to retrieve the intra-process client from the intra-process manager. + return ipm->get_client_intra_process(intra_process_client_id_); +} + void ClientBase::set_on_new_response_callback(rcl_event_callback_t callback, const void * user_data) { diff --git a/rclcpp/src/rclcpp/client_intra_process_base.cpp b/rclcpp/src/rclcpp/client_intra_process_base.cpp new file mode 100644 index 0000000000..b445f32937 --- /dev/null +++ b/rclcpp/src/rclcpp/client_intra_process_base.cpp @@ -0,0 +1,36 @@ +// Copyright 2021 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "rclcpp/detail/add_guard_condition_to_rcl_wait_set.hpp" +#include "rclcpp/experimental/client_intra_process_base.hpp" + +using rclcpp::experimental::ClientIntraProcessBase; + +void +ClientIntraProcessBase::add_to_wait_set(rcl_wait_set_t * wait_set) +{ + detail::add_guard_condition_to_rcl_wait_set(*wait_set, gc_); +} + +const char * +ClientIntraProcessBase::get_service_name() const +{ + return service_name_.c_str(); +} + +rclcpp::QoS +ClientIntraProcessBase::get_actual_qos() const +{ + return qos_profile_; +} diff --git a/rclcpp/src/rclcpp/intra_process_manager.cpp b/rclcpp/src/rclcpp/intra_process_manager.cpp index efce4afeaf..48a8cfe8e4 100644 --- a/rclcpp/src/rclcpp/intra_process_manager.cpp +++ b/rclcpp/src/rclcpp/intra_process_manager.cpp @@ -25,12 +25,6 @@ namespace experimental static std::atomic _next_unique_id {1}; -IntraProcessManager::IntraProcessManager() -{} - -IntraProcessManager::~IntraProcessManager() -{} - uint64_t IntraProcessManager::add_publisher(rclcpp::PublisherBase::SharedPtr publisher) { @@ -82,6 +76,73 @@ IntraProcessManager::add_subscription(SubscriptionIntraProcessBase::SharedPtr su return sub_id; } +uint64_t +IntraProcessManager::add_intra_process_client(ClientIntraProcessBase::SharedPtr client) +{ + std::unique_lock lock(mutex_); + + uint64_t client_id = IntraProcessManager::get_next_unique_id(); + clients_[client_id] = client; + + // adds the client to the matchable service + for (auto & pair : services_) { + auto intra_process_service = pair.second.lock(); + if (!intra_process_service) { + continue; + } + if (can_communicate(client, intra_process_service)) { + uint64_t service_id = pair.first; + clients_to_services_.emplace(client_id, service_id); + intra_process_service->add_intra_process_client(client, client_id); + break; + } + } + + return client_id; +} + +uint64_t +IntraProcessManager::add_intra_process_service(ServiceIntraProcessBase::SharedPtr service) +{ + std::unique_lock lock(mutex_); + + // First check if we have already a service registered with same service name + // Todo: Open issue about this not being enforced with normal services + auto it = services_.begin(); + + while (it != services_.end()) { + auto srv = it->second.lock(); + if (srv) { + if (srv->get_service_name() == service->get_service_name()) { + throw std::runtime_error( + "Can't have multiple services with same service name."); + } + it++; + } else { + it = services_.erase(it); + } + } + + uint64_t service_id = IntraProcessManager::get_next_unique_id(); + services_[service_id] = service; + + // adds the service to the matchable clients + for (auto & pair : clients_) { + auto client = pair.second.lock(); + if (!client) { + continue; + } + if (can_communicate(client, service)) { + uint64_t client_id = pair.first; + clients_to_services_.emplace(client_id, service_id); + + service->add_intra_process_client(client, client_id); + } + } + + return service_id; +} + void IntraProcessManager::remove_subscription(uint64_t intra_process_subscription_id) { @@ -115,6 +176,33 @@ IntraProcessManager::remove_publisher(uint64_t intra_process_publisher_id) pub_to_subs_.erase(intra_process_publisher_id); } +void +IntraProcessManager::remove_client(uint64_t intra_process_client_id) +{ + std::unique_lock lock(mutex_); + + clients_.erase(intra_process_client_id); + clients_to_services_.erase(intra_process_client_id); +} + +void +IntraProcessManager::remove_service(uint64_t intra_process_service_id) +{ + std::unique_lock lock(mutex_); + + services_.erase(intra_process_service_id); + + auto it = clients_to_services_.begin(); + + while (it != clients_to_services_.end()) { + if (it->second == intra_process_service_id) { + it = clients_to_services_.erase(it); + } else { + it++; + } + } +} + bool IntraProcessManager::matches_any_publishers(const rmw_gid_t * id) const { @@ -172,6 +260,58 @@ IntraProcessManager::get_subscription_intra_process(uint64_t intra_process_subsc } } +ServiceIntraProcessBase::SharedPtr +IntraProcessManager::get_service_intra_process(uint64_t intra_process_service_id) +{ + std::shared_lock lock(mutex_); + + auto service_it = services_.find(intra_process_service_id); + if (service_it == services_.end()) { + return nullptr; + } else { + auto service = service_it->second.lock(); + if (service) { + return service; + } else { + services_.erase(service_it); + return nullptr; + } + } +} + +bool +IntraProcessManager::service_is_available(uint64_t intra_process_client_id) +{ + std::shared_lock lock(mutex_); + + auto service_it = clients_to_services_.find(intra_process_client_id); + + if (service_it != clients_to_services_.end()) { + // A server matching the client has been found + return true; + } + return false; +} + +ClientIntraProcessBase::SharedPtr +IntraProcessManager::get_client_intra_process(uint64_t intra_process_client_id) +{ + std::shared_lock lock(mutex_); + + auto client_it = clients_.find(intra_process_client_id); + if (client_it == clients_.end()) { + return nullptr; + } else { + auto client = client_it->second.lock(); + if (client) { + return client; + } else { + clients_.erase(client_it); + return nullptr; + } + } +} + uint64_t IntraProcessManager::get_next_unique_id() { @@ -225,5 +365,25 @@ IntraProcessManager::can_communicate( return true; } +bool +IntraProcessManager::can_communicate( + ClientIntraProcessBase::SharedPtr client, + ServiceIntraProcessBase::SharedPtr service) const +{ + // client and service must be under the same name + if (strcmp(client->get_service_name(), service->get_service_name()) != 0) { + return false; + } + + auto check_result = rclcpp::qos_check_compatible( + client->get_actual_qos(), service->get_actual_qos()); + + if (check_result.compatibility == rclcpp::QoSCompatibility::Error) { + return false; + } + + return true; +} + } // namespace experimental } // namespace rclcpp diff --git a/rclcpp/src/rclcpp/node_interfaces/node_services.cpp b/rclcpp/src/rclcpp/node_interfaces/node_services.cpp index 14ab1c82c4..f306f1fd67 100644 --- a/rclcpp/src/rclcpp/node_interfaces/node_services.cpp +++ b/rclcpp/src/rclcpp/node_interfaces/node_services.cpp @@ -40,6 +40,12 @@ NodeServices::add_service( node_base_->get_default_callback_group()->add_service(service_base_ptr); } + auto service_intra_process_waitable = service_base_ptr->get_intra_process_waitable(); + if (nullptr != service_intra_process_waitable) { + // Add to the callback group to be notified about intra-process msgs. + node_base_->get_default_callback_group()->add_waitable(service_intra_process_waitable); + } + // Notify the executor that a new service was created using the parent Node. auto & node_gc = node_base_->get_notify_guard_condition(); try { @@ -65,6 +71,12 @@ NodeServices::add_client( node_base_->get_default_callback_group()->add_client(client_base_ptr); } + auto client_intra_process_waitable = client_base_ptr->get_intra_process_waitable(); + if (nullptr != client_intra_process_waitable) { + // Add to the callback group to be notified about intra-process msgs. + node_base_->get_default_callback_group()->add_waitable(client_intra_process_waitable); + } + // Notify the executor that a new client was created using the parent Node. auto & node_gc = node_base_->get_notify_guard_condition(); try { diff --git a/rclcpp/src/rclcpp/service.cpp b/rclcpp/src/rclcpp/service.cpp index 7bc580ce68..9e1cbe60f6 100644 --- a/rclcpp/src/rclcpp/service.cpp +++ b/rclcpp/src/rclcpp/service.cpp @@ -21,20 +21,36 @@ #include #include "rclcpp/any_service_callback.hpp" +#include "rclcpp/node_interfaces/node_base_interface.hpp" #include "rclcpp/macros.hpp" #include "rmw/error_handling.h" #include "rmw/rmw.h" using rclcpp::ServiceBase; -ServiceBase::ServiceBase(std::shared_ptr node_handle) -: node_handle_(node_handle), - node_logger_(rclcpp::get_node_logger(node_handle_.get())) +ServiceBase::ServiceBase( + rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_base) +: node_handle_(node_base->get_shared_rcl_node_handle()), + context_(node_base->get_context()), + node_logger_(rclcpp::get_node_logger(node_base->get_shared_rcl_node_handle().get())) {} ServiceBase::~ServiceBase() { clear_on_new_request_callback(); + + std::lock_guard lock(ipc_mutex_); + if (!use_intra_process_) { + return; + } + auto ipm = weak_ipm_.lock(); + if (!ipm) { + RCLCPP_WARN( + rclcpp::get_logger("rclcpp"), + "Intra process manager died before than a service."); + return; + } + ipm->remove_service(intra_process_service_id_); } bool @@ -126,6 +142,37 @@ ServiceBase::get_request_subscription_actual_qos() const return request_subscription_qos; } +void +ServiceBase::setup_intra_process( + uint64_t intra_process_service_id, + IntraProcessManagerWeakPtr weak_ipm) +{ + std::lock_guard lock(ipc_mutex_); + intra_process_service_id_ = intra_process_service_id; + weak_ipm_ = weak_ipm; + use_intra_process_ = true; +} + +rclcpp::Waitable::SharedPtr +ServiceBase::get_intra_process_waitable() +{ + std::lock_guard lock(ipc_mutex_); + // If not using intra process, shortcut to nullptr. + if (!use_intra_process_) { + return nullptr; + } + // Get the intra process manager. + auto ipm = weak_ipm_.lock(); + if (!ipm) { + throw std::runtime_error( + "ServiceBase::get_intra_process_waitable() called " + "after destruction of intra process manager"); + } + + // Use the id to retrieve the intra-process service from the intra-process manager. + return ipm->get_service_intra_process(intra_process_service_id_); +} + void ServiceBase::set_on_new_request_callback(rcl_event_callback_t callback, const void * user_data) { diff --git a/rclcpp/src/rclcpp/service_intra_process_base.cpp b/rclcpp/src/rclcpp/service_intra_process_base.cpp new file mode 100644 index 0000000000..2ccb38779c --- /dev/null +++ b/rclcpp/src/rclcpp/service_intra_process_base.cpp @@ -0,0 +1,45 @@ +// Copyright 2021 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "rclcpp/detail/add_guard_condition_to_rcl_wait_set.hpp" +#include "rclcpp/experimental/service_intra_process_base.hpp" + +using rclcpp::experimental::ServiceIntraProcessBase; + +void +ServiceIntraProcessBase::add_to_wait_set(rcl_wait_set_t * wait_set) +{ + detail::add_guard_condition_to_rcl_wait_set(*wait_set, gc_); +} + +const char * +ServiceIntraProcessBase::get_service_name() const +{ + return service_name_.c_str(); +} + +rclcpp::QoS +ServiceIntraProcessBase::get_actual_qos() const +{ + return qos_profile_; +} + +void +ServiceIntraProcessBase::add_intra_process_client( + rclcpp::experimental::ClientIntraProcessBase::SharedPtr client, + uint64_t client_id) +{ + std::unique_lock lock(reentrant_mutex_); + clients_[client_id] = client; +} diff --git a/rclcpp/test/rclcpp/node_interfaces/test_node_services.cpp b/rclcpp/test/rclcpp/node_interfaces/test_node_services.cpp index ea55a1aac2..9a6353e7d6 100644 --- a/rclcpp/test/rclcpp/node_interfaces/test_node_services.cpp +++ b/rclcpp/test/rclcpp/node_interfaces/test_node_services.cpp @@ -30,7 +30,7 @@ class TestService : public rclcpp::ServiceBase { public: explicit TestService(rclcpp::Node * node) - : rclcpp::ServiceBase(node->get_node_base_interface()->get_shared_rcl_node_handle()) {} + : rclcpp::ServiceBase(node->get_node_base_interface()) {} std::shared_ptr create_request() override {return nullptr;} std::shared_ptr create_request_header() override {return nullptr;} @@ -41,7 +41,8 @@ class TestClient : public rclcpp::ClientBase { public: explicit TestClient(rclcpp::Node * node) - : rclcpp::ClientBase(node->get_node_base_interface().get(), node->get_node_graph_interface()) {} + : rclcpp::ClientBase(node->get_node_base_interface().get(), node->get_node_graph_interface()) + {} std::shared_ptr create_response() override {return nullptr;} std::shared_ptr create_request_header() override {return nullptr;} diff --git a/rclcpp/test/rclcpp/test_client.cpp b/rclcpp/test/rclcpp/test_client.cpp index 7cb9b0af65..66c3d7a813 100644 --- a/rclcpp/test/rclcpp/test_client.cpp +++ b/rclcpp/test/rclcpp/test_client.cpp @@ -109,7 +109,8 @@ TEST_F(TestClient, construction_with_free_function) { node->get_node_services_interface(), "service", rmw_qos_profile_services_default, - nullptr); + nullptr, + rclcpp::IntraProcessSetting::Disable); } { ASSERT_THROW( @@ -120,7 +121,8 @@ TEST_F(TestClient, construction_with_free_function) { node->get_node_services_interface(), "invalid_?service", rmw_qos_profile_services_default, - nullptr); + nullptr, + rclcpp::IntraProcessSetting::Disable); }, rclcpp::exceptions::InvalidServiceNameError); } } diff --git a/rclcpp/test/rclcpp/test_externally_defined_services.cpp b/rclcpp/test/rclcpp/test_externally_defined_services.cpp index 563f7f6ec3..3938c847c7 100644 --- a/rclcpp/test/rclcpp/test_externally_defined_services.cpp +++ b/rclcpp/test/rclcpp/test_externally_defined_services.cpp @@ -66,7 +66,7 @@ TEST_F(TestExternallyDefinedServices, extern_defined_uninitialized) { // expect fail try { rclcpp::Service( - node_handle->get_node_base_interface()->get_shared_rcl_node_handle(), + node_handle->get_node_base_interface(), &service_handle, cb); } catch (const std::runtime_error &) { SUCCEED(); @@ -97,7 +97,7 @@ TEST_F(TestExternallyDefinedServices, extern_defined_initialized) { try { rclcpp::Service( - node_handle->get_node_base_interface()->get_shared_rcl_node_handle(), + node_handle->get_node_base_interface(), &service_handle, cb); } catch (const std::runtime_error &) { FAIL(); @@ -137,7 +137,7 @@ TEST_F(TestExternallyDefinedServices, extern_defined_destructor) { { // Call constructor rclcpp::Service srv_cpp( - node_handle->get_node_base_interface()->get_shared_rcl_node_handle(), + node_handle->get_node_base_interface(), &service_handle, cb); // Call destructor } diff --git a/rclcpp/test/rclcpp/test_intra_process_manager.cpp b/rclcpp/test/rclcpp/test_intra_process_manager.cpp index 45d916b004..70718bb313 100644 --- a/rclcpp/test/rclcpp/test_intra_process_manager.cpp +++ b/rclcpp/test/rclcpp/test_intra_process_manager.cpp @@ -15,9 +15,12 @@ #include #include +#include #include #include +#include #include +#include // NOLINT #include #define RCLCPP_BUILDING_LIBRARY 1 @@ -304,6 +307,98 @@ class SubscriptionIntraProcess : public SubscriptionIntraProcessBuffer< } }; +class ClientIntraProcessBase +{ +public: + RCLCPP_SMART_PTR_ALIASES_ONLY(ClientIntraProcessBase) + + const char * + get_service_name() const + { + return nullptr; + } + + QoS get_actual_qos() const + { + QoS qos(0); + return qos; + } +}; + +template +class ClientIntraProcess : public ClientIntraProcessBase +{ +public: + RCLCPP_SMART_PTR_ALIASES_ONLY(ClientIntraProcess) +}; + +class ServiceIntraProcessBase +{ +public: + RCLCPP_SMART_PTR_ALIASES_ONLY(ServiceIntraProcessBase) + + void + add_intra_process_client( + rclcpp::experimental::mock::ClientIntraProcessBase::SharedPtr client, + uint64_t client_id) + { + (void)client; + (void)client_id; + } + + const char * + get_service_name() const + { + return nullptr; + } + + QoS get_actual_qos() const + { + QoS qos(0); + return qos; + } +}; + +template +class ServiceIntraProcess : public ServiceIntraProcessBase +{ +public: + RCLCPP_SMART_PTR_ALIASES_ONLY(ServiceIntraProcess) + + using SharedRequest = typename ServiceT::Request::SharedPtr; + using SharedResponse = typename ServiceT::Response::SharedPtr; + + using Promise = std::promise; + using PromiseWithRequest = std::promise>; + + using SharedFuture = std::shared_future; + using SharedFutureWithRequest = std::shared_future>; + + using CallbackType = std::function; + using CallbackWithRequestType = std::function; + + using CallbackTypeValueVariant = std::tuple; + using CallbackWithRequestTypeValueVariant = std::tuple< + CallbackWithRequestType, SharedRequest, SharedFutureWithRequest, PromiseWithRequest>; + + using CallbackInfoVariant = std::variant< + std::promise, + CallbackTypeValueVariant, + CallbackWithRequestTypeValueVariant>; + + using RequestCallbackPair = std::pair; + using ClientIDtoRequest = std::pair; + + void + store_intra_process_request( + uint64_t intra_process_client_id, + RequestCallbackPair request) + { + (void)intra_process_client_id; + (void)request; + } +}; + } // namespace mock } // namespace experimental } // namespace rclcpp @@ -314,6 +409,10 @@ class SubscriptionIntraProcess : public SubscriptionIntraProcessBuffer< #define RCLCPP__EXPERIMENTAL__SUBSCRIPTION_INTRA_PROCESS_HPP_ #define RCLCPP__EXPERIMENTAL__SUBSCRIPTION_INTRA_PROCESS_BUFFER_HPP_ #define RCLCPP__EXPERIMENTAL__SUBSCRIPTION_INTRA_PROCESS_BASE_HPP_ +#define RCLCPP__EXPERIMENTAL__SERVICE_INTRA_PROCESS_HPP_ +#define RCLCPP__EXPERIMENTAL__SERVICE_INTRA_PROCESS_BASE_HPP_ +#define RCLCPP__EXPERIMENTAL__CLIENT_INTRA_PROCESS_HPP_ +#define RCLCPP__EXPERIMENTAL__CLIENT_INTRA_PROCESS_BASE_HPP_ // Force ipm to use our mock publisher class. #define Publisher mock::Publisher #define PublisherBase mock::PublisherBase @@ -321,12 +420,20 @@ class SubscriptionIntraProcess : public SubscriptionIntraProcessBuffer< #define SubscriptionIntraProcessBase mock::SubscriptionIntraProcessBase #define SubscriptionIntraProcessBuffer mock::SubscriptionIntraProcessBuffer #define SubscriptionIntraProcess mock::SubscriptionIntraProcess +#define ServiceIntraProcessBase mock::ServiceIntraProcessBase +#define ServiceIntraProcess mock::ServiceIntraProcess +#define ClientIntraProcessBase mock::ClientIntraProcessBase +#define ClientIntraProcess mock::ClientIntraProcess #include "../src/rclcpp/intra_process_manager.cpp" // NOLINT #undef Publisher #undef PublisherBase #undef IntraProcessBuffer #undef SubscriptionIntraProcessBase #undef SubscriptionIntraProcess +#undef ServiceIntraProcessBase +#undef ServiceIntraProcess +#undef ClientIntraProcessBase +#undef ClientIntraProcess using ::testing::_; using ::testing::UnorderedElementsAre; diff --git a/rclcpp/test/rclcpp/test_service.cpp b/rclcpp/test/rclcpp/test_service.cpp index 1dbb8ca9e4..90e535cb7a 100644 --- a/rclcpp/test/rclcpp/test_service.cpp +++ b/rclcpp/test/rclcpp/test_service.cpp @@ -19,6 +19,7 @@ #include #include "rclcpp/exceptions.hpp" +#include "rclcpp/intra_process_setting.hpp" #include "rclcpp/rclcpp.hpp" #include "../mocking_utils/patch.hpp" @@ -165,18 +166,28 @@ TEST_F(TestService, basic_public_getters) { rcl_service_options_t service_options = rcl_service_get_default_options(); const rosidl_service_type_support_t * ts = rosidl_typesupport_cpp::get_service_type_support_handle(); + auto node_base_interface = node_handle_int->get_node_base_interface(); rcl_ret_t ret = rcl_service_init( &service_handle, - node_handle_int->get_node_base_interface()->get_rcl_node_handle(), + node_base_interface->get_rcl_node_handle(), ts, "base_node_service", &service_options); if (ret != RCL_RET_OK) { FAIL(); return; } rclcpp::AnyServiceCallback cb; + + + rclcpp::IntraProcessSetting ipc_setting; + if (node_base_interface->get_use_intra_process_default()) { + ipc_setting = rclcpp::IntraProcessSetting::Enable; + } else { + ipc_setting = rclcpp::IntraProcessSetting::Disable; + } + const rclcpp::Service base( - node_handle_int->get_node_base_interface()->get_shared_rcl_node_handle(), - &service_handle, cb); + node_handle_int->get_node_base_interface(), + &service_handle, cb, ipc_setting); // Use get_service_handle specific to const service std::shared_ptr const_service_handle = base.get_service_handle(); EXPECT_NE(nullptr, const_service_handle); diff --git a/rclcpp_lifecycle/src/lifecycle_node_interface_impl.hpp b/rclcpp_lifecycle/src/lifecycle_node_interface_impl.hpp index 51de8eab07..f06b496037 100644 --- a/rclcpp_lifecycle/src/lifecycle_node_interface_impl.hpp +++ b/rclcpp_lifecycle/src/lifecycle_node_interface_impl.hpp @@ -37,6 +37,7 @@ #include "rcl_lifecycle/rcl_lifecycle.h" #include "rcl_lifecycle/transition_map.h" +#include "rclcpp/intra_process_setting.hpp" #include "rclcpp/node_interfaces/node_base_interface.hpp" #include "rclcpp/node_interfaces/node_services_interface.hpp" @@ -83,6 +84,13 @@ class LifecycleNode::LifecycleNodeInterfaceImpl state_machine_options.enable_com_interface = enable_communication_interface; state_machine_options.allocator = node_options->allocator; + rclcpp::IntraProcessSetting ipc_setting; + if (node_base_interface_->get_use_intra_process_default()) { + ipc_setting = rclcpp::IntraProcessSetting::Enable; + } else { + ipc_setting = rclcpp::IntraProcessSetting::Disable; + } + // The call to initialize the state machine takes // currently five different typesupports for all publishers/services // created within the RCL_LIFECYCLE structure. @@ -114,9 +122,10 @@ class LifecycleNode::LifecycleNodeInterfaceImpl any_cb.set(std::move(cb)); srv_change_state_ = std::make_shared>( - node_base_interface_->get_shared_rcl_node_handle(), + node_base_interface_, &state_machine_.com_interface.srv_change_state, - any_cb); + any_cb, + ipc_setting); node_services_interface_->add_service( std::dynamic_pointer_cast(srv_change_state_), nullptr); @@ -130,9 +139,10 @@ class LifecycleNode::LifecycleNodeInterfaceImpl any_cb.set(std::move(cb)); srv_get_state_ = std::make_shared>( - node_base_interface_->get_shared_rcl_node_handle(), + node_base_interface_, &state_machine_.com_interface.srv_get_state, - any_cb); + any_cb, + ipc_setting); node_services_interface_->add_service( std::dynamic_pointer_cast(srv_get_state_), nullptr); @@ -146,9 +156,10 @@ class LifecycleNode::LifecycleNodeInterfaceImpl any_cb.set(std::move(cb)); srv_get_available_states_ = std::make_shared>( - node_base_interface_->get_shared_rcl_node_handle(), + node_base_interface_, &state_machine_.com_interface.srv_get_available_states, - any_cb); + any_cb, + ipc_setting); node_services_interface_->add_service( std::dynamic_pointer_cast(srv_get_available_states_), nullptr); @@ -163,9 +174,10 @@ class LifecycleNode::LifecycleNodeInterfaceImpl srv_get_available_transitions_ = std::make_shared>( - node_base_interface_->get_shared_rcl_node_handle(), + node_base_interface_, &state_machine_.com_interface.srv_get_available_transitions, - any_cb); + any_cb, + ipc_setting); node_services_interface_->add_service( std::dynamic_pointer_cast(srv_get_available_transitions_), nullptr); @@ -180,9 +192,10 @@ class LifecycleNode::LifecycleNodeInterfaceImpl srv_get_transition_graph_ = std::make_shared>( - node_base_interface_->get_shared_rcl_node_handle(), + node_base_interface_, &state_machine_.com_interface.srv_get_transition_graph, - any_cb); + any_cb, + ipc_setting); node_services_interface_->add_service( std::dynamic_pointer_cast(srv_get_transition_graph_), nullptr);