diff --git a/rclpy/rclpy/action/client.py b/rclpy/rclpy/action/client.py index bdde81f9f..43cdff394 100644 --- a/rclpy/rclpy/action/client.py +++ b/rclpy/rclpy/action/client.py @@ -182,6 +182,10 @@ def __init__( self._node.add_waitable(self) self._logger = self._node.get_logger().get_child('action_client') + self._goal_lock = threading.Lock() + self._cancel_lock = threading.Lock() + self._result_lock = threading.Lock() + def _generate_random_uuid(self): return UUID(uuid=list(uuid.uuid4().bytes)) @@ -210,21 +214,24 @@ def _remove_pending_request(self, future, pending_requests): return None def _remove_pending_goal_request(self, future): - seq = self._remove_pending_request(future, self._pending_goal_requests) - if seq in self._goal_sequence_number_to_goal_id: - del self._goal_sequence_number_to_goal_id[seq] + with self._goal_lock: + seq = self._remove_pending_request(future, self._pending_goal_requests) + if seq in self._goal_sequence_number_to_goal_id: + del self._goal_sequence_number_to_goal_id[seq] def _remove_pending_cancel_request(self, future): - self._remove_pending_request(future, self._pending_cancel_requests) + with self._cancel_lock: + self._remove_pending_request(future, self._pending_cancel_requests) def _remove_pending_result_request(self, future): - seq = self._remove_pending_request(future, self._pending_result_requests) - if seq in self._result_sequence_number_to_goal_id: - goal_uuid = bytes(self._result_sequence_number_to_goal_id[seq].uuid) - del self._result_sequence_number_to_goal_id[seq] - # remove feeback_callback if user is aware of result and it's been received - if goal_uuid in self._feedback_callbacks: - del self._feedback_callbacks[goal_uuid] + with self._result_lock: + seq = self._remove_pending_request(future, self._pending_result_requests) + if seq in self._result_sequence_number_to_goal_id: + goal_uuid = bytes(self._result_sequence_number_to_goal_id[seq].uuid) + del self._result_sequence_number_to_goal_id[seq] + # remove feeback_callback if user is aware of result and it's been received + if goal_uuid in self._feedback_callbacks: + del self._feedback_callbacks[goal_uuid] # Start Waitable API def is_ready(self, wait_set): @@ -286,45 +293,52 @@ async def execute(self, taken_data): """ if 'goal' in taken_data: sequence_number, goal_response = taken_data['goal'] - if sequence_number in self._goal_sequence_number_to_goal_id: - goal_handle = ClientGoalHandle( - self, - self._goal_sequence_number_to_goal_id[sequence_number], - goal_response) + try: + with self._goal_lock: + goal_id = self._goal_sequence_number_to_goal_id[sequence_number] + pending_goal_request = self._pending_goal_requests[sequence_number] + except KeyError: + self._logger.warning( + 'Ignoring unexpected goal response. There may be more than ' + f"one action server for the action '{self._action_name}'" + ) + else: + goal_handle = ClientGoalHandle(self, goal_id, goal_response) if goal_handle.accepted: goal_uuid = bytes(goal_handle.goal_id.uuid) if goal_uuid in self._goal_handles: raise RuntimeError( - 'Two goals were accepted with the same ID ({})'.format(goal_handle)) + f'Two goals were accepted with the same ID ({goal_handle})') self._goal_handles[goal_uuid] = weakref.ref(goal_handle) - self._pending_goal_requests[sequence_number].set_result(goal_handle) - else: - self._logger.warning( - 'Ignoring unexpected goal response. There may be more than ' - f"one action server for the action '{self._action_name}'" - ) + pending_goal_request.set_result(goal_handle) if 'cancel' in taken_data: sequence_number, cancel_response = taken_data['cancel'] - if sequence_number in self._pending_cancel_requests: - self._pending_cancel_requests[sequence_number].set_result(cancel_response) - else: + try: + with self._cancel_lock: + pending_cancel_request = self._pending_cancel_requests[sequence_number] + except KeyError: self._logger.warning( 'Ignoring unexpected cancel response. There may be more than ' f"one action server for the action '{self._action_name}'" ) + else: + pending_cancel_request.set_result(cancel_response) if 'result' in taken_data: sequence_number, result_response = taken_data['result'] - if sequence_number in self._pending_result_requests: - self._pending_result_requests[sequence_number].set_result(result_response) - else: + try: + with self._result_lock: + pending_result_request = self._pending_result_requests[sequence_number] + except KeyError: self._logger.warning( 'Ignoring unexpected result response. There may be more than ' f"one action server for the action '{self._action_name}'" ) + else: + pending_result_request.set_result(result_response) if 'feedback' in taken_data: feedback_msg = taken_data['feedback'] @@ -400,7 +414,8 @@ def unblock(future): send_goal_future = self.send_goal_async(goal, **kwargs) send_goal_future.add_done_callback(unblock) - event.wait() + if not send_goal_future.done(): + event.wait() if send_goal_future.exception() is not None: raise send_goal_future.exception() @@ -437,22 +452,24 @@ def send_goal_async(self, goal, feedback_callback=None, goal_uuid=None): request = self._action_type.Impl.SendGoalService.Request() request.goal_id = self._generate_random_uuid() if goal_uuid is None else goal_uuid request.goal = goal - sequence_number = self._client_handle.send_goal_request(request) - if sequence_number in self._pending_goal_requests: - raise RuntimeError( - 'Sequence ({}) conflicts with pending goal request'.format(sequence_number)) - - if feedback_callback is not None: - # TODO(jacobperron): Move conversion function to a general-use package - goal_uuid = bytes(request.goal_id.uuid) - self._feedback_callbacks[goal_uuid] = feedback_callback - - future = Future() - self._pending_goal_requests[sequence_number] = future - self._goal_sequence_number_to_goal_id[sequence_number] = request.goal_id - future.add_done_callback(self._remove_pending_goal_request) - # Add future so executor is aware - self.add_future(future) + + with self._goal_lock: + sequence_number = self._client_handle.send_goal_request(request) + if sequence_number in self._pending_goal_requests: + raise RuntimeError( + 'Sequence ({}) conflicts with pending goal request'.format(sequence_number)) + + if feedback_callback is not None: + # TODO(jacobperron): Move conversion function to a general-use package + goal_uuid = bytes(request.goal_id.uuid) + self._feedback_callbacks[goal_uuid] = feedback_callback + + future = Future() + self._pending_goal_requests[sequence_number] = future + self._goal_sequence_number_to_goal_id[sequence_number] = request.goal_id + future.add_done_callback(self._remove_pending_goal_request) + # Add future so executor is aware + self.add_future(future) return future @@ -475,7 +492,8 @@ def unblock(future): future = self._cancel_goal_async(goal_handle) future.add_done_callback(unblock) - event.wait() + if not future.done(): + event.wait() if future.exception() is not None: raise future.exception() return future.result() @@ -495,16 +513,18 @@ def _cancel_goal_async(self, goal_handle): cancel_request = CancelGoal.Request() cancel_request.goal_info.goal_id = goal_handle.goal_id - sequence_number = self._client_handle.send_cancel_request(cancel_request) - if sequence_number in self._pending_cancel_requests: - raise RuntimeError( - 'Sequence ({}) conflicts with pending cancel request'.format(sequence_number)) - future = Future() - self._pending_cancel_requests[sequence_number] = future - future.add_done_callback(self._remove_pending_cancel_request) - # Add future so executor is aware - self.add_future(future) + with self._cancel_lock: + sequence_number = self._client_handle.send_cancel_request(cancel_request) + if sequence_number in self._pending_cancel_requests: + raise RuntimeError( + 'Sequence ({}) conflicts with pending cancel request'.format(sequence_number)) + + future = Future() + self._pending_cancel_requests[sequence_number] = future + future.add_done_callback(self._remove_pending_cancel_request) + # Add future so executor is aware + self.add_future(future) return future @@ -527,7 +547,8 @@ def unblock(future): future = self._get_result_async(goal_handle) future.add_done_callback(unblock) - event.wait() + if not future.done(): + event.wait() if future.exception() is not None: raise future.exception() return future.result() @@ -547,17 +568,19 @@ def _get_result_async(self, goal_handle): result_request = self._action_type.Impl.GetResultService.Request() result_request.goal_id = goal_handle.goal_id - sequence_number = self._client_handle.send_result_request(result_request) - if sequence_number in self._pending_result_requests: - raise RuntimeError( - 'Sequence ({}) conflicts with pending result request'.format(sequence_number)) - - future = Future() - self._pending_result_requests[sequence_number] = future - self._result_sequence_number_to_goal_id[sequence_number] = result_request.goal_id - future.add_done_callback(self._remove_pending_result_request) - # Add future so executor is aware - self.add_future(future) + + with self._result_lock: + sequence_number = self._client_handle.send_result_request(result_request) + if sequence_number in self._pending_result_requests: + raise RuntimeError( + 'Sequence ({}) conflicts with pending result request'.format(sequence_number)) + + future = Future() + self._pending_result_requests[sequence_number] = future + self._result_sequence_number_to_goal_id[sequence_number] = result_request.goal_id + future.add_done_callback(self._remove_pending_result_request) + # Add future so executor is aware + self.add_future(future) return future