Skip to content

Commit

Permalink
feat: integrate with job attachment download cli as a openjd action run
Browse files Browse the repository at this point in the history
Signed-off-by: Godot Bian <[email protected]>
  • Loading branch information
godobyte committed Nov 14, 2024
1 parent 31ba080 commit 03d6345
Show file tree
Hide file tree
Showing 9 changed files with 729 additions and 64 deletions.
11 changes: 10 additions & 1 deletion src/deadline_worker_agent/api_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
EnvironmentActionType = Literal["ENV_ENTER", "ENV_EXIT"]
StepActionType = Literal["TASK_RUN"] # noqa
SyncInputJobAttachmentsActionType = Literal["SYNC_INPUT_JOB_ATTACHMENTS"] # noqa
AttachmentDownloadActionType = Literal["SYNC_INPUT_JOB_ATTACHMENTS"] # noqa
CompletedActionStatus = Literal["SUCCEEDED", "FAILED", "INTERRUPTED", "CANCELED", "NEVER_ATTEMPTED"]


Expand Down Expand Up @@ -87,6 +88,12 @@ class SyncInputJobAttachmentsAction(TypedDict):
stepId: NotRequired[str]


class AttachmentDownloadAction(TypedDict):
sessionActionId: str
actionType: AttachmentDownloadActionType
stepId: NotRequired[str]


class LogConfiguration(TypedDict):
error: NotRequired[str]
logDriver: str
Expand All @@ -97,7 +104,9 @@ class LogConfiguration(TypedDict):
class AssignedSession(TypedDict):
queueId: str
jobId: str
sessionActions: list[EnvironmentAction | TaskRunAction | SyncInputJobAttachmentsAction]
sessionActions: list[
EnvironmentAction | TaskRunAction | SyncInputJobAttachmentsAction | AttachmentDownloadAction
]
logConfiguration: NotRequired[LogConfiguration]


Expand Down
32 changes: 29 additions & 3 deletions src/deadline_worker_agent/boto/shim.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

from boto3 import Session as _Session

from ..feature_flag import ASSET_SYNC_JOB_USER_FEATURE
from ..api_models import (
AssignedSession,
AssumeFleetRoleForWorkerResponse,
Expand All @@ -19,6 +20,7 @@
EnvironmentAction,
HostProperties,
SyncInputJobAttachmentsAction,
AttachmentDownloadAction,
TaskRunAction,
UpdatedSessionActionInfo,
UpdateWorkerResponse,
Expand Down Expand Up @@ -172,9 +174,26 @@ def parse_sync_input_job_attachments_action(
mapped_action["stepId"] = step_id
return mapped_action

def parse_attachment_download_action(
action: dict, action_id: str
) -> AttachmentDownloadAction:
mapped_action = AttachmentDownloadAction(
sessionActionId=action_id,
actionType="SYNC_INPUT_JOB_ATTACHMENTS",
)
if step_id := action.get("stepId", None):
mapped_action["stepId"] = step_id
return mapped_action

SESSION_ACTION_MAP: dict[
str,
Callable[[Any, str], EnvironmentAction | TaskRunAction | SyncInputJobAttachmentsAction],
Callable[
[Any, str],
EnvironmentAction
| TaskRunAction
| SyncInputJobAttachmentsAction
| AttachmentDownloadAction,
],
] = {
"envEnter": lambda action, action_id: EnvironmentAction(
sessionActionId=action_id,
Expand All @@ -187,14 +206,21 @@ def parse_sync_input_job_attachments_action(
environmentId=action["environmentId"],
),
"taskRun": parse_task_run_action,
"syncInputJobAttachments": parse_sync_input_job_attachments_action,
"syncInputJobAttachments": (
parse_sync_input_job_attachments_action
if not ASSET_SYNC_JOB_USER_FEATURE
else parse_attachment_download_action
),
}

# Map the new session action structure to our internal model
mapped_sessions: dict[str, AssignedSession] = {}
for session_id, session in response["assignedSessions"].items():
mapped_actions: list[
EnvironmentAction | TaskRunAction | SyncInputJobAttachmentsAction
EnvironmentAction
| TaskRunAction
| SyncInputJobAttachmentsAction
| AttachmentDownloadAction
] = []
for session_action in session["sessionActions"]:
assert len(session_action["definition"].items()) == 1
Expand Down
5 changes: 5 additions & 0 deletions src/deadline_worker_agent/feature_flag.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.

import os

ASSET_SYNC_JOB_USER_FEATURE = os.environ.get("ASSET_SYNC_JOB_USER_FEATURE")
6 changes: 5 additions & 1 deletion src/deadline_worker_agent/scheduler/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
EnvironmentAction,
TaskRunAction,
SyncInputJobAttachmentsAction,
AttachmentDownloadAction,
)
from ..aws.deadline import (
DeadlineRequestConditionallyRecoverableError,
Expand Down Expand Up @@ -1362,7 +1363,10 @@ def _return_sessionactions_from_stopped_session(
self,
*,
assigned_session_actions: list[
EnvironmentAction | TaskRunAction | SyncInputJobAttachmentsAction
EnvironmentAction
| TaskRunAction
| SyncInputJobAttachmentsAction
| AttachmentDownloadAction
],
failure_message: str,
) -> None:
Expand Down
Loading

0 comments on commit 03d6345

Please sign in to comment.