Skip to content

Commit

Permalink
fix: VFS Disk Cache Group Permissions, Merged Manifests Folder, is_mo…
Browse files Browse the repository at this point in the history
…unt checks (#235)

* fix: Adding group owner permissions to asset_cache/cas_prefix.  Writing merged manifests to new merged_manifests subfolder under session folder rather than /tmp.  Using new VFSProcessManager.is_mount rather than os.path.ismount to propery check for mounts owned by job-user

Signed-off-by: Brian Axelson <[email protected]>

* fix: Checking for Posix file system permissions

Signed-off-by: Brian Axelson <[email protected]>

* fix: linting

Signed-off-by: Brian Axelson <[email protected]>

* fix: Adding group owner permissions to asset_cache/cas_prefix.  Writing merged manifests to new merged_manifests subfolder under session folder rather than /tmp.  Using new VFSProcessManager.is_mount rather than os.path.ismount to propery check for mounts owned by job-user

Signed-off-by: Brian Axelson <[email protected]>

* fix: CR Feedback

Signed-off-by: Brian Axelson <[email protected]>

---------

Signed-off-by: Brian Axelson <[email protected]>
  • Loading branch information
baxeaz authored Mar 25, 2024
1 parent 3c3a4fa commit 30dac3d
Show file tree
Hide file tree
Showing 4 changed files with 191 additions and 16 deletions.
6 changes: 3 additions & 3 deletions src/deadline/job_attachments/asset_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@
PathFormat,
)
from .upload import S3AssetUploader
from .os_file_permission import FileSystemPermissionSettings
from .os_file_permission import FileSystemPermissionSettings, PosixFileSystemPermissionSettings
from ._utils import (
_float_to_iso_datetime_string,
_get_unique_dest_dir_name,
Expand Down Expand Up @@ -461,6 +461,7 @@ def sync_inputs(
and fs_permission_settings is not None
and os_env_vars is not None
and "AWS_PROFILE" in os_env_vars
and isinstance(fs_permission_settings, PosixFileSystemPermissionSettings)
):
try:
VFSProcessManager.find_vfs()
Expand All @@ -469,8 +470,7 @@ def sync_inputs(
manifests_by_root=merged_manifests_by_root,
boto3_session=self.session,
session_dir=session_dir,
os_user=fs_permission_settings.os_user, # type: ignore[union-attr]
os_group=getattr(fs_permission_settings, "os_group", ""),
fs_permission_settings=fs_permission_settings, # type: ignore[arg-type]
os_env_vars=os_env_vars, # type: ignore[arg-type]
cas_prefix=s3_settings.full_cas_prefix(),
)
Expand Down
56 changes: 44 additions & 12 deletions src/deadline/job_attachments/download.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,14 @@

S3_DOWNLOAD_MAX_CONCURRENCY = 10
VFS_CACHE_REL_PATH_IN_SESSION = ".vfs_object_cache"
VFS_MERGED_MANIFEST_FOLDER_IN_SESSION = ".vfs_manifests"

VFS_MERGED_MANIFEST_FOLDER_PERMISSIONS = PosixFileSystemPermissionSettings(
os_user="",
os_group="",
dir_mode=0o31,
file_mode=0o64,
)


def get_manifest_from_s3(
Expand Down Expand Up @@ -878,14 +886,26 @@ def merge_asset_manifests(manifests: list[BaseAssetManifest]) -> BaseAssetManife
return output_manifest


def write_manifest_to_temp_file(manifest: BaseAssetManifest) -> str:
def _write_manifest_to_temp_file(manifest: BaseAssetManifest, dir: Path) -> str:
with NamedTemporaryFile(
suffix=".json", prefix="deadline-merged-manifest-", delete=False, mode="w"
suffix=".json", prefix="deadline-merged-manifest-", delete=False, mode="w", dir=dir
) as file:
file.write(manifest.encode())
return file.name


def _read_manifest_file(input_manifest_path: Path):
"""
Given a manifest path, open the file at that location and decode
Args:
input_manifest_path: Path to manifest
Returns:
BaseAssetManifest : Single decoded manifest
"""
with open(input_manifest_path) as input_manifest_file:
return decode_manifest(input_manifest_file.read())


def handle_existing_vfs(
manifest: BaseAssetManifest, session_dir: Path, mount_point: str, os_user: str
) -> BaseAssetManifest:
Expand All @@ -901,15 +921,14 @@ def handle_existing_vfs(
Returns:
BaseAssetManifest : A single manifest containing the merged paths or the original manifest
"""
if not os.path.ismount(mount_point):
if not VFSProcessManager.is_mount(mount_point):
return manifest

input_manifest_path: Optional[Path] = VFSProcessManager.get_manifest_path_for_mount(
session_dir=session_dir, mount_point=mount_point
)
if input_manifest_path is not None:
with open(input_manifest_path) as input_manifest_file:
input_manifest: BaseAssetManifest = decode_manifest(input_manifest_file.read())
input_manifest = _read_manifest_file(input_manifest_path)

merged_input_manifest: Optional[BaseAssetManifest] = merge_asset_manifests(
[input_manifest, manifest]
Expand All @@ -931,9 +950,8 @@ def mount_vfs_from_manifests(
manifests_by_root: dict[str, BaseAssetManifest],
boto3_session: boto3.Session,
session_dir: Path,
os_user: str,
os_group: str,
os_env_vars: dict[str, str],
fs_permission_settings: FileSystemPermissionSettings,
cas_prefix: Optional[str] = None,
) -> None:
"""
Expand All @@ -952,7 +970,8 @@ def mount_vfs_from_manifests(
Returns:
None
"""

if not isinstance(fs_permission_settings, PosixFileSystemPermissionSettings):
raise TypeError("VFS can only be mounted from manifests on posix file systems.")
vfs_cache_dir: Path = session_dir / VFS_CACHE_REL_PATH_IN_SESSION
asset_cache_hash_path: Path = vfs_cache_dir
if cas_prefix is not None:
Expand All @@ -961,26 +980,39 @@ def mount_vfs_from_manifests(

asset_cache_hash_path.mkdir(parents=True, exist_ok=True)

_set_fs_group([str(asset_cache_hash_path)], str(vfs_cache_dir), fs_permission_settings)

manifest_dir: Path = session_dir / VFS_MERGED_MANIFEST_FOLDER_IN_SESSION
manifest_dir.mkdir(parents=True, exist_ok=True)
manifest_dir_permissions = VFS_MERGED_MANIFEST_FOLDER_PERMISSIONS
manifest_dir_permissions.os_user = fs_permission_settings.os_user
manifest_dir_permissions.os_group = fs_permission_settings.os_group

_set_fs_group([str(manifest_dir)], str(manifest_dir), manifest_dir_permissions)

for mount_point, manifest in manifests_by_root.items():
# Validate the file paths to see if they are under the given download directory.
_ensure_paths_within_directory(
mount_point, [path.path for path in manifest.paths] # type: ignore
)
final_manifest: BaseAssetManifest = handle_existing_vfs(
manifest=manifest, session_dir=session_dir, mount_point=mount_point, os_user=os_user
manifest=manifest,
session_dir=session_dir,
mount_point=mount_point,
os_user=fs_permission_settings.os_user,
)

# Write out a temporary file with the contents of the newly merged manifest
manifest_path: str = write_manifest_to_temp_file(final_manifest)
manifest_path: str = _write_manifest_to_temp_file(final_manifest, dir=manifest_dir)

vfs_manager: VFSProcessManager = VFSProcessManager(
s3_bucket,
boto3_session.region_name,
manifest_path,
mount_point,
os_user,
fs_permission_settings.os_user,
os_env_vars,
os_group,
getattr(fs_permission_settings, "os_group", ""),
cas_prefix,
str(vfs_cache_dir),
)
Expand Down
2 changes: 1 addition & 1 deletion test/unit/deadline_job_attachments/test_asset_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -432,7 +432,7 @@ def test_sync_inputs_with_step_dependencies_same_root_vfs_on_posix(
), patch(
f"{deadline.__package__}.job_attachments.asset_sync.merge_asset_manifests",
) as merge_manifests_mock, patch(
f"{deadline.__package__}.job_attachments.download.write_manifest_to_temp_file",
f"{deadline.__package__}.job_attachments.download._write_manifest_to_temp_file",
return_value="tmp_manifest",
), patch(
"sys.platform", "linux"
Expand Down
143 changes: 143 additions & 0 deletions test/unit/deadline_job_attachments/test_download.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import json
from pathlib import Path
import sys
import tempfile
from typing import Any, Callable, List
from unittest.mock import MagicMock, call, patch

Expand Down Expand Up @@ -41,10 +42,15 @@
get_job_input_paths_by_asset_root,
get_job_output_paths_by_asset_root,
get_manifest_from_s3,
handle_existing_vfs,
mount_vfs_from_manifests,
merge_asset_manifests,
_ensure_paths_within_directory,
_get_asset_root_from_s3,
_get_tasks_manifests_keys_from_s3,
VFS_CACHE_REL_PATH_IN_SESSION,
VFS_MERGED_MANIFEST_FOLDER_IN_SESSION,
VFS_MERGED_MANIFEST_FOLDER_PERMISSIONS,
)
from deadline.job_attachments.exceptions import (
AssetSyncError,
Expand Down Expand Up @@ -2255,3 +2261,140 @@ def download_file(*args):
)

assert sorted(downloaded_files) == ["a.txt", "b.txt", "c.txt", "d.txt"]


def test_handle_existing_vfs_no_mount_returns(test_manifest_one: dict):
"""
Test that handling an existing manifest for a non existent mount returns the manifest
"""
manifest = decode_manifest(json.dumps(test_manifest_one))
with patch(
f"{deadline.__package__}.job_attachments.download.VFSProcessManager.is_mount",
return_value=False,
) as mock_is_mount:
result_manifest = handle_existing_vfs(
manifest, Path("/some/session/dir"), "/not/a/mount", "test-user"
)
mock_is_mount.assert_called_once_with("/not/a/mount")
assert manifest == result_manifest


def test_handle_existing_vfs_success(
test_manifest_one: dict, test_manifest_two: dict, merged_manifest: dict
):
"""
Test that handling an existing manifest for a mount which exists attempts to merge the manifests and
shut down the mount
"""
manifest_one = decode_manifest(json.dumps(test_manifest_one))
manifest_two = decode_manifest(json.dumps(test_manifest_two))
merged_decoded = decode_manifest(json.dumps(merged_manifest))
session_path = Path("/some/session/dir")
with patch(
f"{deadline.__package__}.job_attachments.download.VFSProcessManager.is_mount",
return_value=True,
) as mock_is_mount, patch(
f"{deadline.__package__}.job_attachments.download.VFSProcessManager.get_manifest_path_for_mount",
return_value="/some/manifest/path",
) as mock_get_manifest_path, patch(
f"{deadline.__package__}.job_attachments.download._read_manifest_file",
return_value=manifest_one,
) as mock_decode_manifest, patch(
f"{deadline.__package__}.job_attachments.download.VFSProcessManager.kill_process_at_mount",
) as mock_kill_process:
result_manifest = handle_existing_vfs(
manifest_two, session_path, "/some/mount", "test-user"
)
mock_is_mount.assert_called_once_with("/some/mount")
mock_get_manifest_path.assert_called_once_with(
session_dir=session_path, mount_point="/some/mount"
)
mock_decode_manifest.assert_called_once_with("/some/manifest/path")
mock_kill_process.assert_called_once_with(
session_dir=session_path, mount_point="/some/mount", os_user="test-user"
)
assert result_manifest == merged_decoded


@pytest.mark.skipif(
sys.platform == "win32",
reason="This VFS test is currently not valid for windows - VFS is a linux only feature currently.",
)
def test_mount_vfs_from_manifests(
test_manifest_one: dict, test_manifest_two: dict, merged_manifest: dict
):
"""
Test that handling an existing manifest for a mount which exists attempts to merge the manifests and
shut down the mount
"""
manifest_one = decode_manifest(json.dumps(test_manifest_one))
manifest_two = decode_manifest(json.dumps(test_manifest_two))
merged_decoded = decode_manifest(json.dumps(merged_manifest))
temp_dir = tempfile.TemporaryDirectory()
temp_dir_path = Path(temp_dir.name)
manifests_by_root = {"/some/root/one": manifest_one, "/some/root/two": manifest_two}
fs_permissions = PosixFileSystemPermissionSettings("test-user", "test-group", 0o31, 0o66)
manifest_permissions = PosixFileSystemPermissionSettings(
fs_permissions.os_user,
fs_permissions.os_group,
VFS_MERGED_MANIFEST_FOLDER_PERMISSIONS.dir_mode,
VFS_MERGED_MANIFEST_FOLDER_PERMISSIONS.file_mode,
)

cache_path = temp_dir_path / VFS_CACHE_REL_PATH_IN_SESSION
manifest_path = temp_dir_path / VFS_MERGED_MANIFEST_FOLDER_IN_SESSION
with patch(
f"{deadline.__package__}.job_attachments.download._set_fs_group",
) as mock_set_vs_group, patch(
f"{deadline.__package__}.job_attachments.download.handle_existing_vfs",
return_value=merged_decoded,
) as mock_handle_existing, patch(
f"{deadline.__package__}.job_attachments.download._write_manifest_to_temp_file",
) as mock_write_manifest, patch(
f"{deadline.__package__}.job_attachments.download.VFSProcessManager.start",
) as mock_vfs_start:
mount_vfs_from_manifests(
"test-bucket",
manifests_by_root,
boto3_session=boto3.Session(region_name="us-west-2"),
session_dir=temp_dir_path,
os_env_vars={},
fs_permission_settings=fs_permissions,
cas_prefix="cas/test",
)
# Were the cache and manifest folders created
assert os.path.isdir(cache_path)
assert os.path.isdir(manifest_path)

#
# Did we attempt to assign the expected permissions
mock_set_vs_group.assert_has_calls(
[
call([str(cache_path / "cas/test")], str(cache_path), fs_permissions),
call([str(manifest_path)], str(manifest_path), manifest_permissions),
]
)

mock_handle_existing.assert_has_calls(
[
call(
manifest=manifest_one,
session_dir=temp_dir_path,
mount_point="/some/root/one",
os_user="test-user",
),
call(
manifest=manifest_two,
session_dir=temp_dir_path,
mount_point="/some/root/two",
os_user="test-user",
),
]
)

mock_write_manifest.assert_has_calls(
[call(merged_decoded, dir=manifest_path), call(merged_decoded, dir=manifest_path)]
)
mock_vfs_start.assert_has_calls(
[call(session_dir=temp_dir_path), call(session_dir=temp_dir_path)]
)

0 comments on commit 30dac3d

Please sign in to comment.