From 6cc75961e92259910b0a9cb4b3e51d78ec7774c2 Mon Sep 17 00:00:00 2001 From: Gahyun Suh <132245153+gahyusuh@users.noreply.github.com> Date: Thu, 11 Apr 2024 12:21:10 +0000 Subject: [PATCH] fix(job_attachments)!: use correct profile for GetStorageProfileForQueue API Signed-off-by: Gahyun Suh <132245153+gahyusuh@users.noreply.github.com> --- src/deadline/client/api/__init__.py | 2 + .../api/_get_storage_profile_for_queue.py | 44 +++++++++++++++ src/deadline/client/api/_submit_job_bundle.py | 10 ++-- .../ui/dialogs/submit_job_progress_dialog.py | 14 ++--- .../dialogs/submit_job_to_deadline_dialog.py | 8 ++- src/deadline/job_attachments/_aws/deadline.py | 39 -------------- src/deadline/job_attachments/upload.py | 28 ++++------ .../test_job_attachments.py | 1 + .../api/test_job_bundle_submission.py | 53 +++++++++++++++++-- .../test_job_bundle_submission_asset_refs.py | 2 +- .../deadline_job_attachments/test_upload.py | 22 ++++---- 11 files changed, 138 insertions(+), 85 deletions(-) create mode 100644 src/deadline/client/api/_get_storage_profile_for_queue.py diff --git a/src/deadline/client/api/__init__.py b/src/deadline/client/api/__init__.py index 9e5899e2..58383c43 100644 --- a/src/deadline/client/api/__init__.py +++ b/src/deadline/client/api/__init__.py @@ -22,6 +22,7 @@ "get_queue_parameter_definitions", "get_telemetry_client", "get_deadline_cloud_library_telemetry_client", + "get_storage_profile_for_queue", ] # The following import is needed to prevent the following sporadic failure: @@ -57,6 +58,7 @@ get_deadline_cloud_library_telemetry_client, TelemetryClient, ) +from ._get_storage_profile_for_queue import get_storage_profile_for_queue logger = getLogger(__name__) diff --git a/src/deadline/client/api/_get_storage_profile_for_queue.py b/src/deadline/client/api/_get_storage_profile_for_queue.py new file mode 100644 index 00000000..c0f2290e --- /dev/null +++ b/src/deadline/client/api/_get_storage_profile_for_queue.py @@ -0,0 +1,44 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +from __future__ import annotations + +__all__ = ["get_storage_profile_for_queue"] + +from configparser import ConfigParser +from typing import Optional +from botocore.client import BaseClient # type: ignore[import] + +from ._session import get_boto3_client +from ...job_attachments.models import ( + FileSystemLocation, + FileSystemLocationType, + StorageProfile, + StorageProfileOperatingSystemFamily, +) + + +def get_storage_profile_for_queue( + farm_id: str, + queue_id: str, + storage_profile_id: str, + deadline: Optional[BaseClient] = None, + config: Optional[ConfigParser] = None, +) -> StorageProfile: + if deadline is None: + deadline = get_boto3_client("deadline", config=config) + + storage_profile_response = deadline.get_storage_profile_for_queue( + farmId=farm_id, queueId=queue_id, storageProfileId=storage_profile_id + ) + return StorageProfile( + storageProfileId=storage_profile_response["storageProfileId"], + displayName=storage_profile_response["displayName"], + osFamily=StorageProfileOperatingSystemFamily(storage_profile_response["osFamily"]), + fileSystemLocations=[ + FileSystemLocation( + name=file_system_location["name"], + path=file_system_location["path"], + type=FileSystemLocationType(file_system_location["type"]), + ) + for file_system_location in storage_profile_response.get("fileSystemLocations", []) + ], + ) diff --git a/src/deadline/client/api/_submit_job_bundle.py b/src/deadline/client/api/_submit_job_bundle.py index 63588e54..a93ea315 100644 --- a/src/deadline/client/api/_submit_job_bundle.py +++ b/src/deadline/client/api/_submit_job_bundle.py @@ -71,7 +71,7 @@ def create_job_from_job_bundle( /template.json|yaml (required): An Open Job Description job template that specifies the work to be done. Job parameters are embedded here. - /parameter_values.yson|yaml (optional): If provided, these are parameter values for the job template and for + /parameter_values.json|yaml (optional): If provided, these are parameter values for the job template and for the render farm. AWS Deadline Cloud-specific parameters are like "deadline:priority". Looks like: { @@ -126,7 +126,7 @@ def create_job_from_job_bundle( hashing_progress_callback / upload_progress_callback / create_job_result_callback (Callable -> bool): Callbacks periodically called while hashing / uploading / waiting for job creation. If returns false, the operation will be cancelled. If return true, the operation continues. Default behavior for each - is to not cancel the operation. hashing_progress_callback and upload_progress_callback both recieve + is to not cancel the operation. hashing_progress_callback and upload_progress_callback both receive ProgressReport as a parameter, which can be used for projecting remaining time, as in done in the CLI. """ @@ -171,8 +171,12 @@ def create_job_from_job_bundle( } storage_profile_id = get_setting("settings.storage_profile_id", config=config) + storage_profile = None if storage_profile_id: create_job_args["storageProfileId"] = storage_profile_id + storage_profile = api.get_storage_profile_for_queue( + farm_id, queue_id, storage_profile_id, deadline + ) # The job parameters job_bundle_parameters = read_job_bundle_parameters(job_bundle_dir) @@ -233,7 +237,7 @@ def create_job_from_job_bundle( input_paths=sorted(asset_references.input_filenames), output_paths=sorted(asset_references.output_directories), referenced_paths=sorted(asset_references.referenced_paths), - storage_profile_id=storage_profile_id, + storage_profile=storage_profile, ) if upload_group.asset_groups: if decide_cancel_submission_callback( diff --git a/src/deadline/client/ui/dialogs/submit_job_progress_dialog.py b/src/deadline/client/ui/dialogs/submit_job_progress_dialog.py index e6120b9b..a4049b2d 100644 --- a/src/deadline/client/ui/dialogs/submit_job_progress_dialog.py +++ b/src/deadline/client/ui/dialogs/submit_job_progress_dialog.py @@ -53,7 +53,7 @@ split_parameter_args, ) from deadline.job_attachments.exceptions import AssetSyncCancelledError -from deadline.job_attachments.models import AssetRootGroup, AssetRootManifest +from deadline.job_attachments.models import AssetRootGroup, AssetRootManifest, StorageProfile from deadline.job_attachments.progress_tracker import ProgressReportMetadata, SummaryStatistics from deadline.job_attachments.upload import S3AssetManager from deadline.job_attachments._utils import _human_readable_file_size @@ -100,7 +100,7 @@ def start_submission( self, farm_id: str, queue_id: str, - storage_profile_id: str, + storage_profile: Optional[StorageProfile], job_bundle_dir: str, queue_parameters: list[JobParameter], asset_manager: Optional[S3AssetManager], @@ -114,7 +114,7 @@ def start_submission( Args: farm_id (str): Id of the farm to submit to queue_id (str): Id of the queue to submit to - storage_profile_id (str): Id of the storage profile to associate + storage_profile (StorageProfile): the storage profile to associate with the job. job_bundle_dir (str): Path to the folder containing the job bundle to submit. @@ -126,7 +126,7 @@ def start_submission( """ self._farm_id = farm_id self._queue_id = queue_id - self._storage_profile_id = storage_profile_id + self._storage_profile = storage_profile self._job_bundle_dir = job_bundle_dir self._queue_parameters = queue_parameters self._asset_manager = asset_manager @@ -201,8 +201,8 @@ def _start_submission(self): self._create_job_args["template"] = file_contents self._create_job_args["templateType"] = file_type - if self._storage_profile_id: - self._create_job_args["storageProfileId"] = self._storage_profile_id + if self._storage_profile: + self._create_job_args["storageProfileId"] = self._storage_profile.storageProfileId # The job parameters job_bundle_parameters = read_job_bundle_parameters(self._job_bundle_dir) @@ -252,7 +252,7 @@ def _start_submission(self): input_paths=sorted(self.asset_references.input_filenames), output_paths=sorted(self.asset_references.output_directories), referenced_paths=sorted(self.asset_references.referenced_paths), - storage_profile_id=self._storage_profile_id, + storage_profile=self._storage_profile, ) # If we find any Job Attachments, start a background thread if upload_group.asset_groups: diff --git a/src/deadline/client/ui/dialogs/submit_job_to_deadline_dialog.py b/src/deadline/client/ui/dialogs/submit_job_to_deadline_dialog.py index 6ac2074d..2e1b18a8 100644 --- a/src/deadline/client/ui/dialogs/submit_job_to_deadline_dialog.py +++ b/src/deadline/client/ui/dialogs/submit_job_to_deadline_dialog.py @@ -432,6 +432,12 @@ def on_submit(self): queue_id = get_setting("defaults.queue_id") storage_profile_id = get_setting("settings.storage_profile_id") + storage_profile = None + if storage_profile_id: + storage_profile = api.get_storage_profile_for_queue( + farm_id, queue_id, storage_profile_id, deadline + ) + queue = deadline.get_queue(farmId=farm_id, queueId=queue_id) queue_role_session = api.get_queue_user_boto3_session( @@ -463,7 +469,7 @@ def on_submit(self): self.create_job_response = job_progress_dialog.start_submission( farm_id, queue_id, - storage_profile_id, + storage_profile, job_history_bundle_dir, queue_parameters, asset_manager, diff --git a/src/deadline/job_attachments/_aws/deadline.py b/src/deadline/job_attachments/_aws/deadline.py index 4c5beb36..bbcbbe10 100644 --- a/src/deadline/job_attachments/_aws/deadline.py +++ b/src/deadline/job_attachments/_aws/deadline.py @@ -10,15 +10,11 @@ from ..models import ( Attachments, JobAttachmentsFileSystem, - FileSystemLocation, - FileSystemLocationType, Job, JobAttachmentS3Settings, ManifestProperties, - StorageProfileOperatingSystemFamily, PathFormat, Queue, - StorageProfile, ) from .aws_clients import get_deadline_client @@ -110,38 +106,3 @@ def get_job( else None ), ) - - -def get_storage_profile_for_queue( - farm_id: str, - queue_id: str, - storage_profile_id: str, - session: Optional[boto3.Session] = None, - deadline_endpoint_url: Optional[str] = None, -) -> StorageProfile: - """ - Retrieves a specific storage profile for queue from AWS Deadline Cloud. - """ - try: - response = get_deadline_client( - session=session, endpoint_url=deadline_endpoint_url - ).get_storage_profile_for_queue( - farmId=farm_id, queueId=queue_id, storageProfileId=storage_profile_id - ) - except ClientError as exc: - raise JobAttachmentsError( - f'Failed to get Storage profile "{storage_profile_id}" from Deadline' - ) from exc - return StorageProfile( - storageProfileId=response["storageProfileId"], - displayName=response["displayName"], - osFamily=StorageProfileOperatingSystemFamily(response["osFamily"]), - fileSystemLocations=[ - FileSystemLocation( - name=file_system_location["name"], - path=file_system_location["path"], - type=FileSystemLocationType(file_system_location["type"]), - ) - for file_system_location in response.get("fileSystemLocations", []) - ], - ) diff --git a/src/deadline/job_attachments/upload.py b/src/deadline/job_attachments/upload.py index cdd853b3..b2872b22 100644 --- a/src/deadline/job_attachments/upload.py +++ b/src/deadline/job_attachments/upload.py @@ -40,7 +40,6 @@ get_s3_client, get_s3_transfer_manager, ) -from ._aws.deadline import get_storage_profile_for_queue from .exceptions import ( COMMON_ERROR_GUIDANCE_FOR_S3, AssetSyncCancelledError, @@ -60,6 +59,7 @@ JobAttachmentS3Settings, ManifestProperties, PathFormat, + StorageProfile, ) from .progress_tracker import ( ProgressStatus, @@ -1004,21 +1004,13 @@ def _get_total_input_size_from_asset_group( def _get_file_system_locations_by_type( self, - storage_profile_id: str, - session: Optional[boto3.Session] = None, + storage_profile_for_queue: StorageProfile, ) -> Tuple[dict, dict]: """ - Given the Storage Profile ID, fetches Storage Profile for Queue object, and - extracts and groups path and name pairs from the File System Locations into - two dicts - LOCAL and SHARED type, respectively. Returns a tuple of two dicts. + Given the Storage Profile for Queue object, extracts and groups + path and name pairs from the File System Locations into two dicts, + LOCAL and SHARED type, respectively. Returns a tuple of two dicts. """ - storage_profile_for_queue = get_storage_profile_for_queue( - farm_id=self.farm_id, - queue_id=self.queue_id, - storage_profile_id=storage_profile_id, - session=session, - ) - local_type_locations: dict[str, str] = {} shared_type_locations: dict[str, str] = {} for fs_loc in storage_profile_for_queue.fileSystemLocations: @@ -1051,18 +1043,18 @@ def _group_asset_paths( input_paths: list[str], output_paths: list[str], referenced_paths: list[str], - storage_profile_id: Optional[str] = None, + storage_profile: Optional[StorageProfile] = None, ) -> list[AssetRootGroup]: """ Resolves all of the paths that will be uploaded, sorting by storage profile location. """ local_type_locations: dict[str, str] = {} shared_type_locations: dict[str, str] = {} - if storage_profile_id: + if storage_profile: ( local_type_locations, shared_type_locations, - ) = self._get_file_system_locations_by_type(storage_profile_id) + ) = self._get_file_system_locations_by_type(storage_profile) # Group the paths by asset root, removing duplicates and empty strings asset_groups: list[AssetRootGroup] = self._get_asset_groups( @@ -1081,7 +1073,7 @@ def prepare_paths_for_upload( input_paths: list[str], output_paths: list[str], referenced_paths: list[str], - storage_profile_id: Optional[str] = None, + storage_profile: Optional[StorageProfile] = None, ) -> AssetUploadGroup: """ Processes all of the paths required for upload, grouping them by asset root and local storage profile locations. @@ -1089,7 +1081,7 @@ def prepare_paths_for_upload( for files that were not under the root path or any local storage profile locations. """ asset_groups = self._group_asset_paths( - input_paths, output_paths, referenced_paths, storage_profile_id + input_paths, output_paths, referenced_paths, storage_profile ) (input_file_count, input_bytes) = self._get_total_input_size_from_asset_group(asset_groups) num_outside_files_by_bundle_path = self._get_deviated_file_count_by_root( diff --git a/test/integ/deadline_job_attachments/test_job_attachments.py b/test/integ/deadline_job_attachments/test_job_attachments.py index 7199dbdc..503e59a5 100644 --- a/test/integ/deadline_job_attachments/test_job_attachments.py +++ b/test/integ/deadline_job_attachments/test_job_attachments.py @@ -1335,6 +1335,7 @@ def test_upload_bucket_wrong_account(external_bucket: str, job_attachment_test: with pytest.raises( JobAttachmentsS3ClientError, match=".*when calling the PutObject operation: Access Denied" ): + # The attempt to upload the asset manifest should be blocked. upload_group = asset_manager.prepare_paths_for_upload( job_bundle_path=str(job_attachment_test.ASSET_ROOT), input_paths=[str(job_attachment_test.SCENE_MA_PATH)], diff --git a/test/unit/deadline_client/api/test_job_bundle_submission.py b/test/unit/deadline_client/api/test_job_bundle_submission.py index e4c7440f..db4e1dea 100644 --- a/test/unit/deadline_client/api/test_job_bundle_submission.py +++ b/test/unit/deadline_client/api/test_job_bundle_submission.py @@ -19,9 +19,13 @@ AssetRootGroup, AssetUploadGroup, Attachments, + FileSystemLocation, + FileSystemLocationType, JobAttachmentsFileSystem, ManifestProperties, PathFormat, + StorageProfile, + StorageProfileOperatingSystemFamily, ) from deadline.job_attachments.upload import S3AssetManager from deadline.job_attachments.progress_tracker import SummaryStatistics, ProgressReportMetadata @@ -59,6 +63,34 @@ MOCK_GET_JOB_RESPONSE = {"state": "READY", "lifecycleStatusMessage": MOCK_STATUS_MESSAGE} +MOCK_GET_STORAGE_PROFILE_FOR_QUEUE_RESPONSE = { + "storageProfileId": MOCK_STORAGE_PROFILE_ID, + "displayName": "SP-linux", + "osFamily": "LINUX", + "fileSystemLocations": [ + {"name": "FSL Local", "path": "/home/username/my_bundle", "type": "LOCAL"}, + {"name": "FSL Shared", "path": "/mnt/shared/movie1", "type": "SHARED"}, + ], +} + +MOCK_STORAGE_PROFILE = StorageProfile( + storageProfileId=MOCK_STORAGE_PROFILE_ID, + displayName="SP-linux", + osFamily=StorageProfileOperatingSystemFamily.LINUX, + fileSystemLocations=[ + FileSystemLocation( + name="FSL Local", + path="/home/username/my_bundle", + type=FileSystemLocationType.LOCAL, + ), + FileSystemLocation( + name="FSL Shared", + path="/mnt/shared/movie1", + type=FileSystemLocationType.SHARED, + ), + ], +) + def get_minimal_json_job_template(job_name): return json.dumps( @@ -269,6 +301,9 @@ def test_create_job_from_job_bundle( with patch.object(api._session, "get_boto3_session") as session_mock: session_mock().client("deadline").create_job.side_effect = [MOCK_CREATE_JOB_RESPONSE] session_mock().client("deadline").get_job.return_value = MOCK_GET_JOB_RESPONSE + session_mock().client( + "deadline" + ).get_storage_profile_for_queue.return_value = MOCK_GET_STORAGE_PROFILE_FOR_QUEUE_RESPONSE config.set_setting("defaults.farm_id", MOCK_FARM_ID) config.set_setting("defaults.queue_id", MOCK_QUEUE_ID) @@ -455,6 +490,9 @@ def test_create_job_from_job_bundle_job_attachments( client_mock().get_queue.side_effect = [MOCK_GET_QUEUE_RESPONSE] client_mock().create_job.side_effect = [MOCK_CREATE_JOB_RESPONSE] client_mock().get_job.side_effect = [MOCK_GET_JOB_RESPONSE] + client_mock().get_storage_profile_for_queue.side_effect = [ + MOCK_GET_STORAGE_PROFILE_FOR_QUEUE_RESPONSE + ] expected_upload_group = AssetUploadGroup( total_input_files=3, total_input_bytes=256, asset_groups=[AssetRootGroup()] ) @@ -522,7 +560,7 @@ def fake_print_callback(msg: str) -> None: ), output_paths=[os.path.join(temp_assets_dir, "somedir")], referenced_paths=[], - storage_profile_id=MOCK_STORAGE_PROFILE_ID, + storage_profile=MOCK_STORAGE_PROFILE, ) mock_hash_attachments.assert_called_once_with( asset_manager=ANY, @@ -574,6 +612,9 @@ def test_create_job_from_job_bundle_empty_job_attachments( client_mock().get_queue.side_effect = [MOCK_GET_QUEUE_RESPONSE] client_mock().create_job.side_effect = [MOCK_CREATE_JOB_RESPONSE] client_mock().get_job.side_effect = [MOCK_GET_JOB_RESPONSE] + client_mock().get_storage_profile_for_queue.side_effect = [ + MOCK_GET_STORAGE_PROFILE_FOR_QUEUE_RESPONSE + ] # When this function returns an empty object, we skip Job Attachments calls expected_upload_group = AssetUploadGroup() @@ -637,7 +678,7 @@ def fake_print_callback(msg: str) -> None: ), output_paths=[os.path.join(temp_assets_dir, "somedir")], referenced_paths=[], - storage_profile_id=MOCK_STORAGE_PROFILE_ID, + storage_profile=MOCK_STORAGE_PROFILE, ) mock_hash_attachments.assert_not_called() mock_upload_assets.assert_not_called() @@ -662,6 +703,9 @@ def test_create_job_from_job_bundle_with_empty_asset_references( with patch.object(api._session, "get_boto3_session") as session_mock: session_mock().client("deadline").create_job.side_effect = [MOCK_CREATE_JOB_RESPONSE] session_mock().client("deadline").get_job.side_effect = [MOCK_GET_JOB_RESPONSE] + session_mock().client("deadline").get_storage_profile_for_queue.side_effect = [ + MOCK_GET_STORAGE_PROFILE_FOR_QUEUE_RESPONSE + ] config.set_setting("defaults.farm_id", MOCK_FARM_ID) config.set_setting("defaults.queue_id", MOCK_QUEUE_ID) @@ -730,6 +774,9 @@ def test_create_job_from_job_bundle_with_single_asset_file( client_mock().create_job.side_effect = [MOCK_CREATE_JOB_RESPONSE] client_mock().get_queue.side_effect = [MOCK_GET_QUEUE_RESPONSE] client_mock().get_job.side_effect = [MOCK_GET_JOB_RESPONSE] + client_mock().get_storage_profile_for_queue.side_effect = [ + MOCK_GET_STORAGE_PROFILE_FOR_QUEUE_RESPONSE + ] expected_upload_group = AssetUploadGroup( total_input_files=1, total_input_bytes=1, asset_groups=[AssetRootGroup()] ) @@ -797,7 +844,7 @@ def fake_print_callback(msg: str) -> None: input_paths=[os.path.join(temp_assets_dir, "asset-1.txt")], output_paths=[], referenced_paths=[], - storage_profile_id=MOCK_STORAGE_PROFILE_ID, + storage_profile=MOCK_STORAGE_PROFILE, ) mock_hash_attachments.assert_called_once_with( asset_manager=ANY, diff --git a/test/unit/deadline_client/api/test_job_bundle_submission_asset_refs.py b/test/unit/deadline_client/api/test_job_bundle_submission_asset_refs.py index f14a5f01..91b3a67c 100644 --- a/test/unit/deadline_client/api/test_job_bundle_submission_asset_refs.py +++ b/test/unit/deadline_client/api/test_job_bundle_submission_asset_refs.py @@ -380,7 +380,7 @@ def test_create_job_from_job_bundle_with_all_asset_ref_variants( input_paths=input_paths, output_paths=output_paths, referenced_paths=referenced_paths, - storage_profile_id="", + storage_profile=None, ) mock_hash_assets.assert_called_once_with( asset_groups=[AssetRootGroup()], diff --git a/test/unit/deadline_job_attachments/test_upload.py b/test/unit/deadline_job_attachments/test_upload.py index 566f11c7..d203f6f6 100644 --- a/test/unit/deadline_job_attachments/test_upload.py +++ b/test/unit/deadline_job_attachments/test_upload.py @@ -1967,21 +1967,17 @@ def test_get_file_system_locations_by_type( fileSystemLocations=mock_file_system_locations, ) - with patch( - f"{deadline.__package__}.job_attachments.upload.get_storage_profile_for_queue", - side_effect=[mock_storage_profile_for_queue], - ): - asset_manager = S3AssetManager( - farm_id=farm_id, - queue_id=queue_id, - job_attachment_settings=self.job_attachment_s3_settings, - ) + asset_manager = S3AssetManager( + farm_id=farm_id, + queue_id=queue_id, + job_attachment_settings=self.job_attachment_s3_settings, + ) - result = asset_manager._get_file_system_locations_by_type( - storage_profile_id="sp-0123456789" - ) + result = asset_manager._get_file_system_locations_by_type( + storage_profile_for_queue=mock_storage_profile_for_queue + ) - assert result == expected_result + assert result == expected_result @pytest.mark.skipif( sys.platform == "win32",