Skip to content

Commit

Permalink
feat: run job attachment output upload as job user (#495)
Browse files Browse the repository at this point in the history
Signed-off-by: Godot Bian <[email protected]>
  • Loading branch information
godobyte authored Dec 5, 2024
1 parent 1dbb4a6 commit 678f29a
Show file tree
Hide file tree
Showing 13 changed files with 1,005 additions and 127 deletions.
9 changes: 9 additions & 0 deletions src/deadline_worker_agent/api_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
StepActionType = Literal["TASK_RUN"] # noqa
SyncInputJobAttachmentsActionType = Literal["SYNC_INPUT_JOB_ATTACHMENTS"] # noqa
AttachmentDownloadActionType = Literal["SYNC_INPUT_JOB_ATTACHMENTS"] # noqa
AttachmentUploadActionType = Literal["SYNC_OUTPUT_JOB_ATTACHMENTS"] # noqa
CompletedActionStatus = Literal["SUCCEEDED", "FAILED", "INTERRUPTED", "CANCELED", "NEVER_ATTEMPTED"]


Expand Down Expand Up @@ -94,6 +95,14 @@ class AttachmentDownloadAction(TypedDict):
stepId: NotRequired[str]


# This action does not come from the API; it is kept here so that all action models live in one place
class AttachmentUploadAction(TypedDict):
    """Typed dictionary describing an attachment upload session action.

    All four keys are required (none is wrapped in NotRequired).
    """

    # Identifier of the session action; the session queue indexes entries by this value
    sessionActionId: str
    # Action-type discriminator, constrained to AttachmentUploadActionType
    actionType: AttachmentUploadActionType
    # Step identifier; forwarded as step_id when the queue builds the runnable action
    stepId: str
    # Task identifier; forwarded as task_id when the queue builds the runnable action
    taskId: str


class LogConfiguration(TypedDict):
error: NotRequired[str]
logDriver: str
Expand Down
47 changes: 47 additions & 0 deletions src/deadline_worker_agent/scheduler/session_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
EnvironmentAction as EnvironmentActionApiModel,
SyncInputJobAttachmentsAction as SyncInputJobAttachmentsActionApiModel,
AttachmentDownloadAction as AttachmentDownloadActionApiModel,
AttachmentUploadAction as AttachmentUploadActionApiModel,
TaskRunAction as TaskRunActionApiModel,
EntityIdentifier,
EnvironmentDetailsIdentifier,
Expand All @@ -31,6 +32,7 @@
RunStepTaskAction,
SessionActionDefinition,
SyncInputJobAttachmentsAction,
AttachmentUploadAction,
AttachmentDownloadAction,
)
from .session_action_status import SessionActionStatus
Expand All @@ -52,6 +54,7 @@
TaskRunActionApiModel,
SyncInputJobAttachmentsActionApiModel,
AttachmentDownloadActionApiModel,
AttachmentUploadActionApiModel,
)
else:
D = TypeVar("D")
Expand All @@ -78,6 +81,7 @@ class SessionActionQueueEntry(Generic[D]):
SyncInputJobAttachmentsStepDependenciesQueueEntry = SessionActionQueueEntry[
SyncInputJobAttachmentsActionApiModel
]
AttachmentUploadActionQueueEntry = SessionActionQueueEntry[AttachmentUploadActionApiModel]
AttachmentDownloadActionQueueEntry = SessionActionQueueEntry[AttachmentDownloadActionApiModel]
AttachmentDownloadActionStepDependenciesQueueEntry = SessionActionQueueEntry[
AttachmentDownloadActionApiModel
Expand All @@ -102,6 +106,7 @@ class SessionActionQueue:
| TaskRunQueueEntry
| SyncInputJobAttachmentsQueueEntry
| SyncInputJobAttachmentsStepDependenciesQueueEntry
| AttachmentUploadActionQueueEntry
| AttachmentDownloadActionQueueEntry
| AttachmentDownloadActionStepDependenciesQueueEntry
]
Expand All @@ -111,6 +116,7 @@ class SessionActionQueue:
| TaskRunQueueEntry
| SyncInputJobAttachmentsQueueEntry
| SyncInputJobAttachmentsStepDependenciesQueueEntry
| AttachmentUploadActionQueueEntry
| AttachmentDownloadActionQueueEntry
| AttachmentDownloadActionStepDependenciesQueueEntry,
]
Expand Down Expand Up @@ -287,6 +293,32 @@ def cancel_all(
)
)

def insert_front(
    self,
    *,
    action: AttachmentUploadActionApiModel,
) -> None:
    """Put an attachment upload action at the head of the queue.

    Parameters
    ----------
    action : AttachmentUploadActionApiModel
        The attachment upload action to place ahead of every queued action
    """
    # Read both keys up front so a malformed action raises before the queue is mutated.
    kind, session_action_id = action["actionType"], action["sessionActionId"]

    entry = AttachmentUploadActionQueueEntry(
        cancel=Event(),
        definition=cast(AttachmentUploadActionApiModel, action),
    )

    # Register the entry at index 0 of the action list and in the by-ID lookup.
    self._actions.insert(0, entry)
    self._actions_by_id[session_action_id] = entry
    logger.debug(
        "Successfully inserted front of queue: %s action: %s", kind, session_action_id
    )

def replace(
self,
*,
Expand All @@ -295,6 +327,7 @@ def replace(
| TaskRunActionApiModel
| SyncInputJobAttachmentsActionApiModel
| AttachmentDownloadActionApiModel
| AttachmentUploadActionApiModel
],
) -> None:
"""Update the queue's actions"""
Expand All @@ -305,6 +338,7 @@ def replace(
| SyncInputJobAttachmentsStepDependenciesQueueEntry
| AttachmentDownloadActionQueueEntry
| AttachmentDownloadActionStepDependenciesQueueEntry
| AttachmentUploadActionQueueEntry
] = []

action_ids_added = list[str]()
Expand Down Expand Up @@ -477,6 +511,19 @@ def dequeue(self) -> SessionActionDefinition | None:
task_parameter_values=task_parameters,
task_id=action_definition["taskId"],
)
elif action_type == "SYNC_OUTPUT_JOB_ATTACHMENTS":
action_queue_entry = cast(AttachmentUploadActionQueueEntry, action_queue_entry)
action_definition = action_queue_entry.definition
step_id = action_definition["stepId"]
task_id = action_definition["taskId"]

next_action = AttachmentUploadAction(
id=action_id,
session_id=self._session_id,
step_id=step_id,
task_id=task_id,
)

elif action_type == "SYNC_INPUT_JOB_ATTACHMENTS":
action_definition = action_queue_entry.definition
if ASSET_SYNC_JOB_USER_FEATURE:
Expand Down
2 changes: 2 additions & 0 deletions src/deadline_worker_agent/sessions/actions/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from .run_step_task import RunStepTaskAction
from .sync_input_job_attachments import SyncInputJobAttachmentsAction
from .run_attachment_download import AttachmentDownloadAction
from .run_attachment_upload import AttachmentUploadAction

__all__ = [
"EnterEnvironmentAction",
Expand All @@ -16,4 +17,5 @@
"SessionActionDefinition",
"SyncInputJobAttachmentsAction",
"AttachmentDownloadAction",
"AttachmentUploadAction",
]
100 changes: 52 additions & 48 deletions src/deadline_worker_agent/sessions/actions/run_attachment_download.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,9 @@
Executor,
)
import os
from pathlib import Path
import sys
import json
from shlex import quote
from pathlib import Path
from logging import LoggerAdapter
import sysconfig
from typing import Any, TYPE_CHECKING, Optional
from dataclasses import asdict

Expand Down Expand Up @@ -46,7 +43,6 @@
)
from openjd.model import ParameterValue

from ..session import Session
from ...log_messages import SessionActionLogKind


Expand Down Expand Up @@ -91,33 +87,49 @@ def __init__(
self._step_details = step_details
self._logger = LoggerAdapter(OPENJD_LOG, extra={"session_id": session_id})

def set_step_script(self, manifests, path_mapping, s3_settings) -> None:
# TODO - update to run python as embedded file
profile = os.environ.get("AWS_PROFILE")
deadline_path = os.path.join(Path(sysconfig.get_path("scripts")), "deadline")
def set_step_script(self, manifests: list[str], s3_settings: JobAttachmentS3Settings) -> None:
"""Sets the step script for the action
script = "#!/usr/bin/env bash\n\n{} attachment download -m {} --path-mapping-rules {} --s3-root-uri {} --profile {}".format(
deadline_path,
" -m ".join(quote(v) for v in manifests),
quote(path_mapping),
Parameters
----------
manifests : list[str]
The job attachment manifest paths
s3_settings : JobAttachmentS3Settings
The job attachment S3 settings
"""
args = [
"{{ Task.File.AttachmentDownload }}",
"-pm",
"{{ Session.PathMappingRulesFile }}",
"-s3",
s3_settings.to_s3_root_uri(),
profile,
)
"-m",
]
args.extend(manifests)

self._step_script = StepScript_2023_09(
actions=StepActions_2023_09(
onRun=Action_2023_09(command="{{ Task.File.AttachmentDownload }}")
),
embeddedFiles=[
EmbeddedFileText_2023_09(
name="AttachmentDownload",
type=EmbeddedFileTypes_2023_09.TEXT,
runnable=True,
data=script,
)
],
executable_path = Path(sys.executable)
python_path = executable_path.parent / executable_path.name.lower().replace(
"pythonservice.exe", "python.exe"
)

with open(Path(__file__).parent / "scripts" / "attachment_download.py", "r") as f:
self._step_script = StepScript_2023_09(
actions=StepActions_2023_09(
onRun=Action_2023_09(
command=str(python_path),
args=args,
)
),
embeddedFiles=[
EmbeddedFileText_2023_09(
name="AttachmentDownload",
filename="download.py",
type=EmbeddedFileTypes_2023_09.TEXT,
data=f.read(),
)
],
)

def __eq__(self, other: Any) -> bool:
return (
type(self) is type(other)
Expand Down Expand Up @@ -173,6 +185,7 @@ def start(
raise RuntimeError(
"Job attachments must be synchronized before downloading Step dependencies."
)

step_dependencies = self._step_details.dependencies if self._step_details else []

assert job_attachment_settings.s3_bucket_name is not None
Expand Down Expand Up @@ -212,15 +225,15 @@ def start(
# returns root path to PathMappingRule mapping
dynamic_mapping_rules: dict[str, PathMappingRule] = (
session._asset_sync.generate_dynamic_path_mapping(
session_dir=session._session.working_directory,
session_dir=session.working_directory,
attachments=attachments,
)
)

# Aggregate manifests (with step dependency handling)
merged_manifests_by_root: dict[str, BaseAssetManifest] = (
session._asset_sync._aggregate_asset_root_manifests(
session_dir=session._session.working_directory,
session_dir=session.working_directory,
s3_settings=s3_settings,
queue_id=session._queue_id,
job_id=session._queue._job_id,
Expand All @@ -245,39 +258,30 @@ def start(
# Open Job Description session implementation details -- path mappings are sorted.
# bisect.insort only supports the 'key' arg in 3.10 or later, so
# we first extend the list and sort it afterwards.
if session._session._path_mapping_rules:
session._session._path_mapping_rules.extend(
if session.openjd_session._path_mapping_rules:
session.openjd_session._path_mapping_rules.extend(
OpenjdPathMapping.from_dict(r) for r in job_attachment_path_mappings
)
else:
session._session._path_mapping_rules = [
session.openjd_session._path_mapping_rules = [
OpenjdPathMapping.from_dict(r) for r in job_attachment_path_mappings
]

# Open Job Description Sessions sort the path mapping rules based on the length of the parts to make
# rules that are subsets of each other behave in a predictable manner. We must
# sort here since we're modifying that internal list by appending to it.
session._session._path_mapping_rules.sort(key=lambda rule: -len(rule.source_path.parts))

# =========================== TO BE DELETED ===========================
path_mapping_file_path: str = os.path.join(
session._session.working_directory, "path_mapping"
session.openjd_session._path_mapping_rules.sort(
key=lambda rule: -len(rule.source_path.parts)
)
for rule in job_attachment_path_mappings:
rule["source_path"] = rule["destination_path"]

with open(path_mapping_file_path, "w", encoding="utf8") as f:
f.write(json.dumps([rule for rule in job_attachment_path_mappings]))
# =========================== TO BE DELETED ===========================

manifest_paths = session._asset_sync._check_and_write_local_manifests(
manifest_paths_by_root = session._asset_sync._check_and_write_local_manifests(
merged_manifests_by_root=merged_manifests_by_root,
manifest_write_dir=str(session._session.working_directory),
manifest_write_dir=str(session.working_directory),
)
session.manifest_paths_by_root = manifest_paths_by_root

self.set_step_script(
manifests=manifest_paths,
path_mapping=path_mapping_file_path,
manifests=manifest_paths_by_root.values(), # type: ignore
s3_settings=s3_settings,
)
assert self._step_script is not None
Expand Down Expand Up @@ -327,7 +331,7 @@ def _start_vfs(
assert session._asset_sync is not None
session._asset_sync._launch_vfs(
s3_settings=s3_settings,
session_dir=session._session.working_directory,
session_dir=session.working_directory,
fs_permission_settings=fs_permission_settings,
merged_manifests_by_root=merged_manifests_by_root,
os_env_vars=dict(os.environ),
Expand Down
Loading

0 comments on commit 678f29a

Please sign in to comment.