feat: Create a CLI command to output a job schedule trace
 * While at it, added the --priority option to `deadline bundle submit`

Signed-off-by: Mark Wiebe <[email protected]>
mwiebe committed Sep 22, 2023
1 parent b9d2a28 commit c8add2f
Showing 8 changed files with 277 additions and 2 deletions.
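The new `deadline job trace-schedule` command (in job_group.py below) emits the Chrome Trace Event Format, which the https://ui.perfetto.dev UI can open directly. As a rough sketch of the output shape — values hypothetical, structure taken from the code below — sessions become paired "B"/"E" begin/end events, session actions become "X" complete events, timestamps and durations are microseconds since the job's startedAt, and the worker index serves as the pid:

import json

# Minimal sketch (hypothetical values) of the trace file the command writes.
tracing_data = {
    "traceEvents": [
        {"name": "renderStep - 0", "cat": "SESSION", "ph": "B", "ts": 0, "pid": 0, "tid": 0},
        {"name": "frame=1", "cat": "taskRun", "ph": "X", "ts": 1_000, "dur": 5_000_000, "pid": 0, "tid": 0},
        {"name": "renderStep - 0", "cat": "SESSION", "ph": "E", "ts": 5_400_000, "pid": 0, "tid": 0},
    ],
    "otherData": {"jobId": "job-0123456789abcdef0123456789abcdef"},
}

with open("trace.json", "w", encoding="utf8") as f:
    json.dump(tracing_data, f, indent=1)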
2 changes: 1 addition & 1 deletion src/deadline/client/cli/_common.py
@@ -131,7 +131,7 @@ def _apply_cli_options_to_config(

if args:
raise RuntimeError(
f"Option names {args.keys()} are not standard Amazon Deadline Cloud CLI options, they need special handling"
f"Option names {tuple(args.keys())} are not standard Amazon Deadline Cloud CLI options, they need special handling"
)

return config
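This one-line change only affects how the unexpected option names render in the error message: a dict view stringifies as dict_keys([...]), while tuple() gives a conventional literal. A quick illustration:

args = {"unknown_option": 1}

print(f"Option names {args.keys()}")         # Option names dict_keys(['unknown_option'])
print(f"Option names {tuple(args.keys())}")  # Option names ('unknown_option',)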
5 changes: 4 additions & 1 deletion src/deadline/client/cli/_groups/bundle_group.py
@@ -88,6 +88,7 @@ def validate_parameters(ctx, param, value):
@click.option("--profile", help="The AWS profile to use.")
@click.option("--farm-id", help="The Amazon Deadline Cloud Farm to use.")
@click.option("--queue-id", help="The Amazon Deadline Cloud Queue to use.")
@click.option("--priority", type=int, default=50, help="The priority of the job.")
@click.option(
"--asset-loading-method",
help="The method to use for loading assets on the server (Overrides the default set in config file). Options are PRELOAD (load assets onto server first then run the job) or ON_DEMAND (load assets as requested).",
@@ -100,7 +101,7 @@ def validate_parameters(ctx, param, value):
)
@click.argument("job_bundle_dir")
@_handle_error
def bundle_submit(job_bundle_dir, asset_loading_method, parameter, yes, **args):
def bundle_submit(job_bundle_dir, asset_loading_method, parameter, priority, yes, **args):
"""
Submits an Open Job Description job bundle to Amazon Deadline Cloud.
"""
@@ -220,6 +221,8 @@ def bundle_submit(job_bundle_dir, asset_loading_method, parameter, yes, **args):
if job_parameters_formatted:
create_job_args["parameters"] = job_parameters_formatted

create_job_args["priority"] = priority

if logging.DEBUG >= logger.getEffectiveLevel():
logger.debug(json.dumps(create_job_args, indent=1))

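With this change, `deadline bundle submit --priority 75 <job-bundle-dir>` sets the job's priority in the CreateJob call, and omitting the flag uses click's declared default of 50. A standalone sketch of that default handling (hypothetical command name, same option declaration as above):

import click

@click.command()
@click.option("--priority", type=int, default=50, help="The priority of the job.")
def submit(priority):
    # In bundle_group.py the value lands in create_job_args["priority"].
    click.echo(f"priority={priority}")

if __name__ == "__main__":
    submit()  # no flag prints priority=50; --priority 75 prints priority=75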
265 changes: 265 additions & 0 deletions src/deadline/client/cli/_groups/job_group.py
@@ -11,6 +11,8 @@
from pathlib import Path
import sys
from typing import Optional, Union
import datetime
from typing import Any

import click
from botocore.exceptions import ClientError # type: ignore[import]
@@ -592,3 +594,266 @@ def job_download_output(step_id, task_id, conflict_resolution, output, **args):
sys.exit(1)
else:
raise DeadlineOperationError(f"Failed to download output:\n{e}") from e


@cli_job.command(name="trace-schedule")
@click.option("--profile", help="The AWS profile to use.")
@click.option("--farm-id", help="The Amazon Deadline Cloud Farm to use.")
@click.option("--queue-id", help="The Amazon Deadline Cloud Queue to use.")
@click.option("--job-id", help="The Amazon Deadline Cloud Job to trace.")
@click.option("-v", "--verbose", is_flag=True, help="Output verbose trace details.")
@click.option(
"--trace-format",
type=click.Choice(
["chrome"],
case_sensitive=False,
),
help="The tracing format to write.",
)
@click.option("--trace-file", help="The tracing file to write.")
@_handle_error
def job_trace_schedule(verbose, trace_format, trace_file, **args):
"""
EXPERIMENTAL
Creates a trace file of all the sessions and session actions that were
run for the job.
To visualize the output file when providing the options
"--trace-format chrome --trace-file <output>.json", use
the https://ui.perfetto.dev Tracing UI and choose "Open trace file".
"""
# Get a temporary config object with the standard options handled
config = _apply_cli_options_to_config(
required_options={"farm_id", "queue_id", "job_id"}, **args
)

farm_id = config_file.get_setting("defaults.farm_id", config=config)
queue_id = config_file.get_setting("defaults.queue_id", config=config)
job_id = config_file.get_setting("defaults.job_id", config=config)

if trace_file and not trace_format:
raise DeadlineOperationError("Error: Must provide --trace-format with --trace-file.")

deadline = api.get_boto3_client("deadline", config=config)

click.echo("Getting the job...")
job = deadline.get_job(farmId=farm_id, queueId=queue_id, jobId=job_id)
job.pop("ResponseMetadata", None)

click.echo("Getting all the sessions for the job...")
response = deadline.list_sessions(farmId=farm_id, queueId=queue_id, jobId=job_id)
while "nextToken" in response:
old_list = response["sessions"]
response = deadline.list_sessions(
farmId=farm_id, queueId=queue_id, jobId=job_id, nextToken=response["nextToken"]
)
response["sessions"] = old_list + response["sessions"]
response.pop("ResponseMetadata", None)

sessions = sorted(response["sessions"], key=lambda session: session["startedAt"])

click.echo("Getting all the session actions for the job...")
for session in sessions:
response = deadline.list_session_actions(
farmId=farm_id, queueId=queue_id, jobId=job_id, sessionId=session["sessionId"]
)
while "nextToken" in response:
old_list = response["sessionactions"]
response = deadline.list_session_actions(
farmId=farm_id,
queueId=queue_id,
jobId=job_id,
sessionId=session["sessionId"],
nextToken=response["nextToken"],
)
response["sessionactions"] = old_list + response["sessionactions"]
response.pop("ResponseMetadata", None)

session["actions"] = response["sessionactions"]

# Cache steps and tasks by their id, to only get each once
steps: dict[str, Any] = {}
tasks: dict[str, Any] = {}

with click.progressbar(
length=len(sessions), label="Getting all the steps and tasks for the job..."
) as progressbar:
for index, session in enumerate(sessions):
session["index"] = index
for action in session["actions"]:
step_id = action["definition"].get("taskRun", {}).get("stepId")
task_id = action["definition"].get("taskRun", {}).get("taskId")
if step_id and task_id:
if "step" not in session:
if step_id in steps:
step = steps[step_id]
else:
step = deadline.get_step(
farmId=farm_id, queueId=queue_id, jobId=job_id, stepId=step_id
)
step.pop("ResponseMetadata", None)
steps[step_id] = step
session["step"] = step
elif session["step"]["stepId"] != step_id:
# The session itself doesn't have a step id, but for now the scheduler always creates new
# sessions for new steps.
raise DeadlineOperationError(
f"Session {session['sessionId']} ran more than one task! When this code was"
" written that wasn't possible."
)

if task_id in tasks:
task = tasks[task_id]
else:
task = deadline.get_task(
farmId=farm_id,
queueId=queue_id,
jobId=job_id,
stepId=step_id,
taskId=task_id,
)
task.pop("ResponseMetadata", None)
tasks[task_id] = task
action["task"] = task
progressbar.update(1)

# Collect the worker IDs that ran the sessions, and give them indexes to act as PIDs in the tracing file
worker_ids = {session["workerId"] for session in sessions}
workers = {worker_id: index for index, worker_id in enumerate(worker_ids)}

click.echo("Processing the trace data...")
trace_events = []

started_at = job["startedAt"]

def time_int(timestamp: datetime.datetime):
return int((timestamp - started_at) / datetime.timedelta(microseconds=1))

def duration_of(resource):
return time_int(resource["endedAt"]) - time_int(resource["startedAt"])

accumulators = {
"sessionCount": 0,
"sessionActionCount": 0,
"taskRunCount": 0,
"sessionDuration": 0,
"sessionActionDuration": 0,
"taskRunDuration": 0,
}

for session in sessions:
accumulators["sessionCount"] += 1
accumulators["sessionDuration"] += duration_of(session)

pid = workers[session["workerId"]]
session_event_name = f"{session['step']['name']} - {session['index']}"
trace_events.append(
{
"name": session_event_name,
"cat": "SESSION",
"ph": "B", # Begin Event
"ts": time_int(session["startedAt"]),
"pid": pid,
"tid": 0,
}
)

for action in session["actions"]:
accumulators["sessionActionCount"] += 1
accumulators["sessionActionDuration"] += duration_of(action)

name = action["sessionActionId"]
action_type = list(action["definition"].keys())[0]
if action_type == "taskRun":
accumulators["taskRunCount"] += 1
accumulators["taskRunDuration"] += duration_of(action)

task = action["task"]
parameters = task.get("parameters", {})
name = ",".join(
f"{param}={list(parameters[param].values())[0]}" for param in parameters
)
if not name:
name = "*"
elif action_type in ("envEnter", "envExit"):
name = action["definition"][action_type]["environmentId"].split(":")[-1]
trace_events.append(
{
"name": name,
"cat": action_type,
"ph": "X", # Complete Event
"ts": time_int(action["startedAt"]),
"dur": time_int(action["endedAt"]) - time_int(action["startedAt"]),
"pid": pid,
"tid": 0,
}
)
trace_events.append(
{
"name": session_event_name,
"cat": "SESSION",
"ph": "E", # End Event
"ts": time_int(session["endedAt"]),
"pid": pid,
"tid": 0,
}
)

if verbose:
click.echo(" ==== TRACE DATA ====")
click.echo(_cli_object_repr(job))
click.echo("")
click.echo(_cli_object_repr(sessions))

click.echo("")
click.echo(" ==== SUMMARY ====")
click.echo("")
click.echo(f"Session Count: {accumulators['sessionCount']}")
click.echo(
f"Session Total Duration: {datetime.timedelta(microseconds=accumulators['sessionDuration'])}"
)
click.echo(f"Session Action Count: {accumulators['sessionActionCount']}")
click.echo(
f"Session Action Total Duration: {datetime.timedelta(microseconds=accumulators['sessionActionDuration'])}"
)
click.echo(f"Task Run Count: {accumulators['taskRunCount']}")
click.echo(
f"Task Run Total Duration: {datetime.timedelta(microseconds=accumulators['taskRunDuration'])}"
)
click.echo(
f"Non-Task Run Count: {accumulators['sessionActionCount'] - accumulators['taskRunCount']}"
)
click.echo(
f"Non-Task Run Total Duration: {datetime.timedelta(microseconds=accumulators['sessionActionDuration'] - accumulators['taskRunDuration'])}"
)
click.echo("")
click.echo(
f"Within-session Overhead Duration: {datetime.timedelta(microseconds=(accumulators['sessionDuration'] - accumulators['sessionActionDuration']))}"
)
click.echo(
f"Within-session Overhead Duration Per Action: {datetime.timedelta(microseconds=(accumulators['sessionDuration'] - accumulators['sessionActionDuration']) / accumulators['sessionActionCount'])}"
)
click.echo(
f"Within-session Overhead: {100 * (accumulators['sessionDuration'] - accumulators['sessionActionDuration']) / accumulators['sessionDuration']:.1f}%"
)

tracing_data: dict[str, Any] = {
"traceEvents": trace_events,
# "displayTimeUnits": "s",
"otherData": {
"farmId": farm_id,
"queueId": queue_id,
"jobId": job_id,
"jobName": job["name"],
"startedAt": job["startedAt"].isoformat(sep="T"),
},
}
if "endedAt" in job:
tracing_data["otherData"]["endedAt"] = job["endedAt"].isoformat(sep="T")

tracing_data["otherData"].update(accumulators)

if trace_file:
with open(trace_file, "w", encoding="utf8") as f:
json.dump(tracing_data, f, indent=1)
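Both list_sessions and list_session_actions above are drained with the same nextToken accumulation loop. A generic helper — hypothetical, not part of this commit — captures that pattern; note also that time_int expresses each timestamp as whole microseconds since the job's startedAt (an action beginning 1.5 s into the job maps to ts=1_500_000), which is the unit the trace events expect:

from typing import Any, Callable

def paginate(list_call: Callable[..., dict], result_key: str, **kwargs) -> list[Any]:
    """Hypothetical helper equivalent to the nextToken loops above."""
    response = list_call(**kwargs)
    items = list(response[result_key])
    while "nextToken" in response:
        response = list_call(nextToken=response["nextToken"], **kwargs)
        items.extend(response[result_key])
    return items

# e.g. sessions = paginate(deadline.list_sessions, "sessions",
#                          farmId=farm_id, queueId=queue_id, jobId=job_id)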
1 change: 1 addition & 0 deletions src/deadline/job_attachments/_aws/deadline.py
@@ -63,6 +63,7 @@ def get_queue(
queueId=response["queueId"],
farmId=response["farmId"],
status=response[status_key],
defaultBudgetAction=response["defaultBudgetAction"],
jobAttachmentSettings=job_attachment_settings,
)

1 change: 1 addition & 0 deletions src/deadline/job_attachments/models.py
@@ -240,6 +240,7 @@ class Queue:
displayName: str
farmId: str # pylint: disable=invalid-name
status: str
defaultBudgetAction: str
jobAttachmentSettings: Optional[JobAttachmentS3Settings] = None # pylint: disable=invalid-name


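Since defaultBudgetAction is declared before jobAttachmentSettings (which has a default), it becomes a required field of Queue, so every construction site must now supply it — which is exactly what the test updates below do. A minimal sketch with hypothetical values:

from deadline.job_attachments.models import Queue

queue = Queue(
    queueId="queue-0123456789abcdef0123456789abcdef",  # hypothetical IDs/values
    displayName="example-queue",
    farmId="farm-0123456789abcdef0123456789abcdef",
    status="ENABLED",
    defaultBudgetAction="NONE",
)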
3 changes: 3 additions & 0 deletions test/unit/deadline_client/cli/test_cli_bundle.py
@@ -138,6 +138,7 @@ def test_cli_bundle_submit(fresh_deadline_config, temp_job_bundle_dir):
parameters=MOCK_PARAMETERS_CASES["TEMPLATE_ONLY_JSON"][2]["parameters"], # type: ignore
template=MOCK_JOB_TEMPLATE_CASES["MINIMAL_JSON"][1],
templateType="JSON",
priority=50,
)
assert temp_job_bundle_dir in result.output
assert MOCK_CREATE_JOB_RESPONSE["jobId"] in result.output
@@ -287,6 +288,7 @@ def test_cli_bundle_asset_load_method(fresh_deadline_config, temp_job_bundle_dir
template=MOCK_JOB_TEMPLATE_CASES["MINIMAL_JSON"][1],
templateType="JSON",
attachments={"assetLoadingMethod": expected_loading_method},
priority=50,
)
assert MOCK_CREATE_JOB_RESPONSE["jobId"] in result.output
assert MOCK_GET_JOB_RESPONSE["lifecycleStatusMessage"] in result.output
@@ -447,6 +449,7 @@ def test_cli_bundle_accept_upload_confirmation(fresh_deadline_config, temp_job_b
template=MOCK_JOB_TEMPLATE_CASES["MINIMAL_JSON"][1],
templateType="JSON",
attachments=ANY,
priority=50,
)
assert result.exit_code == 0

1 change: 1 addition & 0 deletions test/unit/deadline_job_attachments/conftest.py
@@ -299,6 +299,7 @@ def fixture_default_queue(farm_id, queue_id, default_job_attachment_s3_settings)
queueId=queue_id,
farmId=farm_id,
status="ENABLED",
defaultBudgetAction="None",
jobAttachmentSettings=default_job_attachment_s3_settings,
)

1 change: 1 addition & 0 deletions test/unit/deadline_job_attachments/test_asset_sync.py
@@ -606,6 +606,7 @@ def test_get_attachments_successful(
displayName="test-queue",
farmId="test-farm",
status="test",
defaultBudgetAction="NONE",
),
None,
),
