From 7b5752acd920aebdd77430dfb85eb34f439ee946 Mon Sep 17 00:00:00 2001 From: Caden Marofke <132690522+marofke@users.noreply.github.com> Date: Fri, 5 Apr 2024 17:45:52 -0500 Subject: [PATCH] fix: Write job attachments manifests locally when submitting Signed-off by: Caden Marofke <132690522+marofke@users.noreply.github.com> --- src/deadline/client/api/_submit_job_bundle.py | 12 ++++++-- .../ui/dialogs/submit_job_progress_dialog.py | 1 + src/deadline/job_attachments/README.md | 2 ++ src/deadline/job_attachments/upload.py | 28 +++++++++++++------ .../deadline_job_attachments/test_upload.py | 9 ++++++ 5 files changed, 41 insertions(+), 11 deletions(-) diff --git a/src/deadline/client/api/_submit_job_bundle.py b/src/deadline/client/api/_submit_job_bundle.py index 63588e547..98ab3b518 100644 --- a/src/deadline/client/api/_submit_job_bundle.py +++ b/src/deadline/client/api/_submit_job_bundle.py @@ -18,7 +18,7 @@ from .. import api from ..exceptions import DeadlineOperationError, CreateJobWaiterCanceled from ..config import get_setting, set_setting, config_file -from ..job_bundle import deadline_yaml_dump +from ..job_bundle import create_job_history_bundle_dir, deadline_yaml_dump from ..job_bundle.loader import ( read_yaml_or_json, read_yaml_or_json_object, @@ -254,7 +254,10 @@ def create_job_from_job_bundle( ) attachment_settings = _upload_attachments( - asset_manager, asset_manifests, print_function_callback, upload_progress_callback + asset_manager, + asset_manifests, + print_function_callback, + upload_progress_callback, ) attachment_settings["fileSystem"] = JobAttachmentsFileSystem( job_attachments_file_system @@ -418,10 +421,15 @@ def _default_update_upload_progress(upload_metadata: Dict[str, str]) -> bool: if not upload_progress_callback: upload_progress_callback = _default_update_upload_progress + # Create job history dir for the manifest file(s) + manifest_write_dir = os.path.join( + create_job_history_bundle_dir("JobBundle", "CLIJob"), "manifests" + ) upload_summary, attachment_settings = asset_manager.upload_assets( manifests=manifests, on_uploading_assets=upload_progress_callback, s3_check_cache_dir=config_file.get_cache_directory(), + manifest_write_dir=manifest_write_dir, ) api.get_deadline_cloud_library_telemetry_client(config=config).record_upload_summary( upload_summary diff --git a/src/deadline/client/ui/dialogs/submit_job_progress_dialog.py b/src/deadline/client/ui/dialogs/submit_job_progress_dialog.py index 592886fa7..e8275848d 100644 --- a/src/deadline/client/ui/dialogs/submit_job_progress_dialog.py +++ b/src/deadline/client/ui/dialogs/submit_job_progress_dialog.py @@ -339,6 +339,7 @@ def _update_upload_progress(upload_metadata: ProgressReportMetadata) -> bool: manifests=manifests, on_uploading_assets=_update_upload_progress, s3_check_cache_dir=config_file.get_cache_directory(), + manifest_write_dir=os.path.join(self._job_bundle_dir, "manifests"), ) logger.info("Finished uploading job attachments files.") diff --git a/src/deadline/job_attachments/README.md b/src/deadline/job_attachments/README.md index 021cd1ea4..79df7a563 100644 --- a/src/deadline/job_attachments/README.md +++ b/src/deadline/job_attachments/README.md @@ -41,6 +41,8 @@ These snapshots are encapsulated in one or more [`asset_manifests`](asset_manife When starting work, the worker downloads the manifest associated with your job, and recreates the file structure of your submission locally, either downloading all files at once, or as needed if using the [virtual][vfs] job attachments filesystem type. When a task completes, the worker creates a new manifest for any outputs that were specified in the job submission, and uploads the manifest and the outputs back to your S3 bucket. +Manifest files are written locally in the job history directory for debugging purposes (default: `~/.dealine/job_history`). + [vfs]: https://docs.aws.amazon.com/deadline-cloud/latest/userguide/storage-virtual.html ## Local Cache Files diff --git a/src/deadline/job_attachments/upload.py b/src/deadline/job_attachments/upload.py index 734d14ef8..aa4c122ef 100644 --- a/src/deadline/job_attachments/upload.py +++ b/src/deadline/job_attachments/upload.py @@ -144,6 +144,7 @@ def upload_assets( file_system_location_name: Optional[str] = None, progress_tracker: Optional[ProgressTracker] = None, s3_check_cache_dir: Optional[str] = None, + manifest_write_dir: Optional[str] = None, ) -> tuple[str, str]: """ Uploads assets based off of an asset manifest, uploads the asset manifest. @@ -160,15 +161,6 @@ def upload_assets( Returns: A tuple of (the partial key for the manifest on S3, the hash of input manifest). """ - # Upload assets - self.upload_input_files( - manifest, - job_attachment_settings.s3BucketName, - source_root, - job_attachment_settings.full_cas_prefix(), - progress_tracker, - s3_check_cache_dir, - ) # Upload asset manifest hash_alg = manifest.get_default_hash_alg() @@ -178,6 +170,12 @@ def upload_assets( ) manifest_name = f"{manifest_name_prefix}_input" + if manifest_write_dir: + manifest_dir = os.path.join(manifest_write_dir, manifest_name) + os.makedirs(manifest_write_dir, exist_ok=True) + with open(manifest_dir, "w") as file: + file.write(manifest.encode()) + if partial_manifest_prefix: partial_manifest_key = _join_s3_paths(partial_manifest_prefix, manifest_name) else: @@ -193,6 +191,16 @@ def upload_assets( key=full_manifest_key, ) + # Upload assets + self.upload_input_files( + manifest, + job_attachment_settings.s3BucketName, + source_root, + job_attachment_settings.full_cas_prefix(), + progress_tracker, + s3_check_cache_dir, + ) + return (partial_manifest_key, hash_data(manifest_bytes, hash_alg)) def upload_input_files( @@ -1111,6 +1119,7 @@ def upload_assets( manifests: list[AssetRootManifest], on_uploading_assets: Optional[Callable[[Any], bool]] = None, s3_check_cache_dir: Optional[str] = None, + manifest_write_dir: Optional[str] = None, ) -> tuple[SummaryStatistics, Attachments]: """ Uploads all the files for provided manifests and manifests themselves to S3. @@ -1161,6 +1170,7 @@ def upload_assets( file_system_location_name=asset_root_manifest.file_system_location_name, progress_tracker=progress_tracker, s3_check_cache_dir=s3_check_cache_dir, + manifest_write_dir=manifest_write_dir, ) manifest_properties.inputManifestPath = partial_manifest_key manifest_properties.inputManifestHash = asset_manifest_hash diff --git a/test/unit/deadline_job_attachments/test_upload.py b/test/unit/deadline_job_attachments/test_upload.py index 89d488367..33f34d815 100644 --- a/test/unit/deadline_job_attachments/test_upload.py +++ b/test/unit/deadline_job_attachments/test_upload.py @@ -127,6 +127,11 @@ def test_asset_management( output_dir1 = tmpdir.join("outputs") output_dir2 = tmpdir.join("outputs").join("textures") + history_dir = tmpdir.join("history") + expected_manifest_dir = history_dir.join("e_input") + assert not os.path.exists(history_dir) + assert not os.path.exists(expected_manifest_dir) + expected_total_input_bytes = ( scene_file.size() + texture_file.size() + normal_file.size() + meta_file.size() ) @@ -196,6 +201,7 @@ def test_asset_management( manifests=asset_root_manifests, on_uploading_assets=mock_on_uploading_assets, s3_check_cache_dir=str(cache_dir), + manifest_write_dir=str(history_dir), ) # Then @@ -236,6 +242,9 @@ def test_asset_management( assert f"assetRoot/Manifests/{farm_id}/{queue_id}/Inputs/0000/e_input" in caplog.text + # Ensure we wrote our manifest file locally + assert os.path.exists(expected_manifest_dir) + assert_progress_report_last_callback( num_input_files=4, expected_total_input_bytes=expected_total_input_bytes,