Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
Signed-off-by: Godot Bian <[email protected]>
  • Loading branch information
godobyte committed Nov 29, 2024
1 parent c6f6f45 commit 0f9c3ff
Show file tree
Hide file tree
Showing 9 changed files with 86 additions and 58 deletions.
1 change: 1 addition & 0 deletions src/deadline_worker_agent/api_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ class AttachmentDownloadAction(TypedDict):
stepId: NotRequired[str]


# This action is not from API, kepping it here for all action models to be in one place
class AttachmentUploadAction(TypedDict):
sessionActionId: str
actionType: AttachmentUploadActionType
Expand Down
11 changes: 3 additions & 8 deletions src/deadline_worker_agent/scheduler/session_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -302,16 +302,11 @@ def insert_front(
Parameters
----------
action : SessionActionQueueEntry
The action to be inserted
action : AttachmentUploadActionApiModel
The attachment upload action to be inserted to the front of queue
"""
action_type = action["actionType"]
action_id = action["sessionActionId"]
logger.info(
"Inserting attachment upload to the front of queue: %s action: %s",
action_type,
action_id,
)
cancel_event = Event()

action = cast(AttachmentUploadActionApiModel, action)
Expand All @@ -322,7 +317,7 @@ def insert_front(

self._actions.insert(0, queue_entry)
self._actions_by_id[action_id] = queue_entry
logger.info("Successfully inserted front of queue: %s action: %s", action_type, action_id)
logger.debug("Successfully inserted front of queue: %s action: %s", action_type, action_id)

def replace(
self,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,9 @@
Executor,
)
import os
from pathlib import Path
import sys
from shlex import quote
from pathlib import Path
from logging import LoggerAdapter
import sysconfig
from typing import Any, TYPE_CHECKING, Optional
from dataclasses import asdict

Expand Down Expand Up @@ -45,7 +43,6 @@
)
from openjd.model import ParameterValue

from ..session import Session
from ...log_messages import SessionActionLogKind


Expand Down Expand Up @@ -108,13 +105,18 @@ def set_step_script(self, manifests, s3_settings) -> None:
s3_settings.to_s3_root_uri(),
"-m",
]
args.extend([quote(p) for p in manifests])
args.extend(manifests)

executable_path = Path(sys.executable)
python_path = executable_path.parent / executable_path.name.lower().replace(
"pythonservice.exe", "python.exe"
)

with open(Path(__file__).parent / "scripts" / "attachment_download.py", "r") as f:
self._step_script = StepScript_2023_09(
actions=StepActions_2023_09(
onRun=Action_2023_09(
command=os.path.join(Path(sysconfig.get_path("scripts")), "python"),
command=python_path,
args=args,
)
),
Expand Down Expand Up @@ -223,15 +225,15 @@ def start(
# returns root path to PathMappingRule mapping
dynamic_mapping_rules: dict[str, PathMappingRule] = (
session._asset_sync.generate_dynamic_path_mapping(
session_dir=session._session.working_directory,
session_dir=session.openjd_session.working_directory,
attachments=attachments,
)
)

# Aggregate manifests (with step step dependency handling)
merged_manifests_by_root: dict[str, BaseAssetManifest] = (
session._asset_sync._aggregate_asset_root_manifests(
session_dir=session._session.working_directory,
session_dir=session.openjd_session.working_directory,
s3_settings=s3_settings,
queue_id=session._queue_id,
job_id=session._queue._job_id,
Expand All @@ -256,28 +258,30 @@ def start(
# Open Job Description session implementation details -- path mappings are sorted.
# bisect.insort only supports the 'key' arg in 3.10 or later, so
# we first extend the list and sort it afterwards.
if session._session._path_mapping_rules:
session._session._path_mapping_rules.extend(
if session.openjd_session._path_mapping_rules:
session.openjd_session._path_mapping_rules.extend(
OpenjdPathMapping.from_dict(r) for r in job_attachment_path_mappings
)
else:
session._session._path_mapping_rules = [
session.openjd_session._path_mapping_rules = [
OpenjdPathMapping.from_dict(r) for r in job_attachment_path_mappings
]

# Open Job Description Sessions sort the path mapping rules based on length of the parts make
# rules that are subsets of each other behave in a predictable manner. We must
# sort here since we're modifying that internal list appending to the list.
session._session._path_mapping_rules.sort(key=lambda rule: -len(rule.source_path.parts))
session.openjd_session._path_mapping_rules.sort(
key=lambda rule: -len(rule.source_path.parts)
)

manifest_paths_by_root = session._asset_sync._check_and_write_local_manifests(
merged_manifests_by_root=merged_manifests_by_root,
manifest_write_dir=str(session._session.working_directory),
manifest_write_dir=str(session.openjd_session.working_directory),
)
session.set_manifest_paths_by_root(manifest_paths_by_root)
# TODO: remove type: ignore for manifest_paths_by_root after deadline-cloud release
session.manifest_paths_by_root = manifest_paths_by_root # type: ignore

self.set_step_script(
# TODO: remove type: ignore after deadline-cloud release
manifests=manifest_paths_by_root.values(), # type: ignore
s3_settings=s3_settings,
)
Expand Down Expand Up @@ -328,7 +332,7 @@ def _start_vfs(
assert session._asset_sync is not None
session._asset_sync._launch_vfs(
s3_settings=s3_settings,
session_dir=session._session.working_directory,
session_dir=session.openjd_session.working_directory,
fs_permission_settings=fs_permission_settings,
merged_manifests_by_root=merged_manifests_by_root,
os_env_vars=dict(os.environ),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,7 @@
Executor,
)
import os
import sysconfig
from shlex import quote
import sys
from logging import LoggerAdapter
from typing import Any, TYPE_CHECKING, Optional
from pathlib import Path
Expand All @@ -25,10 +24,7 @@
from deadline.job_attachments.api.manifest import _manifest_snapshot
from deadline.job_attachments.models import ManifestSnapshot

from ..session import Session
from ...log_messages import SessionActionLogKind


from .openjd_action import OpenjdAction

if TYPE_CHECKING:
Expand Down Expand Up @@ -81,14 +77,19 @@ def set_step_script(self, manifests, s3_settings) -> None:
s3_settings.to_s3_root_uri(),
"-m",
]
args.extend([quote(p) for p in manifests])
args.extend(manifests)

executable_path = Path(sys.executable)
python_path = executable_path.parent / executable_path.name.lower().replace(
"pythonservice.exe", "python.exe"
)

with open(Path(__file__).parent / "scripts" / "attachment_upload.py", "r") as f:
data = f.read()
self._step_script = StepScript_2023_09(
actions=StepActions_2023_09(
onRun=Action_2023_09(
command=os.path.join(Path(sysconfig.get_path("scripts")), "python"),
command=python_path,
args=args,
)
),
Expand Down Expand Up @@ -153,8 +154,8 @@ def start(
rootPrefix=job_attachment_settings.root_prefix,
)

manifest_paths_by_root = session.manifest_paths_by_root()
output_path = os.path.join(session._session.working_directory, "diff")
manifest_paths_by_root = session.manifest_paths_by_root
output_path = os.path.join(session.openjd_session.working_directory, "diff")
manifests = list()

for root, path in manifest_paths_by_root.items():
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,13 @@
#! /usr/bin/env python3
import argparse
import time
import os
import boto3

from deadline.job_attachments import api

"""
A small script to download job output. Can provide just the Job ID to download all outputs
for a Job, optionally include the Step ID to get all outputs for the Job's Step, or optionally
include the Job, Step, and Task ID to get the outputs for a specific Task.
A small script to download job output using attachment download.
This is available in deadline-cloud as python API and AWS Deadline Cloud CLI.
Example usage:
Expand All @@ -28,7 +26,7 @@ def download(s3_root_uri: str, path_mapping_rules: str, manifests: list[str]) ->
api.attachment_download(
manifests=manifests,
s3_root_uri=s3_root_uri,
boto3_session=boto3.session.Session(profile_name=os.environ.get("AWS_PROFILE")),
boto3_session=boto3.session.Session(),
path_mapping_rules=path_mapping_rules,
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@
from deadline.job_attachments import api

"""
A small script to upload job output.
A small script to upload job output using attachment upload.
This is available in deadline-cloud as python API and AWS Deadline Cloud CLI.
Example usage:
Expand All @@ -26,7 +27,7 @@ def upload(s3_root_uri: str, path_mapping_rules: str, manifests: list[str]) -> N
api.attachment_upload(
manifests=manifests,
s3_root_uri=s3_root_uri,
boto3_session=boto3.session.Session(profile_name=os.environ.get("AWS_PROFILE")),
boto3_session=boto3.session.Session(),
path_mapping_rules=path_mapping_rules,
upload_manifest_path=s3_path,
)
Expand Down
24 changes: 17 additions & 7 deletions src/deadline_worker_agent/sessions/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,11 @@ def openjd_session_action_callback(session_id: str, action_status: ActionStatus)
# 3. We're already capturing it to send to cloudwatch.
self.logger = LoggerAdapter(OPENJD_LOG, extra={"session_id": self._id})

@property
def openjd_session(self) -> OPENJDSession:
"""The openjd session for this session"""
return self._session

@property
def id(self) -> str:
"""The unique session ID"""
Expand All @@ -250,8 +255,9 @@ def os_user(self) -> Optional[SessionUser]:
def manifest_paths_by_root(self) -> dict[str, str]:
return self._manifest_paths_by_root

def set_manifest_paths_by_root(self, manifest_paths_by_root) -> None:
self._manifest_paths_by_root = manifest_paths_by_root
@manifest_paths_by_root.setter
def manifest_paths_by_root(self, value: dict[str, str]) -> None:
self._manifest_paths_by_root = dict(value)

def _warm_job_entities_cache(self) -> None:
"""Attempts to cache the job entities response for all
Expand Down Expand Up @@ -310,7 +316,7 @@ def run(self) -> None:
# service. If an action was running at the time of this exception, its failure is
# reported immediately in the call to Session._cleanup() below.
self._stop_fail_message = f"Worker encountered an unexpected error: {e}"
self.logger.info(self._stop_fail_message)
self.logger.error(self._stop_fail_message)
self._stop.set()
raise
finally:
Expand Down Expand Up @@ -659,6 +665,7 @@ def _start_action(self) -> None:
except Exception as e:
if self._output_sync_target_action:
action_definition = self._output_sync_target_action.definition
self._output_sync_target_action = None

logger.error(
SessionActionLogEvent(
Expand Down Expand Up @@ -1104,14 +1111,15 @@ def _action_updated_impl(
if self._output_sync_target_action is not None:

if OPENJD_ACTION_STATE_TO_DEADLINE_COMPLETED_STATUS.get(action_status.state, None):
# if the current action is a sync output job attachments upload action and it's completed
# then we can update and clear the corresponding task run sync target action
task_run_action = self._output_sync_target_action
self._output_sync_target_action = None
logger.info(
f"task run is {task_run_action}, _output_sync_target_action is {self._output_sync_target_action}"
)
self._handle_action_update(is_unsuccessful, action_status, task_run_action, now)
else:
logger.info("AttachmentUploadAction is still running")
logger.debug(
f"SYNC_OUTPUT_JOB_ATTACHMENTS for {self._output_sync_target_action} is still running"
)

return None

Expand Down Expand Up @@ -1255,6 +1263,8 @@ def _handle_action_update(
fail_message="TIMEOUT - Exceeded the allotted runtime limit.",
)

# Only report action update when it's not attachment upload for syncing job attachment outputs,
# progress reporting is not supported by the output upload yet.
if not self._output_sync_target_action:
self._report_action_update(
SessionActionStatus(
Expand Down
15 changes: 12 additions & 3 deletions test/unit/sessions/actions/test_run_attachment_download.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
from pathlib import Path
import os
import sys
import sysconfig
import tempfile
from typing import TYPE_CHECKING, Generator
from unittest.mock import MagicMock, Mock, patch, ANY
Expand Down Expand Up @@ -40,6 +39,15 @@ def session_id() -> str:
return "session_id"


@pytest.fixture
def python_path() -> str:
executable_path = Path(sys.executable)
return str(
executable_path.parent
/ executable_path.name.lower().replace("pythonservice.exe", "python.exe")
)


@pytest.fixture
def session_dir(session_id: str):
with tempfile.TemporaryDirectory() as tmpdir_path:
Expand Down Expand Up @@ -93,7 +101,7 @@ def session(
session._job_details = job_details
session._job_attachment_details = job_attachment_details
session._os_user = job_user
session._session = mock_openjd_session_cls
session.openjd_session = mock_openjd_session_cls
session._queue_id = TestStart.QUEUE_ID
session._queue._job_id = TestStart.JOB_ID
return session
Expand All @@ -115,6 +123,7 @@ def test_attachment_download_action_start(
session_dir: str,
mock_asset_sync: MagicMock,
job_details: JobDetails,
python_path: str,
) -> None:
"""
Tests that AttachmentDownloadAction.start() calls AssetSync functions to prepare input
Expand Down Expand Up @@ -158,7 +167,7 @@ def test_attachment_download_action_start(
assert action._step_script == StepScript_2023_09(
actions=StepActions_2023_09(
onRun=Action_2023_09(
command=os.path.join(Path(sysconfig.get_path("scripts")), "python"),
command=python_path,
args=[
"{{ Task.File.AttachmentDownload }}",
"-pm",
Expand Down
Loading

0 comments on commit 0f9c3ff

Please sign in to comment.