Skip to content

Commit

Permalink
Publish fleet and task updates over ROS 2 if websocket is not provided (
Browse files Browse the repository at this point in the history
#383)

* Publish fleet and task updates over ROS 2 if websocket is not provided

Signed-off-by: Aaron Chong <[email protected]>

* Publish over ROS 2 for internal uses by default

Signed-off-by: Aaron Chong <[email protected]>

* Revert line change

Signed-off-by: Aaron Chong <[email protected]>

* Making qos the same

Signed-off-by: Aaron Chong <[email protected]>

* Move validator inside broadcast client logic

Signed-off-by: Aaron Chong <[email protected]>

---------

Signed-off-by: Aaron Chong <[email protected]>
  • Loading branch information
aaronchongth authored Oct 4, 2024
1 parent 6ab83a2 commit e4e82e1
Show file tree
Hide file tree
Showing 7 changed files with 213 additions and 134 deletions.
4 changes: 4 additions & 0 deletions rmf_fleet_adapter/include/rmf_fleet_adapter/StandardNames.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ const std::string DestinationRequestTopicName = "destination_requests";
const std::string ModeRequestTopicName = "robot_mode_requests";
const std::string PathRequestTopicName = "robot_path_requests";
const std::string PauseRequestTopicName = "robot_pause_requests";
const std::string FleetStateUpdateTopicName = "fleet_state_update";
const std::string FleetLogUpdateTopicName = "fleet_log_update";

const std::string FinalDoorRequestTopicName = "door_requests";
const std::string AdapterDoorRequestTopicName = "adapter_door_requests";
Expand Down Expand Up @@ -68,6 +70,8 @@ const std::string InterruptRequestTopicName = "robot_interrupt_request";

const std::string TaskApiRequests = "task_api_requests";
const std::string TaskApiResponses = "task_api_responses";
const std::string TaskStateUpdateTopicName = "task_state_update";
const std::string TaskLogUpdateTopicName = "task_log_update";

const std::string ChargingAssignmentsTopicName = "charging_assignments";

Expand Down
52 changes: 37 additions & 15 deletions rmf_fleet_adapter/src/rmf_fleet_adapter/TaskManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,15 @@ TaskManagerPtr TaskManager::make(
self->_handle_request(request->json_msg, request->request_id);
});

auto reliable_transient_qos =
rclcpp::ServicesQoS().keep_last(10).reliable().transient_local();
mgr->_task_state_update_pub =
mgr->_context->node()->create_publisher<TaskStateUpdateMsg>(
TaskStateUpdateTopicName, reliable_transient_qos);
mgr->_task_log_update_pub =
mgr->_context->node()->create_publisher<TaskLogUpdateMsg>(
TaskLogUpdateTopicName, reliable_transient_qos.keep_last(100));

const std::vector<nlohmann::json> schemas = {
rmf_api_msgs::schemas::task_state,
rmf_api_msgs::schemas::task_log,
Expand Down Expand Up @@ -564,15 +573,15 @@ void TaskManager::ActiveTask::publish_task_state(TaskManager& mgr)

static const auto task_update_validator =
mgr._make_validator(rmf_api_msgs::schemas::task_state_update);
mgr._validate_and_publish_websocket(task_state_update, task_update_validator);
mgr._validate_and_publish_json(task_state_update, task_update_validator);

auto task_log_update = nlohmann::json();
task_log_update["type"] = "task_log_update";
task_log_update["data"] = task_logs;

static const auto log_update_validator =
mgr._make_validator(rmf_api_msgs::schemas::task_log_update);
mgr._validate_and_publish_websocket(task_log_update, log_update_validator);
mgr._validate_and_publish_json(task_log_update, log_update_validator);
}

//==============================================================================
Expand Down Expand Up @@ -2053,7 +2062,7 @@ void TaskManager::_schema_loader(
}

//==============================================================================
void TaskManager::_validate_and_publish_websocket(
void TaskManager::_validate_and_publish_json(
const nlohmann::json& msg,
const nlohmann::json_schema::json_validator& validator) const
{
Expand All @@ -2068,19 +2077,32 @@ void TaskManager::_validate_and_publish_websocket(
return;
}

if (!_broadcast_client.has_value())
return;
if (_broadcast_client.has_value())
{
const auto client = _broadcast_client->lock();
if (!client)
{
RCLCPP_ERROR(
_context->node()->get_logger(),
"Unable to lock BroadcastClient within TaskManager of robot [%s]",
_context->name().c_str());
return;
}
client->publish(msg);
}

const auto client = _broadcast_client->lock();
if (!client)
if (msg["type"] == "task_state_update")
{
RCLCPP_ERROR(
_context->node()->get_logger(),
"Unable to lock BroadcastClient within TaskManager of robot [%s]",
_context->name().c_str());
return;
TaskStateUpdateMsg update_msg;
update_msg.data = msg.dump();
_task_state_update_pub->publish(update_msg);
}
else if (msg["type"] == "task_log_update")
{
TaskLogUpdateMsg update_msg;
update_msg.data = msg.dump();
_task_log_update_pub->publish(update_msg);
}
client->publish(msg);
}

//==============================================================================
Expand Down Expand Up @@ -2182,7 +2204,7 @@ rmf_task::State TaskManager::_publish_pending_task(
static const auto validator =
_make_validator(rmf_api_msgs::schemas::task_state_update);

_validate_and_publish_websocket(task_state_update, validator);
_validate_and_publish_json(task_state_update, validator);

_pending_task_info[pending.request()] = cache;
return pending.finish_state();
Expand Down Expand Up @@ -2249,7 +2271,7 @@ void TaskManager::_publish_canceled_pending_task(
static const auto validator =
_make_validator(rmf_api_msgs::schemas::task_state_update);

_validate_and_publish_websocket(task_state_update, validator);
_validate_and_publish_json(task_state_update, validator);
}

//==============================================================================
Expand Down
11 changes: 10 additions & 1 deletion rmf_fleet_adapter/src/rmf_fleet_adapter/TaskManager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
#include <rmf_fleet_msgs/msg/robot_mode.hpp>
#include <rmf_task_msgs/msg/task_summary.hpp>

#include <std_msgs/msg/string.hpp>

#include <nlohmann/json.hpp>
#include <nlohmann/json-schema.hpp>

Expand Down Expand Up @@ -397,6 +399,13 @@ class TaskManager : public std::enable_shared_from_this<TaskManager>
bool _task_state_update_available = true;
std::chrono::steady_clock::time_point _last_update_time;

using TaskStateUpdateMsg = std_msgs::msg::String;
rclcpp::Publisher<TaskStateUpdateMsg>::SharedPtr _task_state_update_pub =
nullptr;

using TaskLogUpdateMsg = std_msgs::msg::String;
rclcpp::Publisher<TaskLogUpdateMsg>::SharedPtr _task_log_update_pub = nullptr;

// Container to keep track of tasks that have been started by this TaskManager
// Use the _register_executed_task() to populate this container.
std::vector<std::string> _executed_task_registry;
Expand Down Expand Up @@ -554,7 +563,7 @@ class TaskManager : public std::enable_shared_from_this<TaskManager>

/// Validate and publish a json. This can be used for task
/// state and log updates
void _validate_and_publish_websocket(
void _validate_and_publish_json(
const nlohmann::json& msg,
const nlohmann::json_schema::json_validator& validator) const;

Expand Down
Loading

0 comments on commit e4e82e1

Please sign in to comment.