Skip to content

Commit

Permalink
fix: set RunStepTaskAction end time to asset sync completion time (#11)
Browse files Browse the repository at this point in the history
Signed-off-by: Gahyun Suh <[email protected]>
  • Loading branch information
gahyusuh authored Oct 19, 2023
1 parent 1718c57 commit 2fd6c47
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -127,8 +127,6 @@ def _on_done(
)
except Exception as e:
session.logger.exception(e)
# We need to directly complete the action. Other actions rely on the Open Job Description session's
# callback to complete the action
action_status = ActionStatus(
state=ActionState.FAILED,
fail_message=str(e),
Expand Down
2 changes: 1 addition & 1 deletion src/deadline_worker_agent/sessions/log_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ def create_remote_handler(
if not (log_group := self.options.get(LOG_CONFIG_OPTION_GROUP_NAME_KEY, None)):
raise KeyError(f'No "{LOG_CONFIG_OPTION_GROUP_NAME_KEY}" in logConfiguration.options')
elif not (log_stream := self.options.get(LOG_CONFIG_OPTION_STREAM_NAME_KEY, None)):
raise KeyError('No "{LOG_CONFIG_OPTION_STREAM_NAME_KEY}" in logConfiguration.options')
raise KeyError(f'No "{LOG_CONFIG_OPTION_STREAM_NAME_KEY}" in logConfiguration.options')

return CloudWatchHandler(
log_group_name=log_group,
Expand Down
12 changes: 9 additions & 3 deletions src/deadline_worker_agent/sessions/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -962,14 +962,17 @@ def _action_updated_impl(
):
# Synchronizing job output attachments is currently bundled together with the
# RunStepTaskAction. The synchronization happens after the task run succeeds, and both
# must be successful in order to mark the action as SUCCEEDED.
# must be successful in order to mark the action as SUCCEEDED. The time when
# the action is completed should be the moment when the sunchronization have
# been finished.
try:
self._sync_asset_outputs(current_action=current_action)
now = datetime.now(tz=timezone.utc)
except Exception as e:
# Log and fail the task run action if we are unable to sync output job
# attachments
fail_message = f"Failed to sync job output attachments for {current_action.definition.human_readable()}: {e}"
logger.warning(fail_message)
self.logger.warning(fail_message)
action_status = ActionStatus(state=ActionState.FAILED, fail_message=fail_message)
is_unsuccessful = True

Expand Down Expand Up @@ -1083,7 +1086,7 @@ def _sync_asset_outputs(
from .actions import RunStepTaskAction

assert isinstance(current_action.definition, RunStepTaskAction)
self._asset_sync.sync_outputs(
upload_summary_statistics = self._asset_sync.sync_outputs(
s3_settings=s3_settings,
attachments=attachments,
queue_id=self._queue_id,
Expand All @@ -1096,6 +1099,9 @@ def _sync_asset_outputs(
storage_profiles_path_mapping_rules=storage_profiles_path_mapping_rules_dict,
on_uploading_files=partial(self._notifier_callback, current_action),
)

ASSET_SYNC_LOGGER.info(f"Summary Statistics for file uploads:\n{upload_summary_statistics}")

ASSET_SYNC_LOGGER.info("Finished syncing outputs using Job Attachments")

def run_task(
Expand Down
10 changes: 9 additions & 1 deletion test/unit/sessions/test_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

from deadline_worker_agent.api_models import EnvironmentAction, TaskRunAction
from deadline_worker_agent.sessions import Session
from deadline_worker_agent.sessions import session as session_module
from deadline_worker_agent.sessions.session import (
CurrentAction,
SessionActionStatus,
Expand Down Expand Up @@ -1087,7 +1088,14 @@ def test_success_task_run(
end_time=action_complete_time,
)

with patch.object(session, "_sync_asset_outputs") as mock_sync_asset_outputs:
def mock_now(*arg, **kwarg) -> datetime:
return action_complete_time

with patch.object(session_module, "datetime") as mock_datetime, patch.object(
session, "_sync_asset_outputs"
) as mock_sync_asset_outputs:
mock_datetime.now.side_effect = mock_now

# Assert that reporting the action update happens AFTER syncing the output job
# attachments.
def sync_asset_outputs_side_effect(*, current_action: CurrentAction) -> None:
Expand Down

0 comments on commit 2fd6c47

Please sign in to comment.