Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: VFS Disk Cache Group Permissions, Merged Manifests Folder, is_mount checks #235

Merged
merged 6 commits into from
Mar 25, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions src/deadline/job_attachments/asset_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@
PathFormat,
)
from .upload import S3AssetUploader
from .os_file_permission import FileSystemPermissionSettings
from .os_file_permission import FileSystemPermissionSettings, PosixFileSystemPermissionSettings
from ._utils import (
_float_to_iso_datetime_string,
_get_unique_dest_dir_name,
Expand Down Expand Up @@ -429,6 +429,7 @@ def sync_inputs(
and fs_permission_settings is not None
and os_env_vars is not None
and "AWS_PROFILE" in os_env_vars
and isinstance(fs_permission_settings, PosixFileSystemPermissionSettings)
):
try:
VFSProcessManager.find_vfs()
Expand All @@ -437,8 +438,7 @@ def sync_inputs(
manifests_by_root=merged_manifests_by_root,
boto3_session=self.session,
session_dir=session_dir,
os_user=fs_permission_settings.os_user, # type: ignore[union-attr]
os_group=getattr(fs_permission_settings, "os_group", ""),
fs_permission_settings=fs_permission_settings, # type: ignore[arg-type]
os_env_vars=os_env_vars, # type: ignore[arg-type]
cas_prefix=s3_settings.full_cas_prefix(),
)
Expand Down
56 changes: 44 additions & 12 deletions src/deadline/job_attachments/download.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,14 @@

S3_DOWNLOAD_MAX_CONCURRENCY = 10
VFS_CACHE_REL_PATH_IN_SESSION = ".vfs_object_cache"
VFS_MERGED_MANIFEST_FOLDER_IN_SESSION = "merged_manifests"
mwiebe marked this conversation as resolved.
Show resolved Hide resolved

VFS_MERGED_MANIFEST_FOLDER_PERMISSIONS = PosixFileSystemPermissionSettings(
os_user="",
os_group="",
dir_mode=0o31,
file_mode=0o64,
)


def get_manifest_from_s3(
Expand Down Expand Up @@ -878,14 +886,26 @@ def merge_asset_manifests(manifests: list[BaseAssetManifest]) -> BaseAssetManife
return output_manifest


def write_manifest_to_temp_file(manifest: BaseAssetManifest) -> str:
def write_manifest_to_temp_file(manifest: BaseAssetManifest, dir: Path) -> str:
mwiebe marked this conversation as resolved.
Show resolved Hide resolved
with NamedTemporaryFile(
suffix=".json", prefix="deadline-merged-manifest-", delete=False, mode="w"
suffix=".json", prefix="deadline-merged-manifest-", delete=False, mode="w", dir=dir
) as file:
file.write(manifest.encode())
return file.name


def decode_manifest_file(input_manifest_path: Path):
mwiebe marked this conversation as resolved.
Show resolved Hide resolved
"""
Given a manifest path, open the file at that location and decode
Args:
input_manifest_path: Path to manifest
Returns:
BaseAssetManifest : Single decoded manifest
"""
with open(input_manifest_path) as input_manifest_file:
return decode_manifest(input_manifest_file.read())


def handle_existing_vfs(
manifest: BaseAssetManifest, session_dir: Path, mount_point: str, os_user: str
) -> BaseAssetManifest:
Expand All @@ -901,15 +921,14 @@ def handle_existing_vfs(
Returns:
BaseAssetManifest : A single manifest containing the merged paths or the original manifest
"""
if not os.path.ismount(mount_point):
if not VFSProcessManager.is_mount(mount_point):
return manifest

input_manifest_path: Optional[Path] = VFSProcessManager.get_manifest_path_for_mount(
session_dir=session_dir, mount_point=mount_point
)
if input_manifest_path is not None:
with open(input_manifest_path) as input_manifest_file:
input_manifest: BaseAssetManifest = decode_manifest(input_manifest_file.read())
input_manifest = decode_manifest_file(input_manifest_path)

merged_input_manifest: Optional[BaseAssetManifest] = merge_asset_manifests(
[input_manifest, manifest]
Expand All @@ -931,9 +950,8 @@ def mount_vfs_from_manifests(
manifests_by_root: dict[str, BaseAssetManifest],
boto3_session: boto3.Session,
session_dir: Path,
os_user: str,
os_group: str,
os_env_vars: dict[str, str],
fs_permission_settings: FileSystemPermissionSettings,
cas_prefix: Optional[str] = None,
) -> None:
"""
Expand All @@ -952,7 +970,8 @@ def mount_vfs_from_manifests(
Returns:
None
"""

if not isinstance(fs_permission_settings, PosixFileSystemPermissionSettings):
raise TypeError("VFS can only be mounted from manifests on posix file systems.")
vfs_cache_dir: Path = session_dir / VFS_CACHE_REL_PATH_IN_SESSION
asset_cache_hash_path: Path = vfs_cache_dir
if cas_prefix is not None:
Expand All @@ -961,26 +980,39 @@ def mount_vfs_from_manifests(

asset_cache_hash_path.mkdir(parents=True, exist_ok=True)

_set_fs_group([str(asset_cache_hash_path)], str(vfs_cache_dir), fs_permission_settings)

manifest_dir: Path = session_dir / VFS_MERGED_MANIFEST_FOLDER_IN_SESSION
manifest_dir.mkdir(parents=True, exist_ok=True)
manifest_dir_permissions = VFS_MERGED_MANIFEST_FOLDER_PERMISSIONS
manifest_dir_permissions.os_user = fs_permission_settings.os_user
manifest_dir_permissions.os_group = fs_permission_settings.os_group

_set_fs_group([str(manifest_dir)], str(manifest_dir), manifest_dir_permissions)

for mount_point, manifest in manifests_by_root.items():
# Validate the file paths to see if they are under the given download directory.
_ensure_paths_within_directory(
mount_point, [path.path for path in manifest.paths] # type: ignore
)
final_manifest: BaseAssetManifest = handle_existing_vfs(
manifest=manifest, session_dir=session_dir, mount_point=mount_point, os_user=os_user
manifest=manifest,
session_dir=session_dir,
mount_point=mount_point,
os_user=fs_permission_settings.os_user,
)

# Write out a temporary file with the contents of the newly merged manifest
manifest_path: str = write_manifest_to_temp_file(final_manifest)
manifest_path: str = write_manifest_to_temp_file(final_manifest, dir=manifest_dir)

vfs_manager: VFSProcessManager = VFSProcessManager(
s3_bucket,
boto3_session.region_name,
manifest_path,
mount_point,
os_user,
fs_permission_settings.os_user,
os_env_vars,
os_group,
getattr(fs_permission_settings, "os_group", ""),
cas_prefix,
str(vfs_cache_dir),
)
Expand Down
143 changes: 143 additions & 0 deletions test/unit/deadline_job_attachments/test_download.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import json
from pathlib import Path
import sys
import tempfile
from typing import Any, Callable, List
from unittest.mock import MagicMock, call, patch

Expand Down Expand Up @@ -41,10 +42,15 @@
get_job_input_paths_by_asset_root,
get_job_output_paths_by_asset_root,
get_manifest_from_s3,
handle_existing_vfs,
mount_vfs_from_manifests,
merge_asset_manifests,
_ensure_paths_within_directory,
_get_asset_root_from_s3,
_get_tasks_manifests_keys_from_s3,
VFS_CACHE_REL_PATH_IN_SESSION,
VFS_MERGED_MANIFEST_FOLDER_IN_SESSION,
VFS_MERGED_MANIFEST_FOLDER_PERMISSIONS,
)
from deadline.job_attachments.exceptions import (
AssetSyncError,
Expand Down Expand Up @@ -2254,3 +2260,140 @@ def download_file(*args):
)

assert sorted(downloaded_files) == ["a.txt", "b.txt", "c.txt", "d.txt"]


def test_handle_existing_vfs_no_mount_returns(test_manifest_one: dict):
"""
Test that handling an existing manifest for a non existent mount returns the manifest
"""
manifest = decode_manifest(json.dumps(test_manifest_one))
with patch(
f"{deadline.__package__}.job_attachments.download.VFSProcessManager.is_mount",
return_value=False,
) as mock_is_mount:
result_manifest = handle_existing_vfs(
manifest, Path("/some/session/dir"), "/not/a/mount", "test-user"
)
mock_is_mount.assert_called_once_with("/not/a/mount")
assert manifest == result_manifest


def test_handle_existing_vfs_success(
test_manifest_one: dict, test_manifest_two: dict, merged_manifest: dict
):
"""
Test that handling an existing manifest for a mount which exists attempts to merge the manifests and
shut down the mount
"""
manifest_one = decode_manifest(json.dumps(test_manifest_one))
manifest_two = decode_manifest(json.dumps(test_manifest_two))
merged_decoded = decode_manifest(json.dumps(merged_manifest))
session_path = Path("/some/session/dir")
with patch(
f"{deadline.__package__}.job_attachments.download.VFSProcessManager.is_mount",
return_value=True,
) as mock_is_mount, patch(
f"{deadline.__package__}.job_attachments.download.VFSProcessManager.get_manifest_path_for_mount",
return_value="/some/manifest/path",
) as mock_get_manifest_path, patch(
f"{deadline.__package__}.job_attachments.download.decode_manifest_file",
return_value=manifest_one,
) as mock_decode_manifest, patch(
f"{deadline.__package__}.job_attachments.download.VFSProcessManager.kill_process_at_mount",
) as mock_kill_process:
result_manifest = handle_existing_vfs(
manifest_two, session_path, "/some/mount", "test-user"
)
mock_is_mount.assert_called_once_with("/some/mount")
mock_get_manifest_path.assert_called_once_with(
session_dir=session_path, mount_point="/some/mount"
)
mock_decode_manifest.assert_called_once_with("/some/manifest/path")
mock_kill_process.assert_called_once_with(
session_dir=session_path, mount_point="/some/mount", os_user="test-user"
)
assert result_manifest == merged_decoded


@pytest.mark.skipif(
sys.platform == "win32",
reason="This VFS test is currently not valid for windows - VFS is a linux only feature currently.",
)
def test_mount_vfs_from_manifests(
test_manifest_one: dict, test_manifest_two: dict, merged_manifest: dict
):
"""
Test that handling an existing manifest for a mount which exists attempts to merge the manifests and
shut down the mount
"""
manifest_one = decode_manifest(json.dumps(test_manifest_one))
manifest_two = decode_manifest(json.dumps(test_manifest_two))
merged_decoded = decode_manifest(json.dumps(merged_manifest))
temp_dir = tempfile.TemporaryDirectory()
temp_dir_path = Path(temp_dir.name)
manifests_by_root = {"/some/root/one": manifest_one, "/some/root/two": manifest_two}
fs_permissions = PosixFileSystemPermissionSettings("test-user", "test-group", 0o31, 0o66)
manifest_permissions = PosixFileSystemPermissionSettings(
fs_permissions.os_user,
fs_permissions.os_group,
VFS_MERGED_MANIFEST_FOLDER_PERMISSIONS.dir_mode,
VFS_MERGED_MANIFEST_FOLDER_PERMISSIONS.file_mode,
)

cache_path = temp_dir_path / VFS_CACHE_REL_PATH_IN_SESSION
manifest_path = temp_dir_path / VFS_MERGED_MANIFEST_FOLDER_IN_SESSION
with patch(
f"{deadline.__package__}.job_attachments.download._set_fs_group",
) as mock_set_vs_group, patch(
f"{deadline.__package__}.job_attachments.download.handle_existing_vfs",
return_value=merged_decoded,
) as mock_handle_existing, patch(
f"{deadline.__package__}.job_attachments.download.write_manifest_to_temp_file",
) as mock_write_manifest, patch(
f"{deadline.__package__}.job_attachments.download.VFSProcessManager.start",
) as mock_vfs_start:
mount_vfs_from_manifests(
"test-bucket",
manifests_by_root,
boto3_session=boto3.Session(region_name="us-west-2"),
session_dir=temp_dir_path,
os_env_vars={},
fs_permission_settings=fs_permissions,
cas_prefix="cas/test",
)
# Were the cache and manifest folders created
assert os.path.isdir(cache_path)
assert os.path.isdir(manifest_path)

#
# Did we attempt to assign the expected permissions
mock_set_vs_group.assert_has_calls(
[
call([str(cache_path / "cas/test")], str(cache_path), fs_permissions),
call([str(manifest_path)], str(manifest_path), manifest_permissions),
]
)

mock_handle_existing.assert_has_calls(
[
call(
manifest=manifest_one,
session_dir=temp_dir_path,
mount_point="/some/root/one",
os_user="test-user",
),
call(
manifest=manifest_two,
session_dir=temp_dir_path,
mount_point="/some/root/two",
os_user="test-user",
),
]
)

mock_write_manifest.assert_has_calls(
[call(merged_decoded, dir=manifest_path), call(merged_decoded, dir=manifest_path)]
)
mock_vfs_start.assert_has_calls(
[call(session_dir=temp_dir_path), call(session_dir=temp_dir_path)]
)