Skip to content

Commit

Permalink
feat!: Update the Job Submit GUI to display Queue Parameters
Browse files Browse the repository at this point in the history
BREAKING-CHANGE: This changes the interface that consumers of the
deadline cloud client UI library use.

- Added an implementation of get_queue_parameter_definitions to
  deadline.client.api
- Set the default group label to queue parameters to be the queue
  environment name. Created a function in
  deadline.client.job_bundle.parameters to get the control type, so that
  this could be implemented by adding the 'userInterface' property.
  Modified the job template parameters widget to use the function
  instead of duplicating the code.
- Rename the job_template_parameters_widget to openjd_parameters_widget
  so it's a name applicable to both job and queue parameters. Add a
  signal called parameter_changed to the widget that gets emitted
  whenever a parameter value changed, with a deep copy of the parameter
  definition and the new value in its "value" key.
- Update the shared_job_settings_tab to move properties of the job out
  of the "Deadline Settings" into the top group. Rename that top group
  from "Description" to "Job Settings", and rename "Deadline Settings"
  to "Deadline Cloud Settings".
- Add get_parameter_values and set_parameter_value functions to the
  shared job settings tab and the job settings tab, so that they can be
  queried and set using general interface functions.
- Connect the parameter_changed signals from the shared job settings
  tab to set the job settings, and vice versa so that when a queue
  parameter and a job parameter are the same, the values between the two
  tabs are always up to date.
- Load the Queue parameters in a background thread so that the
  submission dialog opens sooner and shows the loading state.
- Wire up the deadline settings refresh to reload the queue parameters
  if the default queue id has changed in the settings.
- Move the 'deadline:*' parameters from the settings dataclasses into
  the queue parameters dictionaries. This makes their parameter values
  more consistent with the job bundle handling of them end-to-end.
- Rename FlatAssetReferences to AssetReferences, and add the ability to
  reference paths without making them input or output data flow. This
  affects job attachments, as any PATH parameter with NONE dataflow will
  become an asset reference that should have path mapping applied, and
  therefore needs to be in one of the AssetRootGroup objects that the
  job attachment library deduces.
- Update the tests for the reference paths ability. The job bundle
  submission asset refs tests pick up this case, because they include
  all the variations of data flow.

Signed-off-by: Mark Wiebe <[email protected]>
  • Loading branch information
mwiebe committed Sep 14, 2023
1 parent b4b6d3c commit 9017614
Show file tree
Hide file tree
Showing 32 changed files with 1,606 additions and 638 deletions.
2 changes: 1 addition & 1 deletion examples/submit_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ def process_job_attachments(farm_id, queue_id, inputs, outputDir, deadline_clien
queue_id=queue_id,
job_attachment_settings=JobAttachmentS3Settings(**queue["jobAttachmentSettings"]),
)
(_, manifests) = asset_manager.hash_assets_and_create_manifest(inputs, [outputDir])
(_, manifests) = asset_manager.hash_assets_and_create_manifest(inputs, [outputDir], [])
(_, attachments) = asset_manager.upload_assets(manifests)
attachments = attachments.to_dict()
total = time.perf_counter() - start
Expand Down
1 change: 1 addition & 0 deletions examples/upload_cancel_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ def run():
(summary_statistics_hashing, manifests) = asset_manager.hash_assets_and_create_manifest(
files,
[root_path / "outputs"],
[],
on_preparing_to_submit=mock_on_preparing_to_submit,
)
print(f"Hashing Summary Statistics:\n{summary_statistics_hashing}")
Expand Down
2 changes: 1 addition & 1 deletion examples/upload_scale_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@
start = time.perf_counter()

(summary_statistics_hashing, manifests) = asset_manager.hash_assets_and_create_manifest(
files, [root_path / "outputs"]
files, [root_path / "outputs"], []
)
print(f"Summary Statistics for file hashing:\n{summary_statistics_hashing}")

Expand Down
2 changes: 1 addition & 1 deletion hatch.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ lint = [
python = ["3.7", "3.8", "3.9", "3.10", "3.11"]

[envs.default.env-vars]
PIP_INDEX_URL="https://aws:{env:CODEARTIFACT_AUTH_TOKEN}@{env:CODEARTIFACT_DOMAIN}-{env:CODEARTIFACT_ACCOUNT_ID}.d.codeartifact.{env:CODEARTIFACT_REGION}.amazonaws.com/pypi/{env:CODEARTIFACT_REPOSITORY}/simple/"
# PIP_INDEX_URL="https://aws:{env:CODEARTIFACT_AUTH_TOKEN}@{env:CODEARTIFACT_DOMAIN}-{env:CODEARTIFACT_ACCOUNT_ID}.d.codeartifact.{env:CODEARTIFACT_REGION}.amazonaws.com/pypi/{env:CODEARTIFACT_REPOSITORY}/simple/"
SKIP_BOOTSTRAP_TEST_RESOURCES="True"

[envs.codebuild.scripts]
Expand Down
115 changes: 9 additions & 106 deletions src/deadline/client/api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
"list_fleets",
"list_storage_profiles_for_queue",
"get_queue_boto3_session",
"get_queue_parameter_definitions",
"get_telemetry_client",
]

Expand All @@ -35,9 +36,15 @@
get_boto3_client,
get_boto3_session,
get_credentials_type,
get_user_and_identity_store_id,
get_studio_id,
)
from ._list_apis import (
list_farms,
list_queues,
list_jobs,
list_fleets,
list_storage_profiles_for_queue,
)
from ._queue_parameters import get_queue_parameter_definitions
from ._submit_job_bundle import create_job_from_job_bundle, wait_for_create_job_to_complete
from ._telemetry import get_telemetry_client, TelemetryClient

Expand Down Expand Up @@ -65,107 +72,3 @@ def check_deadline_api_available(config: Optional[ConfigParser] = None) -> bool:
except Exception:
logger.exception("Error invoking ListFarms")
return False


def _call_paginated_deadline_list_api(list_api, list_property_name, **kwargs):
"""
Calls a deadline:List* API repeatedly to concatenate all pages.
Example:
deadline = get_boto3_client("deadline")
return _call_paginated_deadline_list_api(deadline.list_farms, "farms", **kwargs)
Args:
list_api (callable): The List* API function to call, from the boto3 client.
list_property_name (str): The name of the property in the response that contains
the list.
"""
response = list_api(**kwargs)
if kwargs.get("dryRun", False):
return response
else:
result = {list_property_name: response[list_property_name]}

while "nextToken" in response:
response = list_api(nextToken=response["nextToken"], **kwargs)
result[list_property_name].extend(response[list_property_name])

return result


def list_farms(config=None, **kwargs):
"""
Calls the deadline:ListFarms API call, applying the filter for user membership
depending on the configuration. If the response is paginated, it repeated
calls the API to get all the farms.
"""
if "principalId" not in kwargs:
user_id, _ = get_user_and_identity_store_id(config=config)
if user_id:
kwargs["principalId"] = user_id

if "studioId" not in kwargs:
studio_id = get_studio_id(config=config)
if studio_id:
kwargs["studioId"] = studio_id

deadline = get_boto3_client("deadline", config=config)
return _call_paginated_deadline_list_api(deadline.list_farms, "farms", **kwargs)


def list_queues(config=None, **kwargs):
"""
Calls the deadline:ListQueues API call, applying the filter for user membership
depending on the configuration. If the response is paginated, it repeated
calls the API to get all the queues.
"""
if "principalId" not in kwargs:
user_id, _ = get_user_and_identity_store_id(config=config)
if user_id:
kwargs["principalId"] = user_id

deadline = get_boto3_client("deadline", config=config)
return _call_paginated_deadline_list_api(deadline.list_queues, "queues", **kwargs)


def list_jobs(config=None, **kwargs):
"""
Calls the deadline:ListJobs API call, applying the filter for user membership
depending on the configuration. If the response is paginated, it repeated
calls the API to get all the jobs.
"""
if "principalId" not in kwargs:
user_id, _ = get_user_and_identity_store_id(config=config)
if user_id:
kwargs["principalId"] = user_id

deadline = get_boto3_client("deadline", config=config)
return _call_paginated_deadline_list_api(deadline.list_jobs, "jobs", **kwargs)


def list_fleets(config=None, **kwargs):
"""
Calls the deadline:ListFleets API call, applying the filter for user membership
depending on the configuration. If the response is paginated, it repeated
calls the API to get all the fleets.
"""
if "principalId" not in kwargs:
user_id, _ = get_user_and_identity_store_id(config=config)
if user_id:
kwargs["principalId"] = user_id

deadline = get_boto3_client("deadline", config=config)
return _call_paginated_deadline_list_api(deadline.list_fleets, "fleets", **kwargs)


def list_storage_profiles_for_queue(config=None, **kwargs):
"""
Calls the deadline:ListStorageProfilesForQueue API call, applying the filter for user membership
depending on the configuration. If the response is paginated, it repeated
calls the API to get all the storage profiles.
"""
deadline = get_boto3_client("deadline", config=config)

return _call_paginated_deadline_list_api(
deadline.list_storage_profiles_for_queue, "storageProfiles", **kwargs
)
111 changes: 111 additions & 0 deletions src/deadline/client/api/_list_apis.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.

from ._session import (
get_boto3_client,
get_user_and_identity_store_id,
get_studio_id,
)


def _call_paginated_deadline_list_api(list_api, list_property_name, **kwargs):
"""
Calls a deadline:List* API repeatedly to concatenate all pages.
Example:
deadline = get_boto3_client("deadline")
return _call_paginated_deadline_list_api(deadline.list_farms, "farms", **kwargs)
Args:
list_api (callable): The List* API function to call, from the boto3 client.
list_property_name (str): The name of the property in the response that contains
the list.
"""
response = list_api(**kwargs)
if kwargs.get("dryRun", False):
return response
else:
result = {list_property_name: response[list_property_name]}

while "nextToken" in response:
response = list_api(nextToken=response["nextToken"], **kwargs)
result[list_property_name].extend(response[list_property_name])

return result


def list_farms(config=None, **kwargs):
"""
Calls the deadline:ListFarms API call, applying the filter for user membership
depending on the configuration. If the response is paginated, it repeated
calls the API to get all the farms.
"""
if "principalId" not in kwargs:
user_id, _ = get_user_and_identity_store_id(config=config)
if user_id:
kwargs["principalId"] = user_id

if "studioId" not in kwargs:
studio_id = get_studio_id(config=config)
if studio_id:
kwargs["studioId"] = studio_id

deadline = get_boto3_client("deadline", config=config)
return _call_paginated_deadline_list_api(deadline.list_farms, "farms", **kwargs)


def list_queues(config=None, **kwargs):
"""
Calls the deadline:ListQueues API call, applying the filter for user membership
depending on the configuration. If the response is paginated, it repeated
calls the API to get all the queues.
"""
if "principalId" not in kwargs:
user_id, _ = get_user_and_identity_store_id(config=config)
if user_id:
kwargs["principalId"] = user_id

deadline = get_boto3_client("deadline", config=config)
return _call_paginated_deadline_list_api(deadline.list_queues, "queues", **kwargs)


def list_jobs(config=None, **kwargs):
"""
Calls the deadline:ListJobs API call, applying the filter for user membership
depending on the configuration. If the response is paginated, it repeated
calls the API to get all the jobs.
"""
if "principalId" not in kwargs:
user_id, _ = get_user_and_identity_store_id(config=config)
if user_id:
kwargs["principalId"] = user_id

deadline = get_boto3_client("deadline", config=config)
return _call_paginated_deadline_list_api(deadline.list_jobs, "jobs", **kwargs)


def list_fleets(config=None, **kwargs):
"""
Calls the deadline:ListFleets API call, applying the filter for user membership
depending on the configuration. If the response is paginated, it repeated
calls the API to get all the fleets.
"""
if "principalId" not in kwargs:
user_id, _ = get_user_and_identity_store_id(config=config)
if user_id:
kwargs["principalId"] = user_id

deadline = get_boto3_client("deadline", config=config)
return _call_paginated_deadline_list_api(deadline.list_fleets, "fleets", **kwargs)


def list_storage_profiles_for_queue(config=None, **kwargs):
"""
Calls the deadline:ListStorageProfilesForQueue API call, applying the filter for user membership
depending on the configuration. If the response is paginated, it repeated
calls the API to get all the storage profiles.
"""
deadline = get_boto3_client("deadline", config=config)

return _call_paginated_deadline_list_api(
deadline.list_storage_profiles_for_queue, "storageProfiles", **kwargs
)
71 changes: 71 additions & 0 deletions src/deadline/client/api/_queue_parameters.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
from __future__ import annotations

__all__ = ["get_queue_parameter_definitions"]

import yaml
from typing import Any

from ._list_apis import _call_paginated_deadline_list_api
from ._session import get_boto3_client
from ..exceptions import DeadlineOperationError
from ..job_bundle.parameters import (
get_ui_control_for_parameter_definition,
parameter_definition_difference,
)


def get_queue_parameter_definitions(
*, farmId: str, queueId: str, config=None
) -> list[dict[str, Any]]:
"""
This gets all the queue parameters definitions from the specified Queue. It does so
by getting all the full templates for queue environments, and then combining
them equivalently to the Deadline Cloud service logic.
"""
deadline = get_boto3_client("deadline", config=config)
response = _call_paginated_deadline_list_api(
deadline.list_queue_environments,
"environments",
farmId=farmId,
queueId=queueId,
)
queue_environments = sorted(
(
deadline.get_queue_environment(
farmId=farmId,
queueId=queueId,
queueEnvironmentId=queue_env["queueEnvironmentId"],
)
for queue_env in response["environments"]
),
key=lambda queue_env: queue_env["priority"],
)
queue_environment_templates = [
yaml.safe_load(queue_env["template"]) for queue_env in queue_environments
]

queue_parameters_definitions: dict[str, dict[str, Any]] = {}
for template in queue_environment_templates:
for parameter in template["parameters"]: # TODO: change to parameterDefinitions
# If there is no group label, set it to the name of the Queue Environment
if not parameter.get("userInterface", {}).get("groupLabel"):
if "userInterface" not in parameter:
parameter["userInterface"] = {
"control": get_ui_control_for_parameter_definition(parameter)
}
parameter["userInterface"][
"groupLabel"
] = f"Queue Environment: {template['environment']['name']}"
existing_parameter = queue_parameters_definitions.get(parameter["name"])
if existing_parameter:
differences = parameter_definition_difference(existing_parameter, parameter)
if differences:
raise DeadlineOperationError(
f"Job template parameter {parameter['name']} is duplicated across queue environments with mismatched fields:\n"
+ " ".join(differences)
)
else:
queue_parameters_definitions[parameter["name"]] = parameter

return list(queue_parameters_definitions.values())
Loading

0 comments on commit 9017614

Please sign in to comment.