diff --git a/src/deadline/job_attachments/asset_sync.py b/src/deadline/job_attachments/asset_sync.py index d5f430fc..19abf656 100644 --- a/src/deadline/job_attachments/asset_sync.py +++ b/src/deadline/job_attachments/asset_sync.py @@ -3,6 +3,7 @@ """ Module for File Attachment synching """ from __future__ import annotations import os +import shutil import sys import time from io import BytesIO @@ -337,6 +338,19 @@ def _record_attachment_mtimes( abs_path = str(Path(local_root) / manifest_path.path) self.synced_assets_mtime[abs_path] = Path(abs_path).stat().st_mtime_ns + def _ensure_disk_capacity(self, session_dir: Path, total_input_bytes: int) -> None: + """ + Raises an AssetSyncError if the given input bytes is larger than the available disk space. + """ + disk_free: int = shutil.disk_usage(session_dir).free + if total_input_bytes > disk_free: + input_size_readable = _human_readable_file_size(total_input_bytes) + disk_free_readable = _human_readable_file_size(disk_free) + raise AssetSyncError( + "Error occurred while attempting to sync input files: " + f"Total file size required for download ({input_size_readable}) is larger than available disk space ({disk_free_readable})" + ) + def sync_inputs( self, s3_settings: Optional[JobAttachmentS3Settings], @@ -447,14 +461,16 @@ def sync_inputs( # Merge the manifests in each root into a single manifest merged_manifests_by_root: dict[str, BaseAssetManifest] = dict() + total_input_size: int = 0 for root, manifests in grouped_manifests_by_root.items(): merged_manifest = merge_asset_manifests(manifests) if merged_manifest: merged_manifests_by_root[root] = merged_manifest + total_input_size += merged_manifest.totalSize # type: ignore[attr-defined] # Download - + # Virtual Download Flow if ( attachments.fileSystem == JobAttachmentsFileSystem.VIRTUAL.value and sys.platform != "win32" @@ -482,6 +498,8 @@ def sync_inputs( f"Virtual File System not found, falling back to {JobAttachmentsFileSystem.COPIED} for JobAttachmentsFileSystem." ) + # Copied Download flow + self._ensure_disk_capacity(session_dir, total_input_size) try: download_summary_statistics = download_files_from_manifests( s3_bucket=s3_settings.s3BucketName, diff --git a/test/unit/deadline_job_attachments/conftest.py b/test/unit/deadline_job_attachments/conftest.py index de7e5d20..2b3ecc11 100644 --- a/test/unit/deadline_job_attachments/conftest.py +++ b/test/unit/deadline_job_attachments/conftest.py @@ -435,6 +435,29 @@ def fixture_merged_manifest(): } +@pytest.fixture(name="really_big_manifest") +def fixture_really_big_manifest(): + return { + "hashAlg": "xxh128", + "manifestVersion": "2023-03-03", + "paths": [ + { + "hash": "a20ddfc33590cd7d2391f1972f66a72a", + "mtime": 4444444444444444, + "path": "a.txt", + "size": 100000000000000000, # 100 Petabytes + }, + { + "hash": "b96ddfc33590cd7d2391f1972f66a72a", + "mtime": 2222222222222222, + "path": "b.txt", + "size": 200000000000000000, # 200 Petabytes + }, + ], + "totalSize": 300000000000000000, + } + + def has_posix_target_user() -> bool: """Returns if the testing environment exported the env variables for doing cross-account posix target-user tests. diff --git a/test/unit/deadline_job_attachments/test_asset_sync.py b/test/unit/deadline_job_attachments/test_asset_sync.py index eed8e70c..7a2ab0d0 100644 --- a/test/unit/deadline_job_attachments/test_asset_sync.py +++ b/test/unit/deadline_job_attachments/test_asset_sync.py @@ -21,6 +21,7 @@ from deadline.job_attachments.os_file_permission import PosixFileSystemPermissionSettings from deadline.job_attachments.exceptions import ( + AssetSyncError, VFSExecutableMissingError, JobAttachmentsS3ClientError, VFSOSUserNotSetError, @@ -276,6 +277,7 @@ def test_sync_inputs_404_error( default_queue: Queue, job_fixture_name: str, s3_settings_fixture_name: str, + test_manifest_one: dict, request: pytest.FixtureRequest, ): """Asserts that a specific error message is raised when getting 404 errors synching inputs""" @@ -288,6 +290,7 @@ def test_sync_inputs_404_error( message="File not found", ) job: Job = request.getfixturevalue(job_fixture_name) + test_manifest = decode_manifest(json.dumps(test_manifest_one)) s3_settings: JobAttachmentS3Settings = request.getfixturevalue(s3_settings_fixture_name) default_queue.jobAttachmentSettings = s3_settings dest_dir = "assetroot-27bggh78dd2b568ab123" @@ -296,7 +299,7 @@ def test_sync_inputs_404_error( # WHEN with patch( f"{deadline.__package__}.job_attachments.asset_sync.get_manifest_from_s3", - return_value="test_manifest_data", + return_value=test_manifest, ), patch( f"{deadline.__package__}.job_attachments.asset_sync._get_unique_dest_dir_name", side_effect=[dest_dir], @@ -412,6 +415,15 @@ def test_sync_inputs_with_step_dependencies_same_root_vfs_on_posix( session_dir = str(tmp_path) dest_dir = "assetroot-27bggh78dd2b568ab123" local_root = str(Path(session_dir) / dest_dir) + test_fs_permission_settings: PosixFileSystemPermissionSettings = ( + PosixFileSystemPermissionSettings( + os_user="test-user", + os_group="test-group", + dir_mode=0o20, + file_mode=0o20, + ) + ) + os_env_vars: Dict[str, str] = {"AWS_PROFILE": "test-profile"} assert job.attachments test_manifest = decode_manifest(json.dumps(test_manifest_two)) @@ -432,10 +444,16 @@ def test_sync_inputs_with_step_dependencies_same_root_vfs_on_posix( ), patch( f"{deadline.__package__}.job_attachments.asset_sync.merge_asset_manifests", ) as merge_manifests_mock, patch( + f"{deadline.__package__}.job_attachments.asset_sync.AssetSync._ensure_disk_capacity", + ) as disk_capacity_mock, patch( f"{deadline.__package__}.job_attachments.download._write_manifest_to_temp_file", return_value="tmp_manifest", ), patch( "sys.platform", "linux" + ), patch( + f"{deadline.__package__}.job_attachments.asset_sync.mount_vfs_from_manifests" + ), patch( + f"{deadline.__package__}.job_attachments.asset_sync.VFSProcessManager.find_vfs" ): mock_on_downloading_files = MagicMock(return_value=True) @@ -447,10 +465,13 @@ def test_sync_inputs_with_step_dependencies_same_root_vfs_on_posix( tmp_path, step_dependencies=["step-1"], on_downloading_files=mock_on_downloading_files, + fs_permission_settings=test_fs_permission_settings, + os_env_vars=os_env_vars, ) # THEN merge_manifests_mock.assert_called() + disk_capacity_mock.assert_not_called() expected_source_path_format = ( "windows" if job.attachments.manifests[0].rootPathFormat == PathFormat.WINDOWS @@ -465,6 +486,78 @@ def test_sync_inputs_with_step_dependencies_same_root_vfs_on_posix( }, ] + @pytest.mark.parametrize( + ("job_fixture_name"), + [ + ("default_job"), + ], + ) + @pytest.mark.parametrize( + ("s3_settings_fixture_name"), + [ + ("default_job_attachment_s3_settings"), + ], + ) + def test_sync_inputs_no_space_left( + self, + tmp_path: Path, + default_queue: Queue, + job_fixture_name: str, + s3_settings_fixture_name: str, + really_big_manifest: dict, + request: pytest.FixtureRequest, + ): + """Asserts that an AssetSyncError is thrown if there is not enough space left on the device to download all inputs.""" + # GIVEN + job: Job = request.getfixturevalue(job_fixture_name) + s3_settings: JobAttachmentS3Settings = request.getfixturevalue(s3_settings_fixture_name) + default_queue.jobAttachmentSettings = s3_settings + dest_dir = "assetroot-27bggh78dd2b568ab123" + test_manifest = decode_manifest(json.dumps(really_big_manifest)) + test_fs_permission_settings: PosixFileSystemPermissionSettings = ( + PosixFileSystemPermissionSettings( + os_user="test-user", + os_group="test-group", + dir_mode=0o20, + file_mode=0o20, + ) + ) + os_env_vars: Dict[str, str] = {"AWS_PROFILE": "test-profile"} + assert job.attachments + + # WHEN + with patch( + f"{deadline.__package__}.job_attachments.asset_sync.get_manifest_from_s3", + return_value=test_manifest, + ), patch( + f"{deadline.__package__}.job_attachments.asset_sync.download_files_from_manifests", + side_effect=[DownloadSummaryStatistics()], + ), patch( + f"{deadline.__package__}.job_attachments.asset_sync._get_unique_dest_dir_name", + side_effect=[dest_dir], + ), patch.object( + Path, "stat", MagicMock(st_mtime_ns=1234512345123451) + ): + mock_on_downloading_files = MagicMock(return_value=True) + + with pytest.raises(AssetSyncError) as ase: + self.default_asset_sync.sync_inputs( + s3_settings, + job.attachments, + default_queue.queueId, + job.jobId, + tmp_path, + on_downloading_files=mock_on_downloading_files, + fs_permission_settings=test_fs_permission_settings, + os_env_vars=os_env_vars, + ) + + # THEN + assert ( + "Total file size required for download (300.0 PB) is larger than available disk space" + in str(ase) + ) + @mock_sts @pytest.mark.parametrize( (