Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: integrate with job attachment download cli as a openjd action run #476

Merged
merged 6 commits into from
Nov 22, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion src/deadline_worker_agent/api_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
EnvironmentActionType = Literal["ENV_ENTER", "ENV_EXIT"]
StepActionType = Literal["TASK_RUN"] # noqa
SyncInputJobAttachmentsActionType = Literal["SYNC_INPUT_JOB_ATTACHMENTS"] # noqa
AttachmentDownloadActionType = Literal["SYNC_INPUT_JOB_ATTACHMENTS"] # noqa
jusiskin marked this conversation as resolved.
Show resolved Hide resolved
CompletedActionStatus = Literal["SUCCEEDED", "FAILED", "INTERRUPTED", "CANCELED", "NEVER_ATTEMPTED"]


Expand Down Expand Up @@ -87,6 +88,12 @@ class SyncInputJobAttachmentsAction(TypedDict):
stepId: NotRequired[str]


class AttachmentDownloadAction(TypedDict):
sessionActionId: str
actionType: AttachmentDownloadActionType
stepId: NotRequired[str]
jusiskin marked this conversation as resolved.
Show resolved Hide resolved


class LogConfiguration(TypedDict):
error: NotRequired[str]
logDriver: str
Expand All @@ -97,7 +104,9 @@ class LogConfiguration(TypedDict):
class AssignedSession(TypedDict):
queueId: str
jobId: str
sessionActions: list[EnvironmentAction | TaskRunAction | SyncInputJobAttachmentsAction]
sessionActions: list[
EnvironmentAction | TaskRunAction | SyncInputJobAttachmentsAction | AttachmentDownloadAction
]
logConfiguration: NotRequired[LogConfiguration]


Expand Down
32 changes: 29 additions & 3 deletions src/deadline_worker_agent/boto/shim.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

from boto3 import Session as _Session

from ..feature_flag import ASSET_SYNC_JOB_USER_FEATURE
from ..api_models import (
AssignedSession,
AssumeFleetRoleForWorkerResponse,
Expand All @@ -19,6 +20,7 @@
EnvironmentAction,
HostProperties,
SyncInputJobAttachmentsAction,
AttachmentDownloadAction,
TaskRunAction,
UpdatedSessionActionInfo,
UpdateWorkerResponse,
Expand Down Expand Up @@ -172,9 +174,26 @@ def parse_sync_input_job_attachments_action(
mapped_action["stepId"] = step_id
return mapped_action

def parse_attachment_download_action(
action: dict, action_id: str
) -> AttachmentDownloadAction:
mapped_action = AttachmentDownloadAction(
sessionActionId=action_id,
actionType="SYNC_INPUT_JOB_ATTACHMENTS",
)
if step_id := action.get("stepId", None):
mapped_action["stepId"] = step_id
return mapped_action

SESSION_ACTION_MAP: dict[
str,
Callable[[Any, str], EnvironmentAction | TaskRunAction | SyncInputJobAttachmentsAction],
Callable[
[Any, str],
EnvironmentAction
| TaskRunAction
| SyncInputJobAttachmentsAction
| AttachmentDownloadAction,
],
] = {
"envEnter": lambda action, action_id: EnvironmentAction(
sessionActionId=action_id,
Expand All @@ -187,14 +206,21 @@ def parse_sync_input_job_attachments_action(
environmentId=action["environmentId"],
),
"taskRun": parse_task_run_action,
"syncInputJobAttachments": parse_sync_input_job_attachments_action,
"syncInputJobAttachments": (
parse_sync_input_job_attachments_action
if not ASSET_SYNC_JOB_USER_FEATURE
jusiskin marked this conversation as resolved.
Show resolved Hide resolved
else parse_attachment_download_action
),
}

# Map the new session action structure to our internal model
mapped_sessions: dict[str, AssignedSession] = {}
for session_id, session in response["assignedSessions"].items():
mapped_actions: list[
EnvironmentAction | TaskRunAction | SyncInputJobAttachmentsAction
EnvironmentAction
| TaskRunAction
| SyncInputJobAttachmentsAction
| AttachmentDownloadAction
] = []
for session_action in session["sessionActions"]:
assert len(session_action["definition"].items()) == 1
Expand Down
5 changes: 5 additions & 0 deletions src/deadline_worker_agent/feature_flag.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.

import os

ASSET_SYNC_JOB_USER_FEATURE = os.environ.get("ASSET_SYNC_JOB_USER_FEATURE")
jusiskin marked this conversation as resolved.
Show resolved Hide resolved
6 changes: 5 additions & 1 deletion src/deadline_worker_agent/scheduler/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
EnvironmentAction,
TaskRunAction,
SyncInputJobAttachmentsAction,
AttachmentDownloadAction,
)
from ..aws.deadline import (
DeadlineRequestConditionallyRecoverableError,
Expand Down Expand Up @@ -1362,7 +1363,10 @@ def _return_sessionactions_from_stopped_session(
self,
*,
assigned_session_actions: list[
EnvironmentAction | TaskRunAction | SyncInputJobAttachmentsAction
EnvironmentAction
| TaskRunAction
| SyncInputJobAttachmentsAction
| AttachmentDownloadAction
],
failure_message: str,
) -> None:
Expand Down
Loading
Loading