Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enhancement Add PROTOSpawner #819

Draft
wants to merge 8 commits into
base: develop
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 53 additions & 0 deletions webots_ros2_driver/webots_ros2_driver/proto_spawner.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
#!/usr/bin/env python

# Copyright 1996-2023 Cyberbotics Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""This process simply sends urdf information to the Spawner through a service."""

from launch.actions import ExecuteProcess


def get_webots_driver_node(event, driver_node):
"""Return the driver node in case the service response is successful."""
if 'success=True' in event.text.decode().strip():
return driver_node
if 'success=False' in event.text.decode().strip():
print('WARNING: the Ros2Supervisor was not able to spawn this URDF robot.')
return


class PROTOSpawner(ExecuteProcess):
def __init__(self, output='log', name=None, proto_path=None, robot_string=None, **kwargs):
message = '{robot: {'

if proto_path:
message += 'proto_path: "' + proto_path + '",'
elif robot_string:
message += 'robot_string: "\\\n' + robot_string + '",'

message += '} }'

command = ['ros2',
'service',
'call',
'/Ros2Supervisor/spawn_urdf_robot',
'webots_ros2_msgs/srv/SpawnProtoRobot',
message]

super().__init__(
output=output,
cmd=command,
**kwargs
)
85 changes: 83 additions & 2 deletions webots_ros2_driver/webots_ros2_driver/ros2_supervisor.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ def __init__(self):

# Services
self.create_service(SpawnUrdfRobot, 'spawn_urdf_robot', self.__spawn_urdf_robot_callback)
self.create_service(SpawnNodeFromString, 'spawn_node_from_string', self.__spawn_node_from_string_callback)
self.create_service(SpawnNodeFromString, 'spawn_proto_robot', self.__spawn_proto_robot_callback)
# Subscriptions
self.create_subscription(String, 'remove_node', self.__remove_imported_node_callback, qos_profile_services_default)

Expand Down Expand Up @@ -176,7 +176,88 @@ def __spawn_urdf_robot_callback(self, request, response):
response.success = True
return response

def __spawn_node_from_string_callback(self, request, response):
def __spawn_proto_robot_callback(self, request, response):
robot = request.robot
# Choose the conversion according to the input and platform
if robot.proto_path:
if has_shared_folder() or is_wsl():
# Check that the file exists and is an URDF
if not os.path.isfile(robot.proto_path):
sys.exit('Input file "%s" does not exist.' % robot.proto_path)
if not robot.proto_path.endswith('.urdf'):
sys.exit('"%s" is not a URDF file.' % robot.proto_path)

# Read the content of the URDF
with open(robot.urdf_path, 'r') as file:
urdfContent = file.read()
if urdfContent is None:
sys.exit('Could not read the URDF file.')

# Get the package name and parent resource directory from URDF path
split_path = robot.urdf_path.split(os.path.sep)
for i, folder in (list(enumerate(split_path))):
if folder == "share":
package_dir = os.path.sep.join(split_path[:i + 2])
resource_dir = os.path.sep.join(split_path[:i + 3])
break
# On macOS, the resources are copied to shared_folder/package_name/resource_folder
# The path prefix is updated to the path of the shared folder
if has_shared_folder():
shared_package_dir = os.path.join(container_shared_folder(), os.path.basename(package_dir))
shared_resource_dir = os.path.join(shared_package_dir, os.path.basename(resource_dir))
if (not os.path.isdir(shared_package_dir)):
os.mkdir(shared_package_dir)
if (not os.path.isdir(shared_resource_dir)):
shutil.copytree(resource_dir, shared_resource_dir)
relative_path_prefix = os.path.join(host_shared_folder(), os.path.basename(package_dir),
os.path.basename(resource_dir))
# In WSL, the prefix must be converted to WSL path to work in Webots running on native Windows
if is_wsl():
relative_path_prefix = resource_dir
command = ['wslpath', '-w', relative_path_prefix]
relative_path_prefix = subprocess.check_output(command).strip().decode('utf-8').replace('\\', '/')

robot_string = convertUrdfContent(input=urdfContent, robotName=robot_name, normal=normal,
boxCollision=box_collision, initTranslation=robot_translation,
initRotation=robot_rotation, initPos=init_pos,
relativePathPrefix=relative_path_prefix)
else:
robot_string = convertUrdfFile(input=robot.urdf_path, robotName=robot_name, normal=normal,
boxCollision=box_collision, initTranslation=robot_translation,
initRotation=robot_rotation, initPos=init_pos)
elif robot.robot_description:
relative_path_prefix = robot.relative_path_prefix if robot.relative_path_prefix else None
# In WSL, the prefix must be converted to WSL path to work in Webots running on native Windows
if is_wsl() and relative_path_prefix:
command = ['wslpath', '-w', relative_path_prefix]
relative_path_prefix = subprocess.check_output(command).strip().decode('utf-8').replace('\\', '/')
if has_shared_folder() and relative_path_prefix:
# Get the package name and parent resource directory from URDF path
split_path = relative_path_prefix.split(os.path.sep)
for i, folder in (list(enumerate(split_path))):
if folder == "share":
package_dir = os.path.sep.join(split_path[:i + 2])
resource_dir = os.path.sep.join(split_path[:i + 3])
break
# On macOS, the resources are copied to shared_folder/package_name/resource_folder
# The path prefix is updated to the path of the shared folder
shared_package_dir = os.path.join(container_shared_folder(), os.path.basename(package_dir))
shared_resource_dir = os.path.join(shared_package_dir, os.path.basename(resource_dir))
if (not os.path.isdir(shared_package_dir)):
os.mkdir(shared_package_dir)
if (not os.path.isdir(shared_resource_dir)):
shutil.copytree(resource_dir, shared_resource_dir)
relative_path_prefix = os.path.join(host_shared_folder(), os.path.basename(package_dir),
os.path.basename(resource_dir))
robot_string = convertUrdfContent(input=robot.robot_description, robotName=robot_name, normal=normal,
boxCollision=box_collision, initTranslation=robot_translation,
initRotation=robot_rotation, initPos=init_pos,
relativePathPrefix=relative_path_prefix)
else:
self.get_logger().info('Ros2Supervisor can not import a URDF file without a specified "urdf_path" or '
'"robot_description" in the URDFSpawner object.')
response.success = False
return response
object_string = request.data
if object_string == '':
self.get_logger().info('Ros2Supervisor cannot import an empty string.')
Expand Down
2 changes: 2 additions & 0 deletions webots_ros2_msgs/msg/ProtoRobot.msg
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
string proto_path
string robot_string
3 changes: 3 additions & 0 deletions webots_ros2_msgs/srv/SpawnProtoRobot.srv
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
UrdfRobot robot
ygoumaz marked this conversation as resolved.
Show resolved Hide resolved
---
bool success