
fix(job_attachments)!: use correct profile for GetStorageProfileForQueue API

Signed-off-by: Gahyun Suh <[email protected]>
gahyusuh committed Apr 11, 2024
1 parent 70958f5 commit 619974a
Showing 9 changed files with 136 additions and 87 deletions.
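
The `!` in the commit type marks a breaking change: `S3AssetManager.prepare_paths_for_upload` and the progress dialog's `start_submission` now accept a `StorageProfile` object (or `None`) where they previously took a storage profile ID string, and the GetStorageProfileForQueue call moves out of `job_attachments`' internal AWS wrapper into the submission code paths. A condensed before/after sketch of the call pattern (variable names follow the diffs below; the premise that the submission paths' `deadline` client carries the caller's configured AWS profile — the "correct profile" of the title — is an inference from the commit message, not spelled out in the diff):

```python
# Before (removed below): job_attachments resolved the ID itself through its
# own internal client, hidden inside prepare_paths_for_upload.
upload_group = asset_manager.prepare_paths_for_upload(
    job_bundle_path=job_bundle_dir,
    input_paths=input_paths,
    output_paths=output_paths,
    referenced_paths=referenced_paths,
    storage_profile_id=storage_profile_id,
)

# After: the caller fetches the profile with its own Deadline Cloud client and
# passes the whole model through; job_attachments makes no service call.
storage_profile_response = deadline.get_storage_profile_for_queue(
    farmId=farm_id, queueId=queue_id, storageProfileId=storage_profile_id
)
upload_group = asset_manager.prepare_paths_for_upload(
    job_bundle_path=job_bundle_dir,
    input_paths=input_paths,
    output_paths=output_paths,
    referenced_paths=referenced_paths,
    storage_profile=storage_profile,  # StorageProfile built from the response
)
```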
27 changes: 24 additions & 3 deletions src/deadline/client/api/_submit_job_bundle.py
@@ -33,10 +33,14 @@
 )
 from ..job_bundle.submission import AssetReferences, split_parameter_args
 from ...job_attachments.models import (
+    FileSystemLocation,
+    FileSystemLocationType,
     JobAttachmentsFileSystem,
     AssetRootGroup,
     AssetRootManifest,
     JobAttachmentS3Settings,
+    StorageProfile,
+    StorageProfileOperatingSystemFamily,
 )
 from ...job_attachments.progress_tracker import SummaryStatistics, ProgressReportMetadata
 from ...job_attachments.upload import S3AssetManager
@@ -71,7 +75,7 @@ def create_job_from_job_bundle(
         /template.json|yaml (required): An Open Job Description job template that specifies the work to be done. Job parameters
             are embedded here.
-        /parameter_values.yson|yaml (optional): If provided, these are parameter values for the job template and for
+        /parameter_values.json|yaml (optional): If provided, these are parameter values for the job template and for
             the render farm. AWS Deadline Cloud-specific parameters are like "deadline:priority".
             Looks like:
             {
@@ -126,7 +130,7 @@ def create_job_from_job_bundle(
         hashing_progress_callback / upload_progress_callback / create_job_result_callback (Callable -> bool):
             Callbacks periodically called while hashing / uploading / waiting for job creation. If returns false,
             the operation will be cancelled. If return true, the operation continues. Default behavior for each
-            is to not cancel the operation. hashing_progress_callback and upload_progress_callback both recieve
+            is to not cancel the operation. hashing_progress_callback and upload_progress_callback both receive
             ProgressReport as a parameter, which can be used for projecting remaining time, as in done in the CLI.
     """

@@ -171,8 +175,25 @@
     }

     storage_profile_id = get_setting("settings.storage_profile_id", config=config)
+    storage_profile = None
     if storage_profile_id:
         create_job_args["storageProfileId"] = storage_profile_id
+        storage_profile_response = deadline.get_storage_profile_for_queue(
+            farmId=farm_id, queueId=queue_id, storageProfileId=storage_profile_id
+        )
+        storage_profile = StorageProfile(
+            storageProfileId=storage_profile_response["storageProfileId"],
+            displayName=storage_profile_response["displayName"],
+            osFamily=StorageProfileOperatingSystemFamily(storage_profile_response["osFamily"]),
+            fileSystemLocations=[
+                FileSystemLocation(
+                    name=file_system_location["name"],
+                    path=file_system_location["path"],
+                    type=FileSystemLocationType(file_system_location["type"]),
+                )
+                for file_system_location in storage_profile_response.get("fileSystemLocations", [])
+            ],
+        )

     # The job parameters
     job_bundle_parameters = read_job_bundle_parameters(job_bundle_dir)
@@ -233,7 +254,7 @@ def create_job_from_job_bundle(
            input_paths=sorted(asset_references.input_filenames),
            output_paths=sorted(asset_references.output_directories),
            referenced_paths=sorted(asset_references.referenced_paths),
-           storage_profile_id=storage_profile_id,
+           storage_profile=storage_profile,
        )
        if upload_group.asset_groups:
            if decide_cancel_submission_callback(
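
For reference, the `get_storage_profile_for_queue` response consumed above has roughly this shape. The keys are exactly the ones the new code reads; the values, and the casing of the enum strings, are illustrative guesses:

```python
# Hypothetical response payload for GetStorageProfileForQueue.
storage_profile_response = {
    "storageProfileId": "sp-00112233445566778899aabbccddeeff",  # placeholder
    "displayName": "Studio workstations",
    "osFamily": "windows",  # fed into StorageProfileOperatingSystemFamily(...)
    "fileSystemLocations": [  # optional; the code falls back to [] when absent
        {"name": "Assets", "path": "/mnt/assets", "type": "LOCAL"},
        {"name": "Renders", "path": "/mnt/shared/renders", "type": "SHARED"},
    ],
}
```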
14 changes: 7 additions & 7 deletions src/deadline/client/ui/dialogs/submit_job_progress_dialog.py
@@ -53,7 +53,7 @@
     split_parameter_args,
 )
 from deadline.job_attachments.exceptions import AssetSyncCancelledError
-from deadline.job_attachments.models import AssetRootGroup, AssetRootManifest
+from deadline.job_attachments.models import AssetRootGroup, AssetRootManifest, StorageProfile
 from deadline.job_attachments.progress_tracker import ProgressReportMetadata, SummaryStatistics
 from deadline.job_attachments.upload import S3AssetManager
 from deadline.job_attachments._utils import _human_readable_file_size
@@ -100,7 +100,7 @@ def start_submission(
         self,
         farm_id: str,
         queue_id: str,
-        storage_profile_id: str,
+        storage_profile: Optional[StorageProfile],
         job_bundle_dir: str,
         queue_parameters: list[JobParameter],
         asset_manager: Optional[S3AssetManager],
@@ -114,7 +114,7 @@
         Args:
             farm_id (str): Id of the farm to submit to
             queue_id (str): Id of the queue to submit to
-            storage_profile_id (str): Id of the storage profile to associate
+            storage_profile (StorageProfile): the storage profile to associate
                 with the job.
             job_bundle_dir (str): Path to the folder containing the job bundle to
                 submit.
@@ -126,7 +126,7 @@
         """
         self._farm_id = farm_id
         self._queue_id = queue_id
-        self._storage_profile_id = storage_profile_id
+        self._storage_profile = storage_profile
         self._job_bundle_dir = job_bundle_dir
         self._queue_parameters = queue_parameters
         self._asset_manager = asset_manager
@@ -201,8 +201,8 @@ def _start_submission(self):
            self._create_job_args["template"] = file_contents
            self._create_job_args["templateType"] = file_type

-        if self._storage_profile_id:
-            self._create_job_args["storageProfileId"] = self._storage_profile_id
+        if self._storage_profile:
+            self._create_job_args["storageProfileId"] = self._storage_profile.storageProfileId

        # The job parameters
        job_bundle_parameters = read_job_bundle_parameters(self._job_bundle_dir)
@@ -252,7 +252,7 @@ def _start_submission(self):
            input_paths=sorted(self.asset_references.input_filenames),
            output_paths=sorted(self.asset_references.output_directories),
            referenced_paths=sorted(self.asset_references.referenced_paths),
-           storage_profile_id=self._storage_profile_id,
+           storage_profile=self._storage_profile,
        )
        # If we find any Job Attachments, start a background thread
        if upload_group.asset_groups:
33 changes: 31 additions & 2 deletions src/deadline/client/ui/dialogs/submit_job_to_deadline_dialog.py
@@ -24,7 +24,13 @@
 )

 from deadline.client.ui.dialogs.submit_job_progress_dialog import SubmitJobProgressDialog
-from deadline.job_attachments.models import JobAttachmentS3Settings
+from deadline.job_attachments.models import (
+    FileSystemLocation,
+    FileSystemLocationType,
+    JobAttachmentS3Settings,
+    StorageProfile,
+    StorageProfileOperatingSystemFamily,
+)
 from deadline.job_attachments.upload import S3AssetManager

 from ... import api
@@ -432,6 +438,29 @@ def on_submit(self):
        queue_id = get_setting("defaults.queue_id")
        storage_profile_id = get_setting("settings.storage_profile_id")

+       storage_profile = None
+       if storage_profile_id:
+           storage_profile_response = deadline.get_storage_profile_for_queue(
+               farmId=farm_id, queueId=queue_id, storageProfileId=storage_profile_id
+           )
+           storage_profile = StorageProfile(
+               storageProfileId=storage_profile_response["storageProfileId"],
+               displayName=storage_profile_response["displayName"],
+               osFamily=StorageProfileOperatingSystemFamily(
+                   storage_profile_response["osFamily"]
+               ),
+               fileSystemLocations=[
+                   FileSystemLocation(
+                       name=file_system_location["name"],
+                       path=file_system_location["path"],
+                       type=FileSystemLocationType(file_system_location["type"]),
+                   )
+                   for file_system_location in storage_profile_response.get(
+                       "fileSystemLocations", []
+                   )
+               ],
+           )

        queue = deadline.get_queue(farmId=farm_id, queueId=queue_id)

        queue_role_session = api.get_queue_user_boto3_session(
@@ -463,7 +492,7 @@ def on_submit(self):
        self.create_job_response = job_progress_dialog.start_submission(
            farm_id,
            queue_id,
-           storage_profile_id,
+           storage_profile,
            job_history_bundle_dir,
            queue_parameters,
            asset_manager,
39 changes: 0 additions & 39 deletions src/deadline/job_attachments/_aws/deadline.py
@@ -10,15 +10,11 @@
 from ..models import (
     Attachments,
     JobAttachmentsFileSystem,
-    FileSystemLocation,
-    FileSystemLocationType,
     Job,
     JobAttachmentS3Settings,
     ManifestProperties,
-    StorageProfileOperatingSystemFamily,
     PathFormat,
     Queue,
-    StorageProfile,
 )
 from .aws_clients import get_deadline_client

@@ -110,38 +106,3 @@ def get_job(
            else None
        ),
    )
-
-
-def get_storage_profile_for_queue(
-    farm_id: str,
-    queue_id: str,
-    storage_profile_id: str,
-    session: Optional[boto3.Session] = None,
-    deadline_endpoint_url: Optional[str] = None,
-) -> StorageProfile:
-    """
-    Retrieves a specific storage profile for queue from AWS Deadline Cloud.
-    """
-    try:
-        response = get_deadline_client(
-            session=session, endpoint_url=deadline_endpoint_url
-        ).get_storage_profile_for_queue(
-            farmId=farm_id, queueId=queue_id, storageProfileId=storage_profile_id
-        )
-    except ClientError as exc:
-        raise JobAttachmentsError(
-            f'Failed to get Storage profile "{storage_profile_id}" from Deadline'
-        ) from exc
-    return StorageProfile(
-        storageProfileId=response["storageProfileId"],
-        displayName=response["displayName"],
-        osFamily=StorageProfileOperatingSystemFamily(response["osFamily"]),
-        fileSystemLocations=[
-            FileSystemLocation(
-                name=file_system_location["name"],
-                path=file_system_location["path"],
-                type=FileSystemLocationType(file_system_location["type"]),
-            )
-            for file_system_location in response.get("fileSystemLocations", [])
-        ],
-    )
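
With this helper deleted, nothing in `job_attachments` calls the service for storage profiles anymore; callers reach the API directly and build the model themselves, as the submission changes above show. A standalone sketch of the equivalent call (the `boto3.client("deadline")` construction is an illustrative assumption — the real call sites use the client they already hold, and the old `JobAttachmentsError` wrapping is gone):

```python
import boto3

from deadline.job_attachments.models import (
    FileSystemLocation,
    FileSystemLocationType,
    StorageProfile,
    StorageProfileOperatingSystemFamily,
)


def fetch_storage_profile(farm_id: str, queue_id: str, storage_profile_id: str) -> StorageProfile:
    """Hypothetical helper mirroring what the submission code paths now do inline."""
    client = boto3.client("deadline")
    response = client.get_storage_profile_for_queue(
        farmId=farm_id, queueId=queue_id, storageProfileId=storage_profile_id
    )
    return StorageProfile(
        storageProfileId=response["storageProfileId"],
        displayName=response["displayName"],
        osFamily=StorageProfileOperatingSystemFamily(response["osFamily"]),
        fileSystemLocations=[
            FileSystemLocation(
                name=loc["name"],
                path=loc["path"],
                type=FileSystemLocationType(loc["type"]),
            )
            for loc in response.get("fileSystemLocations", [])
        ],
    )
```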
30 changes: 12 additions & 18 deletions src/deadline/job_attachments/upload.py
@@ -40,7 +40,6 @@
     get_s3_client,
     get_s3_transfer_manager,
 )
-from ._aws.deadline import get_storage_profile_for_queue
 from .exceptions import (
     COMMON_ERROR_GUIDANCE_FOR_S3,
     AssetSyncCancelledError,
@@ -60,6 +59,7 @@
     JobAttachmentS3Settings,
     ManifestProperties,
     PathFormat,
+    StorageProfile,
 )
 from .progress_tracker import (
     ProgressStatus,
@@ -984,21 +984,13 @@ def _get_total_input_size_from_asset_group(

    def _get_file_system_locations_by_type(
        self,
-       storage_profile_id: str,
-       session: Optional[boto3.Session] = None,
+       storage_profile_for_queue: StorageProfile,
    ) -> Tuple[dict, dict]:
        """
-       Given the Storage Profile ID, fetches Storage Profile for Queue object, and
-       extracts and groups path and name pairs from the File System Locations into
-       two dicts - LOCAL and SHARED type, respectively. Returns a tuple of two dicts.
+       Given the Storage Profile for Queue object, extracts and groups
+       path and name pairs from the File System Locations into two dicts,
+       LOCAL and SHARED type, respectively. Returns a tuple of two dicts.
        """
-       storage_profile_for_queue = get_storage_profile_for_queue(
-           farm_id=self.farm_id,
-           queue_id=self.queue_id,
-           storage_profile_id=storage_profile_id,
-           session=session,
-       )
-
        local_type_locations: dict[str, str] = {}
        shared_type_locations: dict[str, str] = {}
        for fs_loc in storage_profile_for_queue.fileSystemLocations:
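
The loop body is collapsed in this view; based on the docstring and the LOCAL/SHARED members of `FileSystemLocationType`, the grouping plausibly looks like the sketch below (the path-to-name dict orientation is an assumption, not confirmed by the visible hunk):

```python
from typing import Tuple

from deadline.job_attachments.models import FileSystemLocationType, StorageProfile


def group_locations_by_type(profile: StorageProfile) -> Tuple[dict, dict]:
    # Sketch of _get_file_system_locations_by_type's collapsed loop body.
    local_type_locations: dict[str, str] = {}
    shared_type_locations: dict[str, str] = {}
    for fs_loc in profile.fileSystemLocations:
        if fs_loc.type == FileSystemLocationType.LOCAL:
            local_type_locations[fs_loc.path] = fs_loc.name  # assumed orientation
        elif fs_loc.type == FileSystemLocationType.SHARED:
            shared_type_locations[fs_loc.path] = fs_loc.name
    return local_type_locations, shared_type_locations
```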
@@ -1031,18 +1023,20 @@ def _group_asset_paths(
        input_paths: list[str],
        output_paths: list[str],
        referenced_paths: list[str],
-       storage_profile_id: Optional[str] = None,
+       storage_profile: Optional[StorageProfile] = None,
    ) -> list[AssetRootGroup]:
        """
        Resolves all of the paths that will be uploaded, sorting by storage profile location.
        """
        local_type_locations: dict[str, str] = {}
        shared_type_locations: dict[str, str] = {}
-       if storage_profile_id:
+       if storage_profile:
            (
                local_type_locations,
                shared_type_locations,
-           ) = self._get_file_system_locations_by_type(storage_profile_id)
+           ) = self._get_file_system_locations_by_type(
+               storage_profile
+           )

        # Group the paths by asset root, removing duplicates and empty strings
        asset_groups: list[AssetRootGroup] = self._get_asset_groups(
Expand All @@ -1061,15 +1055,15 @@ def prepare_paths_for_upload(
input_paths: list[str],
output_paths: list[str],
referenced_paths: list[str],
storage_profile_id: Optional[str] = None,
storage_profile: Optional[StorageProfile] = None,
) -> AssetUploadGroup:
"""
Processes all of the paths required for upload, grouping them by asset root and local storage profile locations.
Returns an object containing the grouped paths, which also includes a dictionary of input directories and file counts
for files that were not under the root path or any local storage profile locations.
"""
asset_groups = self._group_asset_paths(
input_paths, output_paths, referenced_paths, storage_profile_id
input_paths, output_paths, referenced_paths, storage_profile
)
(input_file_count, input_bytes) = self._get_total_input_size_from_asset_group(asset_groups)
num_outside_files_by_bundle_path = self._get_deviated_file_count_by_root(
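
Putting the new signature together, a caller-side sketch (the `S3AssetManager` constructor arguments are assumptions not shown in these hunks; `asset_references` and `storage_profile` are the values the call sites above prepare):

```python
from deadline.job_attachments.upload import S3AssetManager

# Assumed constructor; job_attachment_settings would be a JobAttachmentS3Settings.
asset_manager = S3AssetManager(
    farm_id=farm_id,
    queue_id=queue_id,
    job_attachment_settings=job_attachment_settings,
)

# Callers now hand over the whole StorageProfile (or None) instead of an ID;
# no GetStorageProfileForQueue call happens inside prepare_paths_for_upload.
upload_group = asset_manager.prepare_paths_for_upload(
    job_bundle_path=job_bundle_dir,
    input_paths=sorted(asset_references.input_filenames),
    output_paths=sorted(asset_references.output_directories),
    referenced_paths=sorted(asset_references.referenced_paths),
    storage_profile=storage_profile,
)
```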
3 changes: 2 additions & 1 deletion test/integ/deadline_job_attachments/test_job_attachments.py
@@ -1333,8 +1333,9 @@ def test_upload_bucket_wrong_account(external_bucket: str, job_attachment_test:

    # WHEN
    with pytest.raises(
-       AssetSyncError, match=f"Error checking if object exists in bucket '{external_bucket}'"
+       AssetSyncError, match=f"Error uploading binary file in bucket '{external_bucket}'"
    ):
+       # The attempt to upload the asset manifest should be blocked.
        upload_group = asset_manager.prepare_paths_for_upload(
            job_bundle_path=str(job_attachment_test.ASSET_ROOT),
            input_paths=[str(job_attachment_test.SCENE_MA_PATH)],
(Diffs for the remaining three of the 9 changed files did not load and are not shown here.)
