Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix race conditions in action client #1164

Open
wants to merge 4 commits into
base: rolling
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
159 changes: 91 additions & 68 deletions rclpy/rclpy/action/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,10 @@ def __init__(
self._node.add_waitable(self)
self._logger = self._node.get_logger().get_child('action_client')

self._goal_lock = threading.Lock()
self._cancel_lock = threading.Lock()
self._result_lock = threading.Lock()

def _generate_random_uuid(self):
return UUID(uuid=list(uuid.uuid4().bytes))

Expand Down Expand Up @@ -210,21 +214,24 @@ def _remove_pending_request(self, future, pending_requests):
return None

def _remove_pending_goal_request(self, future):
seq = self._remove_pending_request(future, self._pending_goal_requests)
if seq in self._goal_sequence_number_to_goal_id:
del self._goal_sequence_number_to_goal_id[seq]
with self._goal_lock:
seq = self._remove_pending_request(future, self._pending_goal_requests)
if seq in self._goal_sequence_number_to_goal_id:
del self._goal_sequence_number_to_goal_id[seq]

def _remove_pending_cancel_request(self, future):
self._remove_pending_request(future, self._pending_cancel_requests)
with self._cancel_lock:
self._remove_pending_request(future, self._pending_cancel_requests)

def _remove_pending_result_request(self, future):
seq = self._remove_pending_request(future, self._pending_result_requests)
if seq in self._result_sequence_number_to_goal_id:
goal_uuid = bytes(self._result_sequence_number_to_goal_id[seq].uuid)
del self._result_sequence_number_to_goal_id[seq]
# remove feeback_callback if user is aware of result and it's been received
if goal_uuid in self._feedback_callbacks:
del self._feedback_callbacks[goal_uuid]
with self._result_lock:
seq = self._remove_pending_request(future, self._pending_result_requests)
if seq in self._result_sequence_number_to_goal_id:
goal_uuid = bytes(self._result_sequence_number_to_goal_id[seq].uuid)
del self._result_sequence_number_to_goal_id[seq]
# remove feeback_callback if user is aware of result and it's been received
if goal_uuid in self._feedback_callbacks:
del self._feedback_callbacks[goal_uuid]

# Start Waitable API
def is_ready(self, wait_set):
Expand Down Expand Up @@ -286,45 +293,52 @@ async def execute(self, taken_data):
"""
if 'goal' in taken_data:
sequence_number, goal_response = taken_data['goal']
if sequence_number in self._goal_sequence_number_to_goal_id:
goal_handle = ClientGoalHandle(
self,
self._goal_sequence_number_to_goal_id[sequence_number],
goal_response)

try:
with self._goal_lock:
goal_id = self._goal_sequence_number_to_goal_id[sequence_number]
pending_goal_request = self._pending_goal_requests[sequence_number]
except KeyError:
self._logger.warning(
'Ignoring unexpected goal response. There may be more than '
f"one action server for the action '{self._action_name}'"
)
else:
goal_handle = ClientGoalHandle(self, goal_id, goal_response)
if goal_handle.accepted:
goal_uuid = bytes(goal_handle.goal_id.uuid)
if goal_uuid in self._goal_handles:
raise RuntimeError(
'Two goals were accepted with the same ID ({})'.format(goal_handle))
f'Two goals were accepted with the same ID ({goal_handle})')
self._goal_handles[goal_uuid] = weakref.ref(goal_handle)

self._pending_goal_requests[sequence_number].set_result(goal_handle)
else:
self._logger.warning(
'Ignoring unexpected goal response. There may be more than '
f"one action server for the action '{self._action_name}'"
)
pending_goal_request.set_result(goal_handle)

if 'cancel' in taken_data:
sequence_number, cancel_response = taken_data['cancel']
if sequence_number in self._pending_cancel_requests:
self._pending_cancel_requests[sequence_number].set_result(cancel_response)
else:
try:
with self._cancel_lock:
pending_cancel_request = self._pending_cancel_requests[sequence_number]
except KeyError:
self._logger.warning(
'Ignoring unexpected cancel response. There may be more than '
f"one action server for the action '{self._action_name}'"
)
else:
pending_cancel_request.set_result(cancel_response)

if 'result' in taken_data:
sequence_number, result_response = taken_data['result']
if sequence_number in self._pending_result_requests:
self._pending_result_requests[sequence_number].set_result(result_response)
else:
try:
with self._result_lock:
pending_result_request = self._pending_result_requests[sequence_number]
except KeyError:
self._logger.warning(
'Ignoring unexpected result response. There may be more than '
f"one action server for the action '{self._action_name}'"
)
else:
pending_result_request.set_result(result_response)

if 'feedback' in taken_data:
feedback_msg = taken_data['feedback']
Expand Down Expand Up @@ -400,7 +414,8 @@ def unblock(future):
send_goal_future = self.send_goal_async(goal, **kwargs)
send_goal_future.add_done_callback(unblock)

event.wait()
if not send_goal_future.done():
event.wait()
if send_goal_future.exception() is not None:
raise send_goal_future.exception()

Expand Down Expand Up @@ -437,22 +452,24 @@ def send_goal_async(self, goal, feedback_callback=None, goal_uuid=None):
request = self._action_type.Impl.SendGoalService.Request()
request.goal_id = self._generate_random_uuid() if goal_uuid is None else goal_uuid
request.goal = goal
sequence_number = self._client_handle.send_goal_request(request)
if sequence_number in self._pending_goal_requests:
raise RuntimeError(
'Sequence ({}) conflicts with pending goal request'.format(sequence_number))

if feedback_callback is not None:
# TODO(jacobperron): Move conversion function to a general-use package
goal_uuid = bytes(request.goal_id.uuid)
self._feedback_callbacks[goal_uuid] = feedback_callback

future = Future()
self._pending_goal_requests[sequence_number] = future
self._goal_sequence_number_to_goal_id[sequence_number] = request.goal_id
future.add_done_callback(self._remove_pending_goal_request)
# Add future so executor is aware
self.add_future(future)

with self._goal_lock:
sequence_number = self._client_handle.send_goal_request(request)
if sequence_number in self._pending_goal_requests:
raise RuntimeError(
'Sequence ({}) conflicts with pending goal request'.format(sequence_number))

if feedback_callback is not None:
# TODO(jacobperron): Move conversion function to a general-use package
goal_uuid = bytes(request.goal_id.uuid)
self._feedback_callbacks[goal_uuid] = feedback_callback

future = Future()
self._pending_goal_requests[sequence_number] = future
self._goal_sequence_number_to_goal_id[sequence_number] = request.goal_id
future.add_done_callback(self._remove_pending_goal_request)
# Add future so executor is aware
self.add_future(future)

return future

Expand All @@ -475,7 +492,8 @@ def unblock(future):
future = self._cancel_goal_async(goal_handle)
future.add_done_callback(unblock)

event.wait()
if not future.done():
event.wait()
if future.exception() is not None:
raise future.exception()
return future.result()
Expand All @@ -495,16 +513,18 @@ def _cancel_goal_async(self, goal_handle):

cancel_request = CancelGoal.Request()
cancel_request.goal_info.goal_id = goal_handle.goal_id
sequence_number = self._client_handle.send_cancel_request(cancel_request)
if sequence_number in self._pending_cancel_requests:
raise RuntimeError(
'Sequence ({}) conflicts with pending cancel request'.format(sequence_number))

future = Future()
self._pending_cancel_requests[sequence_number] = future
future.add_done_callback(self._remove_pending_cancel_request)
# Add future so executor is aware
self.add_future(future)
with self._cancel_lock:
sequence_number = self._client_handle.send_cancel_request(cancel_request)
if sequence_number in self._pending_cancel_requests:
raise RuntimeError(
'Sequence ({}) conflicts with pending cancel request'.format(sequence_number))

future = Future()
self._pending_cancel_requests[sequence_number] = future
future.add_done_callback(self._remove_pending_cancel_request)
# Add future so executor is aware
self.add_future(future)

return future

Expand All @@ -527,7 +547,8 @@ def unblock(future):
future = self._get_result_async(goal_handle)
future.add_done_callback(unblock)

event.wait()
if not future.done():
event.wait()
if future.exception() is not None:
raise future.exception()
return future.result()
Expand All @@ -547,17 +568,19 @@ def _get_result_async(self, goal_handle):

result_request = self._action_type.Impl.GetResultService.Request()
result_request.goal_id = goal_handle.goal_id
sequence_number = self._client_handle.send_result_request(result_request)
if sequence_number in self._pending_result_requests:
raise RuntimeError(
'Sequence ({}) conflicts with pending result request'.format(sequence_number))

future = Future()
self._pending_result_requests[sequence_number] = future
self._result_sequence_number_to_goal_id[sequence_number] = result_request.goal_id
future.add_done_callback(self._remove_pending_result_request)
# Add future so executor is aware
self.add_future(future)

with self._result_lock:
sequence_number = self._client_handle.send_result_request(result_request)
if sequence_number in self._pending_result_requests:
raise RuntimeError(
'Sequence ({}) conflicts with pending result request'.format(sequence_number))

future = Future()
self._pending_result_requests[sequence_number] = future
self._result_sequence_number_to_goal_id[sequence_number] = result_request.goal_id
future.add_done_callback(self._remove_pending_result_request)
# Add future so executor is aware
self.add_future(future)

return future

Expand Down