Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
Signed-off-by: Godot Bian <[email protected]>
  • Loading branch information
godobyte committed Nov 21, 2024
1 parent 603c7e0 commit 3ea7c45
Show file tree
Hide file tree
Showing 4 changed files with 88 additions and 79 deletions.
6 changes: 5 additions & 1 deletion src/deadline_worker_agent/feature_flag.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,8 @@

import os

ASSET_SYNC_JOB_USER_FEATURE = os.environ.get("ASSET_SYNC_JOB_USER_FEATURE")
# This feature is still a work-in-progress and untested on Windows

ASSET_SYNC_JOB_USER_FEATURE = (
os.environ.get("ASSET_SYNC_JOB_USER_FEATURE", "false").lower() == "true"
)
24 changes: 12 additions & 12 deletions src/deadline_worker_agent/scheduler/session_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,8 @@ class SessionActionQueueEntry(Generic[D]):
SyncInputJobAttachmentsStepDependenciesQueueEntry = SessionActionQueueEntry[
SyncInputJobAttachmentsActionApiModel
]
AttachmentDownloadActioQueueEntry = SessionActionQueueEntry[AttachmentDownloadActionApiModel]
AttachmentDownloadActioStepDependenciesQueueEntry = SessionActionQueueEntry[
AttachmentDownloadActionQueueEntry = SessionActionQueueEntry[AttachmentDownloadActionApiModel]
AttachmentDownloadActionStepDependenciesQueueEntry = SessionActionQueueEntry[
AttachmentDownloadActionApiModel
]
CancelOutcome = Literal["FAILED", "NEVER_ATTEMPTED"]
Expand All @@ -102,17 +102,17 @@ class SessionActionQueue:
| TaskRunQueueEntry
| SyncInputJobAttachmentsQueueEntry
| SyncInputJobAttachmentsStepDependenciesQueueEntry
| AttachmentDownloadActioQueueEntry
| AttachmentDownloadActioStepDependenciesQueueEntry
| AttachmentDownloadActionQueueEntry
| AttachmentDownloadActionStepDependenciesQueueEntry
]
_actions_by_id: dict[
str,
EnvironmentQueueEntry
| TaskRunQueueEntry
| SyncInputJobAttachmentsQueueEntry
| SyncInputJobAttachmentsStepDependenciesQueueEntry
| AttachmentDownloadActioQueueEntry
| AttachmentDownloadActioStepDependenciesQueueEntry,
| AttachmentDownloadActionQueueEntry
| AttachmentDownloadActionStepDependenciesQueueEntry,
]
_action_update_callback: Callable[[SessionActionStatus], None]
_job_entities: JobEntities
Expand Down Expand Up @@ -303,8 +303,8 @@ def replace(
| EnvironmentQueueEntry
| SyncInputJobAttachmentsQueueEntry
| SyncInputJobAttachmentsStepDependenciesQueueEntry
| AttachmentDownloadActioQueueEntry
| AttachmentDownloadActioStepDependenciesQueueEntry
| AttachmentDownloadActionQueueEntry
| AttachmentDownloadActionStepDependenciesQueueEntry
] = []

action_ids_added = list[str]()
Expand Down Expand Up @@ -333,12 +333,12 @@ def replace(
if ASSET_SYNC_JOB_USER_FEATURE:
action = cast(AttachmentDownloadActionApiModel, action)
if "stepId" not in action:
queue_entry = AttachmentDownloadActioQueueEntry(
queue_entry = AttachmentDownloadActionQueueEntry(
cancel=cancel_event,
definition=action,
)
else:
queue_entry = AttachmentDownloadActioStepDependenciesQueueEntry(
queue_entry = AttachmentDownloadActionStepDependenciesQueueEntry(
cancel=cancel_event,
definition=action,
)
Expand Down Expand Up @@ -483,7 +483,7 @@ def dequeue(self) -> SessionActionDefinition | None:
action_definition = cast(AttachmentDownloadActionApiModel, action_definition)
if "stepId" not in action_definition:
action_queue_entry = cast(

Check notice

Code scanning / CodeQL

Unused local variable Note

Variable action_queue_entry is not used.
AttachmentDownloadActioQueueEntry, action_queue_entry
AttachmentDownloadActionQueueEntry, action_queue_entry
)
try:
job_attachment_details = self._job_entities.job_attachment_details()
Expand All @@ -502,7 +502,7 @@ def dequeue(self) -> SessionActionDefinition | None:
)
else:
action_queue_entry = cast(

Check notice

Code scanning / CodeQL

Unused local variable Note

Variable action_queue_entry is not used.
AttachmentDownloadActioStepDependenciesQueueEntry, action_queue_entry
AttachmentDownloadActionStepDependenciesQueueEntry, action_queue_entry
)

try:
Expand Down
124 changes: 62 additions & 62 deletions src/deadline_worker_agent/sessions/actions/run_attachment_download.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import sys
import json
from shlex import quote
from logging import getLogger, LoggerAdapter
from logging import LoggerAdapter
import sysconfig
from typing import Any, TYPE_CHECKING, Optional
from dataclasses import asdict
Expand All @@ -32,6 +32,7 @@

from openjd.sessions import (
LOG as OPENJD_LOG,
LogContent,
PathMappingRule as OpenjdPathMapping,
PosixSessionUser,
WindowsSessionUser,
Expand All @@ -52,30 +53,19 @@
from .openjd_action import OpenjdAction

if TYPE_CHECKING:
from concurrent.futures import Future
from ..session import Session
from ..job_entities import JobAttachmentDetails, StepDetails


logger = getLogger(__name__)


class SyncCanceled(Exception):
"""Exception indicating the synchronization was canceled"""

pass


class AttachmentDownloadAction(OpenjdAction):
"""Action to synchronize input job attachments for a AWS Deadline Cloud job
"""Action to synchronize input job attachments for a AWS Deadline Cloud Session
Parameters
----------
id : str
The unique action identifier
"""

_future: Future[None]
_job_attachment_details: Optional[JobAttachmentDetails]
_step_details: Optional[StepDetails]
_step_script: Optional[StepScript_2023_09]
Expand All @@ -102,6 +92,7 @@ def __init__(
self._logger = LoggerAdapter(OPENJD_LOG, extra={"session_id": session_id})

def set_step_script(self, manifests, path_mapping, s3_settings) -> None:
# TODO - update to run python as embedded file
profile = os.environ.get("AWS_PROFILE")
deadline_path = os.path.join(Path(sysconfig.get_path("scripts")), "deadline")

Expand Down Expand Up @@ -151,17 +142,25 @@ def start(
An executor for running futures
"""

self._logger.info(f"Syncing inputs using session {session}")

if self._step_details:
section_title = "Job Attachments Download for Step"
else:
section_title = "Job Attachments Download for Job"

# Banner mimicing the one printed by the openjd-sessions runtime
self._logger.info("==============================================")
self._logger.info(f"--------- AttachmentDownloadAction {section_title}")
self._logger.info("==============================================")
# TODO - Consider a better approach to manage the banner title
self._logger.info(
"==============================================",
extra={"openjd_log_content": LogContent.BANNER},
)
self._logger.info(
f"--------- AttachmentDownloadAction {section_title}",
extra={"openjd_log_content": LogContent.BANNER},
)
self._logger.info(
"==============================================",
extra={"openjd_log_content": LogContent.BANNER},
)

if not (job_attachment_settings := session._job_details.job_attachment_settings):
raise RuntimeError("Job attachment settings were not contained in JOB_DETAILS entity")
Expand Down Expand Up @@ -232,61 +231,62 @@ def start(
)
)

vfs_handled = self._vfs_handling(
if self._start_vfs(
session=session,
attachments=attachments,
merged_manifests_by_root=merged_manifests_by_root,
s3_settings=s3_settings,
)
):
# successfully launched VFS
return

if not vfs_handled:
job_attachment_path_mappings = list([asdict(r) for r in dynamic_mapping_rules.values()])
job_attachment_path_mappings = list([asdict(r) for r in dynamic_mapping_rules.values()])

# Open Job Description session implementation details -- path mappings are sorted.
# bisect.insort only supports the 'key' arg in 3.10 or later, so
# we first extend the list and sort it afterwards.
if session._session._path_mapping_rules:
session._session._path_mapping_rules.extend(
OpenjdPathMapping.from_dict(r) for r in job_attachment_path_mappings
)
else:
session._session._path_mapping_rules = [
OpenjdPathMapping.from_dict(r) for r in job_attachment_path_mappings
]

# Open Job Description Sessions sort the path mapping rules based on length of the parts make
# rules that are subsets of each other behave in a predictable manner. We must
# sort here since we're modifying that internal list appending to the list.
session._session._path_mapping_rules.sort(key=lambda rule: -len(rule.source_path.parts))

# =========================== TO BE DELETED ===========================
path_mapping_file_path: str = os.path.join(
session._session.working_directory, "path_mapping"
# Open Job Description session implementation details -- path mappings are sorted.
# bisect.insort only supports the 'key' arg in 3.10 or later, so
# we first extend the list and sort it afterwards.
if session._session._path_mapping_rules:
session._session._path_mapping_rules.extend(
OpenjdPathMapping.from_dict(r) for r in job_attachment_path_mappings
)
for rule in job_attachment_path_mappings:
rule["source_path"] = rule["destination_path"]
else:
session._session._path_mapping_rules = [
OpenjdPathMapping.from_dict(r) for r in job_attachment_path_mappings
]

# Open Job Description Sessions sort the path mapping rules based on length of the parts make
# rules that are subsets of each other behave in a predictable manner. We must
# sort here since we're modifying that internal list appending to the list.
session._session._path_mapping_rules.sort(key=lambda rule: -len(rule.source_path.parts))

# =========================== TO BE DELETED ===========================
path_mapping_file_path: str = os.path.join(
session._session.working_directory, "path_mapping"
)
for rule in job_attachment_path_mappings:
rule["source_path"] = rule["destination_path"]

with open(path_mapping_file_path, "w", encoding="utf8") as f:
f.write(json.dumps([rule for rule in job_attachment_path_mappings]))
# =========================== TO BE DELETED ===========================
with open(path_mapping_file_path, "w", encoding="utf8") as f:
f.write(json.dumps([rule for rule in job_attachment_path_mappings]))
# =========================== TO BE DELETED ===========================

manifest_paths = session._asset_sync._check_and_write_local_manifests(
merged_manifests_by_root=merged_manifests_by_root,
manifest_write_dir=str(session._session.working_directory),
)
manifest_paths = session._asset_sync._check_and_write_local_manifests(
merged_manifests_by_root=merged_manifests_by_root,
manifest_write_dir=str(session._session.working_directory),
)

self.set_step_script(
manifests=manifest_paths,
path_mapping=path_mapping_file_path,
s3_settings=s3_settings,
)
assert self._step_script is not None
session.run_task(
step_script=self._step_script,
task_parameter_values=dict[str, ParameterValue](),
)
self.set_step_script(
manifests=manifest_paths,
path_mapping=path_mapping_file_path,
s3_settings=s3_settings,
)
assert self._step_script is not None
session.run_task(
step_script=self._step_script,
task_parameter_values=dict[str, ParameterValue](),
)

def _vfs_handling(
def _start_vfs(
self,
session: Session,
attachments: Attachments,
Expand Down
13 changes: 9 additions & 4 deletions test/unit/sessions/actions/test_run_attachment_download.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from __future__ import annotations
from pathlib import Path
import os
import sys
import sysconfig
import tempfile
from typing import TYPE_CHECKING, Generator
Expand Down Expand Up @@ -109,6 +110,10 @@ def mock_asset_sync(self, session: Mock) -> Generator[MagicMock, None, None]:
with patch.object(session, "_asset_sync") as mock_asset_sync:
yield mock_asset_sync

@pytest.mark.skipif(
sys.platform == "win32",
reason="Failed in windows due to embeddedFiles.data quotation mark, which will be replaced by python embedded file soon.",
)
def test_attachment_download_action_start(
self,
executor: Mock,
Expand All @@ -122,14 +127,14 @@ def test_attachment_download_action_start(
Tests that AttachmentDownloadAction.start() calls AssetSync functions to prepare input
for constructing step script to run openjd action
"""
# WHEN
action.start(session=session, executor=executor)

# THEN
# GIVEN
assert job_details.job_attachment_settings is not None
assert job_details.job_attachment_settings.s3_bucket_name is not None
assert job_details.job_attachment_settings.root_prefix is not None

# WHEN
action.start(session=session, executor=executor)

mock_asset_sync._aggregate_asset_root_manifests.assert_called_once_with(
session_dir=session_dir,
s3_settings=JobAttachmentS3Settings(
Expand Down

0 comments on commit 3ea7c45

Please sign in to comment.