Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add Worker pre-install commands, --start, and Job.get_logs #25

Merged
merged 2 commits into from
Oct 12, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/deadline_test_fixtures/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
from .deadline import (
CloudWatchLogEvent,
CommandResult,
DeadlineClient,
DeadlineWorker,
Expand All @@ -10,6 +11,7 @@
Farm,
Fleet,
PipInstall,
PosixUser,
Queue,
QueueFleetAssociation,
TaskStatus,
Expand All @@ -34,6 +36,7 @@

__all__ = [
"BootstrapResources",
"CloudWatchLogEvent",
"CodeArtifactRepositoryInfo",
"CommandResult",
"DeadlineResources",
Expand All @@ -51,6 +54,7 @@
"JobAttachmentSettings",
"JobAttachmentManager",
"PipInstall",
"PosixUser",
"S3Object",
"ServiceModel",
"StubDeadlineClient",
Expand Down
4 changes: 4 additions & 0 deletions src/deadline_test_fixtures/deadline/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.

from .resources import (
CloudWatchLogEvent,
Farm,
Fleet,
Job,
Expand All @@ -16,9 +17,11 @@
DockerContainerWorker,
EC2InstanceWorker,
PipInstall,
PosixUser,
)

__all__ = [
"CloudWatchLogEvent",
"CommandResult",
"DeadlineClient",
"DeadlineWorker",
Expand All @@ -29,6 +32,7 @@
"Fleet",
"Job",
"PipInstall",
"PosixUser",
"Queue",
"QueueFleetAssociation",
"TaskStatus",
Expand Down
92 changes: 87 additions & 5 deletions src/deadline_test_fixtures/deadline/resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
from enum import Enum
from typing import Any, Callable, Literal

from botocore.client import BaseClient

from .client import DeadlineClient
from ..models import JobAttachmentSettings
from ..util import call_api, clean_kwargs, wait_for
Expand All @@ -24,21 +26,23 @@ def create(
*,
client: DeadlineClient,
display_name: str,
raw_kwargs: dict | None = None,
) -> Farm:
response = call_api(
description=f"Create farm {display_name}",
fn=lambda: client.create_farm(
displayName=display_name,
**(raw_kwargs or {}),
),
)
farm_id = response["farmId"]
LOG.info(f"Created farm: {farm_id}")
return Farm(id=farm_id)

def delete(self, *, client: DeadlineClient) -> None:
def delete(self, *, client: DeadlineClient, raw_kwargs: dict | None = None) -> None:
call_api(
description=f"Delete farm {self.id}",
fn=lambda: client.delete_farm(farmId=self.id),
fn=lambda: client.delete_farm(farmId=self.id, **(raw_kwargs or {})),
)


Expand All @@ -55,6 +59,7 @@ def create(
farm: Farm,
role_arn: str | None = None,
job_attachments: JobAttachmentSettings | None = None,
raw_kwargs: dict | None = None,
) -> Queue:
kwargs = clean_kwargs(
{
Expand All @@ -64,6 +69,7 @@ def create(
"jobAttachmentSettings": (
job_attachments.as_queue_settings() if job_attachments else None
),
**(raw_kwargs or {}),
}
)

Expand All @@ -79,10 +85,12 @@ def create(
farm=farm,
)

def delete(self, *, client: DeadlineClient) -> None:
def delete(self, *, client: DeadlineClient, raw_kwargs: dict | None = None) -> None:
call_api(
description=f"Delete queue {self.id}",
fn=lambda: client.delete_queue(queueId=self.id, farmId=self.farm.id),
fn=lambda: client.delete_queue(
queueId=self.id, farmId=self.farm.id, **(raw_kwargs or {})
),
)


Expand All @@ -99,13 +107,15 @@ def create(
farm: Farm,
configuration: dict,
role_arn: str | None = None,
raw_kwargs: dict | None = None,
) -> Fleet:
kwargs = clean_kwargs(
{
"farmId": farm.id,
"displayName": display_name,
"roleArn": role_arn,
"configuration": configuration,
**(raw_kwargs or {}),
}
)
response = call_api(
Expand All @@ -127,12 +137,13 @@ def create(

return fleet

def delete(self, *, client: DeadlineClient) -> None:
def delete(self, *, client: DeadlineClient, raw_kwargs: dict | None = None) -> None:
call_api(
description=f"Delete fleet {self.id}",
fn=lambda: client.delete_fleet(
farmId=self.farm.id,
fleetId=self.id,
**(raw_kwargs or {}),
),
)

Expand Down Expand Up @@ -184,13 +195,15 @@ def create(
farm: Farm,
queue: Queue,
fleet: Fleet,
raw_kwargs: dict | None = None,
) -> QueueFleetAssociation:
call_api(
description=f"Create queue-fleet association for queue {queue.id} and fleet {fleet.id} in farm {farm.id}",
fn=lambda: client.create_queue_fleet_association(
farmId=farm.id,
queueId=queue.id,
fleetId=fleet.id,
**(raw_kwargs or {}),
),
)
return QueueFleetAssociation(
Expand All @@ -206,6 +219,7 @@ def delete(
stop_mode: Literal[
"STOP_SCHEDULING_AND_CANCEL_TASKS", "STOP_SCHEDULING_AND_FINISH_TASKS"
] = "STOP_SCHEDULING_AND_CANCEL_TASKS",
raw_kwargs: dict | None = None,
) -> None:
self.stop(client=client, stop_mode=stop_mode)
call_api(
Expand All @@ -214,6 +228,7 @@ def delete(
farmId=self.farm.id,
queueId=self.queue.id,
fleetId=self.fleet.id,
**(raw_kwargs or {}),
),
)

Expand Down Expand Up @@ -336,6 +351,7 @@ def submit(
target_task_run_status: str | None = None,
max_failed_tasks_count: int | None = None,
max_retries_per_task: int | None = None,
raw_kwargs: dict | None = None,
) -> Job:
kwargs = clean_kwargs(
{
Expand All @@ -349,6 +365,7 @@ def submit(
"targetTaskRunStatus": target_task_run_status,
"maxFailedTasksCount": max_failed_tasks_count,
"maxRetriesPerTask": max_retries_per_task,
**(raw_kwargs or {}),
}
)
create_job_response = call_api(
Expand Down Expand Up @@ -379,6 +396,7 @@ def get_job_details(
farm: Farm,
queue: Queue,
job_id: str,
raw_kwargs: dict | None = None,
) -> dict[str, Any]:
"""
Calls GetJob API and returns the parsed response, which can be used as
Expand All @@ -390,6 +408,7 @@ def get_job_details(
farmId=farm.id,
queueId=queue.id,
jobId=job_id,
**(raw_kwargs or {}),
),
)

Expand Down Expand Up @@ -435,6 +454,46 @@ def get_optional_field(
"description": get_optional_field("description"),
}

def get_logs(
self,
*,
deadline_client: DeadlineClient,
logs_client: BaseClient,
) -> JobLogs:
"""
Gets the logs for this Job.

Args:
deadline_client (DeadlineClient): The DeadlineClient to use
logs_client (BaseClient): The CloudWatch logs boto client to use

Returns:
JobLogs: The job logs
"""
list_sessions_response = deadline_client.list_sessions(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a paginated API. Jobs that have a lot of tasks/steps may end up with more sessions than can be sent in a single response.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Canary permissions will need to be updated

farmId=self.farm.id,
queueId=self.queue.id,
jobId=self.id,
)
sessions = list_sessions_response["sessions"]

log_group_name = f"/aws/deadline/{self.farm.id}/{self.queue.id}"
session_log_map: dict[str, list[CloudWatchLogEvent]] = {}
for session in sessions:
session_id = session["sessionId"]
get_log_events_response = logs_client.get_log_events(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a paginated API. As written, this won't be able to fetch the full log stream if there's a lot of data in it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Canary permissions will need updating

logGroupName=log_group_name,
logStreamName=session_id,
)
session_log_map[session_id] = [
CloudWatchLogEvent.from_api_response(le) for le in get_log_events_response["events"]
]

return JobLogs(
log_group_name=log_group_name,
logs=session_log_map,
)

def refresh_job_info(self, *, client: DeadlineClient) -> None:
"""
Calls GetJob API to refresh job information. The result is used to update the fields
Expand All @@ -459,13 +518,15 @@ def update(
target_task_run_status: str | None = None,
max_failed_tasks_count: int | None = None,
max_retries_per_task: int | None = None,
raw_kwargs: dict | None = None,
) -> None:
kwargs = clean_kwargs(
{
"priority": priority,
"targetTaskRunStatus": target_task_run_status,
"maxFailedTasksCount": max_failed_tasks_count,
"maxRetriesPerTask": max_retries_per_task,
**(raw_kwargs or {}),
}
)
call_api(
Expand Down Expand Up @@ -554,3 +615,24 @@ def __str__(self) -> str: # pragma: no cover
f"ended_at: {self.ended_at}",
]
)


@dataclass
class JobLogs:
log_group_name: str
logs: dict[str, list[CloudWatchLogEvent]]


@dataclass
class CloudWatchLogEvent:
ingestion_time: int
message: str
timestamp: int

@staticmethod
def from_api_response(response: dict) -> CloudWatchLogEvent:
return CloudWatchLogEvent(
ingestion_time=response["ingestionTime"],
message=response["message"],
timestamp=response["timestamp"],
)
Loading