Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: run job attachment output upload as job user #495

Merged
merged 8 commits into from
Dec 5, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions src/deadline_worker_agent/api_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
StepActionType = Literal["TASK_RUN"] # noqa
SyncInputJobAttachmentsActionType = Literal["SYNC_INPUT_JOB_ATTACHMENTS"] # noqa
AttachmentDownloadActionType = Literal["SYNC_INPUT_JOB_ATTACHMENTS"] # noqa
AttachmentUploadActionType = Literal["SYNC_OUTPUT_JOB_ATTACHMENTS"] # noqa
CompletedActionStatus = Literal["SUCCEEDED", "FAILED", "INTERRUPTED", "CANCELED", "NEVER_ATTEMPTED"]


Expand Down Expand Up @@ -94,6 +95,13 @@ class AttachmentDownloadAction(TypedDict):
stepId: NotRequired[str]


class AttachmentUploadAction(TypedDict):
    """API model for a session action whose type is ``AttachmentUploadActionType``.

    Every key is required (none is wrapped in ``NotRequired``).
    """

    # Identifier of the session action
    sessionActionId: str
    # Action type discriminator; the only allowed value is the literal declared
    # by AttachmentUploadActionType
    actionType: AttachmentUploadActionType
    # NOTE(review): presumably the step and task whose outputs are uploaded --
    # confirm against the service API definition
    stepId: str
    taskId: str
jusiskin marked this conversation as resolved.
Show resolved Hide resolved


class LogConfiguration(TypedDict):
error: NotRequired[str]
logDriver: str
Expand Down
52 changes: 52 additions & 0 deletions src/deadline_worker_agent/scheduler/session_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
EnvironmentAction as EnvironmentActionApiModel,
SyncInputJobAttachmentsAction as SyncInputJobAttachmentsActionApiModel,
AttachmentDownloadAction as AttachmentDownloadActionApiModel,
AttachmentUploadAction as AttachmentUploadActionApiModel,
TaskRunAction as TaskRunActionApiModel,
EntityIdentifier,
EnvironmentDetailsIdentifier,
Expand All @@ -31,6 +32,7 @@
RunStepTaskAction,
SessionActionDefinition,
SyncInputJobAttachmentsAction,
AttachmentUploadAction,
AttachmentDownloadAction,
)
from .session_action_status import SessionActionStatus
Expand All @@ -52,6 +54,7 @@
TaskRunActionApiModel,
SyncInputJobAttachmentsActionApiModel,
AttachmentDownloadActionApiModel,
AttachmentUploadActionApiModel,
)
else:
D = TypeVar("D")
Expand All @@ -78,6 +81,7 @@ class SessionActionQueueEntry(Generic[D]):
SyncInputJobAttachmentsStepDependenciesQueueEntry = SessionActionQueueEntry[
SyncInputJobAttachmentsActionApiModel
]
AttachmentUploadActionQueueEntry = SessionActionQueueEntry[AttachmentUploadActionApiModel]
AttachmentDownloadActionQueueEntry = SessionActionQueueEntry[AttachmentDownloadActionApiModel]
AttachmentDownloadActionStepDependenciesQueueEntry = SessionActionQueueEntry[
AttachmentDownloadActionApiModel
Expand All @@ -102,6 +106,7 @@ class SessionActionQueue:
| TaskRunQueueEntry
| SyncInputJobAttachmentsQueueEntry
| SyncInputJobAttachmentsStepDependenciesQueueEntry
| AttachmentUploadActionQueueEntry
| AttachmentDownloadActionQueueEntry
| AttachmentDownloadActionStepDependenciesQueueEntry
]
Expand All @@ -111,6 +116,7 @@ class SessionActionQueue:
| TaskRunQueueEntry
| SyncInputJobAttachmentsQueueEntry
| SyncInputJobAttachmentsStepDependenciesQueueEntry
| AttachmentUploadActionQueueEntry
| AttachmentDownloadActionQueueEntry
| AttachmentDownloadActionStepDependenciesQueueEntry,
]
Expand Down Expand Up @@ -287,6 +293,37 @@ def cancel_all(
)
)

def insert_front(
    self,
    *,
    action: AttachmentUploadActionApiModel,
) -> None:
    """Insert an attachment upload action at the front of the queue.

    The action is wrapped in a queue entry with a new cancel event, inserted at
    index 0 of ``self._actions`` and registered in ``self._actions_by_id`` under
    its session action ID.

    Parameters
    ----------
    action : AttachmentUploadActionApiModel
        The attachment upload action to be inserted
    """
    action_type = action["actionType"]
    action_id = action["sessionActionId"]
    logger.info(
        "Inserting attachment upload to the front of queue: %s action: %s",
        action_type,
        action_id,
    )

    # ``action`` is already annotated as the upload API model, so it is passed
    # straight through without a typing cast.
    queue_entry = AttachmentUploadActionQueueEntry(
        cancel=Event(),
        definition=action,
    )

    self._actions.insert(0, queue_entry)
    self._actions_by_id[action_id] = queue_entry
    logger.info("Successfully inserted front of queue: %s action: %s", action_type, action_id)

def replace(
self,
*,
Expand All @@ -295,6 +332,7 @@ def replace(
| TaskRunActionApiModel
| SyncInputJobAttachmentsActionApiModel
| AttachmentDownloadActionApiModel
| AttachmentUploadActionApiModel
],
) -> None:
"""Update the queue's actions"""
Expand All @@ -305,6 +343,7 @@ def replace(
| SyncInputJobAttachmentsStepDependenciesQueueEntry
| AttachmentDownloadActionQueueEntry
| AttachmentDownloadActionStepDependenciesQueueEntry
| AttachmentUploadActionQueueEntry
] = []

action_ids_added = list[str]()
Expand Down Expand Up @@ -477,6 +516,19 @@ def dequeue(self) -> SessionActionDefinition | None:
task_parameter_values=task_parameters,
task_id=action_definition["taskId"],
)
elif action_type == "SYNC_OUTPUT_JOB_ATTACHMENTS":
action_queue_entry = cast(AttachmentUploadActionQueueEntry, action_queue_entry)
action_definition = action_queue_entry.definition
step_id = action_definition["stepId"]
task_id = action_definition["taskId"]

next_action = AttachmentUploadAction(
id=action_id,
session_id=self._session_id,
step_id=step_id,
task_id=task_id,
)

elif action_type == "SYNC_INPUT_JOB_ATTACHMENTS":
action_definition = action_queue_entry.definition
if ASSET_SYNC_JOB_USER_FEATURE:
Expand Down
2 changes: 2 additions & 0 deletions src/deadline_worker_agent/sessions/actions/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from .run_step_task import RunStepTaskAction
from .sync_input_job_attachments import SyncInputJobAttachmentsAction
from .run_attachment_download import AttachmentDownloadAction
from .run_attachment_upload import AttachmentUploadAction

__all__ = [
"EnterEnvironmentAction",
Expand All @@ -16,4 +17,5 @@
"SessionActionDefinition",
"SyncInputJobAttachmentsAction",
"AttachmentDownloadAction",
"AttachmentUploadAction",
]
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import os
from pathlib import Path
import sys
import json
from shlex import quote
from logging import LoggerAdapter
import sysconfig
Expand Down Expand Up @@ -91,32 +90,43 @@ def __init__(
self._step_details = step_details
self._logger = LoggerAdapter(OPENJD_LOG, extra={"session_id": session_id})

def set_step_script(self, manifests, path_mapping, s3_settings) -> None:
# TODO - update to run python as embedded file
profile = os.environ.get("AWS_PROFILE")
deadline_path = os.path.join(Path(sysconfig.get_path("scripts")), "deadline")
def set_step_script(self, manifests, s3_settings) -> None:
jusiskin marked this conversation as resolved.
Show resolved Hide resolved
"""Sets the step script for the action

script = "#!/usr/bin/env bash\n\n{} attachment download -m {} --path-mapping-rules {} --s3-root-uri {} --profile {}".format(
deadline_path,
" -m ".join(quote(v) for v in manifests),
quote(path_mapping),
Parameters
----------
manifests : list[BaseAssetManifest]
The job attachment manifests
s3_settings : JobAttachmentS3Settings
The job attachment S3 settings
"""
args = [
"{{ Task.File.AttachmentDownload }}",
"-pm",
"{{ Session.PathMappingRulesFile }}",
"-s3",
s3_settings.to_s3_root_uri(),
profile,
)

self._step_script = StepScript_2023_09(
actions=StepActions_2023_09(
onRun=Action_2023_09(command="{{ Task.File.AttachmentDownload }}")
),
embeddedFiles=[
EmbeddedFileText_2023_09(
name="AttachmentDownload",
type=EmbeddedFileTypes_2023_09.TEXT,
runnable=True,
data=script,
)
],
)
"-m",
]
args.extend([quote(p) for p in manifests])
jusiskin marked this conversation as resolved.
Show resolved Hide resolved

with open(Path(__file__).parent / "scripts" / "attachment_download.py", "r") as f:
self._step_script = StepScript_2023_09(
actions=StepActions_2023_09(
onRun=Action_2023_09(
command=os.path.join(Path(sysconfig.get_path("scripts")), "python"),
jusiskin marked this conversation as resolved.
Show resolved Hide resolved
args=args,
)
),
embeddedFiles=[
EmbeddedFileText_2023_09(
name="AttachmentDownload",
filename="download.py",
type=EmbeddedFileTypes_2023_09.TEXT,
data=f.read(),
)
],
)

def __eq__(self, other: Any) -> bool:
return (
Expand Down Expand Up @@ -173,6 +183,7 @@ def start(
raise RuntimeError(
"Job attachments must be synchronized before downloading Step dependencies."
)

step_dependencies = self._step_details.dependencies if self._step_details else []

assert job_attachment_settings.s3_bucket_name is not None
Expand Down Expand Up @@ -259,25 +270,14 @@ def start(
# sort here since we're modifying that internal list appending to the list.
session._session._path_mapping_rules.sort(key=lambda rule: -len(rule.source_path.parts))

# =========================== TO BE DELETED ===========================
path_mapping_file_path: str = os.path.join(
session._session.working_directory, "path_mapping"
)
for rule in job_attachment_path_mappings:
rule["source_path"] = rule["destination_path"]

with open(path_mapping_file_path, "w", encoding="utf8") as f:
f.write(json.dumps([rule for rule in job_attachment_path_mappings]))
# =========================== TO BE DELETED ===========================

manifest_paths = session._asset_sync._check_and_write_local_manifests(
manifest_paths_by_root = session._asset_sync._check_and_write_local_manifests(
merged_manifests_by_root=merged_manifests_by_root,
manifest_write_dir=str(session._session.working_directory),
)
session.set_manifest_paths_by_root(manifest_paths_by_root)

self.set_step_script(
manifests=manifest_paths,
path_mapping=path_mapping_file_path,
manifests=manifest_paths_by_root.values(),
s3_settings=s3_settings,
)
assert self._step_script is not None
Expand Down
Loading
Loading