Skip to content

Commit

Permalink
chore(test): fixes testing race when calling Session._action_updated_…
Browse files Browse the repository at this point in the history
…impl

Summary:
 Unit tests that call Session._action_updated_impl() were randomly failing
because that function does some work in a Future, and the tests were not waiting
for the future to complete before checking results. The future would usually
complete quickly, but when it doesn't then the tests would fail.

 Fix the tests by exposing the inner future and waiting on it during the tests.

Signed-off-by: Daniel Neilson <[email protected]>
  • Loading branch information
ddneilson committed Oct 29, 2024
1 parent 386a1bb commit b86d511
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 13 deletions.
6 changes: 4 additions & 2 deletions src/deadline_worker_agent/sessions/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -1018,7 +1018,7 @@ def _action_updated_impl(
*,
action_status: ActionStatus,
now: datetime,
) -> None:
) -> Optional[Future]:
"""Internal implementation for the callback invoked on every Open Job Description status/progress
update and the completion/exit of the current action. The caller should acquire the
Session._current_action_lock before calling this method.
Expand Down Expand Up @@ -1097,7 +1097,9 @@ def _action_updated_impl(
current_action=current_action,
)
future.add_done_callback(on_done_with_sync_asset_outputs)

# Returning the future just to make this method easier to test.
# Tests need to wait on the future to avoid race conditions
return future
else:
self._handle_action_update(is_unsuccessful, action_status, current_action, now)

Expand Down
33 changes: 22 additions & 11 deletions test/unit/sessions/test_session.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.

from __future__ import annotations
from concurrent.futures import wait
from datetime import datetime, timedelta
from pathlib import PurePosixPath, PureWindowsPath
from threading import Event, RLock
Expand Down Expand Up @@ -1343,10 +1344,12 @@ def test_failed_enter_env(

with patch.object(session, "_sync_asset_outputs") as mock_sync_asset_outputs:
# WHEN
session._action_updated_impl(
future = session._action_updated_impl(
action_status=failed_action_status,
now=action_complete_time,
)
if future:
wait([future])

# THEN
mock_report_action_update.assert_called_once_with(expected_action_update)
Expand Down Expand Up @@ -1410,10 +1413,12 @@ def test_failed_task_run(

with patch.object(session, "_sync_asset_outputs") as mock_sync_asset_outputs:
# WHEN
session._action_updated_impl(
future = session._action_updated_impl(
action_status=failed_action_status,
now=action_complete_time,
)
if future:
wait([future])

# THEN
mock_report_action_update.assert_called_once_with(expected_action_update)
Expand Down Expand Up @@ -1488,10 +1493,12 @@ def sync_asset_outputs_side_effect(*, current_action: CurrentAction) -> None:
mock_sync_asset_outputs.side_effect = sync_asset_outputs_side_effect

# WHEN
session._action_updated_impl(
future = session._action_updated_impl(
action_status=success_action_status,
now=action_complete_time,
)
if future:
wait([future])

# THEN
mock_report_action_update.assert_called_once_with(expected_action_update)
Expand Down Expand Up @@ -1565,10 +1572,12 @@ def mock_now(*arg, **kwarg) -> datetime:
mock_datetime.now.side_effect = mock_now

# WHEN
session._action_updated_impl(
future = session._action_updated_impl(
action_status=success_action_status,
now=action_complete_time,
)
if future:
wait([future])

# THEN
mock_report_action_update.assert_called_once_with(expected_action_update)
Expand All @@ -1589,14 +1598,12 @@ def test_logs_succeeded(
) -> None:
"""Tests that succeeded actions are logged"""
# WHEN
session._action_updated_impl(
future = session._action_updated_impl(
action_status=success_action_status,
now=action_complete_time,
)
# This because the _action_update_impl submits a future to this thread pool executor
# The test assertion depends on this future completing and so there's a race condition
# if we do not wait for the thread pool to shutdown and all futures to complete.
session._executor.shutdown()
if future:
wait([future])

# THEN
mock_mod_logger.info.assert_called_once()
Expand All @@ -1618,10 +1625,12 @@ def test_logs_failed(
) -> None:
"""Tests that failed actions are logged"""
# WHEN
session._action_updated_impl(
future = session._action_updated_impl(
action_status=failed_action_status,
now=action_complete_time,
)
if future:
wait([future])

# THEN
mock_mod_logger.info.assert_called_once()
Expand All @@ -1642,10 +1651,12 @@ def test_logs_canceled(
) -> None:
"""Tests that canceled actions are logged"""
# WHEN
session._action_updated_impl(
future = session._action_updated_impl(
action_status=canceled_action_status,
now=action_complete_time,
)
if future:
wait([future])

# THEN
mock_mod_logger.info.assert_called_once()
Expand Down

0 comments on commit b86d511

Please sign in to comment.