Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
Signed-off-by: Godot Bian <[email protected]>
  • Loading branch information
godobyte committed Nov 20, 2024
1 parent 603c7e0 commit 952ee18
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 32 deletions.
24 changes: 12 additions & 12 deletions src/deadline_worker_agent/scheduler/session_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,8 @@ class SessionActionQueueEntry(Generic[D]):
SyncInputJobAttachmentsStepDependenciesQueueEntry = SessionActionQueueEntry[
SyncInputJobAttachmentsActionApiModel
]
AttachmentDownloadActioQueueEntry = SessionActionQueueEntry[AttachmentDownloadActionApiModel]
AttachmentDownloadActioStepDependenciesQueueEntry = SessionActionQueueEntry[
AttachmentDownloadActionQueueEntry = SessionActionQueueEntry[AttachmentDownloadActionApiModel]
AttachmentDownloadActionStepDependenciesQueueEntry = SessionActionQueueEntry[
AttachmentDownloadActionApiModel
]
CancelOutcome = Literal["FAILED", "NEVER_ATTEMPTED"]
Expand All @@ -102,17 +102,17 @@ class SessionActionQueue:
| TaskRunQueueEntry
| SyncInputJobAttachmentsQueueEntry
| SyncInputJobAttachmentsStepDependenciesQueueEntry
| AttachmentDownloadActioQueueEntry
| AttachmentDownloadActioStepDependenciesQueueEntry
| AttachmentDownloadActionQueueEntry
| AttachmentDownloadActionStepDependenciesQueueEntry
]
_actions_by_id: dict[
str,
EnvironmentQueueEntry
| TaskRunQueueEntry
| SyncInputJobAttachmentsQueueEntry
| SyncInputJobAttachmentsStepDependenciesQueueEntry
| AttachmentDownloadActioQueueEntry
| AttachmentDownloadActioStepDependenciesQueueEntry,
| AttachmentDownloadActionQueueEntry
| AttachmentDownloadActionStepDependenciesQueueEntry,
]
_action_update_callback: Callable[[SessionActionStatus], None]
_job_entities: JobEntities
Expand Down Expand Up @@ -303,8 +303,8 @@ def replace(
| EnvironmentQueueEntry
| SyncInputJobAttachmentsQueueEntry
| SyncInputJobAttachmentsStepDependenciesQueueEntry
| AttachmentDownloadActioQueueEntry
| AttachmentDownloadActioStepDependenciesQueueEntry
| AttachmentDownloadActionQueueEntry
| AttachmentDownloadActionStepDependenciesQueueEntry
] = []

action_ids_added = list[str]()
Expand Down Expand Up @@ -333,12 +333,12 @@ def replace(
if ASSET_SYNC_JOB_USER_FEATURE:
action = cast(AttachmentDownloadActionApiModel, action)
if "stepId" not in action:
queue_entry = AttachmentDownloadActioQueueEntry(
queue_entry = AttachmentDownloadActionQueueEntry(
cancel=cancel_event,
definition=action,
)
else:
queue_entry = AttachmentDownloadActioStepDependenciesQueueEntry(
queue_entry = AttachmentDownloadActionStepDependenciesQueueEntry(
cancel=cancel_event,
definition=action,
)
Expand Down Expand Up @@ -483,7 +483,7 @@ def dequeue(self) -> SessionActionDefinition | None:
action_definition = cast(AttachmentDownloadActionApiModel, action_definition)
if "stepId" not in action_definition:
action_queue_entry = cast(

Check notice

Code scanning / CodeQL

Unused local variable Note

Variable action_queue_entry is not used.
AttachmentDownloadActioQueueEntry, action_queue_entry
AttachmentDownloadActionQueueEntry, action_queue_entry
)
try:
job_attachment_details = self._job_entities.job_attachment_details()
Expand All @@ -502,7 +502,7 @@ def dequeue(self) -> SessionActionDefinition | None:
)
else:
action_queue_entry = cast(

Check notice

Code scanning / CodeQL

Unused local variable Note

Variable action_queue_entry is not used.
AttachmentDownloadActioStepDependenciesQueueEntry, action_queue_entry
AttachmentDownloadActionStepDependenciesQueueEntry, action_queue_entry
)

try:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import sys
import json
from shlex import quote
from logging import getLogger, LoggerAdapter
from logging import LoggerAdapter
import sysconfig
from typing import Any, TYPE_CHECKING, Optional
from dataclasses import asdict
Expand All @@ -32,6 +32,7 @@

from openjd.sessions import (
LOG as OPENJD_LOG,
LogContent,
PathMappingRule as OpenjdPathMapping,
PosixSessionUser,
WindowsSessionUser,
Expand All @@ -52,30 +53,19 @@
from .openjd_action import OpenjdAction

if TYPE_CHECKING:
from concurrent.futures import Future
from ..session import Session
from ..job_entities import JobAttachmentDetails, StepDetails


logger = getLogger(__name__)


class SyncCanceled(Exception):
"""Exception indicating the synchronization was canceled"""

pass


class AttachmentDownloadAction(OpenjdAction):
"""Action to synchronize input job attachments for a AWS Deadline Cloud job
"""Action to synchronize input job attachments for a AWS Deadline Cloud Session
Parameters
----------
id : str
The unique action identifier
"""

_future: Future[None]
_job_attachment_details: Optional[JobAttachmentDetails]
_step_details: Optional[StepDetails]
_step_script: Optional[StepScript_2023_09]
Expand All @@ -102,6 +92,7 @@ def __init__(
self._logger = LoggerAdapter(OPENJD_LOG, extra={"session_id": session_id})

def set_step_script(self, manifests, path_mapping, s3_settings) -> None:
# TODO - update to run python as embedded file
profile = os.environ.get("AWS_PROFILE")
deadline_path = os.path.join(Path(sysconfig.get_path("scripts")), "deadline")

Expand Down Expand Up @@ -151,17 +142,24 @@ def start(
An executor for running futures
"""

self._logger.info(f"Syncing inputs using session {session}")

if self._step_details:
section_title = "Job Attachments Download for Step"
else:
section_title = "Job Attachments Download for Job"

# Banner mimicing the one printed by the openjd-sessions runtime
self._logger.info("==============================================")
self._logger.info(f"--------- AttachmentDownloadAction {section_title}")
self._logger.info("==============================================")
self._logger.info(
"==============================================",
extra={"openjd_log_content": LogContent.BANNER},
)
self._logger.info(
f"--------- AttachmentDownloadAction {section_title}",
extra={"openjd_log_content": LogContent.BANNER},
)
self._logger.info(
"==============================================",
extra={"openjd_log_content": LogContent.BANNER},
)

if not (job_attachment_settings := session._job_details.job_attachment_settings):
raise RuntimeError("Job attachment settings were not contained in JOB_DETAILS entity")
Expand Down Expand Up @@ -232,7 +230,7 @@ def start(
)
)

vfs_handled = self._vfs_handling(
vfs_handled = self._start_vfs(
session=session,
attachments=attachments,
merged_manifests_by_root=merged_manifests_by_root,
Expand Down Expand Up @@ -286,7 +284,7 @@ def start(
task_parameter_values=dict[str, ParameterValue](),
)

def _vfs_handling(
def _start_vfs(
self,
session: Session,
attachments: Attachments,
Expand Down
5 changes: 5 additions & 0 deletions test/unit/sessions/actions/test_run_attachment_download.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from __future__ import annotations
from pathlib import Path
import os
import sys
import sysconfig
import tempfile
from typing import TYPE_CHECKING, Generator
Expand Down Expand Up @@ -109,6 +110,10 @@ def mock_asset_sync(self, session: Mock) -> Generator[MagicMock, None, None]:
with patch.object(session, "_asset_sync") as mock_asset_sync:
yield mock_asset_sync

@pytest.mark.skipif(
sys.platform == "win32",
reason="Failed in windows due to embeddedFiles.data quotation mark, which will be replaced by python embedded file soon.",
)
def test_attachment_download_action_start(
self,
executor: Mock,
Expand Down

0 comments on commit 952ee18

Please sign in to comment.