Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: run job attachment output upload as job user #495

Merged
merged 8 commits into from
Dec 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions src/deadline_worker_agent/api_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
StepActionType = Literal["TASK_RUN"] # noqa
SyncInputJobAttachmentsActionType = Literal["SYNC_INPUT_JOB_ATTACHMENTS"] # noqa
AttachmentDownloadActionType = Literal["SYNC_INPUT_JOB_ATTACHMENTS"] # noqa
AttachmentUploadActionType = Literal["SYNC_OUTPUT_JOB_ATTACHMENTS"] # noqa
CompletedActionStatus = Literal["SUCCEEDED", "FAILED", "INTERRUPTED", "CANCELED", "NEVER_ATTEMPTED"]


Expand Down Expand Up @@ -94,6 +95,14 @@ class AttachmentDownloadAction(TypedDict):
stepId: NotRequired[str]


# This action is not from API, kepping it here for all action models to be in one place
class AttachmentUploadAction(TypedDict):
sessionActionId: str
actionType: AttachmentUploadActionType
stepId: str
taskId: str
jusiskin marked this conversation as resolved.
Show resolved Hide resolved


class LogConfiguration(TypedDict):
error: NotRequired[str]
logDriver: str
Expand Down
47 changes: 47 additions & 0 deletions src/deadline_worker_agent/scheduler/session_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
EnvironmentAction as EnvironmentActionApiModel,
SyncInputJobAttachmentsAction as SyncInputJobAttachmentsActionApiModel,
AttachmentDownloadAction as AttachmentDownloadActionApiModel,
AttachmentUploadAction as AttachmentUploadActionApiModel,
TaskRunAction as TaskRunActionApiModel,
EntityIdentifier,
EnvironmentDetailsIdentifier,
Expand All @@ -31,6 +32,7 @@
RunStepTaskAction,
SessionActionDefinition,
SyncInputJobAttachmentsAction,
AttachmentUploadAction,
AttachmentDownloadAction,
)
from .session_action_status import SessionActionStatus
Expand All @@ -52,6 +54,7 @@
TaskRunActionApiModel,
SyncInputJobAttachmentsActionApiModel,
AttachmentDownloadActionApiModel,
AttachmentUploadActionApiModel,
)
else:
D = TypeVar("D")
Expand All @@ -78,6 +81,7 @@ class SessionActionQueueEntry(Generic[D]):
SyncInputJobAttachmentsStepDependenciesQueueEntry = SessionActionQueueEntry[
SyncInputJobAttachmentsActionApiModel
]
AttachmentUploadActionQueueEntry = SessionActionQueueEntry[AttachmentUploadActionApiModel]
AttachmentDownloadActionQueueEntry = SessionActionQueueEntry[AttachmentDownloadActionApiModel]
AttachmentDownloadActionStepDependenciesQueueEntry = SessionActionQueueEntry[
AttachmentDownloadActionApiModel
Expand All @@ -102,6 +106,7 @@ class SessionActionQueue:
| TaskRunQueueEntry
| SyncInputJobAttachmentsQueueEntry
| SyncInputJobAttachmentsStepDependenciesQueueEntry
| AttachmentUploadActionQueueEntry
| AttachmentDownloadActionQueueEntry
| AttachmentDownloadActionStepDependenciesQueueEntry
]
Expand All @@ -111,6 +116,7 @@ class SessionActionQueue:
| TaskRunQueueEntry
| SyncInputJobAttachmentsQueueEntry
| SyncInputJobAttachmentsStepDependenciesQueueEntry
| AttachmentUploadActionQueueEntry
| AttachmentDownloadActionQueueEntry
| AttachmentDownloadActionStepDependenciesQueueEntry,
]
Expand Down Expand Up @@ -287,6 +293,32 @@ def cancel_all(
)
)

def insert_front(
self,
*,
action: AttachmentUploadActionApiModel,
) -> None:
"""Inserts an attachment upload action at the front of the queue

Parameters
----------
action : AttachmentUploadActionApiModel
The attachment upload action to be inserted to the front of queue
"""
action_type = action["actionType"]
action_id = action["sessionActionId"]
cancel_event = Event()

action = cast(AttachmentUploadActionApiModel, action)
queue_entry = AttachmentUploadActionQueueEntry(
cancel=cancel_event,
definition=action,
)

self._actions.insert(0, queue_entry)
self._actions_by_id[action_id] = queue_entry
logger.debug("Successfully inserted front of queue: %s action: %s", action_type, action_id)

def replace(
self,
*,
Expand All @@ -295,6 +327,7 @@ def replace(
| TaskRunActionApiModel
| SyncInputJobAttachmentsActionApiModel
| AttachmentDownloadActionApiModel
| AttachmentUploadActionApiModel
],
) -> None:
"""Update the queue's actions"""
Expand All @@ -305,6 +338,7 @@ def replace(
| SyncInputJobAttachmentsStepDependenciesQueueEntry
| AttachmentDownloadActionQueueEntry
| AttachmentDownloadActionStepDependenciesQueueEntry
| AttachmentUploadActionQueueEntry
] = []

action_ids_added = list[str]()
Expand Down Expand Up @@ -477,6 +511,19 @@ def dequeue(self) -> SessionActionDefinition | None:
task_parameter_values=task_parameters,
task_id=action_definition["taskId"],
)
elif action_type == "SYNC_OUTPUT_JOB_ATTACHMENTS":
action_queue_entry = cast(AttachmentUploadActionQueueEntry, action_queue_entry)
action_definition = action_queue_entry.definition
step_id = action_definition["stepId"]
task_id = action_definition["taskId"]

next_action = AttachmentUploadAction(
id=action_id,
session_id=self._session_id,
step_id=step_id,
task_id=task_id,
)

elif action_type == "SYNC_INPUT_JOB_ATTACHMENTS":
action_definition = action_queue_entry.definition
if ASSET_SYNC_JOB_USER_FEATURE:
Expand Down
2 changes: 2 additions & 0 deletions src/deadline_worker_agent/sessions/actions/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from .run_step_task import RunStepTaskAction
from .sync_input_job_attachments import SyncInputJobAttachmentsAction
from .run_attachment_download import AttachmentDownloadAction
from .run_attachment_upload import AttachmentUploadAction

__all__ = [
"EnterEnvironmentAction",
Expand All @@ -16,4 +17,5 @@
"SessionActionDefinition",
"SyncInputJobAttachmentsAction",
"AttachmentDownloadAction",
"AttachmentUploadAction",
]
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,9 @@
Executor,
)
import os
from pathlib import Path
import sys
import json
from shlex import quote
from pathlib import Path
from logging import LoggerAdapter
import sysconfig
from typing import Any, TYPE_CHECKING, Optional
from dataclasses import asdict

Expand Down Expand Up @@ -46,7 +43,6 @@
)
from openjd.model import ParameterValue

from ..session import Session
from ...log_messages import SessionActionLogKind


Expand Down Expand Up @@ -91,33 +87,49 @@ def __init__(
self._step_details = step_details
self._logger = LoggerAdapter(OPENJD_LOG, extra={"session_id": session_id})

def set_step_script(self, manifests, path_mapping, s3_settings) -> None:
# TODO - update to run python as embedded file
profile = os.environ.get("AWS_PROFILE")
deadline_path = os.path.join(Path(sysconfig.get_path("scripts")), "deadline")
def set_step_script(self, manifests: list[str], s3_settings: JobAttachmentS3Settings) -> None:
"""Sets the step script for the action

script = "#!/usr/bin/env bash\n\n{} attachment download -m {} --path-mapping-rules {} --s3-root-uri {} --profile {}".format(
deadline_path,
" -m ".join(quote(v) for v in manifests),
quote(path_mapping),
Parameters
----------
manifests : list[str]
The job attachment manifest paths
s3_settings : JobAttachmentS3Settings
The job attachment S3 settings
"""
args = [
"{{ Task.File.AttachmentDownload }}",
"-pm",
"{{ Session.PathMappingRulesFile }}",
"-s3",
s3_settings.to_s3_root_uri(),
profile,
)
"-m",
]
args.extend(manifests)

self._step_script = StepScript_2023_09(
actions=StepActions_2023_09(
onRun=Action_2023_09(command="{{ Task.File.AttachmentDownload }}")
),
embeddedFiles=[
EmbeddedFileText_2023_09(
name="AttachmentDownload",
type=EmbeddedFileTypes_2023_09.TEXT,
runnable=True,
data=script,
)
],
executable_path = Path(sys.executable)
python_path = executable_path.parent / executable_path.name.lower().replace(
"pythonservice.exe", "python.exe"
)

with open(Path(__file__).parent / "scripts" / "attachment_download.py", "r") as f:
self._step_script = StepScript_2023_09(
actions=StepActions_2023_09(
onRun=Action_2023_09(
command=str(python_path),
args=args,
)
),
embeddedFiles=[
EmbeddedFileText_2023_09(
name="AttachmentDownload",
filename="download.py",
type=EmbeddedFileTypes_2023_09.TEXT,
data=f.read(),
)
],
)

def __eq__(self, other: Any) -> bool:
return (
type(self) is type(other)
Expand Down Expand Up @@ -173,6 +185,7 @@ def start(
raise RuntimeError(
"Job attachments must be synchronized before downloading Step dependencies."
)

step_dependencies = self._step_details.dependencies if self._step_details else []

assert job_attachment_settings.s3_bucket_name is not None
Expand Down Expand Up @@ -212,15 +225,15 @@ def start(
# returns root path to PathMappingRule mapping
dynamic_mapping_rules: dict[str, PathMappingRule] = (
session._asset_sync.generate_dynamic_path_mapping(
session_dir=session._session.working_directory,
session_dir=session.working_directory,
attachments=attachments,
)
)

# Aggregate manifests (with step step dependency handling)
merged_manifests_by_root: dict[str, BaseAssetManifest] = (
session._asset_sync._aggregate_asset_root_manifests(
session_dir=session._session.working_directory,
session_dir=session.working_directory,
s3_settings=s3_settings,
queue_id=session._queue_id,
job_id=session._queue._job_id,
Expand All @@ -245,39 +258,30 @@ def start(
# Open Job Description session implementation details -- path mappings are sorted.
# bisect.insort only supports the 'key' arg in 3.10 or later, so
# we first extend the list and sort it afterwards.
if session._session._path_mapping_rules:
session._session._path_mapping_rules.extend(
if session.openjd_session._path_mapping_rules:
session.openjd_session._path_mapping_rules.extend(
OpenjdPathMapping.from_dict(r) for r in job_attachment_path_mappings
)
else:
session._session._path_mapping_rules = [
session.openjd_session._path_mapping_rules = [
OpenjdPathMapping.from_dict(r) for r in job_attachment_path_mappings
]

# Open Job Description Sessions sort the path mapping rules based on length of the parts make
# rules that are subsets of each other behave in a predictable manner. We must
# sort here since we're modifying that internal list appending to the list.
session._session._path_mapping_rules.sort(key=lambda rule: -len(rule.source_path.parts))

# =========================== TO BE DELETED ===========================
path_mapping_file_path: str = os.path.join(
session._session.working_directory, "path_mapping"
session.openjd_session._path_mapping_rules.sort(
key=lambda rule: -len(rule.source_path.parts)
)
jusiskin marked this conversation as resolved.
Show resolved Hide resolved
for rule in job_attachment_path_mappings:
rule["source_path"] = rule["destination_path"]

with open(path_mapping_file_path, "w", encoding="utf8") as f:
f.write(json.dumps([rule for rule in job_attachment_path_mappings]))
# =========================== TO BE DELETED ===========================

manifest_paths = session._asset_sync._check_and_write_local_manifests(
manifest_paths_by_root = session._asset_sync._check_and_write_local_manifests(
merged_manifests_by_root=merged_manifests_by_root,
manifest_write_dir=str(session._session.working_directory),
manifest_write_dir=str(session.working_directory),
)
session.manifest_paths_by_root = manifest_paths_by_root

self.set_step_script(
manifests=manifest_paths,
path_mapping=path_mapping_file_path,
manifests=manifest_paths_by_root.values(), # type: ignore
s3_settings=s3_settings,
)
assert self._step_script is not None
Expand Down Expand Up @@ -327,7 +331,7 @@ def _start_vfs(
assert session._asset_sync is not None
session._asset_sync._launch_vfs(
s3_settings=s3_settings,
session_dir=session._session.working_directory,
session_dir=session.working_directory,
fs_permission_settings=fs_permission_settings,
merged_manifests_by_root=merged_manifests_by_root,
os_env_vars=dict(os.environ),
Expand Down
Loading
Loading