Skip to content

Commit

Permalink
feat: add Worker pre-install commands, --start, and Job.get_logs
Browse files Browse the repository at this point in the history
Signed-off-by: Jericho Tolentino <[email protected]>
  • Loading branch information
jericht committed Oct 12, 2023
1 parent 7c06bf0 commit 01e314a
Show file tree
Hide file tree
Showing 6 changed files with 191 additions and 8 deletions.
4 changes: 4 additions & 0 deletions src/deadline_test_fixtures/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
from .deadline import (
CloudWatchLogEvent,
CommandResult,
DeadlineClient,
DeadlineWorker,
Expand All @@ -10,6 +11,7 @@
Farm,
Fleet,
PipInstall,
PosixUser,
Queue,
QueueFleetAssociation,
TaskStatus,
Expand All @@ -34,6 +36,7 @@

__all__ = [
"BootstrapResources",
"CloudWatchLogEvent",
"CodeArtifactRepositoryInfo",
"CommandResult",
"DeadlineResources",
Expand All @@ -51,6 +54,7 @@
"JobAttachmentSettings",
"JobAttachmentManager",
"PipInstall",
"PosixUser",
"S3Object",
"ServiceModel",
"StubDeadlineClient",
Expand Down
4 changes: 4 additions & 0 deletions src/deadline_test_fixtures/deadline/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.

from .resources import (
CloudWatchLogEvent,
Farm,
Fleet,
Job,
Expand All @@ -16,9 +17,11 @@
DockerContainerWorker,
EC2InstanceWorker,
PipInstall,
PosixUser,
)

__all__ = [
"CloudWatchLogEvent",
"CommandResult",
"DeadlineClient",
"DeadlineWorker",
Expand All @@ -29,6 +32,7 @@
"Fleet",
"Job",
"PipInstall",
"PosixUser",
"Queue",
"QueueFleetAssociation",
"TaskStatus",
Expand Down
83 changes: 78 additions & 5 deletions src/deadline_test_fixtures/deadline/resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
from enum import Enum
from typing import Any, Callable, Literal

from botocore.client import BaseClient

from .client import DeadlineClient
from ..models import JobAttachmentSettings
from ..util import call_api, clean_kwargs, wait_for
Expand All @@ -24,21 +26,23 @@ def create(
*,
client: DeadlineClient,
display_name: str,
raw_kwargs: dict | None = None,
) -> Farm:
response = call_api(
description=f"Create farm {display_name}",
fn=lambda: client.create_farm(
displayName=display_name,
**(raw_kwargs or {}),
),
)
farm_id = response["farmId"]
LOG.info(f"Created farm: {farm_id}")
return Farm(id=farm_id)

def delete(self, *, client: DeadlineClient) -> None:
def delete(self, *, client: DeadlineClient, raw_kwargs: dict | None = None) -> None:
call_api(
description=f"Delete farm {self.id}",
fn=lambda: client.delete_farm(farmId=self.id),
fn=lambda: client.delete_farm(farmId=self.id, **(raw_kwargs or {})),
)


Expand All @@ -55,6 +59,7 @@ def create(
farm: Farm,
role_arn: str | None = None,
job_attachments: JobAttachmentSettings | None = None,
raw_kwargs: dict | None = None,
) -> Queue:
kwargs = clean_kwargs(
{
Expand All @@ -64,6 +69,7 @@ def create(
"jobAttachmentSettings": (
job_attachments.as_queue_settings() if job_attachments else None
),
**(raw_kwargs or {}),
}
)

Expand All @@ -79,10 +85,12 @@ def create(
farm=farm,
)

def delete(self, *, client: DeadlineClient) -> None:
def delete(self, *, client: DeadlineClient, raw_kwargs: dict | None = None) -> None:
call_api(
description=f"Delete queue {self.id}",
fn=lambda: client.delete_queue(queueId=self.id, farmId=self.farm.id),
fn=lambda: client.delete_queue(
queueId=self.id, farmId=self.farm.id, **(raw_kwargs or {})
),
)


Expand All @@ -99,13 +107,15 @@ def create(
farm: Farm,
configuration: dict,
role_arn: str | None = None,
raw_kwargs: dict | None = None,
) -> Fleet:
kwargs = clean_kwargs(
{
"farmId": farm.id,
"displayName": display_name,
"roleArn": role_arn,
"configuration": configuration,
**(raw_kwargs or {}),
}
)
response = call_api(
Expand All @@ -127,12 +137,13 @@ def create(

return fleet

def delete(self, *, client: DeadlineClient) -> None:
def delete(self, *, client: DeadlineClient, raw_kwargs: dict | None = None) -> None:
call_api(
description=f"Delete fleet {self.id}",
fn=lambda: client.delete_fleet(
farmId=self.farm.id,
fleetId=self.id,
**(raw_kwargs or {}),
),
)

Expand Down Expand Up @@ -184,13 +195,15 @@ def create(
farm: Farm,
queue: Queue,
fleet: Fleet,
raw_kwargs: dict | None = None,
) -> QueueFleetAssociation:
call_api(
description=f"Create queue-fleet association for queue {queue.id} and fleet {fleet.id} in farm {farm.id}",
fn=lambda: client.create_queue_fleet_association(
farmId=farm.id,
queueId=queue.id,
fleetId=fleet.id,
**(raw_kwargs or {}),
),
)
return QueueFleetAssociation(
Expand All @@ -206,6 +219,7 @@ def delete(
stop_mode: Literal[
"STOP_SCHEDULING_AND_CANCEL_TASKS", "STOP_SCHEDULING_AND_FINISH_TASKS"
] = "STOP_SCHEDULING_AND_CANCEL_TASKS",
raw_kwargs: dict | None = None,
) -> None:
self.stop(client=client, stop_mode=stop_mode)
call_api(
Expand All @@ -214,6 +228,7 @@ def delete(
farmId=self.farm.id,
queueId=self.queue.id,
fleetId=self.fleet.id,
**(raw_kwargs or {}),
),
)

Expand Down Expand Up @@ -274,6 +289,7 @@ class StrEnum(str, Enum):
class TaskStatus(StrEnum):
UNKNOWN = "UNKNOWN"
PENDING = "PENDING"
STARTING = "STARTING"
READY = "READY"
RUNNING = "RUNNING"
ASSIGNED = "ASSIGNED"
Expand Down Expand Up @@ -335,6 +351,7 @@ def submit(
target_task_run_status: str | None = None,
max_failed_tasks_count: int | None = None,
max_retries_per_task: int | None = None,
raw_kwargs: dict | None = None,
) -> Job:
kwargs = clean_kwargs(
{
Expand All @@ -348,6 +365,7 @@ def submit(
"targetTaskRunStatus": target_task_run_status,
"maxFailedTasksCount": max_failed_tasks_count,
"maxRetriesPerTask": max_retries_per_task,
**(raw_kwargs or {}),
}
)
create_job_response = call_api(
Expand Down Expand Up @@ -378,6 +396,7 @@ def get_job_details(
farm: Farm,
queue: Queue,
job_id: str,
raw_kwargs: dict | None = None,
) -> dict[str, Any]:
"""
Calls GetJob API and returns the parsed response, which can be used as
Expand All @@ -389,6 +408,7 @@ def get_job_details(
farmId=farm.id,
queueId=queue.id,
jobId=job_id,
**(raw_kwargs or {}),
),
)

Expand Down Expand Up @@ -434,6 +454,42 @@ def get_optional_field(
"description": get_optional_field("description"),
}

def get_logs(
self,
*,
deadline_client: DeadlineClient,
logs_client: BaseClient,
) -> dict[str, list[CloudWatchLogEvent]]:
"""
Gets the logs for this Job.
Args:
deadline_client (DeadlineClient): The DeadlineClient to use
logs_client (BaseClient): The CloudWatch logs boto client to use
Returns:
dict[str, list[CloudWatchLogEvent]]: A mapping session ID to log events
"""
list_sessions_response = deadline_client.list_sessions(
farmId=self.farm.id,
queueId=self.queue.id,
jobId=self.id,
)
sessions = list_sessions_response["sessions"]

session_log_map: dict[str, list[CloudWatchLogEvent]] = {}
for session in sessions:
session_id = session["sessionId"]
get_log_events_response = logs_client.get_log_events(
logGroupName=f"/aws/deadline/{self.farm.id}/{self.queue.id}",
logStreamName=session_id,
)
session_log_map[session_id] = [
CloudWatchLogEvent.from_api_response(le) for le in get_log_events_response["events"]
]

return session_log_map

def refresh_job_info(self, *, client: DeadlineClient) -> None:
"""
Calls GetJob API to refresh job information. The result is used to update the fields
Expand All @@ -458,13 +514,15 @@ def update(
target_task_run_status: str | None = None,
max_failed_tasks_count: int | None = None,
max_retries_per_task: int | None = None,
raw_kwargs: dict | None = None,
) -> None:
kwargs = clean_kwargs(
{
"priority": priority,
"targetTaskRunStatus": target_task_run_status,
"maxFailedTasksCount": max_failed_tasks_count,
"maxRetriesPerTask": max_retries_per_task,
**(raw_kwargs or {}),
}
)
call_api(
Expand Down Expand Up @@ -553,3 +611,18 @@ def __str__(self) -> str: # pragma: no cover
f"ended_at: {self.ended_at}",
]
)


@dataclass
class CloudWatchLogEvent:
ingestion_time: int
message: str
timestamp: int

@staticmethod
def from_api_response(response: dict) -> CloudWatchLogEvent:
return CloudWatchLogEvent(
ingestion_time=response["ingestionTime"],
message=response["message"],
timestamp=response["timestamp"],
)
Loading

0 comments on commit 01e314a

Please sign in to comment.