Skip to content

Commit

Permalink
feat!: add Worker pre-install commands, --start, and Job.get_logs
Browse files Browse the repository at this point in the history
Signed-off-by: Jericho Tolentino <[email protected]>
  • Loading branch information
jericht committed Oct 11, 2023
1 parent 7c06bf0 commit 95e3dac
Show file tree
Hide file tree
Showing 6 changed files with 156 additions and 8 deletions.
2 changes: 2 additions & 0 deletions src/deadline_test_fixtures/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
from .deadline import (
CloudWatchLogEvent,
CommandResult,
DeadlineClient,
DeadlineWorker,
Expand Down Expand Up @@ -34,6 +35,7 @@

__all__ = [
"BootstrapResources",
"CloudWatchLogEvent",
"CodeArtifactRepositoryInfo",
"CommandResult",
"DeadlineResources",
Expand Down
2 changes: 2 additions & 0 deletions src/deadline_test_fixtures/deadline/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.

from .resources import (
CloudWatchLogEvent,
Farm,
Fleet,
Job,
Expand All @@ -19,6 +20,7 @@
)

__all__ = [
"CloudWatchLogEvent",
"CommandResult",
"DeadlineClient",
"DeadlineWorker",
Expand Down
83 changes: 78 additions & 5 deletions src/deadline_test_fixtures/deadline/resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
from enum import Enum
from typing import Any, Callable, Literal

from botocore.client import BaseClient

from .client import DeadlineClient
from ..models import JobAttachmentSettings
from ..util import call_api, clean_kwargs, wait_for
Expand All @@ -24,21 +26,23 @@ def create(
*,
client: DeadlineClient,
display_name: str,
raw_kwargs: dict | None = None,
) -> Farm:
response = call_api(
description=f"Create farm {display_name}",
fn=lambda: client.create_farm(
displayName=display_name,
**(raw_kwargs or {}),
),
)
farm_id = response["farmId"]
LOG.info(f"Created farm: {farm_id}")
return Farm(id=farm_id)

def delete(self, *, client: DeadlineClient) -> None:
def delete(self, *, client: DeadlineClient, raw_kwargs: dict | None = None) -> None:
call_api(
description=f"Delete farm {self.id}",
fn=lambda: client.delete_farm(farmId=self.id),
fn=lambda: client.delete_farm(farmId=self.id, **(raw_kwargs or {})),
)


Expand All @@ -55,6 +59,7 @@ def create(
farm: Farm,
role_arn: str | None = None,
job_attachments: JobAttachmentSettings | None = None,
raw_kwargs: dict | None = None,
) -> Queue:
kwargs = clean_kwargs(
{
Expand All @@ -64,6 +69,7 @@ def create(
"jobAttachmentSettings": (
job_attachments.as_queue_settings() if job_attachments else None
),
**(raw_kwargs or {}),
}
)

Expand All @@ -79,10 +85,12 @@ def create(
farm=farm,
)

def delete(self, *, client: DeadlineClient) -> None:
def delete(self, *, client: DeadlineClient, raw_kwargs: dict | None = None) -> None:
call_api(
description=f"Delete queue {self.id}",
fn=lambda: client.delete_queue(queueId=self.id, farmId=self.farm.id),
fn=lambda: client.delete_queue(
queueId=self.id, farmId=self.farm.id, **(raw_kwargs or {})
),
)


Expand All @@ -99,13 +107,15 @@ def create(
farm: Farm,
configuration: dict,
role_arn: str | None = None,
raw_kwargs: dict | None = None,
) -> Fleet:
kwargs = clean_kwargs(
{
"farmId": farm.id,
"displayName": display_name,
"roleArn": role_arn,
"configuration": configuration,
**(raw_kwargs or {}),
}
)
response = call_api(
Expand All @@ -127,12 +137,13 @@ def create(

return fleet

def delete(self, *, client: DeadlineClient) -> None:
def delete(self, *, client: DeadlineClient, raw_kwargs: dict | None = None) -> None:
call_api(
description=f"Delete fleet {self.id}",
fn=lambda: client.delete_fleet(
farmId=self.farm.id,
fleetId=self.id,
**(raw_kwargs or {}),
),
)

Expand Down Expand Up @@ -184,13 +195,15 @@ def create(
farm: Farm,
queue: Queue,
fleet: Fleet,
raw_kwargs: dict | None = None,
) -> QueueFleetAssociation:
call_api(
description=f"Create queue-fleet association for queue {queue.id} and fleet {fleet.id} in farm {farm.id}",
fn=lambda: client.create_queue_fleet_association(
farmId=farm.id,
queueId=queue.id,
fleetId=fleet.id,
**(raw_kwargs or {}),
),
)
return QueueFleetAssociation(
Expand All @@ -206,6 +219,7 @@ def delete(
stop_mode: Literal[
"STOP_SCHEDULING_AND_CANCEL_TASKS", "STOP_SCHEDULING_AND_FINISH_TASKS"
] = "STOP_SCHEDULING_AND_CANCEL_TASKS",
raw_kwargs: dict | None = None,
) -> None:
self.stop(client=client, stop_mode=stop_mode)
call_api(
Expand All @@ -214,6 +228,7 @@ def delete(
farmId=self.farm.id,
queueId=self.queue.id,
fleetId=self.fleet.id,
**(raw_kwargs or {}),
),
)

Expand Down Expand Up @@ -274,6 +289,7 @@ class StrEnum(str, Enum):
class TaskStatus(StrEnum):
UNKNOWN = "UNKNOWN"
PENDING = "PENDING"
STARTING = "STARTING"
READY = "READY"
RUNNING = "RUNNING"
ASSIGNED = "ASSIGNED"
Expand Down Expand Up @@ -335,6 +351,7 @@ def submit(
target_task_run_status: str | None = None,
max_failed_tasks_count: int | None = None,
max_retries_per_task: int | None = None,
raw_kwargs: dict | None = None,
) -> Job:
kwargs = clean_kwargs(
{
Expand All @@ -348,6 +365,7 @@ def submit(
"targetTaskRunStatus": target_task_run_status,
"maxFailedTasksCount": max_failed_tasks_count,
"maxRetriesPerTask": max_retries_per_task,
**(raw_kwargs or {}),
}
)
create_job_response = call_api(
Expand Down Expand Up @@ -378,6 +396,7 @@ def get_job_details(
farm: Farm,
queue: Queue,
job_id: str,
raw_kwargs: dict | None = None,
) -> dict[str, Any]:
"""
Calls GetJob API and returns the parsed response, which can be used as
Expand All @@ -389,6 +408,7 @@ def get_job_details(
farmId=farm.id,
queueId=queue.id,
jobId=job_id,
**(raw_kwargs or {}),
),
)

Expand Down Expand Up @@ -434,6 +454,42 @@ def get_optional_field(
"description": get_optional_field("description"),
}

def get_logs(
    self,
    *,
    deadline_client: DeadlineClient,
    logs_client: BaseClient,
) -> dict[str, list[CloudWatchLogEvent]]:
    """
    Gets the logs for this Job.

    Both the session listing and each session's log stream are read page by
    page, so the result is not limited to the first page either API returns.

    Args:
        deadline_client (DeadlineClient): The DeadlineClient to use
        logs_client (BaseClient): The CloudWatch logs boto client to use

    Returns:
        dict[str, list[CloudWatchLogEvent]]: A mapping of session ID to log events,
        with each session's events ordered from the start of its log stream
    """
    # Gather every session of the job, following "nextToken" until the service
    # stops returning one. The first request is identical to an unpaginated call.
    list_sessions_kwargs: dict[str, Any] = {
        "farmId": self.farm.id,
        "queueId": self.queue.id,
        "jobId": self.id,
    }
    sessions: list[dict] = []
    while True:
        list_sessions_response = deadline_client.list_sessions(**list_sessions_kwargs)
        sessions.extend(list_sessions_response["sessions"])
        next_token = list_sessions_response.get("nextToken")
        if not next_token:
            break
        list_sessions_kwargs["nextToken"] = next_token

    # All session log streams of a queue live in the same log group.
    log_group_name = f"/aws/deadline/{self.farm.id}/{self.queue.id}"

    session_log_map: dict[str, list[CloudWatchLogEvent]] = {}
    for session in sessions:
        session_id = session["sessionId"]
        get_log_events_kwargs: dict[str, Any] = {
            "logGroupName": log_group_name,
            "logStreamName": session_id,
            # Read oldest-first so that following the forward token walks the
            # whole stream instead of only returning the most recent page.
            "startFromHead": True,
        }
        raw_events: list[dict] = []
        while True:
            get_log_events_response = logs_client.get_log_events(**get_log_events_kwargs)
            raw_events.extend(get_log_events_response["events"])
            forward_token = get_log_events_response.get("nextForwardToken")
            # GetLogEvents signals the end of the stream by handing back the
            # same token that was sent; an individual page may be empty before
            # that point, so emptiness alone is not a stop condition.
            if not forward_token or forward_token == get_log_events_kwargs.get("nextToken"):
                break
            get_log_events_kwargs["nextToken"] = forward_token

        session_log_map[session_id] = [
            CloudWatchLogEvent.from_api_response(le) for le in raw_events
        ]

    return session_log_map

def refresh_job_info(self, *, client: DeadlineClient) -> None:
"""
Calls GetJob API to refresh job information. The result is used to update the fields
Expand All @@ -458,13 +514,15 @@ def update(
target_task_run_status: str | None = None,
max_failed_tasks_count: int | None = None,
max_retries_per_task: int | None = None,
raw_kwargs: dict | None = None,
) -> None:
kwargs = clean_kwargs(
{
"priority": priority,
"targetTaskRunStatus": target_task_run_status,
"maxFailedTasksCount": max_failed_tasks_count,
"maxRetriesPerTask": max_retries_per_task,
**(raw_kwargs or {}),
}
)
call_api(
Expand Down Expand Up @@ -553,3 +611,18 @@ def __str__(self) -> str: # pragma: no cover
f"ended_at: {self.ended_at}",
]
)


@dataclass
class CloudWatchLogEvent:
    """A single log event as returned in the "events" list of a CloudWatch Logs response."""

    # Value of the "ingestionTime" key of the API event
    ingestion_time: int
    # Value of the "message" key of the API event
    message: str
    # Value of the "timestamp" key of the API event
    timestamp: int

    @classmethod
    def from_api_response(cls, response: dict) -> CloudWatchLogEvent:
        """
        Builds a log event from one entry of an API response's "events" list.

        Args:
            response (dict): Event dict containing the keys "ingestionTime",
                "message" and "timestamp"

        Returns:
            CloudWatchLogEvent: An instance of the class this is called on

        Raises:
            KeyError: If any of the three expected keys is missing
        """
        # Construct through cls so subclasses get instances of their own type.
        return cls(
            ingestion_time=response["ingestionTime"],
            message=response["message"],
            timestamp=response["timestamp"],
        )
7 changes: 7 additions & 0 deletions src/deadline_test_fixtures/deadline/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ def configure_worker_command(*, config: DeadlineWorkerConfiguration) -> str: #
"""Get the command to configure the Worker. This must be run as root."""
cmds = [
config.worker_agent_install.install_command,
*(config.pre_install_commands or []),
# fmt: off
(
"install-deadline-worker "
Expand All @@ -44,6 +45,7 @@ def configure_worker_command(*, config: DeadlineWorkerConfiguration) -> str: #
+ f"--group {config.group} "
+ f"{'--allow-shutdown ' if config.allow_shutdown else ''}"
+ f"{'--no-install-service ' if config.no_install_service else ''}"
+ f"{'--start ' if config.start_service else ''}"
),
# fmt: on
]
Expand Down Expand Up @@ -117,10 +119,13 @@ class DeadlineWorkerConfiguration:
group: str
allow_shutdown: bool
worker_agent_install: PipInstall
start_service: bool = False
no_install_service: bool = False
service_model: ServiceModel | None = None
file_mappings: list[tuple[str, str]] | None = None
"""Mapping of files to copy from host environment to worker environment"""
pre_install_commands: list[str] | None = None
"""Commands to run before installing the Worker agent"""


@dataclass
Expand Down Expand Up @@ -389,6 +394,8 @@ def start(self) -> None:
# Environment variables for "run_container.sh"
run_container_env = {
**os.environ,
"FARM_ID": self.configuration.farm_id,
"FLEET_ID": self.configuration.fleet_id,
"AGENT_USER": self.configuration.user,
"SHARED_GROUP": self.configuration.group,
"JOB_USER": "jobuser",
Expand Down
6 changes: 3 additions & 3 deletions src/deadline_test_fixtures/fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,7 @@ def deadline_resources(
farm.delete(client=deadline_client)


@pytest.fixture(scope="session")
@pytest.fixture(scope="class")
def worker_config(
deadline_resources: DeadlineResources,
codeartifact: CodeArtifactRepositoryInfo,
Expand Down Expand Up @@ -402,7 +402,7 @@ def worker_config(
)


@pytest.fixture(scope="session")
@pytest.fixture(scope="class")
def worker(
request: pytest.FixtureRequest,
deadline_client: DeadlineClient,
Expand All @@ -427,7 +427,7 @@ def worker(
"""

worker: DeadlineWorker
if os.environ.get("USE_DOCKER_WORKER", False):
if os.environ.get("USE_DOCKER_WORKER", "").lower() == "true":
LOG.info("Creating Docker worker")
worker = DockerContainerWorker(
configuration=worker_config,
Expand Down
Loading

0 comments on commit 95e3dac

Please sign in to comment.