diff --git a/rmf_fleet_adapter/include/rmf_fleet_adapter/StandardNames.hpp b/rmf_fleet_adapter/include/rmf_fleet_adapter/StandardNames.hpp index 52f951850..5915541b4 100644 --- a/rmf_fleet_adapter/include/rmf_fleet_adapter/StandardNames.hpp +++ b/rmf_fleet_adapter/include/rmf_fleet_adapter/StandardNames.hpp @@ -28,6 +28,8 @@ const std::string DestinationRequestTopicName = "destination_requests"; const std::string ModeRequestTopicName = "robot_mode_requests"; const std::string PathRequestTopicName = "robot_path_requests"; const std::string PauseRequestTopicName = "robot_pause_requests"; +const std::string FleetStateUpdateTopicName = "fleet_state_update"; +const std::string FleetLogUpdateTopicName = "fleet_log_update"; const std::string FinalDoorRequestTopicName = "door_requests"; const std::string AdapterDoorRequestTopicName = "adapter_door_requests"; @@ -68,6 +70,8 @@ const std::string InterruptRequestTopicName = "robot_interrupt_request"; const std::string TaskApiRequests = "task_api_requests"; const std::string TaskApiResponses = "task_api_responses"; +const std::string TaskStateUpdateTopicName = "task_state_update"; +const std::string TaskLogUpdateTopicName = "task_log_update"; const std::string ChargingAssignmentsTopicName = "charging_assignments"; diff --git a/rmf_fleet_adapter/src/rmf_fleet_adapter/TaskManager.cpp b/rmf_fleet_adapter/src/rmf_fleet_adapter/TaskManager.cpp index 4cb314264..d26717c3a 100644 --- a/rmf_fleet_adapter/src/rmf_fleet_adapter/TaskManager.cpp +++ b/rmf_fleet_adapter/src/rmf_fleet_adapter/TaskManager.cpp @@ -192,6 +192,15 @@ TaskManagerPtr TaskManager::make( self->_handle_request(request->json_msg, request->request_id); }); + auto reliable_transient_qos = + rclcpp::ServicesQoS().keep_last(10).reliable().transient_local(); + mgr->_task_state_update_pub = + mgr->_context->node()->create_publisher( + TaskStateUpdateTopicName, reliable_transient_qos); + mgr->_task_log_update_pub = + mgr->_context->node()->create_publisher( + TaskLogUpdateTopicName, reliable_transient_qos.keep_last(100)); + const std::vector schemas = { rmf_api_msgs::schemas::task_state, rmf_api_msgs::schemas::task_log, @@ -564,7 +573,7 @@ void TaskManager::ActiveTask::publish_task_state(TaskManager& mgr) static const auto task_update_validator = mgr._make_validator(rmf_api_msgs::schemas::task_state_update); - mgr._validate_and_publish_websocket(task_state_update, task_update_validator); + mgr._validate_and_publish_json(task_state_update, task_update_validator); auto task_log_update = nlohmann::json(); task_log_update["type"] = "task_log_update"; @@ -572,7 +581,7 @@ void TaskManager::ActiveTask::publish_task_state(TaskManager& mgr) static const auto log_update_validator = mgr._make_validator(rmf_api_msgs::schemas::task_log_update); - mgr._validate_and_publish_websocket(task_log_update, log_update_validator); + mgr._validate_and_publish_json(task_log_update, log_update_validator); } //============================================================================== @@ -2053,7 +2062,7 @@ void TaskManager::_schema_loader( } //============================================================================== -void TaskManager::_validate_and_publish_websocket( +void TaskManager::_validate_and_publish_json( const nlohmann::json& msg, const nlohmann::json_schema::json_validator& validator) const { @@ -2068,19 +2077,32 @@ void TaskManager::_validate_and_publish_websocket( return; } - if (!_broadcast_client.has_value()) - return; + if (_broadcast_client.has_value()) + { + const auto client = _broadcast_client->lock(); + if (!client) + { + RCLCPP_ERROR( + _context->node()->get_logger(), + "Unable to lock BroadcastClient within TaskManager of robot [%s]", + _context->name().c_str()); + return; + } + client->publish(msg); + } - const auto client = _broadcast_client->lock(); - if (!client) + if (msg["type"] == "task_state_update") { - RCLCPP_ERROR( - _context->node()->get_logger(), - "Unable to lock BroadcastClient within TaskManager of robot [%s]", - _context->name().c_str()); - return; + TaskStateUpdateMsg update_msg; + update_msg.data = msg.dump(); + _task_state_update_pub->publish(update_msg); + } + else if (msg["type"] == "task_log_update") + { + TaskLogUpdateMsg update_msg; + update_msg.data = msg.dump(); + _task_log_update_pub->publish(update_msg); } - client->publish(msg); } //============================================================================== @@ -2182,7 +2204,7 @@ rmf_task::State TaskManager::_publish_pending_task( static const auto validator = _make_validator(rmf_api_msgs::schemas::task_state_update); - _validate_and_publish_websocket(task_state_update, validator); + _validate_and_publish_json(task_state_update, validator); _pending_task_info[pending.request()] = cache; return pending.finish_state(); @@ -2249,7 +2271,7 @@ void TaskManager::_publish_canceled_pending_task( static const auto validator = _make_validator(rmf_api_msgs::schemas::task_state_update); - _validate_and_publish_websocket(task_state_update, validator); + _validate_and_publish_json(task_state_update, validator); } //============================================================================== diff --git a/rmf_fleet_adapter/src/rmf_fleet_adapter/TaskManager.hpp b/rmf_fleet_adapter/src/rmf_fleet_adapter/TaskManager.hpp index 5f1026fa8..630128331 100644 --- a/rmf_fleet_adapter/src/rmf_fleet_adapter/TaskManager.hpp +++ b/rmf_fleet_adapter/src/rmf_fleet_adapter/TaskManager.hpp @@ -32,6 +32,8 @@ #include #include +#include + #include #include @@ -397,6 +399,13 @@ class TaskManager : public std::enable_shared_from_this bool _task_state_update_available = true; std::chrono::steady_clock::time_point _last_update_time; + using TaskStateUpdateMsg = std_msgs::msg::String; + rclcpp::Publisher::SharedPtr _task_state_update_pub = + nullptr; + + using TaskLogUpdateMsg = std_msgs::msg::String; + rclcpp::Publisher::SharedPtr _task_log_update_pub = nullptr; + // Container to keep track of tasks that have been started by this TaskManager // Use the _register_executed_task() to populate this container. std::vector _executed_task_registry; @@ -554,7 +563,7 @@ class TaskManager : public std::enable_shared_from_this /// Validate and publish a json. This can be used for task /// state and log updates - void _validate_and_publish_websocket( + void _validate_and_publish_json( const nlohmann::json& msg, const nlohmann::json_schema::json_validator& validator) const; diff --git a/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/FleetUpdateHandle.cpp b/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/FleetUpdateHandle.cpp index 5b290e45d..ef805da3e 100644 --- a/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/FleetUpdateHandle.cpp +++ b/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/FleetUpdateHandle.cpp @@ -1226,151 +1226,160 @@ void FleetUpdateHandle::Implementation::update_fleet() const //============================================================================== void FleetUpdateHandle::Implementation::update_fleet_state() const { - // Publish to API server - if (broadcast_client) + nlohmann::json fleet_state_update_msg; + fleet_state_update_msg["type"] = "fleet_state_update"; + auto& fleet_state_msg = fleet_state_update_msg["data"]; + fleet_state_msg["name"] = name; + auto& robots = fleet_state_msg["robots"]; + robots = std::unordered_map(); + for (const auto& [context, mgr] : task_managers) { - nlohmann::json fleet_state_update_msg; - fleet_state_update_msg["type"] = "fleet_state_update"; - auto& fleet_state_msg = fleet_state_update_msg["data"]; - fleet_state_msg["name"] = name; - auto& robots = fleet_state_msg["robots"]; - robots = std::unordered_map(); - for (const auto& [context, mgr] : task_managers) + const auto& name = context->name(); + nlohmann::json& json = robots[name]; + json["name"] = name; + json["status"] = mgr->robot_status(); + json["task_id"] = mgr->current_task_id().value_or(""); + json["unix_millis_time"] = + std::chrono::duration_cast( + context->now().time_since_epoch()).count(); + json["battery"] = context->current_battery_soc(); + + const auto location_msg = convert_location(*context); + if (location_msg.has_value()) { - const auto& name = context->name(); - nlohmann::json& json = robots[name]; - json["name"] = name; - json["status"] = mgr->robot_status(); - json["task_id"] = mgr->current_task_id().value_or(""); - json["unix_millis_time"] = - std::chrono::duration_cast( - context->now().time_since_epoch()).count(); - json["battery"] = context->current_battery_soc(); - - const auto location_msg = convert_location(*context); - if (location_msg.has_value()) - { - nlohmann::json& location = json["location"]; - location["map"] = location_msg->level_name; - location["x"] = location_msg->x; - location["y"] = location_msg->y; - location["yaw"] = location_msg->yaw; - } + nlohmann::json& location = json["location"]; + location["map"] = location_msg->level_name; + location["x"] = location_msg->x; + location["y"] = location_msg->y; + location["yaw"] = location_msg->yaw; + } - std::lock_guard lock(context->reporting().mutex()); - const auto& issues = context->reporting().open_issues(); - auto& issues_msg = json["issues"]; - issues_msg = std::vector(); - for (const auto& issue : issues) - { - nlohmann::json issue_msg; - issue_msg["category"] = issue->category; - issue_msg["detail"] = issue->detail; - issues_msg.push_back(std::move(issue_msg)); - } + std::lock_guard lock(context->reporting().mutex()); + const auto& issues = context->reporting().open_issues(); + auto& issues_msg = json["issues"]; + issues_msg = std::vector(); + for (const auto& issue : issues) + { + nlohmann::json issue_msg; + issue_msg["category"] = issue->category; + issue_msg["detail"] = issue->detail; + issues_msg.push_back(std::move(issue_msg)); + } - const auto& commission = context->commission(); - nlohmann::json commission_json; - commission_json["dispatch_tasks"] = - commission.is_accepting_dispatched_tasks(); - commission_json["direct_tasks"] = commission.is_accepting_direct_tasks(); - commission_json["idle_behavior"] = - commission.is_performing_idle_behavior(); + const auto& commission = context->commission(); + nlohmann::json commission_json; + commission_json["dispatch_tasks"] = + commission.is_accepting_dispatched_tasks(); + commission_json["direct_tasks"] = commission.is_accepting_direct_tasks(); + commission_json["idle_behavior"] = + commission.is_performing_idle_behavior(); - json["commission"] = commission_json; + json["commission"] = commission_json; - nlohmann::json mutex_groups_json; - std::vector locked_mutex_groups; - for (const auto& g : context->locked_mutex_groups()) - { - locked_mutex_groups.push_back(g.first); - } - mutex_groups_json["locked"] = std::move(locked_mutex_groups); - - std::vector requesting_mutex_groups; - for (const auto& g : context->requesting_mutex_groups()) - { - requesting_mutex_groups.push_back(g.first); - } - mutex_groups_json["requesting"] = std::move(requesting_mutex_groups); + nlohmann::json mutex_groups_json; + std::vector locked_mutex_groups; + for (const auto& g : context->locked_mutex_groups()) + { + locked_mutex_groups.push_back(g.first); + } + mutex_groups_json["locked"] = std::move(locked_mutex_groups); - json["mutex_groups"] = std::move(mutex_groups_json); + std::vector requesting_mutex_groups; + for (const auto& g : context->requesting_mutex_groups()) + { + requesting_mutex_groups.push_back(g.first); } + mutex_groups_json["requesting"] = std::move(requesting_mutex_groups); - try + json["mutex_groups"] = std::move(mutex_groups_json); + } + + try + { + std::unique_lock lock(*update_callback_mutex); + if (update_callback) + update_callback(fleet_state_update_msg); + + // Publish to API server + if (broadcast_client) { static const auto validator = make_validator(rmf_api_msgs::schemas::fleet_state_update); - validator.validate(fleet_state_update_msg); - std::unique_lock lock(*update_callback_mutex); - if (update_callback) - update_callback(fleet_state_update_msg); broadcast_client->publish(fleet_state_update_msg); } - catch (const std::exception& e) - { - RCLCPP_ERROR( - node->get_logger(), - "Malformed outgoing fleet state json message: %s\nMessage:\n%s", - e.what(), - fleet_state_update_msg.dump(2).c_str()); - } + + FleetStateUpdateMsg update_msg; + update_msg.data = fleet_state_update_msg.dump(); + fleet_state_update_pub->publish(update_msg); + } + catch (const std::exception& e) + { + RCLCPP_ERROR( + node->get_logger(), + "Malformed outgoing fleet state json message: %s\nMessage:\n%s", + e.what(), + fleet_state_update_msg.dump(2).c_str()); } } //============================================================================== void FleetUpdateHandle::Implementation::update_fleet_logs() const { - if (broadcast_client) + nlohmann::json fleet_log_update_msg; + fleet_log_update_msg["type"] = "fleet_log_update"; + auto& fleet_log_msg = fleet_log_update_msg["data"]; + fleet_log_msg["name"] = name; + // TODO(MXG): fleet_log_msg["log"] + auto& robots_msg = fleet_log_msg["robots"]; + robots_msg = std::unordered_map(); + for (const auto& [context, _] : task_managers) { - nlohmann::json fleet_log_update_msg; - fleet_log_update_msg["type"] = "fleet_log_update"; - auto& fleet_log_msg = fleet_log_update_msg["data"]; - fleet_log_msg["name"] = name; - // TODO(MXG): fleet_log_msg["log"] - auto& robots_msg = fleet_log_msg["robots"]; - robots_msg = std::unordered_map(); - for (const auto& [context, _] : task_managers) - { - auto robot_log_msg_array = std::vector(); + auto robot_log_msg_array = std::vector(); - std::lock_guard lock(context->reporting().mutex()); - const auto& log = context->reporting().log(); - for (const auto& entry : log_reader.read(log.view())) - robot_log_msg_array.push_back(log_to_json(entry)); + std::lock_guard lock(context->reporting().mutex()); + const auto& log = context->reporting().log(); + for (const auto& entry : log_reader.read(log.view())) + robot_log_msg_array.push_back(log_to_json(entry)); - if (!robot_log_msg_array.empty()) - robots_msg[context->name()] = std::move(robot_log_msg_array); - } + if (!robot_log_msg_array.empty()) + robots_msg[context->name()] = std::move(robot_log_msg_array); + } - if (robots_msg.empty()) - { - // No new logs to report - return; - } + if (robots_msg.empty()) + { + // No new logs to report + return; + } - try - { - static const auto validator = - make_validator(rmf_api_msgs::schemas::fleet_log_update); + try + { + static const auto validator = + make_validator(rmf_api_msgs::schemas::fleet_log_update); - validator.validate(fleet_log_update_msg); + validator.validate(fleet_log_update_msg); - std::unique_lock lock(*update_callback_mutex); - if (update_callback) - update_callback(fleet_log_update_msg); - broadcast_client->publish(fleet_log_update_msg); - } - catch (const std::exception& e) + std::unique_lock lock(*update_callback_mutex); + if (update_callback) + update_callback(fleet_log_update_msg); + + if (broadcast_client) { - RCLCPP_ERROR( - node->get_logger(), - "Malformed outgoing fleet log json message: %s\nMessage:\n%s", - e.what(), - fleet_log_update_msg.dump(2).c_str()); + broadcast_client->publish(fleet_log_update_msg); } + + FleetLogUpdateMsg update_msg; + update_msg.data = fleet_log_update_msg.dump(); + fleet_log_update_pub->publish(update_msg); + } + catch (const std::exception& e) + { + RCLCPP_ERROR( + node->get_logger(), + "Malformed outgoing fleet log json message: %s\nMessage:\n%s", + e.what(), + fleet_log_update_msg.dump(2).c_str()); } } diff --git a/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/internal_FleetUpdateHandle.hpp b/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/internal_FleetUpdateHandle.hpp index 7b5800445..e8ef93b91 100644 --- a/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/internal_FleetUpdateHandle.hpp +++ b/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/internal_FleetUpdateHandle.hpp @@ -18,6 +18,8 @@ #ifndef SRC__RMF_FLEET_ADAPTER__AGV__INTERNAL_FLEETUPDATEHANDLE_HPP #define SRC__RMF_FLEET_ADAPTER__AGV__INTERNAL_FLEETUPDATEHANDLE_HPP +#include + #include #include @@ -296,6 +298,15 @@ class FleetUpdateHandle::Implementation std::shared_ptr> task_managers = {}; std::shared_ptr broadcast_client = nullptr; + + using FleetStateUpdateMsg = std_msgs::msg::String; + rclcpp::Publisher::SharedPtr fleet_state_update_pub = + nullptr; + + using FleetLogUpdateMsg = std_msgs::msg::String; + rclcpp::Publisher::SharedPtr fleet_log_update_pub = + nullptr; + // Map uri to schema for validator loader function std::unordered_map schema_dictionary = {}; @@ -531,6 +542,13 @@ class FleetUpdateHandle::Implementation }); } + handle->_pimpl->fleet_state_update_pub = + handle->_pimpl->node->create_publisher( + FleetStateUpdateTopicName, reliable_transient_qos.keep_last(10)); + handle->_pimpl->fleet_log_update_pub = + handle->_pimpl->node->create_publisher( + FleetLogUpdateTopicName, reliable_transient_qos.keep_last(100)); + // Add PerformAction event to deserialization auto validator = handle->_pimpl->deserialization.make_validator_shared( schemas::event_description__perform_action); diff --git a/rmf_task_ros2/include/rmf_task_ros2/StandardNames.hpp b/rmf_task_ros2/include/rmf_task_ros2/StandardNames.hpp index fea8c49eb..7765c6469 100644 --- a/rmf_task_ros2/include/rmf_task_ros2/StandardNames.hpp +++ b/rmf_task_ros2/include/rmf_task_ros2/StandardNames.hpp @@ -34,6 +34,7 @@ const std::string DispatchCommandTopicName = Prefix + "dispatch_request"; const std::string DispatchAckTopicName = Prefix + "dispatch_ack"; const std::string DispatchStatesTopicName = "dispatch_states"; const std::string TaskStatusTopicName = "task_summaries"; +const std::string TaskStateUpdateTopicName = "task_state_update"; } // namespace rmf_task_ros2 diff --git a/rmf_task_ros2/src/rmf_task_ros2/Dispatcher.cpp b/rmf_task_ros2/src/rmf_task_ros2/Dispatcher.cpp index c8d92f0d0..0db2ab6a0 100644 --- a/rmf_task_ros2/src/rmf_task_ros2/Dispatcher.cpp +++ b/rmf_task_ros2/src/rmf_task_ros2/Dispatcher.cpp @@ -46,6 +46,8 @@ #include #include +#include + #include #include @@ -141,6 +143,10 @@ class Dispatcher::Implementation rclcpp::Subscription::SharedPtr api_request; rclcpp::Publisher::SharedPtr api_response; + using TaskStateUpdateMsg = std_msgs::msg::String; + rclcpp::Publisher::SharedPtr task_state_update_pub = + nullptr; + class ApiMemory { public: @@ -294,9 +300,15 @@ class Dispatcher::Implementation this->handle_dispatch_ack(*msg); }); + task_state_update_pub = node->create_publisher( + rmf_task_ros2::TaskStateUpdateTopicName, + rclcpp::ServicesQoS().keep_last(10).reliable().transient_local()); + if (server_uri) + { broadcast_client = rmf_websocket::BroadcastClient::make( *server_uri, node); + } auctioneer = bidding::Auctioneer::make( node, @@ -794,15 +806,19 @@ class Dispatcher::Implementation } /// Publish failed bid + const auto task_state = + create_task_state_json(dispatch_state, "failed"); + auto task_state_update = _task_state_update_json; + task_state_update["data"] = task_state; if (broadcast_client) { - const auto task_state = - create_task_state_json(dispatch_state, "failed"); - auto task_state_update = _task_state_update_json; - task_state_update["data"] = task_state; broadcast_client->publish(task_state_update); } + TaskStateUpdateMsg update_msg; + update_msg.data = task_state_update.dump(); + task_state_update_pub->publish(update_msg); + auctioneer->ready_for_next_bid(); return; }