fix(job_attachments)!: use correct profile for GetStorageProfileForQueue API (#296)

Signed-off-by: Gahyun Suh <[email protected]>
gahyusuh authored Apr 17, 2024
1 parent 83ee681 commit a8de5f6
Showing 11 changed files with 138 additions and 85 deletions.
2 changes: 2 additions & 0 deletions src/deadline/client/api/__init__.py
@@ -22,6 +22,7 @@
"get_queue_parameter_definitions",
"get_telemetry_client",
"get_deadline_cloud_library_telemetry_client",
"get_storage_profile_for_queue",
]

# The following import is needed to prevent the following sporadic failure:
@@ -57,6 +58,7 @@
get_deadline_cloud_library_telemetry_client,
TelemetryClient,
)
from ._get_storage_profile_for_queue import get_storage_profile_for_queue

logger = getLogger(__name__)

44 changes: 44 additions & 0 deletions src/deadline/client/api/_get_storage_profile_for_queue.py
@@ -0,0 +1,44 @@
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
from __future__ import annotations

__all__ = ["get_storage_profile_for_queue"]

from configparser import ConfigParser
from typing import Optional
from botocore.client import BaseClient  # type: ignore[import]

from ._session import get_boto3_client
from ...job_attachments.models import (
    FileSystemLocation,
    FileSystemLocationType,
    StorageProfile,
    StorageProfileOperatingSystemFamily,
)


def get_storage_profile_for_queue(
    farm_id: str,
    queue_id: str,
    storage_profile_id: str,
    deadline: Optional[BaseClient] = None,
    config: Optional[ConfigParser] = None,
) -> StorageProfile:
    if deadline is None:
        deadline = get_boto3_client("deadline", config=config)

    storage_profile_response = deadline.get_storage_profile_for_queue(
        farmId=farm_id, queueId=queue_id, storageProfileId=storage_profile_id
    )
    return StorageProfile(
        storageProfileId=storage_profile_response["storageProfileId"],
        displayName=storage_profile_response["displayName"],
        osFamily=StorageProfileOperatingSystemFamily(storage_profile_response["osFamily"]),
        fileSystemLocations=[
            FileSystemLocation(
                name=file_system_location["name"],
                path=file_system_location["path"],
                type=FileSystemLocationType(file_system_location["type"]),
            )
            for file_system_location in storage_profile_response.get("fileSystemLocations", [])
        ],
    )
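A minimal usage sketch of this new helper follows; the farm, queue, and storage profile IDs are placeholders. When no boto3 client is passed in, the helper resolves one from the active deadline-cloud configuration, which is the "correct profile" behavior named in the commit title.

```python
from deadline.client import api

# Placeholder IDs; the helper builds a boto3 "deadline" client from the
# active deadline-cloud configuration when none is supplied.
storage_profile = api.get_storage_profile_for_queue(
    farm_id="farm-0123456789abcdef",
    queue_id="queue-0123456789abcdef",
    storage_profile_id="sp-0123456789abcdef",
)

print(storage_profile.displayName, storage_profile.osFamily)
for location in storage_profile.fileSystemLocations:
    print(f"{location.type}: {location.name} -> {location.path}")
```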
10 changes: 7 additions & 3 deletions src/deadline/client/api/_submit_job_bundle.py
@@ -71,7 +71,7 @@ def create_job_from_job_bundle(
/template.json|yaml (required): An Open Job Description job template that specifies the work to be done. Job parameters
are embedded here.
/parameter_values.yson|yaml (optional): If provided, these are parameter values for the job template and for
/parameter_values.json|yaml (optional): If provided, these are parameter values for the job template and for
the render farm. AWS Deadline Cloud-specific parameters are like "deadline:priority".
Looks like:
{
@@ -126,7 +126,7 @@ def create_job_from_job_bundle(
hashing_progress_callback / upload_progress_callback / create_job_result_callback (Callable -> bool):
Callbacks periodically called while hashing / uploading / waiting for job creation. If returns false,
the operation will be cancelled. If return true, the operation continues. Default behavior for each
is to not cancel the operation. hashing_progress_callback and upload_progress_callback both recieve
is to not cancel the operation. hashing_progress_callback and upload_progress_callback both receive
ProgressReport as a parameter, which can be used for projecting remaining time, as is done in the CLI.
"""

@@ -171,8 +171,12 @@ def create_job_from_job_bundle(
}

storage_profile_id = get_setting("settings.storage_profile_id", config=config)
storage_profile = None
if storage_profile_id:
create_job_args["storageProfileId"] = storage_profile_id
storage_profile = api.get_storage_profile_for_queue(
farm_id, queue_id, storage_profile_id, deadline
)

# The job parameters
job_bundle_parameters = read_job_bundle_parameters(job_bundle_dir)
@@ -233,7 +237,7 @@ def create_job_from_job_bundle(
input_paths=sorted(asset_references.input_filenames),
output_paths=sorted(asset_references.output_directories),
referenced_paths=sorted(asset_references.referenced_paths),
storage_profile_id=storage_profile_id,
storage_profile=storage_profile,
)
if upload_group.asset_groups:
if decide_cancel_submission_callback(
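Because the diff view above drops Python indentation, here is the new storage-profile handling in `create_job_from_job_bundle`, re-assembled from the two hunks in this file. The enclosing `asset_manager.prepare_paths_for_upload(...)` call and the `upload_group` name are inferred from the other hunks in this commit rather than shown directly here.

```python
storage_profile_id = get_setting("settings.storage_profile_id", config=config)
storage_profile = None
if storage_profile_id:
    create_job_args["storageProfileId"] = storage_profile_id
    # Fetch the full StorageProfile once, using the caller's deadline client.
    storage_profile = api.get_storage_profile_for_queue(
        farm_id, queue_id, storage_profile_id, deadline
    )

# ... later, when grouping job attachment paths for upload:
upload_group = asset_manager.prepare_paths_for_upload(
    input_paths=sorted(asset_references.input_filenames),
    output_paths=sorted(asset_references.output_directories),
    referenced_paths=sorted(asset_references.referenced_paths),
    storage_profile=storage_profile,  # previously storage_profile_id=...
)
```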
14 changes: 7 additions & 7 deletions src/deadline/client/ui/dialogs/submit_job_progress_dialog.py
@@ -53,7 +53,7 @@
split_parameter_args,
)
from deadline.job_attachments.exceptions import AssetSyncCancelledError
from deadline.job_attachments.models import AssetRootGroup, AssetRootManifest
from deadline.job_attachments.models import AssetRootGroup, AssetRootManifest, StorageProfile
from deadline.job_attachments.progress_tracker import ProgressReportMetadata, SummaryStatistics
from deadline.job_attachments.upload import S3AssetManager
from deadline.job_attachments._utils import _human_readable_file_size
@@ -100,7 +100,7 @@ def start_submission(
self,
farm_id: str,
queue_id: str,
storage_profile_id: str,
storage_profile: Optional[StorageProfile],
job_bundle_dir: str,
queue_parameters: list[JobParameter],
asset_manager: Optional[S3AssetManager],
@@ -114,7 +114,7 @@
Args:
farm_id (str): Id of the farm to submit to
queue_id (str): Id of the queue to submit to
storage_profile_id (str): Id of the storage profile to associate
storage_profile (StorageProfile): the storage profile to associate
with the job.
job_bundle_dir (str): Path to the folder containing the job bundle to
submit.
@@ -126,7 +126,7 @@
"""
self._farm_id = farm_id
self._queue_id = queue_id
self._storage_profile_id = storage_profile_id
self._storage_profile = storage_profile
self._job_bundle_dir = job_bundle_dir
self._queue_parameters = queue_parameters
self._asset_manager = asset_manager
@@ -201,8 +201,8 @@ def _start_submission(self):
self._create_job_args["template"] = file_contents
self._create_job_args["templateType"] = file_type

if self._storage_profile_id:
self._create_job_args["storageProfileId"] = self._storage_profile_id
if self._storage_profile:
self._create_job_args["storageProfileId"] = self._storage_profile.storageProfileId

# The job parameters
job_bundle_parameters = read_job_bundle_parameters(self._job_bundle_dir)
@@ -252,7 +252,7 @@ def _start_submission(self):
input_paths=sorted(self.asset_references.input_filenames),
output_paths=sorted(self.asset_references.output_directories),
referenced_paths=sorted(self.asset_references.referenced_paths),
storage_profile_id=self._storage_profile_id,
storage_profile=self._storage_profile,
)
# If we find any Job Attachments, start a background thread
if upload_group.asset_groups:
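`start_submission` now expects the resolved `StorageProfile` object rather than its ID. For callers or tests that do not want to hit the service, one can be built directly from the job_attachments models; the field names below come from this diff, while the enum member names and the values are assumptions.

```python
from deadline.job_attachments.models import (
    FileSystemLocation,
    FileSystemLocationType,
    StorageProfile,
    StorageProfileOperatingSystemFamily,
)

fake_storage_profile = StorageProfile(
    storageProfileId="sp-0123456789abcdef",              # hypothetical ID
    displayName="Artist workstation",
    osFamily=StorageProfileOperatingSystemFamily.LINUX,  # assumed enum member
    fileSystemLocations=[
        FileSystemLocation(
            name="Local assets",
            path="/home/artist/assets",
            type=FileSystemLocationType.LOCAL,           # assumed enum member
        ),
        FileSystemLocation(
            name="Shared renders",
            path="/mnt/shared/renders",
            type=FileSystemLocationType.SHARED,          # assumed enum member
        ),
    ],
)
```

This is the object the dialog stores in `self._storage_profile` and reads `storageProfileId` from when building the CreateJob arguments.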
@@ -432,6 +432,12 @@ def on_submit(self):
queue_id = get_setting("defaults.queue_id")
storage_profile_id = get_setting("settings.storage_profile_id")

storage_profile = None
if storage_profile_id:
storage_profile = api.get_storage_profile_for_queue(
farm_id, queue_id, storage_profile_id, deadline
)

queue = deadline.get_queue(farmId=farm_id, queueId=queue_id)

queue_role_session = api.get_queue_user_boto3_session(
@@ -463,7 +469,7 @@
self.create_job_response = job_progress_dialog.start_submission(
farm_id,
queue_id,
storage_profile_id,
storage_profile,
job_history_bundle_dir,
queue_parameters,
asset_manager,
39 changes: 0 additions & 39 deletions src/deadline/job_attachments/_aws/deadline.py
@@ -10,15 +10,11 @@
from ..models import (
Attachments,
JobAttachmentsFileSystem,
FileSystemLocation,
FileSystemLocationType,
Job,
JobAttachmentS3Settings,
ManifestProperties,
StorageProfileOperatingSystemFamily,
PathFormat,
Queue,
StorageProfile,
)
from .aws_clients import get_deadline_client

@@ -110,38 +106,3 @@ def get_job(
else None
),
)


def get_storage_profile_for_queue(
farm_id: str,
queue_id: str,
storage_profile_id: str,
session: Optional[boto3.Session] = None,
deadline_endpoint_url: Optional[str] = None,
) -> StorageProfile:
"""
Retrieves a specific storage profile for queue from AWS Deadline Cloud.
"""
try:
response = get_deadline_client(
session=session, endpoint_url=deadline_endpoint_url
).get_storage_profile_for_queue(
farmId=farm_id, queueId=queue_id, storageProfileId=storage_profile_id
)
except ClientError as exc:
raise JobAttachmentsError(
f'Failed to get Storage profile "{storage_profile_id}" from Deadline'
) from exc
return StorageProfile(
storageProfileId=response["storageProfileId"],
displayName=response["displayName"],
osFamily=StorageProfileOperatingSystemFamily(response["osFamily"]),
fileSystemLocations=[
FileSystemLocation(
name=file_system_location["name"],
path=file_system_location["path"],
type=FileSystemLocationType(file_system_location["type"]),
)
for file_system_location in response.get("fileSystemLocations", [])
],
)
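A note on this removal: the deleted helper built its own client through get_deadline_client rather than the client package's get_boto3_client, so it would not pick up the AWS profile selected in the deadline-cloud configuration, which appears to be the "correct profile" problem named in the commit title. Callers now fetch the StorageProfile once via deadline.client.api.get_storage_profile_for_queue (the new module above) and pass the object into job attachments code, which is why the FileSystemLocation, FileSystemLocationType, StorageProfileOperatingSystemFamily, and StorageProfile imports are also dropped at the top of this file.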
28 changes: 10 additions & 18 deletions src/deadline/job_attachments/upload.py
@@ -40,7 +40,6 @@
get_s3_client,
get_s3_transfer_manager,
)
from ._aws.deadline import get_storage_profile_for_queue
from .exceptions import (
COMMON_ERROR_GUIDANCE_FOR_S3,
AssetSyncCancelledError,
@@ -60,6 +59,7 @@
JobAttachmentS3Settings,
ManifestProperties,
PathFormat,
StorageProfile,
)
from .progress_tracker import (
ProgressStatus,
@@ -1004,21 +1004,13 @@ def _get_total_input_size_from_asset_group(

def _get_file_system_locations_by_type(
self,
storage_profile_id: str,
session: Optional[boto3.Session] = None,
storage_profile_for_queue: StorageProfile,
) -> Tuple[dict, dict]:
"""
Given the Storage Profile ID, fetches Storage Profile for Queue object, and
extracts and groups path and name pairs from the File System Locations into
two dicts - LOCAL and SHARED type, respectively. Returns a tuple of two dicts.
Given the Storage Profile for Queue object, extracts and groups
path and name pairs from the File System Locations into two dicts,
LOCAL and SHARED type, respectively. Returns a tuple of two dicts.
"""
storage_profile_for_queue = get_storage_profile_for_queue(
farm_id=self.farm_id,
queue_id=self.queue_id,
storage_profile_id=storage_profile_id,
session=session,
)

local_type_locations: dict[str, str] = {}
shared_type_locations: dict[str, str] = {}
for fs_loc in storage_profile_for_queue.fileSystemLocations:
@@ -1051,18 +1043,18 @@ def _group_asset_paths(
input_paths: list[str],
output_paths: list[str],
referenced_paths: list[str],
storage_profile_id: Optional[str] = None,
storage_profile: Optional[StorageProfile] = None,
) -> list[AssetRootGroup]:
"""
Resolves all of the paths that will be uploaded, sorting by storage profile location.
"""
local_type_locations: dict[str, str] = {}
shared_type_locations: dict[str, str] = {}
if storage_profile_id:
if storage_profile:
(
local_type_locations,
shared_type_locations,
) = self._get_file_system_locations_by_type(storage_profile_id)
) = self._get_file_system_locations_by_type(storage_profile)

# Group the paths by asset root, removing duplicates and empty strings
asset_groups: list[AssetRootGroup] = self._get_asset_groups(
Expand All @@ -1081,15 +1073,15 @@ def prepare_paths_for_upload(
input_paths: list[str],
output_paths: list[str],
referenced_paths: list[str],
storage_profile_id: Optional[str] = None,
storage_profile: Optional[StorageProfile] = None,
) -> AssetUploadGroup:
"""
Processes all of the paths required for upload, grouping them by asset root and local storage profile locations.
Returns an object containing the grouped paths, which also includes a dictionary of input directories and file counts
for files that were not under the root path or any local storage profile locations.
"""
asset_groups = self._group_asset_paths(
input_paths, output_paths, referenced_paths, storage_profile_id
input_paths, output_paths, referenced_paths, storage_profile
)
(input_file_count, input_bytes) = self._get_total_input_size_from_asset_group(asset_groups)
num_outside_files_by_bundle_path = self._get_deviated_file_count_by_root(
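A sketch of the updated prepare_paths_for_upload call, assuming `asset_manager` is an existing S3AssetManager and a profile fetched as shown earlier; paths and IDs are illustrative. The uploader now groups paths using the profile's file-system locations handed to it, instead of calling the Deadline service from inside _get_file_system_locations_by_type.

```python
from deadline.client import api

# Placeholder IDs; fetch the profile once through the client API.
profile = api.get_storage_profile_for_queue(
    "farm-0123456789abcdef", "queue-0123456789abcdef", "sp-0123456789abcdef"
)

upload_group = asset_manager.prepare_paths_for_upload(
    job_bundle_path="/home/artist/bundles/render_job",   # illustrative paths
    input_paths=["/home/artist/assets/scene.ma"],
    output_paths=["/mnt/shared/renders/output"],
    referenced_paths=[],
    storage_profile=profile,  # keyword renamed from storage_profile_id
)
if upload_group.asset_groups:
    ...  # hash and upload as before
```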
@@ -1335,6 +1335,7 @@ def test_upload_bucket_wrong_account(external_bucket: str, job_attachment_test:
with pytest.raises(
JobAttachmentsS3ClientError, match=".*when calling the PutObject operation: Access Denied"
):
# The attempt to upload the asset manifest should be blocked.
upload_group = asset_manager.prepare_paths_for_upload(
job_bundle_path=str(job_attachment_test.ASSET_ROOT),
input_paths=[str(job_attachment_test.SCENE_MA_PATH)],