From 9c78e13738abf5e6c4853bc415784ecf6dec0531 Mon Sep 17 00:00:00 2001 From: Brian Axelson Date: Sun, 24 Mar 2024 15:00:54 +0000 Subject: [PATCH 1/5] fix: Adding group owner permissions to asset_cache/cas_prefix. Writing merged manifests to new merged_manifests subfolder under session folder rather than /tmp. Using new VFSProcessManager.is_mount rather than os.path.ismount to propery check for mounts owned by job-user Signed-off-by: Brian Axelson --- src/deadline/job_attachments/asset_sync.py | 3 +- src/deadline/job_attachments/download.py | 56 +++++-- .../deadline_job_attachments/test_download.py | 139 ++++++++++++++++++ 3 files changed, 184 insertions(+), 14 deletions(-) diff --git a/src/deadline/job_attachments/asset_sync.py b/src/deadline/job_attachments/asset_sync.py index 7044a8ee..13c4884f 100644 --- a/src/deadline/job_attachments/asset_sync.py +++ b/src/deadline/job_attachments/asset_sync.py @@ -437,8 +437,7 @@ def sync_inputs( manifests_by_root=merged_manifests_by_root, boto3_session=self.session, session_dir=session_dir, - os_user=fs_permission_settings.os_user, # type: ignore[union-attr] - os_group=getattr(fs_permission_settings, "os_group", ""), + fs_permission_settings=fs_permission_settings, os_env_vars=os_env_vars, # type: ignore[arg-type] cas_prefix=s3_settings.full_cas_prefix(), ) diff --git a/src/deadline/job_attachments/download.py b/src/deadline/job_attachments/download.py index dc4f2aef..08a4a5ff 100644 --- a/src/deadline/job_attachments/download.py +++ b/src/deadline/job_attachments/download.py @@ -66,6 +66,14 @@ S3_DOWNLOAD_MAX_CONCURRENCY = 10 VFS_CACHE_REL_PATH_IN_SESSION = ".vfs_object_cache" +VFS_MERGED_MANIFEST_FOLDER_IN_SESSION = "merged_manifests" + +VFS_MERGED_MANIFEST_FOLDER_PERMISSIONS = PosixFileSystemPermissionSettings( + os_user="", + os_group="", + dir_mode=0o31, + file_mode=0o64, +) def get_manifest_from_s3( @@ -878,14 +886,26 @@ def merge_asset_manifests(manifests: list[BaseAssetManifest]) -> BaseAssetManife return output_manifest -def write_manifest_to_temp_file(manifest: BaseAssetManifest) -> str: +def write_manifest_to_temp_file(manifest: BaseAssetManifest, dir: Path) -> str: with NamedTemporaryFile( - suffix=".json", prefix="deadline-merged-manifest-", delete=False, mode="w" + suffix=".json", prefix="deadline-merged-manifest-", delete=False, mode="w", dir=dir ) as file: file.write(manifest.encode()) return file.name +def decode_manifest_file(input_manifest_path: Path): + """ + Given a manifest path, open the file at that location and decode + Args: + input_manifest_path: Path to manifest + Returns: + BaseAssetManifest : Single decoded manifest + """ + with open(input_manifest_path) as input_manifest_file: + return decode_manifest(input_manifest_file.read()) + + def handle_existing_vfs( manifest: BaseAssetManifest, session_dir: Path, mount_point: str, os_user: str ) -> BaseAssetManifest: @@ -901,15 +921,14 @@ def handle_existing_vfs( Returns: BaseAssetManifest : A single manifest containing the merged paths or the original manifest """ - if not os.path.ismount(mount_point): + if not VFSProcessManager.is_mount(mount_point): return manifest input_manifest_path: Optional[Path] = VFSProcessManager.get_manifest_path_for_mount( session_dir=session_dir, mount_point=mount_point ) if input_manifest_path is not None: - with open(input_manifest_path) as input_manifest_file: - input_manifest: BaseAssetManifest = decode_manifest(input_manifest_file.read()) + input_manifest = decode_manifest_file(input_manifest_path) merged_input_manifest: Optional[BaseAssetManifest] = merge_asset_manifests( [input_manifest, manifest] @@ -931,9 +950,8 @@ def mount_vfs_from_manifests( manifests_by_root: dict[str, BaseAssetManifest], boto3_session: boto3.Session, session_dir: Path, - os_user: str, - os_group: str, os_env_vars: dict[str, str], + fs_permission_settings: FileSystemPermissionSettings, cas_prefix: Optional[str] = None, ) -> None: """ @@ -952,7 +970,8 @@ def mount_vfs_from_manifests( Returns: None """ - + if not isinstance(fs_permission_settings, PosixFileSystemPermissionSettings): + raise TypeError("VFS can only be mounted from manifests on posix file systems.") vfs_cache_dir: Path = session_dir / VFS_CACHE_REL_PATH_IN_SESSION asset_cache_hash_path: Path = vfs_cache_dir if cas_prefix is not None: @@ -961,26 +980,39 @@ def mount_vfs_from_manifests( asset_cache_hash_path.mkdir(parents=True, exist_ok=True) + _set_fs_group([str(asset_cache_hash_path)], str(vfs_cache_dir), fs_permission_settings) + + manifest_dir: Path = session_dir / VFS_MERGED_MANIFEST_FOLDER_IN_SESSION + manifest_dir.mkdir(parents=True, exist_ok=True) + manifest_dir_permissions = VFS_MERGED_MANIFEST_FOLDER_PERMISSIONS + manifest_dir_permissions.os_user = fs_permission_settings.os_user + manifest_dir_permissions.os_group = fs_permission_settings.os_group + + _set_fs_group([str(manifest_dir)], str(manifest_dir), manifest_dir_permissions) + for mount_point, manifest in manifests_by_root.items(): # Validate the file paths to see if they are under the given download directory. _ensure_paths_within_directory( mount_point, [path.path for path in manifest.paths] # type: ignore ) final_manifest: BaseAssetManifest = handle_existing_vfs( - manifest=manifest, session_dir=session_dir, mount_point=mount_point, os_user=os_user + manifest=manifest, + session_dir=session_dir, + mount_point=mount_point, + os_user=fs_permission_settings.os_user, ) # Write out a temporary file with the contents of the newly merged manifest - manifest_path: str = write_manifest_to_temp_file(final_manifest) + manifest_path: str = write_manifest_to_temp_file(final_manifest, dir=manifest_dir) vfs_manager: VFSProcessManager = VFSProcessManager( s3_bucket, boto3_session.region_name, manifest_path, mount_point, - os_user, + fs_permission_settings.os_user, os_env_vars, - os_group, + getattr(fs_permission_settings, "os_group", ""), cas_prefix, str(vfs_cache_dir), ) diff --git a/test/unit/deadline_job_attachments/test_download.py b/test/unit/deadline_job_attachments/test_download.py index 0cf9a6ae..6da8d715 100644 --- a/test/unit/deadline_job_attachments/test_download.py +++ b/test/unit/deadline_job_attachments/test_download.py @@ -12,6 +12,7 @@ import json from pathlib import Path import sys +import tempfile from typing import Any, Callable, List from unittest.mock import MagicMock, call, patch @@ -41,10 +42,15 @@ get_job_input_paths_by_asset_root, get_job_output_paths_by_asset_root, get_manifest_from_s3, + handle_existing_vfs, + mount_vfs_from_manifests, merge_asset_manifests, _ensure_paths_within_directory, _get_asset_root_from_s3, _get_tasks_manifests_keys_from_s3, + VFS_CACHE_REL_PATH_IN_SESSION, + VFS_MERGED_MANIFEST_FOLDER_IN_SESSION, + VFS_MERGED_MANIFEST_FOLDER_PERMISSIONS, ) from deadline.job_attachments.exceptions import ( AssetSyncError, @@ -2254,3 +2260,136 @@ def download_file(*args): ) assert sorted(downloaded_files) == ["a.txt", "b.txt", "c.txt", "d.txt"] + + +def test_handle_existing_vfs_no_mount_returns(test_manifest_one: dict): + """ + Test that handling an existing manifest for a non existent mount returns the manifest + """ + manifest = decode_manifest(json.dumps(test_manifest_one)) + with patch( + f"{deadline.__package__}.job_attachments.download.VFSProcessManager.is_mount", + return_value=False, + ) as mock_is_mount: + result_manifest = handle_existing_vfs( + manifest, Path("/some/session/dir"), "/not/a/mount", "test-user" + ) + mock_is_mount.assert_called_once_with("/not/a/mount") + assert manifest == result_manifest + + +def test_handle_existing_vfs_success( + test_manifest_one: dict, test_manifest_two: dict, merged_manifest: dict +): + """ + Test that handling an existing manifest for a mount which exists attempts to merge the manifests and + shut down the mount + """ + manifest_one = decode_manifest(json.dumps(test_manifest_one)) + manifest_two = decode_manifest(json.dumps(test_manifest_two)) + merged_decoded = decode_manifest(json.dumps(merged_manifest)) + session_path = Path("/some/session/dir") + with patch( + f"{deadline.__package__}.job_attachments.download.VFSProcessManager.is_mount", + return_value=True, + ) as mock_is_mount, patch( + f"{deadline.__package__}.job_attachments.download.VFSProcessManager.get_manifest_path_for_mount", + return_value="/some/manifest/path", + ) as mock_get_manifest_path, patch( + f"{deadline.__package__}.job_attachments.download.decode_manifest_file", + return_value=manifest_one, + ) as mock_decode_manifest, patch( + f"{deadline.__package__}.job_attachments.download.VFSProcessManager.kill_process_at_mount", + ) as mock_kill_process: + result_manifest = handle_existing_vfs( + manifest_two, session_path, "/some/mount", "test-user" + ) + mock_is_mount.assert_called_once_with("/some/mount") + mock_get_manifest_path.assert_called_once_with( + session_dir=session_path, mount_point="/some/mount" + ) + mock_decode_manifest.assert_called_once_with("/some/manifest/path") + mock_kill_process.assert_called_once_with( + session_dir=session_path, mount_point="/some/mount", os_user="test-user" + ) + assert result_manifest == merged_decoded + + +def test_mount_vfs_from_manifests( + test_manifest_one: dict, test_manifest_two: dict, merged_manifest: dict +): + """ + Test that handling an existing manifest for a mount which exists attempts to merge the manifests and + shut down the mount + """ + manifest_one = decode_manifest(json.dumps(test_manifest_one)) + manifest_two = decode_manifest(json.dumps(test_manifest_two)) + merged_decoded = decode_manifest(json.dumps(merged_manifest)) + temp_dir = tempfile.TemporaryDirectory() + temp_dir_path = Path(temp_dir.name) + manifests_by_root = {"/some/root/one": manifest_one, "/some/root/two": manifest_two} + fs_permissions = PosixFileSystemPermissionSettings("test-user", "test-group", 0o31, 0o66) + manifest_permissions = PosixFileSystemPermissionSettings( + fs_permissions.os_user, + fs_permissions.os_group, + VFS_MERGED_MANIFEST_FOLDER_PERMISSIONS.dir_mode, + VFS_MERGED_MANIFEST_FOLDER_PERMISSIONS.file_mode, + ) + + cache_path = temp_dir_path / VFS_CACHE_REL_PATH_IN_SESSION + manifest_path = temp_dir_path / VFS_MERGED_MANIFEST_FOLDER_IN_SESSION + with patch( + f"{deadline.__package__}.job_attachments.download._set_fs_group", + ) as mock_set_vs_group, patch( + f"{deadline.__package__}.job_attachments.download.handle_existing_vfs", + return_value=merged_decoded, + ) as mock_handle_existing, patch( + f"{deadline.__package__}.job_attachments.download.write_manifest_to_temp_file", + ) as mock_write_manifest, patch( + f"{deadline.__package__}.job_attachments.download.VFSProcessManager.start", + ) as mock_vfs_start: + mount_vfs_from_manifests( + "test-bucket", + manifests_by_root, + boto3_session=boto3.Session(region_name="us-west-2"), + session_dir=temp_dir_path, + os_env_vars={}, + fs_permission_settings=fs_permissions, + cas_prefix="cas/test", + ) + # Were the cache and manifest folders created + assert os.path.isdir(cache_path) + assert os.path.isdir(manifest_path) + + # + # Did we attempt to assign the expected permissions + mock_set_vs_group.assert_has_calls( + [ + call([str(cache_path / "cas/test")], str(cache_path), fs_permissions), + call([str(manifest_path)], str(manifest_path), manifest_permissions), + ] + ) + + mock_handle_existing.assert_has_calls( + [ + call( + manifest=manifest_one, + session_dir=temp_dir_path, + mount_point="/some/root/one", + os_user="test-user", + ), + call( + manifest=manifest_two, + session_dir=temp_dir_path, + mount_point="/some/root/two", + os_user="test-user", + ), + ] + ) + + mock_write_manifest.assert_has_calls( + [call(merged_decoded, dir=manifest_path), call(merged_decoded, dir=manifest_path)] + ) + mock_vfs_start.assert_has_calls( + [call(session_dir=temp_dir_path), call(session_dir=temp_dir_path)] + ) From 1dbceb0c6d68c69791c0b41842dd3685829fe8d6 Mon Sep 17 00:00:00 2001 From: Brian Axelson Date: Sun, 24 Mar 2024 15:57:45 +0000 Subject: [PATCH 2/5] fix: Checking for Posix file system permissions Signed-off-by: Brian Axelson --- src/deadline/job_attachments/asset_sync.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/deadline/job_attachments/asset_sync.py b/src/deadline/job_attachments/asset_sync.py index 13c4884f..32ccbda0 100644 --- a/src/deadline/job_attachments/asset_sync.py +++ b/src/deadline/job_attachments/asset_sync.py @@ -55,7 +55,7 @@ PathFormat, ) from .upload import S3AssetUploader -from .os_file_permission import FileSystemPermissionSettings +from .os_file_permission import FileSystemPermissionSettings, PosixFileSystemPermissionSettings from ._utils import ( _float_to_iso_datetime_string, _get_unique_dest_dir_name, @@ -429,6 +429,7 @@ def sync_inputs( and fs_permission_settings is not None and os_env_vars is not None and "AWS_PROFILE" in os_env_vars + and isinstance(fs_permission_settings, PosixFileSystemPermissionSettings) ): try: VFSProcessManager.find_vfs() From 7587233dc7a012e866f4a7267ec3a15d3c2035ab Mon Sep 17 00:00:00 2001 From: Brian Axelson Date: Sun, 24 Mar 2024 16:07:00 +0000 Subject: [PATCH 3/5] fix: linting Signed-off-by: Brian Axelson --- src/deadline/job_attachments/asset_sync.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/deadline/job_attachments/asset_sync.py b/src/deadline/job_attachments/asset_sync.py index 32ccbda0..17a210bb 100644 --- a/src/deadline/job_attachments/asset_sync.py +++ b/src/deadline/job_attachments/asset_sync.py @@ -438,7 +438,7 @@ def sync_inputs( manifests_by_root=merged_manifests_by_root, boto3_session=self.session, session_dir=session_dir, - fs_permission_settings=fs_permission_settings, + fs_permission_settings=fs_permission_settings, # type: ignore[arg-type] os_env_vars=os_env_vars, # type: ignore[arg-type] cas_prefix=s3_settings.full_cas_prefix(), ) From 8151334a83289c503e478862374d08ca03653c1c Mon Sep 17 00:00:00 2001 From: Brian Axelson Date: Sun, 24 Mar 2024 15:00:54 +0000 Subject: [PATCH 4/5] fix: Adding group owner permissions to asset_cache/cas_prefix. Writing merged manifests to new merged_manifests subfolder under session folder rather than /tmp. Using new VFSProcessManager.is_mount rather than os.path.ismount to propery check for mounts owned by job-user Signed-off-by: Brian Axelson --- test/unit/deadline_job_attachments/test_download.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/test/unit/deadline_job_attachments/test_download.py b/test/unit/deadline_job_attachments/test_download.py index 6da8d715..f046f759 100644 --- a/test/unit/deadline_job_attachments/test_download.py +++ b/test/unit/deadline_job_attachments/test_download.py @@ -2315,6 +2315,10 @@ def test_handle_existing_vfs_success( assert result_manifest == merged_decoded +@pytest.mark.skipif( + sys.platform == "win32", + reason="This VFS test is currently not valid for windows - VFS is a linux only feature currently.", +) def test_mount_vfs_from_manifests( test_manifest_one: dict, test_manifest_two: dict, merged_manifest: dict ): From 85da52a6c46ce03d56b898e4e110b4e1b62ec57b Mon Sep 17 00:00:00 2001 From: Brian Axelson Date: Mon, 25 Mar 2024 01:06:35 +0000 Subject: [PATCH 5/5] fix: CR Feedback Signed-off-by: Brian Axelson --- src/deadline/job_attachments/download.py | 10 +++++----- test/unit/deadline_job_attachments/test_asset_sync.py | 2 +- test/unit/deadline_job_attachments/test_download.py | 4 ++-- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/src/deadline/job_attachments/download.py b/src/deadline/job_attachments/download.py index 08a4a5ff..be51c8e8 100644 --- a/src/deadline/job_attachments/download.py +++ b/src/deadline/job_attachments/download.py @@ -66,7 +66,7 @@ S3_DOWNLOAD_MAX_CONCURRENCY = 10 VFS_CACHE_REL_PATH_IN_SESSION = ".vfs_object_cache" -VFS_MERGED_MANIFEST_FOLDER_IN_SESSION = "merged_manifests" +VFS_MERGED_MANIFEST_FOLDER_IN_SESSION = ".vfs_manifests" VFS_MERGED_MANIFEST_FOLDER_PERMISSIONS = PosixFileSystemPermissionSettings( os_user="", @@ -886,7 +886,7 @@ def merge_asset_manifests(manifests: list[BaseAssetManifest]) -> BaseAssetManife return output_manifest -def write_manifest_to_temp_file(manifest: BaseAssetManifest, dir: Path) -> str: +def _write_manifest_to_temp_file(manifest: BaseAssetManifest, dir: Path) -> str: with NamedTemporaryFile( suffix=".json", prefix="deadline-merged-manifest-", delete=False, mode="w", dir=dir ) as file: @@ -894,7 +894,7 @@ def write_manifest_to_temp_file(manifest: BaseAssetManifest, dir: Path) -> str: return file.name -def decode_manifest_file(input_manifest_path: Path): +def _read_manifest_file(input_manifest_path: Path): """ Given a manifest path, open the file at that location and decode Args: @@ -928,7 +928,7 @@ def handle_existing_vfs( session_dir=session_dir, mount_point=mount_point ) if input_manifest_path is not None: - input_manifest = decode_manifest_file(input_manifest_path) + input_manifest = _read_manifest_file(input_manifest_path) merged_input_manifest: Optional[BaseAssetManifest] = merge_asset_manifests( [input_manifest, manifest] @@ -1003,7 +1003,7 @@ def mount_vfs_from_manifests( ) # Write out a temporary file with the contents of the newly merged manifest - manifest_path: str = write_manifest_to_temp_file(final_manifest, dir=manifest_dir) + manifest_path: str = _write_manifest_to_temp_file(final_manifest, dir=manifest_dir) vfs_manager: VFSProcessManager = VFSProcessManager( s3_bucket, diff --git a/test/unit/deadline_job_attachments/test_asset_sync.py b/test/unit/deadline_job_attachments/test_asset_sync.py index fbe512a7..a508d1bc 100644 --- a/test/unit/deadline_job_attachments/test_asset_sync.py +++ b/test/unit/deadline_job_attachments/test_asset_sync.py @@ -430,7 +430,7 @@ def test_sync_inputs_with_step_dependencies_same_root_vfs_on_posix( ), patch( f"{deadline.__package__}.job_attachments.asset_sync.merge_asset_manifests", ) as merge_manifests_mock, patch( - f"{deadline.__package__}.job_attachments.download.write_manifest_to_temp_file", + f"{deadline.__package__}.job_attachments.download._write_manifest_to_temp_file", return_value="tmp_manifest", ), patch( "sys.platform", "linux" diff --git a/test/unit/deadline_job_attachments/test_download.py b/test/unit/deadline_job_attachments/test_download.py index f046f759..da3406ae 100644 --- a/test/unit/deadline_job_attachments/test_download.py +++ b/test/unit/deadline_job_attachments/test_download.py @@ -2296,7 +2296,7 @@ def test_handle_existing_vfs_success( f"{deadline.__package__}.job_attachments.download.VFSProcessManager.get_manifest_path_for_mount", return_value="/some/manifest/path", ) as mock_get_manifest_path, patch( - f"{deadline.__package__}.job_attachments.download.decode_manifest_file", + f"{deadline.__package__}.job_attachments.download._read_manifest_file", return_value=manifest_one, ) as mock_decode_manifest, patch( f"{deadline.__package__}.job_attachments.download.VFSProcessManager.kill_process_at_mount", @@ -2348,7 +2348,7 @@ def test_mount_vfs_from_manifests( f"{deadline.__package__}.job_attachments.download.handle_existing_vfs", return_value=merged_decoded, ) as mock_handle_existing, patch( - f"{deadline.__package__}.job_attachments.download.write_manifest_to_temp_file", + f"{deadline.__package__}.job_attachments.download._write_manifest_to_temp_file", ) as mock_write_manifest, patch( f"{deadline.__package__}.job_attachments.download.VFSProcessManager.start", ) as mock_vfs_start: