From 3ea7c45a300257a82f732153a4f162cf0645cb06 Mon Sep 17 00:00:00 2001 From: Godot Bian <13778003+godobyte@users.noreply.github.com> Date: Tue, 19 Nov 2024 23:45:45 +0000 Subject: [PATCH] address comments Signed-off-by: Godot Bian <13778003+godobyte@users.noreply.github.com> --- src/deadline_worker_agent/feature_flag.py | 6 +- .../scheduler/session_queue.py | 24 ++-- .../actions/run_attachment_download.py | 124 +++++++++--------- .../actions/test_run_attachment_download.py | 13 +- 4 files changed, 88 insertions(+), 79 deletions(-) diff --git a/src/deadline_worker_agent/feature_flag.py b/src/deadline_worker_agent/feature_flag.py index 00abfa7b..5e192761 100644 --- a/src/deadline_worker_agent/feature_flag.py +++ b/src/deadline_worker_agent/feature_flag.py @@ -2,4 +2,8 @@ import os -ASSET_SYNC_JOB_USER_FEATURE = os.environ.get("ASSET_SYNC_JOB_USER_FEATURE") +# This feature is still a work-in-progress and untested on Windows + +ASSET_SYNC_JOB_USER_FEATURE = ( + os.environ.get("ASSET_SYNC_JOB_USER_FEATURE", "false").lower() == "true" +) diff --git a/src/deadline_worker_agent/scheduler/session_queue.py b/src/deadline_worker_agent/scheduler/session_queue.py index 7b55d32c..9132d0fd 100644 --- a/src/deadline_worker_agent/scheduler/session_queue.py +++ b/src/deadline_worker_agent/scheduler/session_queue.py @@ -78,8 +78,8 @@ class SessionActionQueueEntry(Generic[D]): SyncInputJobAttachmentsStepDependenciesQueueEntry = SessionActionQueueEntry[ SyncInputJobAttachmentsActionApiModel ] -AttachmentDownloadActioQueueEntry = SessionActionQueueEntry[AttachmentDownloadActionApiModel] -AttachmentDownloadActioStepDependenciesQueueEntry = SessionActionQueueEntry[ +AttachmentDownloadActionQueueEntry = SessionActionQueueEntry[AttachmentDownloadActionApiModel] +AttachmentDownloadActionStepDependenciesQueueEntry = SessionActionQueueEntry[ AttachmentDownloadActionApiModel ] CancelOutcome = Literal["FAILED", "NEVER_ATTEMPTED"] @@ -102,8 +102,8 @@ class SessionActionQueue: | TaskRunQueueEntry | SyncInputJobAttachmentsQueueEntry | SyncInputJobAttachmentsStepDependenciesQueueEntry - | AttachmentDownloadActioQueueEntry - | AttachmentDownloadActioStepDependenciesQueueEntry + | AttachmentDownloadActionQueueEntry + | AttachmentDownloadActionStepDependenciesQueueEntry ] _actions_by_id: dict[ str, @@ -111,8 +111,8 @@ class SessionActionQueue: | TaskRunQueueEntry | SyncInputJobAttachmentsQueueEntry | SyncInputJobAttachmentsStepDependenciesQueueEntry - | AttachmentDownloadActioQueueEntry - | AttachmentDownloadActioStepDependenciesQueueEntry, + | AttachmentDownloadActionQueueEntry + | AttachmentDownloadActionStepDependenciesQueueEntry, ] _action_update_callback: Callable[[SessionActionStatus], None] _job_entities: JobEntities @@ -303,8 +303,8 @@ def replace( | EnvironmentQueueEntry | SyncInputJobAttachmentsQueueEntry | SyncInputJobAttachmentsStepDependenciesQueueEntry - | AttachmentDownloadActioQueueEntry - | AttachmentDownloadActioStepDependenciesQueueEntry + | AttachmentDownloadActionQueueEntry + | AttachmentDownloadActionStepDependenciesQueueEntry ] = [] action_ids_added = list[str]() @@ -333,12 +333,12 @@ def replace( if ASSET_SYNC_JOB_USER_FEATURE: action = cast(AttachmentDownloadActionApiModel, action) if "stepId" not in action: - queue_entry = AttachmentDownloadActioQueueEntry( + queue_entry = AttachmentDownloadActionQueueEntry( cancel=cancel_event, definition=action, ) else: - queue_entry = AttachmentDownloadActioStepDependenciesQueueEntry( + queue_entry = AttachmentDownloadActionStepDependenciesQueueEntry( cancel=cancel_event, definition=action, ) @@ -483,7 +483,7 @@ def dequeue(self) -> SessionActionDefinition | None: action_definition = cast(AttachmentDownloadActionApiModel, action_definition) if "stepId" not in action_definition: action_queue_entry = cast( - AttachmentDownloadActioQueueEntry, action_queue_entry + AttachmentDownloadActionQueueEntry, action_queue_entry ) try: job_attachment_details = self._job_entities.job_attachment_details() @@ -502,7 +502,7 @@ def dequeue(self) -> SessionActionDefinition | None: ) else: action_queue_entry = cast( - AttachmentDownloadActioStepDependenciesQueueEntry, action_queue_entry + AttachmentDownloadActionStepDependenciesQueueEntry, action_queue_entry ) try: diff --git a/src/deadline_worker_agent/sessions/actions/run_attachment_download.py b/src/deadline_worker_agent/sessions/actions/run_attachment_download.py index 093ee714..e81d602f 100644 --- a/src/deadline_worker_agent/sessions/actions/run_attachment_download.py +++ b/src/deadline_worker_agent/sessions/actions/run_attachment_download.py @@ -9,7 +9,7 @@ import sys import json from shlex import quote -from logging import getLogger, LoggerAdapter +from logging import LoggerAdapter import sysconfig from typing import Any, TYPE_CHECKING, Optional from dataclasses import asdict @@ -32,6 +32,7 @@ from openjd.sessions import ( LOG as OPENJD_LOG, + LogContent, PathMappingRule as OpenjdPathMapping, PosixSessionUser, WindowsSessionUser, @@ -52,22 +53,12 @@ from .openjd_action import OpenjdAction if TYPE_CHECKING: - from concurrent.futures import Future from ..session import Session from ..job_entities import JobAttachmentDetails, StepDetails -logger = getLogger(__name__) - - -class SyncCanceled(Exception): - """Exception indicating the synchronization was canceled""" - - pass - - class AttachmentDownloadAction(OpenjdAction): - """Action to synchronize input job attachments for a AWS Deadline Cloud job + """Action to synchronize input job attachments for a AWS Deadline Cloud Session Parameters ---------- @@ -75,7 +66,6 @@ class AttachmentDownloadAction(OpenjdAction): The unique action identifier """ - _future: Future[None] _job_attachment_details: Optional[JobAttachmentDetails] _step_details: Optional[StepDetails] _step_script: Optional[StepScript_2023_09] @@ -102,6 +92,7 @@ def __init__( self._logger = LoggerAdapter(OPENJD_LOG, extra={"session_id": session_id}) def set_step_script(self, manifests, path_mapping, s3_settings) -> None: + # TODO - update to run python as embedded file profile = os.environ.get("AWS_PROFILE") deadline_path = os.path.join(Path(sysconfig.get_path("scripts")), "deadline") @@ -151,17 +142,25 @@ def start( An executor for running futures """ - self._logger.info(f"Syncing inputs using session {session}") - if self._step_details: section_title = "Job Attachments Download for Step" else: section_title = "Job Attachments Download for Job" # Banner mimicing the one printed by the openjd-sessions runtime - self._logger.info("==============================================") - self._logger.info(f"--------- AttachmentDownloadAction {section_title}") - self._logger.info("==============================================") + # TODO - Consider a better approach to manage the banner title + self._logger.info( + "==============================================", + extra={"openjd_log_content": LogContent.BANNER}, + ) + self._logger.info( + f"--------- AttachmentDownloadAction {section_title}", + extra={"openjd_log_content": LogContent.BANNER}, + ) + self._logger.info( + "==============================================", + extra={"openjd_log_content": LogContent.BANNER}, + ) if not (job_attachment_settings := session._job_details.job_attachment_settings): raise RuntimeError("Job attachment settings were not contained in JOB_DETAILS entity") @@ -232,61 +231,62 @@ def start( ) ) - vfs_handled = self._vfs_handling( + if self._start_vfs( session=session, attachments=attachments, merged_manifests_by_root=merged_manifests_by_root, s3_settings=s3_settings, - ) + ): + # successfully launched VFS + return - if not vfs_handled: - job_attachment_path_mappings = list([asdict(r) for r in dynamic_mapping_rules.values()]) + job_attachment_path_mappings = list([asdict(r) for r in dynamic_mapping_rules.values()]) - # Open Job Description session implementation details -- path mappings are sorted. - # bisect.insort only supports the 'key' arg in 3.10 or later, so - # we first extend the list and sort it afterwards. - if session._session._path_mapping_rules: - session._session._path_mapping_rules.extend( - OpenjdPathMapping.from_dict(r) for r in job_attachment_path_mappings - ) - else: - session._session._path_mapping_rules = [ - OpenjdPathMapping.from_dict(r) for r in job_attachment_path_mappings - ] - - # Open Job Description Sessions sort the path mapping rules based on length of the parts make - # rules that are subsets of each other behave in a predictable manner. We must - # sort here since we're modifying that internal list appending to the list. - session._session._path_mapping_rules.sort(key=lambda rule: -len(rule.source_path.parts)) - - # =========================== TO BE DELETED =========================== - path_mapping_file_path: str = os.path.join( - session._session.working_directory, "path_mapping" + # Open Job Description session implementation details -- path mappings are sorted. + # bisect.insort only supports the 'key' arg in 3.10 or later, so + # we first extend the list and sort it afterwards. + if session._session._path_mapping_rules: + session._session._path_mapping_rules.extend( + OpenjdPathMapping.from_dict(r) for r in job_attachment_path_mappings ) - for rule in job_attachment_path_mappings: - rule["source_path"] = rule["destination_path"] + else: + session._session._path_mapping_rules = [ + OpenjdPathMapping.from_dict(r) for r in job_attachment_path_mappings + ] + + # Open Job Description Sessions sort the path mapping rules based on length of the parts make + # rules that are subsets of each other behave in a predictable manner. We must + # sort here since we're modifying that internal list appending to the list. + session._session._path_mapping_rules.sort(key=lambda rule: -len(rule.source_path.parts)) + + # =========================== TO BE DELETED =========================== + path_mapping_file_path: str = os.path.join( + session._session.working_directory, "path_mapping" + ) + for rule in job_attachment_path_mappings: + rule["source_path"] = rule["destination_path"] - with open(path_mapping_file_path, "w", encoding="utf8") as f: - f.write(json.dumps([rule for rule in job_attachment_path_mappings])) - # =========================== TO BE DELETED =========================== + with open(path_mapping_file_path, "w", encoding="utf8") as f: + f.write(json.dumps([rule for rule in job_attachment_path_mappings])) + # =========================== TO BE DELETED =========================== - manifest_paths = session._asset_sync._check_and_write_local_manifests( - merged_manifests_by_root=merged_manifests_by_root, - manifest_write_dir=str(session._session.working_directory), - ) + manifest_paths = session._asset_sync._check_and_write_local_manifests( + merged_manifests_by_root=merged_manifests_by_root, + manifest_write_dir=str(session._session.working_directory), + ) - self.set_step_script( - manifests=manifest_paths, - path_mapping=path_mapping_file_path, - s3_settings=s3_settings, - ) - assert self._step_script is not None - session.run_task( - step_script=self._step_script, - task_parameter_values=dict[str, ParameterValue](), - ) + self.set_step_script( + manifests=manifest_paths, + path_mapping=path_mapping_file_path, + s3_settings=s3_settings, + ) + assert self._step_script is not None + session.run_task( + step_script=self._step_script, + task_parameter_values=dict[str, ParameterValue](), + ) - def _vfs_handling( + def _start_vfs( self, session: Session, attachments: Attachments, diff --git a/test/unit/sessions/actions/test_run_attachment_download.py b/test/unit/sessions/actions/test_run_attachment_download.py index c378eab0..b6b4f9ae 100644 --- a/test/unit/sessions/actions/test_run_attachment_download.py +++ b/test/unit/sessions/actions/test_run_attachment_download.py @@ -3,6 +3,7 @@ from __future__ import annotations from pathlib import Path import os +import sys import sysconfig import tempfile from typing import TYPE_CHECKING, Generator @@ -109,6 +110,10 @@ def mock_asset_sync(self, session: Mock) -> Generator[MagicMock, None, None]: with patch.object(session, "_asset_sync") as mock_asset_sync: yield mock_asset_sync + @pytest.mark.skipif( + sys.platform == "win32", + reason="Failed in windows due to embeddedFiles.data quotation mark, which will be replaced by python embedded file soon.", + ) def test_attachment_download_action_start( self, executor: Mock, @@ -122,14 +127,14 @@ def test_attachment_download_action_start( Tests that AttachmentDownloadAction.start() calls AssetSync functions to prepare input for constructing step script to run openjd action """ - # WHEN - action.start(session=session, executor=executor) - - # THEN + # GIVEN assert job_details.job_attachment_settings is not None assert job_details.job_attachment_settings.s3_bucket_name is not None assert job_details.job_attachment_settings.root_prefix is not None + # WHEN + action.start(session=session, executor=executor) + mock_asset_sync._aggregate_asset_root_manifests.assert_called_once_with( session_dir=session_dir, s3_settings=JobAttachmentS3Settings(