This repository has been archived by the owner on Jan 23, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 2
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #127 from bit-bots/feature/refactor-and-test-team-…
…comm Feature: refactor and test team comm
- Loading branch information
Showing
22 changed files
with
1,695 additions
and
436 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
73 changes: 73 additions & 0 deletions
73
humanoid_league_team_communication/humanoid_league_team_communication/communication.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,73 @@ | ||
from ipaddress import IPv4Address | ||
import socket | ||
from typing import List, Optional | ||
|
||
from rclpy.node import Node | ||
|
||
|
||
class SocketCommunication: | ||
|
||
def __init__(self, node: Node, logger, team_id, robot_id): | ||
self.logger = logger | ||
|
||
self.buffer_size: int = 1024 | ||
self.socket: Optional[socket.socket] = None | ||
self.target_ip: IPv4Address = IPv4Address(node.get_parameter('target_ip').value) | ||
|
||
if self.target_ip.is_loopback: | ||
# local mode on loopback device, bind to port depending on bot id and team id | ||
local_target_ports: List[int] = node.get_parameter('local_target_ports').value | ||
self.target_ports = [port + 10 * team_id for port in local_target_ports] | ||
self.receive_port = self.target_ports[robot_id - 1] | ||
else: | ||
target_port: int = node.get_parameter('target_port').value | ||
receive_port: int = node.get_parameter('receive_port').value | ||
self.target_ports = [target_port] | ||
self.receive_port = receive_port | ||
|
||
def __del__(self): | ||
self.close_connection() | ||
|
||
def is_setup(self) -> bool: | ||
return self.socket is not None | ||
|
||
def establish_connection(self): | ||
if not self.is_setup(): | ||
self.socket = self.get_connection() | ||
|
||
def get_connection(self) -> socket.socket: | ||
self.logger.info(f"Binding to port {self.receive_port} to receive messages") | ||
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP) | ||
sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) | ||
sock.bind(('0.0.0.0', self.receive_port)) | ||
return sock | ||
|
||
def close_connection(self): | ||
if self.is_setup(): | ||
self.socket.close() | ||
self.logger.info("Connection closed.") | ||
|
||
def receive_message(self) -> Optional[bytes]: | ||
self.assert_is_setup() | ||
msg, _, flags, _ = self.socket.recvmsg(self.buffer_size) | ||
is_message_trucated = flags & socket.MSG_TRUNC | ||
if is_message_trucated: | ||
self.handle_truncated_message() | ||
return None | ||
else: | ||
return msg | ||
|
||
def handle_truncated_message(self): | ||
self.logger.warn( | ||
f"recvmsg flag {socket.MSG_TRUNC} signaling a packet larger than buffer size {self.buffer_size}") | ||
self.buffer_size *= 2 | ||
self.logger.info(f"doubled buffer size to {self.buffer_size}") | ||
|
||
def send_message(self, message): | ||
self.assert_is_setup() | ||
for port in self.target_ports: | ||
self.logger.debug(f'Sending message to {port} on {self.target_ip}') | ||
self.socket.sendto(message, (str(self.target_ip), port)) | ||
|
||
def assert_is_setup(self): | ||
assert self.is_setup(), "Socket is not yet initialized" |
Empty file.
103 changes: 103 additions & 0 deletions
103
...munication/humanoid_league_team_communication/converter/message_to_team_data_converter.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,103 @@ | ||
from typing import List, Tuple | ||
|
||
import transforms3d | ||
from geometry_msgs.msg import PoseWithCovariance | ||
from numpy import double | ||
|
||
import humanoid_league_team_communication.robocup_extension_pb2 as Proto | ||
from humanoid_league_msgs.msg import ObstacleRelative, ObstacleRelativeArray, TeamData | ||
|
||
|
||
class MessageToTeamDataConverter: | ||
|
||
def __init__(self, team_mapping, role_mapping, action_mapping, side_mapping): | ||
self.team_mapping = team_mapping | ||
self.role_mapping = role_mapping | ||
self.action_mapping = action_mapping | ||
self.side_mapping = side_mapping | ||
|
||
def convert(self, message: Proto.Message, team_data: TeamData) -> TeamData: | ||
team_data.robot_id = message.current_pose.player_id | ||
team_data.state = message.state | ||
|
||
team_data.robot_position = self.convert_robot_pose(message.current_pose) | ||
team_data.ball_absolute = self.convert_ball_pose(message.ball) | ||
|
||
# @TODO: change TeamData field/type to robots | ||
# see: https://github.com/bit-bots/humanoid_league_misc/issues/125 | ||
team_data.obstacles = self.convert_robots_to_obstacles(message.others, message.other_robot_confidence) | ||
team_data.obstacles.header = team_data.header | ||
|
||
return self.convert_optional_fields(message, team_data) | ||
|
||
def convert_optional_fields(self, message: Proto.Message, team_data: TeamData) -> TeamData: | ||
if hasattr(message, "time_to_ball"): | ||
team_data.time_to_position_at_ball = message.time_to_ball | ||
|
||
return self.convert_strategy(message, team_data) | ||
|
||
def convert_strategy(self, message: Proto.Message, team_data: TeamData) -> TeamData: | ||
if hasattr(message, "role"): | ||
team_data.strategy.role = self.role_mapping[message.role] | ||
if hasattr(message, "action"): | ||
team_data.strategy.action = self.action_mapping[message.action] | ||
if hasattr(message, "offensive_side"): | ||
team_data.strategy.offensive_side = self.side_mapping[message.offensive_side] | ||
|
||
return team_data | ||
|
||
def convert_robots_to_obstacles(self, message_robots: List[Proto.Robot], | ||
message_robot_confidence: List[float]) -> ObstacleRelativeArray: | ||
relative_obstacles = ObstacleRelativeArray() | ||
for index, robot in enumerate(message_robots): | ||
obstacle = ObstacleRelative(player_number=robot.player_id, type=self.team_mapping[robot.team]) | ||
obstacle.pose.pose = self.convert_robot_pose(robot) | ||
|
||
if index < len(message_robot_confidence): | ||
obstacle.pose.confidence = message_robot_confidence[index] | ||
|
||
relative_obstacles.obstacles.append(obstacle) | ||
|
||
return relative_obstacles | ||
|
||
def convert_ball_pose(self, message_ball_pose: Proto.Ball) -> PoseWithCovariance: | ||
ball = PoseWithCovariance() | ||
ball.pose.position.x = message_ball_pose.position.x | ||
ball.pose.position.y = message_ball_pose.position.y | ||
ball.pose.position.z = message_ball_pose.position.z | ||
|
||
if message_ball_pose.covariance: | ||
self.convert_to_row_major_covariance(ball.covariance, message_ball_pose.covariance) | ||
|
||
return ball | ||
|
||
def convert_robot_pose(self, message_robot_pose: Proto.Robot) -> PoseWithCovariance: | ||
robot = PoseWithCovariance() | ||
robot.pose.position.x = message_robot_pose.position.x | ||
robot.pose.position.y = message_robot_pose.position.y | ||
|
||
quaternion = self.convert_to_quat((0, 0, message_robot_pose.position.z)) | ||
robot.pose.orientation.w = quaternion[0] | ||
robot.pose.orientation.x = quaternion[1] | ||
robot.pose.orientation.y = quaternion[2] | ||
robot.pose.orientation.z = quaternion[3] | ||
|
||
self.convert_to_row_major_covariance(robot.covariance, message_robot_pose.covariance) | ||
|
||
return robot | ||
|
||
def convert_to_quat(self, euler_angles: Tuple[float, float, float]): | ||
return transforms3d.euler.euler2quat(*euler_angles) | ||
|
||
def convert_to_row_major_covariance(self, row_major_covariance: List[double], covariance_matrix: Proto.fmat3): | ||
# ROS covariance is row-major 36 x float, while protobuf covariance | ||
# is column-major 9 x float [x, y, θ] | ||
row_major_covariance[0] = covariance_matrix.x.x | ||
row_major_covariance[1] = covariance_matrix.y.x | ||
row_major_covariance[5] = covariance_matrix.z.x | ||
row_major_covariance[6] = covariance_matrix.x.y | ||
row_major_covariance[7] = covariance_matrix.y.y | ||
row_major_covariance[11] = covariance_matrix.z.y | ||
row_major_covariance[30] = covariance_matrix.x.z | ||
row_major_covariance[31] = covariance_matrix.y.z | ||
row_major_covariance[35] = covariance_matrix.z.z |
73 changes: 73 additions & 0 deletions
73
..._communication/humanoid_league_team_communication/converter/robocup_protocol_converter.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,73 @@ | ||
from enum import IntEnum | ||
|
||
from humanoid_league_team_communication.converter.message_to_team_data_converter import MessageToTeamDataConverter | ||
from humanoid_league_team_communication.converter.state_to_message_converter import StateToMessageConverter | ||
|
||
from humanoid_league_msgs.msg import ObstacleRelative, Strategy | ||
from soccer_vision_attribute_msgs.msg import Robot as RobotAttributes | ||
|
||
import humanoid_league_team_communication.robocup_extension_pb2 as Proto | ||
|
||
|
||
class TeamColor(IntEnum): | ||
BLUE = 0 | ||
RED = 1 | ||
|
||
|
||
class RobocupProtocolConverter: | ||
|
||
def __init__(self, own_team_color: TeamColor): | ||
self.role_mapping = ( | ||
(Proto.Role.ROLE_UNDEFINED, Strategy.ROLE_UNDEFINED), | ||
(Proto.Role.ROLE_IDLING, Strategy.ROLE_IDLING), | ||
(Proto.Role.ROLE_OTHER, Strategy.ROLE_OTHER), | ||
(Proto.Role.ROLE_STRIKER, Strategy.ROLE_STRIKER), | ||
(Proto.Role.ROLE_SUPPORTER, Strategy.ROLE_SUPPORTER), | ||
(Proto.Role.ROLE_DEFENDER, Strategy.ROLE_DEFENDER), | ||
(Proto.Role.ROLE_GOALIE, Strategy.ROLE_GOALIE), | ||
) | ||
self.action_mapping = ( | ||
(Proto.Action.ACTION_UNDEFINED, Strategy.ACTION_UNDEFINED), | ||
(Proto.Action.ACTION_POSITIONING, Strategy.ACTION_POSITIONING), | ||
(Proto.Action.ACTION_GOING_TO_BALL, Strategy.ACTION_GOING_TO_BALL), | ||
(Proto.Action.ACTION_TRYING_TO_SCORE, Strategy.ACTION_TRYING_TO_SCORE), | ||
(Proto.Action.ACTION_WAITING, Strategy.ACTION_WAITING), | ||
(Proto.Action.ACTION_KICKING, Strategy.ACTION_KICKING), | ||
(Proto.Action.ACTION_SEARCHING, Strategy.ACTION_SEARCHING), | ||
(Proto.Action.ACTION_LOCALIZING, Strategy.ACTION_LOCALIZING), | ||
) | ||
self.side_mapping = ( | ||
(Proto.OffensiveSide.SIDE_UNDEFINED, Strategy.SIDE_UNDEFINED), | ||
(Proto.OffensiveSide.SIDE_LEFT, Strategy.SIDE_LEFT), | ||
(Proto.OffensiveSide.SIDE_MIDDLE, Strategy.SIDE_MIDDLE), | ||
(Proto.OffensiveSide.SIDE_RIGHT, Strategy.SIDE_RIGHT), | ||
) | ||
|
||
self.proto_to_team_data_team_mapping = { | ||
Proto.Team.UNKNOWN_TEAM: ObstacleRelative.ROBOT_UNDEFINED, | ||
Proto.Team.BLUE: ObstacleRelative.ROBOT_CYAN, | ||
Proto.Team.RED: ObstacleRelative.ROBOT_MAGENTA | ||
} | ||
self.state_to_proto_team_mapping = { | ||
RobotAttributes.TEAM_OWN: Proto.Team.RED if own_team_color == TeamColor.RED else Proto.Team.BLUE, | ||
RobotAttributes.TEAM_OPPONENT: Proto.Team.BLUE if own_team_color == TeamColor.RED else Proto.Team.RED, | ||
RobotAttributes.TEAM_UNKNOWN: Proto.Team.UNKNOWN_TEAM, | ||
} | ||
|
||
proto_to_team_data_mappings = { | ||
"team_mapping": self.proto_to_team_data_team_mapping, | ||
"role_mapping": dict(self.role_mapping), | ||
"action_mapping": dict(self.action_mapping), | ||
"side_mapping": dict(self.side_mapping), | ||
} | ||
|
||
reverse_mapping = lambda mapping: dict((b, a) for a, b in mapping) | ||
state_to_proto_mappings = { | ||
"team_mapping": self.state_to_proto_team_mapping, | ||
"role_mapping": reverse_mapping(self.role_mapping), | ||
"action_mapping": reverse_mapping(self.action_mapping), | ||
"side_mapping": reverse_mapping(self.side_mapping), | ||
} | ||
|
||
self.convert_from_message = MessageToTeamDataConverter(**proto_to_team_data_mappings).convert | ||
self.convert_to_message = StateToMessageConverter(**state_to_proto_mappings).convert |
Oops, something went wrong.